comparison contrib/python-zstandard/c-ext/compressor.c @ 31796:e0dc40530c5a

zstd: vendor python-zstandard 0.8.0 Commit 81e1f5bbf1fc54808649562d3ed829730765c540 from https://github.com/indygreg/python-zstandard is imported without modifications (other than removing unwanted files). Updates relevant to Mercurial include: * Support for multi-threaded compression (we can use this for bundle and wire protocol compression). * APIs for batch compression and decompression operations using multiple threads and optimal memory allocation mechanism. (Can be useful for revlog perf improvements.) * A ``BufferWithSegments`` type that models a single memory buffer containing N discrete items of known lengths. This type can be used for very efficient 0-copy data operations. # no-check-commit
author Gregory Szorc <gregory.szorc@gmail.com>
date Sat, 01 Apr 2017 15:24:03 -0700
parents c32454d69b85
children b1fb341d8a61
comparison
equal deleted inserted replaced
31795:2b130e26c3a4 31796:e0dc40530c5a
5 * This software may be modified and distributed under the terms 5 * This software may be modified and distributed under the terms
6 * of the BSD license. See the LICENSE file for details. 6 * of the BSD license. See the LICENSE file for details.
7 */ 7 */
8 8
9 #include "python-zstandard.h" 9 #include "python-zstandard.h"
10 #include "pool.h"
10 11
11 extern PyObject* ZstdError; 12 extern PyObject* ZstdError;
12 13
13 int populate_cdict(ZstdCompressor* compressor, void* dictData, size_t dictSize, ZSTD_parameters* zparams) { 14 int populate_cdict(ZstdCompressor* compressor, ZSTD_parameters* zparams) {
14 ZSTD_customMem zmem; 15 ZSTD_customMem zmem;
15 assert(!compressor->cdict); 16
17 if (compressor->cdict || !compressor->dict || !compressor->dict->dictData) {
18 return 0;
19 }
20
16 Py_BEGIN_ALLOW_THREADS 21 Py_BEGIN_ALLOW_THREADS
17 memset(&zmem, 0, sizeof(zmem)); 22 memset(&zmem, 0, sizeof(zmem));
18 compressor->cdict = ZSTD_createCDict_advanced(compressor->dict->dictData, 23 compressor->cdict = ZSTD_createCDict_advanced(compressor->dict->dictData,
19 compressor->dict->dictSize, 1, *zparams, zmem); 24 compressor->dict->dictSize, 1, *zparams, zmem);
20 Py_END_ALLOW_THREADS 25 Py_END_ALLOW_THREADS
26 31
27 return 0; 32 return 0;
28 } 33 }
29 34
30 /** 35 /**
31 * Initialize a zstd CStream from a ZstdCompressor instance. 36 * Ensure the ZSTD_CStream on a ZstdCompressor instance is initialized.
32 * 37 *
33 * Returns a ZSTD_CStream on success or NULL on failure. If NULL, a Python 38 * Returns 0 on success. Other value on failure. Will set a Python exception
34 * exception will be set. 39 * on failure.
35 */ 40 */
36 ZSTD_CStream* CStream_from_ZstdCompressor(ZstdCompressor* compressor, Py_ssize_t sourceSize) { 41 int init_cstream(ZstdCompressor* compressor, unsigned long long sourceSize) {
37 ZSTD_CStream* cstream;
38 ZSTD_parameters zparams; 42 ZSTD_parameters zparams;
39 void* dictData = NULL; 43 void* dictData = NULL;
40 size_t dictSize = 0; 44 size_t dictSize = 0;
41 size_t zresult; 45 size_t zresult;
42 46
43 cstream = ZSTD_createCStream(); 47 if (compressor->cstream) {
44 if (!cstream) { 48 zresult = ZSTD_resetCStream(compressor->cstream, sourceSize);
45 PyErr_SetString(ZstdError, "cannot create CStream"); 49 if (ZSTD_isError(zresult)) {
46 return NULL; 50 PyErr_Format(ZstdError, "could not reset CStream: %s",
51 ZSTD_getErrorName(zresult));
52 return -1;
53 }
54
55 return 0;
56 }
57
58 compressor->cstream = ZSTD_createCStream();
59 if (!compressor->cstream) {
60 PyErr_SetString(ZstdError, "could not create CStream");
61 return -1;
47 } 62 }
48 63
49 if (compressor->dict) { 64 if (compressor->dict) {
50 dictData = compressor->dict->dictData; 65 dictData = compressor->dict->dictData;
51 dictSize = compressor->dict->dictSize; 66 dictSize = compressor->dict->dictSize;
61 zparams.cParams = ZSTD_getCParams(compressor->compressionLevel, sourceSize, dictSize); 76 zparams.cParams = ZSTD_getCParams(compressor->compressionLevel, sourceSize, dictSize);
62 } 77 }
63 78
64 zparams.fParams = compressor->fparams; 79 zparams.fParams = compressor->fparams;
65 80
66 zresult = ZSTD_initCStream_advanced(cstream, dictData, dictSize, zparams, sourceSize); 81 zresult = ZSTD_initCStream_advanced(compressor->cstream, dictData, dictSize,
82 zparams, sourceSize);
67 83
68 if (ZSTD_isError(zresult)) { 84 if (ZSTD_isError(zresult)) {
69 ZSTD_freeCStream(cstream); 85 ZSTD_freeCStream(compressor->cstream);
86 compressor->cstream = NULL;
70 PyErr_Format(ZstdError, "cannot init CStream: %s", ZSTD_getErrorName(zresult)); 87 PyErr_Format(ZstdError, "cannot init CStream: %s", ZSTD_getErrorName(zresult));
71 return NULL; 88 return -1;
72 } 89 }
73 90
74 return cstream; 91 return 0;;
92 }
93
94 int init_mtcstream(ZstdCompressor* compressor, Py_ssize_t sourceSize) {
95 size_t zresult;
96 void* dictData = NULL;
97 size_t dictSize = 0;
98 ZSTD_parameters zparams;
99
100 assert(compressor->mtcctx);
101
102 if (compressor->dict) {
103 dictData = compressor->dict->dictData;
104 dictSize = compressor->dict->dictSize;
105 }
106
107 memset(&zparams, 0, sizeof(zparams));
108 if (compressor->cparams) {
109 ztopy_compression_parameters(compressor->cparams, &zparams.cParams);
110 }
111 else {
112 zparams.cParams = ZSTD_getCParams(compressor->compressionLevel, sourceSize, dictSize);
113 }
114
115 zparams.fParams = compressor->fparams;
116
117 zresult = ZSTDMT_initCStream_advanced(compressor->mtcctx, dictData, dictSize,
118 zparams, sourceSize);
119
120 if (ZSTD_isError(zresult)) {
121 PyErr_Format(ZstdError, "cannot init CStream: %s", ZSTD_getErrorName(zresult));
122 return -1;
123 }
124
125 return 0;
75 } 126 }
76 127
77 PyDoc_STRVAR(ZstdCompressor__doc__, 128 PyDoc_STRVAR(ZstdCompressor__doc__,
78 "ZstdCompressor(level=None, dict_data=None, compression_params=None)\n" 129 "ZstdCompressor(level=None, dict_data=None, compression_params=None)\n"
79 "\n" 130 "\n"
101 " knows the size of the input data.\n" 152 " knows the size of the input data.\n"
102 "write_dict_id\n" 153 "write_dict_id\n"
103 " Determines whether the dictionary ID will be written into the compressed\n" 154 " Determines whether the dictionary ID will be written into the compressed\n"
104 " data. Defaults to True. Only adds content to the compressed data if\n" 155 " data. Defaults to True. Only adds content to the compressed data if\n"
105 " a dictionary is being used.\n" 156 " a dictionary is being used.\n"
157 "threads\n"
158 " Number of threads to use to compress data concurrently. When set,\n"
159 " compression operations are performed on multiple threads. The default\n"
160 " value (0) disables multi-threaded compression. A value of ``-1`` means to\n"
161 " set the number of threads to the number of detected logical CPUs.\n"
106 ); 162 );
107 163
108 static int ZstdCompressor_init(ZstdCompressor* self, PyObject* args, PyObject* kwargs) { 164 static int ZstdCompressor_init(ZstdCompressor* self, PyObject* args, PyObject* kwargs) {
109 static char* kwlist[] = { 165 static char* kwlist[] = {
110 "level", 166 "level",
111 "dict_data", 167 "dict_data",
112 "compression_params", 168 "compression_params",
113 "write_checksum", 169 "write_checksum",
114 "write_content_size", 170 "write_content_size",
115 "write_dict_id", 171 "write_dict_id",
172 "threads",
116 NULL 173 NULL
117 }; 174 };
118 175
119 int level = 3; 176 int level = 3;
120 ZstdCompressionDict* dict = NULL; 177 ZstdCompressionDict* dict = NULL;
121 CompressionParametersObject* params = NULL; 178 CompressionParametersObject* params = NULL;
122 PyObject* writeChecksum = NULL; 179 PyObject* writeChecksum = NULL;
123 PyObject* writeContentSize = NULL; 180 PyObject* writeContentSize = NULL;
124 PyObject* writeDictID = NULL; 181 PyObject* writeDictID = NULL;
125 182 int threads = 0;
126 self->cctx = NULL; 183
127 self->dict = NULL; 184 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|iO!O!OOOi:ZstdCompressor",
128 self->cparams = NULL;
129 self->cdict = NULL;
130
131 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|iO!O!OOO:ZstdCompressor",
132 kwlist, &level, &ZstdCompressionDictType, &dict, 185 kwlist, &level, &ZstdCompressionDictType, &dict,
133 &CompressionParametersType, &params, 186 &CompressionParametersType, &params,
134 &writeChecksum, &writeContentSize, &writeDictID)) { 187 &writeChecksum, &writeContentSize, &writeDictID, &threads)) {
135 return -1; 188 return -1;
136 } 189 }
137 190
138 if (level < 1) { 191 if (level < 1) {
139 PyErr_SetString(PyExc_ValueError, "level must be greater than 0"); 192 PyErr_SetString(PyExc_ValueError, "level must be greater than 0");
144 PyErr_Format(PyExc_ValueError, "level must be less than %d", 197 PyErr_Format(PyExc_ValueError, "level must be less than %d",
145 ZSTD_maxCLevel() + 1); 198 ZSTD_maxCLevel() + 1);
146 return -1; 199 return -1;
147 } 200 }
148 201
202 if (threads < 0) {
203 threads = cpu_count();
204 }
205
206 self->threads = threads;
207
149 /* We create a ZSTD_CCtx for reuse among multiple operations to reduce the 208 /* We create a ZSTD_CCtx for reuse among multiple operations to reduce the
150 overhead of each compression operation. */ 209 overhead of each compression operation. */
151 self->cctx = ZSTD_createCCtx(); 210 if (threads) {
152 if (!self->cctx) { 211 self->mtcctx = ZSTDMT_createCCtx(threads);
153 PyErr_NoMemory(); 212 if (!self->mtcctx) {
154 return -1; 213 PyErr_NoMemory();
214 return -1;
215 }
216 }
217 else {
218 self->cctx = ZSTD_createCCtx();
219 if (!self->cctx) {
220 PyErr_NoMemory();
221 return -1;
222 }
155 } 223 }
156 224
157 self->compressionLevel = level; 225 self->compressionLevel = level;
158 226
159 if (dict) { 227 if (dict) {
180 248
181 return 0; 249 return 0;
182 } 250 }
183 251
184 static void ZstdCompressor_dealloc(ZstdCompressor* self) { 252 static void ZstdCompressor_dealloc(ZstdCompressor* self) {
253 if (self->cstream) {
254 ZSTD_freeCStream(self->cstream);
255 self->cstream = NULL;
256 }
257
185 Py_XDECREF(self->cparams); 258 Py_XDECREF(self->cparams);
186 Py_XDECREF(self->dict); 259 Py_XDECREF(self->dict);
187 260
188 if (self->cdict) { 261 if (self->cdict) {
189 ZSTD_freeCDict(self->cdict); 262 ZSTD_freeCDict(self->cdict);
191 } 264 }
192 265
193 if (self->cctx) { 266 if (self->cctx) {
194 ZSTD_freeCCtx(self->cctx); 267 ZSTD_freeCCtx(self->cctx);
195 self->cctx = NULL; 268 self->cctx = NULL;
269 }
270
271 if (self->mtcctx) {
272 ZSTDMT_freeCCtx(self->mtcctx);
273 self->mtcctx = NULL;
196 } 274 }
197 275
198 PyObject_Del(self); 276 PyObject_Del(self);
199 } 277 }
200 278
227 PyObject* source; 305 PyObject* source;
228 PyObject* dest; 306 PyObject* dest;
229 Py_ssize_t sourceSize = 0; 307 Py_ssize_t sourceSize = 0;
230 size_t inSize = ZSTD_CStreamInSize(); 308 size_t inSize = ZSTD_CStreamInSize();
231 size_t outSize = ZSTD_CStreamOutSize(); 309 size_t outSize = ZSTD_CStreamOutSize();
232 ZSTD_CStream* cstream;
233 ZSTD_inBuffer input; 310 ZSTD_inBuffer input;
234 ZSTD_outBuffer output; 311 ZSTD_outBuffer output;
235 Py_ssize_t totalRead = 0; 312 Py_ssize_t totalRead = 0;
236 Py_ssize_t totalWrite = 0; 313 Py_ssize_t totalWrite = 0;
237 char* readBuffer; 314 char* readBuffer;
259 } 336 }
260 337
261 /* Prevent free on uninitialized memory in finally. */ 338 /* Prevent free on uninitialized memory in finally. */
262 output.dst = NULL; 339 output.dst = NULL;
263 340
264 cstream = CStream_from_ZstdCompressor(self, sourceSize); 341 if (self->mtcctx) {
265 if (!cstream) { 342 if (init_mtcstream(self, sourceSize)) {
266 res = NULL; 343 res = NULL;
267 goto finally; 344 goto finally;
345 }
346 }
347 else {
348 if (0 != init_cstream(self, sourceSize)) {
349 res = NULL;
350 goto finally;
351 }
268 } 352 }
269 353
270 output.dst = PyMem_Malloc(outSize); 354 output.dst = PyMem_Malloc(outSize);
271 if (!output.dst) { 355 if (!output.dst) {
272 PyErr_NoMemory(); 356 PyErr_NoMemory();
298 input.size = readSize; 382 input.size = readSize;
299 input.pos = 0; 383 input.pos = 0;
300 384
301 while (input.pos < input.size) { 385 while (input.pos < input.size) {
302 Py_BEGIN_ALLOW_THREADS 386 Py_BEGIN_ALLOW_THREADS
303 zresult = ZSTD_compressStream(cstream, &output, &input); 387 if (self->mtcctx) {
388 zresult = ZSTDMT_compressStream(self->mtcctx, &output, &input);
389 }
390 else {
391 zresult = ZSTD_compressStream(self->cstream, &output, &input);
392 }
304 Py_END_ALLOW_THREADS 393 Py_END_ALLOW_THREADS
305 394
306 if (ZSTD_isError(zresult)) { 395 if (ZSTD_isError(zresult)) {
307 res = NULL; 396 res = NULL;
308 PyErr_Format(ZstdError, "zstd compress error: %s", ZSTD_getErrorName(zresult)); 397 PyErr_Format(ZstdError, "zstd compress error: %s", ZSTD_getErrorName(zresult));
323 } 412 }
324 } 413 }
325 414
326 /* We've finished reading. Now flush the compressor stream. */ 415 /* We've finished reading. Now flush the compressor stream. */
327 while (1) { 416 while (1) {
328 zresult = ZSTD_endStream(cstream, &output); 417 if (self->mtcctx) {
418 zresult = ZSTDMT_endStream(self->mtcctx, &output);
419 }
420 else {
421 zresult = ZSTD_endStream(self->cstream, &output);
422 }
329 if (ZSTD_isError(zresult)) { 423 if (ZSTD_isError(zresult)) {
330 PyErr_Format(ZstdError, "error ending compression stream: %s", 424 PyErr_Format(ZstdError, "error ending compression stream: %s",
331 ZSTD_getErrorName(zresult)); 425 ZSTD_getErrorName(zresult));
332 res = NULL; 426 res = NULL;
333 goto finally; 427 goto finally;
348 if (!zresult) { 442 if (!zresult) {
349 break; 443 break;
350 } 444 }
351 } 445 }
352 446
353 ZSTD_freeCStream(cstream);
354 cstream = NULL;
355
356 totalReadPy = PyLong_FromSsize_t(totalRead); 447 totalReadPy = PyLong_FromSsize_t(totalRead);
357 totalWritePy = PyLong_FromSsize_t(totalWrite); 448 totalWritePy = PyLong_FromSsize_t(totalWrite);
358 res = PyTuple_Pack(2, totalReadPy, totalWritePy); 449 res = PyTuple_Pack(2, totalReadPy, totalWritePy);
359 Py_DecRef(totalReadPy); 450 Py_DECREF(totalReadPy);
360 Py_DecRef(totalWritePy); 451 Py_DECREF(totalWritePy);
361 452
362 finally: 453 finally:
363 if (output.dst) { 454 if (output.dst) {
364 PyMem_Free(output.dst); 455 PyMem_Free(output.dst);
365 }
366
367 if (cstream) {
368 ZSTD_freeCStream(cstream);
369 } 456 }
370 457
371 return res; 458 return res;
372 } 459 }
373 460
408 #endif 495 #endif
409 kwlist, &source, &sourceSize, &allowEmpty)) { 496 kwlist, &source, &sourceSize, &allowEmpty)) {
410 return NULL; 497 return NULL;
411 } 498 }
412 499
500 if (self->threads && self->dict) {
501 PyErr_SetString(ZstdError,
502 "compress() cannot be used with both dictionaries and multi-threaded compression");
503 return NULL;
504 }
505
506 if (self->threads && self->cparams) {
507 PyErr_SetString(ZstdError,
508 "compress() cannot be used with both compression parameters and multi-threaded compression");
509 return NULL;
510 }
511
413 /* Limitation in zstd C API doesn't let decompression side distinguish 512 /* Limitation in zstd C API doesn't let decompression side distinguish
414 between content size of 0 and unknown content size. This can make round 513 between content size of 0 and unknown content size. This can make round
415 tripping via Python difficult. Until this is fixed, require a flag 514 tripping via Python difficult. Until this is fixed, require a flag
416 to fire the footgun. 515 to fire the footgun.
417 https://github.com/indygreg/python-zstandard/issues/11 */ 516 https://github.com/indygreg/python-zstandard/issues/11 */
454 Note: the compression parameters used for the first invocation (possibly 553 Note: the compression parameters used for the first invocation (possibly
455 derived from the source size) will be reused on all subsequent invocations. 554 derived from the source size) will be reused on all subsequent invocations.
456 https://github.com/facebook/zstd/issues/358 contains more info. We could 555 https://github.com/facebook/zstd/issues/358 contains more info. We could
457 potentially add an argument somewhere to control this behavior. 556 potentially add an argument somewhere to control this behavior.
458 */ 557 */
459 if (dictData && !self->cdict) { 558 if (0 != populate_cdict(self, &zparams)) {
460 if (populate_cdict(self, dictData, dictSize, &zparams)) { 559 Py_DECREF(output);
461 Py_DECREF(output); 560 return NULL;
462 return NULL;
463 }
464 } 561 }
465 562
466 Py_BEGIN_ALLOW_THREADS 563 Py_BEGIN_ALLOW_THREADS
467 /* By avoiding ZSTD_compress(), we don't necessarily write out content 564 if (self->mtcctx) {
468 size. This means the argument to ZstdCompressor to control frame 565 zresult = ZSTDMT_compressCCtx(self->mtcctx, dest, destSize,
469 parameters is honored. */ 566 source, sourceSize, self->compressionLevel);
470 if (self->cdict) {
471 zresult = ZSTD_compress_usingCDict(self->cctx, dest, destSize,
472 source, sourceSize, self->cdict);
473 } 567 }
474 else { 568 else {
475 zresult = ZSTD_compress_advanced(self->cctx, dest, destSize, 569 /* By avoiding ZSTD_compress(), we don't necessarily write out content
476 source, sourceSize, dictData, dictSize, zparams); 570 size. This means the argument to ZstdCompressor to control frame
571 parameters is honored. */
572 if (self->cdict) {
573 zresult = ZSTD_compress_usingCDict(self->cctx, dest, destSize,
574 source, sourceSize, self->cdict);
575 }
576 else {
577 zresult = ZSTD_compress_advanced(self->cctx, dest, destSize,
578 source, sourceSize, dictData, dictSize, zparams);
579 }
477 } 580 }
478 Py_END_ALLOW_THREADS 581 Py_END_ALLOW_THREADS
479 582
480 if (ZSTD_isError(zresult)) { 583 if (ZSTD_isError(zresult)) {
481 PyErr_Format(ZstdError, "cannot compress: %s", ZSTD_getErrorName(zresult)); 584 PyErr_Format(ZstdError, "cannot compress: %s", ZSTD_getErrorName(zresult));
505 NULL 608 NULL
506 }; 609 };
507 610
508 Py_ssize_t inSize = 0; 611 Py_ssize_t inSize = 0;
509 size_t outSize = ZSTD_CStreamOutSize(); 612 size_t outSize = ZSTD_CStreamOutSize();
510 ZstdCompressionObj* result = PyObject_New(ZstdCompressionObj, &ZstdCompressionObjType); 613 ZstdCompressionObj* result = NULL;
614
615 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|n:compressobj", kwlist, &inSize)) {
616 return NULL;
617 }
618
619 result = (ZstdCompressionObj*)PyObject_CallObject((PyObject*)&ZstdCompressionObjType, NULL);
511 if (!result) { 620 if (!result) {
512 return NULL; 621 return NULL;
513 } 622 }
514 623
515 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|n:compressobj", kwlist, &inSize)) { 624 if (self->mtcctx) {
516 return NULL; 625 if (init_mtcstream(self, inSize)) {
517 } 626 Py_DECREF(result);
518 627 return NULL;
519 result->cstream = CStream_from_ZstdCompressor(self, inSize); 628 }
520 if (!result->cstream) { 629 }
521 Py_DECREF(result); 630 else {
522 return NULL; 631 if (0 != init_cstream(self, inSize)) {
632 Py_DECREF(result);
633 return NULL;
634 }
523 } 635 }
524 636
525 result->output.dst = PyMem_Malloc(outSize); 637 result->output.dst = PyMem_Malloc(outSize);
526 if (!result->output.dst) { 638 if (!result->output.dst) {
527 PyErr_NoMemory(); 639 PyErr_NoMemory();
528 Py_DECREF(result); 640 Py_DECREF(result);
529 return NULL; 641 return NULL;
530 } 642 }
531 result->output.size = outSize; 643 result->output.size = outSize;
532 result->output.pos = 0;
533
534 result->compressor = self; 644 result->compressor = self;
535 Py_INCREF(result->compressor); 645 Py_INCREF(result->compressor);
536
537 result->finished = 0;
538 646
539 return result; 647 return result;
540 } 648 }
541 649
542 PyDoc_STRVAR(ZstdCompressor_read_from__doc__, 650 PyDoc_STRVAR(ZstdCompressor_read_from__doc__,
577 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|nkk:read_from", kwlist, 685 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|nkk:read_from", kwlist,
578 &reader, &sourceSize, &inSize, &outSize)) { 686 &reader, &sourceSize, &inSize, &outSize)) {
579 return NULL; 687 return NULL;
580 } 688 }
581 689
582 result = PyObject_New(ZstdCompressorIterator, &ZstdCompressorIteratorType); 690 result = (ZstdCompressorIterator*)PyObject_CallObject((PyObject*)&ZstdCompressorIteratorType, NULL);
583 if (!result) { 691 if (!result) {
584 return NULL; 692 return NULL;
585 } 693 }
586
587 result->compressor = NULL;
588 result->reader = NULL;
589 result->buffer = NULL;
590 result->cstream = NULL;
591 result->input.src = NULL;
592 result->output.dst = NULL;
593 result->readResult = NULL;
594
595 if (PyObject_HasAttrString(reader, "read")) { 694 if (PyObject_HasAttrString(reader, "read")) {
596 result->reader = reader; 695 result->reader = reader;
597 Py_INCREF(result->reader); 696 Py_INCREF(result->reader);
598 } 697 }
599 else if (1 == PyObject_CheckBuffer(reader)) { 698 else if (1 == PyObject_CheckBuffer(reader)) {
606 705
607 if (0 != PyObject_GetBuffer(reader, result->buffer, PyBUF_CONTIG_RO)) { 706 if (0 != PyObject_GetBuffer(reader, result->buffer, PyBUF_CONTIG_RO)) {
608 goto except; 707 goto except;
609 } 708 }
610 709
611 result->bufferOffset = 0;
612 sourceSize = result->buffer->len; 710 sourceSize = result->buffer->len;
613 } 711 }
614 else { 712 else {
615 PyErr_SetString(PyExc_ValueError, 713 PyErr_SetString(PyExc_ValueError,
616 "must pass an object with a read() method or conforms to buffer protocol"); 714 "must pass an object with a read() method or conforms to buffer protocol");
619 717
620 result->compressor = self; 718 result->compressor = self;
621 Py_INCREF(result->compressor); 719 Py_INCREF(result->compressor);
622 720
623 result->sourceSize = sourceSize; 721 result->sourceSize = sourceSize;
624 result->cstream = CStream_from_ZstdCompressor(self, sourceSize); 722
625 if (!result->cstream) { 723 if (self->mtcctx) {
626 goto except; 724 if (init_mtcstream(self, sourceSize)) {
725 goto except;
726 }
727 }
728 else {
729 if (0 != init_cstream(self, sourceSize)) {
730 goto except;
731 }
627 } 732 }
628 733
629 result->inSize = inSize; 734 result->inSize = inSize;
630 result->outSize = outSize; 735 result->outSize = outSize;
631 736
633 if (!result->output.dst) { 738 if (!result->output.dst) {
634 PyErr_NoMemory(); 739 PyErr_NoMemory();
635 goto except; 740 goto except;
636 } 741 }
637 result->output.size = outSize; 742 result->output.size = outSize;
638 result->output.pos = 0;
639
640 result->input.src = NULL;
641 result->input.size = 0;
642 result->input.pos = 0;
643
644 result->finishedInput = 0;
645 result->finishedOutput = 0;
646 743
647 goto finally; 744 goto finally;
648 745
649 except: 746 except:
650 if (result->cstream) { 747 Py_XDECREF(result->compressor);
651 ZSTD_freeCStream(result->cstream); 748 Py_XDECREF(result->reader);
652 result->cstream = NULL;
653 }
654
655 Py_DecRef((PyObject*)result->compressor);
656 Py_DecRef(result->reader);
657
658 Py_DECREF(result); 749 Py_DECREF(result);
659 result = NULL; 750 result = NULL;
660 751
661 finally: 752 finally:
662 return result; 753 return result;
701 if (!PyObject_HasAttrString(writer, "write")) { 792 if (!PyObject_HasAttrString(writer, "write")) {
702 PyErr_SetString(PyExc_ValueError, "must pass an object with a write() method"); 793 PyErr_SetString(PyExc_ValueError, "must pass an object with a write() method");
703 return NULL; 794 return NULL;
704 } 795 }
705 796
706 result = PyObject_New(ZstdCompressionWriter, &ZstdCompressionWriterType); 797 result = (ZstdCompressionWriter*)PyObject_CallObject((PyObject*)&ZstdCompressionWriterType, NULL);
707 if (!result) { 798 if (!result) {
708 return NULL; 799 return NULL;
709 } 800 }
710 801
711 result->compressor = self; 802 result->compressor = self;
713 804
714 result->writer = writer; 805 result->writer = writer;
715 Py_INCREF(result->writer); 806 Py_INCREF(result->writer);
716 807
717 result->sourceSize = sourceSize; 808 result->sourceSize = sourceSize;
718
719 result->outSize = outSize; 809 result->outSize = outSize;
720 810
721 result->entered = 0; 811 return result;
722 result->cstream = NULL; 812 }
813
814 typedef struct {
815 void* sourceData;
816 size_t sourceSize;
817 } DataSource;
818
819 typedef struct {
820 DataSource* sources;
821 Py_ssize_t sourcesSize;
822 unsigned long long totalSourceSize;
823 } DataSources;
824
825 typedef struct {
826 void* dest;
827 Py_ssize_t destSize;
828 BufferSegment* segments;
829 Py_ssize_t segmentsSize;
830 } DestBuffer;
831
832 typedef enum {
833 WorkerError_none = 0,
834 WorkerError_zstd = 1,
835 WorkerError_no_memory = 2,
836 } WorkerError;
837
838 /**
839 * Holds state for an individual worker performing multi_compress_to_buffer work.
840 */
841 typedef struct {
842 /* Used for compression. */
843 ZSTD_CCtx* cctx;
844 ZSTD_CDict* cdict;
845 int cLevel;
846 CompressionParametersObject* cParams;
847 ZSTD_frameParameters fParams;
848
849 /* What to compress. */
850 DataSource* sources;
851 Py_ssize_t sourcesSize;
852 Py_ssize_t startOffset;
853 Py_ssize_t endOffset;
854 unsigned long long totalSourceSize;
855
856 /* Result storage. */
857 DestBuffer* destBuffers;
858 Py_ssize_t destCount;
859
860 /* Error tracking. */
861 WorkerError error;
862 size_t zresult;
863 Py_ssize_t errorOffset;
864 } WorkerState;
865
866 static void compress_worker(WorkerState* state) {
867 Py_ssize_t inputOffset = state->startOffset;
868 Py_ssize_t remainingItems = state->endOffset - state->startOffset + 1;
869 Py_ssize_t currentBufferStartOffset = state->startOffset;
870 size_t zresult;
871 ZSTD_parameters zparams;
872 void* newDest;
873 size_t allocationSize;
874 size_t boundSize;
875 Py_ssize_t destOffset = 0;
876 DataSource* sources = state->sources;
877 DestBuffer* destBuffer;
878
879 assert(!state->destBuffers);
880 assert(0 == state->destCount);
881
882 if (state->cParams) {
883 ztopy_compression_parameters(state->cParams, &zparams.cParams);
884 }
885
886 zparams.fParams = state->fParams;
887
888 /*
889 * The total size of the compressed data is unknown until we actually
890 * compress data. That means we can't pre-allocate the exact size we need.
891 *
892 * There is a cost to every allocation and reallocation. So, it is in our
893 * interest to minimize the number of allocations.
894 *
895 * There is also a cost to too few allocations. If allocations are too
896 * large they may fail. If buffers are shared and all inputs become
897 * irrelevant at different lifetimes, then a reference to one segment
898 * in the buffer will keep the entire buffer alive. This leads to excessive
899 * memory usage.
900 *
901 * Our current strategy is to assume a compression ratio of 16:1 and
902 * allocate buffers of that size, rounded up to the nearest power of 2
903 * (because computers like round numbers). That ratio is greater than what
904 * most inputs achieve. This is by design: we don't want to over-allocate.
905 * But we don't want to under-allocate and lead to too many buffers either.
906 */
907
908 state->destCount = 1;
909
910 state->destBuffers = calloc(1, sizeof(DestBuffer));
911 if (NULL == state->destBuffers) {
912 state->error = WorkerError_no_memory;
913 return;
914 }
915
916 destBuffer = &state->destBuffers[state->destCount - 1];
917
918 /*
919 * Rather than track bounds and grow the segments buffer, allocate space
920 * to hold remaining items then truncate when we're done with it.
921 */
922 destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment));
923 if (NULL == destBuffer->segments) {
924 state->error = WorkerError_no_memory;
925 return;
926 }
927
928 destBuffer->segmentsSize = remainingItems;
929
930 allocationSize = roundpow2(state->totalSourceSize >> 4);
931
932 /* If the maximum size of the output is larger than that, round up. */
933 boundSize = ZSTD_compressBound(sources[inputOffset].sourceSize);
934
935 if (boundSize > allocationSize) {
936 allocationSize = roundpow2(boundSize);
937 }
938
939 destBuffer->dest = malloc(allocationSize);
940 if (NULL == destBuffer->dest) {
941 state->error = WorkerError_no_memory;
942 return;
943 }
944
945 destBuffer->destSize = allocationSize;
946
947 for (inputOffset = state->startOffset; inputOffset <= state->endOffset; inputOffset++) {
948 void* source = sources[inputOffset].sourceData;
949 size_t sourceSize = sources[inputOffset].sourceSize;
950 size_t destAvailable;
951 void* dest;
952
953 destAvailable = destBuffer->destSize - destOffset;
954 boundSize = ZSTD_compressBound(sourceSize);
955
956 /*
957 * Not enough space in current buffer to hold largest compressed output.
958 * So allocate and switch to a new output buffer.
959 */
960 if (boundSize > destAvailable) {
961 /*
962 * The downsizing of the existing buffer is optional. It should be cheap
963 * (unlike growing). So we just do it.
964 */
965 if (destAvailable) {
966 newDest = realloc(destBuffer->dest, destOffset);
967 if (NULL == newDest) {
968 state->error = WorkerError_no_memory;
969 return;
970 }
971
972 destBuffer->dest = newDest;
973 destBuffer->destSize = destOffset;
974 }
975
976 /* Truncate segments buffer. */
977 newDest = realloc(destBuffer->segments,
978 (inputOffset - currentBufferStartOffset + 1) * sizeof(BufferSegment));
979 if (NULL == newDest) {
980 state->error = WorkerError_no_memory;
981 return;
982 }
983
984 destBuffer->segments = newDest;
985 destBuffer->segmentsSize = inputOffset - currentBufferStartOffset;
986
987 /* Grow space for new struct. */
988 /* TODO consider over-allocating so we don't do this every time. */
989 newDest = realloc(state->destBuffers, (state->destCount + 1) * sizeof(DestBuffer));
990 if (NULL == newDest) {
991 state->error = WorkerError_no_memory;
992 return;
993 }
994
995 state->destBuffers = newDest;
996 state->destCount++;
997
998 destBuffer = &state->destBuffers[state->destCount - 1];
999
1000 /* Don't take any chances with non-NULL pointers. */
1001 memset(destBuffer, 0, sizeof(DestBuffer));
1002
1003 /**
1004 * We could dynamically update allocation size based on work done so far.
1005 * For now, keep is simple.
1006 */
1007 allocationSize = roundpow2(state->totalSourceSize >> 4);
1008
1009 if (boundSize > allocationSize) {
1010 allocationSize = roundpow2(boundSize);
1011 }
1012
1013 destBuffer->dest = malloc(allocationSize);
1014 if (NULL == destBuffer->dest) {
1015 state->error = WorkerError_no_memory;
1016 return;
1017 }
1018
1019 destBuffer->destSize = allocationSize;
1020 destAvailable = allocationSize;
1021 destOffset = 0;
1022
1023 destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment));
1024 if (NULL == destBuffer->segments) {
1025 state->error = WorkerError_no_memory;
1026 return;
1027 }
1028
1029 destBuffer->segmentsSize = remainingItems;
1030 currentBufferStartOffset = inputOffset;
1031 }
1032
1033 dest = (char*)destBuffer->dest + destOffset;
1034
1035 if (state->cdict) {
1036 zresult = ZSTD_compress_usingCDict(state->cctx, dest, destAvailable,
1037 source, sourceSize, state->cdict);
1038 }
1039 else {
1040 if (!state->cParams) {
1041 zparams.cParams = ZSTD_getCParams(state->cLevel, sourceSize, 0);
1042 }
1043
1044 zresult = ZSTD_compress_advanced(state->cctx, dest, destAvailable,
1045 source, sourceSize, NULL, 0, zparams);
1046 }
1047
1048 if (ZSTD_isError(zresult)) {
1049 state->error = WorkerError_zstd;
1050 state->zresult = zresult;
1051 state->errorOffset = inputOffset;
1052 break;
1053 }
1054
1055 destBuffer->segments[inputOffset - currentBufferStartOffset].offset = destOffset;
1056 destBuffer->segments[inputOffset - currentBufferStartOffset].length = zresult;
1057
1058 destOffset += zresult;
1059 remainingItems--;
1060 }
1061
1062 if (destBuffer->destSize > destOffset) {
1063 newDest = realloc(destBuffer->dest, destOffset);
1064 if (NULL == newDest) {
1065 state->error = WorkerError_no_memory;
1066 return;
1067 }
1068
1069 destBuffer->dest = newDest;
1070 destBuffer->destSize = destOffset;
1071 }
1072 }
1073
/*
 * Compress every item in ``sources`` using up to ``threadCount`` worker
 * threads and return the results as a new ZstdBufferWithSegmentsCollection.
 *
 * Inputs are split into contiguous ranges of roughly equal total byte size,
 * one range per worker. With threadCount > 1 work is dispatched to a zstd
 * POOL thread pool; with threadCount == 1 it runs inline on the calling
 * thread. The GIL is released while workers run.
 *
 * Returns NULL with a Python exception set on error. On success, ownership
 * of each worker's output buffers is transferred to the returned collection.
 */
