view contrib/python-zstandard/c-ext/compressoriterator.c @ 38420:c1f4364f9336

fix: include cleanupnodes() in transaction As pointed out by Yuya, we need a transaction to make sure the state before the call to cleanupnodes() is not observable. Differential Revision: https://phab.mercurial-scm.org/D3823
author Martin von Zweigbergk <martinvonz@google.com>
date Wed, 20 Jun 2018 09:27:30 -0700
parents b1fb341d8a61
children 675775c33ab6
line wrap: on
line source

/**
* Copyright (c) 2016-present, Gregory Szorc
* All rights reserved.
*
* This software may be modified and distributed under the terms
* of the BSD license. See the LICENSE file for details.
*/

#include "python-zstandard.h"

#define min(a, b) (((a) < (b)) ? (a) : (b))

extern PyObject* ZstdError;

PyDoc_STRVAR(ZstdCompressorIterator__doc__,
"Represents an iterator of compressed data.\n"
);

static void ZstdCompressorIterator_dealloc(ZstdCompressorIterator* self) {
	Py_XDECREF(self->readResult);
	Py_XDECREF(self->compressor);
	Py_XDECREF(self->reader);

	if (self->buffer.buf) {
		PyBuffer_Release(&self->buffer);
		memset(&self->buffer, 0, sizeof(self->buffer));
	}

	if (self->output.dst) {
		PyMem_Free(self->output.dst);
		self->output.dst = NULL;
	}

	PyObject_Del(self);
}

static PyObject* ZstdCompressorIterator_iter(PyObject* self) {
	Py_INCREF(self);
	return self;
}

static PyObject* ZstdCompressorIterator_iternext(ZstdCompressorIterator* self) {
	size_t zresult;
	PyObject* readResult = NULL;
	PyObject* chunk;
	char* readBuffer;
	Py_ssize_t readSize = 0;
	Py_ssize_t bufferRemaining;

	if (self->finishedOutput) {
		PyErr_SetString(PyExc_StopIteration, "output flushed");
		return NULL;
	}

feedcompressor:

	/* If we have data left in the input, consume it. */
	if (self->input.pos < self->input.size) {
		Py_BEGIN_ALLOW_THREADS
		zresult = ZSTD_compress_generic(self->compressor->cctx, &self->output,
			&self->input, ZSTD_e_continue);
		Py_END_ALLOW_THREADS

		/* Release the Python object holding the input buffer. */
		if (self->input.pos == self->input.size) {
			self->input.src = NULL;
			self->input.pos = 0;
			self->input.size = 0;
			Py_DECREF(self->readResult);
			self->readResult = NULL;
		}

		if (ZSTD_isError(zresult)) {
			PyErr_Format(ZstdError, "zstd compress error: %s", ZSTD_getErrorName(zresult));
			return NULL;
		}

		/* If it produced output data, emit it. */
		if (self->output.pos) {
			chunk = PyBytes_FromStringAndSize(self->output.dst, self->output.pos);
			self->output.pos = 0;
			return chunk;
		}
	}

	/* We should never have output data sitting around after a previous call. */
	assert(self->output.pos == 0);

	/* The code above should have either emitted a chunk and returned or consumed
	the entire input buffer. So the state of the input buffer is not
	relevant. */
	if (!self->finishedInput) {
		if (self->reader) {
			readResult = PyObject_CallMethod(self->reader, "read", "I", self->inSize);
			if (!readResult) {
				PyErr_SetString(ZstdError, "could not read() from source");
				return NULL;
			}

			PyBytes_AsStringAndSize(readResult, &readBuffer, &readSize);
		}
		else {
			assert(self->buffer.buf);

			/* Only support contiguous C arrays. */
			assert(self->buffer.strides == NULL && self->buffer.suboffsets == NULL);
			assert(self->buffer.itemsize == 1);

			readBuffer = (char*)self->buffer.buf + self->bufferOffset;
			bufferRemaining = self->buffer.len - self->bufferOffset;
			readSize = min(bufferRemaining, (Py_ssize_t)self->inSize);
			self->bufferOffset += readSize;
		}

		if (0 == readSize) {
			Py_XDECREF(readResult);
			self->finishedInput = 1;
		}
		else {
			self->readResult = readResult;
		}
	}

	/* EOF */
	if (0 == readSize) {
		self->input.src = NULL;
		self->input.size = 0;
		self->input.pos = 0;

		zresult = ZSTD_compress_generic(self->compressor->cctx, &self->output,
			&self->input, ZSTD_e_end);
		if (ZSTD_isError(zresult)) {
			PyErr_Format(ZstdError, "error ending compression stream: %s",
				ZSTD_getErrorName(zresult));
			return NULL;
		}

		assert(self->output.pos);

		if (0 == zresult) {
			self->finishedOutput = 1;
		}

		chunk = PyBytes_FromStringAndSize(self->output.dst, self->output.pos);
		self->output.pos = 0;
		return chunk;
	}

	/* New data from reader. Feed into compressor. */
	self->input.src = readBuffer;
	self->input.size = readSize;
	self->input.pos = 0;

	Py_BEGIN_ALLOW_THREADS
	zresult = ZSTD_compress_generic(self->compressor->cctx, &self->output,
		&self->input, ZSTD_e_continue);
	Py_END_ALLOW_THREADS

	/* The input buffer currently points to memory managed by Python
	(readBuffer). This object was allocated by this function. If it wasn't
	fully consumed, we need to release it in a subsequent function call.
	If it is fully consumed, do that now.
	*/
	if (self->input.pos == self->input.size) {
		self->input.src = NULL;
		self->input.pos = 0;
		self->input.size = 0;
		Py_XDECREF(self->readResult);
		self->readResult = NULL;
	}

	if (ZSTD_isError(zresult)) {
		PyErr_Format(ZstdError, "zstd compress error: %s", ZSTD_getErrorName(zresult));
		return NULL;
	}

	assert(self->input.pos <= self->input.size);

	/* If we didn't write anything, start the process over. */
	if (0 == self->output.pos) {
		goto feedcompressor;
	}

	chunk = PyBytes_FromStringAndSize(self->output.dst, self->output.pos);
	self->output.pos = 0;
	return chunk;
}

