from __future__ import absolute_import

import collections
import errno
import hashlib
import mmap
import os
import struct
import time

from mercurial.i18n import _
from mercurial import (
    policy,
    pycompat,
    util,
    vfs as vfsmod,
)
from . import shallowutil

osutil = policy.importmod(r'osutil')

# The pack version supported by this implementation. This will need to be
# rev'd whenever the byte format changes. Ex: changing the fanout prefix,
# changing any of the int sizes, changing the delta algorithm, etc.
PACKVERSIONSIZE = 1
INDEXVERSIONSIZE = 2

FANOUTSTART = INDEXVERSIONSIZE

# Constant that indicates a fanout table entry hasn't been filled in. (This
# does not get serialized)
EMPTYFANOUT = -1

# The fanout prefix is the number of bytes that can be addressed by the fanout
# table. Example: a fanout prefix of 1 means we use the first byte of a hash to
# look in the fanout table (which will be 2^8 entries long).
SMALLFANOUTPREFIX = 1
LARGEFANOUTPREFIX = 2
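
# For example (an illustrative sketch, not extra serialized state): with
# LARGEFANOUTPREFIX, a node beginning with the bytes \xab\xcd is looked up
# through fanout entry 0xabcd, which holds the offset of the first index
# entry whose node starts with that 2-byte prefix, so the bisect only has
# to search from that offset onward.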

# The number of entries in the index at which point we switch to a large
# fanout. It is chosen to balance the linear scan through a sparse fanout
# against the size of the bisect in the actual index.
# 2^16 / 8 was chosen because it trades off (1 step fanout scan + 5 step
# bisect) with (8 step fanout scan + 1 step bisect)
# 5 step bisect = log(2^16 / 8 / 255) # fanout
# 8 step fanout scan = 2^16 / (2^16 / 8) # fanout space divided by entries
SMALLFANOUTCUTOFF = 2**16 // 8  # integer division, so the cutoff stays an int

# The amount of time to wait between checking for new packs. This prevents an
# exception when data is moved to a new pack after the process has already
# loaded the pack list.
REFRESHRATE = 0.1

if pycompat.isposix:
    # With glibc 2.7+ the 'e' flag uses O_CLOEXEC when opening.
    # The 'e' flag will be ignored on older versions of glibc.
    PACKOPENMODE = 'rbe'
else:
    PACKOPENMODE = 'rb'

class _cachebackedpacks(object):
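    """A set of packs fronted by an LRU cache of the most useful ones.

    Iteration yields the cached packs first, so a lookup that is satisfied by
    a popular pack can stop before touching the cold packs at all.
    """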
    def __init__(self, packs, cachesize):
        self._packs = set(packs)
        self._lrucache = util.lrucachedict(cachesize)
        self._lastpack = None

        # Avoid a cold start by priming the cache with the most recent packs.
        for i in reversed(range(min(cachesize, len(packs)))):
            self._movetofront(packs[i])

    def _movetofront(self, pack):
        # This effectively makes pack the first entry in the cache.
        self._lrucache[pack] = True

    def _registerlastpackusage(self):
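        # __iter__ leaves the pack that satisfied the previous lookup in
        # self._lastpack; promote it now that we know it was useful.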
        if self._lastpack is not None:
            self._movetofront(self._lastpack)
            self._lastpack = None

    def add(self, pack):
        self._registerlastpackusage()

        # This method is mostly called for packs that are not yet in the
        # cache, so add the new pack to the cache right away.
        self._movetofront(pack)
        self._packs.add(pack)

    def __iter__(self):
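        """Yields packs in most-recently-useful order: cached packs first (in
        LRU order), then any packs not currently in the cache."""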
        self._registerlastpackusage()

        # Cache iteration is based on LRU.
        for pack in self._lrucache:
            self._lastpack = pack
            yield pack

        cachedpacks = set(pack for pack in self._lrucache)
        # Yield packs that are not in the cache.
        for pack in self._packs - cachedpacks:
            self._lastpack = pack
            yield pack

        # Data not found in any pack.
        self._lastpack = None

class basepackstore(object):
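    """Abstract base class for a store backed by a directory of pack files.

    Subclasses must provide INDEXSUFFIX, PACKSUFFIX, and getpack().
    """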
    # Default cache size limit for the pack files.
    DEFAULTCACHESIZE = 100

    def __init__(self, ui, path):
        self.ui = ui
        self.path = path

        # lastrefresh is 0 so we'll immediately check for new packs on the
        # first failure.
        self.lastrefresh = 0

        packs = []
        for filepath, __, __ in self._getavailablepackfilessorted():
            try:
                pack = self.getpack(filepath)
            except Exception as ex:
                # An exception may be thrown if the pack file is corrupted
                # somehow. Log a warning but keep going in this case, just
                # skipping this pack file.
                #
                # If this is an ENOENT error then don't even bother logging.
                # Someone could have removed the file since we retrieved the
                # list of paths.
                if getattr(ex, 'errno', None) != errno.ENOENT:
                    ui.warn(_('unable to load pack %s: %s\n') % (filepath, ex))
                continue
            packs.append(pack)

        self.packs = _cachebackedpacks(packs, self.DEFAULTCACHESIZE)

    def _getavailablepackfiles(self):
        """For each pack file (an index/data file combo), yields:
          (full path without extension, mtime, size)

        mtime will be the mtime of the index/data file (whichever is newer)
        size is the combined size of index/data file
        """
        indexsuffixlen = len(self.INDEXSUFFIX)
        packsuffixlen = len(self.PACKSUFFIX)

        ids = set()
        sizes = collections.defaultdict(lambda: 0)
        mtimes = collections.defaultdict(lambda: [])
        try:
            for filename, type, stat in osutil.listdir(self.path, stat=True):
                id = None
                if filename[-indexsuffixlen:] == self.INDEXSUFFIX:
                    id = filename[:-indexsuffixlen]
                elif filename[-packsuffixlen:] == self.PACKSUFFIX:
                    id = filename[:-packsuffixlen]

                # Since we expect to have two files corresponding to each ID
                # (the index file and the pack file), we can yield once we see
                # it twice.
                if id:
                    sizes[id] += stat.st_size # Sum both files' sizes together
                    mtimes[id].append(stat.st_mtime)
                    if id in ids:
                        yield (os.path.join(self.path, id), max(mtimes[id]),
                               sizes[id])
                    else:
                        ids.add(id)
        except OSError as ex:
            if ex.errno != errno.ENOENT:
                raise

    def _getavailablepackfilessorted(self):
        """Like `_getavailablepackfiles`, but also sorts the files by mtime,
        yielding newest files first.

        This is desirable, since newer packfiles are more likely to contain
        the data we want.
        """
        files = []
        for path, mtime, size in self._getavailablepackfiles():
            files.append((mtime, size, path))
        files = sorted(files, reverse=True)
        for mtime, size, path in files:
            yield path, mtime, size

    def gettotalsizeandcount(self):
        """Returns the total disk size (in bytes) of all the pack files in
        this store, and the count of pack files.

        (This might be smaller than the total size of the ``self.path``
        directory, since this only considers fully-written pack files, and
        not temporary files or other detritus in the directory.)
        """
        totalsize = 0
        count = 0
        for __, __, size in self._getavailablepackfiles():
            totalsize += size
            count += 1
        return totalsize, count

    def getmetrics(self):
        """Returns metrics on the state of this store."""
        size, count = self.gettotalsizeandcount()
        return {
            'numpacks': count,
            'totalpacksize': size,
        }

    def getpack(self, path):
        raise NotImplementedError()

    def getmissing(self, keys):
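        """Returns the keys that were found in none of the packs, consulting
        the on-disk pack list (via refresh()) before giving up on the
        leftovers."""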
        missing = keys
        for pack in self.packs:
            missing = pack.getmissing(missing)

            # Ensures better performance of the cache by keeping the most
            # recently accessed pack at the beginning in subsequent iterations.
            if not missing:
                return missing

        if missing:
            for pack in self.refresh():
                missing = pack.getmissing(missing)

        return missing

    def markledger(self, ledger, options=None):
        for pack in self.packs:
            pack.markledger(ledger)

    def markforrefresh(self):
        """Tells the store that there may be new pack files, so the next time
        it has a lookup miss it should check for new files."""
        self.lastrefresh = 0

    def refresh(self):
        """Checks for any new packs on disk, adds them to the main pack list,
        and returns a list of just the new packs."""
        now = time.time()

        # If we experience a lot of misses (like in the case of getmissing()
        # on new objects), let's only actually check disk for new stuff every
        # once in a while. Generally this code path should only ever matter
        # when a repack is going on in the background, and it should be pretty
        # rare for that to happen twice in quick succession.
        newpacks = []
        if now > self.lastrefresh + REFRESHRATE:
            self.lastrefresh = now
            previous = set(p.path for p in self.packs)
            for filepath, __, __ in self._getavailablepackfilessorted():
                if filepath not in previous:
                    newpack = self.getpack(filepath)
                    newpacks.append(newpack)
                    self.packs.add(newpack)

        return newpacks

class versionmixin(object):
    # Mix-in for classes with multiple supported versions
    VERSION = None
    SUPPORTED_VERSIONS = [0]

    def _checkversion(self, version):
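        """Pins this instance to the first version it sees; raises
        RuntimeError for unsupported or inconsistent versions."""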
        if version in self.SUPPORTED_VERSIONS:
            if self.VERSION is None:
                # only affect this instance
                self.VERSION = version
            elif self.VERSION != version:
                raise RuntimeError('inconsistent version: %s' % version)
        else:
            raise RuntimeError('unsupported version: %s' % version)

class basepack(versionmixin):
    # The maximum amount we should read via mmap before remapping so the old
    # pages can be released (100MB)
    MAXPAGEDIN = 100 * 1024**2

    SUPPORTED_VERSIONS = [0]

    def __init__(self, path):
        self.path = path
        self.packpath = path + self.PACKSUFFIX
        self.indexpath = path + self.INDEXSUFFIX

        self.indexsize = os.stat(self.indexpath).st_size
        self.datasize = os.stat(self.packpath).st_size

        self._index = None
        self._data = None
        self.freememory() # initialize the mmap

        version = struct.unpack('!B', self._data[:PACKVERSIONSIZE])[0]
        self._checkversion(version)

        version, config = struct.unpack('!BB', self._index[:INDEXVERSIONSIZE])
        self._checkversion(version)
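
        # The high bit of the config byte selects the large fanout prefix
        # (2 bytes instead of 1); see _writeheader in mutablebasepack.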
        if 0b10000000 & config:
            self.params = indexparams(LARGEFANOUTPREFIX, version)
        else:
            self.params = indexparams(SMALLFANOUTPREFIX, version)

    @util.propertycache
    def _fanouttable(self):
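        """Decodes the raw fanout table into a list of `fanoutcount` ints,
        one 4-byte big-endian index offset per possible node prefix."""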
        params = self.params
        rawfanout = self._index[FANOUTSTART:FANOUTSTART + params.fanoutsize]
        fanouttable = []
        for i in pycompat.xrange(0, params.fanoutcount):
            loc = i * 4
            fanoutentry = struct.unpack('!I', rawfanout[loc:loc + 4])[0]
            fanouttable.append(fanoutentry)
        return fanouttable

    @util.propertycache
    def _indexend(self):
        if self.VERSION == 0:
            return self.indexsize
        else:
            nodecount = struct.unpack_from('!Q', self._index,
                                           self.params.indexstart - 8)[0]
            return self.params.indexstart + nodecount * self.INDEXENTRYLENGTH

    def freememory(self):
        """Unmap and remap the memory to free it up after known expensive
        operations. Return True if self._data and self._index were reloaded.
        """
        if self._index:
            if self._pagedin < self.MAXPAGEDIN:
                return False

            self._index.close()
            self._data.close()

        # TODO: use an opener/vfs to access these paths
        with open(self.indexpath, PACKOPENMODE) as indexfp:
            # memory-map the file, size 0 means whole file
            self._index = mmap.mmap(indexfp.fileno(), 0,
                                    access=mmap.ACCESS_READ)
        with open(self.packpath, PACKOPENMODE) as datafp:
            self._data = mmap.mmap(datafp.fileno(), 0, access=mmap.ACCESS_READ)

        self._pagedin = 0
        return True

    def getmissing(self, keys):
        raise NotImplementedError()

    def markledger(self, ledger, options=None):
        raise NotImplementedError()

    def cleanup(self, ledger):
        raise NotImplementedError()

    def __iter__(self):
        raise NotImplementedError()

    def iterentries(self):
        raise NotImplementedError()

class mutablebasepack(versionmixin):
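    """Helper for writing a new pack file: data is streamed into temp files
    and sha1-hashed as it is written; close() then renames the pack and its
    index into place, named after the hash of the pack contents.
    """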

    def __init__(self, ui, packdir, version=0):
        self._checkversion(version)

        opener = vfsmod.vfs(packdir)
        opener.createmode = 0o444
        self.opener = opener

        self.entries = {}

        shallowutil.mkstickygroupdir(ui, packdir)
        self.packfp, self.packpath = opener.mkstemp(
            suffix=self.PACKSUFFIX + '-tmp')
        self.idxfp, self.idxpath = opener.mkstemp(
            suffix=self.INDEXSUFFIX + '-tmp')
        self.packfp = os.fdopen(self.packfp, 'w+')
        self.idxfp = os.fdopen(self.idxfp, 'w+')
        self.sha = hashlib.sha1()
        self._closed = False

        # The opener provides no way of doing permission fixup on files created
        # via mkstemp, so we must fix it ourselves. We can probably fix this
        # upstream in vfs.mkstemp so we don't need to use the private method.
        opener._fixfilemode(opener.join(self.packpath))
        opener._fixfilemode(opener.join(self.idxpath))

        # Write header
        # TODO: make it extensible (ex: allow specifying compression algorithm,
        # a flexible key/value header, delta algorithm, fanout size, etc)
        versionbuf = struct.pack('!B', self.VERSION) # unsigned 1 byte int
        self.writeraw(versionbuf)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.close()
        else:
            self.abort()

    def abort(self):
        # Unclean exit
        self._cleantemppacks()

    def writeraw(self, data):
        self.packfp.write(data)
        self.sha.update(data)

    def close(self, ledger=None):
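        """Finalizes the pack: writes the index, renames both temp files to
        the content hash of the pack, and returns the final path (without
        extension), or None if the pack ended up empty."""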
        if self._closed:
            return

        try:
            sha = self.sha.hexdigest()
            self.packfp.close()
            self.writeindex()

            if len(self.entries) == 0:
                # Empty pack
                self._cleantemppacks()
                self._closed = True
                return None

            self.opener.rename(self.packpath, sha + self.PACKSUFFIX)
            try:
                self.opener.rename(self.idxpath, sha + self.INDEXSUFFIX)
            except Exception as ex:
                try:
                    self.opener.unlink(sha + self.PACKSUFFIX)
                except Exception:
                    pass
                # Throw exception 'ex' explicitly since a normal 'raise' would
                # potentially throw an exception from the unlink cleanup.
                raise ex
        except Exception:
            # Clean up temp packs in all exception cases
            self._cleantemppacks()
            raise

        self._closed = True
        result = self.opener.join(sha)
        if ledger:
            ledger.addcreated(result)
        return result

    def _cleantemppacks(self):
        try:
            self.opener.unlink(self.packpath)
        except Exception:
            pass
        try:
            self.opener.unlink(self.idxpath)
        except Exception:
            pass

    def writeindex(self):
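        # Index file layout, as written below:
        #    <version: 1 byte><config: 1 byte>        (_writeheader)
        #    <fanout table: fanoutcount * 4 bytes>
        #    <entry count: 8 bytes>                   (version 1 only)
        #    <index entries, sorted by node>          (createindex)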
        rawindex = ''

        largefanout = len(self.entries) > SMALLFANOUTCUTOFF
        if largefanout:
            params = indexparams(LARGEFANOUTPREFIX, self.VERSION)
        else:
            params = indexparams(SMALLFANOUTPREFIX, self.VERSION)

        fanouttable = [EMPTYFANOUT] * params.fanoutcount

        # Precompute the location of each entry
        locations = {}
        count = 0
        for node in sorted(self.entries):
            location = count * self.INDEXENTRYLENGTH
            locations[node] = location
            count += 1

            # Must use [0] on the unpack result since it's always a tuple.
            fanoutkey = struct.unpack(params.fanoutstruct,
                                      node[:params.fanoutprefix])[0]
            if fanouttable[fanoutkey] == EMPTYFANOUT:
                fanouttable[fanoutkey] = location

        rawfanouttable = ''
        last = 0
        for offset in fanouttable:
            offset = offset if offset != EMPTYFANOUT else last
            last = offset
            rawfanouttable += struct.pack('!I', offset)

        rawentrieslength = struct.pack('!Q', len(self.entries))

        # The index offset is the entry's location in the file, i.e. it starts
        # after the 2 byte header and the fanout table.
        rawindex = self.createindex(locations, 2 + len(rawfanouttable))

        self._writeheader(params)
        self.idxfp.write(rawfanouttable)
        if self.VERSION == 1:
            self.idxfp.write(rawentrieslength)
        self.idxfp.write(rawindex)
        self.idxfp.close()

    def createindex(self, nodelocations, indexoffset):
        raise NotImplementedError()

    def _writeheader(self, indexparams):
        # Index header
        #    <version: 1 byte>
        #    <large fanout: 1 bit> # 1 means 2^16, 0 means 2^8
        #    <unused: 7 bit> # future use (compression, delta format, etc)
        config = 0
        if indexparams.fanoutprefix == LARGEFANOUTPREFIX:
            config = 0b10000000
        self.idxfp.write(struct.pack('!BB', self.VERSION, config))

class indexparams(object):
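    """Derived constants (struct formats, counts, sizes, offsets) for reading
    and writing an index with the given fanout prefix size and version."""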
    __slots__ = ('fanoutprefix', 'fanoutstruct', 'fanoutcount', 'fanoutsize',
                 'indexstart')

    def __init__(self, prefixsize, version):
        self.fanoutprefix = prefixsize

        # The struct pack format for fanout table location (i.e. the format
        # that converts the node prefix into an integer location in the fanout
        # table).
        if prefixsize == SMALLFANOUTPREFIX:
            self.fanoutstruct = '!B'
        elif prefixsize == LARGEFANOUTPREFIX:
            self.fanoutstruct = '!H'
        else:
            raise ValueError("invalid fanout prefix size: %s" % prefixsize)

        # The number of fanout table entries
        self.fanoutcount = 2**(prefixsize * 8)

        # The total bytes used by the fanout table
        self.fanoutsize = self.fanoutcount * 4

        self.indexstart = FANOUTSTART + self.fanoutsize
        if version == 1:
            # Skip the index length
            self.indexstart += 8