При пайпе стрима с объектом респонса клиент получает не всю пачку данных
Нужно конвертировать табличку из БД в csv отчёт.
Если сразу выгрузить всю табличку одним запросом, то приложение падает т.к. заканчивается память. Решил запрашивать данные из таблички порциями по 100 штук, конвертировать каждую запись в строчку отчёта и писать её в поток, который пайпится с экспрессовским респонсом.
Всё это происходит приблизительно вот так:
получение записей из БД
const select100Users = (maxUserCreationDateStr) => { return db.query(` SELECT * FROM users WHERE created_at < to_timestamp(${maxUserCreationDateStr}) ORDER BY created_at DESC LIMIT 100`); }создание потока
const { PassThrough } = require('stream'); const getUserReportStream = () => { const stream = new PassThrough(); writeUserReport(stream).catch((e) => stream.emit('error', e)); return stream; };передача потока в респонс
app.get('/report', (req, res) => { const stream = getUserReportStream(); res.setHeader('Content-Type', 'application/vnd.ms-excel'); res.setHeader(`Content-Disposition', 'attachment; filename="${ filename }"`); stream.pipe(res); });ну и наконец как я пишу данные в поток
const writeUserReport(stream) => { let maxUserCreationDateGlobal = Math.trunc(Date.now() / 1000); let flag = true; stream.write(USER_REPORT_HEADER); while (flag) { const rows100 = await select100Users(maxUserCreationDateGlobal); console.log(rows100.length); if (rows100.length === 0) { flag = false; } else { let maxUserCreationDate = maxUserCreationDateGlobal; const users100 = await Promise.all( rows100.map((r) => { const created_at = r.created_at; const createdAt = new Date(created_at); if (created_at && createdAt.toString() !== 'Invalid Date') { const createdAtNumber = Math.trunc(createdAt.valueOf() / 1000); maxUserCreationDate = Math.min(maxUserCreationDate, createdAtNumber); } return mapUser(r); // возвращает промис }) ); users100.forEach((u) => stream.write(generateCsvRowFromUser(u))); maxUserCreationDateGlobal = maxUserCreationDate; if (rows100.length < 100) { flag = false; console.log('***'); } } } console.log('end'); stream.end(); };
в результате в консоли вижу вот такой вывод:
100 // 100
100 // 200
100 // 300
100 // 400
100 // 500
87 // 587
***
end
Но в скачанном файле я получаю 401 строчку (первая с USER_REPORT_HEADER). Такое ощущение, что stream.end() закрывает поток до того как из него вычитают все значения.
Аналогичным оброзом пробовал вместо PassThrough использовать BehaviorSubject из rxjs - результат такой же.
Как мне дождаться чтения из потока всех данных что я туда писал?
Или может кто-то может порекомендовать альтернативный вариант решить данную проблему.