diff --git a/docker/.env.example b/docker/.env.example
index f0fe46d1365..421d05368c5 100644
--- a/docker/.env.example
+++ b/docker/.env.example
@@ -195,6 +195,7 @@ GID='1000'
 # EMBEDDING_BASE_PATH='http://127.0.0.1:4000'
 # GENERIC_OPEN_AI_EMBEDDING_API_KEY='sk-123abc'
 # GENERIC_OPEN_AI_EMBEDDING_MAX_CONCURRENT_CHUNKS=500
+# GENERIC_OPEN_AI_EMBEDDING_API_DELAY_MS=1000
 
 # EMBEDDING_ENGINE='gemini'
 # GEMINI_EMBEDDING_API_KEY=
diff --git a/server/.env.example b/server/.env.example
index e1f5ebfdd94..c60319ab6ab 100644
--- a/server/.env.example
+++ b/server/.env.example
@@ -194,6 +194,7 @@ SIG_SALT='salt' # Please generate random string at least 32 chars long.
 # EMBEDDING_BASE_PATH='http://127.0.0.1:4000'
 # GENERIC_OPEN_AI_EMBEDDING_API_KEY='sk-123abc'
 # GENERIC_OPEN_AI_EMBEDDING_MAX_CONCURRENT_CHUNKS=500
+# GENERIC_OPEN_AI_EMBEDDING_API_DELAY_MS=1000
 
 # EMBEDDING_ENGINE='gemini'
 # GEMINI_EMBEDDING_API_KEY=
diff --git a/server/utils/EmbeddingEngines/genericOpenAi/index.js b/server/utils/EmbeddingEngines/genericOpenAi/index.js
index e88538f4b45..a8a3ac1a584 100644
--- a/server/utils/EmbeddingEngines/genericOpenAi/index.js
+++ b/server/utils/EmbeddingEngines/genericOpenAi/index.js
@@ -28,6 +28,35 @@ class GenericOpenAiEmbedder {
     console.log(`\x1b[36m[GenericOpenAiEmbedder]\x1b[0m ${text}`, ...args);
   }
 
+  /**
+   * returns the `GENERIC_OPEN_AI_EMBEDDING_API_DELAY_MS` env variable as a number or null if the env variable is not set or is not a number.
+   * The minimum delay is 500ms.
+   *
+   * For some implementations this is necessary to avoid 429 errors due to rate limiting or
+   * hardware limitations where a single-threaded process is not able to handle the requests fast enough.
+   * @returns {number|null}
+   */
+  get apiRequestDelay() {
+    if (!("GENERIC_OPEN_AI_EMBEDDING_API_DELAY_MS" in process.env)) return null;
+    if (isNaN(Number(process.env.GENERIC_OPEN_AI_EMBEDDING_API_DELAY_MS)))
+      return null;
+    const delayTimeout = Number(
+      process.env.GENERIC_OPEN_AI_EMBEDDING_API_DELAY_MS
+    );
+    if (delayTimeout < 500) return 500; // minimum delay of 500ms
+    return delayTimeout;
+  }
+
+  /**
+   * runs the delay if it is set and valid.
+   * @returns {Promise<void>}
+   */
+  async runDelay() {
+    if (!this.apiRequestDelay) return;
+    this.log(`Delaying new batch request for ${this.apiRequestDelay}ms`);
+    await new Promise((resolve) => setTimeout(resolve, this.apiRequestDelay));
+  }
+
   /**
    * returns the `GENERIC_OPEN_AI_EMBEDDING_MAX_CONCURRENT_CHUNKS` env variable as a number
    * or 500 if the env variable is not set or is not a number.
@@ -52,62 +81,38 @@ class GenericOpenAiEmbedder {
 
   async embedChunks(textChunks = []) {
     // Because there is a hard POST limit on how many chunks can be sent at once to OpenAI (~8mb)
-    // we concurrently execute each max batch of text chunks possible.
+    // we sequentially execute each max batch of text chunks possible.
     // Refer to constructor maxConcurrentChunks for more info.
-    const embeddingRequests = [];
+    const allResults = [];
     for (const chunk of toChunks(textChunks, this.maxConcurrentChunks)) {
-      embeddingRequests.push(
-        new Promise((resolve) => {
-          this.openai.embeddings
-            .create({
-              model: this.model,
-              input: chunk,
-            })
-            .then((result) => {
-              resolve({ data: result?.data, error: null });
-            })
-            .catch((e) => {
-              e.type =
-                e?.response?.data?.error?.code ||
-                e?.response?.status ||
-                "failed_to_embed";
-              e.message = e?.response?.data?.error?.message || e.message;
-              resolve({ data: [], error: e });
-            });
-        })
-      );
-    }
+      const { data = [], error = null } = await new Promise((resolve) => {
+        this.openai.embeddings
+          .create({
+            model: this.model,
+            input: chunk,
+          })
+          .then((result) => resolve({ data: result?.data, error: null }))
+          .catch((e) => {
+            e.type =
+              e?.response?.data?.error?.code ||
+              e?.response?.status ||
+              "failed_to_embed";
+            e.message = e?.response?.data?.error?.message || e.message;
+            resolve({ data: [], error: e });
+          });
+      });
 
-    const { data = [], error = null } = await Promise.all(
-      embeddingRequests
-    ).then((results) => {
       // If any errors were returned from OpenAI abort the entire sequence because the embeddings
       // will be incomplete.
-      const errors = results
-        .filter((res) => !!res.error)
-        .map((res) => res.error)
-        .flat();
-      if (errors.length > 0) {
-        let uniqueErrors = new Set();
-        errors.map((error) =>
-          uniqueErrors.add(`[${error.type}]: ${error.message}`)
-        );
-
-        return {
-          data: [],
-          error: Array.from(uniqueErrors).join(", "),
-        };
-      }
-      return {
-        data: results.map((res) => res?.data || []).flat(),
-        error: null,
-      };
-    });
+      if (error)
+        throw new Error(`GenericOpenAI Failed to embed: ${error.message}`);
+      allResults.push(...(data || []));
+      if (this.apiRequestDelay) await this.runDelay();
+    }
 
-    if (!!error) throw new Error(`GenericOpenAI Failed to embed: ${error}`);
-    return data.length > 0 &&
-      data.every((embd) => embd.hasOwnProperty("embedding"))
-      ? data.map((embd) => embd.embedding)
+    return allResults.length > 0 &&
+      allResults.every((embd) => embd.hasOwnProperty("embedding"))
+      ? allResults.map((embd) => embd.embedding)
       : null;
   }
 }