From dff2419cd215987c9ae0b0be4c9f0c7fd72f40f9 Mon Sep 17 00:00:00 2001 From: alisalim17 Date: Wed, 21 Feb 2024 21:32:04 +0400 Subject: [PATCH 1/3] feat: add timeout to streaming callback handler --- libs/superagent/app/utils/callbacks.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/libs/superagent/app/utils/callbacks.py b/libs/superagent/app/utils/callbacks.py index dcd498c50..d462f6f8c 100644 --- a/libs/superagent/app/utils/callbacks.py +++ b/libs/superagent/app/utils/callbacks.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import logging from typing import Any, AsyncIterator, List, Literal, Tuple, Union, cast from decouple import config @@ -9,6 +10,8 @@ from langfuse import Langfuse from litellm import cost_per_token, token_counter +logger = logging.getLogger(__name__) + class CustomAsyncIteratorCallbackHandler(AsyncCallbackHandler): """Callback handler that returns an async iterator.""" @@ -17,6 +20,8 @@ class CustomAsyncIteratorCallbackHandler(AsyncCallbackHandler): done: asyncio.Event + TIMEOUT_SECONDS = 30 + @property def always_verbose(self) -> bool: return True @@ -65,7 +70,13 @@ async def aiter(self) -> AsyncIterator[str]: asyncio.ensure_future(self.done.wait()), ], return_when=asyncio.FIRST_COMPLETED, + timeout=self.TIMEOUT_SECONDS, ) + # if we the timeout has been reached + if not done or not other: + logger.warning(f"{self.TIMEOUT_SECONDS} seconds of timeout reached") + self.done.set() + break # Cancel the other task if other: From 463d46acad6c54a602212bb99fa30802aea28a93 Mon Sep 17 00:00:00 2001 From: alisalim17 Date: Wed, 21 Feb 2024 21:32:27 +0400 Subject: [PATCH 2/3] feat: catch errors in workflow.arun func --- libs/superagent/app/api/workflows.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/libs/superagent/app/api/workflows.py b/libs/superagent/app/api/workflows.py index 5f8fbd1b0..a46f324ca 100644 --- a/libs/superagent/app/api/workflows.py +++ b/libs/superagent/app/api/workflows.py @@ -263,8 +263,11 @@ async def send_message() -> AsyncIterable[str]: yield f"id: {workflow_step['agent_name']}\ndata: {token}\n\n" await task - workflow_result = task.result() + exception = task.exception() + if exception: + raise exception + workflow_result = task.result() for index, workflow_step in enumerate(workflow_steps): workflow_step_result = workflow_result.get("steps")[index] @@ -285,11 +288,7 @@ async def send_message() -> AsyncIterable[str]: ) except Exception as error: - yield ( - f"id: {workflow_step['agent_name']}\n" - f"event: error\n" - f"data: {error}\n\n" - ) + yield (f"event: error\n" f"data: {error}\n\n") if SEGMENT_WRITE_KEY: for workflow_step in workflow_data.steps: From a0d7d9bb8a6be391b3d21439df74f8685ff62cf2 Mon Sep 17 00:00:00 2001 From: alisalim17 Date: Wed, 21 Feb 2024 21:35:32 +0400 Subject: [PATCH 3/3] feat: show errors in ui --- libs/ui/app/workflows/[id]/chat.tsx | 30 +++++++++++++++++++++-------- libs/ui/components/message.tsx | 2 +- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/libs/ui/app/workflows/[id]/chat.tsx b/libs/ui/app/workflows/[id]/chat.tsx index 3baa6e679..bc93cb9be 100644 --- a/libs/ui/app/workflows/[id]/chat.tsx +++ b/libs/ui/app/workflows/[id]/chat.tsx @@ -46,7 +46,12 @@ export default function Chat({ ) const [isLoading, setIsLoading] = React.useState(false) const [messages, setMessages] = React.useState< - { type: string; message: string; steps?: Record }[] + { + type: string + message: string + steps?: Record + isSuccess?: boolean + }[] >( workflowSteps[0]?.agent?.initialMessage ? [{ type: "ai", message: workflowSteps[0].agent.initialMessage }] @@ -147,13 +152,21 @@ export default function Chat({ type: "function_call", }, ]) - } + } else if (event.event === "error") { + setMessages((previousMessages) => { + let updatedMessages = [...previousMessages] - if ( - event.data !== "[END]" && - event.event !== "function_call" && - currentEventId - ) { + for (let i = updatedMessages.length - 1; i >= 0; i--) { + if (updatedMessages[i].type === "ai") { + updatedMessages[i].message = event.data + updatedMessages[i].isSuccess = false + break + } + } + + return updatedMessages + }) + } else if (event.data !== "[END]" && currentEventId) { if (!messageByEventIds[currentEventId]) messageByEventIds[currentEventId] = "" @@ -232,13 +245,14 @@ export default function Chat({
- {messages.map(({ type, message, steps }, index) => ( + {messages.map(({ type, message, steps, isSuccess }, index) => ( ))}
diff --git a/libs/ui/components/message.tsx b/libs/ui/components/message.tsx index 1e1267b2a..97563dfa6 100644 --- a/libs/ui/components/message.tsx +++ b/libs/ui/components/message.tsx @@ -233,7 +233,7 @@ function MessageAlert({ error }: MessageAlertProps) { Error - {error}. + {error} )