ZstdBufferWithSegmentsCollection* compress_from_datasources(ZstdCompressor* compressor,
	DataSources* sources, unsigned int threadCount) {
	ZSTD_parameters zparams;
	unsigned long long bytesPerWorker;
	POOL_ctx* pool = NULL;
	WorkerState* workerStates = NULL;
	Py_ssize_t i;
	unsigned long long workerBytes = 0;
	Py_ssize_t workerStartOffset = 0;
	size_t currentThread = 0;
	int errored = 0;
	Py_ssize_t segmentsCount = 0;
	Py_ssize_t segmentIndex;
	PyObject* segmentsArg = NULL;
	ZstdBufferWithSegments* buffer;
	ZstdBufferWithSegmentsCollection* result = NULL;

	/* Callers (ZstdCompressor_multi_compress_to_buffer) validate these. */
	assert(sources->sourcesSize > 0);
	assert(sources->totalSourceSize > 0);
	assert(threadCount >= 1);

	/* More threads than inputs makes no sense. */
	threadCount = sources->sourcesSize < threadCount ? (unsigned int)sources->sourcesSize
		: threadCount;

	/* TODO lower thread count when input size is too small and threads would add
	overhead. */

	/*
	 * When dictionaries are used, parameters are derived from the size of the
	 * first element.
	 *
	 * TODO come up with a better mechanism.
	 */
	memset(&zparams, 0, sizeof(zparams));
	if (compressor->cparams) {
		ztopy_compression_parameters(compressor->cparams, &zparams.cParams);
	}
	else {
		zparams.cParams = ZSTD_getCParams(compressor->compressionLevel,
			sources->sources[0].sourceSize,
			compressor->dict ? compressor->dict->dictSize : 0);
	}

	zparams.fParams = compressor->fparams;

	/* Lazily build the shared compression dictionary (no-op if there is no
	   dict or it is already built). Sets a Python exception on failure. */
	if (0 != populate_cdict(compressor, &zparams)) {
		return NULL;
	}

	workerStates = PyMem_Malloc(threadCount * sizeof(WorkerState));
	if (NULL == workerStates) {
		PyErr_NoMemory();
		goto finally;
	}

	memset(workerStates, 0, threadCount * sizeof(WorkerState));

	if (threadCount > 1) {
		pool = POOL_create(threadCount, 1);
		if (NULL == pool) {
			PyErr_SetString(ZstdError, "could not initialize zstd thread pool");
			goto finally;
		}
	}

	/* Target payload per worker; used to decide where to cut input ranges. */
	bytesPerWorker = sources->totalSourceSize / threadCount;

	/* Give each worker its own compression context plus shared, read-only
	   compression settings and the full sources array. */
	for (i = 0; i < threadCount; i++) {
		workerStates[i].cctx = ZSTD_createCCtx();
		if (!workerStates[i].cctx) {
			PyErr_NoMemory();
			goto finally;
		}

		workerStates[i].cdict = compressor->cdict;
		workerStates[i].cLevel = compressor->compressionLevel;
		workerStates[i].cParams = compressor->cparams;
		workerStates[i].fParams = compressor->fparams;

		workerStates[i].sources = sources->sources;
		workerStates[i].sourcesSize = sources->sourcesSize;
	}

	/* Workers don't touch Python objects, so release the GIL while they run. */
	Py_BEGIN_ALLOW_THREADS
	for (i = 0; i < sources->sourcesSize; i++) {
		workerBytes += sources->sources[i].sourceSize;

		/*
		 * The last worker/thread needs to handle all remaining work. Don't
		 * trigger it prematurely. Defer to the block outside of the loop
		 * to run the last worker/thread. But do still process this loop
		 * so workerBytes is correct.
		 */
		if (currentThread == threadCount - 1) {
			continue;
		}

		/* Cut a worker's range as soon as it reaches the per-worker quota. */
		if (workerBytes >= bytesPerWorker) {
			assert(currentThread < threadCount);
			workerStates[currentThread].totalSourceSize = workerBytes;
			workerStates[currentThread].startOffset = workerStartOffset;
			workerStates[currentThread].endOffset = i;

			if (threadCount > 1) {
				POOL_add(pool, (POOL_function)compress_worker, &workerStates[currentThread]);
			}
			else {
				compress_worker(&workerStates[currentThread]);
			}

			currentThread++;
			workerStartOffset = i + 1;
			workerBytes = 0;
		}
	}

	/* Dispatch whatever is left over to the final worker. */
	if (workerBytes) {
		assert(currentThread < threadCount);
		workerStates[currentThread].totalSourceSize = workerBytes;
		workerStates[currentThread].startOffset = workerStartOffset;
		workerStates[currentThread].endOffset = sources->sourcesSize - 1;

		if (threadCount > 1) {
			POOL_add(pool, (POOL_function)compress_worker, &workerStates[currentThread]);
		}
		else {
			compress_worker(&workerStates[currentThread]);
		}
	}

	/* NOTE(review): this relies on POOL_free draining queued jobs before
	   returning — confirm against pool.c. NULL it so the "finally" cleanup
	   doesn't free it twice. */
	if (threadCount > 1) {
		POOL_free(pool);
		pool = NULL;
	}

	Py_END_ALLOW_THREADS

	/* Convert the first worker-recorded error into a Python exception. */
	for (i = 0; i < threadCount; i++) {
		switch (workerStates[i].error) {
		case WorkerError_no_memory:
			PyErr_NoMemory();
			errored = 1;
			break;

		case WorkerError_zstd:
			PyErr_Format(ZstdError, "error compressing item %zd: %s",
				workerStates[i].errorOffset, ZSTD_getErrorName(workerStates[i].zresult));
			errored = 1;
			break;
		default:
			;
		}

		if (errored) {
			break;
		}

	}

	if (errored) {
		goto finally;
	}

	/* Count all output buffers so the collection tuple can be presized. */
	segmentsCount = 0;
	for (i = 0; i < threadCount; i++) {
		WorkerState* state = &workerStates[i];
		segmentsCount += state->destCount;
	}

	segmentsArg = PyTuple_New(segmentsCount);
	if (NULL == segmentsArg) {
		goto finally;
	}

	segmentIndex = 0;

	/* Wrap each worker output buffer in a BufferWithSegments, handing off
	   ownership of the malloc()ed memory as we go. */
	for (i = 0; i < threadCount; i++) {
		Py_ssize_t j;
		WorkerState* state = &workerStates[i];

		for (j = 0; j < state->destCount; j++) {
			DestBuffer* destBuffer = &state->destBuffers[j];
			buffer = BufferWithSegments_FromMemory(destBuffer->dest, destBuffer->destSize,
				destBuffer->segments, destBuffer->segmentsSize);

			if (NULL == buffer) {
				goto finally;
			}

			/* Tell instance to use free() instead of PyMem_Free(). */
			buffer->useFree = 1;

			/*
			 * BufferWithSegments_FromMemory takes ownership of the backing memory.
			 * Unset it here so it doesn't get freed below.
			 */
			destBuffer->dest = NULL;
			destBuffer->segments = NULL;

			PyTuple_SET_ITEM(segmentsArg, segmentIndex++, (PyObject*)buffer);
		}
	}

	result = (ZstdBufferWithSegmentsCollection*)PyObject_CallObject(
		(PyObject*)&ZstdBufferWithSegmentsCollectionType, segmentsArg);

finally:
	Py_CLEAR(segmentsArg);

	if (pool) {
		POOL_free(pool);
	}

	if (workerStates) {
		Py_ssize_t j;

		for (i = 0; i < threadCount; i++) {
			WorkerState state = workerStates[i];

			if (state.cctx) {
				ZSTD_freeCCtx(state.cctx);
			}

			/* malloc() is used in worker thread. Buffers handed off to
			   BufferWithSegments_FromMemory above were NULLed and are
			   skipped here; free(NULL) is a no-op. */

			for (j = 0; j < state.destCount; j++) {
				if (state.destBuffers) {
					free(state.destBuffers[j].dest);
					free(state.destBuffers[j].segments);
				}
			}


			free(state.destBuffers);
		}

		PyMem_Free(workerStates);
	}

	return result;
}
1316
1317 PyDoc_STRVAR(ZstdCompressor_multi_compress_to_buffer__doc__,
1318 "Compress multiple pieces of data as a single operation\n"
1319 "\n"
1320 "Receives a ``BufferWithSegmentsCollection``, a ``BufferWithSegments``, or\n"
1321 "a list of bytes like objects holding data to compress.\n"
1322 "\n"
1323 "Returns a ``BufferWithSegmentsCollection`` holding compressed data.\n"
1324 "\n"
1325 "This function is optimized to perform multiple compression operations as\n"
1326 "as possible with as little overhead as possbile.\n"
1327 );
1328
/*
 * ZstdCompressor.multi_compress_to_buffer(data, threads=0)
 *
 * Python entry point: normalizes ``data`` (a BufferWithSegments, a
 * BufferWithSegmentsCollection, or a list of buffer-compatible objects)
 * into a flat DataSources array, then delegates the actual compression to
 * compress_from_datasources().
 *
 * ``threads`` < 0 means "use cpu_count()"; anything below 2 collapses to
 * single-threaded operation. Returns a ZstdBufferWithSegmentsCollection,
 * or NULL with an exception set.
 */
