diff contrib/python-zstandard/c-ext/decompressionreader.c @ 42070:675775c33ab6

zstandard: vendor python-zstandard 0.11 The upstream source distribution from PyPI was extracted. Unwanted files were removed. The clang-format ignore list was updated to reflect the new source of files. The project contains a vendored copy of zstandard 1.3.8. The old version was 1.3.6. This should result in some minor performance wins. test-check-py3-compat.t was updated to reflect now-passing tests on Python 3.8. Some HTTP tests were updated to reflect new zstd compression output. # no-check-commit because 3rd party code has different style guidelines Differential Revision: https://phab.mercurial-scm.org/D6199
author Gregory Szorc <gregory.szorc@gmail.com>
date Thu, 04 Apr 2019 17:34:43 -0700
parents 73fef626dae3
children 69de49c4e39c
line wrap: on
line diff
--- a/contrib/python-zstandard/c-ext/decompressionreader.c	Thu Apr 04 15:24:03 2019 -0700
+++ b/contrib/python-zstandard/c-ext/decompressionreader.c	Thu Apr 04 17:34:43 2019 -0700
@@ -102,6 +102,114 @@
 	Py_RETURN_FALSE;
 }
 
+/**
+ * Read available input.
+ *
+ * Returns 0 if no data was added to input.
+ * Returns 1 if new input data is available.
+ * Returns -1 on error and sets a Python exception as a side-effect.
+ */
+int read_decompressor_input(ZstdDecompressionReader* self) {
+	if (self->finishedInput) {
+		return 0;
+	}
+
+	if (self->input.pos != self->input.size) {
+		return 0;
+	}
+
+	if (self->reader) {
+        Py_buffer buffer;
+
+        assert(self->readResult == NULL);
+        self->readResult = PyObject_CallMethod(self->reader, "read",
+            "k", self->readSize);
+        if (NULL == self->readResult) {
+            return -1;
+        }
+
+        memset(&buffer, 0, sizeof(buffer));
+
+        if (0 != PyObject_GetBuffer(self->readResult, &buffer, PyBUF_CONTIG_RO)) {
+            return -1;
+        }
+
+        /* EOF */
+        if (0 == buffer.len) {
+            self->finishedInput = 1;
+            Py_CLEAR(self->readResult);
+        }
+        else {
+            self->input.src = buffer.buf;
+            self->input.size = buffer.len;
+            self->input.pos = 0;
+        }
+
+        PyBuffer_Release(&buffer);
+	}
+	else {
+		assert(self->buffer.buf);
+        /*
+         * We should only get here once since expectation is we always
+         * exhaust input buffer before reading again.
+         */
+        assert(self->input.src == NULL);
+
+		self->input.src = self->buffer.buf;
+        self->input.size = self->buffer.len;
+        self->input.pos = 0;
+	}
+
+	return 1;
+}
+
+/**
+ * Decompresses available input into an output buffer.
+ *
+ * Returns 0 if we need more input.
+ * Returns 1 if output buffer should be emitted.
+ * Returns -1 on error and sets a Python exception.
+ */
+int decompress_input(ZstdDecompressionReader* self, ZSTD_outBuffer* output) {
+	size_t zresult;
+
+	if (self->input.pos >= self->input.size) {
+		return 0;
+	}
+
+	Py_BEGIN_ALLOW_THREADS
+	zresult = ZSTD_decompressStream(self->decompressor->dctx, output, &self->input);
+	Py_END_ALLOW_THREADS
+
+	/* Input exhausted. Clear our state tracking. */
+	if (self->input.pos == self->input.size) {
+		memset(&self->input, 0, sizeof(self->input));
+		Py_CLEAR(self->readResult);
+
+		if (self->buffer.buf) {
+			self->finishedInput = 1;
+		}
+	}
+
+	if (ZSTD_isError(zresult)) {
+		PyErr_Format(ZstdError, "zstd decompress error: %s", ZSTD_getErrorName(zresult));
+		return -1;
+	}
+
+	/* We fulfilled the full read request. Signal to emit. */
+	if (output->pos && output->pos == output->size) {
+		return 1;
+	}
+	/* We're at the end of a frame and we aren't allowed to return data
+	   spanning frames. */
+	else if (output->pos && zresult == 0 && !self->readAcrossFrames) {
+		return 1;
+	}
+
+	/* There is more room in the output. Signal to collect more data. */
+	return 0;
+}
+
 static PyObject* reader_read(ZstdDecompressionReader* self, PyObject* args, PyObject* kwargs) {
 	static char* kwlist[] = {
 		"size",
@@ -113,26 +221,30 @@
 	char* resultBuffer;
 	Py_ssize_t resultSize;
 	ZSTD_outBuffer output;
-	size_t zresult;
+	int decompressResult, readResult;
 
 	if (self->closed) {
 		PyErr_SetString(PyExc_ValueError, "stream is closed");
 		return NULL;
 	}
 
-	if (self->finishedOutput) {
-		return PyBytes_FromStringAndSize("", 0);
-	}
-
-	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "n", kwlist, &size)) {
+	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|n", kwlist, &size)) {
 		return NULL;
 	}
 
