Mercurial > hg
comparison contrib/python-zstandard/c-ext/compressor.c @ 31796:e0dc40530c5a
zstd: vendor python-zstandard 0.8.0
Commit 81e1f5bbf1fc54808649562d3ed829730765c540 from
https://github.com/indygreg/python-zstandard is imported without
modifications (other than removing unwanted files).
Updates relevant to Mercurial include:
* Support for multi-threaded compression (we can use this for
bundle and wire protocol compression).
* APIs for batch compression and decompression operations using
multiple threads and optimal memory allocation mechanism. (Can
be useful for revlog perf improvements.)
* A ``BufferWithSegments`` type that models a single memory buffer
containing N discrete items of known lengths. This type can be
used for very efficient 0-copy data operations.
# no-check-commit
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Sat, 01 Apr 2017 15:24:03 -0700 |
parents | c32454d69b85 |
children | b1fb341d8a61 |
comparison
equal
deleted
inserted
replaced
31795:2b130e26c3a4 | 31796:e0dc40530c5a |
---|---|
5 * This software may be modified and distributed under the terms | 5 * This software may be modified and distributed under the terms |
6 * of the BSD license. See the LICENSE file for details. | 6 * of the BSD license. See the LICENSE file for details. |
7 */ | 7 */ |
8 | 8 |
9 #include "python-zstandard.h" | 9 #include "python-zstandard.h" |
10 #include "pool.h" | |
10 | 11 |
11 extern PyObject* ZstdError; | 12 extern PyObject* ZstdError; |
12 | 13 |
13 int populate_cdict(ZstdCompressor* compressor, void* dictData, size_t dictSize, ZSTD_parameters* zparams) { | 14 int populate_cdict(ZstdCompressor* compressor, ZSTD_parameters* zparams) { |
14 ZSTD_customMem zmem; | 15 ZSTD_customMem zmem; |
15 assert(!compressor->cdict); | 16 |
17 if (compressor->cdict || !compressor->dict || !compressor->dict->dictData) { | |
18 return 0; | |
19 } | |
20 | |
16 Py_BEGIN_ALLOW_THREADS | 21 Py_BEGIN_ALLOW_THREADS |
17 memset(&zmem, 0, sizeof(zmem)); | 22 memset(&zmem, 0, sizeof(zmem)); |
18 compressor->cdict = ZSTD_createCDict_advanced(compressor->dict->dictData, | 23 compressor->cdict = ZSTD_createCDict_advanced(compressor->dict->dictData, |
19 compressor->dict->dictSize, 1, *zparams, zmem); | 24 compressor->dict->dictSize, 1, *zparams, zmem); |
20 Py_END_ALLOW_THREADS | 25 Py_END_ALLOW_THREADS |
26 | 31 |
27 return 0; | 32 return 0; |
28 } | 33 } |
29 | 34 |
30 /** | 35 /** |
31 * Initialize a zstd CStream from a ZstdCompressor instance. | 36 * Ensure the ZSTD_CStream on a ZstdCompressor instance is initialized. |
32 * | 37 * |
33 * Returns a ZSTD_CStream on success or NULL on failure. If NULL, a Python | 38 * Returns 0 on success. Other value on failure. Will set a Python exception |
34 * exception will be set. | 39 * on failure. |
35 */ | 40 */ |
36 ZSTD_CStream* CStream_from_ZstdCompressor(ZstdCompressor* compressor, Py_ssize_t sourceSize) { | 41 int init_cstream(ZstdCompressor* compressor, unsigned long long sourceSize) { |
37 ZSTD_CStream* cstream; | |
38 ZSTD_parameters zparams; | 42 ZSTD_parameters zparams; |
39 void* dictData = NULL; | 43 void* dictData = NULL; |
40 size_t dictSize = 0; | 44 size_t dictSize = 0; |
41 size_t zresult; | 45 size_t zresult; |
42 | 46 |
43 cstream = ZSTD_createCStream(); | 47 if (compressor->cstream) { |
44 if (!cstream) { | 48 zresult = ZSTD_resetCStream(compressor->cstream, sourceSize); |
45 PyErr_SetString(ZstdError, "cannot create CStream"); | 49 if (ZSTD_isError(zresult)) { |
46 return NULL; | 50 PyErr_Format(ZstdError, "could not reset CStream: %s", |
51 ZSTD_getErrorName(zresult)); | |
52 return -1; | |
53 } | |
54 | |
55 return 0; | |
56 } | |
57 | |
58 compressor->cstream = ZSTD_createCStream(); | |
59 if (!compressor->cstream) { | |
60 PyErr_SetString(ZstdError, "could not create CStream"); | |
61 return -1; | |
47 } | 62 } |
48 | 63 |
49 if (compressor->dict) { | 64 if (compressor->dict) { |
50 dictData = compressor->dict->dictData; | 65 dictData = compressor->dict->dictData; |
51 dictSize = compressor->dict->dictSize; | 66 dictSize = compressor->dict->dictSize; |
61 zparams.cParams = ZSTD_getCParams(compressor->compressionLevel, sourceSize, dictSize); | 76 zparams.cParams = ZSTD_getCParams(compressor->compressionLevel, sourceSize, dictSize); |
62 } | 77 } |
63 | 78 |
64 zparams.fParams = compressor->fparams; | 79 zparams.fParams = compressor->fparams; |
65 | 80 |
66 zresult = ZSTD_initCStream_advanced(cstream, dictData, dictSize, zparams, sourceSize); | 81 zresult = ZSTD_initCStream_advanced(compressor->cstream, dictData, dictSize, |
82 zparams, sourceSize); | |
67 | 83 |
68 if (ZSTD_isError(zresult)) { | 84 if (ZSTD_isError(zresult)) { |
69 ZSTD_freeCStream(cstream); | 85 ZSTD_freeCStream(compressor->cstream); |
86 compressor->cstream = NULL; | |
70 PyErr_Format(ZstdError, "cannot init CStream: %s", ZSTD_getErrorName(zresult)); | 87 PyErr_Format(ZstdError, "cannot init CStream: %s", ZSTD_getErrorName(zresult)); |
71 return NULL; | 88 return -1; |
72 } | 89 } |
73 | 90 |
74 return cstream; | 91 return 0; |
92 } | |
93 | |
94 int init_mtcstream(ZstdCompressor* compressor, Py_ssize_t sourceSize) { | |
95 size_t zresult; | |
96 void* dictData = NULL; | |
97 size_t dictSize = 0; | |
98 ZSTD_parameters zparams; | |
99 | |
100 assert(compressor->mtcctx); | |
101 | |
102 if (compressor->dict) { | |
103 dictData = compressor->dict->dictData; | |
104 dictSize = compressor->dict->dictSize; | |
105 } | |
106 | |
107 memset(&zparams, 0, sizeof(zparams)); | |
108 if (compressor->cparams) { | |
109 ztopy_compression_parameters(compressor->cparams, &zparams.cParams); | |
110 } | |
111 else { | |
112 zparams.cParams = ZSTD_getCParams(compressor->compressionLevel, sourceSize, dictSize); | |
113 } | |
114 | |
115 zparams.fParams = compressor->fparams; | |
116 | |
117 zresult = ZSTDMT_initCStream_advanced(compressor->mtcctx, dictData, dictSize, | |
118 zparams, sourceSize); | |
119 | |
120 if (ZSTD_isError(zresult)) { | |
121 PyErr_Format(ZstdError, "cannot init CStream: %s", ZSTD_getErrorName(zresult)); | |
122 return -1; | |
123 } | |
124 | |
125 return 0; | |
75 } | 126 } |
76 | 127 |
77 PyDoc_STRVAR(ZstdCompressor__doc__, | 128 PyDoc_STRVAR(ZstdCompressor__doc__, |
78 "ZstdCompressor(level=None, dict_data=None, compression_params=None)\n" | 129 "ZstdCompressor(level=None, dict_data=None, compression_params=None)\n" |
79 "\n" | 130 "\n" |
101 " knows the size of the input data.\n" | 152 " knows the size of the input data.\n" |
102 "write_dict_id\n" | 153 "write_dict_id\n" |
103 " Determines whether the dictionary ID will be written into the compressed\n" | 154 " Determines whether the dictionary ID will be written into the compressed\n" |
104 " data. Defaults to True. Only adds content to the compressed data if\n" | 155 " data. Defaults to True. Only adds content to the compressed data if\n" |
105 " a dictionary is being used.\n" | 156 " a dictionary is being used.\n" |
157 "threads\n" | |
158 " Number of threads to use to compress data concurrently. When set,\n" | |
159 " compression operations are performed on multiple threads. The default\n" | |
160 " value (0) disables multi-threaded compression. A value of ``-1`` means to\n" | |
161 " set the number of threads to the number of detected logical CPUs.\n" | |
106 ); | 162 ); |
107 | 163 |
108 static int ZstdCompressor_init(ZstdCompressor* self, PyObject* args, PyObject* kwargs) { | 164 static int ZstdCompressor_init(ZstdCompressor* self, PyObject* args, PyObject* kwargs) { |
109 static char* kwlist[] = { | 165 static char* kwlist[] = { |
110 "level", | 166 "level", |
111 "dict_data", | 167 "dict_data", |
112 "compression_params", | 168 "compression_params", |
113 "write_checksum", | 169 "write_checksum", |
114 "write_content_size", | 170 "write_content_size", |
115 "write_dict_id", | 171 "write_dict_id", |
172 "threads", | |
116 NULL | 173 NULL |
117 }; | 174 }; |
118 | 175 |
119 int level = 3; | 176 int level = 3; |
120 ZstdCompressionDict* dict = NULL; | 177 ZstdCompressionDict* dict = NULL; |
121 CompressionParametersObject* params = NULL; | 178 CompressionParametersObject* params = NULL; |
122 PyObject* writeChecksum = NULL; | 179 PyObject* writeChecksum = NULL; |
123 PyObject* writeContentSize = NULL; | 180 PyObject* writeContentSize = NULL; |
124 PyObject* writeDictID = NULL; | 181 PyObject* writeDictID = NULL; |
125 | 182 int threads = 0; |
126 self->cctx = NULL; | 183 |
127 self->dict = NULL; | 184 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|iO!O!OOOi:ZstdCompressor", |
128 self->cparams = NULL; | |
129 self->cdict = NULL; | |
130 | |
131 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|iO!O!OOO:ZstdCompressor", | |
132 kwlist, &level, &ZstdCompressionDictType, &dict, | 185 kwlist, &level, &ZstdCompressionDictType, &dict, |
133 &CompressionParametersType, ¶ms, | 186 &CompressionParametersType, ¶ms, |
134 &writeChecksum, &writeContentSize, &writeDictID)) { | 187 &writeChecksum, &writeContentSize, &writeDictID, &threads)) { |
135 return -1; | 188 return -1; |
136 } | 189 } |
137 | 190 |
138 if (level < 1) { | 191 if (level < 1) { |
139 PyErr_SetString(PyExc_ValueError, "level must be greater than 0"); | 192 PyErr_SetString(PyExc_ValueError, "level must be greater than 0"); |
144 PyErr_Format(PyExc_ValueError, "level must be less than %d", | 197 PyErr_Format(PyExc_ValueError, "level must be less than %d", |
145 ZSTD_maxCLevel() + 1); | 198 ZSTD_maxCLevel() + 1); |
146 return -1; | 199 return -1; |
147 } | 200 } |
148 | 201 |
202 if (threads < 0) { | |
203 threads = cpu_count(); | |
204 } | |
205 | |
206 self->threads = threads; | |
207 | |
149 /* We create a ZSTD_CCtx for reuse among multiple operations to reduce the | 208 /* We create a ZSTD_CCtx for reuse among multiple operations to reduce the |
150 overhead of each compression operation. */ | 209 overhead of each compression operation. */ |
151 self->cctx = ZSTD_createCCtx(); | 210 if (threads) { |
152 if (!self->cctx) { | 211 self->mtcctx = ZSTDMT_createCCtx(threads); |
153 PyErr_NoMemory(); | 212 if (!self->mtcctx) { |
154 return -1; | 213 PyErr_NoMemory(); |
214 return -1; | |
215 } | |
216 } | |
217 else { | |
218 self->cctx = ZSTD_createCCtx(); | |
219 if (!self->cctx) { | |
220 PyErr_NoMemory(); | |
221 return -1; | |
222 } | |
155 } | 223 } |
156 | 224 |
157 self->compressionLevel = level; | 225 self->compressionLevel = level; |
158 | 226 |
159 if (dict) { | 227 if (dict) { |
180 | 248 |
181 return 0; | 249 return 0; |
182 } | 250 } |
183 | 251 |
184 static void ZstdCompressor_dealloc(ZstdCompressor* self) { | 252 static void ZstdCompressor_dealloc(ZstdCompressor* self) { |
253 if (self->cstream) { | |
254 ZSTD_freeCStream(self->cstream); | |
255 self->cstream = NULL; | |
256 } | |
257 | |
185 Py_XDECREF(self->cparams); | 258 Py_XDECREF(self->cparams); |
186 Py_XDECREF(self->dict); | 259 Py_XDECREF(self->dict); |
187 | 260 |
188 if (self->cdict) { | 261 if (self->cdict) { |
189 ZSTD_freeCDict(self->cdict); | 262 ZSTD_freeCDict(self->cdict); |
191 } | 264 } |
192 | 265 |
193 if (self->cctx) { | 266 if (self->cctx) { |
194 ZSTD_freeCCtx(self->cctx); | 267 ZSTD_freeCCtx(self->cctx); |
195 self->cctx = NULL; | 268 self->cctx = NULL; |
269 } | |
270 | |
271 if (self->mtcctx) { | |
272 ZSTDMT_freeCCtx(self->mtcctx); | |
273 self->mtcctx = NULL; | |
196 } | 274 } |
197 | 275 |
198 PyObject_Del(self); | 276 PyObject_Del(self); |
199 } | 277 } |
200 | 278 |
227 PyObject* source; | 305 PyObject* source; |
228 PyObject* dest; | 306 PyObject* dest; |
229 Py_ssize_t sourceSize = 0; | 307 Py_ssize_t sourceSize = 0; |
230 size_t inSize = ZSTD_CStreamInSize(); | 308 size_t inSize = ZSTD_CStreamInSize(); |
231 size_t outSize = ZSTD_CStreamOutSize(); | 309 size_t outSize = ZSTD_CStreamOutSize(); |
232 ZSTD_CStream* cstream; | |
233 ZSTD_inBuffer input; | 310 ZSTD_inBuffer input; |
234 ZSTD_outBuffer output; | 311 ZSTD_outBuffer output; |
235 Py_ssize_t totalRead = 0; | 312 Py_ssize_t totalRead = 0; |
236 Py_ssize_t totalWrite = 0; | 313 Py_ssize_t totalWrite = 0; |
237 char* readBuffer; | 314 char* readBuffer; |
259 } | 336 } |
260 | 337 |
261 /* Prevent free on uninitialized memory in finally. */ | 338 /* Prevent free on uninitialized memory in finally. */ |
262 output.dst = NULL; | 339 output.dst = NULL; |
263 | 340 |
264 cstream = CStream_from_ZstdCompressor(self, sourceSize); | 341 if (self->mtcctx) { |
265 if (!cstream) { | 342 if (init_mtcstream(self, sourceSize)) { |
266 res = NULL; | 343 res = NULL; |
267 goto finally; | 344 goto finally; |
345 } | |
346 } | |
347 else { | |
348 if (0 != init_cstream(self, sourceSize)) { | |
349 res = NULL; | |
350 goto finally; | |
351 } | |
268 } | 352 } |
269 | 353 |
270 output.dst = PyMem_Malloc(outSize); | 354 output.dst = PyMem_Malloc(outSize); |
271 if (!output.dst) { | 355 if (!output.dst) { |
272 PyErr_NoMemory(); | 356 PyErr_NoMemory(); |
298 input.size = readSize; | 382 input.size = readSize; |
299 input.pos = 0; | 383 input.pos = 0; |
300 | 384 |
301 while (input.pos < input.size) { | 385 while (input.pos < input.size) { |
302 Py_BEGIN_ALLOW_THREADS | 386 Py_BEGIN_ALLOW_THREADS |
303 zresult = ZSTD_compressStream(cstream, &output, &input); | 387 if (self->mtcctx) { |
388 zresult = ZSTDMT_compressStream(self->mtcctx, &output, &input); | |
389 } | |
390 else { | |
391 zresult = ZSTD_compressStream(self->cstream, &output, &input); | |
392 } | |
304 Py_END_ALLOW_THREADS | 393 Py_END_ALLOW_THREADS |
305 | 394 |
306 if (ZSTD_isError(zresult)) { | 395 if (ZSTD_isError(zresult)) { |
307 res = NULL; | 396 res = NULL; |
308 PyErr_Format(ZstdError, "zstd compress error: %s", ZSTD_getErrorName(zresult)); | 397 PyErr_Format(ZstdError, "zstd compress error: %s", ZSTD_getErrorName(zresult)); |
323 } | 412 } |
324 } | 413 } |
325 | 414 |
326 /* We've finished reading. Now flush the compressor stream. */ | 415 /* We've finished reading. Now flush the compressor stream. */ |
327 while (1) { | 416 while (1) { |
328 zresult = ZSTD_endStream(cstream, &output); | 417 if (self->mtcctx) { |
418 zresult = ZSTDMT_endStream(self->mtcctx, &output); | |
419 } | |
420 else { | |
421 zresult = ZSTD_endStream(self->cstream, &output); | |
422 } | |
329 if (ZSTD_isError(zresult)) { | 423 if (ZSTD_isError(zresult)) { |
330 PyErr_Format(ZstdError, "error ending compression stream: %s", | 424 PyErr_Format(ZstdError, "error ending compression stream: %s", |
331 ZSTD_getErrorName(zresult)); | 425 ZSTD_getErrorName(zresult)); |
332 res = NULL; | 426 res = NULL; |
333 goto finally; | 427 goto finally; |
348 if (!zresult) { | 442 if (!zresult) { |
349 break; | 443 break; |
350 } | 444 } |
351 } | 445 } |
352 | 446 |
353 ZSTD_freeCStream(cstream); | |
354 cstream = NULL; | |
355 | |
356 totalReadPy = PyLong_FromSsize_t(totalRead); | 447 totalReadPy = PyLong_FromSsize_t(totalRead); |
357 totalWritePy = PyLong_FromSsize_t(totalWrite); | 448 totalWritePy = PyLong_FromSsize_t(totalWrite); |
358 res = PyTuple_Pack(2, totalReadPy, totalWritePy); | 449 res = PyTuple_Pack(2, totalReadPy, totalWritePy); |
359 Py_DecRef(totalReadPy); | 450 Py_DECREF(totalReadPy); |
360 Py_DecRef(totalWritePy); | 451 Py_DECREF(totalWritePy); |
361 | 452 |
362 finally: | 453 finally: |
363 if (output.dst) { | 454 if (output.dst) { |
364 PyMem_Free(output.dst); | 455 PyMem_Free(output.dst); |
365 } | |
366 | |
367 if (cstream) { | |
368 ZSTD_freeCStream(cstream); | |
369 } | 456 } |
370 | 457 |
371 return res; | 458 return res; |
372 } | 459 } |
373 | 460 |
408 #endif | 495 #endif |
409 kwlist, &source, &sourceSize, &allowEmpty)) { | 496 kwlist, &source, &sourceSize, &allowEmpty)) { |
410 return NULL; | 497 return NULL; |
411 } | 498 } |
412 | 499 |
500 if (self->threads && self->dict) { | |
501 PyErr_SetString(ZstdError, | |
502 "compress() cannot be used with both dictionaries and multi-threaded compression"); | |
503 return NULL; | |
504 } | |
505 | |
506 if (self->threads && self->cparams) { | |
507 PyErr_SetString(ZstdError, | |
508 "compress() cannot be used with both compression parameters and multi-threaded compression"); | |
509 return NULL; | |
510 } | |
511 | |
413 /* Limitation in zstd C API doesn't let decompression side distinguish | 512 /* Limitation in zstd C API doesn't let decompression side distinguish |
414 between content size of 0 and unknown content size. This can make round | 513 between content size of 0 and unknown content size. This can make round |
415 tripping via Python difficult. Until this is fixed, require a flag | 514 tripping via Python difficult. Until this is fixed, require a flag |
416 to fire the footgun. | 515 to fire the footgun. |
417 https://github.com/indygreg/python-zstandard/issues/11 */ | 516 https://github.com/indygreg/python-zstandard/issues/11 */ |
454 Note: the compression parameters used for the first invocation (possibly | 553 Note: the compression parameters used for the first invocation (possibly |
455 derived from the source size) will be reused on all subsequent invocations. | 554 derived from the source size) will be reused on all subsequent invocations. |
456 https://github.com/facebook/zstd/issues/358 contains more info. We could | 555 https://github.com/facebook/zstd/issues/358 contains more info. We could |
457 potentially add an argument somewhere to control this behavior. | 556 potentially add an argument somewhere to control this behavior. |
458 */ | 557 */ |
459 if (dictData && !self->cdict) { | 558 if (0 != populate_cdict(self, &zparams)) { |
460 if (populate_cdict(self, dictData, dictSize, &zparams)) { | 559 Py_DECREF(output); |
461 Py_DECREF(output); | 560 return NULL; |
462 return NULL; | |
463 } | |
464 } | 561 } |
465 | 562 |
466 Py_BEGIN_ALLOW_THREADS | 563 Py_BEGIN_ALLOW_THREADS |
467 /* By avoiding ZSTD_compress(), we don't necessarily write out content | 564 if (self->mtcctx) { |
468 size. This means the argument to ZstdCompressor to control frame | 565 zresult = ZSTDMT_compressCCtx(self->mtcctx, dest, destSize, |
469 parameters is honored. */ | 566 source, sourceSize, self->compressionLevel); |
470 if (self->cdict) { | |
471 zresult = ZSTD_compress_usingCDict(self->cctx, dest, destSize, | |
472 source, sourceSize, self->cdict); | |
473 } | 567 } |
474 else { | 568 else { |
475 zresult = ZSTD_compress_advanced(self->cctx, dest, destSize, | 569 /* By avoiding ZSTD_compress(), we don't necessarily write out content |
476 source, sourceSize, dictData, dictSize, zparams); | 570 size. This means the argument to ZstdCompressor to control frame |
571 parameters is honored. */ | |
572 if (self->cdict) { | |
573 zresult = ZSTD_compress_usingCDict(self->cctx, dest, destSize, | |
574 source, sourceSize, self->cdict); | |
575 } | |
576 else { | |
577 zresult = ZSTD_compress_advanced(self->cctx, dest, destSize, | |
578 source, sourceSize, dictData, dictSize, zparams); | |
579 } | |
477 } | 580 } |
478 Py_END_ALLOW_THREADS | 581 Py_END_ALLOW_THREADS |
479 | 582 |
480 if (ZSTD_isError(zresult)) { | 583 if (ZSTD_isError(zresult)) { |
481 PyErr_Format(ZstdError, "cannot compress: %s", ZSTD_getErrorName(zresult)); | 584 PyErr_Format(ZstdError, "cannot compress: %s", ZSTD_getErrorName(zresult)); |
505 NULL | 608 NULL |
506 }; | 609 }; |
507 | 610 |
508 Py_ssize_t inSize = 0; | 611 Py_ssize_t inSize = 0; |
509 size_t outSize = ZSTD_CStreamOutSize(); | 612 size_t outSize = ZSTD_CStreamOutSize(); |
510 ZstdCompressionObj* result = PyObject_New(ZstdCompressionObj, &ZstdCompressionObjType); | 613 ZstdCompressionObj* result = NULL; |
614 | |
615 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|n:compressobj", kwlist, &inSize)) { | |
616 return NULL; | |
617 } | |
618 | |
619 result = (ZstdCompressionObj*)PyObject_CallObject((PyObject*)&ZstdCompressionObjType, NULL); | |
511 if (!result) { | 620 if (!result) { |
512 return NULL; | 621 return NULL; |
513 } | 622 } |
514 | 623 |
515 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|n:compressobj", kwlist, &inSize)) { | 624 if (self->mtcctx) { |
516 return NULL; | 625 if (init_mtcstream(self, inSize)) { |
517 } | 626 Py_DECREF(result); |
518 | 627 return NULL; |
519 result->cstream = CStream_from_ZstdCompressor(self, inSize); | 628 } |
520 if (!result->cstream) { | 629 } |
521 Py_DECREF(result); | 630 else { |
522 return NULL; | 631 if (0 != init_cstream(self, inSize)) { |
632 Py_DECREF(result); | |
633 return NULL; | |
634 } | |
523 } | 635 } |
524 | 636 |
525 result->output.dst = PyMem_Malloc(outSize); | 637 result->output.dst = PyMem_Malloc(outSize); |
526 if (!result->output.dst) { | 638 if (!result->output.dst) { |
527 PyErr_NoMemory(); | 639 PyErr_NoMemory(); |
528 Py_DECREF(result); | 640 Py_DECREF(result); |
529 return NULL; | 641 return NULL; |
530 } | 642 } |
531 result->output.size = outSize; | 643 result->output.size = outSize; |
532 result->output.pos = 0; | |
533 | |
534 result->compressor = self; | 644 result->compressor = self; |
535 Py_INCREF(result->compressor); | 645 Py_INCREF(result->compressor); |
536 | |
537 result->finished = 0; | |
538 | 646 |
539 return result; | 647 return result; |
540 } | 648 } |
541 | 649 |
542 PyDoc_STRVAR(ZstdCompressor_read_from__doc__, | 650 PyDoc_STRVAR(ZstdCompressor_read_from__doc__, |
577 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|nkk:read_from", kwlist, | 685 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|nkk:read_from", kwlist, |
578 &reader, &sourceSize, &inSize, &outSize)) { | 686 &reader, &sourceSize, &inSize, &outSize)) { |
579 return NULL; | 687 return NULL; |
580 } | 688 } |
581 | 689 |
582 result = PyObject_New(ZstdCompressorIterator, &ZstdCompressorIteratorType); | 690 result = (ZstdCompressorIterator*)PyObject_CallObject((PyObject*)&ZstdCompressorIteratorType, NULL); |
583 if (!result) { | 691 if (!result) { |
584 return NULL; | 692 return NULL; |
585 } | 693 } |
586 | |
587 result->compressor = NULL; | |
588 result->reader = NULL; | |
589 result->buffer = NULL; | |
590 result->cstream = NULL; | |
591 result->input.src = NULL; | |
592 result->output.dst = NULL; | |
593 result->readResult = NULL; | |
594 | |
595 if (PyObject_HasAttrString(reader, "read")) { | 694 if (PyObject_HasAttrString(reader, "read")) { |
596 result->reader = reader; | 695 result->reader = reader; |
597 Py_INCREF(result->reader); | 696 Py_INCREF(result->reader); |
598 } | 697 } |
599 else if (1 == PyObject_CheckBuffer(reader)) { | 698 else if (1 == PyObject_CheckBuffer(reader)) { |
606 | 705 |
607 if (0 != PyObject_GetBuffer(reader, result->buffer, PyBUF_CONTIG_RO)) { | 706 if (0 != PyObject_GetBuffer(reader, result->buffer, PyBUF_CONTIG_RO)) { |
608 goto except; | 707 goto except; |
609 } | 708 } |
610 | 709 |
611 result->bufferOffset = 0; | |
612 sourceSize = result->buffer->len; | 710 sourceSize = result->buffer->len; |
613 } | 711 } |
614 else { | 712 else { |
615 PyErr_SetString(PyExc_ValueError, | 713 PyErr_SetString(PyExc_ValueError, |
616 "must pass an object with a read() method or conforms to buffer protocol"); | 714 "must pass an object with a read() method or conforms to buffer protocol"); |
619 | 717 |
620 result->compressor = self; | 718 result->compressor = self; |
621 Py_INCREF(result->compressor); | 719 Py_INCREF(result->compressor); |
622 | 720 |
623 result->sourceSize = sourceSize; | 721 result->sourceSize = sourceSize; |
624 result->cstream = CStream_from_ZstdCompressor(self, sourceSize); | 722 |
625 if (!result->cstream) { | 723 if (self->mtcctx) { |
626 goto except; | 724 if (init_mtcstream(self, sourceSize)) { |
725 goto except; | |
726 } | |
727 } | |
728 else { | |
729 if (0 != init_cstream(self, sourceSize)) { | |
730 goto except; | |
731 } | |
627 } | 732 } |
628 | 733 |
629 result->inSize = inSize; | 734 result->inSize = inSize; |
630 result->outSize = outSize; | 735 result->outSize = outSize; |
631 | 736 |
633 if (!result->output.dst) { | 738 if (!result->output.dst) { |
634 PyErr_NoMemory(); | 739 PyErr_NoMemory(); |
635 goto except; | 740 goto except; |
636 } | 741 } |
637 result->output.size = outSize; | 742 result->output.size = outSize; |
638 result->output.pos = 0; | |
639 | |
640 result->input.src = NULL; | |
641 result->input.size = 0; | |
642 result->input.pos = 0; | |
643 | |
644 result->finishedInput = 0; | |
645 result->finishedOutput = 0; | |
646 | 743 |
647 goto finally; | 744 goto finally; |
648 | 745 |
649 except: | 746 except: |
650 if (result->cstream) { | 747 Py_XDECREF(result->compressor); |
651 ZSTD_freeCStream(result->cstream); | 748 Py_XDECREF(result->reader); |
652 result->cstream = NULL; | |
653 } | |
654 | |
655 Py_DecRef((PyObject*)result->compressor); | |
656 Py_DecRef(result->reader); | |
657 | |
658 Py_DECREF(result); | 749 Py_DECREF(result); |
659 result = NULL; | 750 result = NULL; |
660 | 751 |
661 finally: | 752 finally: |
662 return result; | 753 return result; |
701 if (!PyObject_HasAttrString(writer, "write")) { | 792 if (!PyObject_HasAttrString(writer, "write")) { |
702 PyErr_SetString(PyExc_ValueError, "must pass an object with a write() method"); | 793 PyErr_SetString(PyExc_ValueError, "must pass an object with a write() method"); |
703 return NULL; | 794 return NULL; |
704 } | 795 } |
705 | 796 |
706 result = PyObject_New(ZstdCompressionWriter, &ZstdCompressionWriterType); | 797 result = (ZstdCompressionWriter*)PyObject_CallObject((PyObject*)&ZstdCompressionWriterType, NULL); |
707 if (!result) { | 798 if (!result) { |
708 return NULL; | 799 return NULL; |
709 } | 800 } |
710 | 801 |
711 result->compressor = self; | 802 result->compressor = self; |
713 | 804 |
714 result->writer = writer; | 805 result->writer = writer; |
715 Py_INCREF(result->writer); | 806 Py_INCREF(result->writer); |
716 | 807 |
717 result->sourceSize = sourceSize; | 808 result->sourceSize = sourceSize; |
718 | |
719 result->outSize = outSize; | 809 result->outSize = outSize; |
720 | 810 |
721 result->entered = 0; | 811 return result; |
722 result->cstream = NULL; | 812 } |
813 | |
814 typedef struct { | |
815 void* sourceData; | |
816 size_t sourceSize; | |
817 } DataSource; | |
818 | |
819 typedef struct { | |
820 DataSource* sources; | |
821 Py_ssize_t sourcesSize; | |
822 unsigned long long totalSourceSize; | |
823 } DataSources; | |
824 | |
825 typedef struct { | |
826 void* dest; | |
827 Py_ssize_t destSize; | |
828 BufferSegment* segments; | |
829 Py_ssize_t segmentsSize; | |
830 } DestBuffer; | |
831 | |
832 typedef enum { | |
833 WorkerError_none = 0, | |
834 WorkerError_zstd = 1, | |
835 WorkerError_no_memory = 2, | |
836 } WorkerError; | |
837 | |
838 /** | |
839 * Holds state for an individual worker performing multi_compress_to_buffer work. | |
840 */ | |
841 typedef struct { | |
842 /* Used for compression. */ | |
843 ZSTD_CCtx* cctx; | |
844 ZSTD_CDict* cdict; | |
845 int cLevel; | |
846 CompressionParametersObject* cParams; | |
847 ZSTD_frameParameters fParams; | |
848 | |
849 /* What to compress. */ | |
850 DataSource* sources; | |
851 Py_ssize_t sourcesSize; | |
852 Py_ssize_t startOffset; | |
853 Py_ssize_t endOffset; | |
854 unsigned long long totalSourceSize; | |
855 | |
856 /* Result storage. */ | |
857 DestBuffer* destBuffers; | |
858 Py_ssize_t destCount; | |
859 | |
860 /* Error tracking. */ | |
861 WorkerError error; | |
862 size_t zresult; | |
863 Py_ssize_t errorOffset; | |
864 } WorkerState; | |
865 | |
866 static void compress_worker(WorkerState* state) { | |
867 Py_ssize_t inputOffset = state->startOffset; | |
868 Py_ssize_t remainingItems = state->endOffset - state->startOffset + 1; | |
869 Py_ssize_t currentBufferStartOffset = state->startOffset; | |
870 size_t zresult; | |
871 ZSTD_parameters zparams; | |
872 void* newDest; | |
873 size_t allocationSize; | |
874 size_t boundSize; | |
875 Py_ssize_t destOffset = 0; | |
876 DataSource* sources = state->sources; | |
877 DestBuffer* destBuffer; | |
878 | |
879 assert(!state->destBuffers); | |
880 assert(0 == state->destCount); | |
881 | |
882 if (state->cParams) { | |
883 ztopy_compression_parameters(state->cParams, &zparams.cParams); | |
884 } | |
885 | |
886 zparams.fParams = state->fParams; | |
887 | |
888 /* | |
889 * The total size of the compressed data is unknown until we actually | |
890 * compress data. That means we can't pre-allocate the exact size we need. | |
891 * | |
892 * There is a cost to every allocation and reallocation. So, it is in our | |
893 * interest to minimize the number of allocations. | |
894 * | |
895 * There is also a cost to too few allocations. If allocations are too | |
896 * large they may fail. If buffers are shared and all inputs become | |
897 * irrelevant at different lifetimes, then a reference to one segment | |
898 * in the buffer will keep the entire buffer alive. This leads to excessive | |
899 * memory usage. | |
900 * | |
901 * Our current strategy is to assume a compression ratio of 16:1 and | |
902 * allocate buffers of that size, rounded up to the nearest power of 2 | |
903 * (because computers like round numbers). That ratio is greater than what | |
904 * most inputs achieve. This is by design: we don't want to over-allocate. | |
905 * But we don't want to under-allocate and lead to too many buffers either. | |
906 */ | |
907 | |
908 state->destCount = 1; | |
909 | |
910 state->destBuffers = calloc(1, sizeof(DestBuffer)); | |
911 if (NULL == state->destBuffers) { | |
912 state->error = WorkerError_no_memory; | |
913 return; | |
914 } | |
915 | |
916 destBuffer = &state->destBuffers[state->destCount - 1]; | |
917 | |
918 /* | |
919 * Rather than track bounds and grow the segments buffer, allocate space | |
920 * to hold remaining items then truncate when we're done with it. | |
921 */ | |
922 destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment)); | |
923 if (NULL == destBuffer->segments) { | |
924 state->error = WorkerError_no_memory; | |
925 return; | |
926 } | |
927 | |
928 destBuffer->segmentsSize = remainingItems; | |
929 | |
930 allocationSize = roundpow2(state->totalSourceSize >> 4); | |
931 | |
932 /* If the maximum size of the output is larger than that, round up. */ | |
933 boundSize = ZSTD_compressBound(sources[inputOffset].sourceSize); | |
934 | |
935 if (boundSize > allocationSize) { | |
936 allocationSize = roundpow2(boundSize); | |
937 } | |
938 | |
939 destBuffer->dest = malloc(allocationSize); | |
940 if (NULL == destBuffer->dest) { | |
941 state->error = WorkerError_no_memory; | |
942 return; | |
943 } | |
944 | |
945 destBuffer->destSize = allocationSize; | |
946 | |
947 for (inputOffset = state->startOffset; inputOffset <= state->endOffset; inputOffset++) { | |
948 void* source = sources[inputOffset].sourceData; | |
949 size_t sourceSize = sources[inputOffset].sourceSize; | |
950 size_t destAvailable; | |
951 void* dest; | |
952 | |
953 destAvailable = destBuffer->destSize - destOffset; | |
954 boundSize = ZSTD_compressBound(sourceSize); | |
955 | |
956 /* | |
957 * Not enough space in current buffer to hold largest compressed output. | |
958 * So allocate and switch to a new output buffer. | |
959 */ | |
960 if (boundSize > destAvailable) { | |
961 /* | |
962 * The downsizing of the existing buffer is optional. It should be cheap | |
963 * (unlike growing). So we just do it. | |
964 */ | |
965 if (destAvailable) { | |
966 newDest = realloc(destBuffer->dest, destOffset); | |
967 if (NULL == newDest) { | |
968 state->error = WorkerError_no_memory; | |
969 return; | |
970 } | |
971 | |
972 destBuffer->dest = newDest; | |
973 destBuffer->destSize = destOffset; | |
974 } | |
975 | |
976 /* Truncate segments buffer. */ | |
977 newDest = realloc(destBuffer->segments, | |
978 (inputOffset - currentBufferStartOffset + 1) * sizeof(BufferSegment)); | |
979 if (NULL == newDest) { | |
980 state->error = WorkerError_no_memory; | |
981 return; | |
982 } | |
983 | |
984 destBuffer->segments = newDest; | |
985 destBuffer->segmentsSize = inputOffset - currentBufferStartOffset; | |
986 | |
987 /* Grow space for new struct. */ | |
988 /* TODO consider over-allocating so we don't do this every time. */ | |
989 newDest = realloc(state->destBuffers, (state->destCount + 1) * sizeof(DestBuffer)); | |
990 if (NULL == newDest) { | |
991 state->error = WorkerError_no_memory; | |
992 return; | |
993 } | |
994 | |
995 state->destBuffers = newDest; | |
996 state->destCount++; | |
997 | |
998 destBuffer = &state->destBuffers[state->destCount - 1]; | |
999 | |
1000 /* Don't take any chances with non-NULL pointers. */ | |
1001 memset(destBuffer, 0, sizeof(DestBuffer)); | |
1002 | |
1003 /** | |
1004 * We could dynamically update allocation size based on work done so far. | |
1005 * For now, keep is simple. | |
1006 */ | |
1007 allocationSize = roundpow2(state->totalSourceSize >> 4); | |
1008 | |
1009 if (boundSize > allocationSize) { | |
1010 allocationSize = roundpow2(boundSize); | |
1011 } | |
1012 | |
1013 destBuffer->dest = malloc(allocationSize); | |
1014 if (NULL == destBuffer->dest) { | |
1015 state->error = WorkerError_no_memory; | |
1016 return; | |
1017 } | |
1018 | |
1019 destBuffer->destSize = allocationSize; | |
1020 destAvailable = allocationSize; | |
1021 destOffset = 0; | |
1022 | |
1023 destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment)); | |
1024 if (NULL == destBuffer->segments) { | |
1025 state->error = WorkerError_no_memory; | |
1026 return; | |
1027 } | |
1028 | |
1029 destBuffer->segmentsSize = remainingItems; | |
1030 currentBufferStartOffset = inputOffset; | |
1031 } | |
1032 | |
1033 dest = (char*)destBuffer->dest + destOffset; | |
1034 | |
1035 if (state->cdict) { | |
1036 zresult = ZSTD_compress_usingCDict(state->cctx, dest, destAvailable, | |
1037 source, sourceSize, state->cdict); | |
1038 } | |
1039 else { | |
1040 if (!state->cParams) { | |
1041 zparams.cParams = ZSTD_getCParams(state->cLevel, sourceSize, 0); | |
1042 } | |
1043 | |
1044 zresult = ZSTD_compress_advanced(state->cctx, dest, destAvailable, | |
1045 source, sourceSize, NULL, 0, zparams); | |
1046 } | |
1047 | |
1048 if (ZSTD_isError(zresult)) { | |
1049 state->error = WorkerError_zstd; | |
1050 state->zresult = zresult; | |
1051 state->errorOffset = inputOffset; | |
1052 break; | |
1053 } | |
1054 | |
1055 destBuffer->segments[inputOffset - currentBufferStartOffset].offset = destOffset; | |
1056 destBuffer->segments[inputOffset - currentBufferStartOffset].length = zresult; | |
1057 | |
1058 destOffset += zresult; | |
1059 remainingItems--; | |
1060 } | |
1061 | |
1062 if (destBuffer->destSize > destOffset) { | |
1063 newDest = realloc(destBuffer->dest, destOffset); | |
1064 if (NULL == newDest) { | |
1065 state->error = WorkerError_no_memory; | |
1066 return; | |
1067 } | |
1068 | |
1069 destBuffer->dest = newDest; | |
1070 destBuffer->destSize = destOffset; | |
1071 } | |
1072 } | |
1073 | |
/**
 * Compress every item in ``sources`` and return the results as a new
 * ZstdBufferWithSegmentsCollection (or NULL with a Python exception set).
 *
 * Inputs are partitioned contiguously across at most ``threadCount`` worker
 * states, each worker receiving roughly ``totalSourceSize / threadCount``
 * input bytes. With >1 thread, workers are dispatched to a zstd POOL;
 * otherwise the single worker runs inline. The GIL is released while the
 * workers run and while the pool is drained.
 *
 * Preconditions (asserted): at least one source, non-zero total input size,
 * threadCount >= 1.
 */