static ZstdBufferWithSegmentsCollection* ZstdCompressor_multi_compress_to_buffer(ZstdCompressor* self, PyObject* args, PyObject* kwargs) {
	static char* kwlist[] = {
		"data",
		"threads",
		NULL
	};

	PyObject* data;
	int threads = 0;
	Py_buffer* dataBuffers = NULL;
	DataSources sources;
	Py_ssize_t i;
	Py_ssize_t sourceCount = 0;
	ZstdBufferWithSegmentsCollection* result = NULL;

	/* This API manages its own threading; it is incompatible with a
	   compressor already configured for multi-threaded compression. */
	if (self->mtcctx) {
		PyErr_SetString(ZstdError,
			"function cannot be called on ZstdCompressor configured for multi-threaded compression");
		return NULL;
	}

	memset(&sources, 0, sizeof(sources));

	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|i:multi_compress_to_buffer", kwlist,
		&data, &threads)) {
		return NULL;
	}

	/* Negative thread count means "one per CPU". */
	if (threads < 0) {
		threads = cpu_count();
	}

	/* 0 or 1 both mean single-threaded. */
	if (threads < 2) {
		threads = 1;
	}

	if (PyObject_TypeCheck(data, &ZstdBufferWithSegmentsType)) {
		/* Single BufferWithSegments: one source per segment; the segments
		   reference the buffer's backing memory directly (0-copy). */
		ZstdBufferWithSegments* buffer = (ZstdBufferWithSegments*)data;

		sources.sources = PyMem_Malloc(buffer->segmentCount * sizeof(DataSource));
		if (NULL == sources.sources) {
			PyErr_NoMemory();
			goto finally;
		}

		for (i = 0; i < buffer->segmentCount; i++) {
			sources.sources[i].sourceData = (char*)buffer->data + buffer->segments[i].offset;
			sources.sources[i].sourceSize = buffer->segments[i].length;
			sources.totalSourceSize += buffer->segments[i].length;
		}

		sources.sourcesSize = buffer->segmentCount;
	}
	else if (PyObject_TypeCheck(data, &ZstdBufferWithSegmentsCollectionType)) {
		/* Collection: flatten every segment of every contained buffer into
		   one contiguous sources array. */
		Py_ssize_t j;
		Py_ssize_t offset = 0;
		ZstdBufferWithSegments* buffer;
		ZstdBufferWithSegmentsCollection* collection = (ZstdBufferWithSegmentsCollection*)data;

		sourceCount = BufferWithSegmentsCollection_length(collection);

		sources.sources = PyMem_Malloc(sourceCount * sizeof(DataSource));
		if (NULL == sources.sources) {
			PyErr_NoMemory();
			goto finally;
		}

		for (i = 0; i < collection->bufferCount; i++) {
			buffer = collection->buffers[i];

			for (j = 0; j < buffer->segmentCount; j++) {
				sources.sources[offset].sourceData = (char*)buffer->data + buffer->segments[j].offset;
				sources.sources[offset].sourceSize = buffer->segments[j].length;
				sources.totalSourceSize += buffer->segments[j].length;

				offset++;
			}
		}

		sources.sourcesSize = sourceCount;
	}
	else if (PyList_Check(data)) {
		/* List of bytes-like objects: acquire a Py_buffer for each item. */
		sourceCount = PyList_GET_SIZE(data);

		sources.sources = PyMem_Malloc(sourceCount * sizeof(DataSource));
		if (NULL == sources.sources) {
			PyErr_NoMemory();
			goto finally;
		}

		/*
		 * It isn't clear whether the address referred to by Py_buffer.buf
		 * is still valid after PyBuffer_Release. So we hold a reference to all
		 * Py_buffer instances for the duration of the operation.
		 */
		dataBuffers = PyMem_Malloc(sourceCount * sizeof(Py_buffer));
		if (NULL == dataBuffers) {
			PyErr_NoMemory();
			goto finally;
		}

		/* Zeroed so PyBuffer_Release on never-acquired entries is a no-op
		   if we bail out partway through acquisition. */
		memset(dataBuffers, 0, sourceCount * sizeof(Py_buffer));

		for (i = 0; i < sourceCount; i++) {
			if (0 != PyObject_GetBuffer(PyList_GET_ITEM(data, i),
				&dataBuffers[i], PyBUF_CONTIG_RO)) {
				PyErr_Clear();
				PyErr_Format(PyExc_TypeError, "item %zd not a bytes like object", i);
				goto finally;
			}

			sources.sources[i].sourceData = dataBuffers[i].buf;
			sources.sources[i].sourceSize = dataBuffers[i].len;
			sources.totalSourceSize += dataBuffers[i].len;
		}

		sources.sourcesSize = sourceCount;
	}
	else {
		PyErr_SetString(PyExc_TypeError, "argument must be list of BufferWithSegments");
		goto finally;
	}

	/* compress_from_datasources asserts non-empty input, so reject it here
	   with a proper Python exception. */
	if (0 == sources.sourcesSize) {
		PyErr_SetString(PyExc_ValueError, "no source elements found");
		goto finally;
	}

	if (0 == sources.totalSourceSize) {
		PyErr_SetString(PyExc_ValueError, "source elements are empty");
		goto finally;
	}

	result = compress_from_datasources(self, &sources, threads);

