Node.js Version
v24.5.0 (also reproduced on the previous LTS, v22)
NPM Version
latest
Operating System
Mac and Windows
Subsystem
zlib
Description
Does the following code handle gzip back-pressure correctly? From what I could gather from the docs, blogs, and LLMs it does, but I consistently get only about half of the input file once the file size goes beyond 100 MB. Why? Or is it a defect in zlib? (For comparison, see the back-pressure sketch after the code.)
import { appendFileSync, createReadStream, mkdirSync } from "node:fs";
import { Transform } from "node:stream";
import { pipeline } from "node:stream/promises";
import { Gzip, createGzip } from "node:zlib";

async function streamTodo(csvFilePath: string): Promise<void> {
    type TSplitStreamContext = {
        zipper?: Gzip
    };
    const tagsStreamContext = new Map<string, TSplitStreamContext>();
    const inputFileStream = createReadStream(csvFilePath, { encoding: 'utf8' });
    let chunkCounter = 0;

    const zipStream = new Transform({
        objectMode: true,
        transform(tagWiseCSVChunks: string, encoding, callback) {
            chunkCounter++;
            // Route alternate chunks to two tags to simulate a tag-wise split.
            const tagIdVSChunkRecords = new Map<string, string>();
            if (chunkCounter % 2 === 0) {
                tagIdVSChunkRecords.set("even", tagWiseCSVChunks);
            } else {
                tagIdVSChunkRecords.set("odd", tagWiseCSVChunks);
            }
            let upstreamPressure = 0;
            let downstreamBackPressure = 0; // only used by the commented-out downstream handling below
            let invokeCallback = true;
            const invocationName = chunkCounter;
            for (const [tagId, csvChunk] of tagIdVSChunkRecords) {
                const existingContext = tagsStreamContext.get(tagId) ?? {} as TSplitStreamContext;
                if (existingContext.zipper == null) {
                    existingContext.zipper = createGzip();
                    existingContext.zipper.on('data', (chunk: Buffer) => {
                        this.push({ [tagId]: chunk });
                        // if (this.push({ [tagId]: chunk }) === false) {
                        //     downstreamBackPressure++;
                        //     existingContext.zipper!.pause();
                        //     this.once('drain', () => {
                        //         downstreamBackPressure--;
                        //         if (downstreamBackPressure === 0) {
                        //             existingContext.zipper!.resume();
                        //         }
                        //     });
                        // }
                    });
                    tagsStreamContext.set(tagId, existingContext);
                }
                if (existingContext.zipper.write(csvChunk) === false) {
                    invokeCallback = false;
                    upstreamPressure++;
                    existingContext.zipper.once('drain', () => {
                        upstreamPressure--;
                        if (upstreamPressure === 0) {
                            if (chunkCounter < 10) callback(); // This controls upstream flow
                            console.log(`[${invocationName}] Callback invoked after drain for invocation ${invocationName}`);
                        }
                    });
                }
            }
            if (invokeCallback === true) {
                callback(); // This controls upstream flow
            } else {
                console.log(`[${invocationName}] Callback delayed due to upstream pressure in invocation ${invocationName}`);
            }
        },
        final(callback) {
            // Flush every per-tag gzip stream and wait for all of them to end.
            const promiseHandles: Promise<void>[] = [];
            for (const [tagId, context] of tagsStreamContext) {
                if (context.zipper != null) {
                    promiseHandles.push(new Promise<void>((resolve, reject) => {
                        context.zipper!.once('end', resolve);
                        context.zipper!.once('error', reject);
                        context.zipper!.end();
                    }));
                }
            }
            Promise.all(promiseHandles)
                .then(() => { callback(); console.log('All zippers ended.'); })
                .catch(err => callback(err));
        }
    });

    const fileWriter = new Transform({
        objectMode: true,
        transform(chunk: Record<string, Buffer>, encoding, callback) {
            for (const [tagId, zippedBufferChunk] of Object.entries(chunk)) {
                appendFileSync(`./zipped/${tagId}.gz`, zippedBufferChunk, { flag: 'a' });
            }
            callback();
        }
    });

    await pipeline(
        inputFileStream,
        zipStream,
        fileWriter
    );
    tagsStreamContext.clear();
}

async function mainModule() {
    mkdirSync('./zipped', { recursive: true });
    console.log('Computing...');
    await streamTodo('./1.csv');
}

mainModule()
    .then(() => console.log('All operations completed successfully.'))
    .catch(err => console.error('Error during operations:', err));

// Run with: node --experimental-strip-types ./hello.mts
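For comparison, this is the pattern I understand to be the by-the-book way to propagate back-pressure from a nested writable inside a Transform: when write() returns false, hold the transform callback until that stream's 'drain' fires, using once from node:events. This is only a minimal sketch of the per-tag case; makeSplitGzip and the chunk shape are my own naming, not part of the code above.

import { once } from "node:events";
import { Transform } from "node:stream";
import { createGzip, type Gzip } from "node:zlib";

// Minimal sketch: one gzip stream per tag; the transform callback is not
// invoked until the pressured zipper has drained, so upstream pauses too.
function makeSplitGzip() {
    const zippers = new Map<string, Gzip>();
    return new Transform({
        objectMode: true,
        async transform(chunk: { tagId: string; csv: string }, _enc, callback) {
            try {
                let zipper = zippers.get(chunk.tagId);
                if (zipper == null) {
                    zipper = createGzip();
                    zipper.on("data", (buf: Buffer) => this.push({ [chunk.tagId]: buf }));
                    zippers.set(chunk.tagId, zipper);
                }
                if (zipper.write(chunk.csv) === false) {
                    // Back-pressure: wait for exactly this zipper to drain
                    // before asking upstream for the next chunk.
                    await once(zipper, "drain");
                }
                callback();
            } catch (err) {
                callback(err as Error);
            }
        },
    });
}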
Minimal Reproduction
The code above should do it; it is the snippet isolated from the full file. For reference, I originally raised this as a defect in nodejs/node#61202. I thought this was a defect in zlib's event handling, but the comments there say otherwise. I'm still not sure, because the implementation feels by the book.
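If an input file is needed to reproduce, a generator along these lines should do; the path, row shape, and the 120 MB target are arbitrary choices of mine, anything over roughly 100 MB triggers it:

import { createWriteStream } from "node:fs";
import { once } from "node:events";

// Writes ~120 MB of ASCII CSV rows to ./1.csv.
async function makeTestCsv(path = "./1.csv", targetBytes = 120 * 1024 * 1024) {
    const out = createWriteStream(path);
    let written = 0;
    let row = 0;
    while (written < targetBytes) {
        const line = `${row++},some,filler,data,${"x".repeat(64)}\n`;
        written += line.length;
        if (!out.write(line)) await once(out, "drain"); // honor back-pressure here too
    }
    out.end();
    await once(out, "finish");
}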
Output
Files smaller than 100 MB come through perfectly fine, but beyond 100 MB they are trimmed.
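For measuring the trim, I use a check along these lines (the helper names are mine): decompress both outputs and compare the total against the input size. A gzip member cut off mid-stream makes zlib throw "unexpected end of file", so the sketch swallows that and reports the bytes seen up to the error.

import { createReadStream, statSync } from "node:fs";
import { createGunzip } from "node:zlib";

// Decompress one output file and count the bytes that come out.
async function countGunzippedBytes(path: string): Promise<number> {
    let bytes = 0;
    const gunzip = createReadStream(path).pipe(createGunzip());
    try {
        for await (const chunk of gunzip) bytes += (chunk as Buffer).length;
    } catch {
        // Truncated gzip member: report whatever decompressed before the error.
    }
    return bytes;
}

async function verifyOutput() {
    const input = statSync('./1.csv').size;
    const output = (await countGunzippedBytes('./zipped/even.gz'))
        + (await countGunzippedBytes('./zipped/odd.gz'));
    console.log({ input, output, missing: input - output });
}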
Before You Submit
- I have looked for issues that already exist before submitting this
- My issue follows the guidelines in the README file, and follows the 'How to ask a good question' guide at https://stackoverflow.com/help/how-to-ask