Mercurial > hg
view contrib/python-zstandard/tests/test_buffer_util.py @ 50005:16b78c0de506
locking: take the `wlock` for the full `hg remove` duration
Otherwise, there is a race condition window between the time we resolve the
file to remove with the matcher and the time we lock the repo and modify the
dirstate.
For example, the working copy might have been updated away, or purged, and the
matched files would no longer be correct.
author | Pierre-Yves David <pierre-yves.david@octobus.net> |
---|---|
date | Tue, 13 Dec 2022 04:22:46 +0100 |
parents | 5e84a96d865b |
children |
line wrap: on
line source
import struct import unittest import zstandard as zstd from .common import TestCase ss = struct.Struct("=QQ") class TestBufferWithSegments(TestCase): def test_arguments(self): if not hasattr(zstd, "BufferWithSegments"): self.skipTest("BufferWithSegments not available") with self.assertRaises(TypeError): zstd.BufferWithSegments() with self.assertRaises(TypeError): zstd.BufferWithSegments(b"foo") # Segments data should be a multiple of 16. with self.assertRaisesRegex( ValueError, "segments array size is not a multiple of 16" ): zstd.BufferWithSegments(b"foo", b"\x00\x00") def test_invalid_offset(self): if not hasattr(zstd, "BufferWithSegments"): self.skipTest("BufferWithSegments not available") with self.assertRaisesRegex( ValueError, "offset within segments array references memory" ): zstd.BufferWithSegments(b"foo", ss.pack(0, 4)) def test_invalid_getitem(self): if not hasattr(zstd, "BufferWithSegments"): self.skipTest("BufferWithSegments not available") b = zstd.BufferWithSegments(b"foo", ss.pack(0, 3)) with self.assertRaisesRegex(IndexError, "offset must be non-negative"): test = b[-10] with self.assertRaisesRegex(IndexError, "offset must be less than 1"): test = b[1] with self.assertRaisesRegex(IndexError, "offset must be less than 1"): test = b[2] def test_single(self): if not hasattr(zstd, "BufferWithSegments"): self.skipTest("BufferWithSegments not available") b = zstd.BufferWithSegments(b"foo", ss.pack(0, 3)) self.assertEqual(len(b), 1) self.assertEqual(b.size, 3) self.assertEqual(b.tobytes(), b"foo") self.assertEqual(len(b[0]), 3) self.assertEqual(b[0].offset, 0) self.assertEqual(b[0].tobytes(), b"foo") def test_multiple(self): if not hasattr(zstd, "BufferWithSegments"): self.skipTest("BufferWithSegments not available") b = zstd.BufferWithSegments( b"foofooxfooxy", b"".join([ss.pack(0, 3), ss.pack(3, 4), ss.pack(7, 5)]), ) self.assertEqual(len(b), 3) self.assertEqual(b.size, 12) self.assertEqual(b.tobytes(), b"foofooxfooxy") self.assertEqual(b[0].tobytes(), b"foo") self.assertEqual(b[1].tobytes(), b"foox") self.assertEqual(b[2].tobytes(), b"fooxy") class TestBufferWithSegmentsCollection(TestCase): def test_empty_constructor(self): if not hasattr(zstd, "BufferWithSegmentsCollection"): self.skipTest("BufferWithSegmentsCollection not available") with self.assertRaisesRegex( ValueError, "must pass at least 1 argument" ): zstd.BufferWithSegmentsCollection() def test_argument_validation(self): if not hasattr(zstd, "BufferWithSegmentsCollection"): self.skipTest("BufferWithSegmentsCollection not available") with self.assertRaisesRegex( TypeError, "arguments must be BufferWithSegments" ): zstd.BufferWithSegmentsCollection(None) with self.assertRaisesRegex( TypeError, "arguments must be BufferWithSegments" ): zstd.BufferWithSegmentsCollection( zstd.BufferWithSegments(b"foo", ss.pack(0, 3)), None ) with self.assertRaisesRegex( ValueError, "ZstdBufferWithSegments cannot be empty" ): zstd.BufferWithSegmentsCollection(zstd.BufferWithSegments(b"", b"")) def test_length(self): if not hasattr(zstd, "BufferWithSegmentsCollection"): self.skipTest("BufferWithSegmentsCollection not available") b1 = zstd.BufferWithSegments(b"foo", ss.pack(0, 3)) b2 = zstd.BufferWithSegments( b"barbaz", b"".join([ss.pack(0, 3), ss.pack(3, 3)]) ) c = zstd.BufferWithSegmentsCollection(b1) self.assertEqual(len(c), 1) self.assertEqual(c.size(), 3) c = zstd.BufferWithSegmentsCollection(b2) self.assertEqual(len(c), 2) self.assertEqual(c.size(), 6) c = zstd.BufferWithSegmentsCollection(b1, b2) self.assertEqual(len(c), 3) self.assertEqual(c.size(), 9) def test_getitem(self): if not hasattr(zstd, "BufferWithSegmentsCollection"): self.skipTest("BufferWithSegmentsCollection not available") b1 = zstd.BufferWithSegments(b"foo", ss.pack(0, 3)) b2 = zstd.BufferWithSegments( b"barbaz", b"".join([ss.pack(0, 3), ss.pack(3, 3)]) ) c = zstd.BufferWithSegmentsCollection(b1, b2) with self.assertRaisesRegex(IndexError, "offset must be less than 3"): c[3] with self.assertRaisesRegex(IndexError, "offset must be less than 3"): c[4] self.assertEqual(c[0].tobytes(), b"foo") self.assertEqual(c[1].tobytes(), b"bar") self.assertEqual(c[2].tobytes(), b"baz")