localrepo: extract stream clone application into reusable function
The existing stream_in method assumes a streaming clone is applied via
the wire protocol. Previous patches have enabled streaming clone data to
be produced and consumed outside the context of the wire protocol.
However, the consuming part was incomplete because it didn't deal with
things like updating the branch caches or writing out a requirements
file.
This patch finishes the separation of stream clone handling from the
wire protocol. After this patch, it is possible to consume stream clones
from arbitrary sources, including files. Mozilla plans to leverage this
to serve pre-generated stream clone files to consumers, drastically
reducing the wall and CPU time required to clone large repositories.
This will enable clones to be nearly as fast as `tar`.
--- a/mercurial/localrepo.py Thu May 21 10:27:45 2015 -0700
+++ b/mercurial/localrepo.py Thu May 21 10:41:06 2015 -0700
@@ -1755,28 +1755,40 @@
return util.hooks()
def stream_in(self, remote, remotereqs):
+ # Save remote branchmap. We will use it later
+ # to speed up branchcache creation
+ rbranchmap = None
+ if remote.capable("branchmap"):
+ rbranchmap = remote.branchmap()
+
+ fp = remote.stream_out()
+ l = fp.readline()
+ try:
+ resp = int(l)
+ except ValueError:
+ raise error.ResponseError(
+ _('unexpected response from remote server:'), l)
+ if resp == 1:
+ raise util.Abort(_('operation forbidden by server'))
+ elif resp == 2:
+ raise util.Abort(_('locking the remote repository failed'))
+ elif resp != 0:
+ raise util.Abort(_('the server sent an unknown error code'))
+
+ self.applystreamclone(remotereqs, rbranchmap, fp)
+ return len(self.heads()) + 1
+
+ def applystreamclone(self, remotereqs, remotebranchmap, fp):
+ """Apply stream clone data to this repository.
+
+ "remotereqs" is a set of requirements to handle the incoming data.
+ "remotebranchmap" is the result of a branchmap lookup on the remote. It
+ can be None.
+ "fp" is a file object containing the raw stream data, suitable for
+ feeding into exchange.consumestreamclone.
+ """
lock = self.lock()
try:
- # Save remote branchmap. We will use it later
- # to speed up branchcache creation
- rbranchmap = None
- if remote.capable("branchmap"):
- rbranchmap = remote.branchmap()
-
- fp = remote.stream_out()
- l = fp.readline()
- try:
- resp = int(l)
- except ValueError:
- raise error.ResponseError(
- _('unexpected response from remote server:'), l)
- if resp == 1:
- raise util.Abort(_('operation forbidden by server'))
- elif resp == 2:
- raise util.Abort(_('locking the remote repository failed'))
- elif resp != 0:
- raise util.Abort(_('the server sent an unknown error code'))
-
exchange.consumestreamclone(self, fp)
# new requirements = old non-format requirements +
@@ -1787,10 +1799,10 @@
self._applyopenerreqs()
self._writerequirements()
- if rbranchmap:
+ if remotebranchmap:
rbheads = []
closed = []
- for bheads in rbranchmap.itervalues():
+ for bheads in remotebranchmap.itervalues():
rbheads.extend(bheads)
for h in bheads:
r = self.changelog.rev(h)
@@ -1801,7 +1813,7 @@
if rbheads:
rtiprev = max((int(self.changelog.rev(node))
for node in rbheads))
- cache = branchmap.branchcache(rbranchmap,
+ cache = branchmap.branchcache(remotebranchmap,
self[rtiprev].node(),
rtiprev,
closednodes=closed)
@@ -1814,7 +1826,6 @@
cache.write(rview)
break
self.invalidate()
- return len(self.heads()) + 1
finally:
lock.release()