PyTypeObject ZstdCompressorIteratorType = {
	PyVarObject_HEAD_INIT(NULL, 0)
	"zstd.ZstdCompressorIterator",   /* tp_name */
	sizeof(ZstdCompressorIterator),  /* tp_basicsize */
	0,                               /* tp_itemsize */
	(destructor)ZstdCompressorIterator_dealloc, /* tp_dealloc */
	0,                               /* tp_print */
	0,                               /* tp_getattr */
	0,                               /* tp_setattr */
	0,                               /* tp_compare */
	0,                               /* tp_repr */
	0,                               /* tp_as_number */
	0,                               /* tp_as_sequence */
	0,                               /* tp_as_mapping */
	0,                               /* tp_hash */
	0,                               /* tp_call */
	0,                               /* tp_str */
	0,                               /* tp_getattro */
	0,                               /* tp_setattro */
	0,                               /* tp_as_buffer */
	Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
	ZstdCompressorIterator__doc__,   /* tp_doc */
	0,                               /* tp_traverse */
	0,                               /* tp_clear */
	0,                               /* tp_richcompare */
	0,                               /* tp_weaklistoffset */
	ZstdCompressorIterator_iter,     /* tp_iter */
	(iternextfunc)ZstdCompressorIterator_iternext, /* tp_iternext */
	0,                               /* tp_methods */
	0,                               /* tp_members */
	0,                               /* tp_getset */
	0,                               /* tp_base */
	0,                               /* tp_dict */
	0,                               /* tp_descr_get */
	0,                               /* tp_descr_set */
	0,                               /* tp_dictoffset */
	0,                               /* tp_init */
	0,                               /* tp_alloc */
	PyType_GenericNew,              /* tp_new */
};

void compressoriterator_module_init(PyObject* mod) {
	Py_TYPE(&ZstdCompressorIteratorType) = &PyType_Type;
	if (PyType_Ready(&ZstdCompressorIteratorType) < 0) {
		return;
	}
}