From 12b5e03a7839ebcb24aebbe58eefa2abdfbc0487 Mon Sep 17 00:00:00 2001 From: Tristan Stahnke Date: Thu, 24 Apr 2025 12:59:08 -0400 Subject: [PATCH 1/4] Fixed two primary issues discovered while using AWS Bedrock with Anthropic Claude Sonnet models: - The context window incorrectly defaulted to a maximum of 8192 tokens, which is not correct for these models. - Multimodal support stopped working when LangChain was removed, since LangChain had been transparently converting image_url attachments into the format Sonnet expects. --- server/utils/AiProviders/bedrock/index.js | 793 +++++++++++++++------- 1 file changed, 532 insertions(+), 261 deletions(-) diff --git a/server/utils/AiProviders/bedrock/index.js b/server/utils/AiProviders/bedrock/index.js index c5a9b8dbb68..dd94cd85c73 100644 --- a/server/utils/AiProviders/bedrock/index.js +++ b/server/utils/AiProviders/bedrock/index.js @@ -1,60 +1,141 @@ +/** + * AWS Bedrock Language Model Connector using the Converse API. + * Supports text and multi-modal (text + image) interactions. + * Handles distinct context window limits and max output token limits. + */ + +// Core AWS SDK imports for Bedrock Converse API const { BedrockRuntimeClient, ConverseCommand, ConverseStreamCommand, } = require("@aws-sdk/client-bedrock-runtime"); + +// Helper imports from the application const { writeResponseChunk, clientAbortedHandler, -} = require("../../helpers/chat/responses"); -const { NativeEmbedder } = require("../../EmbeddingEngines/native"); +} = require("../../helpers/chat/responses"); // For streaming responses +const { NativeEmbedder } = require("../../EmbeddingEngines/native"); // Default embedder const { LLMPerformanceMonitor, -} = require("../../helpers/chat/LLMPerformanceMonitor"); -const { v4: uuidv4 } = require("uuid"); +} = require("../../helpers/chat/LLMPerformanceMonitor"); // For tracking API call performance +const { v4: uuidv4 } = require("uuid"); // For generating unique IDs +const { MODEL_MAP } = require("../modelMap"); // Although imported, not currently used for Bedrock limits + +// --- Constants --- + +/** @const {string[]} Supported image formats by Bedrock Converse API */ +const SUPPORTED_BEDROCK_IMAGE_FORMATS = ["jpeg", "png", "gif", "webp"]; +/** @const {number} Default maximum tokens to generate in the response. Models often have lower output limits than their total context windows. */ +const DEFAULT_MAX_OUTPUT_TOKENS = 4096; +/** @const {number} Default total context window size if not specified in environment variables. */ +const DEFAULT_CONTEXT_WINDOW_TOKENS = 8191; + +// --- Helper Functions --- + +/** + * Parses a MIME type string (e.g., "image/jpeg") to extract and validate the image format + * supported by Bedrock Converse. Handles 'image/jpg' as 'jpeg'. + * @param {string | null | undefined} mimeType - The MIME type string. + * @returns {string | null} The validated image format (e.g., "jpeg") or null if invalid/unsupported. + */ +function getImageFormatFromMime(mimeType) { + if (!mimeType || typeof mimeType !== 'string') { + console.warn(`[AWSBedrock] Invalid or missing MIME type provided for attachment.`); + return null; + } + const parts = mimeType.toLowerCase().split('/'); + if (parts.length !== 2 || parts[0] !== 'image') { + console.warn(`[AWSBedrock] Invalid MIME type format: "${mimeType}". Expected "image/...".`); + return null; + } + + let format = parts[1]; + if (format === 'jpg') format = 'jpeg'; // Normalize jpg to jpeg + + if (!SUPPORTED_BEDROCK_IMAGE_FORMATS.includes(format)) { + console.warn(`[AWSBedrock] Unsupported image format: "${format}" from MIME type "${mimeType}". Supported formats: ${SUPPORTED_BEDROCK_IMAGE_FORMATS.join(', ')}.`); + return null; + } + return format; +}
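As a quick sanity check on the helper above, these hypothetical inputs map to the following return values (inferred directly from the code; the function is module-local, so this only runs inside this file):

  getImageFormatFromMime("image/jpg");  // "jpeg" (normalized from "jpg")
  getImageFormatFromMime("image/webp"); // "webp"
  getImageFormatFromMime("image/tiff"); // null, logs the unsupported-format warning
  getImageFormatFromMime("text/plain"); // null, logs the invalid-MIME-type warning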
+ +/** + * Decodes a pure base64 string (without data URI prefix) into a Uint8Array using the atob method. + * This approach matches the technique previously used by Langchain's implementation. + * @param {string} base64String - The pure base64 encoded data. + * @returns {Uint8Array | null} The resulting byte array or null on decoding error. + */ +function base64ToUint8Array(base64String) { + try { + const binaryString = atob(base64String); // Decode base64 to binary string + const len = binaryString.length; + const bytes = new Uint8Array(len); + for (let i = 0; i < len; i++) { + bytes[i] = binaryString.charCodeAt(i); // Convert char code to byte value + } + return bytes; + } catch (e) { + console.error(`[AWSBedrock] Error decoding base64 string with atob: ${e.message}`); + if (e.name === 'InvalidCharacterError') { + console.error("[AWSBedrock] Base64 decoding failed. Ensure input string is valid base64 and does not contain a data URI prefix."); + } + return null; + } +} + + +// --- AWSBedrockLLM Class --- class AWSBedrockLLM { /** - * These models do not support system prompts - * It is not explicitly stated but it is observed that they do not use the system prompt - * in their responses and will crash when a system prompt is provided. - * We can add more models to this list as we discover them or new models are added. - * We may want to extend this list or make a user-config if using custom bedrock models. + * List of Bedrock models observed to not support system prompts when using the Converse API. + * @type {string[]} */ noSystemPromptModels = [ "amazon.titan-text-express-v1", "amazon.titan-text-lite-v1", "cohere.command-text-v14", "cohere.command-light-text-v14", - "us.deepseek.r1-v1:0", + // Add other models here if identified ]; + /** + * Initializes the AWS Bedrock LLM connector. + * @param {object | null} [embedder=null] - An optional embedder instance. Defaults to NativeEmbedder. + * @param {string | null} [modelPreference=null] - Optional model ID override. Defaults to environment variable. + * @throws {Error} If required environment variables are missing or invalid. + */ constructor(embedder = null, modelPreference = null) { - if (!process.env.AWS_BEDROCK_LLM_ACCESS_KEY_ID) - throw new Error("No AWS Bedrock LLM profile id was set."); - - if (!process.env.AWS_BEDROCK_LLM_ACCESS_KEY) - throw new Error("No AWS Bedrock LLM access key was set."); - - if (!process.env.AWS_BEDROCK_LLM_REGION) - throw new Error("No AWS Bedrock LLM region was set."); - - if ( - process.env.AWS_BEDROCK_LLM_CONNECTION_METHOD === "sessionToken" && - !process.env.AWS_BEDROCK_LLM_SESSION_TOKEN - ) - throw new Error( - "No AWS Bedrock LLM session token was set while using session token as the authentication method."
- ); + // --- Environment Variable Validation --- + const requiredEnvVars = [ + 'AWS_BEDROCK_LLM_ACCESS_KEY_ID', + 'AWS_BEDROCK_LLM_ACCESS_KEY', + 'AWS_BEDROCK_LLM_REGION', + 'AWS_BEDROCK_LLM_MODEL_PREFERENCE' // Model preference is effectively required + ]; + for (const envVar of requiredEnvVars) { + if (!process.env[envVar]) { + throw new Error(`Required environment variable ${envVar} is not set.`); + } + } + if (process.env.AWS_BEDROCK_LLM_CONNECTION_METHOD === "sessionToken" && !process.env.AWS_BEDROCK_LLM_SESSION_TOKEN) { + throw new Error("AWS_BEDROCK_LLM_SESSION_TOKEN is not set for sessionToken authentication method."); + } - this.model = - modelPreference || process.env.AWS_BEDROCK_LLM_MODEL_PREFERENCE; + // --- Model and Limits Setup --- + this.model = modelPreference || process.env.AWS_BEDROCK_LLM_MODEL_PREFERENCE; + // Get the total context window limit (used for input management) + const contextWindowLimit = this.promptWindowLimit(); + // Define approximate limits for different parts of the prompt based on the context window this.limits = { - history: this.promptWindowLimit() * 0.15, - system: this.promptWindowLimit() * 0.15, - user: this.promptWindowLimit() * 0.7, + history: Math.floor(contextWindowLimit * 0.15), + system: Math.floor(contextWindowLimit * 0.15), + user: Math.floor(contextWindowLimit * 0.70), // Allow user prompt + context to take the bulk }; + // --- AWS SDK Client Configuration --- this.bedrockClient = new BedrockRuntimeClient({ region: process.env.AWS_BEDROCK_LLM_REGION, credentials: { @@ -66,159 +147,280 @@ class AWSBedrockLLM { }, }); + // --- Other Initializations --- this.embedder = embedder ?? new NativeEmbedder(); - this.defaultTemp = 0.7; - this.#log( - `Loaded with model: ${this.model}. Will communicate with AWS Bedrock using ${this.authMethod} authentication.` - ); + this.defaultTemp = 0.7; // Default sampling temperature + + this.#log(`Initialized with model: ${this.model}. Auth: ${this.authMethod}. Context Window: ${contextWindowLimit}.`); } /** - * Get the authentication method for the AWS Bedrock LLM. - * There are only two valid values for this setting - anything else will default to "iam". - * @returns {"iam"|"sessionToken"} + * Gets the configured AWS authentication method ('iam' or 'sessionToken'). + * Defaults to 'iam' if the environment variable is invalid. + * @returns {"iam" | "sessionToken"} The authentication method. */ get authMethod() { const method = process.env.AWS_BEDROCK_LLM_CONNECTION_METHOD || "iam"; - if (!["iam", "sessionToken"].includes(method)) return "iam"; - return method; + return ["iam", "sessionToken"].includes(method) ? method : "iam"; } + /** + * Appends context texts to a string with standard formatting. + * @param {string[]} [contextTexts=[]] - An array of context text snippets. + * @returns {string} Formatted context string or empty string if no context provided. + * @private + */ #appendContext(contextTexts = []) { - if (!contextTexts || !contextTexts.length) return ""; + if (!Array.isArray(contextTexts) || contextTexts.length === 0) return ""; return ( "\nContext:\n" + contextTexts - .map((text, i) => { - return `[CONTEXT ${i}]:\n${text}\n[END CONTEXT ${i}]\n\n`; - }) + .map((text, i) => `[CONTEXT ${i}]:\n${text}\n[END CONTEXT ${i}]\n\n`) .join("") ); } - #log(text, ...args) { + /** + * Internal logging helper with provider prefix. + * @param {string} text - The log message. + * @param {...any} args - Additional arguments to log. 
+ * @private + */ + #log(text, ...args) { console.log(`\x1b[32m[AWSBedrock]\x1b[0m ${text}`, ...args); } + /** + * Indicates if the provider supports streaming responses. + * @returns {boolean} True. + */ streamingEnabled() { - return "streamGetChatCompletion" in this; + return typeof this.streamGetChatCompletion === "function"; } + /** + * Gets the total prompt window limit (total context window: input + output) from the environment variable. + * This value is used for calculating input limits, NOT for setting the max output tokens in API calls. + * @param {string} [_modelName] - The model name (parameter currently unused, reads directly from env var). + * @returns {number} The total context window token limit. + * @static + * @throws {Error} If the AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT environment variable is invalid. + */ static promptWindowLimit(_modelName) { - const limit = process.env.AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT || 8191; - if (!limit || isNaN(Number(limit))) - throw new Error("No valid token context limit was set."); - return Number(limit); + const limitSourceValue = process.env.AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT; + // Log the value being read (can be commented out in production) + // console.log(`[AWSBedrock DEBUG] Reading AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT for prompt window. Value found: ${limitSourceValue} (Type: ${typeof limitSourceValue})`); + const limit = limitSourceValue || DEFAULT_CONTEXT_WINDOW_TOKENS; // Use default if not set + + const numericLimit = Number(limit); + if (isNaN(numericLimit) || numericLimit <= 0) { + console.error(`[AWSBedrock ERROR] Invalid AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT found: "${limitSourceValue}". Must be a positive number.`); + throw new Error(`Invalid AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT set in environment: "${limitSourceValue}"`); + } + // Note: Does not use MODEL_MAP for Bedrock context window. Relies on the specific Bedrock env var. + return numericLimit; } - // Ensure the user set a value for the token limit - // and if undefined - assume 4096 window. + /** + * Gets the total prompt window limit (total context window) for the current model instance. + * @returns {number} The token limit. + */ promptWindowLimit() { - const limit = process.env.AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT || 8191; - if (!limit || isNaN(Number(limit))) - throw new Error("No valid token context limit was set."); - return Number(limit); + // Delegates to the static method for consistency. + return AWSBedrockLLM.promptWindowLimit(this.model); } - async isValidChatCompletionModel(_ = "") { + /** + * Gets the maximum number of tokens the model should generate in its response. + * Reads from the AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS environment variable or uses a default. + * This is distinct from the total context window limit. + * @returns {number} The maximum output tokens limit for API calls. + */ + getMaxOutputTokens() { + const outputLimitSource = process.env.AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS; + let outputLimit = DEFAULT_MAX_OUTPUT_TOKENS; // Start with the class default + + if (outputLimitSource) { + const numericOutputLimit = Number(outputLimitSource); + // Validate the environment variable value + if (!isNaN(numericOutputLimit) && numericOutputLimit > 0) { + outputLimit = numericOutputLimit; + } else { + this.#log(`Invalid AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS value "${outputLimitSource}". Using default ${DEFAULT_MAX_OUTPUT_TOKENS}.`); + } + } + return outputLimit; + } + + + /** + * Checks if the configured model is valid for chat completion (basic check for Bedrock). 
+ + /** + * Checks if the configured model is valid for chat completion (basic check for Bedrock). + * @param {string} [_modelName] - Model name (unused). + * @returns {Promise} Always true, assuming any configured Bedrock model supports Converse API. + */ + async isValidChatCompletionModel(_modelName = "") { + // Assume valid for Bedrock Converse; more specific checks could be added if needed. return true; } /** - * Generates appropriate content array for a message + attachments. - * TODO: Implement this - attachments are not supported yet for Bedrock. - * @param {{userPrompt:string, attachments: import("../../helpers").Attachment[]}} - * @returns {string|object[]} + * Generates the Bedrock Converse API content array for a message, + * processing text and formatting valid image attachments. + * @param {object} params + * @param {string} [params.userPrompt=""] - The text part of the message. + * @param {Array<{contentString: string, mime: string}>} [params.attachments=[]] - Array of attachments for the message. + * @returns {Array} Array of content blocks (e.g., [{text: "..."}, {image: {...}}]). + * @private */ - #generateContent({ userPrompt, attachments = [] }) { - if (!attachments.length) return [{ text: userPrompt }]; - - // const content = [{ type: "text", text: userPrompt }]; - // for (let attachment of attachments) { - // content.push({ - // type: "image_url", - // image_url: attachment.contentString, - // }); - // } - // return { content: content.flat() }; + #generateContent({ userPrompt = "", attachments = [] }) { + const content = []; + + // Add text block if prompt is not empty + if (userPrompt && typeof userPrompt === 'string' && userPrompt.trim().length > 0) { + content.push({ text: userPrompt }); + } + + // Process valid attachments + if (Array.isArray(attachments)) { + for (const attachment of attachments) { + if (!attachment || typeof attachment.mime !== 'string' || typeof attachment.contentString !== 'string') { + this.#log("Skipping invalid attachment object.", attachment); + continue; + } + + // Strip data URI prefix (e.g., "data:image/png;base64,") + let base64Data = attachment.contentString; + const dataUriPrefixMatch = base64Data.match(/^data:image\/\w+;base64,/); + if (dataUriPrefixMatch) { + base64Data = base64Data.substring(dataUriPrefixMatch[0].length); + } + + const format = getImageFormatFromMime(attachment.mime); + if (format) { + const imageBytes = base64ToUint8Array(base64Data); + if (imageBytes) { + // Add the image block in the required Bedrock format + content.push({ + image: { + format: format, + source: { bytes: imageBytes } + } + }); + } else { + this.#log(`Skipping attachment with mime ${attachment.mime} due to base64 decoding error.`); + } + } else { + this.#log(`Skipping attachment with unsupported/invalid MIME type: ${attachment.mime}`); + } + } + } + + // Ensure content array is never empty (Bedrock requires at least one block) + if (content.length === 0) { + this.#log("Warning: #generateContent resulted in an empty content array. Adding empty text block as fallback."); + content.push({ text: "" }); + } + + return content; }
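For illustration, a hedged sketch of the content array this method builds for a text-plus-image message (the attachment is hypothetical, and the byte values shown are just the PNG magic number):

  // Inside the class, a call shaped like:
  //   this.#generateContent({
  //     userPrompt: "What is in this image?",
  //     attachments: [{ mime: "image/png", contentString: "data:image/png;base64,iVBORw0..." }],
  //   })
  // would return roughly:
  const exampleContent = [
    { text: "What is in this image?" },
    { image: { format: "png", source: { bytes: new Uint8Array([0x89, 0x50, 0x4e, 0x47 /* ... */]) } } },
  ];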
/** - * Construct the user prompt for this model. - * @param {{attachments: import("../../helpers").Attachment[]}} param0 - * @returns + * Constructs the complete message array in the format expected by the Bedrock Converse API. + * @param {object} params + * @param {string} [params.systemPrompt=""] - The system prompt text. + * @param {string[]} [params.contextTexts=[]] - Array of context text snippets. + * @param {Array<{role: 'user' | 'assistant', content: string, attachments?: Array<{contentString: string, mime: string}>}>} [params.chatHistory=[]] - Previous messages. + * @param {string} [params.userPrompt=""] - The latest user prompt text. + * @param {Array<{contentString: string, mime: string}>} [params.attachments=[]] - Attachments for the latest user prompt. + * @returns {Array} The formatted message array for the API call. */ constructPrompt({ systemPrompt = "", contextTexts = [], chatHistory = [], userPrompt = "", - _attachments = [], + attachments = [], }) { - let prompt = [ - { - role: "system", - content: [ - { text: `${systemPrompt}${this.#appendContext(contextTexts)}` }, - ], - }, - ]; + const systemMessageContent = `${systemPrompt}${this.#appendContext(contextTexts)}`; + let messages = []; - // If the model does not support system prompts, we need to add a user message and assistant message + // Handle system prompt (either real or simulated) if (this.noSystemPromptModels.includes(this.model)) { - prompt = [ - { - role: "user", - content: [ - { text: `${systemPrompt}${this.#appendContext(contextTexts)}` }, - ], - }, - { - role: "assistant", - content: [{ text: "Okay." }], - }, - ]; + if (systemMessageContent.trim().length > 0) { + this.#log(`Model ${this.model} doesn't support system prompts; simulating.`); + messages.push( + { role: "user", content: this.#generateContent({ userPrompt: systemMessageContent }) }, // No attachments in simulated system prompt + { role: "assistant", content: [{ text: "Okay." }] } + ); + } + } else if (systemMessageContent.trim().length > 0) { + messages.push({ + role: "system", + content: this.#generateContent({ userPrompt: systemMessageContent }) // No attachments in system prompt + }); } - return [ - ...prompt, - ...chatHistory.map((msg) => ({ + // Add chat history + messages = messages.concat( + chatHistory.map((msg) => ({ role: msg.role, content: this.#generateContent({ userPrompt: msg.content, - attachments: msg.attachments, + attachments: Array.isArray(msg.attachments) ? msg.attachments : [], }), - })), - { + })) + ); + + // Add final user prompt + messages.push({ role: "user", content: this.#generateContent({ - userPrompt: userPrompt, - attachments: [], + userPrompt: userPrompt, + attachments: Array.isArray(attachments) ? attachments : [], }), - }, - ]; + }); + + return messages; }
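For orientation, a hedged sketch of the array this method assembles for a short exchange on a system-capable model (shapes inferred from the code above; the image attachment follows the #generateContent example earlier):

  const exampleMessages = [
    { role: "system", content: [{ text: "You are helpful." }] },
    { role: "user", content: [{ text: "Hi" }] },
    { role: "assistant", content: [{ text: "Hello!" }] },
    {
      role: "user",
      content: [
        { text: "Describe this image" },
        { image: { format: "png", source: { bytes: new Uint8Array([0x89, 0x50, 0x4e, 0x47 /* ... */]) } } },
      ],
    },
  ];

Note that the Converse API itself takes the system prompt as a separate parameter; the "system" entry here is internal and is split back out in getChatCompletion/streamGetChatCompletion below.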
/** - * Parses and prepends reasoning from the response and returns the full text response. - * @param {Object} response - * @returns {string} + * Parses reasoning steps from the response and prepends them in <think> tags. + * @param {object} message - The message object from the Bedrock response. + * @returns {string} The text response, potentially with reasoning prepended. + * @private */ - #parseReasoningFromResponse({ content = [] }) { - let textResponse = content[0]?.text; - - if ( - !!content?.[1]?.reasoningContent && - content?.[1]?.reasoningContent?.reasoningText?.text?.trim().length > 0 - ) - textResponse = `<think>${content?.[1]?.reasoningContent?.reasoningText?.text}</think>${textResponse}`; - + #parseReasoningFromResponse({ content = [] }) { + if (!Array.isArray(content)) return ""; + const textBlock = content.find(block => block.text !== undefined); + let textResponse = textBlock?.text || ""; + const reasoningBlock = content.find(block => block.reasoningContent?.reasoningText?.text); + if (reasoningBlock) { + const reasoningText = reasoningBlock.reasoningContent.reasoningText.text.trim(); + if (reasoningText.length > 0) { + textResponse = `<think>${reasoningText}</think>${textResponse}`; + } + } return textResponse; } - async getChatCompletion(messages = null, { temperature = 0.7 }) { + + /** + * Sends a request for chat completion (non-streaming). + * @param {Array | null} messages - Formatted message array from constructPrompt. + * @param {object} options - Request options. + * @param {number} [options.temperature] - Sampling temperature. + * @returns {Promise} Response object with textResponse and metrics, or null. + * @throws {Error} If the API call fails or validation errors occur. + */ + async getChatCompletion(messages = null, { temperature }) { + if (!Array.isArray(messages) || messages.length === 0) { + throw new Error("AWSBedrock::getChatCompletion requires a non-empty messages array."); + } + const hasSystem = messages[0]?.role === "system"; - const [system, ...history] = hasSystem ? messages : [null, ...messages]; + const systemBlock = hasSystem ? messages[0].content : undefined; + const history = hasSystem ? messages.slice(1) : messages; + const maxTokensToSend = this.getMaxOutputTokens(); const result = await LLMPerformanceMonitor.measureAsyncFunction( this.bedrockClient .send( new ConverseCommand({ modelId: this.model, messages: history, inferenceConfig: { - maxTokens: this.promptWindowLimit(), - temperature, + maxTokens: maxTokensToSend, + temperature: temperature ?? this.defaultTemp, }, - system: !!system ? system.content : undefined, + system: systemBlock, }) ) .catch((e) => { - throw new Error( - `AWSBedrock::getChatCompletion failed to communicate with Bedrock client. ${e.message}` - ); + this.#log(`Bedrock Converse API Error (getChatCompletion): ${e.message}`, e); + if (e.name === 'ValidationException' && e.message.includes('maximum tokens')) { + throw new Error(`AWSBedrock::getChatCompletion failed. Model ${this.model} rejected maxTokens value of ${maxTokensToSend}. Check model documentation for its maximum output token limit and set AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS if needed. Original error: ${e.message}`); + } + throw new Error(`AWSBedrock::getChatCompletion failed. ${e.message}`); }), messages, false ); const response = result.output; - if (!response || !response?.output) return null; + if (!response?.output?.message) { + this.#log("Bedrock response missing expected output.message structure.", response); + return null; + } + + const latencyMs = response?.metrics?.latencyMs; + const outputTokens = response?.usage?.outputTokens; + const outputTps = (latencyMs > 0 && outputTokens) ? 
(outputTokens / (latencyMs / 1000)) : 0; + return { - textResponse: this.#parseReasoningFromResponse(response.output?.message), + textResponse: this.#parseReasoningFromResponse(response.output.message), metrics: { - prompt_tokens: response?.usage?.inputTokens, - completion_tokens: response?.usage?.outputTokens, - total_tokens: response?.usage?.totalTokens, - outputTps: - response?.usage?.outputTokens / (response?.metrics?.latencyMs / 1000), + prompt_tokens: response?.usage?.inputTokens ?? 0, + completion_tokens: outputTokens ?? 0, + total_tokens: response?.usage?.totalTokens ?? 0, + outputTps: outputTps, duration: result.duration, }, }; } - async streamGetChatCompletion(messages = null, { temperature = 0.7 }) { + /** + * Sends a request for streaming chat completion. + * @param {Array | null} messages - Formatted message array from constructPrompt. + * @param {object} options - Request options. + * @param {number} [options.temperature] - Sampling temperature. + * @returns {Promise} The monitored stream object. + * @throws {Error} If the API call setup fails or validation errors occur. + */ + async streamGetChatCompletion(messages = null, { temperature }) { + if (!Array.isArray(messages) || messages.length === 0) { + throw new Error("AWSBedrock::streamGetChatCompletion requires a non-empty messages array."); + } + const hasSystem = messages[0]?.role === "system"; - const [system, ...history] = hasSystem ? messages : [null, ...messages]; - - const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( - this.bedrockClient.send( - new ConverseStreamCommand({ - modelId: this.model, - messages: history, - inferenceConfig: { maxTokens: this.promptWindowLimit(), temperature }, - system: !!system ? system.content : undefined, - }) - ), - messages, - false - ); - return measuredStreamRequest; + const systemBlock = hasSystem ? messages[0].content : undefined; + const history = hasSystem ? messages.slice(1) : messages; + const maxTokensToSend = this.getMaxOutputTokens(); + + try { + // Attempt to initiate the stream + const stream = await this.bedrockClient.send( + new ConverseStreamCommand({ + modelId: this.model, + messages: history, + inferenceConfig: { + maxTokens: maxTokensToSend, + temperature: temperature ?? this.defaultTemp, + }, + system: systemBlock, + }) + ); + + // If successful, wrap the stream with performance monitoring + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + stream, + messages, + false // Indicate it's not a function call measurement + ); + return measuredStreamRequest; + + } catch (e) { + // Catch errors during the initial .send() call (e.g., validation errors) + this.#log(`Bedrock Converse API Error (streamGetChatCompletion setup): ${e.message}`, e); + if (e.name === 'ValidationException' && e.message.includes('maximum tokens')) { + // Provide specific error context for max token issues + throw new Error(`AWSBedrock::streamGetChatCompletion failed during setup. Model ${this.model} rejected maxTokens value of ${maxTokensToSend}. Check model documentation for its maximum output token limit and set AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS if needed. Original error: ${e.message}`); + } + // Re-throw other setup errors + throw new Error(`AWSBedrock::streamGetChatCompletion failed during setup. ${e.message}`); + } } /** - * Handles the stream response from the AWS Bedrock API. - * Bedrock does not support usage metrics in the stream response so we need to estimate them. 
- * @param {Object} response - the response object - * @param {import('../../helpers/chat/LLMPerformanceMonitor').MonitoredStream} stream - the stream response from the AWS Bedrock API w/tracking - * @param {Object} responseProps - the response properties - * @returns {Promise} + * Handles the stream response from the AWS Bedrock API ConverseStreamCommand. + * Parses chunks, handles reasoning tags, and estimates token usage if not provided. + * @param {object} response - The HTTP response object to write chunks to. + * @param {import('../../helpers/chat/LLMPerformanceMonitor').MonitoredStream} stream - The monitored stream object from streamGetChatCompletion. + * @param {object} responseProps - Additional properties for the response chunks. + * @param {string} [responseProps.uuid] - Unique ID for the response. + * @param {Array} [responseProps.sources=[]] - Source documents used (if any). + * @returns {Promise} A promise that resolves with the complete text response when the stream ends. */ - handleStream(response, stream, responseProps) { + handleStream(response, stream, responseProps) { const { uuid = uuidv4(), sources = [] } = responseProps; let hasUsageMetrics = false; - let usage = { - prompt_tokens: 0, - completion_tokens: 0, - }; + let usage = { prompt_tokens: 0, completion_tokens: 0, total_tokens: 0 }; return new Promise(async (resolve) => { let fullText = ""; let reasoningText = ""; - // Establish listener to early-abort a streaming response - // in case things go sideways or the user does not like the response. - // We preserve the generated text but continue as if chat was completed - // to preserve previously generated content. + // Abort handler for client closing connection const handleAbort = () => { - stream?.endMeasurement(usage); - clientAbortedHandler(resolve, fullText); + this.#log(`Client closed connection for stream ${uuid}. Aborting.`); + stream?.endMeasurement(usage); // Finalize metrics + clientAbortedHandler(resolve, fullText); // Resolve with partial text }; response.on("close", handleAbort); try { + // Process stream chunks for await (const chunk of stream.stream) { - if (chunk === undefined) - throw new Error( - "Stream returned undefined chunk. Aborting reply - check model provider logs." - ); - + if (!chunk) { this.#log("Stream returned null/undefined chunk."); continue; } const action = Object.keys(chunk)[0]; - if (action === "metadata") { - hasUsageMetrics = true; - usage.prompt_tokens = chunk.metadata?.usage?.inputTokens ?? 0; - usage.completion_tokens = chunk.metadata?.usage?.outputTokens ?? 0; - usage.total_tokens = chunk.metadata?.usage?.totalTokens ?? 0; - } - if (action === "contentBlockDelta") { - const token = chunk.contentBlockDelta?.delta?.text; - const reasoningToken = - chunk.contentBlockDelta?.delta?.reasoningContent?.text; - - // Reasoning models will always return the reasoning text before the token text. - if (reasoningToken) { - // If the reasoning text is empty (''), we need to initialize it - // and send the first chunk of reasoning text. 
- if (reasoningText.length === 0) { - writeResponseChunk(response, { - uuid, - sources: [], - type: "textResponseChunk", - textResponse: `<think>${reasoningToken}`, - close: false, - error: false, - }); - reasoningText += `<think>${reasoningToken}`; - continue; - } else { - writeResponseChunk(response, { - uuid, - sources: [], - type: "textResponseChunk", - textResponse: reasoningToken, - close: false, - error: false, - }); - reasoningText += reasoningToken; + switch (action) { + case "metadata": // Contains usage metrics at the end + if (chunk.metadata?.usage) { + hasUsageMetrics = true; + usage = { // Overwrite with final metrics + prompt_tokens: chunk.metadata.usage.inputTokens ?? 0, + completion_tokens: chunk.metadata.usage.outputTokens ?? 0, + total_tokens: chunk.metadata.usage.totalTokens ?? 0, + }; + } + break; + case "contentBlockDelta": { // Contains text or reasoning deltas + const delta = chunk.contentBlockDelta?.delta; + if (!delta) break; + const token = delta.text; + const reasoningToken = delta.reasoningContent?.text; + + if (reasoningToken) { // Handle reasoning text + if (reasoningText.length === 0) { // Start of reasoning block + const startTag = "<think>"; + writeResponseChunk(response, { uuid, sources, type: "textResponseChunk", textResponse: startTag + reasoningToken, close: false, error: false }); + reasoningText += startTag + reasoningToken; + } else { // Continuation of reasoning block + writeResponseChunk(response, { uuid, sources, type: "textResponseChunk", textResponse: reasoningToken, close: false, error: false }); + reasoningText += reasoningToken; + } + } else if (token) { // Handle regular text + if (reasoningText.length > 0) { // If reasoning was just output, close the tag + const endTag = "</think>"; + writeResponseChunk(response, { uuid, sources, type: "textResponseChunk", textResponse: endTag, close: false, error: false }); + fullText += reasoningText + endTag; // Add completed reasoning to final text + reasoningText = ""; // Reset reasoning buffer + } + fullText += token; // Append regular text + if (!hasUsageMetrics) usage.completion_tokens++; // Estimate usage if no metrics yet + writeResponseChunk(response, { uuid, sources, type: "textResponseChunk", textResponse: token, close: false, error: false }); } + break; } + case "messageStop": // End of message event + if (chunk.messageStop?.usage) { // Check for final metrics here too + hasUsageMetrics = true; + usage = { // Overwrite with final metrics if available + prompt_tokens: chunk.messageStop.usage.inputTokens ?? usage.prompt_tokens, + completion_tokens: chunk.messageStop.usage.outputTokens ?? usage.completion_tokens, + total_tokens: chunk.messageStop.usage.totalTokens ?? usage.total_tokens, + }; + } + // Ensure reasoning tag is closed if message stops mid-reasoning + if (reasoningText.length > 0) { + const endTag = "</think>"; + writeResponseChunk(response, { uuid, sources, type: "textResponseChunk", textResponse: endTag, close: false, error: false }); + fullText += reasoningText + endTag; + reasoningText = ""; + } + break; + // Ignore other event types for now + case "messageStart": case "contentBlockStart": case "contentBlockStop": break; + default: this.#log(`Unhandled stream action: ${action}`, chunk); + } + } // End for await loop
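To make the tag-handling logic above easier to follow, here is a hedged, self-contained model of how a short stream of deltas is re-assembled (the delta payloads are hypothetical; this is a simplified re-implementation of the convention, not the production path):

  const deltas = [
    { reasoningContent: { text: "Check the math first... " } },
    { text: "The result is 42." },
  ];
  let out = "";
  let inThink = false;
  for (const d of deltas) {
    if (d.reasoningContent?.text) {
      if (!inThink) { out += "<think>"; inThink = true; }
      out += d.reasoningContent.text;
    } else if (d.text) {
      if (inThink) { out += "</think>"; inThink = false; }
      out += d.text;
    }
  }
  console.log(out); // "<think>Check the math first... </think>The result is 42."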
- // If the reasoning text is not empty, but the reasoning token is empty - // and the token text is not empty we need to close the reasoning text and begin sending the token text. - if (!!reasoningText && !reasoningToken && token) { - writeResponseChunk(response, { - uuid, - sources: [], - type: "textResponseChunk", - textResponse: `</think>`, - close: false, - error: false, - }); - fullText += `${reasoningText}</think>`; - reasoningText = ""; + // Final cleanup for reasoning tag in case stream ended abruptly + if (reasoningText.length > 0 && !fullText.endsWith("</think>")) { + const endTag = "</think>"; + if (!response.writableEnded) { + writeResponseChunk(response, { uuid, sources, type: "textResponseChunk", textResponse: endTag, close: false, error: false }); + } + fullText += reasoningText + endTag; + } - if (token) { - fullText += token; - // If we never saw a usage metric, we can estimate them by number of completion chunks - if (!hasUsageMetrics) usage.completion_tokens++; - writeResponseChunk(response, { - uuid, - sources: [], - type: "textResponseChunk", - textResponse: token, - close: false, - error: false, - }); - } - } + // Send final closing chunk to signal end of stream + if (!response.writableEnded) { + writeResponseChunk(response, { uuid, sources, type: "textResponseChunk", textResponse: "", close: true, error: false }); } - writeResponseChunk(response, { - uuid, - sources, - type: "textResponseChunk", - textResponse: "", - close: true, - error: false, - }); - response.removeListener("close", handleAbort); - stream?.endMeasurement(usage); - resolve(fullText); } catch (error) { - console.log(`\x1b[43m\x1b[34m[STREAMING ERROR]\x1b[0m ${e.message}`); - writeResponseChunk(response, { - uuid, - type: "abort", - textResponse: null, - sources: [], - close: true, - error: `AWSBedrock:streaming - could not stream chat. ${error?.cause ?? error.message}`, - }); - response.removeListener("close", handleAbort); - stream?.endMeasurement(usage); - resolve(fullText); // Return what we currently have - if anything. + // Handle errors during stream processing + this.#log(`\x1b[43m\x1b[34m[STREAMING ERROR]\x1b[0m ${error.message}`, error); + if (response && !response.writableEnded) { + writeResponseChunk(response, { uuid, type: "abort", textResponse: null, sources, close: true, error: `AWSBedrock:streaming - error. ${error?.message ?? "Unknown error"}` }); + } + } finally { + // Cleanup: Always remove listener and finalize measurement + response.removeListener("close", handleAbort); + stream?.endMeasurement(usage); // Log final usage metrics + resolve(fullText); // Resolve with the accumulated text } }); } - // Simple wrapper for dynamic embedder & normalize interface for all LLM implementations + + /** + * Embeds a single text input using the configured embedder. + * @param {string} textInput - The text to embed. + * @returns {Promise} The embedding vector. + * @throws {Error} If the embedder is not configured or fails. + */ async embedTextInput(textInput) { - return await this.embedder.embedTextInput(textInput); + if (!this.embedder?.embedTextInput) { + throw new Error("Embedder is not configured or does not support embedTextInput."); + } + try { + return await this.embedder.embedTextInput(textInput); + } catch(e) { + this.#log(`EmbedTextInput Error: ${e.message}`, e) + throw e; // Re-throw after logging + } } + + /** + * Embeds multiple chunks of text using the configured embedder. + * @param {string[]} [textChunks=[]] - An array of text chunks to embed. + * @returns {Promise} An array of embedding vectors. + * @throws {Error} If the embedder is not configured or fails. 
+ */ async embedChunks(textChunks = []) { - return await this.embedder.embedChunks(textChunks); + if (!this.embedder?.embedChunks) { + throw new Error("Embedder is not configured or does not support embedChunks."); + } + try { + return await this.embedder.embedChunks(textChunks); + } catch(e) { + this.#log(`EmbedChunks Error: ${e.message}`, e) + throw e; // Re-throw after logging + } } + /** + * Compresses chat messages if necessary using the messageArrayCompressor helper. + * @param {object} [promptArgs={}] - Arguments to pass to constructPrompt. + * @param {Array} [rawHistory=[]] - The raw chat history. + * @returns {Promise} The potentially compressed message array and metadata. + */ async compressMessages(promptArgs = {}, rawHistory = []) { const { messageArrayCompressor } = require("../../helpers/chat"); + if (!messageArrayCompressor) { + throw new Error("Message compressor helper not found."); + } + // Construct the message array first const messageArray = this.constructPrompt(promptArgs); + // Then pass it to the compressor return await messageArrayCompressor(this, messageArray, rawHistory); } } From 78a6ce3011d3406150f5a36cf0bc4028be63c87e Mon Sep 17 00:00:00 2001 From: Tristan Stahnke Date: Thu, 24 Apr 2025 14:02:27 -0400 Subject: [PATCH 2/4] Ran `yarn lint` --- server/utils/AiProviders/bedrock/index.js | 561 ++++++++++++++-------- 1 file changed, 359 insertions(+), 202 deletions(-) diff --git a/server/utils/AiProviders/bedrock/index.js b/server/utils/AiProviders/bedrock/index.js index dd94cd85c73..b8c9ce34048 100644 --- a/server/utils/AiProviders/bedrock/index.js +++ b/server/utils/AiProviders/bedrock/index.js @@ -41,22 +41,30 @@ const DEFAULT_CONTEXT_WINDOW_TOKENS = 8191; * @returns {string | null} The validated image format (e.g., "jpeg") or null if invalid/unsupported. */ function getImageFormatFromMime(mimeType) { - if (!mimeType || typeof mimeType !== 'string') { - console.warn(`[AWSBedrock] Invalid or missing MIME type provided for attachment.`); + if (!mimeType || typeof mimeType !== "string") { + console.warn( + `[AWSBedrock] Invalid or missing MIME type provided for attachment.` + ); return null; } - const parts = mimeType.toLowerCase().split('/'); - if (parts.length !== 2 || parts[0] !== 'image') { - console.warn(`[AWSBedrock] Invalid MIME type format: "${mimeType}". Expected "image/...".`); + const parts = mimeType.toLowerCase().split("/"); + if (parts.length !== 2 || parts[0] !== "image") { + console.warn( + `[AWSBedrock] Invalid MIME type format: "${mimeType}". Expected "image/...".` + ); return null; } let format = parts[1]; - if (format === 'jpg') format = 'jpeg'; // Normalize jpg to jpeg + if (format === "jpg") format = "jpeg"; // Normalize jpg to jpeg if (!SUPPORTED_BEDROCK_IMAGE_FORMATS.includes(format)) { - console.warn(`[AWSBedrock] Unsupported image format: "${format}" from MIME type "${mimeType}". Supported formats: ${SUPPORTED_BEDROCK_IMAGE_FORMATS.join(', ')}.`); - return null; + console.warn( + `[AWSBedrock] Unsupported image format: "${format}" from MIME type "${mimeType}". Supported formats: ${SUPPORTED_BEDROCK_IMAGE_FORMATS.join( + ", " + )}.` + ); + return null; } return format; } @@ -77,15 +85,18 @@ function base64ToUint8Array(base64String) { } return bytes; } catch (e) { - console.error(`[AWSBedrock] Error decoding base64 string with atob: ${e.message}`); - if (e.name === 'InvalidCharacterError') { - console.error("[AWSBedrock] Base64 decoding failed. 
Ensure input string is valid base64 and does not contain a data URI prefix."); + console.error( + `[AWSBedrock] Error decoding base64 string with atob: ${e.message}` + ); + if (e.name === "InvalidCharacterError") { + console.error( + "[AWSBedrock] Base64 decoding failed. Ensure input string is valid base64 and does not contain a data URI prefix." + ); } return null; } } - // --- AWSBedrockLLM Class --- class AWSBedrockLLM { @@ -110,29 +121,35 @@ class AWSBedrockLLM { constructor(embedder = null, modelPreference = null) { // --- Environment Variable Validation --- const requiredEnvVars = [ - 'AWS_BEDROCK_LLM_ACCESS_KEY_ID', - 'AWS_BEDROCK_LLM_ACCESS_KEY', - 'AWS_BEDROCK_LLM_REGION', - 'AWS_BEDROCK_LLM_MODEL_PREFERENCE' // Model preference is effectively required + "AWS_BEDROCK_LLM_ACCESS_KEY_ID", + "AWS_BEDROCK_LLM_ACCESS_KEY", + "AWS_BEDROCK_LLM_REGION", + "AWS_BEDROCK_LLM_MODEL_PREFERENCE", // Model preference is effectively required ]; for (const envVar of requiredEnvVars) { - if (!process.env[envVar]) { - throw new Error(`Required environment variable ${envVar} is not set.`); - } + if (!process.env[envVar]) { + throw new Error(`Required environment variable ${envVar} is not set.`); + } } - if (process.env.AWS_BEDROCK_LLM_CONNECTION_METHOD === "sessionToken" && !process.env.AWS_BEDROCK_LLM_SESSION_TOKEN) { - throw new Error("AWS_BEDROCK_LLM_SESSION_TOKEN is not set for sessionToken authentication method."); + if ( + process.env.AWS_BEDROCK_LLM_CONNECTION_METHOD === "sessionToken" && + !process.env.AWS_BEDROCK_LLM_SESSION_TOKEN + ) { + throw new Error( + "AWS_BEDROCK_LLM_SESSION_TOKEN is not set for sessionToken authentication method." + ); } // --- Model and Limits Setup --- - this.model = modelPreference || process.env.AWS_BEDROCK_LLM_MODEL_PREFERENCE; + this.model = + modelPreference || process.env.AWS_BEDROCK_LLM_MODEL_PREFERENCE; // Get the total context window limit (used for input management) const contextWindowLimit = this.promptWindowLimit(); // Define approximate limits for different parts of the prompt based on the context window this.limits = { history: Math.floor(contextWindowLimit * 0.15), system: Math.floor(contextWindowLimit * 0.15), - user: Math.floor(contextWindowLimit * 0.70), // Allow user prompt + context to take the bulk + user: Math.floor(contextWindowLimit * 0.7), // Allow user prompt + context to take the bulk }; // --- AWS SDK Client Configuration --- @@ -151,7 +168,9 @@ class AWSBedrockLLM { this.embedder = embedder ?? new NativeEmbedder(); this.defaultTemp = 0.7; // Default sampling temperature - this.#log(`Initialized with model: ${this.model}. Auth: ${this.authMethod}. Context Window: ${contextWindowLimit}.`); + this.#log( + `Initialized with model: ${this.model}. Auth: ${this.authMethod}. Context Window: ${contextWindowLimit}.` + ); } /** @@ -186,7 +205,7 @@ class AWSBedrockLLM { * @param {...any} args - Additional arguments to log. * @private */ - #log(text, ...args) { + #log(text, ...args) { console.log(`\x1b[32m[AWSBedrock]\x1b[0m ${text}`, ...args); } @@ -214,8 +233,12 @@ class AWSBedrockLLM { const numericLimit = Number(limit); if (isNaN(numericLimit) || numericLimit <= 0) { - console.error(`[AWSBedrock ERROR] Invalid AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT found: "${limitSourceValue}". Must be a positive number.`); - throw new Error(`Invalid AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT set in environment: "${limitSourceValue}"`); + console.error( + `[AWSBedrock ERROR] Invalid AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT found: "${limitSourceValue}". 
Must be a positive number.` + ); + throw new Error( + `Invalid AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT set in environment: "${limitSourceValue}"` + ); } // Note: Does not use MODEL_MAP for Bedrock context window. Relies on the specific Bedrock env var. return numericLimit; @@ -237,22 +260,23 @@ class AWSBedrockLLM { * @returns {number} The maximum output tokens limit for API calls. */ getMaxOutputTokens() { - const outputLimitSource = process.env.AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS; - let outputLimit = DEFAULT_MAX_OUTPUT_TOKENS; // Start with the class default - - if (outputLimitSource) { - const numericOutputLimit = Number(outputLimitSource); - // Validate the environment variable value - if (!isNaN(numericOutputLimit) && numericOutputLimit > 0) { - outputLimit = numericOutputLimit; - } else { - this.#log(`Invalid AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS value "${outputLimitSource}". Using default ${DEFAULT_MAX_OUTPUT_TOKENS}.`); - } + const outputLimitSource = process.env.AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS; + let outputLimit = DEFAULT_MAX_OUTPUT_TOKENS; // Start with the class default + + if (outputLimitSource) { + const numericOutputLimit = Number(outputLimitSource); + // Validate the environment variable value + if (!isNaN(numericOutputLimit) && numericOutputLimit > 0) { + outputLimit = numericOutputLimit; + } else { + this.#log( + `Invalid AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS value "${outputLimitSource}". Using default ${DEFAULT_MAX_OUTPUT_TOKENS}.` + ); } - return outputLimit; + } + return outputLimit; } - /** * Checks if the configured model is valid for chat completion (basic check for Bedrock). * @param {string} [_modelName] - Model name (unused). @@ -276,49 +300,63 @@ class AWSBedrockLLM { const content = []; // Add text block if prompt is not empty - if (userPrompt && typeof userPrompt === 'string' && userPrompt.trim().length > 0) { - content.push({ text: userPrompt }); + if ( + userPrompt && + typeof userPrompt === "string" && + userPrompt.trim().length > 0 + ) { + content.push({ text: userPrompt }); } // Process valid attachments if (Array.isArray(attachments)) { - for (const attachment of attachments) { - if (!attachment || typeof attachment.mime !== 'string' || typeof attachment.contentString !== 'string') { - this.#log("Skipping invalid attachment object.", attachment); - continue; - } + for (const attachment of attachments) { + if ( + !attachment || + typeof attachment.mime !== "string" || + typeof attachment.contentString !== "string" + ) { + this.#log("Skipping invalid attachment object.", attachment); + continue; + } - // Strip data URI prefix (e.g., "data:image/png;base64,") - let base64Data = attachment.contentString; - const dataUriPrefixMatch = base64Data.match(/^data:image\/\w+;base64,/); - if (dataUriPrefixMatch) { - base64Data = base64Data.substring(dataUriPrefixMatch[0].length); - } + // Strip data URI prefix (e.g., "data:image/png;base64,") + let base64Data = attachment.contentString; + const dataUriPrefixMatch = base64Data.match(/^data:image\/\w+;base64,/); + if (dataUriPrefixMatch) { + base64Data = base64Data.substring(dataUriPrefixMatch[0].length); + } - const format = getImageFormatFromMime(attachment.mime); - if (format) { - const imageBytes = base64ToUint8Array(base64Data); - if (imageBytes) { - // Add the image block in the required Bedrock format - content.push({ - image: { - format: format, - source: { bytes: imageBytes } - } - }); - } else { - this.#log(`Skipping attachment with mime ${attachment.mime} due to base64 decoding error.`); - } - } else { - 
this.#log(`Skipping attachment with unsupported/invalid MIME type: ${attachment.mime}`); - } + const format = getImageFormatFromMime(attachment.mime); + if (format) { + const imageBytes = base64ToUint8Array(base64Data); + if (imageBytes) { + // Add the image block in the required Bedrock format + content.push({ + image: { + format: format, + source: { bytes: imageBytes }, + }, + }); + } else { + this.#log( + `Skipping attachment with mime ${attachment.mime} due to base64 decoding error.` + ); + } + } else { + this.#log( + `Skipping attachment with unsupported/invalid MIME type: ${attachment.mime}` + ); } + } } // Ensure content array is never empty (Bedrock requires at least one block) if (content.length === 0) { - this.#log("Warning: #generateContent resulted in an empty content array. Adding empty text block as fallback."); - content.push({ text: "" }); + this.#log( + "Warning: #generateContent resulted in an empty content array. Adding empty text block as fallback." + ); + content.push({ text: "" }); } return content; @@ -341,23 +379,32 @@ class AWSBedrockLLM { userPrompt = "", attachments = [], }) { - const systemMessageContent = `${systemPrompt}${this.#appendContext(contextTexts)}`; + const systemMessageContent = `${systemPrompt}${this.#appendContext( + contextTexts + )}`; let messages = []; // Handle system prompt (either real or simulated) if (this.noSystemPromptModels.includes(this.model)) { - if (systemMessageContent.trim().length > 0) { - this.#log(`Model ${this.model} doesn't support system prompts; simulating.`); - messages.push( - { role: "user", content: this.#generateContent({ userPrompt: systemMessageContent }) }, // No attachments in simulated system prompt - { role: "assistant", content: [{ text: "Okay." }] } - ); - } + if (systemMessageContent.trim().length > 0) { + this.#log( + `Model ${this.model} doesn't support system prompts; simulating.` + ); + messages.push( + { + role: "user", + content: this.#generateContent({ + userPrompt: systemMessageContent, + }), + }, // No attachments in simulated system prompt + { role: "assistant", content: [{ text: "Okay." }] } + ); + } } else if (systemMessageContent.trim().length > 0) { - messages.push({ - role: "system", - content: this.#generateContent({ userPrompt: systemMessageContent }) // No attachments in system prompt - }); + messages.push({ + role: "system", + content: this.#generateContent({ userPrompt: systemMessageContent }), // No attachments in system prompt + }); } // Add chat history @@ -373,11 +420,11 @@ class AWSBedrockLLM { // Add final user prompt messages.push({ - role: "user", - content: this.#generateContent({ - userPrompt: userPrompt, - attachments: Array.isArray(attachments) ? attachments : [], - }), + role: "user", + content: this.#generateContent({ + userPrompt: userPrompt, + attachments: Array.isArray(attachments) ? attachments : [], + }), }); return messages; @@ -389,13 +436,16 @@ class AWSBedrockLLM { * @returns {string} The text response, potentially with reasoning prepended. 
* @private */ - #parseReasoningFromResponse({ content = [] }) { + #parseReasoningFromResponse({ content = [] }) { if (!Array.isArray(content)) return ""; - const textBlock = content.find(block => block.text !== undefined); + const textBlock = content.find((block) => block.text !== undefined); let textResponse = textBlock?.text || ""; - const reasoningBlock = content.find(block => block.reasoningContent?.reasoningText?.text); + const reasoningBlock = content.find( + (block) => block.reasoningContent?.reasoningText?.text + ); if (reasoningBlock) { - const reasoningText = reasoningBlock.reasoningContent.reasoningText.text.trim(); + const reasoningText = + reasoningBlock.reasoningContent.reasoningText.text.trim(); if (reasoningText.length > 0) { textResponse = `<think>${reasoningText}</think>${textResponse}`; } } return textResponse; } - /** * Sends a request for chat completion (non-streaming). * @param {Array | null} messages - Formatted message array from constructPrompt. * @param {object} options - Request options. * @param {number} [options.temperature] - Sampling temperature. * @returns {Promise} Response object with textResponse and metrics, or null. * @throws {Error} If the API call fails or validation errors occur. */ async getChatCompletion(messages = null, { temperature }) { if (!Array.isArray(messages) || messages.length === 0) { - throw new Error("AWSBedrock::getChatCompletion requires a non-empty messages array."); + throw new Error( + "AWSBedrock::getChatCompletion requires a non-empty messages array." + ); } const hasSystem = messages[0]?.role === "system"; const systemBlock = hasSystem ? messages[0].content : undefined; const history = hasSystem ? messages.slice(1) : messages; const maxTokensToSend = this.getMaxOutputTokens(); const result = await LLMPerformanceMonitor.measureAsyncFunction( this.bedrockClient .send( new ConverseCommand({ modelId: this.model, messages: history, inferenceConfig: { maxTokens: maxTokensToSend, temperature: temperature ?? this.defaultTemp, }, system: systemBlock, }) ) .catch((e) => { - this.#log(`Bedrock Converse API Error (getChatCompletion): ${e.message}`, e); + this.#log( + `Bedrock Converse API Error (getChatCompletion): ${e.message}`, + e + ); - if (e.name === 'ValidationException' && e.message.includes('maximum tokens')) { - throw new Error(`AWSBedrock::getChatCompletion failed. Model ${this.model} rejected maxTokens value of ${maxTokensToSend}. Check model documentation for its maximum output token limit and set AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS if needed. Original error: ${e.message}`); - } + if ( + e.name === "ValidationException" && + e.message.includes("maximum tokens") + ) { + throw new Error( + `AWSBedrock::getChatCompletion failed. Model ${this.model} rejected maxTokens value of ${maxTokensToSend}. Check model documentation for its maximum output token limit and set AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS if needed. Original error: ${e.message}` + ); + } throw new Error(`AWSBedrock::getChatCompletion failed. ${e.message}`); }), messages, false ); const response = result.output; if (!response?.output?.message) { - this.#log("Bedrock response missing expected output.message structure.", response); - return null; + this.#log( + "Bedrock response missing expected output.message structure.", + response + ); + return null; } const latencyMs = response?.metrics?.latencyMs; const outputTokens = response?.usage?.outputTokens; - const outputTps = (latencyMs > 0 && outputTokens) ? (outputTokens / (latencyMs / 1000)) : 0; + const outputTps = + latencyMs > 0 && outputTokens ? outputTokens / (latencyMs / 1000) : 0; return { textResponse: this.#parseReasoningFromResponse(response.output.message), metrics: { prompt_tokens: response?.usage?.inputTokens ?? 0, completion_tokens: outputTokens ?? 0, total_tokens: response?.usage?.totalTokens ?? 0, outputTps: outputTps, duration: result.duration, }, }; } /** * Sends a request for streaming chat completion. * @param {Array | null} messages - Formatted message array from constructPrompt. * @param {object} options - Request options. * @param {number} [options.temperature] - Sampling temperature. * @returns {Promise} The monitored stream object. * @throws {Error} If the API call setup fails or validation errors occur. 
*/ async streamGetChatCompletion(messages = null, { temperature }) { - if (!Array.isArray(messages) || messages.length === 0) { - throw new Error("AWSBedrock::streamGetChatCompletion requires a non-empty messages array."); + if (!Array.isArray(messages) || messages.length === 0) { + throw new Error( + "AWSBedrock::streamGetChatCompletion requires a non-empty messages array." + ); } const hasSystem = messages[0]?.role === "system"; @@ -487,36 +552,45 @@ class AWSBedrockLLM { const maxTokensToSend = this.getMaxOutputTokens(); try { - // Attempt to initiate the stream - const stream = await this.bedrockClient.send( - new ConverseStreamCommand({ - modelId: this.model, - messages: history, - inferenceConfig: { - maxTokens: maxTokensToSend, - temperature: temperature ?? this.defaultTemp, - }, - system: systemBlock, - }) - ); - - // If successful, wrap the stream with performance monitoring - const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( - stream, - messages, - false // Indicate it's not a function call measurement - ); - return measuredStreamRequest; - + // Attempt to initiate the stream + const stream = await this.bedrockClient.send( + new ConverseStreamCommand({ + modelId: this.model, + messages: history, + inferenceConfig: { + maxTokens: maxTokensToSend, + temperature: temperature ?? this.defaultTemp, + }, + system: systemBlock, + }) + ); + + // If successful, wrap the stream with performance monitoring + const measuredStreamRequest = await LLMPerformanceMonitor.measureStream( + stream, + messages, + false // Indicate it's not a function call measurement + ); + return measuredStreamRequest; } catch (e) { - // Catch errors during the initial .send() call (e.g., validation errors) - this.#log(`Bedrock Converse API Error (streamGetChatCompletion setup): ${e.message}`, e); - if (e.name === 'ValidationException' && e.message.includes('maximum tokens')) { - // Provide specific error context for max token issues - throw new Error(`AWSBedrock::streamGetChatCompletion failed during setup. Model ${this.model} rejected maxTokens value of ${maxTokensToSend}. Check model documentation for its maximum output token limit and set AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS if needed. Original error: ${e.message}`); - } - // Re-throw other setup errors - throw new Error(`AWSBedrock::streamGetChatCompletion failed during setup. ${e.message}`); + // Catch errors during the initial .send() call (e.g., validation errors) + this.#log( + `Bedrock Converse API Error (streamGetChatCompletion setup): ${e.message}`, + e + ); + if ( + e.name === "ValidationException" && + e.message.includes("maximum tokens") + ) { + // Provide specific error context for max token issues + throw new Error( + `AWSBedrock::streamGetChatCompletion failed during setup. Model ${this.model} rejected maxTokens value of ${maxTokensToSend}. Check model documentation for its maximum output token limit and set AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS if needed. Original error: ${e.message}` + ); + } + // Re-throw other setup errors + throw new Error( + `AWSBedrock::streamGetChatCompletion failed during setup. ${e.message}` + ); } } @@ -530,7 +604,7 @@ class AWSBedrockLLM { * @param {Array} [responseProps.sources=[]] - Source documents used (if any). * @returns {Promise} A promise that resolves with the complete text response when the stream ends. 
*/ - handleStream(response, stream, responseProps) { + handleStream(response, stream, responseProps) { const { uuid = uuidv4(), sources = [] } = responseProps; let hasUsageMetrics = false; let usage = { prompt_tokens: 0, completion_tokens: 0, total_tokens: 0 }; return new Promise(async (resolve) => { let fullText = ""; let reasoningText = ""; // Abort handler for client closing connection const handleAbort = () => { this.#log(`Client closed connection for stream ${uuid}. Aborting.`); stream?.endMeasurement(usage); // Finalize metrics clientAbortedHandler(resolve, fullText); // Resolve with partial text }; response.on("close", handleAbort); try { // Process stream chunks for await (const chunk of stream.stream) { - if (!chunk) { this.#log("Stream returned null/undefined chunk."); continue; } + if (!chunk) { + this.#log("Stream returned null/undefined chunk."); + continue; + } const action = Object.keys(chunk)[0]; switch (action) { case "metadata": // Contains usage metrics at the end if (chunk.metadata?.usage) { - hasUsageMetrics = true; - usage = { // Overwrite with final metrics - prompt_tokens: chunk.metadata.usage.inputTokens ?? 0, - completion_tokens: chunk.metadata.usage.outputTokens ?? 0, - total_tokens: chunk.metadata.usage.totalTokens ?? 0, - }; + hasUsageMetrics = true; + usage = { + // Overwrite with final metrics + prompt_tokens: chunk.metadata.usage.inputTokens ?? 0, + completion_tokens: chunk.metadata.usage.outputTokens ?? 0, + total_tokens: chunk.metadata.usage.totalTokens ?? 0, + }; } break; - case "contentBlockDelta": { // Contains text or reasoning deltas + case "contentBlockDelta": { + // Contains text or reasoning deltas const delta = chunk.contentBlockDelta?.delta; if (!delta) break; const token = delta.text; const reasoningToken = delta.reasoningContent?.text; - if (reasoningToken) { // Handle reasoning text - if (reasoningText.length === 0) { // Start of reasoning block + if (reasoningToken) { + // Handle reasoning text + if (reasoningText.length === 0) { + // Start of reasoning block const startTag = "<think>"; - writeResponseChunk(response, { uuid, sources, type: "textResponseChunk", textResponse: startTag + reasoningToken, close: false, error: false }); + writeResponseChunk(response, { + uuid, + sources, + type: "textResponseChunk", + textResponse: startTag + reasoningToken, + close: false, + error: false, + }); reasoningText += startTag + reasoningToken; - } else { // Continuation of reasoning block - writeResponseChunk(response, { uuid, sources, type: "textResponseChunk", textResponse: reasoningToken, close: false, error: false }); - reasoningText += reasoningToken; + } else { + // Continuation of reasoning block + writeResponseChunk(response, { + uuid, + sources, + type: "textResponseChunk", + textResponse: reasoningToken, + close: false, + error: false, + }); + reasoningText += reasoningToken; } - } else if (token) { // Handle regular text - if (reasoningText.length > 0) { // If reasoning was just output, close the tag - const endTag = "</think>"; - writeResponseChunk(response, { uuid, sources, type: "textResponseChunk", textResponse: endTag, close: false, error: false }); - fullText += reasoningText + endTag; // Add completed reasoning to final text - reasoningText = ""; // Reset reasoning buffer - } - fullText += token; // Append regular text - if (!hasUsageMetrics) usage.completion_tokens++; // Estimate usage if no metrics yet - writeResponseChunk(response, { uuid, sources, type: "textResponseChunk", textResponse: token, close: false, error: false }); + } else if (token) { + // Handle regular text + if (reasoningText.length > 0) { + // If reasoning was just output, close the tag + const endTag = "</think>"; + writeResponseChunk(response, { + uuid, + sources, + type: "textResponseChunk", + textResponse: endTag, + close: false, + error: false, + }); + fullText += reasoningText + endTag; // Add completed reasoning to 
final text + reasoningText = ""; // Reset reasoning buffer + } + fullText += token; // Append regular text + if (!hasUsageMetrics) usage.completion_tokens++; // Estimate usage if no metrics yet + writeResponseChunk(response, { + uuid, + sources, + type: "textResponseChunk", + textResponse: token, + close: false, + error: false, + }); } break; } case "messageStop": // End of message event - if (chunk.messageStop?.usage) { // Check for final metrics here too - hasUsageMetrics = true; - usage = { // Overwrite with final metrics if available - prompt_tokens: chunk.messageStop.usage.inputTokens ?? usage.prompt_tokens, - completion_tokens: chunk.messageStop.usage.outputTokens ?? usage.completion_tokens, - total_tokens: chunk.messageStop.usage.totalTokens ?? usage.total_tokens, - }; + if (chunk.messageStop?.usage) { + // Check for final metrics here too + hasUsageMetrics = true; + usage = { + // Overwrite with final metrics if available + prompt_tokens: + chunk.messageStop.usage.inputTokens ?? usage.prompt_tokens, + completion_tokens: + chunk.messageStop.usage.outputTokens ?? + usage.completion_tokens, + total_tokens: + chunk.messageStop.usage.totalTokens ?? usage.total_tokens, + }; } // Ensure reasoning tag is closed if message stops mid-reasoning if (reasoningText.length > 0) { - const endTag = ""; - writeResponseChunk(response, { uuid, sources, type: "textResponseChunk", textResponse: endTag, close: false, error: false }); - fullText += reasoningText + endTag; - reasoningText = ""; + const endTag = ""; + writeResponseChunk(response, { + uuid, + sources, + type: "textResponseChunk", + textResponse: endTag, + close: false, + error: false, + }); + fullText += reasoningText + endTag; + reasoningText = ""; } break; // Ignore other event types for now - case "messageStart": case "contentBlockStart": case "contentBlockStop": break; - default: this.#log(`Unhandled stream action: ${action}`, chunk); + case "messageStart": + case "contentBlockStart": + case "contentBlockStop": + break; + default: + this.#log(`Unhandled stream action: ${action}`, chunk); } } // End for await loop // Final cleanup for reasoning tag in case stream ended abruptly - if (reasoningText.length > 0 && !fullText.endsWith("")) { - const endTag = ""; - if (!response.writableEnded) { - writeResponseChunk(response, { uuid, sources, type: "textResponseChunk", textResponse: endTag, close: false, error: false }); - } - fullText += reasoningText + endTag; - } + if (reasoningText.length > 0 && !fullText.endsWith("")) { + const endTag = ""; + if (!response.writableEnded) { + writeResponseChunk(response, { + uuid, + sources, + type: "textResponseChunk", + textResponse: endTag, + close: false, + error: false, + }); + } + fullText += reasoningText + endTag; + } // Send final closing chunk to signal end of stream if (!response.writableEnded) { - writeResponseChunk(response, { uuid, sources, type: "textResponseChunk", textResponse: "", close: true, error: false }); + writeResponseChunk(response, { + uuid, + sources, + type: "textResponseChunk", + textResponse: "", + close: true, + error: false, + }); } - } catch (error) { // Handle errors during stream processing - this.#log(`\x1b[43m\x1b[34m[STREAMING ERROR]\x1b[0m ${error.message}`, error); + this.#log( + `\x1b[43m\x1b[34m[STREAMING ERROR]\x1b[0m ${error.message}`, + error + ); if (response && !response.writableEnded) { - writeResponseChunk(response, { uuid, type: "abort", textResponse: null, sources, close: true, error: `AWSBedrock:streaming - error. ${error?.message ?? 
"Unknown error"}` }); + writeResponseChunk(response, { + uuid, + type: "abort", + textResponse: null, + sources, + close: true, + error: `AWSBedrock:streaming - error. ${ + error?.message ?? "Unknown error" + }`, + }); } } finally { - // Cleanup: Always remove listener and finalize measurement - response.removeListener("close", handleAbort); - stream?.endMeasurement(usage); // Log final usage metrics - resolve(fullText); // Resolve with the accumulated text + // Cleanup: Always remove listener and finalize measurement + response.removeListener("close", handleAbort); + stream?.endMeasurement(usage); // Log final usage metrics + resolve(fullText); // Resolve with the accumulated text } }); } - /** * Embeds a single text input using the configured embedder. * @param {string} textInput - The text to embed. @@ -653,13 +806,15 @@ class AWSBedrockLLM { */ async embedTextInput(textInput) { if (!this.embedder?.embedTextInput) { - throw new Error("Embedder is not configured or does not support embedTextInput."); + throw new Error( + "Embedder is not configured or does not support embedTextInput." + ); } try { - return await this.embedder.embedTextInput(textInput); - } catch(e) { - this.#log(`EmbedTextInput Error: ${e.message}`, e) - throw e; // Re-throw after logging + return await this.embedder.embedTextInput(textInput); + } catch (e) { + this.#log(`EmbedTextInput Error: ${e.message}`, e); + throw e; // Re-throw after logging } } @@ -670,15 +825,17 @@ class AWSBedrockLLM { * @throws {Error} If the embedder is not configured or fails. */ async embedChunks(textChunks = []) { - if (!this.embedder?.embedChunks) { - throw new Error("Embedder is not configured or does not support embedChunks."); - } - try { - return await this.embedder.embedChunks(textChunks); - } catch(e) { - this.#log(`EmbedChunks Error: ${e.message}`, e) - throw e; // Re-throw after logging - } + if (!this.embedder?.embedChunks) { + throw new Error( + "Embedder is not configured or does not support embedChunks." 

---
 docker/.env.example |  3 +++
 server/.env.example | 10 ++++++++++
 2 files changed, 13 insertions(+)

diff --git a/docker/.env.example b/docker/.env.example
index 5fd93ab9e90..117a46133b2 100644
--- a/docker/.env.example
+++ b/docker/.env.example
@@ -105,6 +105,9 @@ GID='1000'
 # AWS_BEDROCK_LLM_REGION=us-west-2
 # AWS_BEDROCK_LLM_MODEL_PREFERENCE=meta.llama3-1-8b-instruct-v1:0
 # AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT=8191
+# AWS_BEDROCK_LLM_CONNECTION_METHOD=iam
+# AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS=4096
+# AWS_BEDROCK_LLM_SESSION_TOKEN= # Only required if CONNECTION_METHOD is 'sessionToken'

 # LLM_PROVIDER='fireworksai'
 # FIREWORKS_AI_LLM_API_KEY='my-fireworks-ai-key'
diff --git a/server/.env.example b/server/.env.example
index 4a2df43b1b9..d406ba10882 100644
--- a/server/.env.example
+++ b/server/.env.example
@@ -104,6 +104,16 @@ SIG_SALT='salt' # Please generate random string at least 32 chars long.
 # COHERE_API_KEY=
 # COHERE_MODEL_PREF='command-r'

+# LLM_PROVIDER='bedrock'
+# AWS_BEDROCK_LLM_ACCESS_KEY_ID=
+# AWS_BEDROCK_LLM_ACCESS_KEY=
+# AWS_BEDROCK_LLM_REGION=us-west-2
+# AWS_BEDROCK_LLM_MODEL_PREFERENCE=meta.llama3-1-8b-instruct-v1:0
+# AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT=8191
+# AWS_BEDROCK_LLM_CONNECTION_METHOD=iam
+# AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS=4096
+# AWS_BEDROCK_LLM_SESSION_TOKEN= # Only required if CONNECTION_METHOD is 'sessionToken'
+
 # LLM_PROVIDER='apipie'
 # APIPIE_LLM_API_KEY='sk-123abc'
 # APIPIE_LLM_MODEL_PREF='openrouter/llama-3.1-8b-instruct'

From a3b27a0fe737e85a1e2290b8c9fde48f02bbd28e Mon Sep 17 00:00:00 2001
From: timothycarambat
Date: Tue, 6 May 2025 12:42:15 -0700
Subject: [PATCH 4/4] Refactor for readability

move utils for AWS specific functionality to subfile
add token output max to ENV so setting persists
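
For reviewers, a rough sketch of how the two token settings are meant to be
consumed (illustrative only, not the exact implementation; model figures are
examples):

    // Total context window (input + output), used to budget system/history/user text:
    const contextWindow = Number(process.env.AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT ?? 8191);
    // Hard cap on generated tokens, sent to the Converse API as inferenceConfig.maxTokens:
    const maxOutput = Number(process.env.AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS ?? 4096);
    // e.g. a Claude Sonnet model has a ~200k context window but a much smaller
    // output cap, which is why the two values must be tracked separately.

Persisting the output cap in the ENV keeps the UI setting from resetting on
restart.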

---
 .../AwsBedrockLLMOptions/index.jsx            |  18 +-
 server/models/systemSettings.js               |   5 +-
 server/utils/AiProviders/bedrock/index.js     | 408 ++++++------------
 server/utils/AiProviders/bedrock/utils.js     |  68 +++
 server/utils/helpers/updateENV.js             |   4 +
 5 files changed, 231 insertions(+), 272 deletions(-)
 create mode 100644 server/utils/AiProviders/bedrock/utils.js

diff --git a/frontend/src/components/LLMSelection/AwsBedrockLLMOptions/index.jsx b/frontend/src/components/LLMSelection/AwsBedrockLLMOptions/index.jsx
index 779d487b929..ec4a3c6f747 100644
--- a/frontend/src/components/LLMSelection/AwsBedrockLLMOptions/index.jsx
+++ b/frontend/src/components/LLMSelection/AwsBedrockLLMOptions/index.jsx
@@ -175,7 +175,7 @@ export default function AwsBedrockLLMOptions({ settings }) {
             type="number"
             name="AwsBedrockLLMTokenLimit"
             className="border-none bg-theme-settings-input-bg text-white placeholder:text-theme-settings-input-placeholder text-sm rounded-lg focus:outline-primary-button active:outline-primary-button outline-none block w-full p-2.5"
-            placeholder="Content window limit (eg: 4096)"
+            placeholder="Content window limit (eg: 8192)"
             min={1}
             onScroll={(e) => e.target.blur()}
             defaultValue={settings?.AwsBedrockLLMTokenLimit}
@@ -183,6 +183,22 @@ export default function AwsBedrockLLMOptions({ settings }) {
             autoComplete="off"
           />
         </div>
+        <div className="flex flex-col w-60">
+          <label className="text-white text-sm font-semibold block mb-3">
+            Max Output Tokens
+          </label>
+          <input
+            type="number"
+            name="AwsBedrockLLMMaxOutputTokens"
+            className="border-none bg-theme-settings-input-bg text-white placeholder:text-theme-settings-input-placeholder text-sm rounded-lg focus:outline-primary-button active:outline-primary-button outline-none block w-full p-2.5"
+            placeholder="Max output tokens (eg: 4096)"
+            min={1}
+            onScroll={(e) => e.target.blur()}
+            defaultValue={settings?.AwsBedrockLLMMaxOutputTokens}
+            required={true}
+            autoComplete="off"
+          />
+        </div>
       </div>
     )}
diff --git a/server/models/systemSettings.js b/server/models/systemSettings.js
index 0ac29f19564..566632aaf83 100644
--- a/server/models/systemSettings.js
+++ b/server/models/systemSettings.js
@@ -540,7 +540,10 @@ const SystemSettings = {
       AwsBedrockLLMSessionToken: !!process.env.AWS_BEDROCK_LLM_SESSION_TOKEN,
       AwsBedrockLLMRegion: process.env.AWS_BEDROCK_LLM_REGION,
       AwsBedrockLLMModel: process.env.AWS_BEDROCK_LLM_MODEL_PREFERENCE,
-      AwsBedrockLLMTokenLimit: process.env.AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT,
+      AwsBedrockLLMTokenLimit:
+        process.env.AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT || 8192,
+      AwsBedrockLLMMaxOutputTokens:
+        process.env.AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS || 4096,

       // Cohere API Keys
       CohereApiKey: !!process.env.COHERE_API_KEY,
diff --git a/server/utils/AiProviders/bedrock/index.js b/server/utils/AiProviders/bedrock/index.js
index b8c9ce34048..0fd60c7f72a 100644
--- a/server/utils/AiProviders/bedrock/index.js
+++ b/server/utils/AiProviders/bedrock/index.js
@@ -1,103 +1,24 @@
-/**
- * AWS Bedrock Language Model Connector using the Converse API.
- * Supports text and multi-modal (text + image) interactions.
- * Handles distinct context window limits and max output token limits.
- */
-
-// Core AWS SDK imports for Bedrock Converse API
 const {
   BedrockRuntimeClient,
   ConverseCommand,
   ConverseStreamCommand,
 } = require("@aws-sdk/client-bedrock-runtime");
-
-// Helper imports from the application
 const {
   writeResponseChunk,
   clientAbortedHandler,
-} = require("../../helpers/chat/responses"); // For streaming responses
-const { NativeEmbedder } = require("../../EmbeddingEngines/native"); // Default embedder
+} = require("../../helpers/chat/responses");
+const { NativeEmbedder } = require("../../EmbeddingEngines/native");
 const {
   LLMPerformanceMonitor,
-} = require("../../helpers/chat/LLMPerformanceMonitor"); // For tracking API call performance
-const { v4: uuidv4 } = require("uuid"); // For generating unique IDs
-const { MODEL_MAP } = require("../modelMap"); // Although imported, not currently used for Bedrock limits
-
-// --- Constants ---
-
-/** @const {string[]} Supported image formats by Bedrock Converse API */
-const SUPPORTED_BEDROCK_IMAGE_FORMATS = ["jpeg", "png", "gif", "webp"];
-/** @const {number} Default maximum tokens to generate in the response. Models often have lower output limits than their total context windows. */
-const DEFAULT_MAX_OUTPUT_TOKENS = 4096;
-/** @const {number} Default total context window size if not specified in environment variables. */
-const DEFAULT_CONTEXT_WINDOW_TOKENS = 8191;
-
-// --- Helper Functions ---
-
-/**
- * Parses a MIME type string (e.g., "image/jpeg") to extract and validate the image format
- * supported by Bedrock Converse. Handles 'image/jpg' as 'jpeg'.
- * @param {string | null | undefined} mimeType - The MIME type string.
- * @returns {string | null} The validated image format (e.g., "jpeg") or null if invalid/unsupported.
- */
-function getImageFormatFromMime(mimeType) {
-  if (!mimeType || typeof mimeType !== "string") {
-    console.warn(
-      `[AWSBedrock] Invalid or missing MIME type provided for attachment.`
-    );
-    return null;
-  }
-  const parts = mimeType.toLowerCase().split("/");
-  if (parts.length !== 2 || parts[0] !== "image") {
-    console.warn(
Expected "image/...".` - ); - return null; - } - - let format = parts[1]; - if (format === "jpg") format = "jpeg"; // Normalize jpg to jpeg - - if (!SUPPORTED_BEDROCK_IMAGE_FORMATS.includes(format)) { - console.warn( - `[AWSBedrock] Unsupported image format: "${format}" from MIME type "${mimeType}". Supported formats: ${SUPPORTED_BEDROCK_IMAGE_FORMATS.join( - ", " - )}.` - ); - return null; - } - return format; -} - -/** - * Decodes a pure base64 string (without data URI prefix) into a Uint8Array using the atob method. - * This approach matches the technique previously used by Langchain's implementation. - * @param {string} base64String - The pure base64 encoded data. - * @returns {Uint8Array | null} The resulting byte array or null on decoding error. - */ -function base64ToUint8Array(base64String) { - try { - const binaryString = atob(base64String); // Decode base64 to binary string - const len = binaryString.length; - const bytes = new Uint8Array(len); - for (let i = 0; i < len; i++) { - bytes[i] = binaryString.charCodeAt(i); // Convert char code to byte value - } - return bytes; - } catch (e) { - console.error( - `[AWSBedrock] Error decoding base64 string with atob: ${e.message}` - ); - if (e.name === "InvalidCharacterError") { - console.error( - "[AWSBedrock] Base64 decoding failed. Ensure input string is valid base64 and does not contain a data URI prefix." - ); - } - return null; - } -} - -// --- AWSBedrockLLM Class --- +} = require("../../helpers/chat/LLMPerformanceMonitor"); +const { v4: uuidv4 } = require("uuid"); +const { + DEFAULT_MAX_OUTPUT_TOKENS, + DEFAULT_CONTEXT_WINDOW_TOKENS, + SUPPORTED_CONNECTION_METHODS, + getImageFormatFromMime, + base64ToUint8Array, +} = require("./utils"); class AWSBedrockLLM { /** @@ -109,6 +30,7 @@ class AWSBedrockLLM { "amazon.titan-text-lite-v1", "cohere.command-text-v14", "cohere.command-light-text-v14", + "us.deepseek.r1-v1:0", // Add other models here if identified ]; @@ -119,18 +41,19 @@ class AWSBedrockLLM { * @throws {Error} If required environment variables are missing or invalid. 
    */
   constructor(embedder = null, modelPreference = null) {
-    // --- Environment Variable Validation ---
     const requiredEnvVars = [
       "AWS_BEDROCK_LLM_ACCESS_KEY_ID",
       "AWS_BEDROCK_LLM_ACCESS_KEY",
       "AWS_BEDROCK_LLM_REGION",
-      "AWS_BEDROCK_LLM_MODEL_PREFERENCE", // Model preference is effectively required
+      "AWS_BEDROCK_LLM_MODEL_PREFERENCE",
     ];
+
+    // Validate required environment variables
     for (const envVar of requiredEnvVars) {
-      if (!process.env[envVar]) {
+      if (!process.env[envVar])
         throw new Error(`Required environment variable ${envVar} is not set.`);
-      }
     }
+
     if (
       process.env.AWS_BEDROCK_LLM_CONNECTION_METHOD === "sessionToken" &&
       !process.env.AWS_BEDROCK_LLM_SESSION_TOKEN
@@ -140,19 +63,16 @@ class AWSBedrockLLM {
       );
     }

-    // --- Model and Limits Setup ---
     this.model =
       modelPreference || process.env.AWS_BEDROCK_LLM_MODEL_PREFERENCE;
-    // Get the total context window limit (used for input management)
+
     const contextWindowLimit = this.promptWindowLimit();
-    // Define approximate limits for different parts of the prompt based on the context window
     this.limits = {
       history: Math.floor(contextWindowLimit * 0.15),
       system: Math.floor(contextWindowLimit * 0.15),
-      user: Math.floor(contextWindowLimit * 0.7), // Allow user prompt + context to take the bulk
+      user: Math.floor(contextWindowLimit * 0.7),
     };

-    // --- AWS SDK Client Configuration ---
     this.bedrockClient = new BedrockRuntimeClient({
       region: process.env.AWS_BEDROCK_LLM_REGION,
       credentials: {
@@ -164,10 +84,8 @@ class AWSBedrockLLM {
       },
     });

-    // --- Other Initializations ---
     this.embedder = embedder ?? new NativeEmbedder();
-    this.defaultTemp = 0.7; // Default sampling temperature
-
+    this.defaultTemp = 0.7;
     this.#log(
       `Initialized with model: ${this.model}. Auth: ${this.authMethod}. Context Window: ${contextWindowLimit}.`
     );
@@ -180,17 +98,17 @@ class AWSBedrockLLM {
    */
   get authMethod() {
     const method = process.env.AWS_BEDROCK_LLM_CONNECTION_METHOD || "iam";
-    return ["iam", "sessionToken"].includes(method) ? method : "iam";
+    return SUPPORTED_CONNECTION_METHODS.includes(method) ? method : "iam";
   }

   /**
    * Appends context texts to a string with standard formatting.
-   * @param {string[]} [contextTexts=[]] - An array of context text snippets.
+   * @param {string[]} contextTexts - An array of context text snippets.
    * @returns {string} Formatted context string or empty string if no context provided.
    * @private
    */
   #appendContext(contextTexts = []) {
-    if (!Array.isArray(contextTexts) || contextTexts.length === 0) return "";
+    if (!contextTexts?.length) return "";
     return (
       "\nContext:\n" +
       contextTexts
@@ -209,38 +127,39 @@ class AWSBedrockLLM {
     console.log(`\x1b[32m[AWSBedrock]\x1b[0m ${text}`, ...args);
   }

+  /**
+   * Internal logging helper with provider prefix for static methods.
+   * @private
+   */
+  static #slog(text, ...args) {
+    console.log(`\x1b[32m[AWSBedrock]\x1b[0m ${text}`, ...args);
+  }
+
   /**
    * Indicates if the provider supports streaming responses.
    * @returns {boolean} True.
    */
   streamingEnabled() {
-    return typeof this.streamGetChatCompletion === "function";
+    return "streamGetChatCompletion" in this;
   }

   /**
+   * @static
    * Gets the total prompt window limit (total context window: input + output) from the environment variable.
    * This value is used for calculating input limits, NOT for setting the max output tokens in API calls.
-   * @param {string} [_modelName] - The model name (parameter currently unused, reads directly from env var).
-   * @returns {number} The total context window token limit.
-   * @static
-   * @throws {Error} If the AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT environment variable is invalid.
+   * @returns {number} The total context window token limit. Defaults to 8191.
    */
-  static promptWindowLimit(_modelName) {
-    const limitSourceValue = process.env.AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT;
-    // Log the value being read (can be commented out in production)
-    // console.log(`[AWSBedrock DEBUG] Reading AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT for prompt window. Value found: ${limitSourceValue} (Type: ${typeof limitSourceValue})`);
-    const limit = limitSourceValue || DEFAULT_CONTEXT_WINDOW_TOKENS; // Use default if not set
-
+  static promptWindowLimit() {
+    const limit =
+      process.env.AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT ??
+      DEFAULT_CONTEXT_WINDOW_TOKENS;
     const numericLimit = Number(limit);
     if (isNaN(numericLimit) || numericLimit <= 0) {
-      console.error(
-        `[AWSBedrock ERROR] Invalid AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT found: "${limitSourceValue}". Must be a positive number.`
-      );
-      throw new Error(
-        `Invalid AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT set in environment: "${limitSourceValue}"`
+      this.#slog(
+        `[AWSBedrock ERROR] Invalid AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT found: "${limit}". Must be a positive number - returning default ${DEFAULT_CONTEXT_WINDOW_TOKENS}.`
       );
+      return DEFAULT_CONTEXT_WINDOW_TOKENS;
     }
-    // Note: Does not use MODEL_MAP for Bedrock context window. Relies on the specific Bedrock env var.
     return numericLimit;
   }

@@ -249,8 +168,7 @@ class AWSBedrockLLM {
    * @returns {number} The token limit.
    */
   promptWindowLimit() {
-    // Delegates to the static method for consistency.
-    return AWSBedrockLLM.promptWindowLimit(this.model);
+    return AWSBedrockLLM.promptWindowLimit();
   }

   /**
@@ -261,115 +179,110 @@ class AWSBedrockLLM {
    */
   getMaxOutputTokens() {
     const outputLimitSource = process.env.AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS;
-    let outputLimit = DEFAULT_MAX_OUTPUT_TOKENS; // Start with the class default
-
-    if (outputLimitSource) {
-      const numericOutputLimit = Number(outputLimitSource);
-      // Validate the environment variable value
-      if (!isNaN(numericOutputLimit) && numericOutputLimit > 0) {
-        outputLimit = numericOutputLimit;
-      } else {
-        this.#log(
-          `Invalid AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS value "${outputLimitSource}". Using default ${DEFAULT_MAX_OUTPUT_TOKENS}.`
-        );
-      }
+    if (isNaN(Number(outputLimitSource))) {
+      this.#log(
+        `[AWSBedrock ERROR] Invalid AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS found: "${outputLimitSource}". Must be a positive number - returning default ${DEFAULT_MAX_OUTPUT_TOKENS}.`
+      );
+      return DEFAULT_MAX_OUTPUT_TOKENS;
+    }
+
+    const numericOutputLimit = Number(outputLimitSource);
+    if (numericOutputLimit <= 0) {
+      this.#log(
+        `[AWSBedrock ERROR] Invalid AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS found: "${outputLimitSource}". Must be greater than 0 - returning default ${DEFAULT_MAX_OUTPUT_TOKENS}.`
+      );
+      return DEFAULT_MAX_OUTPUT_TOKENS;
     }
-    return outputLimit;
+
+    return numericOutputLimit;
   }

-  /**
-   * Checks if the configured model is valid for chat completion (basic check for Bedrock).
-   * @param {string} [_modelName] - Model name (unused).
-   * @returns {Promise<boolean>} Always true, assuming any configured Bedrock model supports Converse API.
-   */
+  /** Stubbed method for compatibility with LLM interface. */
   async isValidChatCompletionModel(_modelName = "") {
-    // Assume valid for Bedrock Converse; more specific checks could be added if needed.
     return true;
   }

+  /**
+   * Validates attachments array and returns a new array with valid attachments.
+   * @param {Array<{contentString: string, mime: string}>} attachments - Array of attachments.
+   * @returns {Array<{image: {format: string, source: {bytes: Uint8Array}}}>} Array of valid attachments.
+   * @private
+   */
+  #validateAttachments(attachments = []) {
+    if (!Array.isArray(attachments) || !attachments?.length) return [];
+    const validAttachments = [];
+    for (const attachment of attachments) {
+      if (
+        !attachment ||
+        typeof attachment.mime !== "string" ||
+        typeof attachment.contentString !== "string"
+      ) {
+        this.#log("Skipping invalid attachment object.", attachment);
+        continue;
+      }
+
+      // Strip data URI prefix (e.g., "data:image/png;base64,")
+      const base64Data = attachment.contentString.replace(
+        /^data:image\/\w+;base64,/,
+        ""
+      );
+
+      const format = getImageFormatFromMime(attachment.mime);
+      const imageBytes = base64ToUint8Array(base64Data);
+      const attachmentInfo = {
+        valid: format !== null && imageBytes !== null,
+        format,
+        imageBytes,
+      };
+
+      if (!attachmentInfo.valid) {
+        this.#log(
+          `Skipping attachment with unsupported/invalid MIME type: ${attachment.mime}`
+        );
+        continue;
+      }
+
+      validAttachments.push({
+        image: {
+          format: format,
+          source: { bytes: attachmentInfo.imageBytes },
+        },
+      });
+    }
+
+    return validAttachments;
+  }
+
   /**
    * Generates the Bedrock Converse API content array for a message,
    * processing text and formatting valid image attachments.
    * @param {object} params
-   * @param {string} [params.userPrompt=""] - The text part of the message.
-   * @param {Array<{contentString: string, mime: string}>} [params.attachments=[]] - Array of attachments for the message.
+   * @param {string} params.userPrompt - The text part of the message.
+   * @param {Array<{contentString: string, mime: string}>} params.attachments - Array of attachments for the message.
    * @returns {Array<object>} Array of content blocks (e.g., [{text: "..."}, {image: {...}}]).
    * @private
    */
   #generateContent({ userPrompt = "", attachments = [] }) {
     const content = [];
-    // Add text block if prompt is not empty
-    if (
-      userPrompt &&
-      typeof userPrompt === "string" &&
-      userPrompt.trim().length > 0
-    ) {
-      content.push({ text: userPrompt });
-    }
+    if (!!userPrompt?.trim()?.length) content.push({ text: userPrompt });

-    // Process valid attachments
-    if (Array.isArray(attachments)) {
-      for (const attachment of attachments) {
-        if (
-          !attachment ||
-          typeof attachment.mime !== "string" ||
-          typeof attachment.contentString !== "string"
-        ) {
-          this.#log("Skipping invalid attachment object.", attachment);
-          continue;
-        }
-
-        // Strip data URI prefix (e.g., "data:image/png;base64,")
-        let base64Data = attachment.contentString;
-        const dataUriPrefixMatch = base64Data.match(/^data:image\/\w+;base64,/);
-        if (dataUriPrefixMatch) {
-          base64Data = base64Data.substring(dataUriPrefixMatch[0].length);
-        }
-
-        const format = getImageFormatFromMime(attachment.mime);
-        if (format) {
-          const imageBytes = base64ToUint8Array(base64Data);
-          if (imageBytes) {
-            // Add the image block in the required Bedrock format
-            content.push({
-              image: {
-                format: format,
-                source: { bytes: imageBytes },
-              },
-            });
-          } else {
-            this.#log(
-              `Skipping attachment with mime ${attachment.mime} due to base64 decoding error.`
-            );
-          }
-        } else {
-          this.#log(
-            `Skipping attachment with unsupported/invalid MIME type: ${attachment.mime}`
-          );
-        }
-      }
-    }
+    // Validate attachments and add valid attachments to content
+    const validAttachments = this.#validateAttachments(attachments);
+    if (validAttachments?.length) content.push(...validAttachments);

     // Ensure content array is never empty (Bedrock requires at least one block)
-    if (content.length === 0) {
-      this.#log(
-        "Warning: #generateContent resulted in an empty content array. Adding empty text block as fallback."
-      );
-      content.push({ text: "" });
-    }
-
+    if (content.length === 0) content.push({ text: "" });
     return content;
   }

   /**
    * Constructs the complete message array in the format expected by the Bedrock Converse API.
    * @param {object} params
-   * @param {string} [params.systemPrompt=""] - The system prompt text.
-   * @param {string[]} [params.contextTexts=[]] - Array of context text snippets.
-   * @param {Array<{role: 'user' | 'assistant', content: string, attachments?: Array<{contentString: string, mime: string}>}>} [params.chatHistory=[]] - Previous messages.
-   * @param {string} [params.userPrompt=""] - The latest user prompt text.
-   * @param {Array<{contentString: string, mime: string}>} [params.attachments=[]] - Attachments for the latest user prompt.
+   * @param {string} params.systemPrompt - The system prompt text.
+   * @param {string[]} params.contextTexts - Array of context text snippets.
+   * @param {Array<{role: 'user' | 'assistant', content: string, attachments?: Array<{contentString: string, mime: string}>}>} params.chatHistory - Previous messages.
+   * @param {string} params.userPrompt - The latest user prompt text.
+   * @param {Array<{contentString: string, mime: string}>} params.attachments - Attachments for the latest user prompt.
    * @returns {Array<object>} The formatted message array for the API call.
    */
   constructPrompt({
@@ -379,9 +292,7 @@ class AWSBedrockLLM {
     userPrompt = "",
     attachments = [],
   }) {
-    const systemMessageContent = `${systemPrompt}${this.#appendContext(
-      contextTexts
-    )}`;
+    const systemMessageContent = `${systemPrompt}${this.#appendContext(contextTexts)}`;
     let messages = [];

     // Handle system prompt (either real or simulated)
@@ -396,14 +307,14 @@ class AWSBedrockLLM {
             content: this.#generateContent({
               userPrompt: systemMessageContent,
             }),
-          }, // No attachments in simulated system prompt
+          },
           { role: "assistant", content: [{ text: "Okay." }] }
         );
       }
     } else if (systemMessageContent.trim().length > 0) {
       messages.push({
         role: "system",
-        content: this.#generateContent({ userPrompt: systemMessageContent }), // No attachments in system prompt
+        content: this.#generateContent({ userPrompt: systemMessageContent }),
       });
     }

@@ -437,18 +348,21 @@ class AWSBedrockLLM {
    * @private
    */
   #parseReasoningFromResponse({ content = [] }) {
-    if (!Array.isArray(content)) return "";
+    if (!content?.length) return "";
+
+    // Find the text block and grab the text
     const textBlock = content.find((block) => block.text !== undefined);
     let textResponse = textBlock?.text || "";
+
+    // Find the reasoning block and grab the reasoning text
     const reasoningBlock = content.find(
       (block) => block.reasoningContent?.reasoningText?.text
     );
     if (reasoningBlock) {
       const reasoningText =
         reasoningBlock.reasoningContent.reasoningText.text.trim();
-      if (reasoningText.length > 0) {
+      if (!!reasoningText?.length)
         textResponse = `<think>${reasoningText}</think>${textResponse}`;
-      }
     }
     return textResponse;
   }

@@ -457,16 +371,15 @@ class AWSBedrockLLM {
    * Sends a request for chat completion (non-streaming).
    * @param {Array<object> | null} messages - Formatted message array from constructPrompt.
    * @param {object} options - Request options.
-   * @param {number} [options.temperature] - Sampling temperature.
+   * @param {number} options.temperature - Sampling temperature.
    * @returns {Promise<object | null>} Response object with textResponse and metrics, or null.
    * @throws {Error} If the API call fails or validation errors occur.
    */
   async getChatCompletion(messages = null, { temperature }) {
-    if (!Array.isArray(messages) || messages.length === 0) {
+    if (!messages?.length)
       throw new Error(
         "AWSBedrock::getChatCompletion requires a non-empty messages array."
       );
-    }

     const hasSystem = messages[0]?.role === "system";
     const systemBlock = hasSystem ? messages[0].content : undefined;
@@ -582,12 +495,11 @@ class AWSBedrockLLM {
         e.name === "ValidationException" &&
         e.message.includes("maximum tokens")
       ) {
-        // Provide specific error context for max token issues
         throw new Error(
           `AWSBedrock::streamGetChatCompletion failed during setup. Model ${this.model} rejected maxTokens value of ${maxTokensToSend}. Check model documentation for its maximum output token limit and set AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS if needed. Original error: ${e.message}`
         );
       }
-      // Re-throw other setup errors
+
       throw new Error(
         `AWSBedrock::streamGetChatCompletion failed during setup. ${e.message}`
       );
@@ -600,8 +512,8 @@ class AWSBedrockLLM {
    * @param {object} response - The HTTP response object to write chunks to.
    * @param {import('../../helpers/chat/LLMPerformanceMonitor').MonitoredStream} stream - The monitored stream object from streamGetChatCompletion.
    * @param {object} responseProps - Additional properties for the response chunks.
-   * @param {string} [responseProps.uuid] - Unique ID for the response.
-   * @param {Array} [responseProps.sources=[]] - Source documents used (if any).
+   * @param {string} responseProps.uuid - Unique ID for the response.
+   * @param {Array} responseProps.sources - Source documents used (if any).
    * @returns {Promise<string>} A promise that resolves with the complete text response when the stream ends.
    */
   handleStream(response, stream, responseProps) {
@@ -790,68 +702,24 @@ class AWSBedrockLLM {
           });
         }
       } finally {
-        // Cleanup: Always remove listener and finalize measurement
         response.removeListener("close", handleAbort);
-        stream?.endMeasurement(usage); // Log final usage metrics
+        stream?.endMeasurement(usage);
         resolve(fullText); // Resolve with the accumulated text
       }
     });
   }

-  /**
-   * Embeds a single text input using the configured embedder.
-   * @param {string} textInput - The text to embed.
-   * @returns {Promise<Array<number>>} The embedding vector.
-   * @throws {Error} If the embedder is not configured or fails.
-   */
+  // Simple wrapper for dynamic embedder & normalize interface for all LLM implementations
   async embedTextInput(textInput) {
-    if (!this.embedder?.embedTextInput) {
-      throw new Error(
-        "Embedder is not configured or does not support embedTextInput."
-      );
-    }
-    try {
-      return await this.embedder.embedTextInput(textInput);
-    } catch (e) {
-      this.#log(`EmbedTextInput Error: ${e.message}`, e);
-      throw e; // Re-throw after logging
-    }
+    return await this.embedder.embedTextInput(textInput);
   }
-
-  /**
-   * Embeds multiple chunks of text using the configured embedder.
-   * @param {string[]} [textChunks=[]] - An array of text chunks to embed.
-   * @returns {Promise<Array<Array<number>>>} An array of embedding vectors.
-   * @throws {Error} If the embedder is not configured or fails.
-   */
   async embedChunks(textChunks = []) {
-    if (!this.embedder?.embedChunks) {
-      throw new Error(
-        "Embedder is not configured or does not support embedChunks."
-      );
-    }
-    try {
-      return await this.embedder.embedChunks(textChunks);
-    } catch (e) {
-      this.#log(`EmbedChunks Error: ${e.message}`, e);
-      throw e; // Re-throw after logging
-    }
+    return await this.embedder.embedChunks(textChunks);
   }

-  /**
-   * Compresses chat messages if necessary using the messageArrayCompressor helper.
-   * @param {object} [promptArgs={}] - Arguments to pass to constructPrompt.
-   * @param {Array} [rawHistory=[]] - The raw chat history.
-   * @returns {Promise<Array<object>>} The potentially compressed message array and metadata.
-   */
   async compressMessages(promptArgs = {}, rawHistory = []) {
     const { messageArrayCompressor } = require("../../helpers/chat");
-    if (!messageArrayCompressor) {
-      throw new Error("Message compressor helper not found.");
-    }
-    // Construct the message array first
     const messageArray = this.constructPrompt(promptArgs);
-    // Then pass it to the compressor
     return await messageArrayCompressor(this, messageArray, rawHistory);
   }
 }
diff --git a/server/utils/AiProviders/bedrock/utils.js b/server/utils/AiProviders/bedrock/utils.js
new file mode 100644
index 00000000000..f1529fb94ca
--- /dev/null
+++ b/server/utils/AiProviders/bedrock/utils.js
@@ -0,0 +1,68 @@
+/** @type {('jpeg' | 'png' | 'gif' | 'webp')[]} Supported Bedrock Converse image formats */
+const SUPPORTED_BEDROCK_IMAGE_FORMATS = ["jpeg", "png", "gif", "webp"];
+
+/** @type {number} */
+const DEFAULT_MAX_OUTPUT_TOKENS = 4096;
+
+/** @type {number} */
+const DEFAULT_CONTEXT_WINDOW_TOKENS = 8191;
+
+/** @type {('iam' | 'sessionToken')[]} */
+const SUPPORTED_CONNECTION_METHODS = ["iam", "sessionToken"];
+
+/**
+ * Parses a MIME type string (e.g., "image/jpeg") to extract and validate the image format
+ * supported by Bedrock Converse. Handles 'image/jpg' as 'jpeg'.
+ * @param {string | null | undefined} mimeType - The MIME type string.
+ * @returns {string | null} The validated image format (e.g., "jpeg") or null if invalid/unsupported.
+ */
+function getImageFormatFromMime(mimeType = "") {
+  if (!mimeType) return null;
+  const parts = mimeType.toLowerCase().split("/");
+  if (parts?.[0] !== "image") return null;
+  let format = parts?.[1];
+
+  if (!format) return null;
+
+  // Remap jpg to jpeg
+  switch (format) {
+    case "jpg":
+      format = "jpeg";
+      break;
+    default:
+      break;
+  }
+
+  if (!SUPPORTED_BEDROCK_IMAGE_FORMATS.includes(format)) return null;
+  return format;
+}
+
+/**
+ * Decodes a pure base64 string (without data URI prefix) into a Uint8Array using the atob method.
+ * This approach matches the technique previously used by Langchain's implementation.
+ * @param {string} base64String - The pure base64 encoded data.
+ * @returns {Uint8Array | null} The resulting byte array or null on decoding error.
+ */
+function base64ToUint8Array(base64String) {
+  try {
+    const binaryString = atob(base64String);
+    const len = binaryString.length;
+    const bytes = new Uint8Array(len);
+    for (let i = 0; i < len; i++) bytes[i] = binaryString.charCodeAt(i);
+    return bytes;
+  } catch (e) {
+    console.error(
+      `[AWSBedrock] Error decoding base64 string with atob: ${e.message}`
+    );
+    return null;
+  }
+}
+
+module.exports = {
+  SUPPORTED_CONNECTION_METHODS,
+  SUPPORTED_BEDROCK_IMAGE_FORMATS,
+  DEFAULT_MAX_OUTPUT_TOKENS,
+  DEFAULT_CONTEXT_WINDOW_TOKENS,
+  getImageFormatFromMime,
+  base64ToUint8Array,
+};
diff --git a/server/utils/helpers/updateENV.js b/server/utils/helpers/updateENV.js
index bffab102609..32dca2d0bbc 100644
--- a/server/utils/helpers/updateENV.js
+++ b/server/utils/helpers/updateENV.js
@@ -254,6 +254,10 @@ const KEY_MAPPING = {
     envKey: "AWS_BEDROCK_LLM_MODEL_TOKEN_LIMIT",
     checks: [nonZero],
   },
+  AwsBedrockLLMMaxOutputTokens: {
+    envKey: "AWS_BEDROCK_LLM_MAX_OUTPUT_TOKENS",
+    checks: [nonZero],
+  },
   EmbeddingEngine: {
     envKey: "EMBEDDING_ENGINE",