Protocol switch from using generators to stream-like objects.
This allows the pull side to precisely control how much data is
read so that another encapsulation layer is not needed.
An http client gets a response with a finite size. Because ssh clients
need to keep the stream open, we must not read more data than is sent
in a response. But due to the streaming nature of the changegroup
scheme, only the piece that's parsing the data knows how far it's
allowed to read.
This means the generator scheme isn't fine-grained enough. Instead we
need file-like objects with a read(x) method. This switches everything
for push/pull over to using file-like objects rather than generators.
--- a/mercurial/commands.py Wed Jul 06 22:14:10 2005 -0800
+++ b/mercurial/commands.py Wed Jul 06 22:20:12 2005 -0800
@@ -867,18 +867,21 @@
"""export the repository via HTTP"""
if opts["stdio"]:
+ fin, fout = sys.stdin, sys.stdout
+ sys.stdout = sys.stderr
+
def getarg():
- argline = sys.stdin.readline()[:-1]
+ argline = fin.readline()[:-1]
arg, l = argline.split()
- val = sys.stdin.read(int(l))
+ val = fin.read(int(l))
return arg, val
def respond(v):
- sys.stdout.write("%d\n" % len(v))
- sys.stdout.write(v)
- sys.stdout.flush()
+ fout.write("%d\n" % len(v))
+ fout.write(v)
+ fout.flush()
while 1:
- cmd = sys.stdin.readline()[:-1]
+ cmd = fin.readline()[:-1]
if cmd == '':
return
if cmd == "heads":
@@ -903,24 +906,13 @@
arg, roots = getarg()
nodes = map(hg.bin, roots.split(" "))
- b = []
- t = 0
- for chunk in repo.changegroup(nodes):
- t += len(chunk)
- b.append(chunk)
- if t > 4096:
- sys.stdout.write(struct.pack(">l", t))
- for c in b:
- sys.stdout.write(c)
- t = 0
- b = []
+ cg = repo.changegroup(nodes)
+ while 1:
+ d = cg.read(4096)
+ if not d: break
+ fout.write(d)
- sys.stdout.write(struct.pack(">l", t))
- for c in b:
- sys.stdout.write(c)
-
- sys.stdout.write(struct.pack(">l", -1))
- sys.stdout.flush()
+ fout.flush()
def openlog(opt, default):
if opts[opt] and opts[opt] != '-': return open(opts[opt], 'w')
--- a/mercurial/hg.py Wed Jul 06 22:14:10 2005 -0800
+++ b/mercurial/hg.py Wed Jul 06 22:20:12 2005 -0800
@@ -1025,35 +1025,6 @@
return remote.addchangegroup(cg)
def changegroup(self, basenodes):
- nodes = self.newer(basenodes)
-
- # construct the link map
- linkmap = {}
- for n in nodes:
- linkmap[self.changelog.rev(n)] = n
-
- # construct a list of all changed files
- changed = {}
- for n in nodes:
- c = self.changelog.read(n)
- for f in c[3]:
- changed[f] = 1
- changed = changed.keys()
- changed.sort()
-
- # the changegroup is changesets + manifests + all file revs
- revs = [ self.changelog.rev(n) for n in nodes ]
-
- for y in self.changelog.group(linkmap): yield y
- for y in self.manifest.group(linkmap): yield y
- for f in changed:
- yield struct.pack(">l", len(f) + 4) + f
- g = self.file(f).group(linkmap)
- for y in g:
- yield y
-
- def addchangegroup(self, generator):
-
class genread:
def __init__(self, generator):
self.g = generator
@@ -1067,6 +1038,40 @@
d, self.buf = self.buf[:l], self.buf[l:]
return d
+ def gengroup():
+ nodes = self.newer(basenodes)
+
+ # construct the link map
+ linkmap = {}
+ for n in nodes:
+ linkmap[self.changelog.rev(n)] = n
+
+ # construct a list of all changed files
+ changed = {}
+ for n in nodes:
+ c = self.changelog.read(n)
+ for f in c[3]:
+ changed[f] = 1
+ changed = changed.keys()
+ changed.sort()
+
+ # the changegroup is changesets + manifests + all file revs
+ revs = [ self.changelog.rev(n) for n in nodes ]
+
+ for y in self.changelog.group(linkmap): yield y
+ for y in self.manifest.group(linkmap): yield y
+ for f in changed:
+ yield struct.pack(">l", len(f) + 4) + f
+ g = self.file(f).group(linkmap)
+ for y in g:
+ yield y
+
+ yield struct.pack(">l", 0)
+
+ return genread(gengroup())
+
+ def addchangegroup(self, source):
+
def getchunk():
d = source.read(4)
if not d: return ""
@@ -1087,10 +1092,9 @@
def revmap(x):
return self.changelog.rev(x)
- if not generator: return
+ if not source: return
changesets = files = revisions = 0
- source = genread(generator)
tr = self.transaction()
# pull off the changeset group
@@ -1592,17 +1596,27 @@
def changegroup(self, nodes):
n = " ".join(map(hex, nodes))
- zd = zlib.decompressobj()
f = self.do_cmd("changegroup", roots=n)
bytes = 0
- while 1:
- d = f.read(4096)
- bytes += len(d)
- if not d:
- yield zd.flush()
- break
- yield zd.decompress(d)
- self.ui.note("%d bytes of data transfered\n" % bytes)
+
+ class zread:
+ def __init__(self, f):
+ self.zd = zlib.decompressobj()
+ self.f = f
+ self.buf = ""
+ def read(self, l):
+ while l > len(self.buf):
+ r = self.f.read(4096)
+ if r:
+ self.buf += self.zd.decompress(r)
+ else:
+ self.buf += self.zd.flush()
+ break
+ d, self.buf = self.buf[:l], self.buf[l:]
+ return d
+
+ return zread(f)
+
class sshrepository:
def __init__(self, ui, path):
@@ -1680,14 +1694,7 @@
def changegroup(self, nodes):
n = " ".join(map(hex, nodes))
f = self.do_cmd("changegroup", roots=n)
- bytes = 0
- while 1:
- l = struct.unpack(">l", f.read(4))[0]
- if l == -1: break
- d = f.read(l)
- bytes += len(d)
- yield d
- self.ui.note("%d bytes of data transfered\n" % bytes)
+ return self.pipei
def repository(ui, path=None, create=0):
if path:
--- a/mercurial/hgweb.py Wed Jul 06 22:14:10 2005 -0800
+++ b/mercurial/hgweb.py Wed Jul 06 22:20:12 2005 -0800
@@ -687,7 +687,10 @@
nodes = map(bin, args['roots'][0].split(" "))
z = zlib.compressobj()
- for chunk in self.repo.changegroup(nodes):
+ f = self.repo.changegroup(nodes)
+ while 1:
+ chunk = f.read(4096)
+ if not chunk: break
sys.stdout.write(z.compress(chunk))
sys.stdout.write(z.flush())