From f15fe58c8495f79e359e1baf64dd71fe744f1414 Mon Sep 17 00:00:00 2001 From: Vladimir Voronkov Date: Thu, 28 Sep 2023 13:03:33 +0300 Subject: [PATCH 1/3] Add stream writer helper for complicated csv --- .../generate-csv/csv-writer/helpers/index.js | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 workers/loc.api/generate-csv/csv-writer/helpers/index.js diff --git a/workers/loc.api/generate-csv/csv-writer/helpers/index.js b/workers/loc.api/generate-csv/csv-writer/helpers/index.js new file mode 100644 index 00000000..91b2cf8f --- /dev/null +++ b/workers/loc.api/generate-csv/csv-writer/helpers/index.js @@ -0,0 +1,42 @@ +'use strict' + +const { pipeline } = require('stream/promises') +const { stringify } = require('csv') + +const streamWriterToOne = async ( + rStream, + wStream, + writeFn, + opts +) => { + const { end = true } = opts ?? {} + const promise = pipeline(rStream, wStream, { end }) + + writeFn(rStream) + rStream.end() + + await promise +} + +const streamWriter = async (wStream, csvStreamDataMap) => { + for (const [i, csvStreamData] of csvStreamDataMap.entries()) { + const isLast = (i + 1) === csvStreamDataMap.length + const { + columnParams, + writeFn + } = csvStreamData + + const stringifier = stringify(columnParams) + await streamWriterToOne( + stringifier, + wStream, + writeFn, + { end: isLast } + ) + } +} + +module.exports = { + streamWriterToOne, + streamWriter +} From b1c394d2a396f2aacf1d819b0ac6b7369b4d7671 Mon Sep 17 00:00:00 2001 From: Vladimir Voronkov Date: Thu, 28 Sep 2023 13:05:24 +0300 Subject: [PATCH 2/3] Rework weighted-averages-report-csv-writer to prevent MaxListenersExceededWarning --- .../weighted-averages-report-csv-writer.js | 68 +++++++++---------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/workers/loc.api/generate-csv/csv-writer/weighted-averages-report-csv-writer.js b/workers/loc.api/generate-csv/csv-writer/weighted-averages-report-csv-writer.js index 86960164..15134bdf 100644 --- a/workers/loc.api/generate-csv/csv-writer/weighted-averages-report-csv-writer.js +++ b/workers/loc.api/generate-csv/csv-writer/weighted-averages-report-csv-writer.js @@ -1,13 +1,10 @@ 'use strict' -const { pipeline } = require('stream') -const { stringify } = require('csv') - const { write } = require('../../queue/write-data-to-stream/helpers') -const nope = () => {} +const { streamWriter } = require('./helpers') module.exports = ( rService, @@ -28,50 +25,53 @@ module.exports = ( queue.emit('progress', 0) if (typeof jobData === 'string') { - const stringifier = stringify( - { columns: ['mess'] } + await streamWriter( + wStream, + [{ + columnParams: { columns: ['mess'] }, + writeFn: (stream) => write([{ mess: jobData }], stream) + }] ) - pipeline(stringifier, wStream, nope) - write([{ mess: jobData }], stringifier) queue.emit('progress', 100) - stringifier.end() return } - wStream.setMaxListeners(50) - - const headerStringifier = stringify( - { columns: ['empty', 'buy', 'empty', 'empty', 'sell', 'empty', 'empty', 'cumulative', 'empty'] } - ) - const resStringifier = stringify({ - header: true, - columns: columnsCsv - }) - - pipeline(headerStringifier, wStream, nope) - pipeline(resStringifier, wStream, nope) - const { res } = await getDataFromApi({ getData: rService[name].bind(rService), args, callerName: 'CSV_WRITER' }) - write( - [{ empty: '', buy: 'Buy', sell: 'Sell', cumulative: 'Cumulative' }], - headerStringifier - ) - write( - res, - resStringifier, - formatSettings, - params + wStream.setMaxListeners(50) + + await streamWriter( + wStream, + [ + { + columnParams: { + columns: ['empty', 'buy', 'empty', 'empty', 'sell', 'empty', 'empty', 'cumulative', 'empty'] + }, + writeFn: (stream) => write( + [{ empty: '', buy: 'Buy', sell: 'Sell', cumulative: 'Cumulative' }], + stream + ) + }, + { + columnParams: { + header: true, + columns: columnsCsv + }, + writeFn: (stream) => write( + res, + stream, + formatSettings, + params + ) + } + ] ) queue.emit('progress', 100) - - headerStringifier.end() - resStringifier.end() } From 51674c706e4f01eea32844cb9aa9a45590dad27d Mon Sep 17 00:00:00 2001 From: Vladimir Voronkov Date: Thu, 28 Sep 2023 13:28:20 +0300 Subject: [PATCH 3/3] Fix params schema for weighted averages report csv --- workers/loc.api/generate-csv/csv.job.data.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workers/loc.api/generate-csv/csv.job.data.js b/workers/loc.api/generate-csv/csv.job.data.js index 9be0f766..963388bb 100644 --- a/workers/loc.api/generate-csv/csv.job.data.js +++ b/workers/loc.api/generate-csv/csv.job.data.js @@ -1038,7 +1038,7 @@ class CsvJobData { uId, uInfo ) { - checkParams(args, 'paramsSchemaForWeightedAveragesReportApiCsv') + checkParams(args, 'paramsSchemaForWeightedAveragesReportApiCsv', ['symbol']) const { userId,