diff --git a/frontend/src/models/workspace.js b/frontend/src/models/workspace.js index 8abdcfe3dcd..5db3fff866f 100644 --- a/frontend/src/models/workspace.js +++ b/frontend/src/models/workspace.js @@ -250,6 +250,16 @@ const Workspace = { const data = await response.json(); return { response, data }; }, + parseFile: async function (slug, formData) { + const response = await fetch(`${API_BASE}/workspace/${slug}/parse`, { + method: "POST", + body: formData, + headers: baseHeaders(), + }); + + const data = await response.json(); + return { response, data }; + }, uploadLink: async function (slug, link) { const response = await fetch(`${API_BASE}/workspace/${slug}/upload-link`, { method: "POST", diff --git a/server/.gitignore b/server/.gitignore index 45a0f0371da..7f8b3a5c527 100644 --- a/server/.gitignore +++ b/server/.gitignore @@ -14,6 +14,7 @@ storage/plugins/agent-flows/* storage/plugins/office-extensions/* storage/plugins/anythingllm_mcp_servers.json !storage/documents/DOCUMENTS.md +storage/direct-uploads logs/server.log *.db *.db-journal diff --git a/server/endpoints/workspaces.js b/server/endpoints/workspaces.js index 66605d655dd..b2b70d9fbe5 100644 --- a/server/endpoints/workspaces.js +++ b/server/endpoints/workspaces.js @@ -36,6 +36,7 @@ const truncate = require("truncate"); const { purgeDocument } = require("../utils/files/purgeDocument"); const { getModelTag } = require("./utils"); const { searchWorkspaceAndThreads } = require("../utils/helpers/search"); +const { WorkspaceParsedFiles } = require("../models/workspaceParsedFiles"); function workspaceEndpoints(app) { if (!app) return; @@ -111,6 +112,90 @@ function workspaceEndpoints(app) { } ); + app.post( + "/workspace/:slug/parse", + [ + validatedRequest, + flexUserRoleValid([ROLES.admin, ROLES.manager]), + handleFileUpload, + ], + async function (request, response) { + try { + const { slug = null } = request.params; + const user = await userFromSession(request, response); + const workspace = 
multiUserMode(response) + ? await Workspace.getWithUser(user, { slug }) + : await Workspace.get({ slug }); + + if (!workspace) { + response.sendStatus(400).end(); + return; + } + + const Collector = new CollectorApi(); + const { originalname } = request.file; + const processingOnline = await Collector.online(); + + if (!processingOnline) { + response.status(500).json({ + success: false, + error: `Document processing API is not online. Document ${originalname} will not be parsed.`, + }); + return; + } + + const { success, reason, documents } = + await Collector.parseDocument(originalname); + if (!success || !documents?.[0]) { + response.status(500).json({ + success: false, + error: reason || "No document returned from collector", + }); + return; + } + + const files = await Promise.all( + documents.map(async (doc) => { + const metadata = { ...doc }; + // Strip out pageContent + delete metadata.pageContent; + const filename = `${originalname}-${doc.id}.json`; + + const { file, error: dbError } = await WorkspaceParsedFiles.create({ + filename, + workspaceId: workspace.id, + userId: user?.id || null, + metadata: JSON.stringify(metadata), + }); + + if (dbError) throw new Error(dbError); + return file; + }) + ); + + Collector.log(`Document ${originalname} parsed successfully.`); + await Telemetry.sendTelemetry("document_parsed"); + await EventLogs.logEvent( + "document_parsed", + { + documentName: originalname, + workspaceId: workspace.id, + }, + user?.id + ); + + response.status(200).json({ + success: true, + error: null, + files, + }); + } catch (e) { + console.error(e.message, e); + response.sendStatus(500).end(); + } + } + ); + app.post( "/workspace/:slug/upload", [ diff --git a/server/models/telemetry.js b/server/models/telemetry.js index 4c2e3576906..0257d030683 100644 --- a/server/models/telemetry.js +++ b/server/models/telemetry.js @@ -11,7 +11,7 @@ const Telemetry = { pubkey: "phc_9qu7QLpV8L84P3vFmEiZxL020t2EqIubP7HHHxrSsqS", stubDevelopmentEvents: true, // [DO 
NOT TOUCH] Core team only. label: "telemetry_id", - /* + /* Key value pairs of events that should be debounced to prevent spamming the logs. This should be used for events that could be triggered in rapid succession that are not useful to atomically log. The value is the number of seconds to debounce the event @@ -27,6 +27,7 @@ const Telemetry = { documents_embedded_in_workspace: 30, link_uploaded: 30, raw_document_uploaded: 30, + document_parsed: 30, }, id: async function () { diff --git a/server/models/workspaceParsedFiles.js b/server/models/workspaceParsedFiles.js new file mode 100644 index 00000000000..caef4bafb84 --- /dev/null +++ b/server/models/workspaceParsedFiles.js @@ -0,0 +1,67 @@ +const prisma = require("../utils/prisma"); + +const WorkspaceParsedFiles = { + create: async function ({ + filename, + workspaceId, + userId = null, + threadId = null, + metadata = null, + }) { + try { + const file = await prisma.workspace_parsed_files.create({ + data: { + filename, + workspaceId: parseInt(workspaceId), + userId: userId ? parseInt(userId) : null, + threadId: threadId ? parseInt(threadId) : null, + metadata, + }, + }); + + return { file, error: null }; + } catch (error) { + console.error("FAILED TO CREATE PARSED FILE RECORD.", error.message); + return { file: null, error: error.message }; + } + }, + + get: async function (clause = {}) { + try { + const file = await prisma.workspace_parsed_files.findFirst({ + where: clause, + }); + return file; + } catch (error) { + console.error(error.message); + return null; + } + }, + + where: async function (clause = {}, limit = null) { + try { + const files = await prisma.workspace_parsed_files.findMany({ + where: clause, + ...(limit !== null ? 
{ take: limit } : {}), + }); + return files; + } catch (error) { + console.error(error.message); + return []; + } + }, + + delete: async function (clause = {}) { + try { + await prisma.workspace_parsed_files.deleteMany({ + where: clause, + }); + return true; + } catch (error) { + console.error(error.message); + return false; + } + }, +}; + +module.exports = { WorkspaceParsedFiles }; diff --git a/server/prisma/migrations/20250730235629_init/migration.sql b/server/prisma/migrations/20250730235629_init/migration.sql new file mode 100644 index 00000000000..2622825985a --- /dev/null +++ b/server/prisma/migrations/20250730235629_init/migration.sql @@ -0,0 +1,21 @@ +-- CreateTable +CREATE TABLE "workspace_parsed_files" ( + "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + "filename" TEXT NOT NULL, + "workspaceId" INTEGER NOT NULL, + "userId" INTEGER, + "threadId" INTEGER, + "metadata" TEXT, + "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT "workspace_parsed_files_workspaceId_fkey" FOREIGN KEY ("workspaceId") REFERENCES "workspaces" ("id") ON DELETE CASCADE ON UPDATE CASCADE, + CONSTRAINT "workspace_parsed_files_userId_fkey" FOREIGN KEY ("userId") REFERENCES "users" ("id") ON DELETE SET NULL ON UPDATE CASCADE +); + +-- CreateIndex +CREATE UNIQUE INDEX "workspace_parsed_files_filename_key" ON "workspace_parsed_files"("filename"); + +-- CreateIndex +CREATE INDEX "workspace_parsed_files_workspaceId_idx" ON "workspace_parsed_files"("workspaceId"); + +-- CreateIndex +CREATE INDEX "workspace_parsed_files_userId_idx" ON "workspace_parsed_files"("userId"); diff --git a/server/prisma/schema.prisma b/server/prisma/schema.prisma index 999cf65dfb2..5a73db252e9 100644 --- a/server/prisma/schema.prisma +++ b/server/prisma/schema.prisma @@ -82,6 +82,7 @@ model users { temporary_auth_tokens temporary_auth_tokens[] system_prompt_variables system_prompt_variables[] prompt_history prompt_history[] + workspace_parsed_files workspace_parsed_files[] } model recovery_codes 
{ @@ -148,6 +149,7 @@ model workspaces { threads workspace_threads[] workspace_agent_invocations workspace_agent_invocations[] prompt_history prompt_history[] + workspace_parsed_files workspace_parsed_files[] } model workspace_threads { @@ -356,3 +358,18 @@ model prompt_history { @@index([workspaceId]) } + +model workspace_parsed_files { + id Int @id @default(autoincrement()) + filename String @unique + workspaceId Int + userId Int? + threadId Int? + metadata String? + createdAt DateTime @default(now()) + workspace workspaces @relation(fields: [workspaceId], references: [id], onDelete: Cascade) + user users? @relation(fields: [userId], references: [id], onDelete: SetNull) + + @@index([workspaceId]) + @@index([userId]) +} diff --git a/server/utils/collectorApi/index.js index d7953ce2209..c991c299ce8 100644 --- a/server/utils/collectorApi/index.js +++ b/server/utils/collectorApi/index.js @@ -232,6 +232,42 @@ class CollectorApi { return { success: false, content: null }; }); } + + /** + * Parse a document without processing it + * - Will append the options to the request body + * @param {string} filename - The filename of the document to parse + * @returns {Promise<{success: boolean, reason: string|null, documents: object[]}>} - The response from the collector API + */ + async parseDocument(filename = "") { + if (!filename) return { success: false, reason: "No filename provided.", documents: [] }; + + const data = JSON.stringify({ + filename, + options: this.#attachOptions(), + }); + + return await fetch(`${this.endpoint}/parse`, { + method: "POST", + headers: { + "Content-Type": "application/json", + "X-Integrity": this.comkey.sign(data), + "X-Payload-Signer": this.comkey.encrypt( + new EncryptionManager().xPayload + ), + }, + body: data, + }) + .then((res) => { + if (!res.ok) throw new Error("Response could not be completed"); + return res.json(); + }) + .then((res) => res) + .catch((e) => { + this.log(e.message); + return { success: false, reason: e.message, documents: [] }; + }); + } } module.exports.CollectorApi = CollectorApi;