comparison mercurial/wireproto.py @ 25912:cbbdd085c991

batching: migrate basic noop batching into peer.peer "Real" batching only makes sense for wirepeers, but it greatly simplifies the clients of peer instances if they can be ignorant to actual batching capabilities of that peer. By moving the not-really-batched batching code into peer.peer, all peer instances now work with the batching API, thus simplifying users. This leaves a couple of name forwards in wirepeer.py. Originally I had planned to clean those up, but it kind of unclarifies other bits of code that want to use batching, so I think it makes sense for the names to stay exposed by wireproto. Specifically, almost nothing is currently aware of peer (see largefiles.proto for an example), so making them be aware of the peer module *and* the wireproto module seems like some abstraction leakage. I *think* the right long-term fix would actually be to make wireproto an implementation detail that clients wouldn't need to know about, but I don't really know what that would entail at the moment. As far as I'm aware, no clients of batching in third-party extensions will need updating, which is nice icing.
author Augie Fackler <augie@google.com>
date Wed, 05 Aug 2015 14:51:34 -0400
parents 26579a91f4fb
children fa14ba7b9667
comparison
equal deleted inserted replaced
25910:b4a85ddadcb9 25912:cbbdd085c991
56 """return 4096 chunks from a changegroup object 56 """return 4096 chunks from a changegroup object
57 57
58 Some protocols may have compressed the contents.""" 58 Some protocols may have compressed the contents."""
59 raise NotImplementedError() 59 raise NotImplementedError()
60 60
61 # abstract batching support 61 class remotebatch(peer.batcher):
62
63 class future(object):
64 '''placeholder for a value to be set later'''
65 def set(self, value):
66 if util.safehasattr(self, 'value'):
67 raise error.RepoError("future is already set")
68 self.value = value
69
70 class batcher(object):
71 '''base class for batches of commands submittable in a single request
72
73 All methods invoked on instances of this class are simply queued and
74 return a a future for the result. Once you call submit(), all the queued
75 calls are performed and the results set in their respective futures.
76 '''
77 def __init__(self):
78 self.calls = []
79 def __getattr__(self, name):
80 def call(*args, **opts):
81 resref = future()
82 self.calls.append((name, args, opts, resref,))
83 return resref
84 return call
85 def submit(self):
86 pass
87
88 class localbatch(batcher):
89 '''performs the queued calls directly'''
90 def __init__(self, local):
91 batcher.__init__(self)
92 self.local = local
93 def submit(self):
94 for name, args, opts, resref in self.calls:
95 resref.set(getattr(self.local, name)(*args, **opts))
96
97 class remotebatch(batcher):
98 '''batches the queued calls; uses as few roundtrips as possible''' 62 '''batches the queued calls; uses as few roundtrips as possible'''
99 def __init__(self, remote): 63 def __init__(self, remote):
100 '''remote must support _submitbatch(encbatch) and 64 '''remote must support _submitbatch(encbatch) and
101 _submitone(op, encargs)''' 65 _submitone(op, encargs)'''
102 batcher.__init__(self) 66 peer.batcher.__init__(self)
103 self.remote = remote 67 self.remote = remote
104 def submit(self): 68 def submit(self):
105 req, rsp = [], [] 69 req, rsp = [], []
106 for name, args, opts, resref in self.calls: 70 for name, args, opts, resref in self.calls:
107 mtd = getattr(self.remote, name) 71 mtd = getattr(self.remote, name)
126 for encres, r in zip(encresults, rsp): 90 for encres, r in zip(encresults, rsp):
127 batchable, encresref, resref = r 91 batchable, encresref, resref = r
128 encresref.set(encres) 92 encresref.set(encres)
129 resref.set(batchable.next()) 93 resref.set(batchable.next())
130 94
131 def batchable(f): 95 # Forward a couple of names from peer to make wireproto interactions
132 '''annotation for batchable methods 96 # slightly more sensible.
133 97 batchable = peer.batchable
134 Such methods must implement a coroutine as follows: 98 future = peer.future
135
136 @batchable
137 def sample(self, one, two=None):
138 # Handle locally computable results first:
139 if not one:
140 yield "a local result", None
141 # Build list of encoded arguments suitable for your wire protocol:
142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 # Create future for injection of encoded result:
144 encresref = future()
145 # Return encoded arguments and future:
146 yield encargs, encresref
147 # Assuming the future to be filled with the result from the batched
148 # request now. Decode it:
149 yield decode(encresref.value)
150
151 The decorator returns a function which wraps this coroutine as a plain
152 method, but adds the original method as an attribute called "batchable",
153 which is used by remotebatch to split the call into separate encoding and
154 decoding phases.
155 '''
156 def plain(*args, **opts):
157 batchable = f(*args, **opts)
158 encargsorres, encresref = batchable.next()
159 if not encresref:
160 return encargsorres # a local result in this case
161 self = args[0]
162 encresref.set(self._submitone(f.func_name, encargsorres))
163 return batchable.next()
164 setattr(plain, 'batchable', f)
165 return plain
166 99
167 # list of nodes encoding / decoding 100 # list of nodes encoding / decoding
168 101
169 def decodelist(l, sep=' '): 102 def decodelist(l, sep=' '):
170 if l: 103 if l: