comparison mercurial/exchange.py @ 45807:74271829ddc0

clonebundles: move a bundle of clone bundle related code to a new module In the process of generating clone bundles automatically, we need to make some functions available more widely. This is a good opportunity to extract a significant amount of code from `mercurial.exchange` into a new `mercurial.bundlecaches`. This makes `mercurial.exchange` move under the 3K line range (hooray…). The module is called `bundlecaches` because I expect it to be eventually useful for more than just clone bundles (like pull bundles). Differential Revision: https://phab.mercurial-scm.org/D9208
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Thu, 15 Oct 2020 15:57:36 +0200
parents a736ab681b78
children ac362d5a7893
comparison
equal deleted inserted replaced
45806:88a47cbf063c 45807:74271829ddc0
14 from .node import ( 14 from .node import (
15 hex, 15 hex,
16 nullid, 16 nullid,
17 nullrev, 17 nullrev,
18 ) 18 )
19 from .thirdparty import attr
20 from . import ( 19 from . import (
21 bookmarks as bookmod, 20 bookmarks as bookmod,
22 bundle2, 21 bundle2,
22 bundlecaches,
23 changegroup, 23 changegroup,
24 discovery, 24 discovery,
25 error, 25 error,
26 exchangev2, 26 exchangev2,
27 lock as lockmod, 27 lock as lockmod,
32 phases, 32 phases,
33 pushkey, 33 pushkey,
34 pycompat, 34 pycompat,
35 requirements, 35 requirements,
36 scmutil, 36 scmutil,
37 sslutil,
38 streamclone, 37 streamclone,
39 url as urlmod, 38 url as urlmod,
40 util, 39 util,
41 wireprototypes, 40 wireprototypes,
42 ) 41 )
47 46
48 urlerr = util.urlerr 47 urlerr = util.urlerr
49 urlreq = util.urlreq 48 urlreq = util.urlreq
50 49
51 _NARROWACL_SECTION = b'narrowacl' 50 _NARROWACL_SECTION = b'narrowacl'
52
# Human-facing bundle version name -> changegroup version used on the wire.
_bundlespeccgversions = {
    b'v1': b'01',
    b'v2': b'02',
    b'packed1': b's1',
    b'bundle2': b'02',  # legacy
}

# Human-facing bundle version name -> content options selecting which parts
# are put in a bundle of that version.
_bundlespeccontentopts = {
    b'v1': {
        b'changegroup': True,
        b'cg.version': b'01',
        b'obsolescence': False,
        b'phases': False,
        b'tagsfnodescache': False,
        b'revbranchcache': False,
    },
    b'v2': {
        b'changegroup': True,
        b'cg.version': b'02',
        b'obsolescence': False,
        b'phases': False,
        b'tagsfnodescache': True,
        b'revbranchcache': True,
    },
    b'packed1': {b'cg.version': b's1'},
}
# The legacy "bundle2" name shares the very same options dict as "v2".
_bundlespeccontentopts[b'bundle2'] = _bundlespeccontentopts[b'v2']

# Named variants whose options are layered on top of the per-version
# content options.
_bundlespecvariants = {
    b"streamv2": {
        b"changegroup": False,
        b"streamv2": True,
        b"tagsfnodescache": False,
        b"revbranchcache": False,
    }
}

# Compression engines allowed in version 1. THIS SHOULD NEVER CHANGE.
_bundlespecv1compengines = {b'gzip', b'bzip2', b'none'}
94
95
@attr.s
class bundlespec(object):
    """Parsed components of a bundle specification.

    Instances are built by ``parsebundlespec``.
    """

    # First element returned by the compression engine's ``bundletype()``.
    compression = attr.ib()
    # Second element returned by the compression engine's ``bundletype()``.
    wirecompression = attr.ib()
    # Human-facing bundle version name (a key of ``_bundlespeccgversions``).
    version = attr.ib()
    # Changegroup version looked up in ``_bundlespeccgversions``.
    wireversion = attr.ib()
    # Dict of URI-decoded ``key=value`` parameters found after the version.
    params = attr.ib()
    # Dict of content options derived from the version and its variants.
    contentopts = attr.ib()
104
105
def parsebundlespec(repo, spec, strict=True):
    """Split a bundle specification string into its components.

    A bundle specification names a well-defined bundle/exchange format. What
    a given specification means must stay stable over time so that bundles
    written by a newer Mercurial remain readable by an older one.

    The accepted syntax is:

       <compression>-<type>[;<parameter0>[;<parameter1>]]

    ``<compression>`` is one of the supported compression names and
    ``<type>`` is (currently) a version string. Everything after the first
    ";" is read as ";"-separated, URI encoded ``key=value`` pairs.

    With ``strict`` set (the default) the ``<compression>-`` prefix is
    mandatory. Without it, the string may be only a compression name or only
    a version, and the missing half is chosen by default.

    Returns a ``bundlespec`` object carrying the compression, wire
    compression, version, wire version, parameters and content options.

    Raises ``InvalidBundleSpecification`` when the string is not
    syntactically well formed, and ``UnsupportedBundleSpecification`` when
    the compression, version, or (for ``packed1``) the declared repository
    requirements are not supported.
    """

    def parseparams(s):
        # Split "<name>;k=v;k=v" into the name and a dict of decoded params.
        name, semicolon, rawparams = s.partition(b';')
        if not semicolon:
            return s, {}

        decoded = {}
        for item in rawparams.split(b';'):
            rawkey, equals, rawvalue = item.partition(b'=')
            if not equals:
                raise error.InvalidBundleSpecification(
                    _(
                        b'invalid bundle specification: '
                        b'missing "=" in parameter: %s'
                    )
                    % item
                )
            decoded[urlreq.unquote(rawkey)] = urlreq.unquote(rawvalue)

        return name, decoded

    if b'-' not in spec:
        if strict:
            raise error.InvalidBundleSpecification(
                _(
                    b'invalid bundle specification; '
                    b'must be prefixed with compression: %s'
                )
                % spec
            )

        # Lenient mode: the value is either just a compression name or just
        # a version; fill in the other half with a default.
        spec, params = parseparams(spec)

        if spec in util.compengines.supportedbundlenames:
            compression = spec
            # Generaldelta repos and modern compression engines both
            # require v2; everything else stays on v1.
            needsv2 = (
                b'generaldelta' in repo.requirements
                or compression not in _bundlespecv1compengines
            )
            version = b'v2' if needsv2 else b'v1'
        elif spec in _bundlespeccgversions:
            compression = b'none' if spec == b'packed1' else b'bzip2'
            version = spec
        else:
            raise error.UnsupportedBundleSpecification(
                _(b'%s is not a recognized bundle specification') % spec
            )
    else:
        compression, dash, remainder = spec.partition(b'-')

        if compression not in util.compengines.supportedbundlenames:
            raise error.UnsupportedBundleSpecification(
                _(b'%s compression is not supported') % compression
            )

        version, params = parseparams(remainder)

        if version not in _bundlespeccgversions:
            raise error.UnsupportedBundleSpecification(
                _(b'%s is not a recognized bundle version') % version
            )

    # Bundle version 1 only supports a known set of compression engines.
    if version == b'v1' and compression not in _bundlespecv1compengines:
        raise error.UnsupportedBundleSpecification(
            _(b'compression engine %s is not supported on v1 bundles')
            % compression
        )

    # A packed1 specification may list the data formats needed to apply it.
    # When present, refuse the bundle if the repo lacks any of them.
    if version == b'packed1' and b'requirements' in params:
        wanted = set(params[b'requirements'].split(b','))
        unsupported = wanted - repo.supportedformats
        if unsupported:
            raise error.UnsupportedBundleSpecification(
                _(b'missing support for repository features: %s')
                % b', '.join(sorted(unsupported))
            )

    # Start from a private copy of the per-version content options...
    contentopts = _bundlespeccontentopts.get(version, {}).copy()

    # ...then layer the requested variant on top.
    if params.get(b"stream") == b"v2":
        contentopts.update(_bundlespecvariants[b"streamv2"])

    engine = util.compengines.forbundlename(compression)
    compression, wirecompression = engine.bundletype()

    return bundlespec(
        compression,
        wirecompression,
        version,
        _bundlespeccgversions[version],
        params,
        contentopts,
    )
248 51
249 52
250 def readbundle(ui, fh, fname, vfs=None): 53 def readbundle(ui, fh, fname, vfs=None):
251 header = changegroup.readexactly(fh, 4) 54 header = changegroup.readexactly(fh, 4)
252 55
2865 2668
2866 # If we call the wire protocol command, that's good enough to record the 2669 # If we call the wire protocol command, that's good enough to record the
2867 # attempt. 2670 # attempt.
2868 pullop.clonebundleattempted = True 2671 pullop.clonebundleattempted = True
2869 2672
2870 entries = parseclonebundlesmanifest(repo, res) 2673 entries = bundlecaches.parseclonebundlesmanifest(repo, res)
2871 if not entries: 2674 if not entries:
2872 repo.ui.note( 2675 repo.ui.note(
2873 _( 2676 _(
2874 b'no clone bundles available on remote; ' 2677 b'no clone bundles available on remote; '
2875 b'falling back to regular clone\n' 2678 b'falling back to regular clone\n'
2876 ) 2679 )
2877 ) 2680 )
2878 return 2681 return
2879 2682
2880 entries = filterclonebundleentries( 2683 entries = bundlecaches.filterclonebundleentries(
2881 repo, entries, streamclonerequested=pullop.streamclonerequested 2684 repo, entries, streamclonerequested=pullop.streamclonerequested
2882 ) 2685 )
2883 2686
2884 if not entries: 2687 if not entries:
2885 # There is a thundering herd concern here. However, if a server 2688 # There is a thundering herd concern here. However, if a server
2896 repo.ui.warn( 2699 repo.ui.warn(
2897 _(b'(you may want to report this to the server operator)\n') 2700 _(b'(you may want to report this to the server operator)\n')
2898 ) 2701 )
2899 return 2702 return
2900 2703
2901 entries = sortclonebundleentries(repo.ui, entries) 2704 entries = bundlecaches.sortclonebundleentries(repo.ui, entries)
2902 2705
2903 url = entries[0][b'URL'] 2706 url = entries[0][b'URL']
2904 repo.ui.status(_(b'applying clone bundle from %s\n') % url) 2707 repo.ui.status(_(b'applying clone bundle from %s\n') % url)
2905 if trypullbundlefromurl(repo.ui, repo, url): 2708 if trypullbundlefromurl(repo.ui, repo, url):
2906 repo.ui.status(_(b'finished applying clone bundle\n')) 2709 repo.ui.status(_(b'finished applying clone bundle\n'))
2921 b'"--config ui.clonebundles=false"' 2724 b'"--config ui.clonebundles=false"'
2922 ), 2725 ),
2923 ) 2726 )
2924 2727
2925 2728
def parseclonebundlesmanifest(repo, s):
    """Parses the raw text of a clone bundles manifest.

    Each non-blank line is a whitespace separated list whose first field is
    the bundle URL and whose remaining fields are URI encoded ``key=value``
    attributes.

    Returns a list of dicts. The dicts have a ``URL`` key corresponding
    to the URL and other keys are the attributes for the entry. When a
    ``BUNDLESPEC`` attribute parses successfully, ``COMPRESSION`` and
    ``VERSION`` keys are added as well.

    Attribute fields that do not contain "=" are ignored instead of
    aborting the whole parse with a ``ValueError``.
    """
    manifest = []
    for line in s.splitlines():
        fields = line.split()
        if not fields:
            continue

        attrs = {b'URL': fields[0]}
        for rawattr in fields[1:]:
            rawkey, equals, rawvalue = rawattr.partition(b'=')
            if not equals:
                # The manifest comes from the remote; a malformed attribute
                # should not crash the client, so skip it.
                continue

            key = urlreq.unquote(rawkey)
            value = urlreq.unquote(rawvalue)
            attrs[key] = value

            # Parse BUNDLESPEC into components. This makes client-side
            # preferences easier to specify since you can prefer a single
            # component of the BUNDLESPEC.
            if key == b'BUNDLESPEC':
                try:
                    parsed = parsebundlespec(repo, value)
                except (
                    error.InvalidBundleSpecification,
                    error.UnsupportedBundleSpecification,
                ):
                    # Best effort only: the raw BUNDLESPEC stays in the
                    # entry and is judged later when entries are filtered.
                    continue
                attrs[b'COMPRESSION'] = parsed.compression
                attrs[b'VERSION'] = parsed.version

        manifest.append(attrs)

    return manifest
2960
2961
def isstreamclonespec(bundlespec):
    """Return True when ``bundlespec`` describes a stream clone bundle.

    Both recognized forms are uncompressed on the wire (``UN``): stream
    clone v1 uses wire version ``s1``, stream clone v2 uses wire version
    ``02`` together with a truthy ``streamv2`` content option.
    """
    # Every stream clone flavour is uncompressed on the wire.
    if bundlespec.wirecompression != b'UN':
        return False

    # Stream clone v1
    if bundlespec.wireversion == b's1':
        return True

    # Stream clone v2
    if bundlespec.wireversion != b'02':
        return False
    return bool(bundlespec.contentopts.get(b'streamv2'))
2976
2977
def filterclonebundleentries(repo, entries, streamclonerequested=False):
    """Drop clone bundle manifest entries this client cannot use.

    ``entries`` is a list as produced by ``parseclonebundlesmanifest``. The
    result is a new list holding only the entries that this client should
    be able to apply; the reason for each rejection is sent to
    ``repo.ui.debug``.

    Surviving entries are not guaranteed to apply: the metadata used for
    filtering may be missing or wrong.
    """

    def _usable(entry):
        rawspec = entry.get(b'BUNDLESPEC')
        if rawspec:
            try:
                parsed = parsebundlespec(repo, rawspec, strict=True)

                # A requested stream clone rules out every entry that is
                # not a stream clone bundle.
                if streamclonerequested and not isstreamclonespec(parsed):
                    repo.ui.debug(
                        b'filtering %s because not a stream clone\n'
                        % entry[b'URL']
                    )
                    return False

            except error.InvalidBundleSpecification as exc:
                repo.ui.debug(stringutil.forcebytestr(exc) + b'\n')
                return False
            except error.UnsupportedBundleSpecification as exc:
                repo.ui.debug(
                    b'filtering %s because unsupported bundle '
                    b'spec: %s\n' % (entry[b'URL'], stringutil.forcebytestr(exc))
                )
                return False
        elif streamclonerequested:
            # Without a spec there is no way to tell what the entry is, so
            # it cannot satisfy a stream clone request.
            repo.ui.debug(
                b'filtering %s because cannot determine if a stream '
                b'clone bundle\n' % entry[b'URL']
            )
            return False

        if b'REQUIRESNI' in entry and not sslutil.hassni:
            repo.ui.debug(
                b'filtering %s because SNI not supported\n' % entry[b'URL']
            )
            return False

        if b'REQUIREDRAM' in entry:
            try:
                neededram = util.sizetoint(entry[b'REQUIREDRAM'])
            except error.ParseError:
                repo.ui.debug(
                    b'filtering %s due to a bad REQUIREDRAM attribute\n'
                    % entry[b'URL']
                )
                return False
            availableram = repo.ui.estimatememory()
            # An unknown memory estimate (None) never rejects an entry.
            if availableram is not None and availableram * 0.66 < neededram:
                repo.ui.debug(
                    b'filtering %s as it needs more than 2/3 of system memory\n'
                    % entry[b'URL']
                )
                return False

        return True

    return [entry for entry in entries if _usable(entry)]
3048
3049
class clonebundleentry(object):
    """Sortable wrapper around one clone bundles manifest entry.

    Python 3's sorted() has no ``cmp`` argument and the ordering used here
    is too involved for a ``key=`` function, so the comparison lives in the
    rich comparison methods of this class instead.

    ``value`` is the manifest entry (a dict of attributes) and ``prefers``
    is an iterable of ``(key, value)`` preference pairs, most important
    first.
    """

    def __init__(self, value, prefers):
        self.value = value
        self.prefers = prefers

    def _cmp(self, other):
        """Return -1, 0 or 1 as self sorts before, with, or after other."""
        for key, wanted in self.prefers:
            mine = self.value.get(key)
            theirs = other.value.get(key)

            if mine is None or theirs is None:
                # Only one side (at most) carries the attribute: it wins
                # solely when it matches the preference exactly.
                if mine is not None and mine == wanted:
                    return -1
                if theirs is not None and theirs == wanted:
                    return 1
                # Otherwise this preference cannot separate the two.
                continue

            # Identical values: let the next preference decide.
            if mine == theirs:
                continue

            # Exact matches come first.
            if mine == wanted:
                return -1
            if theirs == wanted:
                return 1

            # Neither side matches; move on to the next preference.

        # No preference separated the entries; they compare as equal, which
        # leaves a stable sort in original index order.
        return 0

    def __lt__(self, other):
        return self._cmp(other) < 0

    def __gt__(self, other):
        return self._cmp(other) > 0

    def __eq__(self, other):
        return self._cmp(other) == 0

    def __le__(self, other):
        return self._cmp(other) <= 0

    def __ge__(self, other):
        return self._cmp(other) >= 0

    def __ne__(self, other):
        return self._cmp(other) != 0
3113
3114
def sortclonebundleentries(ui, entries):
    """Order manifest entries according to ``ui.clonebundleprefers``.

    Returns a new list. Without any configured preference the entries keep
    their original order. Each configured item must be a ``key=value``
    pair; anything else aborts with ``error.Abort``.
    """
    rawprefers = ui.configlist(b'ui', b'clonebundleprefers')
    if not rawprefers:
        return list(entries)

    prefers = []
    for item in rawprefers:
        if b'=' not in item:
            hint = _(b"each comma separated item should be key=value pairs")
            raise error.Abort(
                _(b"invalid ui.clonebundleprefers item: %s") % item, hint=hint
            )
        prefers.append(item.split(b'=', 1))

    # Wrap each entry so the preference-aware comparison drives the sort,
    # then unwrap again.
    wrapped = [clonebundleentry(entry, prefers) for entry in entries]
    wrapped.sort()
    return [item.value for item in wrapped]
3132
3133
3134 def trypullbundlefromurl(ui, repo, url): 2729 def trypullbundlefromurl(ui, repo, url):
3135 """Attempt to apply a bundle from a URL.""" 2730 """Attempt to apply a bundle from a URL."""
3136 with repo.lock(), repo.transaction(b'bundleurl') as tr: 2731 with repo.lock(), repo.transaction(b'bundleurl') as tr:
3137 try: 2732 try:
3138 fh = urlmod.open(ui, url) 2733 fh = urlmod.open(ui, url)