71 def set(self, value): |
71 def set(self, value): |
72 if util.safehasattr(self, 'value'): |
72 if util.safehasattr(self, 'value'): |
73 raise error.RepoError("future is already set") |
73 raise error.RepoError("future is already set") |
74 self.value = value |
74 self.value = value |
75 |
75 |
76 class batcher(object): |
|
77 '''base class for batches of commands submittable in a single request |
|
78 |
|
79 All methods invoked on instances of this class are simply queued and |
|
80 return a a future for the result. Once you call submit(), all the queued |
|
81 calls are performed and the results set in their respective futures. |
|
82 ''' |
|
83 def __init__(self): |
|
84 self.calls = [] |
|
85 def __getattr__(self, name): |
|
86 def call(*args, **opts): |
|
87 resref = future() |
|
88 # Please don't invent non-ascii method names, or you will |
|
89 # give core hg a very sad time. |
|
90 self.calls.append((name.encode('ascii'), args, opts, resref,)) |
|
91 return resref |
|
92 return call |
|
93 def submit(self): |
|
94 raise NotImplementedError() |
|
95 |
|
96 class iterbatcher(batcher): |
|
97 |
|
98 def submit(self): |
|
99 raise NotImplementedError() |
|
100 |
|
101 def results(self): |
|
102 raise NotImplementedError() |
|
103 |
|
104 class remoteiterbatcher(iterbatcher): |
|
105 def __init__(self, remote): |
|
106 super(remoteiterbatcher, self).__init__() |
|
107 self._remote = remote |
|
108 |
|
109 def __getattr__(self, name): |
|
110 # Validate this method is batchable, since submit() only supports |
|
111 # batchable methods. |
|
112 fn = getattr(self._remote, name) |
|
113 if not getattr(fn, 'batchable', None): |
|
114 raise error.ProgrammingError('Attempted to batch a non-batchable ' |
|
115 'call to %r' % name) |
|
116 |
|
117 return super(remoteiterbatcher, self).__getattr__(name) |
|
118 |
|
119 def submit(self): |
|
120 """Break the batch request into many patch calls and pipeline them. |
|
121 |
|
122 This is mostly valuable over http where request sizes can be |
|
123 limited, but can be used in other places as well. |
|
124 """ |
|
125 # 2-tuple of (command, arguments) that represents what will be |
|
126 # sent over the wire. |
|
127 requests = [] |
|
128 |
|
129 # 4-tuple of (command, final future, @batchable generator, remote |
|
130 # future). |
|
131 results = [] |
|
132 |
|
133 for command, args, opts, finalfuture in self.calls: |
|
134 mtd = getattr(self._remote, command) |
|
135 batchable = mtd.batchable(mtd.__self__, *args, **opts) |
|
136 |
|
137 commandargs, fremote = next(batchable) |
|
138 assert fremote |
|
139 requests.append((command, commandargs)) |
|
140 results.append((command, finalfuture, batchable, fremote)) |
|
141 |
|
142 if requests: |
|
143 self._resultiter = self._remote._submitbatch(requests) |
|
144 |
|
145 self._results = results |
|
146 |
|
147 def results(self): |
|
148 for command, finalfuture, batchable, remotefuture in self._results: |
|
149 # Get the raw result, set it in the remote future, feed it |
|
150 # back into the @batchable generator so it can be decoded, and |
|
151 # set the result on the final future to this value. |
|
152 remoteresult = next(self._resultiter) |
|
153 remotefuture.set(remoteresult) |
|
154 finalfuture.set(next(batchable)) |
|
155 |
|
156 # Verify our @batchable generators only emit 2 values. |
|
157 try: |
|
158 next(batchable) |
|
159 except StopIteration: |
|
160 pass |
|
161 else: |
|
162 raise error.ProgrammingError('%s @batchable generator emitted ' |
|
163 'unexpected value count' % command) |
|
164 |
|
165 yield finalfuture.value |
|
166 |
|
167 def encodebatchcmds(req): |
76 def encodebatchcmds(req): |
168 """Return a ``cmds`` argument value for the ``batch`` command.""" |
77 """Return a ``cmds`` argument value for the ``batch`` command.""" |
169 escapearg = wireprototypes.escapebatcharg |
78 escapearg = wireprototypes.escapebatcharg |
170 |
79 |
171 cmds = [] |
80 cmds = [] |