--- a/contrib/python-zstandard/c-ext/decompressor.c Sat Apr 01 13:43:52 2017 -0700
+++ b/contrib/python-zstandard/c-ext/decompressor.c Sat Apr 01 15:24:03 2017 -0700
@@ -7,19 +7,37 @@
*/
#include "python-zstandard.h"
+#include "pool.h"
extern PyObject* ZstdError;
-ZSTD_DStream* DStream_from_ZstdDecompressor(ZstdDecompressor* decompressor) {
- ZSTD_DStream* dstream;
+/**
+ * Ensure the ZSTD_DStream on a ZstdDecompressor is initialized and reset.
+ *
+ * This should be called before starting a decompression operation with a
+ * ZSTD_DStream on a ZstdDecompressor.
+ */
+int init_dstream(ZstdDecompressor* decompressor) {
void* dictData = NULL;
size_t dictSize = 0;
size_t zresult;
- dstream = ZSTD_createDStream();
- if (!dstream) {
+ /* Simple case of dstream already exists. Just reset it. */
+ if (decompressor->dstream) {
+ zresult = ZSTD_resetDStream(decompressor->dstream);
+ if (ZSTD_isError(zresult)) {
+ PyErr_Format(ZstdError, "could not reset DStream: %s",
+ ZSTD_getErrorName(zresult));
+ return -1;
+ }
+
+ return 0;
+ }
+
+ decompressor->dstream = ZSTD_createDStream();
+ if (!decompressor->dstream) {
PyErr_SetString(ZstdError, "could not create DStream");
- return NULL;
+ return -1;
}
if (decompressor->dict) {
@@ -28,19 +46,23 @@
}
if (dictData) {
- zresult = ZSTD_initDStream_usingDict(dstream, dictData, dictSize);
+ zresult = ZSTD_initDStream_usingDict(decompressor->dstream, dictData, dictSize);
}
else {
- zresult = ZSTD_initDStream(dstream);
+ zresult = ZSTD_initDStream(decompressor->dstream);
}
if (ZSTD_isError(zresult)) {
+ /* Don't leave a reference to an invalid object. */
+ ZSTD_freeDStream(decompressor->dstream);
+ decompressor->dstream = NULL;
+
PyErr_Format(ZstdError, "could not initialize DStream: %s",
ZSTD_getErrorName(zresult));
- return NULL;
+ return -1;
}
- return dstream;
+ return 0;
}
PyDoc_STRVAR(Decompressor__doc__,
@@ -93,17 +115,23 @@
}
static void Decompressor_dealloc(ZstdDecompressor* self) {
- if (self->dctx) {
- ZSTD_freeDCtx(self->dctx);
- }
-
- Py_XDECREF(self->dict);
+ Py_CLEAR(self->dict);
if (self->ddict) {
ZSTD_freeDDict(self->ddict);
self->ddict = NULL;
}
+ if (self->dstream) {
+ ZSTD_freeDStream(self->dstream);
+ self->dstream = NULL;
+ }
+
+ if (self->dctx) {
+ ZSTD_freeDCtx(self->dctx);
+ self->dctx = NULL;
+ }
+
PyObject_Del(self);
}
@@ -132,7 +160,6 @@
PyObject* dest;
size_t inSize = ZSTD_DStreamInSize();
size_t outSize = ZSTD_DStreamOutSize();
- ZSTD_DStream* dstream;
ZSTD_inBuffer input;
ZSTD_outBuffer output;
Py_ssize_t totalRead = 0;
@@ -164,8 +191,7 @@
/* Prevent free on uninitialized memory in finally. */
output.dst = NULL;
- dstream = DStream_from_ZstdDecompressor(self);
- if (!dstream) {
+ if (0 != init_dstream(self)) {
res = NULL;
goto finally;
}
@@ -203,7 +229,7 @@
while (input.pos < input.size) {
Py_BEGIN_ALLOW_THREADS
- zresult = ZSTD_decompressStream(dstream, &output, &input);
+ zresult = ZSTD_decompressStream(self->dstream, &output, &input);
Py_END_ALLOW_THREADS
if (ZSTD_isError(zresult)) {
@@ -230,24 +256,17 @@
/* Source stream is exhausted. Finish up. */
- ZSTD_freeDStream(dstream);
- dstream = NULL;
-
totalReadPy = PyLong_FromSsize_t(totalRead);
totalWritePy = PyLong_FromSsize_t(totalWrite);
res = PyTuple_Pack(2, totalReadPy, totalWritePy);
- Py_DecRef(totalReadPy);
- Py_DecRef(totalWritePy);
+ Py_DECREF(totalReadPy);
+ Py_DECREF(totalWritePy);
finally:
if (output.dst) {
PyMem_Free(output.dst);
}
- if (dstream) {
- ZSTD_freeDStream(dstream);
- }
-
return res;
}
@@ -352,18 +371,18 @@
if (ZSTD_isError(zresult)) {
PyErr_Format(ZstdError, "decompression error: %s", ZSTD_getErrorName(zresult));
- Py_DecRef(result);
+ Py_DECREF(result);
return NULL;
}
else if (decompressedSize && zresult != decompressedSize) {
PyErr_Format(ZstdError, "decompression error: decompressed %zu bytes; expected %llu",
zresult, decompressedSize);
- Py_DecRef(result);
+ Py_DECREF(result);
return NULL;
}
else if (zresult < destCapacity) {
if (_PyBytes_Resize(&result, zresult)) {
- Py_DecRef(result);
+ Py_DECREF(result);
return NULL;
}
}
@@ -382,22 +401,19 @@
);
static ZstdDecompressionObj* Decompressor_decompressobj(ZstdDecompressor* self) {
- ZstdDecompressionObj* result = PyObject_New(ZstdDecompressionObj, &ZstdDecompressionObjType);
+ ZstdDecompressionObj* result = (ZstdDecompressionObj*)PyObject_CallObject((PyObject*)&ZstdDecompressionObjType, NULL);
if (!result) {
return NULL;
}
- result->dstream = DStream_from_ZstdDecompressor(self);
- if (!result->dstream) {
- Py_DecRef((PyObject*)result);
+ if (0 != init_dstream(self)) {
+ Py_DECREF(result);
return NULL;
}
result->decompressor = self;
Py_INCREF(result->decompressor);
- result->finished = 0;
-
return result;
}
@@ -447,18 +463,11 @@
return NULL;
}
- result = PyObject_New(ZstdDecompressorIterator, &ZstdDecompressorIteratorType);
+ result = (ZstdDecompressorIterator*)PyObject_CallObject((PyObject*)&ZstdDecompressorIteratorType, NULL);
if (!result) {
return NULL;
}
- result->decompressor = NULL;
- result->reader = NULL;
- result->buffer = NULL;
- result->dstream = NULL;
- result->input.src = NULL;
- result->output.dst = NULL;
-
if (PyObject_HasAttrString(reader, "read")) {
result->reader = reader;
Py_INCREF(result->reader);
@@ -475,8 +484,6 @@
if (0 != PyObject_GetBuffer(reader, result->buffer, PyBUF_CONTIG_RO)) {
goto except;
}
-
- result->bufferOffset = 0;
}
else {
PyErr_SetString(PyExc_ValueError,
@@ -491,8 +498,7 @@
result->outSize = outSize;
result->skipBytes = skipBytes;
- result->dstream = DStream_from_ZstdDecompressor(self);
- if (!result->dstream) {
+ if (0 != init_dstream(self)) {
goto except;
}
@@ -501,16 +507,6 @@
PyErr_NoMemory();
goto except;
}
- result->input.size = 0;
- result->input.pos = 0;
-
- result->output.dst = NULL;
- result->output.size = 0;
- result->output.pos = 0;
-
- result->readCount = 0;
- result->finishedInput = 0;
- result->finishedOutput = 0;
goto finally;
@@ -563,7 +559,7 @@
return NULL;
}
- result = PyObject_New(ZstdDecompressionWriter, &ZstdDecompressionWriterType);
+ result = (ZstdDecompressionWriter*)PyObject_CallObject((PyObject*)&ZstdDecompressionWriterType, NULL);
if (!result) {
return NULL;
}
@@ -576,9 +572,6 @@
result->outSize = outSize;
- result->entered = 0;
- result->dstream = NULL;
-
return result;
}
@@ -776,6 +769,746 @@
return result;
}
+typedef struct {
+ void* sourceData;
+ size_t sourceSize;
+ unsigned long long destSize;
+} FramePointer;
+
+typedef struct {
+ FramePointer* frames;
+ Py_ssize_t framesSize;
+ unsigned long long compressedSize;
+} FrameSources;
+
+typedef struct {
+ void* dest;
+ Py_ssize_t destSize;
+ BufferSegment* segments;
+ Py_ssize_t segmentsSize;
+} DestBuffer;
+
+typedef enum {
+ WorkerError_none = 0,
+ WorkerError_zstd = 1,
+ WorkerError_memory = 2,
+ WorkerError_sizeMismatch = 3,
+ WorkerError_unknownSize = 4,
+} WorkerError;
+
+typedef struct {
+ /* Source records and length */
+ FramePointer* framePointers;
+ /* Which records to process. */
+ Py_ssize_t startOffset;
+ Py_ssize_t endOffset;
+ unsigned long long totalSourceSize;
+
+ /* Compression state and settings. */
+ ZSTD_DCtx* dctx;
+ ZSTD_DDict* ddict;
+ int requireOutputSizes;
+
+ /* Output storage. */
+ DestBuffer* destBuffers;
+ Py_ssize_t destCount;
+
+ /* Item that error occurred on. */
+ Py_ssize_t errorOffset;
+ /* If an error occurred. */
+ WorkerError error;
+ /* result from zstd decompression operation */
+ size_t zresult;
+} WorkerState;
+
+static void decompress_worker(WorkerState* state) {
+ size_t allocationSize;
+ DestBuffer* destBuffer;
+ Py_ssize_t frameIndex;
+ Py_ssize_t localOffset = 0;
+ Py_ssize_t currentBufferStartIndex = state->startOffset;
+ Py_ssize_t remainingItems = state->endOffset - state->startOffset + 1;
+ void* tmpBuf;
+ Py_ssize_t destOffset = 0;
+ FramePointer* framePointers = state->framePointers;
+ size_t zresult;
+ unsigned long long totalOutputSize = 0;
+
+ assert(NULL == state->destBuffers);
+ assert(0 == state->destCount);
+ assert(state->endOffset - state->startOffset >= 0);
+
+ /*
+ * We need to allocate a buffer to hold decompressed data. How we do this
+ * depends on what we know about the output. The following scenarios are
+ * possible:
+ *
+ * 1. All structs defining frames declare the output size.
+ * 2. The decompressed size is embedded within the zstd frame.
+ * 3. The decompressed size is not stored anywhere.
+ *
+ * For now, we only support #1 and #2.
+ */
+
+ /* Resolve output segments. */
+ for (frameIndex = state->startOffset; frameIndex <= state->endOffset; frameIndex++) {
+ FramePointer* fp = &framePointers[frameIndex];
+
+ if (0 == fp->destSize) {
+ fp->destSize = ZSTD_getDecompressedSize(fp->sourceData, fp->sourceSize);
+ if (0 == fp->destSize && state->requireOutputSizes) {
+ state->error = WorkerError_unknownSize;
+ state->errorOffset = frameIndex;
+ return;
+ }
+ }
+
+ totalOutputSize += fp->destSize;
+ }
+
+ state->destBuffers = calloc(1, sizeof(DestBuffer));
+ if (NULL == state->destBuffers) {
+ state->error = WorkerError_memory;
+ return;
+ }
+
+ state->destCount = 1;
+
+ destBuffer = &state->destBuffers[state->destCount - 1];
+
+ assert(framePointers[state->startOffset].destSize > 0); /* For now. */
+
+ allocationSize = roundpow2(state->totalSourceSize);
+
+ if (framePointers[state->startOffset].destSize > allocationSize) {
+ allocationSize = roundpow2(framePointers[state->startOffset].destSize);
+ }
+
+ destBuffer->dest = malloc(allocationSize);
+ if (NULL == destBuffer->dest) {
+ state->error = WorkerError_memory;
+ return;
+ }
+
+ destBuffer->destSize = allocationSize;
+
+ destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment));
+ if (NULL == destBuffer->segments) {
+ /* Caller will free state->dest as part of cleanup. */
+ state->error = WorkerError_memory;
+ return;
+ }
+
+ destBuffer->segmentsSize = remainingItems;
+
+ for (frameIndex = state->startOffset; frameIndex <= state->endOffset; frameIndex++) {
+ const void* source = framePointers[frameIndex].sourceData;
+ const size_t sourceSize = framePointers[frameIndex].sourceSize;
+ void* dest;
+ const size_t decompressedSize = framePointers[frameIndex].destSize;
+ size_t destAvailable = destBuffer->destSize - destOffset;
+
+ assert(decompressedSize > 0); /* For now. */
+
+ /*
+ * Not enough space in current buffer. Finish the current buffer, then
+ * allocate and switch to a new one.
+ */
+ if (decompressedSize > destAvailable) {
+ /*
+ * Shrinking the destination buffer is optional. But it should be cheap,
+ * so we just do it.
+ */
+ if (destAvailable) {
+ tmpBuf = realloc(destBuffer->dest, destOffset);
+ if (NULL == tmpBuf) {
+ state->error = WorkerError_memory;
+ return;
+ }
+
+ destBuffer->dest = tmpBuf;
+ destBuffer->destSize = destOffset;
+ }
+
+ /* Truncate segments buffer. */
+ tmpBuf = realloc(destBuffer->segments,
+ (frameIndex - currentBufferStartIndex) * sizeof(BufferSegment));
+ if (NULL == tmpBuf) {
+ state->error = WorkerError_memory;
+ return;
+ }
+
+ destBuffer->segments = tmpBuf;
+ destBuffer->segmentsSize = frameIndex - currentBufferStartIndex;
+
+ /* Grow space for new DestBuffer. */
+ tmpBuf = realloc(state->destBuffers, (state->destCount + 1) * sizeof(DestBuffer));
+ if (NULL == tmpBuf) {
+ state->error = WorkerError_memory;
+ return;
+ }
+
+ state->destBuffers = tmpBuf;
+ state->destCount++;
+
+ destBuffer = &state->destBuffers[state->destCount - 1];
+
+ /* Don't take any chances with non-NULL pointers. */
+ memset(destBuffer, 0, sizeof(DestBuffer));
+
+ allocationSize = roundpow2(state->totalSourceSize);
+
+ if (decompressedSize > allocationSize) {
+ allocationSize = roundpow2(decompressedSize);
+ }
+
+ destBuffer->dest = malloc(allocationSize);
+ if (NULL == destBuffer->dest) {
+ state->error = WorkerError_memory;
+ return;
+ }
+
+ destBuffer->destSize = allocationSize;
+ destAvailable = allocationSize;
+ destOffset = 0;
+ localOffset = 0;
+
+ destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment));
+ if (NULL == destBuffer->segments) {
+ state->error = WorkerError_memory;
+ return;
+ }
+
+ destBuffer->segmentsSize = remainingItems;
+ currentBufferStartIndex = frameIndex;
+ }
+
+ dest = (char*)destBuffer->dest + destOffset;
+
+ if (state->ddict) {
+ zresult = ZSTD_decompress_usingDDict(state->dctx, dest, decompressedSize,
+ source, sourceSize, state->ddict);
+ }
+ else {
+ zresult = ZSTD_decompressDCtx(state->dctx, dest, decompressedSize,
+ source, sourceSize);
+ }
+
+ if (ZSTD_isError(zresult)) {
+ state->error = WorkerError_zstd;
+ state->zresult = zresult;
+ state->errorOffset = frameIndex;
+ return;
+ }
+ else if (zresult != decompressedSize) {
+ state->error = WorkerError_sizeMismatch;
+ state->zresult = zresult;
+ state->errorOffset = frameIndex;
+ return;
+ }
+
+ destBuffer->segments[localOffset].offset = destOffset;
+ destBuffer->segments[localOffset].length = decompressedSize;
+ destOffset += zresult;
+ localOffset++;
+ remainingItems--;
+ }
+
+ if (destBuffer->destSize > destOffset) {
+ tmpBuf = realloc(destBuffer->dest, destOffset);
+ if (NULL == tmpBuf) {
+ state->error = WorkerError_memory;
+ return;
+ }
+
+ destBuffer->dest = tmpBuf;
+ destBuffer->destSize = destOffset;
+ }
+}
+
+ZstdBufferWithSegmentsCollection* decompress_from_framesources(ZstdDecompressor* decompressor, FrameSources* frames,
+ unsigned int threadCount) {
+ void* dictData = NULL;
+ size_t dictSize = 0;
+ Py_ssize_t i = 0;
+ int errored = 0;
+ Py_ssize_t segmentsCount;
+ ZstdBufferWithSegments* bws = NULL;
+ PyObject* resultArg = NULL;
+ Py_ssize_t resultIndex;
+ ZstdBufferWithSegmentsCollection* result = NULL;
+ FramePointer* framePointers = frames->frames;
+ unsigned long long workerBytes = 0;
+ int currentThread = 0;
+ Py_ssize_t workerStartOffset = 0;
+ POOL_ctx* pool = NULL;
+ WorkerState* workerStates = NULL;
+ unsigned long long bytesPerWorker;
+
+ /* Caller should normalize 0 and negative values to 1 or larger. */
+ assert(threadCount >= 1);
+
+ /* More threads than inputs makes no sense under any conditions. */
+ threadCount = frames->framesSize < threadCount ? (unsigned int)frames->framesSize
+ : threadCount;
+
+ /* TODO lower thread count if input size is too small and threads would just
+ add overhead. */
+
+ if (decompressor->dict) {
+ dictData = decompressor->dict->dictData;
+ dictSize = decompressor->dict->dictSize;
+ }
+
+ if (dictData && !decompressor->ddict) {
+ Py_BEGIN_ALLOW_THREADS
+ decompressor->ddict = ZSTD_createDDict_byReference(dictData, dictSize);
+ Py_END_ALLOW_THREADS
+
+ if (!decompressor->ddict) {
+ PyErr_SetString(ZstdError, "could not create decompression dict");
+ return NULL;
+ }
+ }
+
+ /* If threadCount==1, we don't start a thread pool. But we do leverage the
+ same API for dispatching work. */
+ workerStates = PyMem_Malloc(threadCount * sizeof(WorkerState));
+ if (NULL == workerStates) {
+ PyErr_NoMemory();
+ goto finally;
+ }
+
+ memset(workerStates, 0, threadCount * sizeof(WorkerState));
+
+ if (threadCount > 1) {
+ pool = POOL_create(threadCount, 1);
+ if (NULL == pool) {
+ PyErr_SetString(ZstdError, "could not initialize zstd thread pool");
+ goto finally;
+ }
+ }
+
+ bytesPerWorker = frames->compressedSize / threadCount;
+
+ for (i = 0; i < threadCount; i++) {
+ workerStates[i].dctx = ZSTD_createDCtx();
+ if (NULL == workerStates[i].dctx) {
+ PyErr_NoMemory();
+ goto finally;
+ }
+
+ ZSTD_copyDCtx(workerStates[i].dctx, decompressor->dctx);
+
+ workerStates[i].ddict = decompressor->ddict;
+ workerStates[i].framePointers = framePointers;
+ workerStates[i].requireOutputSizes = 1;
+ }
+
+ Py_BEGIN_ALLOW_THREADS
+ /* There are many ways to split work among workers.
+
+ For now, we take a simple approach of splitting work so each worker
+ gets roughly the same number of input bytes. This will result in more
+ starvation than running N>threadCount jobs. But it avoids complications
+ around state tracking, which could involve extra locking.
+ */
+ for (i = 0; i < frames->framesSize; i++) {
+ workerBytes += frames->frames[i].sourceSize;
+
+ /*
+ * The last worker/thread needs to handle all remaining work. Don't
+ * trigger it prematurely. Defer to the block outside of the loop.
+ * (But still process this loop so workerBytes is correct.
+ */
+ if (currentThread == threadCount - 1) {
+ continue;
+ }
+
+ if (workerBytes >= bytesPerWorker) {
+ workerStates[currentThread].startOffset = workerStartOffset;
+ workerStates[currentThread].endOffset = i;
+ workerStates[currentThread].totalSourceSize = workerBytes;
+
+ if (threadCount > 1) {
+ POOL_add(pool, (POOL_function)decompress_worker, &workerStates[currentThread]);
+ }
+ else {
+ decompress_worker(&workerStates[currentThread]);
+ }
+ currentThread++;
+ workerStartOffset = i + 1;
+ workerBytes = 0;
+ }
+ }
+
+ if (workerBytes) {
+ workerStates[currentThread].startOffset = workerStartOffset;
+ workerStates[currentThread].endOffset = frames->framesSize - 1;
+ workerStates[currentThread].totalSourceSize = workerBytes;
+
+ if (threadCount > 1) {
+ POOL_add(pool, (POOL_function)decompress_worker, &workerStates[currentThread]);
+ }
+ else {
+ decompress_worker(&workerStates[currentThread]);
+ }
+ }
+
+ if (threadCount > 1) {
+ POOL_free(pool);
+ pool = NULL;
+ }
+ Py_END_ALLOW_THREADS
+
+ for (i = 0; i < threadCount; i++) {
+ switch (workerStates[i].error) {
+ case WorkerError_none:
+ break;
+
+ case WorkerError_zstd:
+ PyErr_Format(ZstdError, "error decompressing item %zd: %s",
+ workerStates[i].errorOffset, ZSTD_getErrorName(workerStates[i].zresult));
+ errored = 1;
+ break;
+
+ case WorkerError_memory:
+ PyErr_NoMemory();
+ errored = 1;
+ break;
+
+ case WorkerError_sizeMismatch:
+ PyErr_Format(ZstdError, "error decompressing item %zd: decompressed %zu bytes; expected %llu",
+ workerStates[i].errorOffset, workerStates[i].zresult,
+ framePointers[workerStates[i].errorOffset].destSize);
+ errored = 1;
+ break;
+
+ case WorkerError_unknownSize:
+ PyErr_Format(PyExc_ValueError, "could not determine decompressed size of item %zd",
+ workerStates[i].errorOffset);
+ errored = 1;
+ break;
+
+ default:
+ PyErr_Format(ZstdError, "unhandled error type: %d; this is a bug",
+ workerStates[i].error);
+ errored = 1;
+ break;
+ }
+
+ if (errored) {
+ break;
+ }
+ }
+
+ if (errored) {
+ goto finally;
+ }
+
+ segmentsCount = 0;
+ for (i = 0; i < threadCount; i++) {
+ segmentsCount += workerStates[i].destCount;
+ }
+
+ resultArg = PyTuple_New(segmentsCount);
+ if (NULL == resultArg) {
+ goto finally;
+ }
+
+ resultIndex = 0;
+
+ for (i = 0; i < threadCount; i++) {
+ Py_ssize_t bufferIndex;
+ WorkerState* state = &workerStates[i];
+
+ for (bufferIndex = 0; bufferIndex < state->destCount; bufferIndex++) {
+ DestBuffer* destBuffer = &state->destBuffers[bufferIndex];
+
+ bws = BufferWithSegments_FromMemory(destBuffer->dest, destBuffer->destSize,
+ destBuffer->segments, destBuffer->segmentsSize);
+ if (NULL == bws) {
+ goto finally;
+ }
+
+ /*
+ * Memory for buffer and segments was allocated using malloc() in worker
+ * and the memory is transferred to the BufferWithSegments instance. So
+ * tell instance to use free() and NULL the reference in the state struct
+ * so it isn't freed below.
+ */
+ bws->useFree = 1;
+ destBuffer->dest = NULL;
+ destBuffer->segments = NULL;
+
+ PyTuple_SET_ITEM(resultArg, resultIndex++, (PyObject*)bws);
+ }
+ }
+
+ result = (ZstdBufferWithSegmentsCollection*)PyObject_CallObject(
+ (PyObject*)&ZstdBufferWithSegmentsCollectionType, resultArg);
+
+finally:
+ Py_CLEAR(resultArg);
+
+ if (workerStates) {
+ for (i = 0; i < threadCount; i++) {
+ Py_ssize_t bufferIndex;
+ WorkerState* state = &workerStates[i];
+
+ if (state->dctx) {
+ ZSTD_freeDCtx(state->dctx);
+ }
+
+ for (bufferIndex = 0; bufferIndex < state->destCount; bufferIndex++) {
+ if (state->destBuffers) {
+ /*
+ * Will be NULL if memory transferred to a BufferWithSegments.
+ * Otherwise it is left over after an error occurred.
+ */
+ free(state->destBuffers[bufferIndex].dest);
+ free(state->destBuffers[bufferIndex].segments);
+ }
+ }
+
+ free(state->destBuffers);
+ }
+
+ PyMem_Free(workerStates);
+ }
+
+ POOL_free(pool);
+
+ return result;
+}
+
+PyDoc_STRVAR(Decompressor_multi_decompress_to_buffer__doc__,
+"Decompress multiple frames to output buffers\n"
+"\n"
+"Receives a ``BufferWithSegments``, a ``BufferWithSegmentsCollection`` or a\n"
+"list of bytes-like objects. Each item in the passed collection should be a\n"
+"compressed zstd frame.\n"
+"\n"
+"Unless ``decompressed_sizes`` is specified, the content size *must* be\n"
+"written into the zstd frame header. If ``decompressed_sizes`` is specified,\n"
+"it is an object conforming to the buffer protocol that represents an array\n"
+"of 64-bit unsigned integers in the machine's native format. Specifying\n"
+"``decompressed_sizes`` avoids a pre-scan of each frame to determine its\n"
+"output size.\n"
+"\n"
+"Returns a ``BufferWithSegmentsCollection`` containing the decompressed\n"
+"data. All decompressed data is allocated in a single memory buffer. The\n"
+"``BufferWithSegments`` instance tracks which objects are at which offsets\n"
+"and their respective lengths.\n"
+"\n"
+"The ``threads`` argument controls how many threads to use for operations.\n"
+"Negative values will use the same number of threads as logical CPUs on the\n"
+"machine.\n"
+);
+
+static ZstdBufferWithSegmentsCollection* Decompressor_multi_decompress_to_buffer(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
+ static char* kwlist[] = {
+ "frames",
+ "decompressed_sizes",
+ "threads",
+ NULL
+ };
+
+ PyObject* frames;
+ Py_buffer frameSizes;
+ int threads = 0;
+ Py_ssize_t frameCount;
+ Py_buffer* frameBuffers = NULL;
+ FramePointer* framePointers = NULL;
+ unsigned long long* frameSizesP = NULL;
+ unsigned long long totalInputSize = 0;
+ FrameSources frameSources;
+ ZstdBufferWithSegmentsCollection* result = NULL;
+ Py_ssize_t i;
+
+ memset(&frameSizes, 0, sizeof(frameSizes));
+
+#if PY_MAJOR_VERSION >= 3
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|y*i:multi_decompress_to_buffer",
+#else
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|s*i:multi_decompress_to_buffer",
+#endif
+ kwlist, &frames, &frameSizes, &threads)) {
+ return NULL;
+ }
+
+ if (frameSizes.buf) {
+ if (!PyBuffer_IsContiguous(&frameSizes, 'C') || frameSizes.ndim > 1) {
+ PyErr_SetString(PyExc_ValueError, "decompressed_sizes buffer should be contiguous and have a single dimension");
+ goto finally;
+ }
+
+ frameSizesP = (unsigned long long*)frameSizes.buf;
+ }
+
+ if (threads < 0) {
+ threads = cpu_count();
+ }
+
+ if (threads < 2) {
+ threads = 1;
+ }
+
+ if (PyObject_TypeCheck(frames, &ZstdBufferWithSegmentsType)) {
+ ZstdBufferWithSegments* buffer = (ZstdBufferWithSegments*)frames;
+ frameCount = buffer->segmentCount;
+
+ if (frameSizes.buf && frameSizes.len != frameCount * (Py_ssize_t)sizeof(unsigned long long)) {
+ PyErr_Format(PyExc_ValueError, "decompressed_sizes size mismatch; expected %zd, got %zd",
+ frameCount * sizeof(unsigned long long), frameSizes.len);
+ goto finally;
+ }
+
+ framePointers = PyMem_Malloc(frameCount * sizeof(FramePointer));
+ if (!framePointers) {
+ PyErr_NoMemory();
+ goto finally;
+ }
+
+ for (i = 0; i < frameCount; i++) {
+ void* sourceData;
+ unsigned long long sourceSize;
+ unsigned long long decompressedSize = 0;
+
+ if (buffer->segments[i].offset + buffer->segments[i].length > buffer->dataSize) {
+ PyErr_Format(PyExc_ValueError, "item %zd has offset outside memory area", i);
+ goto finally;
+ }
+
+ sourceData = (char*)buffer->data + buffer->segments[i].offset;
+ sourceSize = buffer->segments[i].length;
+ totalInputSize += sourceSize;
+
+ if (frameSizesP) {
+ decompressedSize = frameSizesP[i];
+ }
+
+ framePointers[i].sourceData = sourceData;
+ framePointers[i].sourceSize = sourceSize;
+ framePointers[i].destSize = decompressedSize;
+ }
+ }
+ else if (PyObject_TypeCheck(frames, &ZstdBufferWithSegmentsCollectionType)) {
+ Py_ssize_t offset = 0;
+ ZstdBufferWithSegments* buffer;
+ ZstdBufferWithSegmentsCollection* collection = (ZstdBufferWithSegmentsCollection*)frames;
+
+ frameCount = BufferWithSegmentsCollection_length(collection);
+
+ if (frameSizes.buf && frameSizes.len != frameCount) {
+ PyErr_Format(PyExc_ValueError,
+ "decompressed_sizes size mismatch; expected %zd; got %zd",
+ frameCount * sizeof(unsigned long long), frameSizes.len);
+ goto finally;
+ }
+
+ framePointers = PyMem_Malloc(frameCount * sizeof(FramePointer));
+ if (NULL == framePointers) {
+ PyErr_NoMemory();
+ goto finally;
+ }
+
+ /* Iterate the data structure directly because it is faster. */
+ for (i = 0; i < collection->bufferCount; i++) {
+ Py_ssize_t segmentIndex;
+ buffer = collection->buffers[i];
+
+ for (segmentIndex = 0; segmentIndex < buffer->segmentCount; segmentIndex++) {
+ if (buffer->segments[segmentIndex].offset + buffer->segments[segmentIndex].length > buffer->dataSize) {
+ PyErr_Format(PyExc_ValueError, "item %zd has offset outside memory area",
+ offset);
+ goto finally;
+ }
+
+ totalInputSize += buffer->segments[segmentIndex].length;
+
+ framePointers[offset].sourceData = (char*)buffer->data + buffer->segments[segmentIndex].offset;
+ framePointers[offset].sourceSize = buffer->segments[segmentIndex].length;
+ framePointers[offset].destSize = frameSizesP ? frameSizesP[offset] : 0;
+
+ offset++;
+ }
+ }
+ }
+ else if (PyList_Check(frames)) {
+ frameCount = PyList_GET_SIZE(frames);
+
+ if (frameSizes.buf && frameSizes.len != frameCount * (Py_ssize_t)sizeof(unsigned long long)) {
+ PyErr_Format(PyExc_ValueError, "decompressed_sizes size mismatch; expected %zd, got %zd",
+ frameCount * sizeof(unsigned long long), frameSizes.len);
+ goto finally;
+ }
+
+ framePointers = PyMem_Malloc(frameCount * sizeof(FramePointer));
+ if (!framePointers) {
+ PyErr_NoMemory();
+ goto finally;
+ }
+
+ /*
+ * It is not clear whether Py_buffer.buf is still valid after
+ * PyBuffer_Release. So, we hold a reference to all Py_buffer instances
+ * for the duration of the operation.
+ */
+ frameBuffers = PyMem_Malloc(frameCount * sizeof(Py_buffer));
+ if (NULL == frameBuffers) {
+ PyErr_NoMemory();
+ goto finally;
+ }
+
+ memset(frameBuffers, 0, frameCount * sizeof(Py_buffer));
+
+ /* Do a pass to assemble info about our input buffers and output sizes. */
+ for (i = 0; i < frameCount; i++) {
+ if (0 != PyObject_GetBuffer(PyList_GET_ITEM(frames, i),
+ &frameBuffers[i], PyBUF_CONTIG_RO)) {
+ PyErr_Clear();
+ PyErr_Format(PyExc_TypeError, "item %zd not a bytes like object", i);
+ goto finally;
+ }
+
+ totalInputSize += frameBuffers[i].len;
+
+ framePointers[i].sourceData = frameBuffers[i].buf;
+ framePointers[i].sourceSize = frameBuffers[i].len;
+ framePointers[i].destSize = frameSizesP ? frameSizesP[i] : 0;
+ }
+ }
+ else {
+ PyErr_SetString(PyExc_TypeError, "argument must be list or BufferWithSegments");
+ goto finally;
+ }
+
+ /* We now have an array with info about our inputs and outputs. Feed it into
+ our generic decompression function. */
+ frameSources.frames = framePointers;
+ frameSources.framesSize = frameCount;
+ frameSources.compressedSize = totalInputSize;
+
+ result = decompress_from_framesources(self, &frameSources, threads);
+
+finally:
+ if (frameSizes.buf) {
+ PyBuffer_Release(&frameSizes);
+ }
+ PyMem_Free(framePointers);
+
+ if (frameBuffers) {
+ for (i = 0; i < frameCount; i++) {
+ PyBuffer_Release(&frameBuffers[i]);
+ }
+
+ PyMem_Free(frameBuffers);
+ }
+
+ return result;
+}
+
static PyMethodDef Decompressor_methods[] = {
{ "copy_stream", (PyCFunction)Decompressor_copy_stream, METH_VARARGS | METH_KEYWORDS,
Decompressor_copy_stream__doc__ },
@@ -789,6 +1522,8 @@
Decompressor_write_to__doc__ },
{ "decompress_content_dict_chain", (PyCFunction)Decompressor_decompress_content_dict_chain,
METH_VARARGS | METH_KEYWORDS, Decompressor_decompress_content_dict_chain__doc__ },
+ { "multi_decompress_to_buffer", (PyCFunction)Decompressor_multi_decompress_to_buffer,
+ METH_VARARGS | METH_KEYWORDS, Decompressor_multi_decompress_to_buffer__doc__ },
{ NULL, NULL }
};