Zip Back-pressure Handling #5128

@LRagji

Node.js Version

v24.5.0 (also reproduced on the previous LTS line, v22)

NPM Version

latest

Operating System

macOS and Windows

Subsystem

zlib

Description

Does the following code handle gzip back-pressure correctly? From what I could gather from the docs, blog posts, and LLMs it does, but I always end up with only about half of the input file in the output once the file size goes beyond 100 MB. Why? Or is this a defect in zlib?

import { appendFileSync, createReadStream, mkdirSync } from "node:fs";
import { Transform } from "node:stream";
import { pipeline } from "node:stream/promises";
import { Gzip, createGzip } from "node:zlib";

async function streamTodo(csvFilePath: string): Promise<void> {

    type TSplitStreamContext = {
        zipper?: Gzip
    };

    const tagsStreamContext = new Map<string, TSplitStreamContext>();

    const inputFileStream = createReadStream(csvFilePath, { encoding: 'utf8' });

    let chunkCounter = 0;

    const zipStream = new Transform({
        objectMode: true,
        transform(tagWiseCSVChunks: string, encoding, callback) {
            //const tagIdVSChunkRecords = Object.entries(tagWiseCSVChunks);
            chunkCounter++;
            let tagIdVSChunkRecords = new Map<string, string>();
            if (chunkCounter % 2 === 0) {
                tagIdVSChunkRecords.set("even", tagWiseCSVChunks);
            }
            else {
                tagIdVSChunkRecords.set("odd", tagWiseCSVChunks);
            }

            let upstreamPressure = 0;
            let downstreamBackPressure = 0;
            let invokeCallback = true;
            const invocationName = chunkCounter;

            for (const [tagId, csvChunk] of tagIdVSChunkRecords) {
                const existingContext = tagsStreamContext.get(tagId) ?? {} as TSplitStreamContext;

                if (existingContext.zipper == null || existingContext.zipper === undefined) {
                    existingContext.zipper = createGzip();

                    existingContext.zipper.on('data', (chunk: Buffer) => {
                        this.push({ [tagId]: chunk });
                        // if (this.push({ [tagId]: chunk }) === false) {
                        //     downstreamBackPressure++;
                        //     existingContext.zipper!.pause();
                        //     this.once('drain', () => {
                        //         downstreamBackPressure--;
                        //         if (downstreamBackPressure === 0) {
                        //             existingContext.zipper!.resume();
                        //         }
                        //     });
                        // }
                    });

                    tagsStreamContext.set(tagId, existingContext);
                }
                //existingContext.zipper.write(csvChunk);
                if (existingContext.zipper.write(csvChunk) === false) {
                    invokeCallback = false;
                    upstreamPressure++;
                    existingContext.zipper.once('drain', () => {
                        upstreamPressure--;
                        if (upstreamPressure === 0) {
                            if (chunkCounter < 10) callback();//This controls upstream flow
                            console.log(`[${invocationName}] Callback invoked after drain for invocation ${invocationName}`);
                        }
                    });
                }
            }
            if (invokeCallback === true) {
                callback();//This controls upstream flow
            }
            else {
                console.log(`[${invocationName}] Callback delayed due to upstream pressure in invocation ${invocationName}`);
            }

        },

        final(callback) {
            const promiseHandles = [];

            for (const [tagId, context] of tagsStreamContext) {
                if (context.zipper !== null && context.zipper !== undefined) {
                    promiseHandles.push(new Promise<void>((resolve, reject) => {
                        context.zipper!.once('end', resolve);
                        context.zipper!.once('error', reject);
                        context.zipper!.end();
                    }));
                }
            }

            Promise.all(promiseHandles)
                .then(() => { callback(); console.log('All zippers ended.'); })
                .catch(err => callback(err));
        }
    });

    const fileWriter = new Transform({
        objectMode: true,
        transform(chunk: Record<string, Buffer>, encoding, callback) {
            for (const [tagId, zippedBufferChunk] of Object.entries(chunk)) {
                appendFileSync(`./zipped/${tagId}.gz`, zippedBufferChunk, { flag: 'a' });
            }
            callback();
        }
    });

    await pipeline(
        inputFileStream,
        zipStream,
        fileWriter
    );

    tagsStreamContext.clear();
}

async function mainModule() {
    mkdirSync('./zipped', { recursive: true })
    console.log('Computing...');
    await streamTodo('./1.csv');
}


mainModule()
    .then(() => console.log('All operations completed successfully.'))
    .catch(err => console.error('Error during operations:', err));
//node --experimental-strip-types ./hello.mts

Minimal Reproduction

The code above should do it; it is the isolated snippet from the full file. Reference defect raised: nodejs/node#61202. I thought this was a defect in zlib's event handling, but the comments there say otherwise. I'm still not sure, because the implementation feels by the book.

Output

Files smaller than 100 MB work perfectly fine, but beyond 100 MB the output files are trimmed.
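To separate "zlib drops data" from "my plumbing drops data", a sanity check like the following (a hypothetical `roundTrip` helper, not part of the repro) pushes a configurable amount of CSV-like data through gzip and gunzip in a single `pipeline()` and counts the bytes that come back; if this round-trips intact at any size, zlib itself is not truncating anything:

```typescript
import { Readable } from "node:stream";
import { pipeline } from "node:stream/promises";
import { createGzip, createGunzip } from "node:zlib";

// Push `lines` CSV-like rows through gzip -> gunzip in one pipeline and
// count the bytes that survive the round trip.
async function roundTrip(lines: number): Promise<{ sent: number; received: number }> {
    const row = "a,b,c,0123456789\n";
    const source = Readable.from((function* () {
        for (let i = 0; i < lines; i++) yield row;
    })());

    let received = 0;
    await pipeline(
        source,
        createGzip(),
        createGunzip(),
        // pipeline() accepts an async function as the final consumer.
        async (decompressed: AsyncIterable<Buffer>) => {
            for await (const chunk of decompressed) received += chunk.length;
        }
    );
    return { sent: lines * row.length, received };
}
```

Scaling `lines` so that `sent` exceeds the ~100 MB mark would show whether the truncation follows zlib or the hand-rolled drain/callback bookkeeping.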

Before You Submit

  • I have looked for issues that already exist before submitting this
  • My issue follows the guidelines in the README file, and follows the 'How to ask a good question' guide at https://stackoverflow.com/help/how-to-ask
