θΏ™ζ˜―indexlocζδΎ›ηš„ζœεŠ‘οΌŒδΈθ¦θΎ“ε…₯任何密码
Skip to content

JSON Parser from agent flow blocks #3863

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"node": ">=18"
},
"scripts": {
"test": "jest",
"lint": "cd server && yarn lint && cd ../frontend && yarn lint && cd ../collector && yarn lint",
"setup": "cd server && yarn && cd ../collector && yarn && cd ../frontend && yarn && cd .. && yarn setup:envs && yarn prisma:setup && echo \"Please run yarn dev:server, yarn dev:collector, and yarn dev:frontend in separate terminal tabs.\"",
"setup:envs": "cp -n ./frontend/.env.example ./frontend/.env && cp -n ./server/.env.example ./server/.env.development && cp -n ./collector/.env.example ./collector/.env && cp -n ./docker/.env.example ./docker/.env && echo \"All ENV files copied!\n\"",
Expand All @@ -31,6 +32,7 @@
},
"private": false,
"devDependencies": {
"concurrently": "^9.1.2"
"concurrently": "^9.1.2",
"jest": "^29.7.0"
}
}
1 change: 1 addition & 0 deletions server/.gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.env.production
.env.development
.env.test
storage/assets/*
!storage/assets/anything-llm.png
storage/documents/*
Expand Down
93 changes: 93 additions & 0 deletions server/__tests__/utils/agentFlows/executor.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
const { FlowExecutor } = require("../../../utils/agentFlows/executor");

describe("FlowExecutor: getValueFromPath", () => {
const executor = new FlowExecutor();

it("can handle invalid objects", () => {
expect(executor.getValueFromPath(null, "a.b.c")).toBe("");
expect(executor.getValueFromPath(undefined, "a.b.c")).toBe("");
expect(executor.getValueFromPath(1, "a.b.c")).toBe("");
expect(executor.getValueFromPath("string", "a.b.c")).toBe("");
expect(executor.getValueFromPath(true, "a.b.c")).toBe("");
});

it("can handle invalid paths", () => {
const obj = { a: { b: { c: "answer" } } };
expect(executor.getValueFromPath(obj, -1)).toBe("");
expect(executor.getValueFromPath(obj, undefined)).toBe("");
expect(executor.getValueFromPath(obj, [1, 2, 3])).toBe("");
expect(executor.getValueFromPath(obj, () => { })).toBe("");
});

it("should be able to resolve a value from a dot path at various levels", () => {
let obj = {
a: {
prop: "top-prop",
b: {
c: "answer",
num: 100,
arr: [1, 2, 3],
subarr: [
{ id: 1, name: "answer2" },
{ id: 2, name: "answer3" },
{ id: 3, name: "answer4" },
]
}
}
};
expect(executor.getValueFromPath(obj, "a.prop")).toBe("top-prop");
expect(executor.getValueFromPath(obj, "a.b.c")).toBe("answer");
expect(executor.getValueFromPath(obj, "a.b.num")).toBe(100);
expect(executor.getValueFromPath(obj, "a.b.arr[0]")).toBe(1);
expect(executor.getValueFromPath(obj, "a.b.arr[1]")).toBe(2);
expect(executor.getValueFromPath(obj, "a.b.arr[2]")).toBe(3);
expect(executor.getValueFromPath(obj, "a.b.subarr[0].id")).toBe(1);
expect(executor.getValueFromPath(obj, "a.b.subarr[0].name")).toBe("answer2");
expect(executor.getValueFromPath(obj, "a.b.subarr[1].id")).toBe(2);
expect(executor.getValueFromPath(obj, "a.b.subarr[2].name")).toBe("answer4");
expect(executor.getValueFromPath(obj, "a.b.subarr[2].id")).toBe(3);
});

it("should return empty string if the path is invalid", () => {
const result = executor.getValueFromPath({}, "a.b.c");
expect(result).toBe("");
});

it("should return empty string if the object is invalid", () => {
const result = executor.getValueFromPath(null, "a.b.c");
expect(result).toBe("");
});

it("can return a stringified item if the path target is not an object or array", () => {
const obj = { a: { b: { c: "answer", numbers: [1, 2, 3] } } };
expect(executor.getValueFromPath(obj, "a.b")).toEqual(JSON.stringify(obj.a.b));
expect(executor.getValueFromPath(obj, "a.b.numbers")).toEqual(JSON.stringify(obj.a.b.numbers));
expect(executor.getValueFromPath(obj, "a.b.c")).toBe("answer");
});

it("can return a stringified object if the path target is an array", () => {
const obj = { a: { b: [1, 2, 3] } };
expect(executor.getValueFromPath(obj, "a.b")).toEqual(JSON.stringify(obj.a.b));
expect(executor.getValueFromPath(obj, "a.b[0]")).toBe(1);
expect(executor.getValueFromPath(obj, "a.b[1]")).toBe(2);
expect(executor.getValueFromPath(obj, "a.b[2]")).toBe(3);
});

it("can find a value by string key traversal", () => {
const obj = {
a: {
items: [
{
'my-long-key': [
{ id: 1, name: "answer1" },
{ id: 2, name: "answer2" },
{ id: 3, name: "answer3" },
]
},
],
}
};
expect(executor.getValueFromPath(obj, "a.items[0]['my-long-key'][1].id")).toBe(2);
expect(executor.getValueFromPath(obj, "a.items[0]['my-long-key'][1].name")).toBe("answer2");
});
});
103 changes: 95 additions & 8 deletions server/utils/agentFlows/executor.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const executeCode = require("./executors/code");
const executeLLMInstruction = require("./executors/llm-instruction");
const executeWebScraping = require("./executors/web-scraping");
const { Telemetry } = require("../../models/telemetry");
const { safeJsonParse } = require("../http");

class FlowExecutor {
constructor() {
Expand All @@ -21,19 +22,101 @@ class FlowExecutor {
this.logger = loggerFn || console.info;
}

// Utility to replace variables in config
/**
* Resolves nested values from objects using dot notation and array indices
* Supports paths like "data.items[0].name" or "response.users[2].address.city"
* Returns undefined for invalid paths or errors
* @param {Object|string} obj - The object to resolve the value from
* @param {string} path - The path to the value
* @returns {string} The resolved value
*/
getValueFromPath(obj = {}, path = "") {
if (typeof obj === "string") obj = safeJsonParse(obj, {});

if (
!obj ||
!path ||
typeof obj !== "object" ||
Object.keys(obj).length === 0 ||
typeof path !== "string"
)
return "";

// First split by dots that are not inside brackets
const parts = [];
let currentPart = "";
let inBrackets = false;

for (let i = 0; i < path.length; i++) {
const char = path[i];
if (char === "[") {
inBrackets = true;
if (currentPart) {
parts.push(currentPart);
currentPart = "";
}
currentPart += char;
} else if (char === "]") {
inBrackets = false;
currentPart += char;
parts.push(currentPart);
currentPart = "";
} else if (char === "." && !inBrackets) {
if (currentPart) {
parts.push(currentPart);
currentPart = "";
}
} else {
currentPart += char;
}
}

if (currentPart) parts.push(currentPart);
let current = obj;

for (const part of parts) {
if (current === null || typeof current !== "object") return undefined;

// Handle bracket notation
if (part.startsWith("[") && part.endsWith("]")) {
const key = part.slice(1, -1);
const cleanKey = key.replace(/^['"]|['"]$/g, "");

if (!isNaN(cleanKey)) {
if (!Array.isArray(current)) return undefined;
current = current[parseInt(cleanKey)];
} else {
if (!(cleanKey in current)) return undefined;
current = current[cleanKey];
}
} else {
// Handle dot notation
if (!(part in current)) return undefined;
current = current[part];
}

if (current === undefined || current === null) return undefined;
}

return typeof current === "object" ? JSON.stringify(current) : current;
}

/**
* Replaces variables in the config with their values
* @param {Object} config - The config to replace variables in
* @returns {Object} The config with variables replaced
*/
replaceVariables(config) {
const deepReplace = (obj) => {
if (typeof obj === "string") {
return obj.replace(/\${([^}]+)}/g, (match, varName) => {
return this.variables[varName] !== undefined
? this.variables[varName]
: match;
const value = this.getValueFromPath(this.variables, varName);
return value !== undefined ? value : match;
});
}
if (Array.isArray(obj)) {
return obj.map((item) => deepReplace(item));
}

if (Array.isArray(obj)) return obj.map((item) => deepReplace(item));

if (obj && typeof obj === "object") {
const result = {};
for (const [key, value] of Object.entries(obj)) {
Expand All @@ -47,7 +130,11 @@ class FlowExecutor {
return deepReplace(config);
}

// Main execution method
/**
* Executes a single step of the flow
* @param {Object} step - The step to execute
* @returns {Promise<Object>} The result of the step
*/
async executeStep(step) {
const config = this.replaceVariables(step.config);
let result;
Expand Down
8 changes: 4 additions & 4 deletions server/utils/agentFlows/executors/llm-instruction.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
const AIbitat = require("../../agents/aibitat");

/**
* Execute an LLM instruction flow step
* @param {Object} config Flow step configuration
Expand All @@ -15,8 +13,10 @@ async function executeLLMInstruction(config, context) {
introspect(`Processing data with LLM instruction...`);

if (!variables[inputVariable]) {
logger(`Input variable ${inputVariable} not found`);
throw new Error(`Input variable ${inputVariable} not found`);
logger(`Input variable ${inputVariable} (${inputVariable}) not found`);
throw new Error(
`Input variable ${inputVariable} (${inputVariable}) not found`
);
}

try {
Expand Down
10 changes: 5 additions & 5 deletions server/utils/agentFlows/executors/web-scraping.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
const { CollectorApi } = require("../../collectorApi");
const { TokenManager } = require("../../helpers/tiktoken");
const Provider = require("../../agents/aibitat/providers/ai-provider");
const { summarizeContent } = require("../../agents/aibitat/utils/summarize");

/**
* Execute a web scraping flow step
* @param {Object} config Flow step configuration
* @param {Object} context Execution context with introspect function
* @returns {Promise<string>} Scraped content
*/
async function executeWebScraping(config, context) {
const { CollectorApi } = require("../../collectorApi");
const { TokenManager } = require("../../helpers/tiktoken");
const Provider = require("../../agents/aibitat/providers/ai-provider");
const { summarizeContent } = require("../../agents/aibitat/utils/summarize");

const { url, captureAs = "text", enableSummarization = true } = config;
const { introspect, logger, aibitat } = context;
logger(
Expand Down