ZstdBufferWithSegmentsCollection* compress_from_datasources(ZstdCompressor* compressor,
	DataSources* sources, unsigned int threadCount) {
	ZSTD_parameters zparams;
	unsigned long long bytesPerWorker;
	POOL_ctx* pool = NULL;
	WorkerState* workerStates = NULL;
	Py_ssize_t i;
	unsigned long long workerBytes = 0;
	Py_ssize_t workerStartOffset = 0;
	size_t currentThread = 0;
	int errored = 0;
	Py_ssize_t segmentsCount = 0;
	Py_ssize_t segmentIndex;
	PyObject* segmentsArg = NULL;
	ZstdBufferWithSegments* buffer;
	ZstdBufferWithSegmentsCollection* result = NULL;

	assert(sources->sourcesSize > 0);
	assert(sources->totalSourceSize > 0);
	assert(threadCount >= 1);

	/* More threads than inputs makes no sense. */
	threadCount = sources->sourcesSize < threadCount ? (unsigned int)sources->sourcesSize
													 : threadCount;

	/* TODO lower thread count when input size is too small and threads would add
	   overhead. */

	/*
	 * When dictionaries are used, parameters are derived from the size of the
	 * first element.
	 *
	 * TODO come up with a better mechanism.
	 */
	memset(&zparams, 0, sizeof(zparams));
	if (compressor->cparams) {
		/* Explicit compression parameters on the compressor win. */
		ztopy_compression_parameters(compressor->cparams, &zparams.cParams);
	}
	else {
		/* Otherwise derive params from level + first input's size. */
		zparams.cParams = ZSTD_getCParams(compressor->compressionLevel,
			sources->sources[0].sourceSize,
			compressor->dict ? compressor->dict->dictSize : 0);
	}

	zparams.fParams = compressor->fparams;

	/* Materialize the shared ZSTD_CDict (no-op without a dict); it is used
	   read-only by all workers below. */
	if (0 != populate_cdict(compressor, &zparams)) {
		return NULL;
	}

	workerStates = PyMem_Malloc(threadCount * sizeof(WorkerState));
	if (NULL == workerStates) {
		PyErr_NoMemory();
		goto finally;
	}

	/* Zero so the cleanup path can safely inspect partially-initialized
	   worker states. */
	memset(workerStates, 0, threadCount * sizeof(WorkerState));

	if (threadCount > 1) {
		pool = POOL_create(threadCount, 1);
		if (NULL == pool) {
			PyErr_SetString(ZstdError, "could not initialize zstd thread pool");
			goto finally;
		}
	}

	/* Target input-byte budget per worker; drives the partitioning loop. */
	bytesPerWorker = sources->totalSourceSize / threadCount;

	for (i = 0; i < threadCount; i++) {
		/* Each worker gets its own compression context; cdict/params are
		   shared read-only. */
		workerStates[i].cctx = ZSTD_createCCtx();
		if (!workerStates[i].cctx) {
			PyErr_NoMemory();
			goto finally;
		}

		workerStates[i].cdict = compressor->cdict;
		workerStates[i].cLevel = compressor->compressionLevel;
		workerStates[i].cParams = compressor->cparams;
		workerStates[i].fParams = compressor->fparams;

		workerStates[i].sources = sources->sources;
		workerStates[i].sourcesSize = sources->sourcesSize;
	}

	Py_BEGIN_ALLOW_THREADS
	/* Walk inputs, cutting a worker's [startOffset, endOffset] slice each
	   time its accumulated byte count reaches bytesPerWorker. */
	for (i = 0; i < sources->sourcesSize; i++) {
		workerBytes += sources->sources[i].sourceSize;

		/*
		 * The last worker/thread needs to handle all remaining work. Don't
		 * trigger it prematurely. Defer to the block outside of the loop
		 * to run the last worker/thread. But do still process this loop
		 * so workerBytes is correct.
		 */
		if (currentThread == threadCount - 1) {
			continue;
		}

		if (workerBytes >= bytesPerWorker) {
			assert(currentThread < threadCount);
			workerStates[currentThread].totalSourceSize = workerBytes;
			workerStates[currentThread].startOffset = workerStartOffset;
			workerStates[currentThread].endOffset = i;

			if (threadCount > 1) {
				POOL_add(pool, (POOL_function)compress_worker, &workerStates[currentThread]);
			}
			else {
				compress_worker(&workerStates[currentThread]);
			}

			currentThread++;
			workerStartOffset = i + 1;
			workerBytes = 0;
		}
	}

	/* Dispatch the final worker with whatever inputs remain. */
	if (workerBytes) {
		assert(currentThread < threadCount);
		workerStates[currentThread].totalSourceSize = workerBytes;
		workerStates[currentThread].startOffset = workerStartOffset;
		workerStates[currentThread].endOffset = sources->sourcesSize - 1;

		if (threadCount > 1) {
			POOL_add(pool, (POOL_function)compress_worker, &workerStates[currentThread]);
		}
		else {
			compress_worker(&workerStates[currentThread]);
		}
	}

	/* POOL_free() blocks until all queued jobs finish, so freeing here
	   (still without the GIL) doubles as the join point. NULL it so the
	   `finally` path doesn't free it twice. */
	if (threadCount > 1) {
		POOL_free(pool);
		pool = NULL;
	}

	Py_END_ALLOW_THREADS

	/* Workers can't raise while the GIL is released; translate their
	   recorded error codes into Python exceptions now. First error wins. */
	for (i = 0; i < threadCount; i++) {
		switch (workerStates[i].error) {
		case WorkerError_no_memory:
			PyErr_NoMemory();
			errored = 1;
			break;

		case WorkerError_zstd:
			PyErr_Format(ZstdError, "error compressing item %zd: %s",
				workerStates[i].errorOffset, ZSTD_getErrorName(workerStates[i].zresult));
			errored = 1;
			break;
		default:
			;
		}

		if (errored) {
			break;
		}

	}

	if (errored) {
		goto finally;
	}

	/* One BufferWithSegments per destination buffer produced by workers. */
	segmentsCount = 0;
	for (i = 0; i < threadCount; i++) {
		WorkerState* state = &workerStates[i];
		segmentsCount += state->destCount;
	}

	segmentsArg = PyTuple_New(segmentsCount);
	if (NULL == segmentsArg) {
		goto finally;
	}

	segmentIndex = 0;

	for (i = 0; i < threadCount; i++) {
		Py_ssize_t j;
		WorkerState* state = &workerStates[i];

		for (j = 0; j < state->destCount; j++) {
			DestBuffer* destBuffer = &state->destBuffers[j];
			buffer = BufferWithSegments_FromMemory(destBuffer->dest, destBuffer->destSize,
				destBuffer->segments, destBuffer->segmentsSize);

			if (NULL == buffer) {
				goto finally;
			}

			/* Tell instance to use free() instead of PyMem_Free(). */
			buffer->useFree = 1;

			/*
			 * BufferWithSegments_FromMemory takes ownership of the backing memory.
			 * Unset it here so it doesn't get freed below.
			 */
			destBuffer->dest = NULL;
			destBuffer->segments = NULL;

			/* PyTuple_SET_ITEM steals the reference to `buffer`. */
			PyTuple_SET_ITEM(segmentsArg, segmentIndex++, (PyObject*)buffer);
		}
	}

	result = (ZstdBufferWithSegmentsCollection*)PyObject_CallObject(
		(PyObject*)&ZstdBufferWithSegmentsCollectionType, segmentsArg);

finally:
	Py_CLEAR(segmentsArg);

	/* Non-NULL only on early error before the normal join above. */
	if (pool) {
		POOL_free(pool);
	}

	if (workerStates) {
		Py_ssize_t j;

		for (i = 0; i < threadCount; i++) {
			WorkerState state = workerStates[i];

			if (state.cctx) {
				ZSTD_freeCCtx(state.cctx);
			}

			/* malloc() is used in worker thread. */

			/* Buffers whose ownership moved to a BufferWithSegments were
			   NULLed above; free(NULL) is a no-op for the rest. */
			for (j = 0; j < state.destCount; j++) {
				if (state.destBuffers) {
					free(state.destBuffers[j].dest);
					free(state.destBuffers[j].segments);
				}
			}


			free(state.destBuffers);
		}

		PyMem_Free(workerStates);
	}

	return result;
}
1316 | |
1317 PyDoc_STRVAR(ZstdCompressor_multi_compress_to_buffer__doc__, | |
1318 "Compress multiple pieces of data as a single operation\n" | |
1319 "\n" | |
1320 "Receives a ``BufferWithSegmentsCollection``, a ``BufferWithSegments``, or\n" | |
1321 "a list of bytes like objects holding data to compress.\n" | |
1322 "\n" | |
1323 "Returns a ``BufferWithSegmentsCollection`` holding compressed data.\n" | |
1324 "\n" | |
1325 "This function is optimized to perform multiple compression operations as\n" | |
1326 "as possible with as little overhead as possbile.\n" | |
1327 ); | |
1328 | |
/**
 * multi_compress_to_buffer(data, threads=0) -> BufferWithSegmentsCollection
 *
 * Flattens ``data`` (a BufferWithSegments, a BufferWithSegmentsCollection,
 * or a list of bytes-like objects) into a DataSources array of
 * (pointer, size) pairs, then hands it to compress_from_datasources().
 *
 * ``threads``: <0 means use cpu_count(); values below 2 collapse to
 * single-threaded operation.
 *
 * Returns NULL with an exception set on error. Not usable on compressors
 * configured for multi-threaded compression (mtcctx).
 */
