Mercurial > hg
view mercurial/sshserver.py @ 35427:02b36e860e0b
workers: implemented worker on windows
This change implements thread based worker on windows.
The handling of exceptions raised from within threads will happen in a separate diff.
The worker is for now used in mercurial/merge.py and in lfs extension
After multiple tests, with millions of files materialized and thousands of lfs files fetched,
it seems that neither merge.py nor lfs/blobstore.py is thread unsafe. I also
looked through the code and besides the backgroundfilecloser (handled in base
of this) things look good.
The performance boost of this on windows is
* ~50% for sparse --enable-profile
* Speedup of hg up/rebase - not exactly measured
Test Plan:
Ran tens of hg sparse --enable-profile and --disable-profile operations on large profiles and verified that workers are running. Used the Sysinternals suite to see that all threads are spawned and run as they should.
Run various other operations on the repo including update and rebase
Ran tests on CentOS and all tests that pass on @ pass here
Differential Revision: https://phab.mercurial-scm.org/D1458
author | Wojciech Lis <wlis@fb.com> |
---|---|
date | Mon, 20 Nov 2017 10:25:29 -0800 |
parents | 5326e4ef1dab |
children | 8cdb671dbd0b |
line wrap: on
line source
# sshserver.py - ssh protocol server support for mercurial
#
# Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
# Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import sys

from .i18n import _
from . import (
    encoding,
    error,
    hook,
    util,
    wireproto,
)

class sshserver(wireproto.abstractserverproto):
    """Serve wire protocol commands read from ui.fin, replying on ui.fout.

    Commands are read one line at a time (see serve_one()); replies are
    written with the send* methods, selected through the ``handlers``
    table by the class of the object returned from wireproto.dispatch().
    """

    def __init__(self, ui, repo):
        """Capture the protocol streams and re-point ui output at ui.ferr.

        ui.fin and ui.fout are saved on the instance *before* ui.fout is
        replaced, so protocol traffic keeps using the original streams.
        """
        self.ui = ui
        self.repo = repo
        self.name = 'ssh'
        self.lock = None
        self.fin = ui.fin
        self.fout = ui.fout

        # NOTE(review): presumably keeps hook output off the protocol
        # stream -- confirm against hook.redirect().
        hook.redirect(True)

        # From here on anything written via ui.fout / repo.ui.fout goes
        # to ui.ferr instead of the stream saved in self.fout.
        errstream = ui.ferr
        ui.fout = errstream
        repo.ui.fout = errstream

        # Prevent insertion/deletion of CRs
        for stream in (self.fin, self.fout):
            util.setbinary(stream)

    def getargs(self, args):
        """Read the arguments named in ``args`` from the input stream.

        ``args`` is a whitespace separated string of argument names. One
        entry is read per name; each entry is a "<name> <number>" line
        followed by <number> bytes of data. For the name '*', <number>
        is instead a count of further "<name> <length>" + data entries,
        which are collected into a dict stored under '*'.

        Returns the list of values in the order the names appear in
        ``args``. Raises error.Abort when a received name is not one of
        the requested names.
        """
        names = args.split()
        received = {}
        for _unused in names:
            # [:-1] drops the last character of the line (the terminator)
            header = self.fin.readline()[:-1]
            name, number = header.split()
            if name not in names:
                raise error.Abort(_("unexpected parameter %r") % name)
            if name != '*':
                received[name] = self.fin.read(int(number))
                continue
            extras = {}
            for _index in xrange(int(number)):
                subheader = self.fin.readline()[:-1]
                subname, length = subheader.split()
                extras[subname] = self.fin.read(int(length))
            received['*'] = extras
        return [received[name] for name in names]

    def getarg(self, name):
        """Return the first value produced by getargs(name)."""
        values = self.getargs(name)
        return values[0]

    def getfile(self, fpout):
        """Copy length-prefixed chunks from the input stream into fpout.

        An empty response is sent first. Then each chunk is announced by
        a line holding its size; a size of zero ends the transfer.
        """
        self.sendresponse('')
        while True:
            size = int(self.fin.readline())
            if not size:
                break
            fpout.write(self.fin.read(size))

    def redirect(self):
        """Do nothing; returns None."""
        pass

    def sendresponse(self, v):
        """Write ``v`` preceded by a line with its length, then flush."""
        out = self.fout
        out.write("%d\n" % len(v))
        out.write(v)
        out.flush()

    def sendstream(self, source):
        """Write every chunk of ``source`` to the output, then flush.

        When source.reader is set it is read in 4096 byte pieces until
        an empty string comes back; otherwise source.gen is iterated.
        """
        write = self.fout.write

        def readpiece():
            return source.reader.read(4096)

        if source.reader:
            chunks = iter(readpiece, '')
        else:
            chunks = source.gen

        for piece in chunks:
            write(piece)
        self.fout.flush()

    def sendpushresponse(self, rsp):
        """Send an empty response followed by str(rsp.res)."""
        self.sendresponse('')
        result = str(rsp.res)
        self.sendresponse(result)

    def sendpusherror(self, rsp):
        """Send rsp.res as a single response."""
        self.sendresponse(rsp.res)

    def sendooberror(self, rsp):
        """Write rsp.message to ui.ferr and a bare newline to the output."""
        err = self.ui.ferr
        err.write('%s\n-\n' % rsp.message)
        err.flush()
        out = self.fout
        out.write('\n')
        out.flush()

    def serve_forever(self):
        """Serve commands until serve_one() returns false, then exit.

        self.lock is released, if one is held, however the loop ends.
        """
        try:
            while True:
                if not self.serve_one():
                    break
        finally:
            if self.lock is not None:
                self.lock.release()
        sys.exit(0)

    # Maps the class of a wireproto.dispatch() result to the function
    # that sends it; entries are plain functions, so serve_one() passes
    # the instance explicitly.
    handlers = {
        str: sendresponse,
        wireproto.streamres: sendstream,
        wireproto.pushres: sendpushresponse,
        wireproto.pusherr: sendpusherror,
        wireproto.ooberror: sendooberror,
    }

    def serve_one(self):
        """Read one command line and act on it.

        A command found in wireproto.commands is dispatched and its
        result sent through ``handlers``. Any other non-empty command is
        looked up as a ``do_<cmd>`` attribute: if present it is called
        and a non-None result is sent, otherwise an empty response is
        sent. Returns False when the command line is empty, True
        otherwise.
        """
        cmd = self.fin.readline()[:-1]
        if not cmd:
            return False

        if cmd in wireproto.commands:
            rsp = wireproto.dispatch(self.repo, self, cmd)
            sender = self.handlers[rsp.__class__]
            sender(self, rsp)
            return True

        impl = getattr(self, 'do_' + cmd, None)
        if not impl:
            self.sendresponse("")
            return True

        result = impl()
        if result is not None:
            self.sendresponse(result)
        return True

    def _client(self):
        """Return 'remote:ssh:' plus the first field of SSH_CLIENT."""
        sshclient = encoding.environ.get('SSH_CLIENT', '')
        firstfield = sshclient.split(' ', 1)[0]
        return 'remote:ssh:' + firstfield