contrib/python-zstandard/c-ext/decompressor.c
changeset 31799 e0dc40530c5a
parent 30924 c32454d69b85
child 37495 b1fb341d8a61
--- a/contrib/python-zstandard/c-ext/decompressor.c	Sat Apr 01 13:43:52 2017 -0700
+++ b/contrib/python-zstandard/c-ext/decompressor.c	Sat Apr 01 15:24:03 2017 -0700
@@ -7,19 +7,37 @@
 */
 
 #include "python-zstandard.h"
+#include "pool.h"
 
 extern PyObject* ZstdError;
 
-ZSTD_DStream* DStream_from_ZstdDecompressor(ZstdDecompressor* decompressor) {
-	ZSTD_DStream* dstream;
+/**
+  * Ensure the ZSTD_DStream on a ZstdDecompressor is initialized and reset.
+  *
+  * This should be called before starting a decompression operation with a
+  * ZSTD_DStream on a ZstdDecompressor.
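+  *
+  * Returns 0 on success and -1 on error (with a Python exception set).
+  * Callers check the result directly, e.g.:
+  *
+  *   if (0 != init_dstream(decompressor)) { goto finally; }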
+  */
+int init_dstream(ZstdDecompressor* decompressor) {
 	void* dictData = NULL;
 	size_t dictSize = 0;
 	size_t zresult;
 
-	dstream = ZSTD_createDStream();
-	if (!dstream) {
+	/* Simple case where dstream already exists. Just reset it. */
+	if (decompressor->dstream) {
+		zresult = ZSTD_resetDStream(decompressor->dstream);
+		if (ZSTD_isError(zresult)) {
+			PyErr_Format(ZstdError, "could not reset DStream: %s",
+				ZSTD_getErrorName(zresult));
+			return -1;
+		}
+
+		return 0;
+	}
+
+	decompressor->dstream = ZSTD_createDStream();
+	if (!decompressor->dstream) {
 		PyErr_SetString(ZstdError, "could not create DStream");
-		return NULL;
+		return -1;
 	}
 
 	if (decompressor->dict) {
@@ -28,19 +46,23 @@
 	}
 
 	if (dictData) {
-		zresult = ZSTD_initDStream_usingDict(dstream, dictData, dictSize);
+		zresult = ZSTD_initDStream_usingDict(decompressor->dstream, dictData, dictSize);
 	}
 	else {
-		zresult = ZSTD_initDStream(dstream);
+		zresult = ZSTD_initDStream(decompressor->dstream);
 	}
 
 	if (ZSTD_isError(zresult)) {
+		/* Don't leave a reference to an invalid object. */
+		ZSTD_freeDStream(decompressor->dstream);
+		decompressor->dstream = NULL;
+
 		PyErr_Format(ZstdError, "could not initialize DStream: %s",
 			ZSTD_getErrorName(zresult));
-		return NULL;
+		return -1;
 	}
 
-	return dstream;
+	return 0;
 }
 
 PyDoc_STRVAR(Decompressor__doc__,
@@ -93,17 +115,23 @@
 }
 
 static void Decompressor_dealloc(ZstdDecompressor* self) {
-	if (self->dctx) {
-		ZSTD_freeDCtx(self->dctx);
-	}
-
-	Py_XDECREF(self->dict);
+	Py_CLEAR(self->dict);
 
 	if (self->ddict) {
 		ZSTD_freeDDict(self->ddict);
 		self->ddict = NULL;
 	}
 
+	if (self->dstream) {
+		ZSTD_freeDStream(self->dstream);
+		self->dstream = NULL;
+	}
+
+	if (self->dctx) {
+		ZSTD_freeDCtx(self->dctx);
+		self->dctx = NULL;
+	}
+
 	PyObject_Del(self);
 }
 
@@ -132,7 +160,6 @@
 	PyObject* dest;
 	size_t inSize = ZSTD_DStreamInSize();
 	size_t outSize = ZSTD_DStreamOutSize();
-	ZSTD_DStream* dstream;
 	ZSTD_inBuffer input;
 	ZSTD_outBuffer output;
 	Py_ssize_t totalRead = 0;
@@ -164,8 +191,7 @@
 	/* Prevent free on uninitialized memory in finally. */
 	output.dst = NULL;
 
-	dstream = DStream_from_ZstdDecompressor(self);
-	if (!dstream) {
+	if (0 != init_dstream(self)) {
 		res = NULL;
 		goto finally;
 	}
@@ -203,7 +229,7 @@
 
 		while (input.pos < input.size) {
 			Py_BEGIN_ALLOW_THREADS
-			zresult = ZSTD_decompressStream(dstream, &output, &input);
+			zresult = ZSTD_decompressStream(self->dstream, &output, &input);
 			Py_END_ALLOW_THREADS
 
 			if (ZSTD_isError(zresult)) {
@@ -230,24 +256,17 @@
 
 	/* Source stream is exhausted. Finish up. */
 
-	ZSTD_freeDStream(dstream);
-	dstream = NULL;
-
 	totalReadPy = PyLong_FromSsize_t(totalRead);
 	totalWritePy = PyLong_FromSsize_t(totalWrite);
 	res = PyTuple_Pack(2, totalReadPy, totalWritePy);
-	Py_DecRef(totalReadPy);
-	Py_DecRef(totalWritePy);
+	Py_DECREF(totalReadPy);
+	Py_DECREF(totalWritePy);
 
 finally:
 	if (output.dst) {
 		PyMem_Free(output.dst);
 	}
 
-	if (dstream) {
-		ZSTD_freeDStream(dstream);
-	}
-
 	return res;
 }
 
@@ -352,18 +371,18 @@
 
 	if (ZSTD_isError(zresult)) {
 		PyErr_Format(ZstdError, "decompression error: %s", ZSTD_getErrorName(zresult));
-		Py_DecRef(result);
+		Py_DECREF(result);
 		return NULL;
 	}
 	else if (decompressedSize && zresult != decompressedSize) {
 		PyErr_Format(ZstdError, "decompression error: decompressed %zu bytes; expected %llu",
 			zresult, decompressedSize);
-		Py_DecRef(result);
+		Py_DECREF(result);
 		return NULL;
 	}
 	else if (zresult < destCapacity) {
 		if (_PyBytes_Resize(&result, zresult)) {
-			Py_DecRef(result);
+			Py_DECREF(result);
 			return NULL;
 		}
 	}
@@ -382,22 +401,19 @@
 );
 
 static ZstdDecompressionObj* Decompressor_decompressobj(ZstdDecompressor* self) {
-	ZstdDecompressionObj* result = PyObject_New(ZstdDecompressionObj, &ZstdDecompressionObjType);
+	ZstdDecompressionObj* result = (ZstdDecompressionObj*)PyObject_CallObject((PyObject*)&ZstdDecompressionObjType, NULL);
 	if (!result) {
 		return NULL;
 	}
 
-	result->dstream = DStream_from_ZstdDecompressor(self);
-	if (!result->dstream) {
-		Py_DecRef((PyObject*)result);
+	if (0 != init_dstream(self)) {
+		Py_DECREF(result);
 		return NULL;
 	}
 
 	result->decompressor = self;
 	Py_INCREF(result->decompressor);
 
-	result->finished = 0;
-
 	return result;
 }
 
@@ -447,18 +463,11 @@
 		return NULL;
 	}
 
-	result = PyObject_New(ZstdDecompressorIterator, &ZstdDecompressorIteratorType);
+	result = (ZstdDecompressorIterator*)PyObject_CallObject((PyObject*)&ZstdDecompressorIteratorType, NULL);
 	if (!result) {
 		return NULL;
 	}
 
-	result->decompressor = NULL;
-	result->reader = NULL;
-	result->buffer = NULL;
-	result->dstream = NULL;
-	result->input.src = NULL;
-	result->output.dst = NULL;
-
 	if (PyObject_HasAttrString(reader, "read")) {
 		result->reader = reader;
 		Py_INCREF(result->reader);
@@ -475,8 +484,6 @@
 		if (0 != PyObject_GetBuffer(reader, result->buffer, PyBUF_CONTIG_RO)) {
 			goto except;
 		}
-
-		result->bufferOffset = 0;
 	}
 	else {
 		PyErr_SetString(PyExc_ValueError,
@@ -491,8 +498,7 @@
 	result->outSize = outSize;
 	result->skipBytes = skipBytes;
 
-	result->dstream = DStream_from_ZstdDecompressor(self);
-	if (!result->dstream) {
+	if (0 != init_dstream(self)) {
 		goto except;
 	}
 
@@ -501,16 +507,6 @@
 		PyErr_NoMemory();
 		goto except;
 	}
-	result->input.size = 0;
-	result->input.pos = 0;
-
-	result->output.dst = NULL;
-	result->output.size = 0;
-	result->output.pos = 0;
-
-	result->readCount = 0;
-	result->finishedInput = 0;
-	result->finishedOutput = 0;
 
 	goto finally;
 
@@ -563,7 +559,7 @@
 		return NULL;
 	}
 
-	result = PyObject_New(ZstdDecompressionWriter, &ZstdDecompressionWriterType);
+	result = (ZstdDecompressionWriter*)PyObject_CallObject((PyObject*)&ZstdDecompressionWriterType, NULL);
 	if (!result) {
 		return NULL;
 	}
@@ -576,9 +572,6 @@
 
 	result->outSize = outSize;
 
-	result->entered = 0;
-	result->dstream = NULL;
-
 	return result;
 }
 
@@ -776,6 +769,746 @@
 	return result;
 }
 
+typedef struct {
+	void* sourceData;
+	size_t sourceSize;
+	unsigned long long destSize;
+} FramePointer;
+
+typedef struct {
+	FramePointer* frames;
+	Py_ssize_t framesSize;
+	unsigned long long compressedSize;
+} FrameSources;
+
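+/*
+ * Output storage for decompressed frames. dest/destSize describe one
+ * contiguous allocation; segments/segmentsSize record the offset and length
+ * of each frame's decompressed data within that allocation.
+ */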
+typedef struct {
+	void* dest;
+	Py_ssize_t destSize;
+	BufferSegment* segments;
+	Py_ssize_t segmentsSize;
+} DestBuffer;
+
+typedef enum {
+	WorkerError_none = 0,
+	WorkerError_zstd = 1,
+	WorkerError_memory = 2,
+	WorkerError_sizeMismatch = 3,
+	WorkerError_unknownSize = 4,
+} WorkerError;
+
+typedef struct {
+	/* Source records and length */
+	FramePointer* framePointers;
+	/* Which records to process. */
+	Py_ssize_t startOffset;
+	Py_ssize_t endOffset;
+	unsigned long long totalSourceSize;
+
+	/* Compression state and settings. */
+	ZSTD_DCtx* dctx;
+	ZSTD_DDict* ddict;
+	int requireOutputSizes;
+
+	/* Output storage. */
+	DestBuffer* destBuffers;
+	Py_ssize_t destCount;
+
+	/* Item that error occurred on. */
+	Py_ssize_t errorOffset;
+	/* If an error occurred. */
+	WorkerError error;
+	/* result from zstd decompression operation */
+	size_t zresult;
+} WorkerState;
+
+static void decompress_worker(WorkerState* state) {
+	size_t allocationSize;
+	DestBuffer* destBuffer;
+	Py_ssize_t frameIndex;
+	Py_ssize_t localOffset = 0;
+	Py_ssize_t currentBufferStartIndex = state->startOffset;
+	Py_ssize_t remainingItems = state->endOffset - state->startOffset + 1;
+	void* tmpBuf;
+	Py_ssize_t destOffset = 0;
+	FramePointer* framePointers = state->framePointers;
+	size_t zresult;
+	unsigned long long totalOutputSize = 0;
+
+	assert(NULL == state->destBuffers);
+	assert(0 == state->destCount);
+	assert(state->endOffset - state->startOffset >= 0);
+
+	/*
+	 * We need to allocate a buffer to hold decompressed data. How we do this
+	 * depends on what we know about the output. The following scenarios are
+	 * possible:
+	 *
+	 * 1. All structs defining frames declare the output size.
+	 * 2. The decompressed size is embedded within the zstd frame.
+	 * 3. The decompressed size is not stored anywhere.
+	 *
+	 * For now, we only support #1 and #2.
+	 */
+
+	/* Resolve output segments. */
+	for (frameIndex = state->startOffset; frameIndex <= state->endOffset; frameIndex++) {
+		FramePointer* fp = &framePointers[frameIndex];
+
+		if (0 == fp->destSize) {
+			fp->destSize = ZSTD_getDecompressedSize(fp->sourceData, fp->sourceSize);
+			if (0 == fp->destSize && state->requireOutputSizes) {
+				state->error = WorkerError_unknownSize;
+				state->errorOffset = frameIndex;
+				return;
+			}
+		}
+
+		totalOutputSize += fp->destSize;
+	}
+
+	state->destBuffers = calloc(1, sizeof(DestBuffer));
+	if (NULL == state->destBuffers) {
+		state->error = WorkerError_memory;
+		return;
+	}
+
+	state->destCount = 1;
+
+	destBuffer = &state->destBuffers[state->destCount - 1];
+
+	assert(framePointers[state->startOffset].destSize > 0); /* For now. */
+
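+	/*
+	 * Size the initial buffer to a power of two at least as large as this
+	 * worker's total compressed input (and at least the first frame's
+	 * decompressed size) so most frames fit without reallocating.
+	 */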
+	allocationSize = roundpow2(state->totalSourceSize);
+
+	if (framePointers[state->startOffset].destSize > allocationSize) {
+		allocationSize = roundpow2(framePointers[state->startOffset].destSize);
+	}
+
+	destBuffer->dest = malloc(allocationSize);
+	if (NULL == destBuffer->dest) {
+		state->error = WorkerError_memory;
+		return;
+	}
+
+	destBuffer->destSize = allocationSize;
+
+	destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment));
+	if (NULL == destBuffer->segments) {
+		/* Caller will free destBuffer->dest as part of cleanup. */
+		state->error = WorkerError_memory;
+		return;
+	}
+
+	destBuffer->segmentsSize = remainingItems;
+
+	for (frameIndex = state->startOffset; frameIndex <= state->endOffset; frameIndex++) {
+		const void* source = framePointers[frameIndex].sourceData;
+		const size_t sourceSize = framePointers[frameIndex].sourceSize;
+		void* dest;
+		const size_t decompressedSize = framePointers[frameIndex].destSize;
+		size_t destAvailable = destBuffer->destSize - destOffset;
+
+		assert(decompressedSize > 0); /* For now. */
+
+		/*
+		 * Not enough space in the current buffer. Finish the current one, then
+		 * allocate and switch to a new one.
+		 */
+		if (decompressedSize > destAvailable) {
+			/*
+			 * Shrinking the destination buffer is optional. But it should be cheap,
+			 * so we just do it.
+			 */
+			if (destAvailable) {
+				tmpBuf = realloc(destBuffer->dest, destOffset);
+				if (NULL == tmpBuf) {
+					state->error = WorkerError_memory;
+					return;
+				}
+
+				destBuffer->dest = tmpBuf;
+				destBuffer->destSize = destOffset;
+			}
+
+			/* Truncate segments buffer. */
+			tmpBuf = realloc(destBuffer->segments,
+				(frameIndex - currentBufferStartIndex) * sizeof(BufferSegment));
+			if (NULL == tmpBuf) {
+				state->error = WorkerError_memory;
+				return;
+			}
+
+			destBuffer->segments = tmpBuf;
+			destBuffer->segmentsSize = frameIndex - currentBufferStartIndex;
+
+			/* Grow space for new DestBuffer. */
+			tmpBuf = realloc(state->destBuffers, (state->destCount + 1) * sizeof(DestBuffer));
+			if (NULL == tmpBuf) {
+				state->error = WorkerError_memory;
+				return;
+			}
+
+			state->destBuffers = tmpBuf;
+			state->destCount++;
+
+			destBuffer = &state->destBuffers[state->destCount - 1];
+
+			/* Don't take any chances with non-NULL pointers. */
+			memset(destBuffer, 0, sizeof(DestBuffer));
+
+			allocationSize = roundpow2(state->totalSourceSize);
+
+			if (decompressedSize > allocationSize) {
+				allocationSize = roundpow2(decompressedSize);
+			}
+
+			destBuffer->dest = malloc(allocationSize);
+			if (NULL == destBuffer->dest) {
+				state->error = WorkerError_memory;
+				return;
+			}
+
+			destBuffer->destSize = allocationSize;
+			destAvailable = allocationSize;
+			destOffset = 0;
+			localOffset = 0;
+
+			destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment));
+			if (NULL == destBuffer->segments) {
+				state->error = WorkerError_memory;
+				return;
+			}
+
+			destBuffer->segmentsSize = remainingItems;
+			currentBufferStartIndex = frameIndex;
+		}
+
+		dest = (char*)destBuffer->dest + destOffset;
+
+		if (state->ddict) {
+			zresult = ZSTD_decompress_usingDDict(state->dctx, dest, decompressedSize,
+				source, sourceSize, state->ddict);
+		}
+		else {
+			zresult = ZSTD_decompressDCtx(state->dctx, dest, decompressedSize,
+				source, sourceSize);
+		}
+
+		if (ZSTD_isError(zresult)) {
+			state->error = WorkerError_zstd;
+			state->zresult = zresult;
+			state->errorOffset = frameIndex;
+			return;
+		}
+		else if (zresult != decompressedSize) {
+			state->error = WorkerError_sizeMismatch;
+			state->zresult = zresult;
+			state->errorOffset = frameIndex;
+			return;
+		}
+
+		destBuffer->segments[localOffset].offset = destOffset;
+		destBuffer->segments[localOffset].length = decompressedSize;
+		destOffset += zresult;
+		localOffset++;
+		remainingItems--;
+	}
+
+	if (destBuffer->destSize > destOffset) {
+		tmpBuf = realloc(destBuffer->dest, destOffset);
+		if (NULL == tmpBuf) {
+			state->error = WorkerError_memory;
+			return;
+		}
+
+		destBuffer->dest = tmpBuf;
+		destBuffer->destSize = destOffset;
+	}
+}
+
+ZstdBufferWithSegmentsCollection* decompress_from_framesources(ZstdDecompressor* decompressor, FrameSources* frames,
+	unsigned int threadCount) {
+	void* dictData = NULL;
+	size_t dictSize = 0;
+	Py_ssize_t i = 0;
+	int errored = 0;
+	Py_ssize_t segmentsCount;
+	ZstdBufferWithSegments* bws = NULL;
+	PyObject* resultArg = NULL;
+	Py_ssize_t resultIndex;
+	ZstdBufferWithSegmentsCollection* result = NULL;
+	FramePointer* framePointers = frames->frames;
+	unsigned long long workerBytes = 0;
+	int currentThread = 0;
+	Py_ssize_t workerStartOffset = 0;
+	POOL_ctx* pool = NULL;
+	WorkerState* workerStates = NULL;
+	unsigned long long bytesPerWorker;
+
+	/* Caller should normalize 0 and negative values to 1 or larger. */
+	assert(threadCount >= 1);
+
+	/* More threads than inputs makes no sense under any conditions. */
+	threadCount = frames->framesSize < threadCount ? (unsigned int)frames->framesSize
+												   : threadCount;
+
+	/* TODO lower thread count if input size is too small and threads would just
+	   add overhead. */
+
+	if (decompressor->dict) {
+		dictData = decompressor->dict->dictData;
+		dictSize = decompressor->dict->dictSize;
+	}
+
+	if (dictData && !decompressor->ddict) {
+		Py_BEGIN_ALLOW_THREADS
+		decompressor->ddict = ZSTD_createDDict_byReference(dictData, dictSize);
+		Py_END_ALLOW_THREADS
+
+		if (!decompressor->ddict) {
+			PyErr_SetString(ZstdError, "could not create decompression dict");
+			return NULL;
+		}
+	}
+
+	/* If threadCount==1, we don't start a thread pool. But we do leverage the
+	   same API for dispatching work. */
+	workerStates = PyMem_Malloc(threadCount * sizeof(WorkerState));
+	if (NULL == workerStates) {
+		PyErr_NoMemory();
+		goto finally;
+	}
+
+	memset(workerStates, 0, threadCount * sizeof(WorkerState));
+
+	if (threadCount > 1) {
+		pool = POOL_create(threadCount, 1);
+		if (NULL == pool) {
+			PyErr_SetString(ZstdError, "could not initialize zstd thread pool");
+			goto finally;
+		}
+	}
+
+	bytesPerWorker = frames->compressedSize / threadCount;
+
+	for (i = 0; i < threadCount; i++) {
+		workerStates[i].dctx = ZSTD_createDCtx();
+		if (NULL == workerStates[i].dctx) {
+			PyErr_NoMemory();
+			goto finally;
+		}
+
+		ZSTD_copyDCtx(workerStates[i].dctx, decompressor->dctx);
+
+		workerStates[i].ddict = decompressor->ddict;
+		workerStates[i].framePointers = framePointers;
+		workerStates[i].requireOutputSizes = 1;
+	}
+
+	Py_BEGIN_ALLOW_THREADS
+	/* There are many ways to split work among workers.
+
+	   For now, we take a simple approach of splitting work so each worker
+	   gets roughly the same number of input bytes. This will result in more
+	   starvation than running N>threadCount jobs. But it avoids complications
+	   around state tracking, which could involve extra locking.
+	*/
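+	/*
+	 * Illustrative example: with 4 frames of compressed sizes 10, 30, 20 and
+	 * 40 bytes and threadCount == 2, bytesPerWorker is 50. Worker 0 is
+	 * dispatched once the running total reaches 50 (frames 0-2, 60 bytes) and
+	 * the block after the loop hands the remaining frame (40 bytes) to the
+	 * last worker.
+	 */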
+	for (i = 0; i < frames->framesSize; i++) {
+		workerBytes += frames->frames[i].sourceSize;
+
+		/*
+		 * The last worker/thread needs to handle all remaining work. Don't
+		 * trigger it prematurely. Defer to the block outside of the loop.
+		 * (But still run this loop body so workerBytes stays correct.)
+		 */
+		if (currentThread == threadCount - 1) {
+			continue;
+		}
+
+		if (workerBytes >= bytesPerWorker) {
+			workerStates[currentThread].startOffset = workerStartOffset;
+			workerStates[currentThread].endOffset = i;
+			workerStates[currentThread].totalSourceSize = workerBytes;
+
+			if (threadCount > 1) {
+				POOL_add(pool, (POOL_function)decompress_worker, &workerStates[currentThread]);
+			}
+			else {
+				decompress_worker(&workerStates[currentThread]);
+			}
+			currentThread++;
+			workerStartOffset = i + 1;
+			workerBytes = 0;
+		}
+	}
+
+	if (workerBytes) {
+		workerStates[currentThread].startOffset = workerStartOffset;
+		workerStates[currentThread].endOffset = frames->framesSize - 1;
+		workerStates[currentThread].totalSourceSize = workerBytes;
+
+		if (threadCount > 1) {
+			POOL_add(pool, (POOL_function)decompress_worker, &workerStates[currentThread]);
+		}
+		else {
+			decompress_worker(&workerStates[currentThread]);
+		}
+	}
+
+	if (threadCount > 1) {
+		POOL_free(pool);
+		pool = NULL;
+	}
+	Py_END_ALLOW_THREADS
+
+	for (i = 0; i < threadCount; i++) {
+		switch (workerStates[i].error) {
+		case WorkerError_none:
+			break;
+
+		case WorkerError_zstd:
+			PyErr_Format(ZstdError, "error decompressing item %zd: %s",
+				workerStates[i].errorOffset, ZSTD_getErrorName(workerStates[i].zresult));
+			errored = 1;
+			break;
+
+		case WorkerError_memory:
+			PyErr_NoMemory();
+			errored = 1;
+			break;
+
+		case WorkerError_sizeMismatch:
+			PyErr_Format(ZstdError, "error decompressing item %zd: decompressed %zu bytes; expected %llu",
+				workerStates[i].errorOffset, workerStates[i].zresult,
+				framePointers[workerStates[i].errorOffset].destSize);
+			errored = 1;
+			break;
+
+		case WorkerError_unknownSize:
+			PyErr_Format(PyExc_ValueError, "could not determine decompressed size of item %zd",
+				workerStates[i].errorOffset);
+			errored = 1;
+			break;
+
+		default:
+			PyErr_Format(ZstdError, "unhandled error type: %d; this is a bug",
+				workerStates[i].error);
+			errored = 1;
+			break;
+		}
+
+		if (errored) {
+			break;
+		}
+	}
+
+	if (errored) {
+		goto finally;
+	}
+
+	segmentsCount = 0;
+	for (i = 0; i < threadCount; i++) {
+		segmentsCount += workerStates[i].destCount;
+	}
+
+	resultArg = PyTuple_New(segmentsCount);
+	if (NULL == resultArg) {
+		goto finally;
+	}
+
+	resultIndex = 0;
+
+	for (i = 0; i < threadCount; i++) {
+		Py_ssize_t bufferIndex;
+		WorkerState* state = &workerStates[i];
+
+		for (bufferIndex = 0; bufferIndex < state->destCount; bufferIndex++) {
+			DestBuffer* destBuffer = &state->destBuffers[bufferIndex];
+
+			bws = BufferWithSegments_FromMemory(destBuffer->dest, destBuffer->destSize,
+				destBuffer->segments, destBuffer->segmentsSize);
+			if (NULL == bws) {
+				goto finally;
+			}
+
+			/*
+			 * Memory for the buffer and segments was allocated with malloc() in the
+			 * worker and is transferred to the BufferWithSegments instance. So tell
+			 * the instance to use free() and NULL the references in the state struct
+			 * so they aren't freed below.
+			 */
+			bws->useFree = 1;
+			destBuffer->dest = NULL;
+			destBuffer->segments = NULL;
+
+			PyTuple_SET_ITEM(resultArg, resultIndex++, (PyObject*)bws);
+		}
+	}
+
+	result = (ZstdBufferWithSegmentsCollection*)PyObject_CallObject(
+		(PyObject*)&ZstdBufferWithSegmentsCollectionType, resultArg);
+
+finally:
+	Py_CLEAR(resultArg);
+
+	if (workerStates) {
+		for (i = 0; i < threadCount; i++) {
+			Py_ssize_t bufferIndex;
+			WorkerState* state = &workerStates[i];
+
+			if (state->dctx) {
+				ZSTD_freeDCtx(state->dctx);
+			}
+
+			for (bufferIndex = 0; bufferIndex < state->destCount; bufferIndex++) {
+				if (state->destBuffers) {
+					/*
+					 * Will be NULL if the memory was transferred to a BufferWithSegments.
+					 * Otherwise it is left over after an error occurred.
+					 */
+					free(state->destBuffers[bufferIndex].dest);
+					free(state->destBuffers[bufferIndex].segments);
+				}
+			}
+
+			free(state->destBuffers);
+		}
+
+		PyMem_Free(workerStates);
+	}
+
+	POOL_free(pool);
+
+	return result;
+}
+
+PyDoc_STRVAR(Decompressor_multi_decompress_to_buffer__doc__,
+"Decompress multiple frames to output buffers\n"
+"\n"
+"Receives a ``BufferWithSegments``, a ``BufferWithSegmentsCollection`` or a\n"
+"list of bytes-like objects. Each item in the passed collection should be a\n"
+"compressed zstd frame.\n"
+"\n"
+"Unless ``decompressed_sizes`` is specified, the content size *must* be\n"
+"written into the zstd frame header. If ``decompressed_sizes`` is specified,\n"
+"it is an object conforming to the buffer protocol that represents an array\n"
+"of 64-bit unsigned integers in the machine's native format. Specifying\n"
+"``decompressed_sizes`` avoids a pre-scan of each frame to determine its\n"
+"output size.\n"
+"\n"
+"Returns a ``BufferWithSegmentsCollection`` containing the decompressed\n"
+"data. All decompressed data is allocated in a single memory buffer. The\n"
+"``BufferWithSegments`` instance tracks which objects are at which offsets\n"
+"and their respective lengths.\n"
+"\n"
+"The ``threads`` argument controls how many threads to use for operations.\n"
+"Negative values will use the same number of threads as logical CPUs on the\n"
+"machine.\n"
+);
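+/*
+ * Rough Python usage sketch (names below are illustrative; the import name in
+ * particular depends on how the package is installed or vendored):
+ *
+ *   import struct
+ *   import zstandard as zstd
+ *
+ *   dctx = zstd.ZstdDecompressor()
+ *   frames = [frame_one, frame_two]                   # bytes-like zstd frames
+ *   sizes = struct.pack('=QQ', size_one, size_two)    # native uint64 array
+ *   result = dctx.multi_decompress_to_buffer(frames,
+ *                                            decompressed_sizes=sizes,
+ *                                            threads=-1)
+ *   # result is a BufferWithSegmentsCollection describing the decompressed
+ *   # data and the offset/length of each frame within it.
+ */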
+
+static ZstdBufferWithSegmentsCollection* Decompressor_multi_decompress_to_buffer(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
+	static char* kwlist[] = {
+		"frames",
+		"decompressed_sizes",
+		"threads",
+		NULL
+	};
+
+	PyObject* frames;
+	Py_buffer frameSizes;
+	int threads = 0;
+	Py_ssize_t frameCount;
+	Py_buffer* frameBuffers = NULL;
+	FramePointer* framePointers = NULL;
+	unsigned long long* frameSizesP = NULL;
+	unsigned long long totalInputSize = 0;
+	FrameSources frameSources;
+	ZstdBufferWithSegmentsCollection* result = NULL;
+	Py_ssize_t i;
+
+	memset(&frameSizes, 0, sizeof(frameSizes));
+
+#if PY_MAJOR_VERSION >= 3
+	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|y*i:multi_decompress_to_buffer",
+#else
+	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|s*i:multi_decompress_to_buffer",
+#endif
+		kwlist, &frames, &frameSizes, &threads)) {
+		return NULL;
+	}
+
+	if (frameSizes.buf) {
+		if (!PyBuffer_IsContiguous(&frameSizes, 'C') || frameSizes.ndim > 1) {
+			PyErr_SetString(PyExc_ValueError, "decompressed_sizes buffer should be contiguous and have a single dimension");
+			goto finally;
+		}
+
+		frameSizesP = (unsigned long long*)frameSizes.buf;
+	}
+
+	if (threads < 0) {
+		threads = cpu_count();
+	}
+
+	if (threads < 2) {
+		threads = 1;
+	}
+
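+	/*
+	 * Three input shapes are accepted: a single BufferWithSegments, a
+	 * BufferWithSegmentsCollection, or a Python list of bytes-like objects.
+	 * Each branch below flattens its input into the framePointers array so
+	 * the shared decompression code sees a single representation.
+	 */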
+	if (PyObject_TypeCheck(frames, &ZstdBufferWithSegmentsType)) {
+		ZstdBufferWithSegments* buffer = (ZstdBufferWithSegments*)frames;
+		frameCount = buffer->segmentCount;
+
+		if (frameSizes.buf && frameSizes.len != frameCount * (Py_ssize_t)sizeof(unsigned long long)) {
+			PyErr_Format(PyExc_ValueError, "decompressed_sizes size mismatch; expected %zd, got %zd",
+				frameCount * sizeof(unsigned long long), frameSizes.len);
+			goto finally;
+		}
+
+		framePointers = PyMem_Malloc(frameCount * sizeof(FramePointer));
+		if (!framePointers) {
+			PyErr_NoMemory();
+			goto finally;
+		}
+
+		for (i = 0; i < frameCount; i++) {
+			void* sourceData;
+			unsigned long long sourceSize;
+			unsigned long long decompressedSize = 0;
+
+			if (buffer->segments[i].offset + buffer->segments[i].length > buffer->dataSize) {
+				PyErr_Format(PyExc_ValueError, "item %zd has offset outside memory area", i);
+				goto finally;
+			}
+
+			sourceData = (char*)buffer->data + buffer->segments[i].offset;
+			sourceSize = buffer->segments[i].length;
+			totalInputSize += sourceSize;
+
+			if (frameSizesP) {
+				decompressedSize = frameSizesP[i];
+			}
+
+			framePointers[i].sourceData = sourceData;
+			framePointers[i].sourceSize = sourceSize;
+			framePointers[i].destSize = decompressedSize;
+		}
+	}
+	else if (PyObject_TypeCheck(frames, &ZstdBufferWithSegmentsCollectionType)) {
+		Py_ssize_t offset = 0;
+		ZstdBufferWithSegments* buffer;
+		ZstdBufferWithSegmentsCollection* collection = (ZstdBufferWithSegmentsCollection*)frames;
+
+		frameCount = BufferWithSegmentsCollection_length(collection);
+
+		if (frameSizes.buf && frameSizes.len != frameCount * (Py_ssize_t)sizeof(unsigned long long)) {
+			PyErr_Format(PyExc_ValueError,
+				"decompressed_sizes size mismatch; expected %zd; got %zd",
+				frameCount * sizeof(unsigned long long), frameSizes.len);
+			goto finally;
+		}
+
+		framePointers = PyMem_Malloc(frameCount * sizeof(FramePointer));
+		if (NULL == framePointers) {
+			PyErr_NoMemory();
+			goto finally;
+		}
+
+		/* Iterate the data structure directly because it is faster. */
+		for (i = 0; i < collection->bufferCount; i++) {
+			Py_ssize_t segmentIndex;
+			buffer = collection->buffers[i];
+
+			for (segmentIndex = 0; segmentIndex < buffer->segmentCount; segmentIndex++) {
+				if (buffer->segments[segmentIndex].offset + buffer->segments[segmentIndex].length > buffer->dataSize) {
+					PyErr_Format(PyExc_ValueError, "item %zd has offset outside memory area",
+						offset);
+					goto finally;
+				}
+
+				totalInputSize += buffer->segments[segmentIndex].length;
+
+				framePointers[offset].sourceData = (char*)buffer->data + buffer->segments[segmentIndex].offset;
+				framePointers[offset].sourceSize = buffer->segments[segmentIndex].length;
+				framePointers[offset].destSize = frameSizesP ? frameSizesP[offset] : 0;
+
+				offset++;
+			}
+		}
+	}
+	else if (PyList_Check(frames)) {
+		frameCount = PyList_GET_SIZE(frames);
+
+		if (frameSizes.buf && frameSizes.len != frameCount * (Py_ssize_t)sizeof(unsigned long long)) {
+			PyErr_Format(PyExc_ValueError, "decompressed_sizes size mismatch; expected %zd, got %zd",
+				frameCount * sizeof(unsigned long long), frameSizes.len);
+			goto finally;
+		}
+
+		framePointers = PyMem_Malloc(frameCount * sizeof(FramePointer));
+		if (!framePointers) {
+			PyErr_NoMemory();
+			goto finally;
+		}
+
+		/*
+		 * It is not clear whether Py_buffer.buf is still valid after
+		 * PyBuffer_Release. So, we hold a reference to all Py_buffer instances
+		 * for the duration of the operation.
+		 */
+		frameBuffers = PyMem_Malloc(frameCount * sizeof(Py_buffer));
+		if (NULL == frameBuffers) {
+			PyErr_NoMemory();
+			goto finally;
+		}
+
+		memset(frameBuffers, 0, frameCount * sizeof(Py_buffer));
+
+		/* Do a pass to assemble info about our input buffers and output sizes. */
+		for (i = 0; i < frameCount; i++) {
+			if (0 != PyObject_GetBuffer(PyList_GET_ITEM(frames, i),
+				&frameBuffers[i], PyBUF_CONTIG_RO)) {
+				PyErr_Clear();
+				PyErr_Format(PyExc_TypeError, "item %zd not a bytes like object", i);
+				goto finally;
+			}
+
+			totalInputSize += frameBuffers[i].len;
+
+			framePointers[i].sourceData = frameBuffers[i].buf;
+			framePointers[i].sourceSize = frameBuffers[i].len;
+			framePointers[i].destSize = frameSizesP ? frameSizesP[i] : 0;
+		}
+	}
+	else {
+		PyErr_SetString(PyExc_TypeError, "argument must be list or BufferWithSegments");
+		goto finally;
+	}
+
+	/* We now have an array with info about our inputs and outputs. Feed it into
+	   our generic decompression function. */
+	frameSources.frames = framePointers;
+	frameSources.framesSize = frameCount;
+	frameSources.compressedSize = totalInputSize;
+
+	result = decompress_from_framesources(self, &frameSources, threads);
+
+finally:
+	if (frameSizes.buf) {
+		PyBuffer_Release(&frameSizes);
+	}
+	PyMem_Free(framePointers);
+
+	if (frameBuffers) {
+		for (i = 0; i < frameCount; i++) {
+			PyBuffer_Release(&frameBuffers[i]);
+		}
+
+		PyMem_Free(frameBuffers);
+	}
+
+	return result;
+}
+
 static PyMethodDef Decompressor_methods[] = {
 	{ "copy_stream", (PyCFunction)Decompressor_copy_stream, METH_VARARGS | METH_KEYWORDS,
 	Decompressor_copy_stream__doc__ },
@@ -789,6 +1522,8 @@
 	Decompressor_write_to__doc__ },
 	{ "decompress_content_dict_chain", (PyCFunction)Decompressor_decompress_content_dict_chain,
 	  METH_VARARGS | METH_KEYWORDS, Decompressor_decompress_content_dict_chain__doc__ },
+	{ "multi_decompress_to_buffer", (PyCFunction)Decompressor_multi_decompress_to_buffer,
+	  METH_VARARGS | METH_KEYWORDS, Decompressor_multi_decompress_to_buffer__doc__ },
 	{ NULL, NULL }
 };