changeset 47400:9b841267253c

util: add `nb_bytes` argument to `copyfile` to partially copy a file When set, this allow to copy only the first `nb_bytes` of a file. This will be useful for censor/strip operation with revlogv2. Differential Revision: https://phab.mercurial-scm.org/D10798
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Sun, 30 May 2021 18:08:52 +0200
parents 34cc102c73f5
children 1efe3cdef53a
files mercurial/util.py
diffstat 1 files changed, 15 insertions(+), 1 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/util.py	Sun May 30 16:20:36 2021 +0200
+++ b/mercurial/util.py	Sun May 30 18:08:52 2021 +0200
@@ -1909,7 +1909,9 @@
 }
 
 
-def copyfile(src, dest, hardlink=False, copystat=False, checkambig=False):
+def copyfile(
+    src, dest, hardlink=False, copystat=False, checkambig=False, nb_bytes=None
+):
     """copy a file, preserving mode and optionally other stat info like
     atime/mtime
 
@@ -1918,6 +1920,8 @@
     repo.wlock).
 
     copystat and checkambig should be exclusive.
+
+    nb_bytes: if set only copy the first `nb_bytes` of the source file.
     """
     assert not (copystat and checkambig)
     oldstat = None
@@ -1937,6 +1941,9 @@
     if hardlink:
         try:
             oslink(src, dest)
+            if nb_bytes is not None:
+                m = "the `nb_bytes` argument is incompatible with `hardlink`"
+                raise error.ProgrammingError(m)
             return
         except (IOError, OSError):
             pass  # fall back to normal copy
@@ -1944,6 +1951,9 @@
         os.symlink(os.readlink(src), dest)
         # copytime is ignored for symlinks, but in general copytime isn't needed
         # for them anyway
+        if nb_bytes is not None:
+            m = "cannot use `nb_bytes` on a symlink"
+            raise error.ProgrammingError(m)
     else:
         try:
             shutil.copyfile(src, dest)
@@ -1960,6 +1970,10 @@
                             oldstat.stat[stat.ST_MTIME] + 1
                         ) & 0x7FFFFFFF
                         os.utime(dest, (advanced, advanced))
+            # We could do something smarter using `copy_file_range` call or similar
+            if nb_bytes is not None:
+                with open(dest, mode='r+') as f:
+                    f.truncate(nb_bytes)
         except shutil.Error as inst:
             raise error.Abort(stringutil.forcebytestr(inst))