diff --git a/server/utils/AiProviders/openAi/index.js b/server/utils/AiProviders/openAi/index.js
index c371a1d47d1..bc159d76f29 100644
--- a/server/utils/AiProviders/openAi/index.js
+++ b/server/utils/AiProviders/openAi/index.js
@@ -1,7 +1,9 @@
+const { v4: uuidv4 } = require("uuid");
 const { NativeEmbedder } = require("../../EmbeddingEngines/native");
 const {
-  handleDefaultStreamResponseV2,
   formatChatHistory,
+  writeResponseChunk,
+  clientAbortedHandler,
 } = require("../../helpers/chat/responses");
 const { MODEL_MAP } = require("../modelMap");
 const {
@@ -34,14 +36,6 @@ class OpenAiLLM {
     console.log(`\x1b[36m[${this.constructor.name}]\x1b[0m ${text}`, ...args);
   }
 
-  /**
-   * Check if the model is an o1 model.
-   * @returns {boolean}
-   */
-  get isOTypeModel() {
-    return this.model.startsWith("o");
-  }
-
   #appendContext(contextTexts = []) {
     if (!contextTexts || !contextTexts.length) return "";
     return (
@@ -55,8 +49,6 @@ class OpenAiLLM {
   }
 
   streamingEnabled() {
-    // o3-mini is the only o-type model that supports streaming
-    if (this.isOTypeModel && this.model !== "o3-mini") return false;
     return "streamGetChatCompletion" in this;
   }
 
@@ -96,14 +88,11 @@ class OpenAiLLM {
       return userPrompt;
     }
 
-    const content = [{ type: "text", text: userPrompt }];
+    const content = [{ type: "input_text", text: userPrompt }];
     for (let attachment of attachments) {
       content.push({
-        type: "image_url",
-        image_url: {
-          url: attachment.contentString,
-          detail: "auto",
-        },
+        type: "input_image",
+        image_url: attachment.contentString,
       });
     }
     return content.flat();
@@ -121,11 +110,8 @@ class OpenAiLLM {
     userPrompt = "",
     attachments = [], // This is the specific attachment for only this prompt
   }) {
-    // o1 Models do not support the "system" role
-    // in order to combat this, we can use the "user" role as a replacement for now
-    // https://community.openai.com/t/o1-models-do-not-support-system-role-in-chat-completion/953880
     const prompt = {
-      role: this.isOTypeModel ? "user" : "system",
+      role: "system",
       content: `${systemPrompt}${this.#appendContext(contextTexts)}`,
     };
     return [
@@ -138,6 +124,24 @@ class OpenAiLLM {
     ];
   }
 
+  /**
+   * Determine the appropriate temperature for the model.
+   * @param {string} modelName
+   * @param {number} temperature
+   * @returns {number}
+   */
+  #temperature(modelName, temperature) {
+    // For models that don't support temperature
+    // OpenAI accepts temperature 1
+    const NO_TEMP_MODELS = ["o", "gpt-5"];
+
+    if (NO_TEMP_MODELS.some((prefix) => modelName.startsWith(prefix))) {
+      return 1;
+    }
+
+    return temperature;
+  }
+
   async getChatCompletion(messages = null, { temperature = 0.7 }) {
     if (!(await this.isValidChatCompletionModel(this.model)))
       throw new Error(
@@ -145,30 +149,30 @@
       );
 
     const result = await LLMPerformanceMonitor.measureAsyncFunction(
-      this.openai.chat.completions
+      this.openai.responses
         .create({
           model: this.model,
-          messages,
-          temperature: this.isOTypeModel ? 1 : temperature, // o1 models only accept temperature 1
+          input: messages,
+          store: false,
+          temperature: this.#temperature(this.model, temperature),
         })
         .catch((e) => {
           throw new Error(e.message);
         })
     );
 
-    if (
-      !result.output.hasOwnProperty("choices") ||
-      result.output.choices.length === 0
-    )
-      return null;
+    if (!result.output.hasOwnProperty("output_text")) return null;
 
+    const usage = result.output.usage || {};
     return {
-      textResponse: result.output.choices[0].message.content,
+      textResponse: result.output.output_text,
       metrics: {
-        prompt_tokens: result.output.usage.prompt_tokens || 0,
-        completion_tokens: result.output.usage.completion_tokens || 0,
-        total_tokens: result.output.usage.total_tokens || 0,
-        outputTps: result.output.usage.completion_tokens / result.duration,
+        prompt_tokens: usage.prompt_tokens || 0,
+        completion_tokens: usage.completion_tokens || 0,
+        total_tokens: usage.total_tokens || 0,
+        outputTps: usage.completion_tokens
+          ? usage.completion_tokens / result.duration
+          : 0,
         duration: result.duration,
       },
     };
@@ -181,23 +185,88 @@
       );
 
     const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
-      this.openai.chat.completions.create({
+      this.openai.responses.create({
         model: this.model,
         stream: true,
-        messages,
-        temperature: this.isOTypeModel ? 1 : temperature, // o1 models only accept temperature 1
+        input: messages,
+        store: false,
+        temperature: this.#temperature(this.model, temperature),
       }),
-      messages
-      // runPromptTokenCalculation: true - We manually count the tokens because OpenAI does not provide them in the stream
-      // since we are not using the OpenAI API version that supports this `stream_options` param.
-      // TODO: implement this once we upgrade to the OpenAI API version that supports this param.
+      messages,
+      false
     );
     return measuredStreamRequest;
   }
 
   handleStream(response, stream, responseProps) {
-    return handleDefaultStreamResponseV2(response, stream, responseProps);
+    const { uuid = uuidv4(), sources = [] } = responseProps;
+
+    let hasUsageMetrics = false;
+    let usage = {
+      completion_tokens: 0,
+    };
+
+    return new Promise(async (resolve) => {
+      let fullText = "";
+
+      const handleAbort = () => {
+        stream?.endMeasurement(usage);
+        clientAbortedHandler(resolve, fullText);
+      };
+      response.on("close", handleAbort);
+
+      try {
+        for await (const chunk of stream) {
+          if (chunk.type === "response.output_text.delta") {
+            const token = chunk.delta;
+            if (token) {
+              fullText += token;
+              if (!hasUsageMetrics) usage.completion_tokens++;
+              writeResponseChunk(response, {
+                uuid,
+                sources: [],
+                type: "textResponseChunk",
+                textResponse: token,
+                close: false,
+                error: false,
+              });
+            }
+          } else if (chunk.type === "response.completed") {
+            const { response: res } = chunk;
+            if (res.hasOwnProperty("usage") && !!res.usage) {
+              hasUsageMetrics = true;
+              usage = { ...usage, ...res.usage };
+            }
+
+            writeResponseChunk(response, {
+              uuid,
+              sources,
+              type: "textResponseChunk",
+              textResponse: "",
+              close: true,
+              error: false,
+            });
+            response.removeListener("close", handleAbort);
+            stream?.endMeasurement(usage);
+            resolve(fullText);
+            break;
+          }
+        }
+      } catch (e) {
+        console.log(`\x1b[43m\x1b[34m[STREAMING ERROR]\x1b[0m ${e.message}`);
+        writeResponseChunk(response, {
+          uuid,
+          type: "abort",
+          textResponse: null,
+          sources: [],
+          close: true,
+          error: e.message,
+        });
+        stream?.endMeasurement(usage);
+        resolve(fullText);
+      }
+    });
   }
 
   // Simple wrapper for dynamic embedder & normalize interface for all LLM implementations
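
Note: the sketch below is not part of the patch. It is a minimal, standalone illustration of the Responses API request shape and streaming event types (`response.output_text.delta`, `response.completed`) that the rewritten `getChatCompletion()`/`handleStream()` code consumes, assuming the official `openai` Node SDK (v4+) is installed and `OPENAI_API_KEY` is set. The model name and prompt text are placeholders.

// Standalone sketch (assumptions: `openai` npm package v4+, OPENAI_API_KEY in env).
const OpenAI = require("openai");

async function demo() {
  const openai = new OpenAI(); // picks up OPENAI_API_KEY automatically

  // Responses API call mirroring the patched streamGetChatCompletion():
  // `input` replaces `messages`, and multimodal parts use input_text / input_image types.
  const stream = await openai.responses.create({
    model: "gpt-4o-mini", // placeholder; any Responses-capable model
    store: false,
    stream: true,
    input: [
      { role: "system", content: "You are a terse assistant." },
      {
        role: "user",
        content: [{ type: "input_text", text: "Say hello in five words." }],
      },
    ],
  });

  // Streaming now yields typed events rather than chat.completions choice deltas.
  let fullText = "";
  for await (const event of stream) {
    if (event.type === "response.output_text.delta") {
      fullText += event.delta; // incremental text, as consumed by handleStream()
    } else if (event.type === "response.completed") {
      // The final event carries the full response object, including usage metrics.
      console.log("usage:", event.response.usage);
    }
  }
  console.log(fullText);
}

demo().catch(console.error);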