From f2ab35357b85679103366f1fcf37929e1ad4dccd Mon Sep 17 00:00:00 2001 From: timothycarambat Date: Thu, 15 May 2025 12:26:08 -0700 Subject: [PATCH 1/3] wip --- server/package.json | 1 + server/utils/files/index.js | 93 ++++++++++++++++++++++++++++++------- 2 files changed, 76 insertions(+), 18 deletions(-) diff --git a/server/package.json b/server/package.json index f226239d898..194915aac7b 100644 --- a/server/package.json +++ b/server/package.json @@ -38,6 +38,7 @@ "@qdrant/js-client-rest": "^1.9.0", "@xenova/transformers": "^2.14.0", "@zilliz/milvus2-sdk-node": "^2.3.5", + "JSONStream": "^1.3.5", "adm-zip": "^0.5.16", "apache-arrow": "19.0.0", "bcrypt": "^5.1.0", diff --git a/server/utils/files/index.js b/server/utils/files/index.js index 4b33fbc0c82..06ce16b1f9a 100644 --- a/server/utils/files/index.js +++ b/server/utils/files/index.js @@ -1,5 +1,6 @@ const fs = require("fs"); const path = require("path"); +const JSONStream = require("JSONStream"); const { v5: uuidv5 } = require("uuid"); const { Document } = require("../../models/documents"); const { DocumentSyncQueue } = require("../../models/documentSyncQueue"); @@ -25,7 +26,10 @@ async function fileData(filePath = null) { } async function viewLocalFiles() { + const start = Date.now(); + if (!fs.existsSync(documentsPath)) fs.mkdirSync(documentsPath); + const filePromises = []; const liveSyncAvailable = await DocumentSyncQueue.enabled(); const directory = { name: "documents", @@ -43,29 +47,24 @@ async function viewLocalFiles() { type: "folder", items: [], }; + const subfiles = fs.readdirSync(folderPath); const filenames = {}; - for (const subfile of subfiles) { - if (path.extname(subfile) !== ".json") continue; - const filePath = path.join(folderPath, subfile); - const rawData = fs.readFileSync(filePath, "utf8"); + for (let i = 0; i < subfiles.length; i++) { + const subfile = subfiles[i]; const cachefilename = `${file}/${subfile}`; - const { pageContent, ...metadata } = JSON.parse(rawData); - 
subdocs.items.push({ - name: subfile, - type: "file", - ...metadata, - cached: await cachedVectorInformation(cachefilename, true), - canWatch: liveSyncAvailable - ? DocumentSyncQueue.canWatch(metadata) - : false, - // pinnedWorkspaces: [], // This is the list of workspaceIds that have pinned this document - // watched: false, // boolean to indicate if this document is watched in ANY workspace - }); + if (path.extname(subfile) !== ".json") continue; + filePromises.push( + fileToPickerData({ + pathToFile: path.join(folderPath, subfile), + liveSyncAvailable + }) + ); filenames[cachefilename] = subfile; } - + const results = await Promise.all(filePromises); + subdocs.items.push(...results); // Grab the pinned workspaces and watched documents for this folder's documents // at the time of the query so we don't have to re-query the database for each file const pinnedWorkspacesByDocument = @@ -88,6 +87,7 @@ async function viewLocalFiles() { ...directory.items.filter((folder) => folder.name !== "custom-documents"), ].filter((i) => !!i); + console.log(`Time taken to load documents: ${Date.now() - start}ms`); return directory; } @@ -266,7 +266,7 @@ function hasVectorCachedFiles() { fs.readdirSync(vectorCachePath)?.filter((name) => name.endsWith(".json")) .length !== 0 ); - } catch {} + } catch { } return false; } @@ -335,6 +335,63 @@ function purgeEntireVectorCache() { return; } +const FILE_READ_SIZE_THRESHOLD = 30 * (1024 * 1024); // 30MB +async function fileToPickerData({ pathToFile, liveSyncAvailable = false }) { + let metadata = {}; + const filename = path.basename(pathToFile); + const fileStats = fs.statSync(pathToFile); + const cachedStatus = await cachedVectorInformation(pathToFile, true); + const canWatchStatus = liveSyncAvailable + ? 
DocumentSyncQueue.canWatch(metadata) + : false; + + if (fileStats.size < FILE_READ_SIZE_THRESHOLD) { + const rawData = fs.readFileSync(pathToFile, "utf8"); + metadata = JSON.parse(rawData); + return { + name: filename, + type: "file", + ...metadata, + cached: cachedStatus, + canWatch: canWatchStatus, + // pinnedWorkspaces: [], // This is the list of workspaceIds that have pinned this document + // watched: false, // boolean to indicate if this document is watched in ANY workspace + } + } + + console.log( + `Stream-parsing ${path.basename(pathToFile)} because it exceeds the ${FILE_READ_SIZE_THRESHOLD} byte limit.` + ); + + const stream = fs.createReadStream(pathToFile, { encoding: "utf8" }); + const parser = JSONStream.parse("$*"); + try { + metadata = await new Promise((resolve, reject) => { + let result = {}; + parser.on("data", (data) => { + if (data.key === "pageContent") return; + result[data.key] = data.value; + }); + parser.on("end", () => resolve(result)); + parser.on("error", reject); + stream.pipe(parser); + }); + } finally { + stream.destroy(); + parser.destroy(); + } + + console.log({ metadata }); + + return { + name: filename, + type: "file", + ...metadata, + cached: cachedStatus, + canWatch: canWatchStatus, + }; +} + module.exports = { findDocumentInDocuments, cachedVectorInformation, From 476180b9afc2a3f1e7dcb0ab353442871d8522a4 Mon Sep 17 00:00:00 2001 From: timothycarambat Date: Thu, 15 May 2025 12:53:02 -0700 Subject: [PATCH 2/3] implement conditional streaming --- server/package.json | 1 - server/utils/files/index.js | 75 ++++++++++++++++++++++++++++--------- 2 files changed, 57 insertions(+), 19 deletions(-) diff --git a/server/package.json b/server/package.json index 194915aac7b..f226239d898 100644 --- a/server/package.json +++ b/server/package.json @@ -38,7 +38,6 @@ "@qdrant/js-client-rest": "^1.9.0", "@xenova/transformers": "^2.14.0", "@zilliz/milvus2-sdk-node": "^2.3.5", - "JSONStream": "^1.3.5", "adm-zip": "^0.5.16", "apache-arrow": "19.0.0", 
"bcrypt": "^5.1.0", diff --git a/server/utils/files/index.js b/server/utils/files/index.js index 06ce16b1f9a..6c531163ada 100644 --- a/server/utils/files/index.js +++ b/server/utils/files/index.js @@ -1,6 +1,5 @@ const fs = require("fs"); const path = require("path"); -const JSONStream = require("JSONStream"); const { v5: uuidv5 } = require("uuid"); const { Document } = require("../../models/documents"); const { DocumentSyncQueue } = require("../../models/documentSyncQueue"); @@ -58,13 +57,16 @@ async function viewLocalFiles() { filePromises.push( fileToPickerData({ pathToFile: path.join(folderPath, subfile), - liveSyncAvailable + liveSyncAvailable, }) ); filenames[cachefilename] = subfile; } - const results = await Promise.all(filePromises); + const results = await Promise.all(filePromises).then((results) => + results.filter((i) => !!i) + ); // Filter out any null results subdocs.items.push(...results); + // Grab the pinned workspaces and watched documents for this folder's documents // at the time of the query so we don't have to re-query the database for each file const pinnedWorkspacesByDocument = @@ -335,7 +337,23 @@ function purgeEntireVectorCache() { return; } -const FILE_READ_SIZE_THRESHOLD = 30 * (1024 * 1024); // 30MB +/** + * File size threshold for files that are too large to be read into memory (MB) + * + * If the file is larger than this, we will stream it and parse it in chunks + * This is to prevent us from using too much memory when parsing large files + * or loading the files in the file picker. + * @TODO - When lazy loading for folders is implemented, we should increase this threshold (512MB) + * since it will always be faster to readSync than to stream the file and parse it in chunks. 
+ */ +const FILE_READ_SIZE_THRESHOLD = 150 * (1024 * 1024); + +/** + * Converts a file to picker data + * @param {string} pathToFile - The path to the file to convert + * @param {boolean} liveSyncAvailable - Whether live sync is available + * @returns {Promise<{name: string, type: string, [string]: any, cached: boolean, canWatch: boolean}>} - The picker data + */ async function fileToPickerData({ pathToFile, liveSyncAvailable = false }) { let metadata = {}; const filename = path.basename(pathToFile); @@ -347,7 +365,15 @@ async function fileToPickerData({ pathToFile, liveSyncAvailable = false }) { if (fileStats.size < FILE_READ_SIZE_THRESHOLD) { const rawData = fs.readFileSync(pathToFile, "utf8"); - metadata = JSON.parse(rawData); + try { + metadata = JSON.parse(rawData); + // Remove the pageContent field from the metadata - it is large and not needed for the picker + delete metadata.pageContent; + } catch (err) { + console.error("Error parsing file", err); + return null; + } + return { name: filename, type: "file", @@ -356,32 +382,45 @@ async function fileToPickerData({ pathToFile, liveSyncAvailable = false }) { canWatch: canWatchStatus, // pinnedWorkspaces: [], // This is the list of workspaceIds that have pinned this document // watched: false, // boolean to indicate if this document is watched in ANY workspace - } + }; } console.log( `Stream-parsing ${path.basename(pathToFile)} because it exceeds the ${FILE_READ_SIZE_THRESHOLD} byte limit.` ); - const stream = fs.createReadStream(pathToFile, { encoding: "utf8" }); - const parser = JSONStream.parse("$*"); try { + let fileContent = ""; metadata = await new Promise((resolve, reject) => { - let result = {}; - parser.on("data", (data) => { - if (data.key === "pageContent") return; - result[data.key] = data.value; - }); - parser.on("end", () => resolve(result)); - parser.on("error", reject); - stream.pipe(parser); + stream + .on("data", (chunk) => { + fileContent += chunk; + }) + .on("end", () => { + metadata = 
JSON.parse(fileContent); + // Remove the pageContent field from the metadata - it is large and not needed for the picker + delete metadata.pageContent; + resolve(metadata); + }) + .on("error", (err) => { + console.error("Error parsing file", err); + reject(null); + }); + }).catch((err) => { + console.error("Error parsing file", err); }); + } catch (err) { + console.error("Error parsing file", err); + metadata = null; } finally { stream.destroy(); - parser.destroy(); } - console.log({ metadata }); + // If the metadata is empty or something went wrong, return null + if (!metadata || !Object.keys(metadata)?.length) { + console.log(`Stream-parsing failed for ${path.basename(pathToFile)}`); + return null; + } return { name: filename, From f2a8867abc3cf4376391c6d738e1b4fcef7a409b Mon Sep 17 00:00:00 2001 From: timothycarambat Date: Thu, 15 May 2025 13:00:57 -0700 Subject: [PATCH 3/3] no bench --- server/utils/files/index.js | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/server/utils/files/index.js b/server/utils/files/index.js index 6c531163ada..98a44253c3f 100644 --- a/server/utils/files/index.js +++ b/server/utils/files/index.js @@ -25,8 +25,6 @@ async function fileData(filePath = null) { } async function viewLocalFiles() { - const start = Date.now(); - if (!fs.existsSync(documentsPath)) fs.mkdirSync(documentsPath); const filePromises = []; const liveSyncAvailable = await DocumentSyncQueue.enabled(); @@ -89,7 +87,6 @@ async function viewLocalFiles() { ...directory.items.filter((folder) => folder.name !== "custom-documents"), ].filter((i) => !!i); - console.log(`Time taken to load documents: ${Date.now() - start}ms`); return directory; } @@ -268,7 +265,7 @@ function hasVectorCachedFiles() { fs.readdirSync(vectorCachePath)?.filter((name) => name.endsWith(".json")) .length !== 0 ); - } catch { } + } catch {} return false; }