-
Notifications
You must be signed in to change notification settings - Fork 65
Add OpenSearch shard related logging #1145
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,26 @@ | ||
from typing import Optional | ||
import logging | ||
from typing import Optional, Any | ||
|
||
from opensearchpy import OpenSearch | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Here's the unconditional import: `opensearchpy` is imported at module level in this file. |
||
|
||
from sycamore import Context | ||
from sycamore.context import context_params | ||
from sycamore.transforms import Embedder | ||
|
||
|
||
logger = logging.getLogger("opensearch") | ||
|
||
|
||
class OpenSearchClientWithLogging(OpenSearch):
    """OpenSearch client that logs an error when a search is not answered by every shard."""

    def search(self, *args, **kwargs) -> Any:
        """Run ``OpenSearch.search`` and log an error if not all shards succeeded.

        All arguments are forwarded unchanged to ``OpenSearch.search``, so both
        positional and keyword calls behave exactly as on the base client.
        Exceptions raised by the underlying client are not caught; they propagate
        to the caller.

        Returns:
            The response object returned by ``OpenSearch.search``, unmodified.
        """
        response = super().search(*args, **kwargs)
        shards = response.get("_shards", {})
        # If "_shards" is absent both lookups return None, so nothing is logged.
        if shards.get("total") != shards.get("successful"):
            # NOTE(review): the entire response is logged on purpose while shard
            # issues are being debugged, so it is visible which results came back.
            # The payload can be large; trim this (e.g. log only the query and
            # "_shards") once the failure mode is understood and handled.
            logger.error("OpenSearch query skipped shards: %s", response)
        return response
|
||
|
||
@context_params("opensearch") | ||
def get_knn_query( | ||
text_embedder: Embedder, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
import structlog | ||
import yaml | ||
from rich.console import Console | ||
|
||
from sycamore.schema import Schema | ||
|
||
import sycamore | ||
|
@@ -131,7 +132,7 @@ def __init__( | |
llm: Optional[Union[LLM, str]] = None, | ||
query_plan_strategy: Optional[QueryPlanStrategy] = None, | ||
): | ||
from opensearchpy import OpenSearch | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Also here.
||
from sycamore.connectors.opensearch.utils import OpenSearchClientWithLogging | ||
|
||
self.llm_cache_dir = llm_cache_dir | ||
self.os_config = os_config | ||
|
@@ -155,7 +156,7 @@ def __init__( | |
|
||
assert self.context.params, "Could not find required params in Context" | ||
self.os_client_args = self.context.params.get("opensearch", {}).get("os_client_args", os_client_args) | ||
self._os_client = OpenSearch(**self.os_client_args) | ||
self._os_client = OpenSearchClientWithLogging(**self.os_client_args) | ||
self._os_query_executor = OpenSearchQueryExecutor(self.os_client_args) | ||
|
||
def get_opensearch_indices(self) -> List[str]: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
from abc import abstractmethod, ABC | ||
from typing import Any | ||
|
||
from sycamore.utils.import_utils import requires_modules | ||
|
||
from sycamore.data import OpenSearchQueryResult, Element, OpenSearchQuery | ||
|
@@ -26,10 +27,10 @@ def __init__(self, os_client_args: dict) -> None: | |
|
||
@requires_modules("opensearchpy", extra="opensearch") | ||
def query(self, query: OpenSearchQuery) -> OpenSearchQueryResult: | ||
from opensearchpy import OpenSearch | ||
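There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We probably need to keep the import of `sycamore.connectors.opensearch.utils` here, inside the function, instead of at the top of the file: that module imports `opensearchpy` unconditionally, while this method is guarded by `@requires_modules("opensearchpy", extra="opensearch")`.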
||
from sycamore.connectors.opensearch.utils import OpenSearchClientWithLogging | ||
|
||
logger.debug("Executing OS query: " + str(query)) | ||
client = OpenSearch(**self._os_client_args) | ||
client = OpenSearchClientWithLogging(**self._os_client_args) | ||
|
||
os_result = client.transport.perform_request( | ||
"POST", | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also here