diff --git a/Cargo.lock b/Cargo.lock index 6397d7c8..36c3928a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1537,7 +1537,6 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", - "bytes", "configuration", "dc-api", "dc-api-test-helpers", diff --git a/crates/dc-api-types/src/query_response.rs b/crates/dc-api-types/src/query_response.rs index 0c48d215..8b1d9ca6 100644 --- a/crates/dc-api-types/src/query_response.rs +++ b/crates/dc-api-types/src/query_response.rs @@ -42,13 +42,29 @@ pub struct ForEachRow { pub query: RowSet, } +/// A row set must contain either rows, or aggregates, or possibly both #[skip_serializing_none] -#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] -pub struct RowSet { - /// The results of the aggregates returned by the query - pub aggregates: Option>, - /// The rows returned by the query, corresponding to the query's fields - pub rows: Option>>, +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum RowSet { + Aggregate { + /// The results of the aggregates returned by the query + aggregates: HashMap, + /// The rows returned by the query, corresponding to the query's fields + rows: Option>>, + }, + Rows { + /// Rows returned by a query that did not request aggregates. + rows: Vec>, + }, +} + +impl Default for RowSet { + fn default() -> Self { + RowSet::Rows { + rows: Default::default(), + } + } } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] diff --git a/crates/dc-api/src/interface_types/json_response.rs b/crates/dc-api/src/interface_types/json_response.rs deleted file mode 100644 index 9ff90f28..00000000 --- a/crates/dc-api/src/interface_types/json_response.rs +++ /dev/null @@ -1,120 +0,0 @@ -use axum::response::IntoResponse; -use bytes::Bytes; -use http::{header, HeaderValue}; - -/// Represents a response value that will be serialized to JSON. -/// -/// Copied from rust-connector-sdk. -/// -/// The value may be of a type that implements `serde::Serialize`, or it may be -/// a contiguous sequence of bytes, which are _assumed_ to be valid JSON. -#[derive(Debug, Clone)] -pub enum JsonResponse { - /// A value that can be serialized to JSON. - Value(A), - /// A serialized JSON bytestring that is assumed to represent a value of - /// type `A`. This is not guaranteed by the SDK; the connector is - /// responsible for ensuring this. - Serialized(Bytes), -} - -impl From for JsonResponse { - fn from(value: A) -> Self { - Self::Value(value) - } -} - -impl IntoResponse for JsonResponse { - fn into_response(self) -> axum::response::Response { - match self { - Self::Value(value) => axum::Json(value).into_response(), - Self::Serialized(bytes) => ( - [( - header::CONTENT_TYPE, - HeaderValue::from_static(mime::APPLICATION_JSON.as_ref()), - )], - bytes, - ) - .into_response(), - } - } -} - -impl serde::Deserialize<'de>)> JsonResponse { - /// Unwraps the value, deserializing if necessary. - /// - /// This is only intended for testing and compatibility. If it lives on a - /// critical path, we recommend you avoid it. - pub fn into_value(self) -> Result { - match self { - Self::Value(value) => Ok(value), - Self::Serialized(bytes) => serde_json::de::from_slice(&bytes), - } - } -} - -#[cfg(test)] -mod tests { - use axum::{routing, Router}; - use axum_test_helper::TestClient; - use http::StatusCode; - - use super::*; - - #[tokio::test] - async fn serializes_value_to_json() { - let app = Router::new().route( - "/", - routing::get(|| async { - JsonResponse::Value(Person { - name: "Alice Appleton".to_owned(), - age: 42, - }) - }), - ); - - let client = TestClient::new(app); - let response = client.get("/").send().await; - - assert_eq!(response.status(), StatusCode::OK); - - let headers = response.headers(); - assert_eq!( - headers.get_all("Content-Type").iter().collect::>(), - vec!["application/json"] - ); - - let body = response.text().await; - assert_eq!(body, r#"{"name":"Alice Appleton","age":42}"#); - } - - #[tokio::test] - async fn writes_json_string_directly() { - let app = Router::new().route( - "/", - routing::get(|| async { - JsonResponse::Serialized::(r#"{"name":"Bob Davis","age":7}"#.into()) - }), - ); - - let client = TestClient::new(app); - let response = client.get("/").send().await; - - assert_eq!(response.status(), StatusCode::OK); - - let headers = response.headers(); - assert_eq!( - headers.get_all("Content-Type").iter().collect::>(), - vec!["application/json"] - ); - - let body = response.text().await; - assert_eq!(body, r#"{"name":"Bob Davis","age":7}"#); - } - - #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] - struct Person { - name: String, - age: u16, - } -} diff --git a/crates/dc-api/src/interface_types/mod.rs b/crates/dc-api/src/interface_types/mod.rs index 674a6eff..e584429c 100644 --- a/crates/dc-api/src/interface_types/mod.rs +++ b/crates/dc-api/src/interface_types/mod.rs @@ -1,4 +1,3 @@ mod agent_error; -mod json_response; -pub use self::{agent_error::AgentError, json_response::JsonResponse}; +pub use self::agent_error::AgentError; diff --git a/crates/dc-api/src/lib.rs b/crates/dc-api/src/lib.rs index cd0f0fac..6b182571 100644 --- a/crates/dc-api/src/lib.rs +++ b/crates/dc-api/src/lib.rs @@ -1,3 +1,3 @@ mod interface_types; -pub use self::interface_types::{AgentError, JsonResponse}; +pub use self::interface_types::AgentError; diff --git a/crates/mongodb-agent-common/src/query/execute_query_request.rs b/crates/mongodb-agent-common/src/query/execute_query_request.rs index 03f9d13a..62a74fcd 100644 --- a/crates/mongodb-agent-common/src/query/execute_query_request.rs +++ b/crates/mongodb-agent-common/src/query/execute_query_request.rs @@ -1,9 +1,7 @@ use anyhow::anyhow; -use bytes::Bytes; -use dc_api::JsonResponse; use dc_api_types::{QueryRequest, QueryResponse}; use futures_util::TryStreamExt; -use mongodb::bson::{doc, Document}; +use mongodb::bson::{self, doc, Document}; use super::pipeline::{pipeline_for_query_request, ResponseShape}; use crate::{interface_types::MongoAgentError, mongodb::CollectionTrait}; @@ -11,7 +9,7 @@ use crate::{interface_types::MongoAgentError, mongodb::CollectionTrait}; pub async fn execute_query_request( collection: &impl CollectionTrait, query_request: QueryRequest, -) -> Result, MongoAgentError> { +) -> Result { let (pipeline, response_shape) = pipeline_for_query_request(&query_request)?; tracing::debug!(pipeline = %serde_json::to_string(&pipeline).unwrap(), "aggregate pipeline"); @@ -33,9 +31,8 @@ pub async fn execute_query_request( )) })?, }; + tracing::debug!(response_document = %serde_json::to_string(&response_document).unwrap(), "response from MongoDB"); - let bytes: Bytes = serde_json::to_vec(&response_document) - .map_err(MongoAgentError::Serialization)? - .into(); - Ok(JsonResponse::Serialized(bytes)) + let response = bson::from_document(response_document)?; + Ok(response) } diff --git a/crates/mongodb-agent-common/src/query/foreach.rs b/crates/mongodb-agent-common/src/query/foreach.rs index 7febe1c0..2b9bc8aa 100644 --- a/crates/mongodb-agent-common/src/query/foreach.rs +++ b/crates/mongodb-agent-common/src/query/foreach.rs @@ -176,14 +176,14 @@ mod tests { "$facet": { "__FACET___0": [ { "$match": { "$and": [{ "artistId": {"$eq":1 }}]}}, - { "$replaceWith": { + { "$replaceWith": { "albumId": { "$ifNull": ["$albumId", null] }, "title": { "$ifNull": ["$title", null] } } }, ], "__FACET___1": [ { "$match": { "$and": [{ "artistId": {"$eq":2}}]}}, - { "$replaceWith": { + { "$replaceWith": { "albumId": { "$ifNull": ["$albumId", null] }, "title": { "$ifNull": ["$title", null] } } }, @@ -248,9 +248,7 @@ mod tests { })?)])) }); - let result = execute_query_request(&collection, query_request) - .await? - .into_value()?; + let result = execute_query_request(&collection, query_request).await?; assert_eq!(expected_response, result); Ok(()) @@ -364,7 +362,6 @@ mod tests { ] }))?; - let mut collection = MockCollectionTrait::new(); collection .expect_aggregate() @@ -398,9 +395,7 @@ mod tests { })?)])) }); - let result = execute_query_request(&collection, query_request) - .await? - .into_value()?; + let result = execute_query_request(&collection, query_request).await?; assert_eq!(expected_response, result); Ok(()) diff --git a/crates/mongodb-agent-common/src/query/make_selector.rs b/crates/mongodb-agent-common/src/query/make_selector.rs index e84a8fc4..974282c0 100644 --- a/crates/mongodb-agent-common/src/query/make_selector.rs +++ b/crates/mongodb-agent-common/src/query/make_selector.rs @@ -10,7 +10,7 @@ use mongodb_support::BsonScalarType; use crate::{ comparison_function::ComparisonFunction, interface_types::MongoAgentError, - query::serialization::json_to_bson_scalar, query::column_ref::column_ref, + query::column_ref::column_ref, query::serialization::json_to_bson_scalar, }; use BinaryArrayComparisonOperator as ArrOp; @@ -112,7 +112,7 @@ fn make_selector_helper( "comparisons between columns", )), ArrayComparisonValue::Variable(name) => { - Ok(variable_to_mongo_expression(variables, name, value_type)?.into()) + variable_to_mongo_expression(variables, name, value_type) } }) .collect::>()?; @@ -149,11 +149,10 @@ fn variable_to_mongo_expression( variables: Option<&BTreeMap>, variable: &str, value_type: &str, -) -> Result { +) -> Result { let value = variables .and_then(|vars| vars.get(variable)) .ok_or_else(|| MongoAgentError::VariableNotDefined(variable.to_owned()))?; - Ok(doc! { - "$literal": bson_from_scalar_value(value, value_type)? - }) + + bson_from_scalar_value(value, value_type) } diff --git a/crates/mongodb-agent-common/src/query/mod.rs b/crates/mongodb-agent-common/src/query/mod.rs index abbe37ed..c5597604 100644 --- a/crates/mongodb-agent-common/src/query/mod.rs +++ b/crates/mongodb-agent-common/src/query/mod.rs @@ -9,7 +9,6 @@ mod pipeline; mod relations; pub mod serialization; -use dc_api::JsonResponse; use dc_api_types::{QueryRequest, QueryResponse, Target}; use mongodb::bson::Document; @@ -28,7 +27,7 @@ pub fn collection_name(query_request_target: &Target) -> String { pub async fn handle_query_request( config: &MongoConfig, query_request: QueryRequest, -) -> Result, MongoAgentError> { +) -> Result { tracing::debug!(?config, query_request = %serde_json::to_string(&query_request).unwrap(), "executing query"); let database = config.client.database(&config.database); @@ -91,9 +90,7 @@ mod tests { ])) }); - let result = execute_query_request(&collection, query_request) - .await? - .into_value()?; + let result = execute_query_request(&collection, query_request).await?; assert_eq!(expected_response, result); Ok(()) } @@ -170,9 +167,7 @@ mod tests { })?)])) }); - let result = execute_query_request(&collection, query_request) - .await? - .into_value()?; + let result = execute_query_request(&collection, query_request).await?; assert_eq!(expected_response, result); Ok(()) } @@ -255,9 +250,7 @@ mod tests { })?)])) }); - let result = execute_query_request(&collection, query_request) - .await? - .into_value()?; + let result = execute_query_request(&collection, query_request).await?; assert_eq!(expected_response, result); Ok(()) } @@ -317,9 +310,7 @@ mod tests { })?)])) }); - let result = execute_query_request(&collection, query_request) - .await? - .into_value()?; + let result = execute_query_request(&collection, query_request).await?; assert_eq!(expected_response, result); Ok(()) } diff --git a/crates/mongodb-agent-common/src/query/relations.rs b/crates/mongodb-agent-common/src/query/relations.rs index 07e0d62f..9cb11481 100644 --- a/crates/mongodb-agent-common/src/query/relations.rs +++ b/crates/mongodb-agent-common/src/query/relations.rs @@ -333,9 +333,7 @@ mod tests { })])) }); - let result = execute_query_request(&collection, query_request) - .await? - .into_value()?; + let result = execute_query_request(&collection, query_request).await?; assert_eq!(expected_response, result); Ok(()) @@ -433,9 +431,7 @@ mod tests { ])) }); - let result = execute_query_request(&collection, query_request) - .await? - .into_value()?; + let result = execute_query_request(&collection, query_request).await?; assert_eq!(expected_response, result); Ok(()) @@ -533,9 +529,7 @@ mod tests { })])) }); - let result = execute_query_request(&collection, query_request) - .await? - .into_value()?; + let result = execute_query_request(&collection, query_request).await?; assert_eq!(expected_response, result); Ok(()) @@ -707,9 +701,7 @@ mod tests { })])) }); - let result = execute_query_request(&collection, query_request) - .await? - .into_value()?; + let result = execute_query_request(&collection, query_request).await?; assert_eq!(expected_response, result); Ok(()) @@ -815,9 +807,7 @@ mod tests { })])) }); - let result = execute_query_request(&collection, query_request) - .await? - .into_value()?; + let result = execute_query_request(&collection, query_request).await?; assert_eq!(expected_response, result); Ok(()) @@ -951,9 +941,7 @@ mod tests { })])) }); - let result = execute_query_request(&collection, query_request) - .await? - .into_value()?; + let result = execute_query_request(&collection, query_request).await?; assert_eq!(expected_response, result); Ok(()) @@ -1098,9 +1086,7 @@ mod tests { })])) }); - let result = execute_query_request(&collection, query_request) - .await? - .into_value()?; + let result = execute_query_request(&collection, query_request).await?; assert_eq!(expected_response, result); Ok(()) diff --git a/crates/mongodb-connector/Cargo.toml b/crates/mongodb-connector/Cargo.toml index fa8333f7..e89e8392 100644 --- a/crates/mongodb-connector/Cargo.toml +++ b/crates/mongodb-connector/Cargo.toml @@ -6,7 +6,6 @@ edition = "2021" [dependencies] anyhow = "1" async-trait = "^0.1" -bytes = "^1" configuration = { path = "../configuration" } dc-api = { path = "../dc-api" } dc-api-types = { path = "../dc-api-types" } diff --git a/crates/mongodb-connector/src/api_type_conversions/json_response.rs b/crates/mongodb-connector/src/api_type_conversions/json_response.rs deleted file mode 100644 index ca2c6d25..00000000 --- a/crates/mongodb-connector/src/api_type_conversions/json_response.rs +++ /dev/null @@ -1,19 +0,0 @@ -use ndc_sdk::json_response as ndc_sdk; - -/// Transform a [`dc_api::JsonResponse`] to a [`ndc_sdk::JsonResponse`] value **assuming -/// pre-serialized bytes do not need to be transformed**. The given mapping function will be used -/// to transform values that have not already been serialized, but serialized bytes will be -/// re-wrapped without modification. -#[allow(dead_code)] // TODO: MVC-7 -pub fn map_unserialized( - input: dc_api::JsonResponse, - mapping: Fn, -) -> ndc_sdk::JsonResponse -where - Fn: FnOnce(A) -> B, -{ - match input { - dc_api::JsonResponse::Value(value) => ndc_sdk::JsonResponse::Value(mapping(value)), - dc_api::JsonResponse::Serialized(bytes) => ndc_sdk::JsonResponse::Serialized(bytes), - } -} diff --git a/crates/mongodb-connector/src/api_type_conversions/mod.rs b/crates/mongodb-connector/src/api_type_conversions/mod.rs index deb1d029..d9ab3a60 100644 --- a/crates/mongodb-connector/src/api_type_conversions/mod.rs +++ b/crates/mongodb-connector/src/api_type_conversions/mod.rs @@ -1,7 +1,6 @@ mod capabilities; mod conversion_error; mod helpers; -mod json_response; mod query_request; mod query_response; mod query_traversal; @@ -10,7 +9,6 @@ mod query_traversal; pub use self::{ capabilities::v2_to_v3_scalar_type_capabilities, conversion_error::ConversionError, - json_response::map_unserialized, query_request::{v3_to_v2_query_request, QueryContext}, query_response::{v2_to_v3_explain_response, v2_to_v3_query_response}, }; diff --git a/crates/mongodb-connector/src/api_type_conversions/query_response.rs b/crates/mongodb-connector/src/api_type_conversions/query_response.rs index ef66142e..f1cc2791 100644 --- a/crates/mongodb-connector/src/api_type_conversions/query_response.rs +++ b/crates/mongodb-connector/src/api_type_conversions/query_response.rs @@ -15,9 +15,14 @@ pub fn v2_to_v3_query_response(response: v2::QueryResponse) -> v3::QueryResponse } fn v2_to_v3_row_set(row_set: v2::RowSet) -> v3::RowSet { + let (aggregates, rows) = match row_set { + v2::RowSet::Aggregate { aggregates, rows } => (Some(aggregates), rows), + v2::RowSet::Rows { rows } => (None, Some(rows)), + }; + v3::RowSet { - aggregates: row_set.aggregates.map(hash_map_to_index_map), - rows: row_set.rows.map(|xs| { + aggregates: aggregates.map(hash_map_to_index_map), + rows: rows.map(|xs| { xs.into_iter() .map(|field_values| { field_values diff --git a/crates/mongodb-connector/src/mongo_connector.rs b/crates/mongodb-connector/src/mongo_connector.rs index bb19504a..e330095b 100644 --- a/crates/mongodb-connector/src/mongo_connector.rs +++ b/crates/mongodb-connector/src/mongo_connector.rs @@ -2,7 +2,6 @@ use std::path::Path; use anyhow::anyhow; use async_trait::async_trait; -use bytes::Bytes; use configuration::Configuration; use mongodb_agent_common::{ explain::explain_query, health::check_health, interface_types::MongoConfig, @@ -158,22 +157,11 @@ impl Connector for MongoConnector { }, request, )?; - let response_json = handle_query_request(state, v2_request) + let response = handle_query_request(state, v2_request) .await .map_err(mongo_agent_error_to_query_error)?; - - match response_json { - dc_api::JsonResponse::Value(v2_response) => { - Ok(JsonResponse::Value(v2_to_v3_query_response(v2_response))) - } - dc_api::JsonResponse::Serialized(bytes) => { - let v2_value: serde_json::Value = serde_json::de::from_slice(&bytes) - .map_err(|e| QueryError::Other(Box::new(e)))?; - let v3_bytes: Bytes = serde_json::to_vec(&vec![v2_value]) - .map_err(|e| QueryError::Other(Box::new(e)))? - .into(); - Ok(JsonResponse::Serialized(v3_bytes)) - } - } + let r = v2_to_v3_query_response(response); + tracing::warn!(v3_response = %serde_json::to_string(&r).unwrap()); + Ok(r.into()) } }