1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
|
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
# See LICENSE for details.
#
# Author: Clark Evans (cce@clarkevans.com)
#
"""
flow.base

This module contains the core exceptions and base classes in the flow module.
See flow.flow for more detailed information.
"""
import twisted.python.compat
from twisted.internet import reactor
import time
#
# Exceptions used within flow
#
class Unsupported(NotImplementedError):
    """
    Indicates that the given stage does not know what to do with the flow
    instruction that was returned.

    @param inst: the flow instruction that could not be handled; its repr()
        is embedded in the exception message.
    """

    def __init__(self, inst):
        msg = "Unsupported flow instruction: %s " % repr(inst)
        # Initialise through the class we actually derive from. Calling the
        # __init__ of an unrelated exception type on this instance fails
        # with a TypeError instead of building the intended error.
        NotImplementedError.__init__(self, msg)
class NotReadyError(RuntimeError):
    """
    Raised when a stage has not been subject to a yield.

    Adds nothing to RuntimeError; it exists so that callers can catch this
    condition specifically.
    """
#
# Abstract/Base Classes
#
class Instruction:
    """
    Base class for objects that have special meaning when yielded in a flow.

    It carries no state or behaviour of its own.
    """
class Controller:
    """
    Flow controller

    A controller class sits at the base of every flow and interprets the
    instructions yielded by it, especially the CallLater instructions.  This
    class is primarily a marker denoting which classes consume Instruction
    events.  A controller that cannot handle a particular instruction raises
    the Unsupported exception.
    """
class CallLater(Instruction):
    """
    Instruction to support callbacks

    This instruction is what gets returned during the yield of the _Deferred
    and Callback stage.  The underlying flow driver is expected to invoke
    'callLater' with the callable that should be executed after each
    callback.
    """

    def callLater(self, callable):
        """
        Hook for scheduling C{callable}; the base implementation does nothing.

        @param callable: the object the flow driver wants executed later.
        @return: None
        """
        return None
class Cooperate(CallLater):
    """
    Requests that processing be paused so other tasks can resume

    Yield an instance of this class when the current chain would block, or
    periodically during an intensive processing task.  The flow mechanism
    treats these objects as a signal that the current processing chain
    should be paused and resumed later, which lets other delayed operations
    be processed in the meantime.  Usage is quite simple::

        # within some generator wrapped by a Controller
        yield Cooperate(1)  # yield for a second or more

    @ivar timeout: delay handed to the reactor when rescheduling.
    """

    def __init__(self, timeout=0):
        """
        @param timeout: delay passed to reactor.callLater; defaults to 0.
        """
        self.timeout = timeout

    def callLater(self, callable):
        """
        Ask the reactor to run C{callable} once C{self.timeout} has elapsed.

        @param callable: the object to hand to reactor.callLater.
        """
        delay = self.timeout
        reactor.callLater(delay, callable)
class Stage(Instruction):
    """
    Abstract base defining protocol for iterator/generators in a flow

    This is the primary component in the flow system, it is an iterable object
    which must be passed to a yield statement before each call to next().
    Usage::

        iterable = DerivedStage( ... , SpamError, EggsError)
        yield iterable
        for result in iterable:
            # handle good result, or SpamError or EggsError
            yield iterable

    Alternatively, when inside a generator, the next() method can be used
    directly.  In this case, if no results are available, StopIteration is
    raised, and if left uncaught, will nicely end the generator.  Of course,
    unexpected failures are raised.  This technique is especially useful when
    pulling from more than one stage at a time.  For example::

        def someGenerator():
            iterable = SomeStage( ... , SpamError, EggsError)
            while True:
                yield iterable
                result = iterable.next()
                # handle good result or SpamError or EggsError

    For many generators, the results become available in chunks of rows.
    While the default value is to get one row at a time, there is a 'chunked'
    property which allows them to be returned via the next() method as many
    rows rather than row by row.  For example::

        iterable = DerivedStage(...)
        iterable.chunked = True
        for results in iterable:
            for result in results:
                # handle good result
            yield iterable

    For those wishing more control at the cost of a painful experience, the
    following member variables can be used to great effect::

        - results: This is a list of results produced by the generator, they
                   can be fetched one by one using next() or in a group
                   together.  If no results were produced, then this is an
                   empty list.  These results should be removed from the list
                   after they are read; or, after reading all of the results
                   set to an empty list

        - stop: This is true if the underlying generator has finished
                execution (raised a StopIteration or returned).  Note that
                several results may exist, and stop may be true.

        - failure: If the generator produced an exception, then it is wrapped
                   as a Failure object and put here.  Note that several
                   results may have been produced before the failure.  To
                   ensure that the failure isn't accidentally reported twice,
                   it is advisable to set stop to True.

    The order in which these member variables is used is *critical* for
    proper adherence to the flow protocol.  First, all successful
    results should be handled.  Second, the iterable should be checked
    to see if it is finished.  Third, a failure should be checked;
    while handling a failure, either the loop should be exited, or
    the iterable's stop member should be set.  For example::

        iterable = SomeStage(...)
        while True:
            yield iterable
            if iterable.results:
                for result in iterable.results:
                    # handle good result
                iterable.results = []
            if iterable.stop:
                break
            if iterable.failure:
                iterable.stop = True
                # handle iterable.failure
                break
    """

    def __init__(self, *trap):
        """
        @param trap: values forwarded to C{self.failure.check()} by next();
            a failure matching one of them is returned from next() rather
            than raised.
        """
        self._trap = trap
        self.stop = False
        self.failure = None
        self.results = []
        self.chunked = False

    def __iter__(self):
        """
        A stage is its own iterator.
        """
        return self

    def next(self):
        """
        return current result

        This is the primary function to be called to retrieve the current
        result.  It complies with the iterator protocol by raising
        StopIteration when the stage is complete.  It also raises an exception
        if it is called before the stage is yielded.

        @return: the whole pending results list when C{chunked} is set,
            otherwise the oldest pending result; or, for a trapped failure,
            the truthy value returned by C{self.failure.check(*trap)}.
        @raise StopIteration: when C{stop} is set and no results are pending.
        @raise NotReadyError: when there is nothing pending, the stage is not
            stopped and no failure is recorded.
        """
        # Pending results are always delivered before stop/failure are
        # looked at, matching the order required by the flow protocol.
        if self.results:
            if self.chunked:
                ret = self.results
                self.results = []
                return ret
            else:
                return self.results.pop(0)
        if self.stop:
            raise StopIteration()
        if self.failure:
            # Mark the stage finished first so that the failure is reported
            # only once; the following call then raises StopIteration.
            self.stop = True
            cr = self.failure.check(*self._trap)
            if cr:
                return cr
            self.failure.raiseException()
        raise NotReadyError("Must 'yield' this object before calling next()")

    def __next__(self):
        """
        Python 3 spelling of the iterator protocol.

        Delegates to next() at call time so that a subclass overriding
        next() is honoured here as well.
        """
        return self.next()

    def _yield(self):
        """
        executed during a yield statement by previous stage

        This method is private within the scope of the flow module, it is used
        by one stage in the flow to ask a subsequent stage to produce its
        value.  The outcome of the yield is then recorded on the stage itself:
        values in C{self.results}, and a Failure instance in C{self.failure}
        if a problem occurred.

        @raise NotImplementedError: always, in this abstract base; concrete
            stages must override it.
        """
        raise NotImplementedError
|