static ZstdBufferWithSegmentsCollection* ZstdCompressor_multi_compress_to_buffer(ZstdCompressor* self, PyObject* args, PyObject* kwargs) {
	static char* kwlist[] = {
		"data",
		"threads",
		NULL
	};

	PyObject* data;
	int threads = 0;
	Py_buffer* dataBuffers = NULL;
	DataSources sources;
	Py_ssize_t i;
	Py_ssize_t sourceCount = 0;
	ZstdBufferWithSegmentsCollection* result = NULL;

	if (self->mtcctx) {
		PyErr_SetString(ZstdError,
			"function cannot be called on ZstdCompressor configured for multi-threaded compression");
		return NULL;
	}

	/* Zero so `finally` can unconditionally PyMem_Free(sources.sources). */
	memset(&sources, 0, sizeof(sources));

	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|i:multi_compress_to_buffer", kwlist,
		&data, &threads)) {
		return NULL;
	}

	if (threads < 0) {
		threads = cpu_count();
	}

	if (threads < 2) {
		threads = 1;
	}

	if (PyObject_TypeCheck(data, &ZstdBufferWithSegmentsType)) {
		/* Single BufferWithSegments: one source per segment, pointing into
		   the buffer's backing memory (no copies). */
		ZstdBufferWithSegments* buffer = (ZstdBufferWithSegments*)data;

		sources.sources = PyMem_Malloc(buffer->segmentCount * sizeof(DataSource));
		if (NULL == sources.sources) {
			PyErr_NoMemory();
			goto finally;
		}

		for (i = 0; i < buffer->segmentCount; i++) {
			sources.sources[i].sourceData = (char*)buffer->data + buffer->segments[i].offset;
			sources.sources[i].sourceSize = buffer->segments[i].length;
			sources.totalSourceSize += buffer->segments[i].length;
		}

		sources.sourcesSize = buffer->segmentCount;
	}
	else if (PyObject_TypeCheck(data, &ZstdBufferWithSegmentsCollectionType)) {
		/* Collection: flatten every segment of every contained buffer into
		   one contiguous sources array. */
		Py_ssize_t j;
		Py_ssize_t offset = 0;
		ZstdBufferWithSegments* buffer;
		ZstdBufferWithSegmentsCollection* collection = (ZstdBufferWithSegmentsCollection*)data;

		sourceCount = BufferWithSegmentsCollection_length(collection);

		sources.sources = PyMem_Malloc(sourceCount * sizeof(DataSource));
		if (NULL == sources.sources) {
			PyErr_NoMemory();
			goto finally;
		}

		for (i = 0; i < collection->bufferCount; i++) {
			buffer = collection->buffers[i];

			for (j = 0; j < buffer->segmentCount; j++) {
				sources.sources[offset].sourceData = (char*)buffer->data + buffer->segments[j].offset;
				sources.sources[offset].sourceSize = buffer->segments[j].length;
				sources.totalSourceSize += buffer->segments[j].length;

				offset++;
			}
		}

		sources.sourcesSize = sourceCount;
	}
	else if (PyList_Check(data)) {
		/* List of bytes-like objects: acquire a Py_buffer view of each. */
		sourceCount = PyList_GET_SIZE(data);

		sources.sources = PyMem_Malloc(sourceCount * sizeof(DataSource));
		if (NULL == sources.sources) {
			PyErr_NoMemory();
			goto finally;
		}

		/*
		 * It isn't clear whether the address referred to by Py_buffer.buf
		 * is still valid after PyBuffer_Release. So we hold a reference to all
		 * Py_buffer instances for the duration of the operation.
		 */
		dataBuffers = PyMem_Malloc(sourceCount * sizeof(Py_buffer));
		if (NULL == dataBuffers) {
			PyErr_NoMemory();
			goto finally;
		}

		/* Zeroed buffers make PyBuffer_Release in `finally` safe even if we
		   bail out before acquiring all of them. */
		memset(dataBuffers, 0, sourceCount * sizeof(Py_buffer));

		for (i = 0; i < sourceCount; i++) {
			if (0 != PyObject_GetBuffer(PyList_GET_ITEM(data, i),
				&dataBuffers[i], PyBUF_CONTIG_RO)) {
				/* Replace GetBuffer's exception with one naming the index. */
				PyErr_Clear();
				PyErr_Format(PyExc_TypeError, "item %zd not a bytes like object", i);
				goto finally;
			}

			sources.sources[i].sourceData = dataBuffers[i].buf;
			sources.sources[i].sourceSize = dataBuffers[i].len;
			sources.totalSourceSize += dataBuffers[i].len;
		}

		sources.sourcesSize = sourceCount;
	}
	else {
		PyErr_SetString(PyExc_TypeError, "argument must be list of BufferWithSegments");
		goto finally;
	}

	if (0 == sources.sourcesSize) {
		PyErr_SetString(PyExc_ValueError, "no source elements found");
		goto finally;
	}

	if (0 == sources.totalSourceSize) {
		PyErr_SetString(PyExc_ValueError, "source elements are empty");
		goto finally;
	}

	result = compress_from_datasources(self, &sources, threads);

