diff --git a/src/node/internal/internal_zlib_base.ts b/src/node/internal/internal_zlib_base.ts index 443fd43c01e1..543b81fa09dd 100644 --- a/src/node/internal/internal_zlib_base.ts +++ b/src/node/internal/internal_zlib_base.ts @@ -83,7 +83,8 @@ function processCallback(this: zlibUtil.ZlibStream): void { return; } - const [availOutAfter, availInAfter] = state as unknown as [number, number]; + const availOutAfter = state[0] as number; + const availInAfter = state[1] as number; const inDelta = handle.availInBefore - availInAfter; self.bytesWritten += inDelta; @@ -119,9 +120,10 @@ function processCallback(this: zlibUtil.ZlibStream): void { handle.availInBefore = availInAfter; if (!streamBufferIsFull) { + ok(this.buffer, 'Buffer should not have been null'); this.write( handle.flushFlag, - this.buffer as NodeJS.TypedArray, // in + this.buffer, // in handle.inOff, // in_off handle.availInBefore, // in_len self._outBuffer, // out @@ -132,10 +134,11 @@ function processCallback(this: zlibUtil.ZlibStream): void { // eslint-disable-next-line @typescript-eslint/unbound-method const oldRead = self._read; self._read = (n): void => { + ok(this.buffer, 'Buffer should not have been null'); self._read = oldRead; this.write( handle.flushFlag, - this.buffer as NodeJS.TypedArray, // in + this.buffer, // in handle.inOff, // in_off handle.availInBefore, // in_len self._outBuffer, // out @@ -171,7 +174,7 @@ function processCallback(this: zlibUtil.ZlibStream): void { // Z_NO_FLUSH (< Z_TREES) < Z_BLOCK < Z_PARTIAL_FLUSH < // Z_SYNC_FLUSH < Z_FULL_FLUSH < Z_FINISH const flushiness: number[] = []; -const kFlushFlagList = [ +const kFlushFlagList: number[] = [ CONST_Z_NO_FLUSH, CONST_Z_BLOCK, CONST_Z_PARTIAL_FLUSH, @@ -239,16 +242,16 @@ function processChunkSync( }); while (true) { - // TODO(soon): This was `writeSync` before, but it's not anymore. - handle?.write( + ok(handle, 'Handle should have been defined'); + handle.writeSync( flushFlag, chunk, // in inOff, // in_off availInBefore, // in_len buffer, // out offset, // out_off - availOutBefore - ); // out_len + availOutBefore // out_len + ); if (error) throw error; else if (self[kError]) throw self[kError]; @@ -461,8 +464,7 @@ export class ZlibBase extends Transform { } } else if (this.writableEnded) { if (callback) { - /* eslint-disable-next-line @typescript-eslint/no-unsafe-call */ - queueMicrotask(callback); + this.once('end', callback); } } else { this.write(kFlushBuffers[kind as number], 'utf8', callback); @@ -517,16 +519,17 @@ export class ZlibBase extends Transform { } #processChunk(chunk: Buffer, flushFlag: number, cb: () => void): void { - if (this._handle == null) { + if (!this._handle) { /* eslint-disable-next-line @typescript-eslint/no-unsafe-call */ queueMicrotask(cb); return; } - this._handle.buffer = null; + this._handle.buffer = chunk; this._handle.cb = cb; this._handle.availOutBefore = this._chunkSize - this._outOffset; this._handle.availInBefore = chunk.byteLength; + this._handle.inOff = 0; this._handle.flushFlag = flushFlag; this._handle.write( @@ -603,7 +606,7 @@ export class Zlib extends ZlibBase { ); dictionary = options.dictionary; - if (dictionary != null && !isArrayBufferView(dictionary)) { + if (dictionary !== undefined && !isArrayBufferView(dictionary)) { if (isAnyArrayBuffer(dictionary)) { dictionary = Buffer.from(dictionary); } else { @@ -618,13 +621,18 @@ export class Zlib extends ZlibBase { const writeState = new Uint32Array(2); const handle = new zlibUtil.ZlibStream(mode); + handle.initialize( windowBits, level, memLevel, strategy, writeState, - processCallback, + + () => { + // eslint-disable-next-line @typescript-eslint/no-unsafe-call + queueMicrotask(processCallback.bind(handle)); + }, dictionary ); super(options ?? {}, mode, handle); @@ -635,7 +643,7 @@ export class Zlib extends ZlibBase { this._writeState = writeState; } - public params(level: number, strategy: number, callback: () => never): void { + public params(level: number, strategy: number, callback: () => void): void { checkRangesOrGetDefault( level, 'level', @@ -656,7 +664,7 @@ export class Zlib extends ZlibBase { ); } else { /* eslint-disable-next-line @typescript-eslint/no-unsafe-call */ - queueMicrotask(callback); + queueMicrotask(() => callback()); } } diff --git a/src/node/internal/zlib.d.ts b/src/node/internal/zlib.d.ts index a053454e7dd9..2ff378062caf 100644 --- a/src/node/internal/zlib.d.ts +++ b/src/node/internal/zlib.d.ts @@ -182,6 +182,15 @@ export class ZlibStream { outputOffset: number, outputLength: number ): void; + public writeSync( + flushFlag: number, + inputBuffer: NodeJS.TypedArray, + inputOffset: number, + inputLength: number, + outputBuffer: NodeJS.TypedArray, + outputOffset: number, + outputLength: number + ): void; public params(level: number, strategy: number): void; public reset(): void; diff --git a/src/workerd/api/node/tests/zlib-nodejs-test.js b/src/workerd/api/node/tests/zlib-nodejs-test.js index 4b9787d8b251..da3e217b6613 100644 --- a/src/workerd/api/node/tests/zlib-nodejs-test.js +++ b/src/workerd/api/node/tests/zlib-nodejs-test.js @@ -599,12 +599,12 @@ export const testFailedInit = { { const stream = zlib.createGzip({ level: NaN }); - assert.strictEqual(stream._level, zlib.constants.Z_DEFAULT_COMPRESSION); + strictEqual(stream._level, zlib.constants.Z_DEFAULT_COMPRESSION); } { const stream = zlib.createGzip({ strategy: NaN }); - assert.strictEqual(stream._strategy, zlib.constants.Z_DEFAULT_STRATEGY); + strictEqual(stream._strategy, zlib.constants.Z_DEFAULT_STRATEGY); } }, }; @@ -619,7 +619,7 @@ export const zlibDestroyTest = { { const ts = zlib.createGzip(); ts.destroy(); - assert.strictEqual(ts._handle, null); + strictEqual(ts._handle, null); const { promise, resolve, reject } = Promise.withResolvers(); promises.push(promise); @@ -640,7 +640,7 @@ export const zlibDestroyTest = { decompress.on('error', (err) => { errorCount++; decompress.close(); - assert.strictEqual(errorCount, 1, 'Error should only be emitted once'); + strictEqual(errorCount, 1, 'Error should only be emitted once'); resolve(); }); @@ -697,6 +697,65 @@ export const closeAfterError = { // }, // }; +// Tests are taken from: +// https://github.com/nodejs/node/blob/9edf4a0856681a7665bd9dcf2ca7cac252784b98/test/parallel/test-zlib-bytes-read.js +export const testZlibBytesRead = { + async test() { + const expectStr = 'abcdefghijklmnopqrstuvwxyz'.repeat(2); + const expectBuf = Buffer.from(expectStr); + + function createWriter(target, buffer) { + const writer = { size: 0 }; + const write = () => { + target.write(Buffer.from([buffer[writer.size++]]), () => { + if (writer.size < buffer.length) { + target.flush(write); + } else { + target.end(); + } + }); + }; + write(); + return writer; + } + + // This test is simplified a lot because of test runner limitations. + // TODO(soon): Add createBrotliCompress once it is implemented. + for (const method of ['createGzip', 'createDeflate', 'createDeflateRaw']) { + assert(method in zlib, `${method} is not available in "node:zlib"`); + const { promise, resolve, reject } = Promise.withResolvers(); + let compData = Buffer.alloc(0); + const comp = zlib[method](); + const compWriter = createWriter(comp, expectBuf); + comp.on('data', function (d) { + compData = Buffer.concat([compData, d]); + strictEqual( + this.bytesWritten, + compWriter.size, + `Should get write size on ${method} data.` + ); + }); + comp.on('error', reject); + comp.on('end', function () { + strictEqual( + this.bytesWritten, + compWriter.size, + `Should get write size on ${method} end.` + ); + strictEqual( + this.bytesWritten, + expectStr.length, + `Should get data size on ${method} end.` + ); + + resolve(); + }); + + await promise; + } + }, +}; + // Node.js tests relevant to zlib // // - [ ] test-zlib-brotli-16GB.js @@ -729,7 +788,7 @@ export const closeAfterError = { // - [ ] test-zlib-from-concatenated-gzip.js // - [ ] test-zlib-not-string-or-buffer.js // - [ ] test-zlib-write-after-end.js -// - [ ] test-zlib-bytes-read.js +// - [x] test-zlib-bytes-read.js // - [ ] test-zlib-destroy-pipe.js // - [ ] test-zlib-from-gzip.js // - [ ] test-zlib-object-write.js diff --git a/src/workerd/api/node/zlib-util.c++ b/src/workerd/api/node/zlib-util.c++ index 178d26b0d1cd..cbc2baa5d29a 100644 --- a/src/workerd/api/node/zlib-util.c++ +++ b/src/workerd/api/node/zlib-util.c++ @@ -132,6 +132,7 @@ void ZlibContext::initialize(int _level, memLevel = _memLevel; strategy = _strategy; flush = Z_NO_FLUSH; + err = Z_OK; switch (mode) { case ZlibMode::GZIP: @@ -185,6 +186,8 @@ kj::Maybe ZlibContext::setDictionary() { return kj::none; } + err = Z_OK; + switch (mode) { case ZlibMode::DEFLATE: case ZlibMode::DEFLATERAW: @@ -240,6 +243,7 @@ kj::Maybe ZlibContext::resetStream() { if (initialized_now && err != Z_OK) { return constructError("Failed to init stream before reset"); } + err = Z_OK; switch (mode) { case ZlibMode::DEFLATE: case ZlibMode::DEFLATERAW: @@ -267,9 +271,8 @@ void ZlibContext::work() { if (initialized_now && err != Z_OK) { return; } - const Bytef* next_expected_header_byte = nullptr; - err = Z_OK; + const Bytef* next_expected_header_byte = nullptr; // If the avail_out is left at 0, then it means that it ran out // of room. If there was avail_out left over, then it means @@ -285,12 +288,12 @@ void ZlibContext::work() { next_expected_header_byte = stream.next_in; } - if (next_expected_header_byte == nullptr) { - break; - } - switch (gzip_id_bytes_read) { case 0: + if (next_expected_header_byte == nullptr) { + break; + } + if (*next_expected_header_byte == GZIP_HEADER_ID1) { gzip_id_bytes_read = 1; next_expected_header_byte++; @@ -306,6 +309,10 @@ void ZlibContext::work() { [[fallthrough]]; case 1: + if (next_expected_header_byte == nullptr) { + break; + } + if (*next_expected_header_byte == GZIP_HEADER_ID2) { gzip_id_bytes_read = 2; mode = ZlibMode::GUNZIP; @@ -329,6 +336,7 @@ void ZlibContext::work() { // If data was encoded with dictionary (INFLATERAW will have it set in // SetDictionary, don't repeat that here) if (mode != ZlibMode::INFLATERAW && err == Z_NEED_DICT && !dictionary.empty()) { + // Load it err = inflateSetDictionary(&stream, dictionary.begin(), dictionary.size()); if (err == Z_OK) { // And try to decode again @@ -342,7 +350,7 @@ void ZlibContext::work() { } while (stream.avail_in > 0 && mode == ZlibMode::GUNZIP && err == Z_STREAM_END && - stream.next_in[0] != '\0') { + stream.next_in[0] != 0x00) { // Bytes remain in input buffer. Perhaps this is another compressed // member in the same archive, or just trailing garbage. // Trailing zero bytes are okay, though, since they are frequently @@ -367,13 +375,13 @@ kj::Maybe ZlibContext::setParams(int _level, int _strategy) { switch (mode) { case ZlibMode::DEFLATE: case ZlibMode::DEFLATERAW: - err = deflateParams(&stream, level, strategy); + err = deflateParams(&stream, _level, _strategy); break; default: break; } - if (err != Z_OK) { + if (err != Z_OK && err != Z_BUF_ERROR) { return constructError("Failed to set parameters"); } @@ -454,6 +462,7 @@ void CompressionStream::emitError( } template +template void CompressionStream::writeStream(jsg::Lock& js, int flush, kj::ArrayPtr input, @@ -465,14 +474,36 @@ void CompressionStream::writeStream(jsg::Lock& js, JSG_REQUIRE(!writing, Error, "Writing is in progress"_kj); JSG_REQUIRE(!pending_close, Error, "Pending close"_kj); + writing = true; + context.setBuffers(input, inputLength, output, outputLength); context.setFlush(flush); - // This implementation always follow the sync version. + if constexpr (!async) { + context.work(); + if (checkError(js)) { + updateWriteResult(); + writing = false; + } + return; + } + + // On Node.js, this is called as a result of `ScheduleWork()` call. + // Since, we implement the whole thing as sync, we're going to ahead and call the whole thing here. context.work(); - if (checkError(js)) { - context.getAfterWriteOffsets(writeResult); - writing = false; + + // This is implemented slightly differently in Node.js + // Node.js calls AfterThreadPoolWork(). + // Ref: https://github.com/nodejs/node/blob/9edf4a0856681a7665bd9dcf2ca7cac252784b98/src/node_zlib.cc#L402 + writing = false; + if (!checkError(js)) return; + updateWriteResult(); + KJ_IF_SOME(cb, writeCallback) { + cb(js); + } + + if (pending_close) { + close(); } } @@ -498,12 +529,20 @@ bool CompressionStream::checkError(jsg::Lock& js) { template void CompressionStream::initializeStream( - kj::ArrayPtr _writeResult, jsg::Function _writeCallback) { + jsg::BufferSource _writeResult, jsg::Function _writeCallback) { writeResult = kj::mv(_writeResult); writeCallback = kj::mv(_writeCallback); initialized = true; } +template +void CompressionStream::updateWriteResult() { + KJ_IF_SOME(wr, writeResult) { + auto ptr = wr.asArrayPtr(); + context.getAfterWriteResult(&ptr[1], &ptr[0]); + } +} + ZlibUtil::ZlibStream::ZlibStream(ZlibMode mode): CompressionStream() { context.setMode(mode); } @@ -512,19 +551,20 @@ void ZlibUtil::ZlibStream::initialize(int windowBits, int level, int memLevel, int strategy, - kj::Array writeState, + jsg::BufferSource writeState, jsg::Function writeCallback, jsg::Optional> dictionary) { - initializeStream(writeState.asPtr(), kj::mv(writeCallback)); + initializeStream(kj::mv(writeState), kj::mv(writeCallback)); context.initialize(level, windowBits, memLevel, strategy, kj::mv(dictionary)); } -void ZlibUtil::ZlibStream::write(jsg::Lock& js, +template +void ZlibUtil::ZlibStream::write_(jsg::Lock& js, int flush, - kj::Array input, + jsg::Optional> input, int inputOffset, int inputLength, - kj::Array output, + kj::ArrayPtr output, int outputOffset, int outputLength) { if (flush != Z_NO_FLUSH && flush != Z_PARTIAL_FLUSH && flush != Z_SYNC_FLUSH && @@ -532,20 +572,53 @@ void ZlibUtil::ZlibStream::write(jsg::Lock& js, JSG_FAIL_REQUIRE(Error, "Invalid flush value"); } - // Check bounds - JSG_REQUIRE(inputOffset >= 0 && inputOffset < input.size(), Error, - "Offset should be smaller than size and bigger than 0"_kj); - JSG_REQUIRE(input.size() >= inputLength, Error, "Invalid inputLength"_kj); - JSG_REQUIRE(outputOffset >= 0 && outputOffset < output.size(), Error, - "Offset should be smaller than size and bigger than 0"_kj); - JSG_REQUIRE(output.size() >= outputLength, Error, "Invalid outputLength"_kj); + // Use default values if input is not determined + if (input == kj::none) { + inputLength = 0; + inputOffset = 0; + } + + JSG_REQUIRE((inputLength > inputOffset) || inputLength == 0, Error, + kj::str("Input offset should be smaller or equal to length, but received offset: ", + inputOffset, " and length: ", inputLength)); + JSG_REQUIRE((outputLength > outputOffset) || outputLength == 0, Error, + kj::str("Output offset should be smaller or equal to length, but received offset: ", + outputOffset, " and length: ", outputLength)); - writeStream( - js, flush, input.slice(inputOffset), inputLength, output.slice(outputOffset), outputLength); + auto input_ensured = input.map([](auto& val) { return val.asPtr(); }).orDefault({}); + writeStream(js, flush, input_ensured.slice(inputOffset), inputLength, + output.slice(outputOffset), outputLength); } -void ZlibUtil::ZlibStream::params(int level, int strategy) { - context.setParams(level, strategy); +void ZlibUtil::ZlibStream::write(jsg::Lock& js, + int flush, + jsg::Optional> input, + int inputOffset, + int inputLength, + kj::Array output, + int outputOffset, + int outputLength) { + write_(js, flush, kj::mv(input), inputOffset, inputLength, output.asPtr(), outputOffset, + outputLength); +} + +void ZlibUtil::ZlibStream::writeSync(jsg::Lock& js, + int flush, + jsg::Optional> input, + int inputOffset, + int inputLength, + kj::Array output, + int outputOffset, + int outputLength) { + write_(js, flush, kj::mv(input), inputOffset, inputLength, output.asPtr(), outputOffset, + outputLength); +} + +void ZlibUtil::ZlibStream::params(jsg::Lock& js, int _level, int _strategy) { + context.setParams(_level, _strategy); + KJ_IF_SOME(err, context.getError()) { + emitError(js, kj::mv(err)); + } } void ZlibUtil::ZlibStream::reset(jsg::Lock& js) { diff --git a/src/workerd/api/node/zlib-util.h b/src/workerd/api/node/zlib-util.h index bd143583bea3..42764abc0caa 100644 --- a/src/workerd/api/node/zlib-util.h +++ b/src/workerd/api/node/zlib-util.h @@ -6,6 +6,7 @@ #include #include +#include #include #include @@ -108,15 +109,20 @@ class ZlibContext { void setFlush(int value) { flush = value; }; - void getAfterWriteOffsets(kj::ArrayPtr writeResult) const { - writeResult[0] = stream.avail_out; - writeResult[1] = stream.avail_in; + // Function signature is same as Node.js implementation. + // Ref: https://github.com/nodejs/node/blob/9edf4a0856681a7665bd9dcf2ca7cac252784b98/src/node_zlib.cc#L880 + void getAfterWriteResult(uint32_t* availIn, uint32_t* availOut) const { + *availIn = stream.avail_in; + *availOut = stream.avail_out; } void setMode(ZlibMode value) { mode = value; }; kj::Maybe resetStream(); kj::Maybe getError() const; + + // Equivalent to Node.js' `DoThreadPoolWork` function. + // Ref: https://github.com/nodejs/node/blob/9edf4a0856681a7665bd9dcf2ca7cac252784b98/src/node_zlib.cc#L760 void work(); uint getAvailIn() const { @@ -176,6 +182,7 @@ class CompressionStream { void close(); bool checkError(jsg::Lock& js); void emitError(jsg::Lock& js, const CompressionError& error); + template void writeStream(jsg::Lock& js, int flush, kj::ArrayPtr input, @@ -185,7 +192,8 @@ class CompressionStream { void setErrorHandler(CompressionStreamErrorHandler handler) { errorHandler = kj::mv(handler); }; - void initializeStream(kj::ArrayPtr _write_result, jsg::Function writeCallback); + void initializeStream(jsg::BufferSource _write_result, jsg::Function writeCallback); + void updateWriteResult(); protected: CompressionContext context; @@ -198,7 +206,7 @@ class CompressionStream { // Equivalent to `write_js_callback` in Node.js jsg::Optional> writeCallback; - kj::ArrayPtr writeResult = nullptr; + jsg::Optional writeResult; jsg::Optional errorHandler; }; @@ -219,24 +227,44 @@ class ZlibUtil final: public jsg::Object { int level, int memLevel, int strategy, - kj::Array writeState, + jsg::BufferSource writeState, jsg::Function writeCallback, jsg::Optional> dictionary); + template + void write_(jsg::Lock& js, + int flush, + jsg::Optional> input, + int inputOffset, + int inputLength, + kj::ArrayPtr output, + int outputOffset, + int outputLength); + + // TODO(soon): Find a way to expose functions with templates using JSG_METHOD. void write(jsg::Lock& js, int flush, - kj::Array input, + jsg::Optional> input, + int inputOffset, + int inputLength, + kj::Array output, + int outputOffset, + int outputLength); + void writeSync(jsg::Lock& js, + int flush, + jsg::Optional> input, int inputOffset, int inputLength, kj::Array output, int outputOffset, int outputLength); - void params(int level, int strategy); + void params(jsg::Lock& js, int level, int strategy); void reset(jsg::Lock& js); JSG_RESOURCE_TYPE(ZlibStream) { JSG_METHOD(initialize); JSG_METHOD(close); JSG_METHOD(write); + JSG_METHOD(writeSync); JSG_METHOD(params); JSG_METHOD(setErrorHandler); JSG_METHOD(reset); diff --git a/src/workerd/jsg/buffersource.h b/src/workerd/jsg/buffersource.h index 5d259c334512..f3c773edd4fc 100644 --- a/src/workerd/jsg/buffersource.h +++ b/src/workerd/jsg/buffersource.h @@ -112,26 +112,32 @@ class BackingStore { BackingStore& operator=(BackingStore&& other) = default; KJ_DISALLOW_COPY(BackingStore); - inline kj::ArrayPtr asArrayPtr() KJ_LIFETIMEBOUND { + template + inline kj::ArrayPtr asArrayPtr() KJ_LIFETIMEBOUND { KJ_ASSERT(backingStore != nullptr, "Invalid access after move."); - return kj::ArrayPtr( - static_cast(backingStore->Data()) + byteOffset, byteLength); + KJ_ASSERT(byteLength % sizeof(T) == 0); + return kj::ArrayPtr( + static_cast(backingStore->Data()) + byteOffset, byteLength / sizeof(T)); } - inline operator kj::ArrayPtr() KJ_LIFETIMEBOUND { - return asArrayPtr(); + template + inline operator kj::ArrayPtr() KJ_LIFETIMEBOUND { + return asArrayPtr(); } bool operator==(const BackingStore& other); - inline const kj::ArrayPtr asArrayPtr() const KJ_LIFETIMEBOUND { + template + inline const kj::ArrayPtr asArrayPtr() const KJ_LIFETIMEBOUND { KJ_ASSERT(backingStore != nullptr, "Invalid access after move."); - return kj::ArrayPtr( - static_cast(backingStore->Data()) + byteOffset, byteLength); + KJ_ASSERT(byteLength % sizeof(T) == 0); + return kj::ArrayPtr( + static_cast(backingStore->Data()) + byteOffset, byteLength / sizeof(T)); } - inline operator const kj::ArrayPtr() const KJ_LIFETIMEBOUND { - return asArrayPtr(); + template + inline operator const kj::ArrayPtr() const KJ_LIFETIMEBOUND { + return asArrayPtr(); } inline size_t size() const { @@ -303,20 +309,24 @@ class BufferSource { v8::Local getHandle(Lock& js); - inline kj::ArrayPtr asArrayPtr() KJ_LIFETIMEBOUND { - return KJ_ASSERT_NONNULL(maybeBackingStore).asArrayPtr(); + template + inline kj::ArrayPtr asArrayPtr() KJ_LIFETIMEBOUND { + return KJ_ASSERT_NONNULL(maybeBackingStore).asArrayPtr(); } - inline operator kj::ArrayPtr() KJ_LIFETIMEBOUND { - return asArrayPtr(); + template + inline operator kj::ArrayPtr() KJ_LIFETIMEBOUND { + return asArrayPtr(); } - inline const kj::ArrayPtr asArrayPtr() const KJ_LIFETIMEBOUND { - return KJ_ASSERT_NONNULL(maybeBackingStore).asArrayPtr(); + template + inline const kj::ArrayPtr asArrayPtr() const KJ_LIFETIMEBOUND { + return KJ_ASSERT_NONNULL(maybeBackingStore).asArrayPtr(); } - inline operator const kj::ArrayPtr() const KJ_LIFETIMEBOUND { - return asArrayPtr(); + template + inline operator const kj::ArrayPtr() const KJ_LIFETIMEBOUND { + return asArrayPtr(); } inline size_t size() const {