이벤트 기반 스트림을 사용할 때, 비동기 핸들러로 데이터를 처리하면 순서가 꼬일 수 있다.
이 글에서는 'AsyncIterator'를 활용하여 스트림 데이터를 순차적으로 처리하는 방법을 소개한다.
1. 비동기 핸들러 사용 시 병렬 처리
const fs = require('fs');
const { writeFile } = require('fs/promises');
const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms));
const readStream = fs.createReadStream(__filename, { encoding: 'utf8', highWaterMark: 64 });
const writeFileName = `${__filename}-${Date.now()}`;
const write = async (chunk) => {
await sleep(Math.random() * 1000);
await writeFile(writeFileName, chunk, { flag: 'a' });
}
let counter = 0;
readStream.on('data', async (chunk) => {
console.log(counter);
counter++;
await write(chunk);
})
readStream.on('close', () => {
console.log('close');
})
readStream.on('error', (e) => {
console.log('error: ', e);
})
병렬로 처리되었기에 순서가 엉망진창이다.
async 함수 내의 await은 해당 콜백 내 비동기 작업을 기다리게 만들지만, on('data')로 들어오는 이벤트 자체는 여전히 비동기적으로 병렬 실행된다.
readStream.on('data', async (chunk) => {
...
await write(chunk);
});
2. AsyncIterator를 활용한 순차 처리
병렬 처리가 되던 스트림을 직렬로 처리하려면 아래와 같이 AsyncIterator를 사용하면 된다.
AsyncIterator는 비동기 처리를 구현한 객체를 배열과 같이 반복 처리를 한다.
for await (const 변수 of 비동기를_반복할_수_있는_객체) {
}
const fs = require('fs');
const { writeFile } = require('fs/promises');
const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms));
const writeFileName = `${__filename}-${Date.now()}`;
const write = async (chunk) => {
await sleep(Math.random() * 1000);
await writeFile(writeFileName, chunk, { flag: 'a' });
}
const main = async () => {
const stream = fs.createReadStream(__filename, { encoding: 'utf8', highWaterMark: 64 });
let counter = 0;
for await (const chunk of stream) {
console.log(counter);
counter++;
await write(chunk);
}
}
main()
.catch((e) => console.error(e));
직렬로 처리되어 기존 파일과 동일하다.
이처럼 `for await...of` 구문을 사용하면 스트림 데이터를 순차적으로 처리할 수 있어, 파일 쓰기나 로그 적재처럼 순서가 중요한 작업에 유용하다.