finally:
	PyMem_Free(sources.sources);

	/* Release buffer views only after compression is done with the memory
	   they expose. */
	if (dataBuffers) {
		for (i = 0; i < sourceCount; i++) {
			PyBuffer_Release(&dataBuffers[i]);
		}

		PyMem_Free(dataBuffers);
	}

	return result;
}
726 | 1477 |
727 static PyMethodDef ZstdCompressor_methods[] = { | 1478 static PyMethodDef ZstdCompressor_methods[] = { |
733 METH_VARARGS | METH_KEYWORDS, ZstdCompressor_copy_stream__doc__ }, | 1484 METH_VARARGS | METH_KEYWORDS, ZstdCompressor_copy_stream__doc__ }, |
734 { "read_from", (PyCFunction)ZstdCompressor_read_from, | 1485 { "read_from", (PyCFunction)ZstdCompressor_read_from, |
735 METH_VARARGS | METH_KEYWORDS, ZstdCompressor_read_from__doc__ }, | 1486 METH_VARARGS | METH_KEYWORDS, ZstdCompressor_read_from__doc__ }, |
736 { "write_to", (PyCFunction)ZstdCompressor_write_to, | 1487 { "write_to", (PyCFunction)ZstdCompressor_write_to, |
737 METH_VARARGS | METH_KEYWORDS, ZstdCompressor_write_to___doc__ }, | 1488 METH_VARARGS | METH_KEYWORDS, ZstdCompressor_write_to___doc__ }, |
1489 { "multi_compress_to_buffer", (PyCFunction)ZstdCompressor_multi_compress_to_buffer, | |
1490 METH_VARARGS | METH_KEYWORDS, ZstdCompressor_multi_compress_to_buffer__doc__ }, | |
738 { NULL, NULL } | 1491 { NULL, NULL } |
739 }; | 1492 }; |
740 | 1493 |
741 PyTypeObject ZstdCompressorType = { | 1494 PyTypeObject ZstdCompressorType = { |
742 PyVarObject_HEAD_INIT(NULL, 0) | 1495 PyVarObject_HEAD_INIT(NULL, 0) |