Mercurial > hg
view contrib/packaging/hgpackaging/util.py @ 41907:9da97f49d4f4
packaging: establish hgpackaging package
Previously, contrib/packaging behaved as a root to a
package directory and we had a "packagingutil" module. As I
work more on packaging code, we'll want to have more code
shared between different packaging tools. I think it makes more
sense to have a single package containing multiple modules
than to have multiple top-level modules.
This commit establishes an "hgpackaging" package by moving
the existing packagingutil code to it.
Differential Revision: https://phab.mercurial-scm.org/D6083
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Thu, 07 Mar 2019 10:10:04 -0800 |
parents | contrib/packaging/packagingutil.py@d7dc4ac1ff84 |
children | c2237fe1359e |
line wrap: on
line source
# util.py - Common packaging utility code.
#
# Copyright 2019 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

# no-check-code because Python 3 native.

import gzip
import hashlib
import pathlib
import tarfile
import urllib.request
import zipfile


def hash_path(p: pathlib.Path) -> str:
    """Return the hex SHA-256 digest of the file at ``p``.

    The file is read in 64 KiB chunks so it is never held in memory whole.
    """
    h = hashlib.sha256()

    with p.open('rb') as fh:
        # iter() with a sentinel stops at the first empty read (EOF).
        for chunk in iter(lambda: fh.read(65536), b''):
            h.update(chunk)

    return h.hexdigest()


class IntegrityError(Exception):
    """Represents an integrity error when downloading a URL."""


def secure_download_stream(url: str, size: int, sha256: str):
    """Securely download a URL to a stream of chunks.

    This is a generator yielding the downloaded ``bytes`` chunks. The
    size and SHA-256 checks only run once the stream is exhausted, so a
    consumer must treat the yielded data as untrusted until iteration
    finishes without raising.

    If the integrity of the download fails, an IntegrityError is raised.
    """
    h = hashlib.sha256()
    length = 0

    with urllib.request.urlopen(url) as fh:
        # Transparently decode gzip transfer encoding, but leave files
        # that are themselves ``.gz`` archives untouched.
        if (not url.endswith('.gz')
                and fh.info().get('Content-Encoding') == 'gzip'):
            fh = gzip.GzipFile(fileobj=fh)

        while True:
            chunk = fh.read(65536)
            if not chunk:
                break

            h.update(chunk)
            length += len(chunk)

            yield chunk

    digest = h.hexdigest()

    if length != size:
        raise IntegrityError('size mismatch on %s: wanted %d; got %d' % (
            url, size, length))

    if digest != sha256:
        raise IntegrityError('sha256 mismatch on %s: wanted %s; got %s' % (
            url, sha256, digest))


def download_to_path(url: str, path: pathlib.Path, size: int, sha256: str):
    """Download a URL to a filesystem path, possibly with verification.

    If ``path`` already exists with the expected size and SHA-256, nothing
    is downloaded. An existing file that fails either check is removed
    first.

    Raises IntegrityError if the downloaded content does not match
    ``size`` and ``sha256``. Any other exception raised while downloading
    or writing propagates unchanged. In every failure case the temporary
    file is removed.
    """
    # We download to a temporary file and rename at the end so there's
    # no chance of the final file being partially written or containing
    # bad data.
    print('downloading %s to %s' % (url, path))

    if path.exists():
        if path.stat().st_size != size:
            print('existing file size is wrong; removing')
        elif hash_path(path) != sha256:
            print('existing file hash is wrong; removing')
        else:
            print('%s exists and passes integrity checks' % path)
            return

        path.unlink()

    tmp = path.with_name('%s.tmp' % path.name)

    try:
        with tmp.open('wb') as fh:
            for chunk in secure_download_stream(url, size, sha256):
                fh.write(chunk)
    except BaseException:
        # Remove the partial file on *any* failure (integrity mismatch,
        # network error, interrupt), not just IntegrityError. Cleanup is
        # best-effort so it never masks the original exception.
        try:
            tmp.unlink()
        except OSError:
            pass
        raise

    tmp.rename(path)

    print('successfully downloaded %s' % url)


def download_entry(entry: dict, dest_path: pathlib.Path,
                   local_name=None) -> pathlib.Path:
    """Download the file described by ``entry`` into ``dest_path``.

    ``entry`` must provide the keys ``url``, ``size`` and ``sha256``.
    If ``local_name`` is not given, the part of the URL after its final
    ``/`` is used as the file name.

    Returns the path of the downloaded file.
    """
    url = entry['url']

    local_name = local_name or url[url.rindex('/') + 1:]

    local_path = dest_path / local_name
    download_to_path(url, local_path, entry['size'], entry['sha256'])

    return local_path


def extract_tar_to_directory(source: pathlib.Path, dest: pathlib.Path):
    """Extract every member of the tar archive ``source`` into ``dest``."""
    with tarfile.open(source, 'r') as tf:
        tf.extractall(dest)


def extract_zip_to_directory(source: pathlib.Path, dest: pathlib.Path):
    """Extract every member of the zip archive ``source`` into ``dest``."""
    with zipfile.ZipFile(source, 'r') as zf:
        zf.extractall(dest)