mercurial/sshserver.py
author Henrik Stuart <henrik.stuart@edlund.dk>
Sat, 23 May 2009 17:02:49 +0200
changeset 8562 e3495c399006
parent 8312 b87a50b7125c
child 9198 061eeb602354
permissions -rw-r--r--
named branches: server branchmap wire protocol support (issue736)

The repository command, 'branchmap', returns a dictionary, branchname -> [branchheads], and will be implemented for localrepo, httprepo and sshrepo.

The following wire format is used for returning data:

    branchname1 branch1head1 branch1head2 ...
    branchname2 ...
    ...

Branch names are URL encoded to escape white space, and branch heads are sent as hex encoded node ids. All branches and all their heads are sent.

The background and motivation for this command is the desire for richer named branch semantics when pushing changesets. The details are explained in the original proposal, which is included below.

1. BACKGROUND

The algorithm currently implemented in Mercurial only considers the graph theoretical heads when determining whether new heads are created, rather than using the branch heads as a count (the algorithm considers a branch head effectively closed when it is merged into another branch or a new named branch is started from that point onward).

Our particular problem with the algorithm is that we'd like to see the following case working without forcing a push:

Upstream has:

    (0:dev) ---- (1:dev)
     \
      `--- (2:stable)

Someone merges stable into dev:

    (0:dev) ---- (1:dev) ------(3:dev)
     \                        /
      `--- (2:stable) --------´

This can be pushed without --force (as it should).

Now someone else does some coding on stable (a bug fix, say):

    (0:dev) ---- (1:dev) ------(3:dev)
     \                        /
      `--- (2:stable) --------´---------(4:stable)

This time we need --force to push.

We allow this to be pushed without using --force by getting all the remote branch heads (by extending the wire protocol with a new function).

We would, furthermore, also prefer that it be impossible to push a new branch without --force (or a later --newbranch option so --force isn't shoe-horned into too many disparate functions, if need be), except of course in the case where the remote repository is empty.

This is what our patches accomplish.

2. ALTERNATIVES

We have, of course, considered some alternatives to reconstructing enough information to decide whether we are creating new remote branch heads, before we added the new wire protocol command.

2.1. LOOKUP ON REMOTE

The main alternative is to use the information from remote.heads() and remote.lookup() to try to reconstruct enough graph information to decide whether we are creating new heads. This is not adequate, as illustrated below.

Remember that each lookup is typically a request-response pair over SSH or HTTP(S).

If we have a simple repository at the remote end like this:

    (0:dev) ---- (1:dev) ---- (3:stable)
     \
      `--- (2:dev)

then remote.heads() will yield [2, 3]. Assume we have nodes [0, 1, 2] locally and want to create a new node, 4:dev, as a descendant from (1:dev), which should be OK as 1:dev is a branch head.

If we do remote.lookup('dev') we will get [2]. Thus, we can get information about whether a branch exists on the remote server or not, but this does not solve our problem of figuring out whether we are creating new heads or not. Pushing 4:dev ought to be OK, since after the push, we still only have two heads on branch dev.

Using remote.lookup() and remote.heads() is thus not adequate to consistently decide whether we are creating new remote heads (e.g. in this situation the latter would never return 1:dev).

2.2. USING INCOMING TO RECONSTRUCT THE GRAPH

An alternative would be to use information equivalent to hg incoming to get the full remote graph in addition to the local graph.

To do this, we would have to get a changegroup(subset) bundle representing the remote end (which may be a substantial amount of data), get the branch heads from an instantiated bundlerepository, delete the bundle, and only then compute the prepush logic.

While this is backwards compatible, it will cause a possibly substantial slowdown of the push command as it first needs to pull in all changes.

3. FURTHER ARGUMENTS IN FAVOUR OF THE BRANCHMAP WIRE-PROTOCOL EXTENSION

Currently, the commands incoming and pull work based on the tip of a given branch if used with "-r branchname", making it hard to get all revisions of a certain branch only (if it has multiple heads). This can be solved by requesting the remote's branchheads and letting the revisions to be used with the command be these heads. This can be done by extending the commands with a new option, e.g.:

    hg pull -b branchname

which will be turned into the equivalent of:

    hg pull -r branchhead1 -r branchhead2 -r branchhead3

We have a simple follow-up patch that can do this ready as well (although not submitted yet as it is pending the acceptance of the branch patch).

4. WRAP-UP

We generally find that the branchmap wire protocol extension can provide better named branch support to Mercurial. Currently, some things, like the initial push scenario in this mail, are fairly counter-intuitive, and the more often you have to force push, the more likely it is that you will get a lot of spurious and unnecessary merge nodes. Also, restricting incoming and pull to all changes on a branch rather than changes on the tip-most head would be a sensible extension to making named branches a first class citizen in Mercurial. Currently, named branches sometimes feel like a late-coming unwanted step-child.

We have run it in a production environment for a while, with fewer multiple heads occurring in our repositories and fewer confused users as a result. Also, it fixes the long-standing issue 736.

Co-contributor: Sune Foldager <cryo@cyanite.org>
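
The wire format described in the commit message can be illustrated with a small round trip. This is only a sketch under stated assumptions, not the code used by Mercurial's clients: it is written for Python 3 (the file below is Python 2), the function names encode_branchmap and decode_branchmap are hypothetical, and the node ids are made-up 20-byte values. Sorting the branches is an addition to make the output deterministic; the server code below sends them in dictionary order.

```python
from binascii import hexlify, unhexlify
from urllib.parse import quote, unquote


def encode_branchmap(branchmap):
    """Serialize {branch name: [binary node ids]} into the wire format."""
    lines = []
    # sorted() is an assumption made here for stable output only
    for branch, nodes in sorted(branchmap.items()):
        hexnodes = [hexlify(node).decode("ascii") for node in nodes]
        lines.append("%s %s" % (quote(branch), " ".join(hexnodes)))
    return "\n".join(lines)


def decode_branchmap(data):
    """Parse the wire format back into {branch name: [binary node ids]}."""
    branchmap = {}
    for line in data.splitlines():
        if not line:
            continue
        # quote() escapes spaces, so the first space ends the branch name
        name, _, heads = line.partition(" ")
        branchmap[unquote(name)] = [unhexlify(h) for h in heads.split()]
    return branchmap


# Made-up node ids, for illustration only
example = {
    "dev": [b"\x01" * 20, b"\x02" * 20],
    "release candidate": [b"\x03" * 20],
}
wire = encode_branchmap(example)
roundtrip = decode_branchmap(wire)
```

With a decoded map in hand, the proposed "hg pull -b dev" would amount to passing each entry of roundtrip["dev"], hex encoded, as a separate -r argument.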

# sshserver.py - ssh protocol server support for mercurial
#
# Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
# Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2, incorporated herein by reference.

from i18n import _
from node import bin, hex
import streamclone, util, hook
import os, sys, tempfile, urllib

class sshserver(object):
    def __init__(self, ui, repo):
        self.ui = ui
        self.repo = repo
        self.lock = None
        self.fin = sys.stdin
        self.fout = sys.stdout

        hook.redirect(True)
        sys.stdout = sys.stderr

        # Prevent insertion/deletion of CRs
        util.set_binary(self.fin)
        util.set_binary(self.fout)

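    def getarg(self):
        # an argument arrives as a "name length" line followed by
        # exactly length bytes of value
        argline = self.fin.readline()[:-1]
        arg, l = argline.split()
        val = self.fin.read(int(l))
        return arg, val

    def respond(self, v):
        # a response is the payload length on its own line, then the payload
        self.fout.write("%d\n" % len(v))
        self.fout.write(v)
        self.fout.flush()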

    def serve_forever(self):
        try:
            while self.serve_one(): pass
        finally:
            if self.lock is not None:
                self.lock.release()
        sys.exit(0)

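    def serve_one(self):
        # dispatch one command line to its do_<cmd> method; an unknown
        # command gets an empty response, and an empty line or EOF ends
        # the serve loop
        cmd = self.fin.readline()[:-1]
        if cmd:
            impl = getattr(self, 'do_' + cmd, None)
            if impl:
                impl()
            else:
                self.respond("")
        return cmd != ''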

    def do_lookup(self):
        arg, key = self.getarg()
        assert arg == 'key'
        try:
            r = hex(self.repo.lookup(key))
            success = 1
        except Exception,inst:
            r = str(inst)
            success = 0
        self.respond("%s %s\n" % (success, r))

    def do_branchmap(self):
        '''respond with one line per branch: the URL-quoted branch name
        followed by the hex node ids of its heads, space separated'''

        branchmap = self.repo.branchmap()
        heads = []
        for branch, nodes in branchmap.iteritems():
            branchname = urllib.quote(branch)
            branchnodes = [hex(node) for node in nodes]
            heads.append('%s %s' % (branchname, ' '.join(branchnodes)))
        self.respond('\n'.join(heads))

    def do_heads(self):
        h = self.repo.heads()
        self.respond(" ".join(map(hex, h)) + "\n")

    def do_hello(self):
        '''the hello command returns a set of lines describing various
        interesting things about the server, in an RFC822-like format.
        Currently the only one defined is "capabilities", which
        consists of a line in the form:

        capabilities: space separated list of tokens
        '''

        caps = ['unbundle', 'lookup', 'changegroupsubset', 'branchmap']
        if self.ui.configbool('server', 'uncompressed'):
            caps.append('stream=%d' % self.repo.changelog.version)
        self.respond("capabilities: %s\n" % (' '.join(caps),))

    def do_lock(self):
        '''DEPRECATED - allowing remote client to lock repo is not safe'''

        self.lock = self.repo.lock()
        self.respond("")

    def do_unlock(self):
        '''DEPRECATED'''

        if self.lock:
            self.lock.release()
        self.lock = None
        self.respond("")

    def do_branches(self):
        arg, nodes = self.getarg()
        nodes = map(bin, nodes.split(" "))
        r = []
        for b in self.repo.branches(nodes):
            r.append(" ".join(map(hex, b)) + "\n")
        self.respond("".join(r))

    def do_between(self):
        arg, pairs = self.getarg()
        pairs = [map(bin, p.split("-")) for p in pairs.split(" ")]
        r = []
        for b in self.repo.between(pairs):
            r.append(" ".join(map(hex, b)) + "\n")
        self.respond("".join(r))

    def do_changegroup(self):
        arg, roots = self.getarg()
        nodes = map(bin, roots.split(" "))

        cg = self.repo.changegroup(nodes, 'serve')
        while True:
            d = cg.read(4096)
            if not d:
                break
            self.fout.write(d)

        self.fout.flush()

    def do_changegroupsubset(self):
        argmap = dict([self.getarg(), self.getarg()])
        bases = [bin(n) for n in argmap['bases'].split(' ')]
        heads = [bin(n) for n in argmap['heads'].split(' ')]

        cg = self.repo.changegroupsubset(bases, heads, 'serve')
        while True:
            d = cg.read(4096)
            if not d:
                break
            self.fout.write(d)

        self.fout.flush()

    def do_addchangegroup(self):
        '''DEPRECATED'''

        if not self.lock:
            self.respond("not locked")
            return

        self.respond("")
        r = self.repo.addchangegroup(self.fin, 'serve', self.client_url())
        self.respond(str(r))

    def client_url(self):
        client = os.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
        return 'remote:ssh:' + client

    def do_unbundle(self):
        their_heads = self.getarg()[1].split()

        def check_heads():
            heads = map(hex, self.repo.heads())
            return their_heads == [hex('force')] or their_heads == heads

        # fail early if possible
        if not check_heads():
            self.respond(_('unsynced changes'))
            return

        self.respond('')

        # write bundle data to temporary file because it can be big
        tempname = fp = None
        try:
            fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
            fp = os.fdopen(fd, 'wb+')

            count = int(self.fin.readline())
            while count:
                fp.write(self.fin.read(count))
                count = int(self.fin.readline())

            was_locked = self.lock is not None
            if not was_locked:
                self.lock = self.repo.lock()
            try:
                if not check_heads():
                    # someone else committed/pushed/unbundled while we
                    # were transferring data
                    self.respond(_('unsynced changes'))
                    return
                self.respond('')

                # push can proceed

                fp.seek(0)
                r = self.repo.addchangegroup(fp, 'serve', self.client_url())
                self.respond(str(r))
            finally:
                if not was_locked:
                    self.lock.release()
                    self.lock = None
        finally:
            if fp is not None:
                fp.close()
            if tempname is not None:
                os.unlink(tempname)

    def do_stream_out(self):
        try:
            for chunk in streamclone.stream_out(self.repo):
                self.fout.write(chunk)
            self.fout.flush()
        except streamclone.StreamException, inst:
            self.fout.write(str(inst))
            self.fout.flush()