-	if (size < 1) {
-		PyErr_SetString(PyExc_ValueError, "cannot read negative or size 0 amounts");
+	if (size < -1) {
+		PyErr_SetString(PyExc_ValueError, "cannot read negative amounts less than -1");
 		return NULL;
 	}
 
+	if (size == -1) {
+		return PyObject_CallMethod((PyObject*)self, "readall", NULL);
+	}
+
+	if (self->finishedOutput || size == 0) {
+		return PyBytes_FromStringAndSize("", 0);
+	}
+
 	result = PyBytes_FromStringAndSize(NULL, size);
 	if (NULL == result) {
 		return NULL;
@@ -146,85 +258,38 @@
 
 readinput:
 
-	/* Consume input data left over from last time. */
-	if (self->input.pos < self->input.size) {
-		Py_BEGIN_ALLOW_THREADS
-		zresult = ZSTD_decompress_generic(self->decompressor->dctx,
-			&output, &self->input);
-		Py_END_ALLOW_THREADS
+	decompressResult = decompress_input(self, &output);
 
-		/* Input exhausted. Clear our state tracking. */
-		if (self->input.pos == self->input.size) {
-			memset(&self->input, 0, sizeof(self->input));
-			Py_CLEAR(self->readResult);
+	if (-1 == decompressResult) {
+		Py_XDECREF(result);
+		return NULL;
+	}
+	else if (0 == decompressResult) { }
+	else if (1 == decompressResult) {
+		self->bytesDecompressed += output.pos;
 
-			if (self->buffer.buf) {
-				self->finishedInput = 1;
+		if (output.pos != output.size) {
+			if (safe_pybytes_resize(&result, output.pos)) {
+				Py_XDECREF(result);
+				return NULL;
 			}
 		}
-
-		if (ZSTD_isError(zresult)) {
-			PyErr_Format(ZstdError, "zstd decompress error: %s", ZSTD_getErrorName(zresult));
-			return NULL;
-		}
-		else if (0 == zresult) {
-			self->finishedOutput = 1;
-		}
-
-		/* We fulfilled the full read request. Emit it. */
-		if (output.pos && output.pos == output.size) {
-			self->bytesDecompressed += output.size;
-			return result;
-		}
-
-		/*
-		 * There is more room in the output. Fall through to try to collect
-		 * more data so we can try to fill the output.
-		 */
+		return result;
+	}
+	else {
+		assert(0);
 	}
 
-	if (!self->finishedInput) {
-		if (self->reader) {
-			Py_buffer buffer;
-
-			assert(self->readResult == NULL);
-			self->readResult = PyObject_CallMethod(self->reader, "read",
-				"k", self->readSize);
-			if (NULL == self->readResult) {
-				return NULL;
-			}
-
-			memset(&buffer, 0, sizeof(buffer));
-
-			if (0 != PyObject_GetBuffer(self->readResult, &buffer, PyBUF_CONTIG_RO)) {
-				return NULL;
-			}
+	readResult = read_decompressor_input(self);
 
-			/* EOF */
-			if (0 == buffer.len) {
-				self->finishedInput = 1;
-				Py_CLEAR(self->readResult);
-			}
-			else {
-				self->input.src = buffer.buf;
-				self->input.size = buffer.len;
-				self->input.pos = 0;
-			}
-
-			PyBuffer_Release(&buffer);
-		}
-		else {
-			assert(self->buffer.buf);
-			/*
-			 * We should only get here once since above block will exhaust
-			 * source buffer until finishedInput is set.
-			 */
-			assert(self->input.src == NULL);
-
-			self->input.src = self->buffer.buf;
-			self->input.size = self->buffer.len;
-			self->input.pos = 0;
-		}
+	if (-1 == readResult) {
+		Py_XDECREF(result);
+		return NULL;
+	}
+	else if (0 == readResult) {}
+	else if (1 == readResult) {}
+	else {
+		assert(0);
 	}
 
 	if (self->input.size) {
@@ -242,18 +307,288 @@
 	return result;
 }
 
+static PyObject* reader_read1(ZstdDecompressionReader* self, PyObject* args, PyObject* kwargs) {
+	static char* kwlist[] = {
+		"size",
+		NULL
+	};
+
+	Py_ssize_t size = -1;
+	PyObject* result = NULL;
+	char* resultBuffer;
+	Py_ssize_t resultSize;
+	ZSTD_outBuffer output;
+
+	if (self->closed) {
+		PyErr_SetString(PyExc_ValueError, "stream is closed");
+		return NULL;
+	}
+
+	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|n", kwlist, &size)) {
+		return NULL;
+	}
+
+	if (size < -1) {
+		PyErr_SetString(PyExc_ValueError, "cannot read negative amounts less than -1");
+		return NULL;
+	}
+
+	if (self->finishedOutput || size == 0) {
+		return PyBytes_FromStringAndSize("", 0);
+	}
+
+	if (size == -1) {
+		size = ZSTD_DStreamOutSize();
+	}
+
+	result = PyBytes_FromStringAndSize(NULL, size);
+	if (NULL == result) {
+		return NULL;
+	}
+
+	PyBytes_AsStringAndSize(result, &resultBuffer, &resultSize);
+
+	output.dst = resultBuffer;
+	output.size = resultSize;
+	output.pos = 0;
+
+	/* read1() is supposed to use at most 1 read() from the underlying stream.
+	 * However, we can't satisfy this requirement with decompression due to the
+	 * nature of how decompression works. Our strategy is to read + decompress
+	 * until we get any output, at which point we return. This satisfies the
+	 * intent of the read1() API to limit read operations.
+	 */
+	while (!self->finishedInput) {
+		int readResult, decompressResult;
+
+		readResult = read_decompressor_input(self);
+		if (-1 == readResult) {
+			Py_XDECREF(result);
+			return NULL;
+		}
+		else if (0 == readResult || 1 == readResult) { }
+		else {
+			assert(0);
+		}
+
+		decompressResult = decompress_input(self, &output);
+
+		if (-1 == decompressResult) {
+			Py_XDECREF(result);
+			return NULL;
+		}
+		else if (0 == decompressResult || 1 == decompressResult) { }
+		else {
+			assert(0);
+		}
+
+		if (output.pos) {
+		    break;
+		}
+	}
+
+	self->bytesDecompressed += output.pos;
+	if (safe_pybytes_resize(&result, output.pos)) {
+		Py_XDECREF(result);
+		return NULL;
+	}
+
+	return result;
+}
+
+static PyObject* reader_readinto(ZstdDecompressionReader* self, PyObject* args) {
+	Py_buffer dest;
+	ZSTD_outBuffer output;
+	int decompressResult, readResult;
+	PyObject* result = NULL;
+
+	if (self->closed) {
+		PyErr_SetString(PyExc_ValueError, "stream is closed");
+		return NULL;
+	}
+
+	if (self->finishedOutput) {
+		return PyLong_FromLong(0);
+	}
+
+	if (!PyArg_ParseTuple(args, "w*:readinto", &dest)) {
+		return NULL;
+	}
+
+	if (!PyBuffer_IsContiguous(&dest, 'C') || dest.ndim > 1) {
+		PyErr_SetString(PyExc_ValueError,
+			"destination buffer should be contiguous and have at most one dimension");
+	    goto finally;
+	}
+
+	output.dst = dest.buf;
+	output.size = dest.len;
+	output.pos = 0;
+
+readinput:
+
+	decompressResult = decompress_input(self, &output);
+
+	if (-1 == decompressResult) {
+		goto finally;
+	}
+	else if (0 == decompressResult) { }
+	else if (1 == decompressResult) {
+		self->bytesDecompressed += output.pos;
+		result = PyLong_FromSize_t(output.pos);
+		goto finally;
+	}
+	else {
+		assert(0);
+	}
+
+	readResult = read_decompressor_input(self);
+
+	if (-1 == readResult) {
+		goto finally;
+	}
+	else if (0 == readResult) {}
+	else if (1 == readResult) {}
+	else {
+		assert(0);
+	}
+
+	if (self->input.size) {
+		goto readinput;
+	}
+
+	/* EOF */
+	self->bytesDecompressed += output.pos;
+	result = PyLong_FromSize_t(output.pos);
+
+finally:
+	PyBuffer_Release(&dest);
+
+	return result;
+}
+
+static PyObject* reader_readinto1(ZstdDecompressionReader* self, PyObject* args) {
+	Py_buffer dest;
+	ZSTD_outBuffer output;
+	PyObject* result = NULL;
+
+	if (self->closed) {
+		PyErr_SetString(PyExc_ValueError, "stream is closed");
+		return NULL;
+	}
+
+	if (self->finishedOutput) {
+		return PyLong_FromLong(0);
+	}
+
+	if (!PyArg_ParseTuple(args, "w*:readinto1", &dest)) {
+		return NULL;
+	}
+
+	if (!PyBuffer_IsContiguous(&dest, 'C') || dest.ndim > 1) {
+		PyErr_SetString(PyExc_ValueError,
+			"destination buffer should be contiguous and have at most one dimension");
+	    goto finally;
+	}
+
+	output.dst = dest.buf;
+	output.size = dest.len;
+	output.pos = 0;
+
+	while (!self->finishedInput && !self->finishedOutput) {
+		int decompressResult, readResult;
+
+		readResult = read_decompressor_input(self);
+
+		if (-1 == readResult) {
+			goto finally;
+		}
+		else if (0 == readResult || 1 == readResult) {}
+		else {
+			assert(0);
+		}
+
+		decompressResult = decompress_input(self, &output);
+
+		if (-1 == decompressResult) {
+			goto finally;
+		}
+		else if (0 == decompressResult || 1 == decompressResult) {}
+		else {
+			assert(0);
+		}
+
+		if (output.pos) {
+			break;
+		}
+	}
+
+	self->bytesDecompressed += output.pos;
+	result = PyLong_FromSize_t(output.pos);
+
+finally:
+	PyBuffer_Release(&dest);
+
+	return result;
+}
+
 static PyObject* reader_readall(PyObject* self) {
-	PyErr_SetNone(PyExc_NotImplementedError);
-	return NULL;
+	PyObject* chunks = NULL;
+	PyObject* empty = NULL;
+	PyObject* result = NULL;
+
+	/* Our strategy is to collect chunks into a list then join all the
+	 * chunks at the end. We could potentially use e.g. an io.BytesIO. But
+	 * this feels simple enough to implement and avoids potentially expensive
+	 * reallocations of large buffers.
+	 */
+	chunks = PyList_New(0);
+	if (NULL == chunks) {
+		return NULL;
+	}
+
+	while (1) {
+		PyObject* chunk = PyObject_CallMethod(self, "read", "i", 1048576);
+		if (NULL == chunk) {
+			Py_DECREF(chunks);
+			return NULL;
+		}
+
+		if (!PyBytes_Size(chunk)) {
+			Py_DECREF(chunk);
+			break;
+		}
+
+		if (PyList_Append(chunks, chunk)) {
+			Py_DECREF(chunk);
+			Py_DECREF(chunks);
+			return NULL;
+		}
+
+		Py_DECREF(chunk);
+	}
+
+	empty = PyBytes_FromStringAndSize("", 0);
+	if (NULL == empty) {
+		Py_DECREF(chunks);
+		return NULL;
+	}
+
+	result = PyObject_CallMethod(empty, "join", "O", chunks);
+
+	Py_DECREF(empty);
+	Py_DECREF(chunks);
+
+	return result;
 }
 
 static PyObject* reader_readline(PyObject* self) {
-	PyErr_SetNone(PyExc_NotImplementedError);
+	set_unsupported_operation();
 	return NULL;
 }
 
 static PyObject* reader_readlines(PyObject* self) {
-	PyErr_SetNone(PyExc_NotImplementedError);
+	set_unsupported_operation();
 	return NULL;
 }
 
@@ -345,12 +680,12 @@
 }
 
 static PyObject* reader_iter(PyObject* self) {
-	PyErr_SetNone(PyExc_NotImplementedError);
+	set_unsupported_operation();
 	return NULL;
 }
 
 static PyObject* reader_iternext(PyObject* self) {
-	PyErr_SetNone(PyExc_NotImplementedError);
+	set_unsupported_operation();
 	return NULL;
 }
 
@@ -367,6 +702,10 @@
 	PyDoc_STR("Returns True") },
 	{ "read", (PyCFunction)reader_read, METH_VARARGS | METH_KEYWORDS,
 	PyDoc_STR("read compressed data") },
+	{ "read1", (PyCFunction)reader_read1, METH_VARARGS | METH_KEYWORDS,
+	PyDoc_STR("read compressed data") },
+	{ "readinto", (PyCFunction)reader_readinto, METH_VARARGS, NULL },
+	{ "readinto1", (PyCFunction)reader_readinto1, METH_VARARGS, NULL },
 	{ "readall", (PyCFunction)reader_readall, METH_NOARGS, PyDoc_STR("Not implemented") },
 	{ "readline", (PyCFunction)reader_readline, METH_NOARGS, PyDoc_STR("Not implemented") },
 	{ "readlines", (PyCFunction)reader_readlines, METH_NOARGS, PyDoc_STR("Not implemented") },