θΏ™ζ˜―indexlocζδΎ›ηš„ζœεŠ‘οΌŒδΈθ¦θΎ“ε…₯任何密码
Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
server/utils/AiProviders/genericOpenAi/index.js (152 additions & 3 deletions)
@@ -3,8 +3,9 @@ const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
const {
-  handleDefaultStreamResponseV2,
formatChatHistory,
writeResponseChunk,
clientAbortedHandler,
} = require("../../helpers/chat/responses");
const { toValidNumber } = require("../../http");

@@ -142,6 +143,21 @@ class GenericOpenAiLLM {
];
}

/**
* Parses and prepends reasoning from the response and returns the full text response.
* @param {Object} response
* @returns {string}
*/
#parseReasoningFromResponse({ message }) {
let textResponse = message?.content;
if (
!!message?.reasoning_content &&
message.reasoning_content.trim().length > 0
)
textResponse = `<think>${message.reasoning_content}</think>${textResponse}`;
return textResponse;
}

async getChatCompletion(messages = null, { temperature = 0.7 }) {
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.openai.chat.completions
@@ -163,7 +179,7 @@ class GenericOpenAiLLM {
return null;

return {
-      textResponse: result.output.choices[0].message.content,
textResponse: this.#parseReasoningFromResponse(result.output.choices[0]),
metrics: {
prompt_tokens: result.output?.usage?.prompt_tokens || 0,
completion_tokens: result.output?.usage?.completion_tokens || 0,
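
For context, here is a minimal, self-contained sketch (not part of the diff) of what the new #parseReasoningFromResponse helper does to a non-streaming completion: when the provider returns a DeepSeek-style reasoning_content field on the message, the reasoning is wrapped in <think> tags and prepended to the normal content. The sample payload below is hypothetical.

// Illustrative sketch only, not part of the PR. Mirrors the private helper
// added above, assuming a DeepSeek-style choice object with an optional
// `reasoning_content` field alongside the usual `content`.
function parseReasoningFromResponse({ message }) {
  let textResponse = message?.content;
  if (
    !!message?.reasoning_content &&
    message.reasoning_content.trim().length > 0
  )
    textResponse = `<think>${message.reasoning_content}</think>${textResponse}`;
  return textResponse;
}

// Hypothetical example input and output:
const choice = {
  message: {
    reasoning_content: "The user asked for 2 + 2, which is 4.",
    content: "2 + 2 = 4.",
  },
};
console.log(parseReasoningFromResponse(choice));
// -> "<think>The user asked for 2 + 2, which is 4.</think>2 + 2 = 4."
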
@@ -191,8 +207,141 @@ class GenericOpenAiLLM {
return measuredStreamRequest;
}

// TODO: This is a copy of the generic handleStream function in responses.js
// to specifically handle the DeepSeek reasoning model `reasoning_content` field.
// When or if ever possible, we should refactor this to be in the generic function.
handleStream(response, stream, responseProps) {
-    return handleDefaultStreamResponseV2(response, stream, responseProps);
const { uuid = uuidv4(), sources = [] } = responseProps;
let hasUsageMetrics = false;
let usage = {
completion_tokens: 0,
};

return new Promise(async (resolve) => {
let fullText = "";
let reasoningText = "";

// Establish listener to early-abort a streaming response
// in case things go sideways or the user does not like the response.
// We preserve the generated text but continue as if chat was completed
// to preserve previously generated content.
const handleAbort = () => {
stream?.endMeasurement(usage);
clientAbortedHandler(resolve, fullText);
};
response.on("close", handleAbort);

try {
for await (const chunk of stream) {
const message = chunk?.choices?.[0];
const token = message?.delta?.content;
const reasoningToken = message?.delta?.reasoning_content;

if (
chunk.hasOwnProperty("usage") && // exists
!!chunk.usage && // is not null
Object.values(chunk.usage).length > 0 // has values
) {
if (chunk.usage.hasOwnProperty("prompt_tokens")) {
usage.prompt_tokens = Number(chunk.usage.prompt_tokens);
}

if (chunk.usage.hasOwnProperty("completion_tokens")) {
hasUsageMetrics = true; // to stop estimating counter
usage.completion_tokens = Number(chunk.usage.completion_tokens);
}
}

// Reasoning models will always return the reasoning text before the token text.
if (reasoningToken) {
// If the reasoning text is empty (''), we need to initialize it
// and send the first chunk of reasoning text.
if (reasoningText.length === 0) {
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: `<think>${reasoningToken}`,
close: false,
error: false,
});
reasoningText += `<think>${reasoningToken}`;
continue;
} else {
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: reasoningToken,
close: false,
error: false,
});
reasoningText += reasoningToken;
}
}

// If the reasoning text is not empty, but the reasoning token is empty
// and the token text is not empty, we need to close the reasoning text and begin sending the token text.
if (!!reasoningText && !reasoningToken && token) {
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: `</think>`,
close: false,
error: false,
});
fullText += `${reasoningText}</think>`;
reasoningText = "";
}

if (token) {
fullText += token;
// If we never saw a usage metric, we can estimate them by number of completion chunks
if (!hasUsageMetrics) usage.completion_tokens++;
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: token,
close: false,
error: false,
});
}

if (
message?.hasOwnProperty("finish_reason") && // Got valid message and it is an object with finish_reason
message.finish_reason !== "" &&
message.finish_reason !== null
) {
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
response.removeListener("close", handleAbort);
stream?.endMeasurement(usage);
resolve(fullText);
break; // Break streaming when a valid finish_reason is first encountered
}
}
} catch (e) {
console.log(`\x1b[43m\x1b[34m[STREAMING ERROR]\x1b[0m ${e.message}`);
writeResponseChunk(response, {
uuid,
type: "abort",
textResponse: null,
sources: [],
close: true,
error: e.message,
});
stream?.endMeasurement(usage);
resolve(fullText);
}
});
}

// Simple wrapper for dynamic embedder & normalize interface for all LLM implementations
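
For reference, a minimal sketch (not part of the diff, with made-up chunk values) of the text the new handleStream accumulates when a reasoning model streams delta.reasoning_content before delta.content: the first reasoning token opens a <think> block, the first regular token closes it, and everything is concatenated into fullText.

// Illustrative only: a hypothetical chunk sequence in the OpenAI streaming
// shape assumed by the diff, and the accumulation logic reduced to its core
// (no HTTP writes, abort handling, or usage metrics).
const chunks = [
  { choices: [{ delta: { reasoning_content: "Thinking about " } }] },
  { choices: [{ delta: { reasoning_content: "the answer..." } }] },
  { choices: [{ delta: { content: "The answer is 4." } }] },
  { choices: [{ delta: {}, finish_reason: "stop" }] },
];

let fullText = "";
let reasoningText = "";
for (const chunk of chunks) {
  const message = chunk?.choices?.[0];
  const token = message?.delta?.content;
  const reasoningToken = message?.delta?.reasoning_content;

  if (reasoningToken) {
    // Open the <think> block on the first reasoning token, then keep appending.
    reasoningText +=
      reasoningText.length === 0 ? `<think>${reasoningToken}` : reasoningToken;
    continue;
  }
  if (!!reasoningText && token) {
    // Reasoning finished streaming; close the block before regular tokens.
    fullText += `${reasoningText}</think>`;
    reasoningText = "";
  }
  if (token) fullText += token;
}

console.log(fullText);
// -> "<think>Thinking about the answer...</think>The answer is 4."

On a provider that never emits delta.reasoning_content, only the plain token branch runs, so the handler degrades to ordinary token streaming.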