При пайпе стрима с объектом респонса клиент получает не всю пачку данных

Нужно конвертировать табличку из БД в csv отчёт.
Если сразу выгрузить всю табличку одним запросом, то приложение падает т.к. заканчивается память. Решил запрашивать данные из таблички порциями по 100 штук, конвертировать каждую запись в строчку отчёта и писать её в поток, который пайпится с экспрессовским респонсом.

Всё это происходит приблизительно вот так:

  1. получение записей из БД

    const select100Users = (maxUserCreationDateStr) => {
       return db.query(`
          SELECT * FROM users WHERE created_at < to_timestamp(${maxUserCreationDateStr})
          ORDER BY created_at DESC LIMIT 100`);
    }
    
  2. создание потока

    const { PassThrough } = require('stream');
    const getUserReportStream = () => {
       const stream = new PassThrough();
       writeUserReport(stream).catch((e) => stream.emit('error', e));
       return stream;
    };
    
  3. передача потока в респонс

    app.get('/report', (req, res) => {
       const stream = getUserReportStream();
       res.setHeader('Content-Type', 'application/vnd.ms-excel');
       res.setHeader(`Content-Disposition', 'attachment; filename="${ filename }"`);
    
       stream.pipe(res);
    });
    
  4. ну и наконец как я пишу данные в поток

    const writeUserReport(stream) => {
       let maxUserCreationDateGlobal = Math.trunc(Date.now() / 1000);
       let flag = true;
    
       stream.write(USER_REPORT_HEADER);
    
       while (flag) {
          const rows100 = await select100Users(maxUserCreationDateGlobal);
          console.log(rows100.length);
    
          if (rows100.length === 0) {
             flag = false;
          } else {
             let maxUserCreationDate = maxUserCreationDateGlobal;
    
             const users100 = await Promise.all(
                rows100.map((r) => {
                   const created_at = r.created_at;
                   const createdAt = new Date(created_at);
    
                   if (created_at && createdAt.toString() !== 'Invalid Date') {
                      const createdAtNumber = Math.trunc(createdAt.valueOf() / 1000);
                      maxUserCreationDate = Math.min(maxUserCreationDate, createdAtNumber);
                   }
    
                   return mapUser(r); // возвращает промис
                })
             );
    
             users100.forEach((u) => stream.write(generateCsvRowFromUser(u)));
             maxUserCreationDateGlobal = maxUserCreationDate;
    
             if (rows100.length < 100) {
                flag = false;
                console.log('***');
             }
          }
       }
    
       console.log('end');
       stream.end();
    };
    

в результате в консоли вижу вот такой вывод:

100 // 100
100 // 200
100 // 300
100 // 400
100 // 500
87  // 587
***
end

Но в скачанном файле я получаю 401 строчку (первая с USER_REPORT_HEADER). Такое ощущение, что stream.end() закрывает поток до того как из него вычитают все значения.

Аналогичным оброзом пробовал вместо PassThrough использовать BehaviorSubject из rxjs - результат такой же.

Как мне дождаться чтения из потока всех данных что я туда писал?
Или может кто-то может порекомендовать альтернативный вариант решить данную проблему.


Ответы (0 шт):