diff --git a/lib/actions/google/drive/sheets/google_sheets.js b/lib/actions/google/drive/sheets/google_sheets.js
index 01e2eb12a..b38acc716 100644
--- a/lib/actions/google/drive/sheets/google_sheets.js
+++ b/lib/actions/google/drive/sheets/google_sheets.js
@@ -199,7 +199,7 @@ class GoogleSheetsAction extends google_drive_1.GoogleDriveAction {
             // The ignore is here because Typescript is not correctly inferring that I have done existence checks
             const sheetId = sheets.data.sheets[0].properties.sheetId;
             const columns = sheets.data.sheets[0].properties.gridProperties.columnCount;
-            let currentMaxRows = sheets.data.sheets[0].properties.gridProperties.rowCount;
+            let currentMaxRows = INITIAL_RESIZE;
             const maxPossibleRows = Math.floor(SHEETS_MAX_CELL_LIMIT / columns);
             const requestBody = { requests: [] };
             let rowCount = 0;
@@ -219,52 +219,55 @@ class GoogleSheetsAction extends google_drive_1.GoogleDriveAction {
             // Set the sheet's rows to max rows possible
             winston.info(`Setting sheet rows to ${INITIAL_RESIZE}`, request.webhookId);
             await this.retriableResize(INITIAL_RESIZE, sheet, spreadsheetId, sheetId, 0, request.webhookId);
-            csvparser.on("data", (line) => {
-                if (rowCount > maxPossibleRows) {
-                    reject(`Cannot send more than ${maxPossibleRows} without exceeding limit of 5 million cells in Google Sheets`);
-                }
-                const rowIndex = rowCount++;
-                if (rowIndex >= currentMaxRows - 1) {
-                    csvparser.pause();
-                    currentMaxRows = Math.min(rowCount + MAX_ROW_BUFFER_INCREASE, maxPossibleRows);
-                    winston.info(`Pausing stream and resizing to ${currentMaxRows} rows`, { webhookId: request.webhookId });
-                    this.retriableResize(currentMaxRows, sheet, spreadsheetId, sheetId, 0, request.webhookId).then(() => {
-                        winston.info("Resuming stream", { webhookId: request.webhookId });
-                        csvparser.resume();
-                    }).catch((e) => {
-                        throw e;
-                    });
-                }
-                // Sanitize line data and properly encapsulate string formatting for CSV lines
-                const lineData = line.map((record) => {
-                    record = record.replace(/\"/g, "\"\"");
-                    return `"${record}"`;
-                }).join(",");
-                // @ts-ignore
-                requestBody.requests.push({
-                    pasteData: {
-                        coordinate: {
-                            sheetId,
-                            columnIndex: 0,
-                            rowIndex,
+            let line;
+            csvparser.on("readable", async () => {
+                line = csvparser.read();
+                while (line !== null) {
+                    if (rowCount > maxPossibleRows) {
+                        reject(`Cannot send more than ${maxPossibleRows} without exceeding limit of 5 million cells in Google Sheets`);
+                    }
+                    const rowIndex = rowCount++;
+                    if (rowIndex >= currentMaxRows - 1) {
+                        currentMaxRows = Math.min(rowCount + MAX_ROW_BUFFER_INCREASE, maxPossibleRows);
+                        winston.info(`Pausing stream and resizing to ${currentMaxRows} rows`, { webhookId: request.webhookId });
+                        await this.retriableResize(currentMaxRows, sheet, spreadsheetId, sheetId, 0, request.webhookId).then(() => {
+                            winston.info("Resuming stream", { webhookId: request.webhookId });
+                        }).catch((e) => {
+                            throw e;
+                        });
+                    }
+                    // Sanitize line data and properly encapsulate string formatting for CSV lines
+                    const lineData = line.map((record) => {
+                        record = record.replace(/\"/g, "\"\"");
+                        return `"${record}"`;
+                    }).join(",");
+                    // @ts-ignore
+                    requestBody.requests.push({
+                        pasteData: {
+                            coordinate: {
+                                sheetId,
+                                columnIndex: 0,
+                                rowIndex,
+                            },
+                            data: lineData,
+                            delimiter: ",",
+                            type: "PASTE_NORMAL",
                         },
-                        data: lineData,
-                        delimiter: ",",
-                        type: "PASTE_NORMAL",
-                    },
-                });
-                // @ts-ignore
-                if (requestBody.requests.length > MAX_REQUEST_BATCH) {
-                    const requestCopy = {};
-                    // Make sure to do a deep copy of the request
-                    Object.assign(requestCopy, requestBody);
-                    requestBody.requests = [];
-                    promiseArray.push(this.flush(requestCopy, sheet, spreadsheetId, request.webhookId)
-                        .catch((e) => {
-                        this.sanitizeGaxiosError(e);
-                        winston.debug(e, { webhookId: request.webhookId });
-                        throw e;
-                    }));
+                    });
+                    // @ts-ignore
+                    if (requestBody.requests.length > MAX_REQUEST_BATCH) {
+                        const requestCopy = {};
+                        // Make sure to do a deep copy of the request
+                        Object.assign(requestCopy, requestBody);
+                        requestBody.requests = [];
+                        promiseArray.push(this.flush(requestCopy, sheet, spreadsheetId, request.webhookId)
+                            .catch((e) => {
+                            this.sanitizeGaxiosError(e);
+                            winston.debug(e, { webhookId: request.webhookId });
+                            throw e;
+                        }));
+                    }
+                    line = csvparser.read();
                 }
             }).on("end", () => {
                 finished = true;
diff --git a/src/actions/google/drive/sheets/google_sheets.ts b/src/actions/google/drive/sheets/google_sheets.ts
index 43d2118ae..f0a2d2bf5 100644
--- a/src/actions/google/drive/sheets/google_sheets.ts
+++ b/src/actions/google/drive/sheets/google_sheets.ts
@@ -13,6 +13,7 @@ import Drive = drive_v3.Drive
 import Sheet = sheets_v4.Sheets

 const MAX_REQUEST_BATCH = process.env.GOOGLE_SHEETS_WRITE_BATCH ? Number(process.env.GOOGLE_SHEETS_WRITE_BATCH) : 4096
+
 const SHEETS_MAX_CELL_LIMIT = 5000000
 const MAX_ROW_BUFFER_INCREASE = 6000
 const INITIAL_RESIZE = 6000
@@ -222,7 +223,7 @@ export class GoogleSheetsAction extends GoogleDriveAction {
         // The ignore is here because Typescript is not correctly inferring that I have done existence checks
         const sheetId = sheets.data.sheets[0].properties.sheetId as number
         const columns = sheets.data.sheets[0].properties.gridProperties.columnCount as number
-        let currentMaxRows = sheets.data.sheets[0].properties.gridProperties.rowCount as number
+        let currentMaxRows = INITIAL_RESIZE
         const maxPossibleRows = Math.floor(SHEETS_MAX_CELL_LIMIT / columns)
         const requestBody: sheets_v4.Schema$BatchUpdateSpreadsheetRequest = {requests: []}
         let rowCount = 0
@@ -252,63 +253,66 @@ export class GoogleSheetsAction extends GoogleDriveAction {
           0,
           request.webhookId!,
         )
+        let line: any

-        csvparser.on("data", (line: any) => {
-          if (rowCount > maxPossibleRows) {
-            reject(`Cannot send more than ${maxPossibleRows} without exceeding limit of 5 million cells in Google Sheets`)
-          }
-          const rowIndex: number = rowCount++
-          if (rowIndex >= currentMaxRows - 1) {
-            csvparser.pause()
-            currentMaxRows = Math.min(rowCount + MAX_ROW_BUFFER_INCREASE, maxPossibleRows)
-            winston.info(`Pausing stream and resizing to ${currentMaxRows} rows`,
-              {webhookId: request.webhookId})
-            this.retriableResize(
-              currentMaxRows,
-              sheet,
-              spreadsheetId,
-              sheetId,
-              0,
-              request.webhookId!,
-            ).then(() => {
-              winston.info("Resuming stream", {webhookId: request.webhookId})
-              csvparser.resume()
-            }).catch((e: any) => {
-              throw e
-            })
-          }
-          // Sanitize line data and properly encapsulate string formatting for CSV lines
-          const lineData = line.map((record: string) => {
-            record = record.replace(/\"/g, "\"\"")
-            return `"${record}"`
-          }).join(",") as string
-          // @ts-ignore
-          requestBody.requests.push({
-            pasteData: {
-              coordinate: {
+        csvparser.on("readable", async () => {
+          line = csvparser.read()
+          while (line !== null) {
+            if (rowCount > maxPossibleRows) {
+              reject(`Cannot send more than ${maxPossibleRows} without exceeding limit of 5 million cells in Google Sheets`)
+            }
+            const rowIndex: number = rowCount++
+            if (rowIndex >= currentMaxRows - 1) {
+              currentMaxRows = Math.min(rowCount + MAX_ROW_BUFFER_INCREASE, maxPossibleRows)
+              winston.info(`Pausing stream and resizing to ${currentMaxRows} rows`,
+                {webhookId: request.webhookId})
+              await this.retriableResize(
+                currentMaxRows,
+                sheet,
+                spreadsheetId,
                 sheetId,
-                columnIndex: 0,
-                rowIndex,
+                0,
+                request.webhookId!,
+              ).then(() => {
+                winston.info("Resuming stream", {webhookId: request.webhookId})
+              }).catch((e: any) => {
+                throw e
+              })
+            }
+            // Sanitize line data and properly encapsulate string formatting for CSV lines
+            const lineData = line.map((record: string) => {
+              record = record.replace(/\"/g, "\"\"")
+              return `"${record}"`
+            }).join(",") as string
+            // @ts-ignore
+            requestBody.requests.push({
+              pasteData: {
+                coordinate: {
+                  sheetId,
+                  columnIndex: 0,
+                  rowIndex,
+                },
+                data: lineData,
+                delimiter: ",",
+                type: "PASTE_NORMAL",
              },
-              data: lineData,
-              delimiter: ",",
-              type: "PASTE_NORMAL",
-            },
-          })
-          // @ts-ignore
-          if (requestBody.requests.length > MAX_REQUEST_BATCH) {
-            const requestCopy: sheets_v4.Schema$BatchUpdateSpreadsheetRequest = {}
-            // Make sure to do a deep copy of the request
-            Object.assign(requestCopy, requestBody)
-            requestBody.requests = []
-            promiseArray.push(
-              this.flush(requestCopy, sheet, spreadsheetId, request.webhookId!)
-                .catch((e: any) => {
-                  this.sanitizeGaxiosError(e)
-                  winston.debug(e, {webhookId: request.webhookId})
-                  throw e
-                }),
-            )
+            })
+            // @ts-ignore
+            if (requestBody.requests.length > MAX_REQUEST_BATCH) {
+              const requestCopy: sheets_v4.Schema$BatchUpdateSpreadsheetRequest = {}
+              // Make sure to do a deep copy of the request
+              Object.assign(requestCopy, requestBody)
+              requestBody.requests = []
+              promiseArray.push(
+                this.flush(requestCopy, sheet, spreadsheetId, request.webhookId!)
+                  .catch((e: any) => {
+                    this.sanitizeGaxiosError(e)
+                    winston.debug(e, {webhookId: request.webhookId})
+                    throw e
+                  }),
+              )
+            }
+            line = csvparser.read()
           }
         }).on("end", () => {
           finished = true
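Note on the change above: both hunks swap the csv parser's "data" handler, which had to pause() and resume() the stream around each resize, for a "readable" handler that drains rows with read() in a while loop, so the retriableResize call can simply be awaited before the next read. A minimal sketch of that consumption pattern follows; it is not part of the diff, and the object-mode stream and resizeSheet helper are hypothetical stand-ins for the action's real csvparser and retriableResize.

import { Readable } from "stream"

// Hypothetical stand-in for the awaited resize call in the hunks above.
async function resizeSheet(totalRows: number): Promise<void> {
  // A real implementation would issue a Sheets batchUpdate here.
  return new Promise((resolve) => setTimeout(resolve, 10))
}

// Object-mode stream standing in for the csv parser; each chunk is one parsed row.
const rows = Readable.from([["a", "1"], ["b", "2"], ["c", "3"]])

let processed = 0
rows.on("readable", async () => {
  // Drain whatever is buffered; read() returns null once the buffer is empty.
  let row = rows.read()
  while (row !== null) {
    processed++
    if (processed % 2 === 0) {
      // Awaiting here defers the next read() until the resize settles,
      // replacing the pause()/resume() pairing the old "data" handler needed.
      await resizeSheet(processed)
    }
    row = rows.read()
  }
}).on("end", () => {
  console.log(`processed ${processed} rows`)
})

Because read() is only called again after the awaited promise settles, the loop itself provides the flow control that the old handler managed with explicit pause()/resume() bookkeeping.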