summaryrefslogtreecommitdiffstats
path: root/tools/buildbot/pylibs/twisted/flow/base.py
blob: aa8cc4cdec3659b6235cd5b391117e7511678db1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
# See LICENSE for details.

#
# Author: Clark Evans  (cce@clarkevans.com)
#
"""
flow.base

This module contains the core exceptions and base classes in the flow module.
See flow.flow for more detailed information
"""

import twisted.python.compat
from twisted.internet import reactor
import time

#
# Exceptions used within flow
# 
class Unsupported(NotImplementedError):
    """ Indicates that the given stage does not know what to do 
        with the flow instruction that was returned.
    """
    def __init__(self, inst):
        msg = "Unsupported flow instruction: %s " % repr(inst)
        TypeError.__init__(self,msg)

class NotReadyError(RuntimeError):
    """ Raised when a stage has not been subject to a yield """
    pass

#
# Abstract/Base Classes
#

class Instruction:
    """ Has special meaning when yielded in a flow """
    pass
 
class Controller:
    """
    Flow controller

    At the base of every flow, is a controller class which interprets the
    instructions, especially the CallLater instructions.  This is primarly just
    a marker class to denote which classes consume Instruction events.  If a
    controller cannot handle a particular instruction, it raises the
    Unsupported exception.
    """
    pass

class CallLater(Instruction):
    """
    Instruction to support callbacks

    This is the instruction which is returned during the yield of the _Deferred
    and Callback stage.  The underlying flow driver should call the 'callLater'
    function with the callable to be executed after each callback.
    """
    def callLater(self, callable):
        pass

class Cooperate(CallLater):
    """
    Requests that processing be paused so other tasks can resume

    Yield this object when the current chain would block or periodically during
    an intensive processing task.  The flow mechanism uses these objects to
    signal that the current processing chain should be paused and resumed
    later.  This allows other delayed operations to be processed, etc.  Usage
    is quite simple::

       # within some generator wrapped by a Controller
       yield Cooperate(1)  # yield for a second or more

    """
    def __init__(self, timeout = 0):
        self.timeout = timeout
    def callLater(self, callable):
        reactor.callLater(self.timeout, callable)

class Stage(Instruction):
    """
    Abstract base defining protocol for iterator/generators in a flow

    This is the primary component in the flow system, it is an iterable object
    which must be passed to a yield statement before each call to next().
    Usage::

       iterable = DerivedStage( ... , SpamError, EggsError))
       yield iterable
       for result in iterable:
           # handle good result, or SpamError or EggsError
           yield iterable

    Alternatively, when inside a generator, the next() method can be used
    directly.  In this case, if no results are available, StopIteration is
    raised, and if left uncaught, will nicely end the generator.  Of course,
    unexpected failures are raised.  This technique is especially useful when
    pulling from more than one stage at a time.  For example::

         def someGenerator():
             iterable = SomeStage( ... , SpamError, EggsError)
             while True:
                 yield iterable
                 result = iterable.next()
                 # handle good result or SpamError or EggsError

    For many generators, the results become available in chunks of rows.  While
    the default value is to get one row at a time, there is a 'chunked'
    property which allows them to be returned via the next() method as many
    rows rather than row by row.  For example::

         iterable = DerivedStage(...)
         iterable.chunked = True
         for results in iterable:
             for result in results:
                  # handle good result
             yield iterable

    For those wishing more control at the cost of a painful experience, the
    following member variables can be used to great effect::

        - results: This is a list of results produced by the generator, they
                   can be fetched one by one using next() or in a group
                   together.  If no results were produced, then this is an
                   empty list.  These results should be removed from the list
                   after they are read; or, after reading all of the results
                   set to an empty list

        - stop: This is true if the underlying generator has finished execution
                (raised a StopIteration or returned).  Note that several
                results may exist, and stop may be true.

        - failure: If the generator produced an exception, then it is wrapped
                   as a Failure object and put here.  Note that several results
                   may have been produced before the failure.  To ensure that
                   the failure isn't accidently reported twice, it is
                   adviseable to set stop to True.

    The order in which these member variables is used is *critical* for
    proper adherance to the flow protocol.   First, all successful
    results should be handled.  Second, the iterable should be checked
    to see if it is finished.  Third, a failure should be checked;
    while handling a failure, either the loop should be exited, or
    the iterable's stop member should be set.  For example::

         iterable = SomeStage(...)
         while True:
             yield iterable
             if iterable.results:
                 for result in iterable.results:
                     # handle good result
                 iterable.results = []
             if iterable.stop:
                 break
             if iterable.failure:
                 iterable.stop = True
                 # handle iterable.failure
                 break
    """
    def __init__(self, *trap):
        self._trap = trap
        self.stop = False
        self.failure = None
        self.results = []
        self.chunked = False
    
    def __iter__(self):
        return self

    def next(self):
        """
        return current result

        This is the primary function to be called to retrieve the current
        result.  It complies with the iterator protocol by raising
        StopIteration when the stage is complete.  It also raises an exception
        if it is called before the stage is yielded.
        """
        if self.results:
            if self.chunked:
                ret = self.results
                self.results = []
                return ret
            else:
                return self.results.pop(0)
        if self.stop:
            raise StopIteration()

        if self.failure:
            self.stop = True

            cr = self.failure.check(*self._trap)

            if cr:
                return cr

            self.failure.raiseException()

        raise NotReadyError("Must 'yield' this object before calling next()")

    def _yield(self):
        """
        executed during a yield statement by previous stage

        This method is private within the scope of the flow module, it is used
        by one stage in the flow to ask a subsequent stage to produce its
        value.  The result of the yield is then stored in self.result and is an
        instance of Failure if a problem occurred.
        """
        raise NotImplementedError