finally:
	/* PyMem_Free(NULL) is a no-op, so this is safe on early exits. */
	PyMem_Free(sources.sources);

	if (dataBuffers) {
		for (i = 0; i < sourceCount; i++) {
			PyBuffer_Release(&dataBuffers[i]);
		}

		PyMem_Free(dataBuffers);
	}

	return result;
}
726 1477
727 static PyMethodDef ZstdCompressor_methods[] = { 1478 static PyMethodDef ZstdCompressor_methods[] = {
733 METH_VARARGS | METH_KEYWORDS, ZstdCompressor_copy_stream__doc__ }, 1484 METH_VARARGS | METH_KEYWORDS, ZstdCompressor_copy_stream__doc__ },
734 { "read_from", (PyCFunction)ZstdCompressor_read_from, 1485 { "read_from", (PyCFunction)ZstdCompressor_read_from,
735 METH_VARARGS | METH_KEYWORDS, ZstdCompressor_read_from__doc__ }, 1486 METH_VARARGS | METH_KEYWORDS, ZstdCompressor_read_from__doc__ },
736 { "write_to", (PyCFunction)ZstdCompressor_write_to, 1487 { "write_to", (PyCFunction)ZstdCompressor_write_to,
737 METH_VARARGS | METH_KEYWORDS, ZstdCompressor_write_to___doc__ }, 1488 METH_VARARGS | METH_KEYWORDS, ZstdCompressor_write_to___doc__ },
1489 { "multi_compress_to_buffer", (PyCFunction)ZstdCompressor_multi_compress_to_buffer,
1490 METH_VARARGS | METH_KEYWORDS, ZstdCompressor_multi_compress_to_buffer__doc__ },
738 { NULL, NULL } 1491 { NULL, NULL }
739 }; 1492 };
740 1493
741 PyTypeObject ZstdCompressorType = { 1494 PyTypeObject ZstdCompressorType = {
742 PyVarObject_HEAD_INIT(NULL, 0) 1495 PyVarObject_HEAD_INIT(NULL, 0)