Mercurial > hg
comparison mercurial/streamclone.py @ 26443:d947086d8973
streamclone: move code out of exchange.py
We bulk move functions from exchange.py related to streaming clones.
Function names were renamed slightly to drop a component redundant with
the module name. Docstrings and comments referencing old names and
locations were updated accordingly.
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Fri, 02 Oct 2015 16:05:52 -0700 |
parents | ef8d27f53204 |
children | 623743010133 |
comparison
equal
deleted
inserted
replaced
26442:ef8d27f53204 | 26443:d947086d8973 |
---|---|
5 # This software may be used and distributed according to the terms of the | 5 # This software may be used and distributed according to the terms of the |
6 # GNU General Public License version 2 or any later version. | 6 # GNU General Public License version 2 or any later version. |
7 | 7 |
8 from __future__ import absolute_import | 8 from __future__ import absolute_import |
9 | 9 |
10 import time | |
11 | |
10 from .i18n import _ | 12 from .i18n import _ |
11 from . import ( | 13 from . import ( |
12 branchmap, | 14 branchmap, |
13 error, | 15 error, |
14 exchange, | 16 store, |
15 util, | 17 util, |
16 ) | 18 ) |
19 | |
20 # This is it's own function so extensions can override it. | |
21 def _walkstreamfiles(repo): | |
22 return repo.store.walk() | |
23 | |
def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This generator produces the raw chunks that make up a streaming
    clone.

    The stream opens with a single line containing two space-delimited
    integers: the number of entries and the total byte size. It is
    followed by one record per transferred file: a line holding the
    (partially encoded) file name and its integer size separated by a
    NUL byte, then the raw file data, then the next record or EOF.

    When used over the wire protocol, a line indicating protocol success
    is prepended to the stream by the caller, not by this function.

    A repository lock is taken while scanning so a consistent view of
    the store is captured; LockError may therefore be raised.
    """
    tosend = []
    total = 0
    # Take the lock only for the scan: the snapshot of (name, size)
    # pairs must be internally consistent.
    lock = repo.lock()
    try:
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if not size:
                continue
            tosend.append((name, size))
            total += size
    finally:
        lock.release()

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(tosend), total))
    yield '%d %d\n' % (len(tosend), total)

    svfs = repo.svfs
    debugflag = repo.ui.debugflag
    # Disable path auditing while streaming; the names came straight
    # from the store walk above. Restored in the finally below.
    oldaudit = svfs.mustaudit
    svfs.mustaudit = False

    try:
        for name, size in tosend:
            if debugflag:
                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield '%s\0%d\n' % (store.encodedir(name), size)
            if size > 65536:
                # Large files are streamed in chunks.
                for chunk in util.filechunkiter(svfs(name), limit=size):
                    yield chunk
            else:
                # Small files are read whole in a single yield.
                fp = svfs(name)
                try:
                    data = fp.read(size)
                finally:
                    fp.close()
                yield data
    finally:
        svfs.mustaudit = oldaudit
def consumev1(repo, fp):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "streamout" and applies it to the
    specified repository.

    Like "streamout," the status line added by the wire protocol is not
    handled by this function.
    """
    lock = repo.lock()
    try:
        repo.ui.status(_('streaming all changes\n'))
        header = fp.readline()
        try:
            total_files, total_bytes = map(int, header.split(' ', 1))
        except (ValueError, TypeError):
            # Malformed count line from the peer.
            raise error.ResponseError(
                _('unexpected response from remote server:'), header)
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (total_files, util.bytecount(total_bytes)))
        consumed = 0
        repo.ui.progress(_('clone'), 0, total=total_bytes)
        start = time.time()

        # All file writes happen inside a single transaction so a
        # failure leaves the store unmodified.
        tr = repo.transaction(_('clone'))
        try:
            for i in xrange(total_files):
                # XXX doesn't support '\n' or '\r' in filenames
                entry = fp.readline()
                try:
                    name, size = entry.split('\0', 1)
                    size = int(size)
                except (ValueError, TypeError):
                    raise error.ResponseError(
                        _('unexpected response from remote server:'), entry)
                if repo.ui.debugflag:
                    repo.ui.debug('adding %s (%s)\n' %
                                  (name, util.bytecount(size)))
                # for backwards compat, name was partially encoded
                ofp = repo.svfs(store.decodedir(name), 'w')
                for chunk in util.filechunkiter(fp, limit=size):
                    consumed += len(chunk)
                    repo.ui.progress(_('clone'), consumed,
                                     total=total_bytes)
                    ofp.write(chunk)
                ofp.close()
            tr.close()
        finally:
            tr.release()

        # Writing straight to files circumvented the inmemory caches
        repo.invalidate()

        elapsed = time.time() - start
        if elapsed <= 0:
            # Guard against a zero/negative clock delta in the rate math.
            elapsed = 0.001
        repo.ui.progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(total_bytes), elapsed,
                        util.bytecount(total_bytes / elapsed)))
    finally:
        lock.release()
17 | 147 |
18 def streamin(repo, remote, remotereqs): | 148 def streamin(repo, remote, remotereqs): |
19 # Save remote branchmap. We will use it later | 149 # Save remote branchmap. We will use it later |
20 # to speed up branchcache creation | 150 # to speed up branchcache creation |
21 rbranchmap = None | 151 rbranchmap = None |
44 | 174 |
45 "remotereqs" is a set of requirements to handle the incoming data. | 175 "remotereqs" is a set of requirements to handle the incoming data. |
46 "remotebranchmap" is the result of a branchmap lookup on the remote. It | 176 "remotebranchmap" is the result of a branchmap lookup on the remote. It |
47 can be None. | 177 can be None. |
48 "fp" is a file object containing the raw stream data, suitable for | 178 "fp" is a file object containing the raw stream data, suitable for |
49 feeding into exchange.consumestreamclone. | 179 feeding into consumev1(). |
50 """ | 180 """ |
51 lock = repo.lock() | 181 lock = repo.lock() |
52 try: | 182 try: |
53 exchange.consumestreamclone(repo, fp) | 183 consumev1(repo, fp) |
54 | 184 |
55 # new requirements = old non-format requirements + | 185 # new requirements = old non-format requirements + |
56 # new format-related remote requirements | 186 # new format-related remote requirements |
57 # requirements from the streamed-in repository | 187 # requirements from the streamed-in repository |
58 repo.requirements = remotereqs | ( | 188 repo.requirements = remotereqs | ( |