From 9dc99d04b1e11de59d20bf714600496148dde9e2 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Mon, 22 Apr 2024 13:39:44 -0700 Subject: [PATCH 01/25] first step in better query response serialization --- .../src/query/execute_query_request.rs | 60 ++++---------- .../mongodb-agent-common/src/query/foreach.rs | 9 +-- crates/mongodb-agent-common/src/query/mod.rs | 11 +-- .../src/api_type_conversions/mod.rs | 2 +- .../api_type_conversions/query_response.rs | 44 ---------- crates/mongodb-connector/src/main.rs | 1 + .../mongodb-connector/src/mongo_connector.rs | 12 +-- .../mongodb-connector/src/query_response.rs | 81 +++++++++++++++++++ 8 files changed, 112 insertions(+), 108 deletions(-) create mode 100644 crates/mongodb-connector/src/query_response.rs diff --git a/crates/mongodb-agent-common/src/query/execute_query_request.rs b/crates/mongodb-agent-common/src/query/execute_query_request.rs index b49cb58d..6559be0a 100644 --- a/crates/mongodb-agent-common/src/query/execute_query_request.rs +++ b/crates/mongodb-agent-common/src/query/execute_query_request.rs @@ -1,16 +1,14 @@ -use anyhow::anyhow; use configuration::Configuration; -use dc_api_types::{QueryRequest, QueryResponse, RowSet}; +use dc_api_types::QueryRequest; use futures::Stream; use futures_util::TryStreamExt; -use itertools::Itertools as _; -use mongodb::bson::{self, Document}; +use mongodb::bson; -use super::pipeline::{pipeline_for_query_request, ResponseShape}; +use super::pipeline::pipeline_for_query_request; use crate::{ interface_types::MongoAgentError, mongodb::{CollectionTrait as _, DatabaseTrait}, - query::{foreach::foreach_variants, QueryTarget}, + query::QueryTarget, }; /// Execute a query request against the given collection. @@ -21,7 +19,7 @@ pub async fn execute_query_request( database: impl DatabaseTrait, config: &Configuration, query_request: QueryRequest, -) -> Result { +) -> Result, MongoAgentError> { let target = QueryTarget::for_request(config, &query_request); let (pipeline, response_shape) = pipeline_for_query_request(config, &query_request)?; tracing::debug!( @@ -39,55 +37,25 @@ pub async fn execute_query_request( let collection = database.collection(&collection_name); collect_from_cursor(collection.aggregate(pipeline, None).await?).await } - QueryTarget::NativeQuery { native_query, .. } => { - match &native_query.input_collection { - Some(collection_name) => { - let collection = database.collection(collection_name); - collect_from_cursor(collection.aggregate(pipeline, None).await?).await - }, - None => collect_from_cursor(database.aggregate(pipeline, None).await?).await + QueryTarget::NativeQuery { native_query, .. } => match &native_query.input_collection { + Some(collection_name) => { + let collection = database.collection(collection_name); + collect_from_cursor(collection.aggregate(pipeline, None).await?).await } - } + None => collect_from_cursor(database.aggregate(pipeline, None).await?).await, + }, }?; - tracing::debug!(response_documents = %serde_json::to_string(&documents).unwrap(), "response from MongoDB"); - let response = match (foreach_variants(&query_request), response_shape) { - (Some(_), _) => parse_single_document(documents)?, - (None, ResponseShape::ListOfRows) => QueryResponse::Single(RowSet::Rows { - rows: documents - .into_iter() - .map(bson::from_document) - .try_collect()?, - }), - (None, ResponseShape::SingleObject) => { - QueryResponse::Single(parse_single_document(documents)?) - } - }; - tracing::debug!(response = %serde_json::to_string(&response).unwrap(), "query response"); - - Ok(response) + Ok(documents) } async fn collect_from_cursor( - document_cursor: impl Stream>, -) -> Result, MongoAgentError> { + document_cursor: impl Stream>, +) -> Result, MongoAgentError> { document_cursor .into_stream() .map_err(MongoAgentError::MongoDB) .try_collect::>() .await } - -fn parse_single_document(documents: Vec) -> Result -where - T: for<'de> serde::Deserialize<'de>, -{ - let document = documents.into_iter().next().ok_or_else(|| { - MongoAgentError::AdHoc(anyhow!( - "Expected a response document from MongoDB, but did not get one" - )) - })?; - let value = bson::from_document(document)?; - Ok(value) -} diff --git a/crates/mongodb-agent-common/src/query/foreach.rs b/crates/mongodb-agent-common/src/query/foreach.rs index d347537e..9f5921eb 100644 --- a/crates/mongodb-agent-common/src/query/foreach.rs +++ b/crates/mongodb-agent-common/src/query/foreach.rs @@ -83,12 +83,9 @@ pub fn pipeline_for_foreach( .collect::>()?; let selection = Selection(doc! { - "rows": pipelines_with_response_shapes.iter().map(|(key, (_, response_shape))| doc! { - "query": match response_shape { - ResponseShape::ListOfRows => doc! { "rows": format!("${key}") }.into(), - ResponseShape::SingleObject => Bson::String(format!("${key}")), - } - }).collect::>() + "row_sets": pipelines_with_response_shapes.iter().map(|(key, (_, response_shape))| + Bson::String(format!("${key}")), + ).collect::>() }); let queries = pipelines_with_response_shapes diff --git a/crates/mongodb-agent-common/src/query/mod.rs b/crates/mongodb-agent-common/src/query/mod.rs index 08498435..d9c551ae 100644 --- a/crates/mongodb-agent-common/src/query/mod.rs +++ b/crates/mongodb-agent-common/src/query/mod.rs @@ -12,7 +12,8 @@ mod relations; pub mod serialization; use configuration::Configuration; -use dc_api_types::{QueryRequest, QueryResponse}; +use dc_api_types::QueryRequest; +use mongodb::bson; use self::execute_query_request::execute_query_request; pub use self::{ @@ -27,7 +28,7 @@ pub async fn handle_query_request( config: &Configuration, state: &ConnectorState, query_request: QueryRequest, -) -> Result { +) -> Result, MongoAgentError> { let database = state.database(); // This function delegates to another function which gives is a point to inject a mock database // implementation for testing. @@ -37,7 +38,7 @@ pub async fn handle_query_request( #[cfg(test)] mod tests { use dc_api_types::{QueryRequest, QueryResponse, RowSet}; - use mongodb::bson::{self, bson}; + use mongodb::bson::{self, bson, doc}; use pretty_assertions::assert_eq; use serde_json::{from_value, json}; @@ -64,12 +65,12 @@ mod tests { "relationships": [], }))?; - let expected_response: QueryResponse = from_value(json!({ + let expected_response = doc! { "rows": [ { "student_gpa": 3.1 }, { "student_gpa": 3.6 }, ], - }))?; + }; let expected_pipeline = bson!([ { "$match": { "gpa": { "$lt": 4.0 } } }, diff --git a/crates/mongodb-connector/src/api_type_conversions/mod.rs b/crates/mongodb-connector/src/api_type_conversions/mod.rs index 4b77162e..87386b60 100644 --- a/crates/mongodb-connector/src/api_type_conversions/mod.rs +++ b/crates/mongodb-connector/src/api_type_conversions/mod.rs @@ -8,5 +8,5 @@ mod query_traversal; pub use self::{ conversion_error::ConversionError, query_request::{v3_to_v2_query_request, QueryContext}, - query_response::{v2_to_v3_explain_response, v2_to_v3_query_response}, + query_response::v2_to_v3_explain_response, }; diff --git a/crates/mongodb-connector/src/api_type_conversions/query_response.rs b/crates/mongodb-connector/src/api_type_conversions/query_response.rs index f1cc2791..1985f8c9 100644 --- a/crates/mongodb-connector/src/api_type_conversions/query_response.rs +++ b/crates/mongodb-connector/src/api_type_conversions/query_response.rs @@ -3,50 +3,6 @@ use std::collections::BTreeMap; use dc_api_types::{self as v2}; use ndc_sdk::models::{self as v3}; -pub fn v2_to_v3_query_response(response: v2::QueryResponse) -> v3::QueryResponse { - let rows: Vec = match response { - v2::QueryResponse::ForEach { rows } => rows - .into_iter() - .map(|foreach| v2_to_v3_row_set(foreach.query)) - .collect(), - v2::QueryResponse::Single(row_set) => vec![v2_to_v3_row_set(row_set)], - }; - v3::QueryResponse(rows) -} - -fn v2_to_v3_row_set(row_set: v2::RowSet) -> v3::RowSet { - let (aggregates, rows) = match row_set { - v2::RowSet::Aggregate { aggregates, rows } => (Some(aggregates), rows), - v2::RowSet::Rows { rows } => (None, Some(rows)), - }; - - v3::RowSet { - aggregates: aggregates.map(hash_map_to_index_map), - rows: rows.map(|xs| { - xs.into_iter() - .map(|field_values| { - field_values - .into_iter() - .map(|(name, value)| (name, v2_to_v3_field_value(value))) - .collect() - }) - .collect() - }), - } -} - -fn v2_to_v3_field_value(field_value: v2::ResponseFieldValue) -> v3::RowFieldValue { - v3::RowFieldValue(serde_json::to_value(field_value).expect("serializing result field value")) -} - -fn hash_map_to_index_map(xs: InputMap) -> OutputMap -where - InputMap: IntoIterator, - OutputMap: FromIterator<(K, V)>, -{ - xs.into_iter().collect::() -} - pub fn v2_to_v3_explain_response(response: v2::ExplainResponse) -> v3::ExplainResponse { v3::ExplainResponse { details: BTreeMap::from_iter([ diff --git a/crates/mongodb-connector/src/main.rs b/crates/mongodb-connector/src/main.rs index 00071bc7..3f8b36fd 100644 --- a/crates/mongodb-connector/src/main.rs +++ b/crates/mongodb-connector/src/main.rs @@ -4,6 +4,7 @@ mod error_mapping; mod mongo_connector; mod mutation; mod query_context; +mod query_response; mod schema; use std::error::Error; diff --git a/crates/mongodb-connector/src/mongo_connector.rs b/crates/mongodb-connector/src/mongo_connector.rs index 8705c132..3819e777 100644 --- a/crates/mongodb-connector/src/mongo_connector.rs +++ b/crates/mongodb-connector/src/mongo_connector.rs @@ -21,11 +21,10 @@ use ndc_sdk::{ use tracing::instrument; use crate::{ - api_type_conversions::{ - v2_to_v3_explain_response, v2_to_v3_query_response, v3_to_v2_query_request, - }, + api_type_conversions::{v2_to_v3_explain_response, v3_to_v2_query_request}, error_mapping::{mongo_agent_error_to_explain_error, mongo_agent_error_to_query_error}, query_context::get_query_context, + query_response::serialize_query_response, }; use crate::{capabilities::mongo_capabilities_response, mutation::handle_mutation_request}; @@ -143,10 +142,11 @@ impl Connector for MongoConnector { request: QueryRequest, ) -> Result, QueryError> { tracing::debug!(query_request = %serde_json::to_string(&request).unwrap(), "received query request"); - let v2_request = v3_to_v2_query_request(&get_query_context(configuration), request)?; - let response = handle_query_request(configuration, state, v2_request) + let v2_request = + v3_to_v2_query_request(&get_query_context(configuration), request.clone())?; + let response_documents = handle_query_request(configuration, state, v2_request) .await .map_err(mongo_agent_error_to_query_error)?; - Ok(v2_to_v3_query_response(response).into()) + Ok(serialize_query_response(configuration, &request, response_documents)?.into()) } } diff --git a/crates/mongodb-connector/src/query_response.rs b/crates/mongodb-connector/src/query_response.rs new file mode 100644 index 00000000..6b31e2e7 --- /dev/null +++ b/crates/mongodb-connector/src/query_response.rs @@ -0,0 +1,81 @@ +use anyhow::anyhow; +use configuration::Configuration; +use itertools::Itertools as _; +use mongodb::bson; +use ndc_sdk::{ + connector::QueryError, + models::{QueryRequest, QueryResponse, RowSet}, +}; +use serde::Deserialize; + +// These structs describe possible shapes of data returned by MongoDB query plans + +#[derive(Debug, Deserialize)] +struct ResponsesForVariableSets { + row_sets: Vec>, +} + +// #[derive(Debug, Deserialize)] +// struct ResponseForAVariableSet { +// query: BsonRowSet, +// } +// +// #[derive(Debug, Deserialize)] +// struct SingleResponse { +// query: BsonRowSet, +// } + +// #[derive(Debug, Deserialize)] +// struct BsonRowSet { +// rows: Vec, +// } + +pub fn serialize_query_response( + config: &Configuration, + query_request: &QueryRequest, + response_documents: Vec, +) -> Result { + tracing::debug!(response_documents = %serde_json::to_string(&response_documents).unwrap(), "response from MongoDB"); + // If the query request specified variable sets then we should have gotten a single document + // from MongoDB with fields for multiple sets of results - one for each set of variables. + let row_sets = if query_request.variables.is_some() { + let responses: ResponsesForVariableSets = parse_single_document(response_documents)?; + responses + .row_sets + .into_iter() + .map(|docs| serialize_row_set(docs)) + .try_collect() + } else { + // TODO: in an aggregation response we expect one document instead of a list of documents + Ok(vec![serialize_row_set(response_documents)?]) + }?; + let response = QueryResponse(row_sets); + tracing::debug!(query_response = %serde_json::to_string(&response).unwrap()); + Ok(response) +} + +fn serialize_row_set(docs: Vec) -> Result { + let rows = docs + .into_iter() + .map(|doc| bson::from_document(doc)) + .try_collect() + .map_err(|err| QueryError::Other(err.into()))?; + Ok(RowSet { + aggregates: None, + rows: Some(rows), + }) +} + +fn parse_single_document(documents: Vec) -> Result +where + T: for<'de> serde::Deserialize<'de>, +{ + let document = documents.into_iter().next().ok_or_else(|| { + QueryError::Other( + (anyhow!("expected a single response document from MongoDB, but did not get one")) + .into(), + ) + })?; + let value = bson::from_document(document).map_err(|err| QueryError::Other(err.into()))?; + Ok(value) +} From ce0d66ad8f398382f65428b9b9f6154a0691305c Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Mon, 22 Apr 2024 13:58:23 -0700 Subject: [PATCH 02/25] distinguish between aggregate and non-aggregate responses --- .../mongodb-connector/src/query_response.rs | 87 +++++++++++++------ 1 file changed, 61 insertions(+), 26 deletions(-) diff --git a/crates/mongodb-connector/src/query_response.rs b/crates/mongodb-connector/src/query_response.rs index 6b31e2e7..6de29db3 100644 --- a/crates/mongodb-connector/src/query_response.rs +++ b/crates/mongodb-connector/src/query_response.rs @@ -1,10 +1,13 @@ +use std::collections::BTreeMap; + use anyhow::anyhow; use configuration::Configuration; +use indexmap::IndexMap; use itertools::Itertools as _; -use mongodb::bson; +use mongodb::bson::{self, from_bson, Bson}; use ndc_sdk::{ connector::QueryError, - models::{QueryRequest, QueryResponse, RowSet}, + models::{Query, QueryRequest, QueryResponse, RowFieldValue, RowSet}, }; use serde::Deserialize; @@ -15,20 +18,13 @@ struct ResponsesForVariableSets { row_sets: Vec>, } -// #[derive(Debug, Deserialize)] -// struct ResponseForAVariableSet { -// query: BsonRowSet, -// } -// -// #[derive(Debug, Deserialize)] -// struct SingleResponse { -// query: BsonRowSet, -// } - -// #[derive(Debug, Deserialize)] -// struct BsonRowSet { -// rows: Vec, -// } +#[derive(Debug, Deserialize)] +struct BsonRowSet { + #[serde(default)] + aggregates: BTreeMap, + #[serde(default)] + rows: Vec, +} pub fn serialize_query_response( config: &Configuration, @@ -43,27 +39,66 @@ pub fn serialize_query_response( responses .row_sets .into_iter() - .map(|docs| serialize_row_set(docs)) + .map(|docs| serialize_row_set(&query_request.query, docs)) .try_collect() } else { // TODO: in an aggregation response we expect one document instead of a list of documents - Ok(vec![serialize_row_set(response_documents)?]) + Ok(vec![serialize_row_set( + &query_request.query, + response_documents, + )?]) }?; let response = QueryResponse(row_sets); tracing::debug!(query_response = %serde_json::to_string(&response).unwrap()); Ok(response) } -fn serialize_row_set(docs: Vec) -> Result { - let rows = docs - .into_iter() +fn serialize_row_set(query: &Query, docs: Vec) -> Result { + if query + .aggregates + .as_ref() + .unwrap_or(&IndexMap::new()) + .is_empty() + { + // When there are no aggregates we expect a list of rows + let rows = serialize_rows(docs)?; + Ok(RowSet { + aggregates: None, + rows: Some(rows), + }) + } else { + // When there are aggregates we expect a single document with `rows` and `aggregates` + // fields + let row_set: BsonRowSet = parse_single_document(docs)?; + let aggregates: IndexMap = row_set + .aggregates + .into_iter() + .map(|(key, value)| { + Ok(( + key, + from_bson(value).map_err(|err| QueryError::Other(err.into()))?, + )) + }) + .try_collect::<_, _, QueryError>()?; + let rows = serialize_rows(row_set.rows)?; + Ok(RowSet { + aggregates: if aggregates.is_empty() { + None + } else { + Some(aggregates) + }, + rows: if rows.is_empty() { None } else { Some(rows) }, + }) + } +} + +fn serialize_rows( + docs: Vec, +) -> Result>, QueryError> { + docs.into_iter() .map(|doc| bson::from_document(doc)) .try_collect() - .map_err(|err| QueryError::Other(err.into()))?; - Ok(RowSet { - aggregates: None, - rows: Some(rows), - }) + .map_err(|err| QueryError::Other(err.into())) } fn parse_single_document(documents: Vec) -> Result From c58e493bd926b5720e54e8e6e42e330d2df49182 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Tue, 23 Apr 2024 14:26:40 -0700 Subject: [PATCH 03/25] remove double option from query request fields --- crates/dc-api-test-helpers/src/query.rs | 18 ++++---- crates/dc-api-types/src/query.rs | 42 +++++-------------- .../src/mongodb/projection.rs | 4 +- .../src/mongodb/selection.rs | 6 +-- .../src/query/pipeline.rs | 20 ++++----- .../src/query/relations.rs | 4 +- .../src/api_type_conversions/query_request.rs | 23 +++++----- 7 files changed, 43 insertions(+), 74 deletions(-) diff --git a/crates/dc-api-test-helpers/src/query.rs b/crates/dc-api-test-helpers/src/query.rs index 27604f58..4d73dccd 100644 --- a/crates/dc-api-test-helpers/src/query.rs +++ b/crates/dc-api-test-helpers/src/query.rs @@ -4,12 +4,12 @@ use dc_api_types::{Aggregate, Expression, Field, OrderBy, Query}; #[derive(Clone, Debug, Default)] pub struct QueryBuilder { - aggregates: Option>>, - aggregates_limit: Option>, - fields: Option>>, - limit: Option>, - offset: Option>, - order_by: Option>, + aggregates: Option>, + aggregates_limit: Option, + fields: Option>, + limit: Option, + offset: Option, + order_by: Option, predicate: Option, } @@ -22,7 +22,7 @@ impl QueryBuilder { where I: IntoIterator, { - self.fields = Some(Some(fields.into_iter().collect())); + self.fields = Some(fields.into_iter().collect()); self } @@ -30,7 +30,7 @@ impl QueryBuilder { where I: IntoIterator, { - self.aggregates = Some(Some(aggregates.into_iter().collect())); + self.aggregates = Some(aggregates.into_iter().collect()); self } @@ -40,7 +40,7 @@ impl QueryBuilder { } pub fn order_by(mut self, order_by: OrderBy) -> Self { - self.order_by = Some(Some(order_by)); + self.order_by = Some(order_by); self } } diff --git a/crates/dc-api-types/src/query.rs b/crates/dc-api-types/src/query.rs index 529f907f..9d106123 100644 --- a/crates/dc-api-types/src/query.rs +++ b/crates/dc-api-types/src/query.rs @@ -16,49 +16,27 @@ pub struct Query { #[serde( rename = "aggregates", default, - with = "::serde_with::rust::double_option", skip_serializing_if = "Option::is_none" )] - pub aggregates: Option>>, + pub aggregates: Option<::std::collections::HashMap>, /// Optionally limit the maximum number of rows considered while applying aggregations. This limit does not apply to returned rows. #[serde( rename = "aggregates_limit", default, - with = "::serde_with::rust::double_option", skip_serializing_if = "Option::is_none" )] - pub aggregates_limit: Option>, + pub aggregates_limit: Option, /// Fields of the query - #[serde( - rename = "fields", - default, - with = "::serde_with::rust::double_option", - skip_serializing_if = "Option::is_none" - )] - pub fields: Option>>, + #[serde(rename = "fields", default, skip_serializing_if = "Option::is_none")] + pub fields: Option<::std::collections::HashMap>, /// Optionally limit the maximum number of returned rows. This limit does not apply to records considered while apply aggregations. - #[serde( - rename = "limit", - default, - with = "::serde_with::rust::double_option", - skip_serializing_if = "Option::is_none" - )] - pub limit: Option>, + #[serde(rename = "limit", default, skip_serializing_if = "Option::is_none")] + pub limit: Option, /// Optionally offset from the Nth result. This applies to both row and aggregation results. - #[serde( - rename = "offset", - default, - with = "::serde_with::rust::double_option", - skip_serializing_if = "Option::is_none" - )] - pub offset: Option>, - #[serde( - rename = "order_by", - default, - with = "::serde_with::rust::double_option", - skip_serializing_if = "Option::is_none" - )] - pub order_by: Option>, + #[serde(rename = "offset", default, skip_serializing_if = "Option::is_none")] + pub offset: Option, + #[serde(rename = "order_by", default, skip_serializing_if = "Option::is_none")] + pub order_by: Option, #[serde(rename = "where", skip_serializing_if = "Option::is_none")] pub r#where: Option, } diff --git a/crates/mongodb-agent-common/src/mongodb/projection.rs b/crates/mongodb-agent-common/src/mongodb/projection.rs index 2cf57f41..54dcbc2c 100644 --- a/crates/mongodb-agent-common/src/mongodb/projection.rs +++ b/crates/mongodb-agent-common/src/mongodb/projection.rs @@ -63,7 +63,7 @@ fn project_field_as(parent_columns: &[&str], field: &Field) -> ProjectAs { } Field::NestedObject { column, query } => { let nested_parent_columns = append_to_path(parent_columns, column); - let fields = query.fields.clone().flatten().unwrap_or_default(); + let fields = query.fields.clone().unwrap_or_default(); ProjectAs::Nested(for_field_selection_helper(&nested_parent_columns, fields)) } Field::NestedArray { @@ -81,7 +81,7 @@ fn project_field_as(parent_columns: &[&str], field: &Field) -> ProjectAs { // TODO: Need to determine whether the relation type is "object" or "array" and project // accordingly let nested_parent_columns = append_to_path(parent_columns, relationship); - let fields = query.fields.clone().flatten().unwrap_or_default(); + let fields = query.fields.clone().unwrap_or_default(); ProjectAs::Nested(for_field_selection_helper(&nested_parent_columns, fields)) } } diff --git a/crates/mongodb-agent-common/src/mongodb/selection.rs b/crates/mongodb-agent-common/src/mongodb/selection.rs index d9e5dfd3..231dfccd 100644 --- a/crates/mongodb-agent-common/src/mongodb/selection.rs +++ b/crates/mongodb-agent-common/src/mongodb/selection.rs @@ -29,7 +29,7 @@ impl Selection { pub fn from_query_request(query_request: &QueryRequest) -> Result { // let fields = (&query_request.query.fields).flatten().unwrap_or_default(); let empty_map = HashMap::new(); - let fields = if let Some(Some(fs)) = &query_request.query.fields { + let fields = if let Some(fs) = &query_request.query.fields { fs } else { &empty_map @@ -92,7 +92,7 @@ fn selection_for_field( Field::NestedObject { column, query } => { let nested_parent_columns = append_to_path(parent_columns, column); let nested_parent_col_path = format!("${}", nested_parent_columns.join(".")); - let fields = query.fields.clone().flatten().unwrap_or_default(); + let fields = query.fields.clone().unwrap_or_default(); let nested_selection = from_query_request_helper(table_relationships, &nested_parent_columns, &fields)?; Ok(doc! {"$cond": {"if": nested_parent_col_path, "then": nested_selection, "else": Bson::Null}}.into()) @@ -126,7 +126,7 @@ fn selection_for_array( Field::NestedObject { column, query } => { let nested_parent_columns = append_to_path(parent_columns, column); let nested_parent_col_path = format!("${}", nested_parent_columns.join(".")); - let fields = query.fields.clone().flatten().unwrap_or_default(); + let fields = query.fields.clone().unwrap_or_default(); let mut nested_selection = from_query_request_helper(table_relationships, &["$this"], &fields)?; for _ in 0..array_nesting_level { diff --git a/crates/mongodb-agent-common/src/query/pipeline.rs b/crates/mongodb-agent-common/src/query/pipeline.rs index d105b1d9..213b43e1 100644 --- a/crates/mongodb-agent-common/src/query/pipeline.rs +++ b/crates/mongodb-agent-common/src/query/pipeline.rs @@ -38,7 +38,7 @@ pub enum ResponseShape { /// can instead be appended to `pipeline`. pub fn is_response_faceted(query: &Query) -> bool { match &query.aggregates { - Some(Some(aggregates)) => !aggregates.is_empty(), + Some(aggregates) => !aggregates.is_empty(), _ => false, } } @@ -89,12 +89,8 @@ pub fn pipeline_for_non_foreach( .map(|expression| make_selector(variables, expression)) .transpose()? .map(Stage::Match); - let sort_stage: Option = order_by - .iter() - .flatten() - .map(|o| Stage::Sort(make_sort(o))) - .next(); - let skip_stage = offset.flatten().map(Stage::Skip); + let sort_stage: Option = order_by.iter().map(|o| Stage::Sort(make_sort(o))).next(); + let skip_stage = offset.map(Stage::Skip); [match_stage, sort_stage, skip_stage] .into_iter() @@ -128,7 +124,7 @@ pub fn pipeline_for_fields_facet( ) -> Result { let Query { limit, .. } = &*query_request.query; - let limit_stage = limit.flatten().map(Stage::Limit); + let limit_stage = limit.map(Stage::Limit); let replace_with_stage: Stage = Stage::ReplaceWith(Selection::from_query_request(query_request)?); @@ -155,16 +151,15 @@ fn facet_pipelines_for_query( let mut facet_pipelines = aggregates .iter() .flatten() - .flatten() .map(|(key, aggregate)| { Ok(( key.clone(), - pipeline_for_aggregate(aggregate.clone(), aggregates_limit.flatten())?, + pipeline_for_aggregate(aggregate.clone(), *aggregates_limit)?, )) }) .collect::, MongoAgentError>>()?; - if let Some(Some(_)) = fields { + if let Some(_) = fields { let fields_pipeline = pipeline_for_fields_facet(query_request)?; facet_pipelines.insert(ROWS_FIELD.to_owned(), fields_pipeline); } @@ -174,7 +169,6 @@ fn facet_pipelines_for_query( let aggregate_selections: bson::Document = aggregates .iter() .flatten() - .flatten() .map(|(key, _aggregate)| { // The facet result for each aggregate is an array containing a single document which // has a field called `result`. This code selects each facet result by name, and pulls @@ -201,7 +195,7 @@ fn facet_pipelines_for_query( }; let select_rows = match fields { - Some(Some(_)) => Some(("rows".to_owned(), Bson::String(format!("${ROWS_FIELD}")))), + Some(_) => Some(("rows".to_owned(), Bson::String(format!("${ROWS_FIELD}")))), _ => None, }; diff --git a/crates/mongodb-agent-common/src/query/relations.rs b/crates/mongodb-agent-common/src/query/relations.rs index c6bc918c..50407878 100644 --- a/crates/mongodb-agent-common/src/query/relations.rs +++ b/crates/mongodb-agent-common/src/query/relations.rs @@ -29,7 +29,7 @@ pub fn pipeline_for_relations( } = query_request; let empty_field_map = HashMap::new(); - let fields = if let Some(Some(fs)) = &query.fields { + let fields = if let Some(fs) = &query.fields { fs } else { &empty_field_map @@ -94,7 +94,7 @@ fn lookups_for_field( Field::Column { .. } => Ok(vec![]), Field::NestedObject { column, query } => { let nested_parent_columns = append_to_path(parent_columns, column); - let fields = query.fields.clone().flatten().unwrap_or_default(); + let fields = query.fields.clone().unwrap_or_default(); lookups_for_fields( config, query_request, diff --git a/crates/mongodb-connector/src/api_type_conversions/query_request.rs b/crates/mongodb-connector/src/api_type_conversions/query_request.rs index 24e1d6ad..199dd4bb 100644 --- a/crates/mongodb-connector/src/api_type_conversions/query_request.rs +++ b/crates/mongodb-connector/src/api_type_conversions/query_request.rs @@ -40,7 +40,7 @@ impl QueryContext<'_> { )) } - fn find_collection_object_type( + pub fn find_collection_object_type( &self, collection_name: &str, ) -> Result, ConversionError> { @@ -144,7 +144,7 @@ fn v3_to_v2_query( query: v3::Query, collection_object_type: &WithNameRef, ) -> Result { - let aggregates: Option>> = query + let aggregates: Option> = query .aggregates .map(|aggregates| -> Result<_, ConversionError> { aggregates @@ -157,8 +157,7 @@ fn v3_to_v2_query( }) .collect() }) - .transpose()? - .map(Some); + .transpose()?; let fields = v3_to_v2_fields( context, @@ -168,7 +167,7 @@ fn v3_to_v2_query( query.fields, )?; - let order_by: Option> = query + let order_by: Option = query .order_by .map(|order_by| -> Result<_, ConversionError> { let (elements, relations) = order_by @@ -201,8 +200,7 @@ fn v3_to_v2_query( relations, }) }) - .transpose()? - .map(Some); + .transpose()?; let limit = optional_32bit_number_to_64bit(query.limit); let offset = optional_32bit_number_to_64bit(query.offset); @@ -293,8 +291,8 @@ fn v3_to_v2_fields( root_collection_object_type: &WithNameRef, object_type: &WithNameRef, v3_fields: Option>, -) -> Result>>, ConversionError> { - let v2_fields: Option>> = v3_fields +) -> Result>, ConversionError> { + let v2_fields: Option> = v3_fields .map(|fields| { fields .into_iter() @@ -312,8 +310,7 @@ fn v3_to_v2_fields( }) .collect::>() }) - .transpose()? - .map(Some); + .transpose()?; Ok(v2_fields) } @@ -871,11 +868,11 @@ fn v3_to_v2_comparison_value( } #[inline] -fn optional_32bit_number_to_64bit(n: Option) -> Option> +fn optional_32bit_number_to_64bit(n: Option) -> Option where B: From, { - n.map(|input| Some(input.into())) + n.map(|input| input.into()) } fn v3_to_v2_arguments(arguments: BTreeMap) -> HashMap { From b542c4fff92930e999465cb9451ae26e52883059 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Tue, 23 Apr 2024 14:27:02 -0700 Subject: [PATCH 04/25] revert RowSet serialization bandaid --- crates/dc-api-types/src/query_response.rs | 28 +++++------------------ 1 file changed, 6 insertions(+), 22 deletions(-) diff --git a/crates/dc-api-types/src/query_response.rs b/crates/dc-api-types/src/query_response.rs index 8b1d9ca6..0c48d215 100644 --- a/crates/dc-api-types/src/query_response.rs +++ b/crates/dc-api-types/src/query_response.rs @@ -42,29 +42,13 @@ pub struct ForEachRow { pub query: RowSet, } -/// A row set must contain either rows, or aggregates, or possibly both #[skip_serializing_none] -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -#[serde(untagged)] -pub enum RowSet { - Aggregate { - /// The results of the aggregates returned by the query - aggregates: HashMap, - /// The rows returned by the query, corresponding to the query's fields - rows: Option>>, - }, - Rows { - /// Rows returned by a query that did not request aggregates. - rows: Vec>, - }, -} - -impl Default for RowSet { - fn default() -> Self { - RowSet::Rows { - rows: Default::default(), - } - } +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] +pub struct RowSet { + /// The results of the aggregates returned by the query + pub aggregates: Option>, + /// The rows returned by the query, corresponding to the query's fields + pub rows: Option>>, } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] From 25b48320721aea02daf9879e13a9539127a68251 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Tue, 23 Apr 2024 14:27:26 -0700 Subject: [PATCH 05/25] wip: hooking up types in serialization --- .../src/query/execute_query_request.rs | 2 +- .../mongodb-connector/src/query_response.rs | 201 +++++++++++++++--- 2 files changed, 169 insertions(+), 34 deletions(-) diff --git a/crates/mongodb-agent-common/src/query/execute_query_request.rs b/crates/mongodb-agent-common/src/query/execute_query_request.rs index 6559be0a..3abc03f7 100644 --- a/crates/mongodb-agent-common/src/query/execute_query_request.rs +++ b/crates/mongodb-agent-common/src/query/execute_query_request.rs @@ -1,7 +1,7 @@ use configuration::Configuration; use dc_api_types::QueryRequest; use futures::Stream; -use futures_util::TryStreamExt; +use futures_util::TryStreamExt as _; use mongodb::bson; use super::pipeline::pipeline_for_query_request; diff --git a/crates/mongodb-connector/src/query_response.rs b/crates/mongodb-connector/src/query_response.rs index 6de29db3..c665214a 100644 --- a/crates/mongodb-connector/src/query_response.rs +++ b/crates/mongodb-connector/src/query_response.rs @@ -1,15 +1,42 @@ -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use anyhow::anyhow; -use configuration::Configuration; +use configuration::{schema::Type, Configuration}; +use dc_api_types::{Aggregate, Field, QueryRequest}; use indexmap::IndexMap; -use itertools::Itertools as _; +use itertools::Itertools; use mongodb::bson::{self, from_bson, Bson}; +use mongodb_agent_common::query::{serialization::bson_to_json, QueryTarget}; +use mongodb_support::BsonScalarType; use ndc_sdk::{ connector::QueryError, - models::{Query, QueryRequest, QueryResponse, RowFieldValue, RowSet}, + models::{self as ndc, QueryResponse, RowFieldValue, RowSet}, }; use serde::Deserialize; +use thiserror::Error; + +use crate::api_type_conversions::{ConversionError, QueryContext}; + +#[derive(Clone, Debug, Error)] +pub enum QueryResponseError { + #[error("{0}")] + Conversion(#[from] ConversionError), + + #[error("expected a single response document from MongoDB, but did not get one")] + ExpectedSingleDocument, + + #[error("expected {collection_name} to have a field named {column} of type {expected_type:?}, but value is missing from database response")] + MissingValue { + collection_name: String, + column: String, + expected_type: Type, + }, + + #[error("placeholder")] + TODORemoveMe, +} + +type Result = std::result::Result; // These structs describe possible shapes of data returned by MongoDB query plans @@ -61,56 +88,164 @@ fn serialize_row_set(query: &Query, docs: Vec) -> Result = row_set + + let aggregates = query .aggregates - .into_iter() - .map(|(key, value)| { - Ok(( - key, + .map(|aggregates| serialize_aggregates(aggregates, row_set.aggregates)) + .transpose()?; + + let rows = query + .fields + .map(|fields| serialize_rows(fields, row_set.rows)) + .transpose()?; + + Ok(RowSet { aggregates, rows }) + } +} + +fn serialize_aggregates( + query_aggregates: &HashMap, + mut aggregate_values: BTreeMap, +) -> Result, QueryError> { + query_aggregates + .iter() + .map( + |(key, aggregate_definition)| match aggregate_values.remove_entry(key) { + Some((owned_key, value)) => Ok(( + owned_key, + // TODO: bson_to_json from_bson(value).map_err(|err| QueryError::Other(err.into()))?, - )) - }) - .try_collect::<_, _, QueryError>()?; - let rows = serialize_rows(row_set.rows)?; - Ok(RowSet { - aggregates: if aggregates.is_empty() { - None - } else { - Some(aggregates) + )), + None => Err(QueryError::Other( + anyhow!("missing aggregate value in response: {key}").into(), + )), }, - rows: if rows.is_empty() { None } else { Some(rows) }, - }) - } + ) + .try_collect() } fn serialize_rows( + query_target: QueryTarget<'_>, + query_fields: &IndexMap, docs: Vec, -) -> Result>, QueryError> { +) -> Result>> { docs.into_iter() - .map(|doc| bson::from_document(doc)) + .map(|doc| serialize_single_row(query_fields, doc)) .try_collect() .map_err(|err| QueryError::Other(err.into())) } -fn parse_single_document(documents: Vec) -> Result +fn serialize_single_row( + query_context: QueryContext<'_>, + query_target: QueryTarget<'_>, + query_fields: &IndexMap, + mut doc: bson::Document, +) -> Result> { + query_fields + .iter() + .map(|(field_name, field_definition)| { + // let + let value = doc.remove(field_name); + }) + .try_collect() + + // doc.into_iter() + // .map(|(key, value)| { + // let json_value = + // bson_to_json(expected_type, object_types, value) + // // use UnprocessableContent so the user sees the error message + // .map_err(|err| + // QueryError::UnprocessableContent(format!("type mismatch found in MongoDB query response: {}\n\nYou may need to alter your connector configuration to change a collection schema, or a native query definition.", err.to_string())))?; + // Ok(( + // key, + // RowFieldValue( + // json_value + // ), + // )) + // }) + // .try_collect() +} + +fn value_and_type_from_field( + query_context: QueryContext<'_>, + collection_name: &str, + field_definition: &ndc::Field, + field_name: &str, + input: &mut bson::Document, +) -> Result<(Bson, Type)> { + match field_definition { + ndc::Field::Column { column, fields } => { + let field_type = find_field_type(query_context, collection_name, column)?; + let value = value_from_option( + collection_name, + column, + &field_type, + input.remove(field_name), + )?; + Ok((value, field_type)) + } + ndc::Field::Relationship { + query, + relationship, + arguments, + } => todo!(), + } +} + +fn find_field_type( + query_context: QueryContext<'_>, + collection_name: &str, + column: &str, +) -> Result { + let object_type = query_context.find_collection_object_type(collection_name)?; + let field_type = object_type.value.fields.get(column).ok_or_else(|| { + ConversionError::UnknownObjectTypeField { + object_type: object_type.name.to_string(), + field_name: column.to_string(), + } + })?; + Ok(field_type.r#type) +} + +fn parse_single_document(documents: Vec) -> Result where T: for<'de> serde::Deserialize<'de>, { - let document = documents.into_iter().next().ok_or_else(|| { - QueryError::Other( - (anyhow!("expected a single response document from MongoDB, but did not get one")) - .into(), - ) - })?; - let value = bson::from_document(document).map_err(|err| QueryError::Other(err.into()))?; + let document = documents + .into_iter() + .next() + .ok_or(QueryResponseError::ExpectedSingleDocument)?; + let value = bson::from_document(document).map_err(|_| QueryResponseError::TODORemoveMe)?; Ok(value) } + +/// Check option result for a BSON value. If the value is missing but the expected type is nullable +/// then return null. Otherwise return an error. +fn value_from_option( + collection_name: &str, + column: &str, + expected_type: &Type, + value_option: Option, +) -> Result { + match (expected_type, value_option) { + (_, Some(value)) => Ok(value), + (Type::Nullable(_), None) => Ok(Bson::Null), + _ => Err(QueryResponseError::MissingValue { + collection_name: collection_name.to_string(), + column: column.to_string(), + expected_type: expected_type.clone(), + }), + } +} From f50d1e08555cde4ba083dfae7ac5c436573b92d7 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Tue, 23 Apr 2024 18:56:57 -0700 Subject: [PATCH 06/25] add input_collection method to QueryTarget --- crates/mongodb-agent-common/src/explain.rs | 14 ++++---------- .../src/query/execute_query_request.rs | 12 +++--------- .../mongodb-agent-common/src/query/query_target.rs | 10 ++++++++++ 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/crates/mongodb-agent-common/src/explain.rs b/crates/mongodb-agent-common/src/explain.rs index 259629c3..3f1f9094 100644 --- a/crates/mongodb-agent-common/src/explain.rs +++ b/crates/mongodb-agent-common/src/explain.rs @@ -20,16 +20,10 @@ pub async fn explain_query( let (pipeline, _) = query::pipeline_for_query_request(config, &query_request)?; let pipeline_bson = to_bson(&pipeline)?; - let aggregate_target = match QueryTarget::for_request(config, &query_request) { - QueryTarget::Collection(collection_name) => Bson::String(collection_name), - QueryTarget::NativeQuery { native_query, .. } => { - match &native_query.input_collection { - Some(collection_name) => Bson::String(collection_name.to_string()), - // 1 means aggregation without a collection target - as in `db.aggregate()` instead of - // `db..aggregate()` - None => Bson::Int32(1) - } - } + let aggregate_target = match QueryTarget::for_request(config, &query_request).input_collection() + { + Some(collection_name) => Bson::String(collection_name.to_owned()), + None => Bson::Int32(1), }; let query_command = doc! { diff --git a/crates/mongodb-agent-common/src/query/execute_query_request.rs b/crates/mongodb-agent-common/src/query/execute_query_request.rs index 3abc03f7..cb06629f 100644 --- a/crates/mongodb-agent-common/src/query/execute_query_request.rs +++ b/crates/mongodb-agent-common/src/query/execute_query_request.rs @@ -32,18 +32,12 @@ pub async fn execute_query_request( // The target of a query request might be a collection, or it might be a native query. In the // latter case there is no collection to perform the aggregation against. So instead of sending // the MongoDB API call `db..aggregate` we instead call `db.aggregate`. - let documents = match target { - QueryTarget::Collection(collection_name) => { + let documents = match target.input_collection() { + Some(collection_name) => { let collection = database.collection(&collection_name); collect_from_cursor(collection.aggregate(pipeline, None).await?).await } - QueryTarget::NativeQuery { native_query, .. } => match &native_query.input_collection { - Some(collection_name) => { - let collection = database.collection(collection_name); - collect_from_cursor(collection.aggregate(pipeline, None).await?).await - } - None => collect_from_cursor(database.aggregate(pipeline, None).await?).await, - }, + None => collect_from_cursor(database.aggregate(pipeline, None).await?).await, }?; tracing::debug!(response_documents = %serde_json::to_string(&documents).unwrap(), "response from MongoDB"); diff --git a/crates/mongodb-agent-common/src/query/query_target.rs b/crates/mongodb-agent-common/src/query/query_target.rs index 937365ec..ec4de4e0 100644 --- a/crates/mongodb-agent-common/src/query/query_target.rs +++ b/crates/mongodb-agent-common/src/query/query_target.rs @@ -29,6 +29,16 @@ impl QueryTarget<'_> { None => QueryTarget::Collection(target_name), } } + + pub fn input_collection(&self) -> Option<&str> { + match self { + QueryTarget::Collection(collection_name) => Some(collection_name), + QueryTarget::NativeQuery { native_query, .. } => native_query + .input_collection + .as_ref() + .map(|name| name.as_str()), + } + } } impl Display for QueryTarget<'_> { From 878c1a654d2549b8b78ba1e2d1b4aaba8d338818 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Tue, 23 Apr 2024 18:57:29 -0700 Subject: [PATCH 07/25] wip: more progress on hooking up types --- .../src/query/serialization/mod.rs | 2 +- .../src/api_type_conversions/query_request.rs | 2 +- .../mongodb-connector/src/mongo_connector.rs | 14 +- .../mongodb-connector/src/query_response.rs | 124 ++++++++++-------- 4 files changed, 80 insertions(+), 62 deletions(-) diff --git a/crates/mongodb-agent-common/src/query/serialization/mod.rs b/crates/mongodb-agent-common/src/query/serialization/mod.rs index 31e63af4..be3becd0 100644 --- a/crates/mongodb-agent-common/src/query/serialization/mod.rs +++ b/crates/mongodb-agent-common/src/query/serialization/mod.rs @@ -5,5 +5,5 @@ mod json_to_bson; #[cfg(test)] mod tests; -pub use self::bson_to_json::bson_to_json; +pub use self::bson_to_json::{bson_to_json, BsonToJsonError}; pub use self::json_to_bson::{json_to_bson, json_to_bson_scalar, JsonToBsonError}; diff --git a/crates/mongodb-connector/src/api_type_conversions/query_request.rs b/crates/mongodb-connector/src/api_type_conversions/query_request.rs index 199dd4bb..929aa14d 100644 --- a/crates/mongodb-connector/src/api_type_conversions/query_request.rs +++ b/crates/mongodb-connector/src/api_type_conversions/query_request.rs @@ -24,7 +24,7 @@ pub struct QueryContext<'a> { } impl QueryContext<'_> { - fn find_collection( + pub fn find_collection( &self, collection_name: &str, ) -> Result<&v3::CollectionInfo, ConversionError> { diff --git a/crates/mongodb-connector/src/mongo_connector.rs b/crates/mongodb-connector/src/mongo_connector.rs index 3819e777..f5194feb 100644 --- a/crates/mongodb-connector/src/mongo_connector.rs +++ b/crates/mongodb-connector/src/mongo_connector.rs @@ -142,11 +142,19 @@ impl Connector for MongoConnector { request: QueryRequest, ) -> Result, QueryError> { tracing::debug!(query_request = %serde_json::to_string(&request).unwrap(), "received query request"); - let v2_request = - v3_to_v2_query_request(&get_query_context(configuration), request.clone())?; + let query_context = get_query_context(configuration); + let v2_request = v3_to_v2_query_request(&query_context, request.clone())?; let response_documents = handle_query_request(configuration, state, v2_request) .await .map_err(mongo_agent_error_to_query_error)?; - Ok(serialize_query_response(configuration, &request, response_documents)?.into()) + Ok( + serialize_query_response(&query_context, &request, response_documents) + .map_err(|err| { + QueryError::UnprocessableContent(format!( + "error converting MongoDB response to JSON: {err}" + )) + })? + .into(), + ) } } diff --git a/crates/mongodb-connector/src/query_response.rs b/crates/mongodb-connector/src/query_response.rs index c665214a..0cf9a42d 100644 --- a/crates/mongodb-connector/src/query_response.rs +++ b/crates/mongodb-connector/src/query_response.rs @@ -1,32 +1,34 @@ -use std::collections::{BTreeMap, HashMap}; +use std::collections::BTreeMap; -use anyhow::anyhow; -use configuration::{schema::Type, Configuration}; -use dc_api_types::{Aggregate, Field, QueryRequest}; +use configuration::schema::Type; use indexmap::IndexMap; use itertools::Itertools; use mongodb::bson::{self, from_bson, Bson}; -use mongodb_agent_common::query::{serialization::bson_to_json, QueryTarget}; -use mongodb_support::BsonScalarType; -use ndc_sdk::{ - connector::QueryError, - models::{self as ndc, QueryResponse, RowFieldValue, RowSet}, +use mongodb_agent_common::query::serialization::{bson_to_json, BsonToJsonError}; +use ndc_sdk::models::{ + self as ndc, Aggregate, Field, Query, QueryRequest, QueryResponse, RowFieldValue, RowSet, }; use serde::Deserialize; use thiserror::Error; use crate::api_type_conversions::{ConversionError, QueryContext}; -#[derive(Clone, Debug, Error)] +#[derive(Debug, Error)] pub enum QueryResponseError { + #[error("{0}")] + BsonToJson(#[from] BsonToJsonError), + #[error("{0}")] Conversion(#[from] ConversionError), #[error("expected a single response document from MongoDB, but did not get one")] ExpectedSingleDocument, + #[error("missing aggregate value in response: {0}")] + MissingAggregateValue(String), + #[error("expected {collection_name} to have a field named {column} of type {expected_type:?}, but value is missing from database response")] - MissingValue { + MissingColumnValue { collection_name: String, column: String, expected_type: Type, @@ -54,11 +56,15 @@ struct BsonRowSet { } pub fn serialize_query_response( - config: &Configuration, + query_context: &QueryContext<'_>, query_request: &QueryRequest, response_documents: Vec, -) -> Result { +) -> Result { tracing::debug!(response_documents = %serde_json::to_string(&response_documents).unwrap(), "response from MongoDB"); + + let collection_info = query_context.find_collection(&query_request.collection)?; + let collection_name = &collection_info.name; + // If the query request specified variable sets then we should have gotten a single document // from MongoDB with fields for multiple sets of results - one for each set of variables. let row_sets = if query_request.variables.is_some() { @@ -66,11 +72,15 @@ pub fn serialize_query_response( responses .row_sets .into_iter() - .map(|docs| serialize_row_set(&query_request.query, docs)) + .map(|docs| { + serialize_row_set(query_context, collection_name, &query_request.query, docs) + }) .try_collect() } else { // TODO: in an aggregation response we expect one document instead of a list of documents Ok(vec![serialize_row_set( + query_context, + collection_name, &query_request.query, response_documents, )?]) @@ -80,7 +90,12 @@ pub fn serialize_query_response( Ok(response) } -fn serialize_row_set(query: &Query, docs: Vec) -> Result { +fn serialize_row_set( + query_context: &QueryContext<'_>, + collection_name: &str, + query: &Query, + docs: Vec, +) -> Result { if query .aggregates .as_ref() @@ -90,7 +105,8 @@ fn serialize_row_set(query: &Query, docs: Vec) -> Result) -> Result) -> Result, + query_aggregates: &IndexMap, mut aggregate_values: BTreeMap, -) -> Result, QueryError> { +) -> Result> { query_aggregates .iter() .map( @@ -126,67 +144,57 @@ fn serialize_aggregates( Some((owned_key, value)) => Ok(( owned_key, // TODO: bson_to_json - from_bson(value).map_err(|err| QueryError::Other(err.into()))?, - )), - None => Err(QueryError::Other( - anyhow!("missing aggregate value in response: {key}").into(), + from_bson(value).map_err(|_| QueryResponseError::TODORemoveMe)?, )), + None => Err(QueryResponseError::MissingAggregateValue(key.clone())), }, ) .try_collect() } fn serialize_rows( - query_target: QueryTarget<'_>, + query_context: &QueryContext<'_>, + collection_name: &str, query_fields: &IndexMap, docs: Vec, ) -> Result>> { docs.into_iter() - .map(|doc| serialize_single_row(query_fields, doc)) + .map(|doc| serialize_single_row(query_context, collection_name, query_fields, doc)) .try_collect() - .map_err(|err| QueryError::Other(err.into())) } fn serialize_single_row( - query_context: QueryContext<'_>, - query_target: QueryTarget<'_>, + query_context: &QueryContext<'_>, + collection_name: &str, query_fields: &IndexMap, mut doc: bson::Document, ) -> Result> { query_fields .iter() .map(|(field_name, field_definition)| { - // let - let value = doc.remove(field_name); + let value = serialize_field_value( + query_context, + collection_name, + field_definition, + field_name, + &mut doc, + )?; + Ok((field_name.clone(), RowFieldValue(value))) }) .try_collect() - - // doc.into_iter() - // .map(|(key, value)| { - // let json_value = - // bson_to_json(expected_type, object_types, value) - // // use UnprocessableContent so the user sees the error message - // .map_err(|err| - // QueryError::UnprocessableContent(format!("type mismatch found in MongoDB query response: {}\n\nYou may need to alter your connector configuration to change a collection schema, or a native query definition.", err.to_string())))?; - // Ok(( - // key, - // RowFieldValue( - // json_value - // ), - // )) - // }) - // .try_collect() } -fn value_and_type_from_field( - query_context: QueryContext<'_>, +fn serialize_field_value( + query_context: &QueryContext<'_>, collection_name: &str, field_definition: &ndc::Field, field_name: &str, input: &mut bson::Document, -) -> Result<(Bson, Type)> { - match field_definition { +) -> Result { + let (bson, field_type) = match field_definition { ndc::Field::Column { column, fields } => { + // TODO: if `field_type` is an object type, build a new object type by filtering down to + // the filds listed in `fields` let field_type = find_field_type(query_context, collection_name, column)?; let value = value_from_option( collection_name, @@ -194,21 +202,23 @@ fn value_and_type_from_field( &field_type, input.remove(field_name), )?; - Ok((value, field_type)) + (value, field_type) } ndc::Field::Relationship { query, relationship, arguments, } => todo!(), - } + }; + let json = bson_to_json(field_type, &query_context.object_types, bson)?; + Ok(json) } -fn find_field_type( - query_context: QueryContext<'_>, +fn find_field_type<'a>( + query_context: &'a QueryContext<'a>, collection_name: &str, column: &str, -) -> Result { +) -> Result<&'a Type> { let object_type = query_context.find_collection_object_type(collection_name)?; let field_type = object_type.value.fields.get(column).ok_or_else(|| { ConversionError::UnknownObjectTypeField { @@ -216,7 +226,7 @@ fn find_field_type( field_name: column.to_string(), } })?; - Ok(field_type.r#type) + Ok(&field_type.r#type) } fn parse_single_document(documents: Vec) -> Result @@ -242,7 +252,7 @@ fn value_from_option( match (expected_type, value_option) { (_, Some(value)) => Ok(value), (Type::Nullable(_), None) => Ok(Bson::Null), - _ => Err(QueryResponseError::MissingValue { + _ => Err(QueryResponseError::MissingColumnValue { collection_name: collection_name.to_string(), column: column.to_string(), expected_type: expected_type.clone(), From ec0c79ff7fcf4436ff8304c19cd9b61fc4df1489 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Wed, 24 Apr 2024 13:45:04 -0700 Subject: [PATCH 08/25] response type propagation for non-relation, non-aggregate responses --- Cargo.lock | 10 +- Cargo.toml | 2 + crates/cli/Cargo.toml | 2 +- crates/configuration/Cargo.toml | 2 +- crates/dc-api-test-helpers/Cargo.toml | 2 +- crates/dc-api-types/Cargo.toml | 2 +- crates/mongodb-agent-common/Cargo.toml | 2 +- crates/mongodb-connector/Cargo.toml | 2 +- .../api_type_conversions/conversion_error.rs | 20 +- .../src/api_type_conversions/query_request.rs | 3 +- .../mongodb-connector/src/query_response.rs | 199 +++++++++++++++--- crates/ndc-test-helpers/Cargo.toml | 2 +- 12 files changed, 206 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2336c4cc..dd64cac5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -618,7 +618,7 @@ name = "dc-api-test-helpers" version = "0.1.0" dependencies = [ "dc-api-types", - "itertools 0.10.5", + "itertools 0.12.1", ] [[package]] @@ -626,7 +626,7 @@ name = "dc-api-types" version = "0.1.0" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.12.1", "mongodb", "nonempty", "once_cell", @@ -1635,7 +1635,7 @@ dependencies = [ "http 0.2.9", "indent", "indexmap 1.9.3", - "itertools 0.10.5", + "itertools 0.12.1", "mockall", "mongodb", "mongodb-cli-plugin", @@ -1690,7 +1690,7 @@ dependencies = [ "futures", "http 0.2.9", "indexmap 2.2.5", - "itertools 0.10.5", + "itertools 0.12.1", "lazy_static", "mongodb", "mongodb-agent-common", @@ -1808,7 +1808,7 @@ name = "ndc-test-helpers" version = "0.1.0" dependencies = [ "indexmap 2.2.5", - "itertools 0.10.5", + "itertools 0.12.1", "ndc-models", "serde_json", ] diff --git a/Cargo.toml b/Cargo.toml index f0d32f10..e61ce41e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,8 @@ resolver = "2" ndc-sdk = { git = "https://github.com/hasura/ndc-sdk-rs.git" } ndc-models = { git = "http://github.com/hasura/ndc-spec.git", tag = "v0.1.2" } +itertools = "^0.12.1" + # We have a fork of the mongodb driver with a fix for reading metadata from time # series collections. # See the upstream PR: https://github.com/mongodb/mongo-rust-driver/pull/1003 diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index fb1da2ad..80f3268f 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -13,7 +13,7 @@ anyhow = "1.0.80" clap = { version = "4.5.1", features = ["derive", "env"] } futures-util = "0.3.28" indexmap = { version = "1", features = ["serde"] } # must match the version that ndc-client uses -itertools = "^0.12.1" +itertools = { workspace = true } serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0.113", features = ["raw_value"] } thiserror = "1.0.57" diff --git a/crates/configuration/Cargo.toml b/crates/configuration/Cargo.toml index 37d4af35..a4dcc197 100644 --- a/crates/configuration/Cargo.toml +++ b/crates/configuration/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] anyhow = "1" futures = "^0.3" -itertools = "^0.12" +itertools = { workspace = true } mongodb = "2.8" mongodb-support = { path = "../mongodb-support" } ndc-models = { workspace = true } diff --git a/crates/dc-api-test-helpers/Cargo.toml b/crates/dc-api-test-helpers/Cargo.toml index e1655489..2165ebe7 100644 --- a/crates/dc-api-test-helpers/Cargo.toml +++ b/crates/dc-api-test-helpers/Cargo.toml @@ -5,4 +5,4 @@ edition = "2021" [dependencies] dc-api-types = { path = "../dc-api-types" } -itertools = "^0.10" +itertools = { workspace = true } diff --git a/crates/dc-api-types/Cargo.toml b/crates/dc-api-types/Cargo.toml index 18349561..61cfa52f 100644 --- a/crates/dc-api-types/Cargo.toml +++ b/crates/dc-api-types/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -itertools = "^0.10" +itertools = { workspace = true } nonempty = { version = "0.8.1", features = ["serialize"] } once_cell = "1" regex = "1" diff --git a/crates/mongodb-agent-common/Cargo.toml b/crates/mongodb-agent-common/Cargo.toml index d61d7284..e6a9ab7e 100644 --- a/crates/mongodb-agent-common/Cargo.toml +++ b/crates/mongodb-agent-common/Cargo.toml @@ -20,7 +20,7 @@ futures-util = "0.3.28" http = "^0.2" indexmap = { version = "1", features = ["serde"] } # must match the version that ndc-client uses indent = "^0.1" -itertools = "^0.10" +itertools = { workspace = true } mongodb = "2.8" once_cell = "1" regex = "1" diff --git a/crates/mongodb-connector/Cargo.toml b/crates/mongodb-connector/Cargo.toml index 2ab44609..1c39372f 100644 --- a/crates/mongodb-connector/Cargo.toml +++ b/crates/mongodb-connector/Cargo.toml @@ -13,7 +13,7 @@ enum-iterator = "^2.0.0" futures = "^0.3" http = "^0.2" indexmap = { version = "2.1.0", features = ["serde"] } -itertools = "^0.10" +itertools = { workspace = true } lazy_static = "^1.4.0" mongodb = "2.8" mongodb-agent-common = { path = "../mongodb-agent-common" } diff --git a/crates/mongodb-connector/src/api_type_conversions/conversion_error.rs b/crates/mongodb-connector/src/api_type_conversions/conversion_error.rs index 4f34c8ca..13b41c86 100644 --- a/crates/mongodb-connector/src/api_type_conversions/conversion_error.rs +++ b/crates/mongodb-connector/src/api_type_conversions/conversion_error.rs @@ -21,8 +21,15 @@ pub enum ConversionError { #[error("Unknown object type, \"{0}\"")] UnknownObjectType(String), - #[error("Unknown field \"{field_name}\" in object type \"{object_type}\"")] - UnknownObjectTypeField { object_type: String, field_name: String }, + #[error( + "Unknown field \"{field_name}\" in object type \"{object_type}\"{}", + if path.is_empty() { "".to_owned() } else { format!(" at path {}", path.join(".")) } + )] + UnknownObjectTypeField { + object_type: String, + field_name: String, + path: Vec, + }, #[error("Unknown collection, \"{0}\"")] UnknownCollection(String), @@ -30,8 +37,13 @@ pub enum ConversionError { #[error("Unknown relationship, \"{0}\"")] UnknownRelationship(String), - #[error("Unknown aggregate function, \"{aggregate_function}\" in scalar type \"{scalar_type}\"")] - UnknownAggregateFunction { scalar_type: String, aggregate_function: String }, + #[error( + "Unknown aggregate function, \"{aggregate_function}\" in scalar type \"{scalar_type}\"" + )] + UnknownAggregateFunction { + scalar_type: String, + aggregate_function: String, + }, #[error("Query referenced a function, \"{0}\", but it has not been defined")] UnspecifiedFunction(String), diff --git a/crates/mongodb-connector/src/api_type_conversions/query_request.rs b/crates/mongodb-connector/src/api_type_conversions/query_request.rs index 929aa14d..88a4785b 100644 --- a/crates/mongodb-connector/src/api_type_conversions/query_request.rs +++ b/crates/mongodb-connector/src/api_type_conversions/query_request.rs @@ -48,7 +48,7 @@ impl QueryContext<'_> { self.find_object_type(&collection.collection_type) } - fn find_object_type<'a>( + pub fn find_object_type<'a>( &'a self, object_type_name: &'a str, ) -> Result, ConversionError> { @@ -105,6 +105,7 @@ fn find_object_field<'a>( ConversionError::UnknownObjectTypeField { object_type: object_type.name.to_string(), field_name: field_name.to_string(), + path: Default::default(), // TODO: set a path for more helpful error reporting } }) } diff --git a/crates/mongodb-connector/src/query_response.rs b/crates/mongodb-connector/src/query_response.rs index 0cf9a42d..2d686d31 100644 --- a/crates/mongodb-connector/src/query_response.rs +++ b/crates/mongodb-connector/src/query_response.rs @@ -1,12 +1,13 @@ -use std::collections::BTreeMap; +use std::{borrow::Cow, collections::BTreeMap}; -use configuration::schema::Type; +use configuration::schema::{ObjectField, ObjectType, Type}; use indexmap::IndexMap; -use itertools::Itertools; +use itertools::Itertools as _; use mongodb::bson::{self, from_bson, Bson}; use mongodb_agent_common::query::serialization::{bson_to_json, BsonToJsonError}; use ndc_sdk::models::{ - self as ndc, Aggregate, Field, Query, QueryRequest, QueryResponse, RowFieldValue, RowSet, + self as ndc, Aggregate, Field, NestedField, NestedObject, Query, QueryRequest, QueryResponse, + RowFieldValue, RowSet, }; use serde::Deserialize; use thiserror::Error; @@ -21,6 +22,12 @@ pub enum QueryResponseError { #[error("{0}")] Conversion(#[from] ConversionError), + #[error("expected an array at path {}", path.join("."))] + ExpectedArray { path: Vec }, + + #[error("expected an object at path {}", path.join("."))] + ExpectedObject { path: Vec }, + #[error("expected a single response document from MongoDB, but did not get one")] ExpectedSingleDocument, @@ -73,13 +80,20 @@ pub fn serialize_query_response( .row_sets .into_iter() .map(|docs| { - serialize_row_set(query_context, collection_name, &query_request.query, docs) + serialize_row_set( + query_context, + &vec![], + collection_name, + &query_request.query, + docs, + ) }) .try_collect() } else { // TODO: in an aggregation response we expect one document instead of a list of documents Ok(vec![serialize_row_set( query_context, + &vec![], collection_name, &query_request.query, response_documents, @@ -92,6 +106,7 @@ pub fn serialize_query_response( fn serialize_row_set( query_context: &QueryContext<'_>, + path: &[&str], collection_name: &str, query: &Query, docs: Vec, @@ -106,7 +121,7 @@ fn serialize_row_set( let rows = query .fields .as_ref() - .map(|fields| serialize_rows(query_context, collection_name, fields, docs)) + .map(|fields| serialize_rows(query_context, path, collection_name, fields, docs)) .transpose()?; Ok(RowSet { aggregates: None, @@ -126,7 +141,9 @@ fn serialize_row_set( let rows = query .fields .as_ref() - .map(|fields| serialize_rows(query_context, collection_name, fields, row_set.rows)) + .map(|fields| { + serialize_rows(query_context, path, collection_name, fields, row_set.rows) + }) .transpose()?; Ok(RowSet { aggregates, rows }) @@ -154,17 +171,19 @@ fn serialize_aggregates( fn serialize_rows( query_context: &QueryContext<'_>, + path: &[&str], collection_name: &str, query_fields: &IndexMap, docs: Vec, ) -> Result>> { docs.into_iter() - .map(|doc| serialize_single_row(query_context, collection_name, query_fields, doc)) + .map(|doc| serialize_single_row(query_context, path, collection_name, query_fields, doc)) .try_collect() } fn serialize_single_row( query_context: &QueryContext<'_>, + path: &[&str], collection_name: &str, query_fields: &IndexMap, mut doc: bson::Document, @@ -174,6 +193,7 @@ fn serialize_single_row( .map(|(field_name, field_definition)| { let value = serialize_field_value( query_context, + path, collection_name, field_definition, field_name, @@ -186,23 +206,34 @@ fn serialize_single_row( fn serialize_field_value( query_context: &QueryContext<'_>, + path: &[&str], collection_name: &str, field_definition: &ndc::Field, field_name: &str, input: &mut bson::Document, ) -> Result { - let (bson, field_type) = match field_definition { + let (bson, field_type, object_types) = match field_definition { ndc::Field::Column { column, fields } => { - // TODO: if `field_type` is an object type, build a new object type by filtering down to - // the filds listed in `fields` - let field_type = find_field_type(query_context, collection_name, column)?; + let field_type = find_field_type(query_context, path, collection_name, column)?; + + let (requested_type, temp_object_types) = + prune_type_to_field_selection(query_context, path, field_type, fields.as_ref())?; + + let object_types = if temp_object_types.is_empty() { + query_context.object_types.clone() // We're cloning a Cow, not a BTreeMap + } else { + let mut configured_types = query_context.object_types.clone().into_owned(); + configured_types.extend(temp_object_types); + Cow::Owned(configured_types) + }; + let value = value_from_option( collection_name, column, - &field_type, + &requested_type, input.remove(field_name), )?; - (value, field_type) + (value, field_type, object_types) } ndc::Field::Relationship { query, @@ -210,12 +241,13 @@ fn serialize_field_value( arguments, } => todo!(), }; - let json = bson_to_json(field_type, &query_context.object_types, bson)?; + let json = bson_to_json(field_type, &object_types, bson)?; Ok(json) } fn find_field_type<'a>( query_context: &'a QueryContext<'a>, + path: &[&str], collection_name: &str, column: &str, ) -> Result<&'a Type> { @@ -224,21 +256,118 @@ fn find_field_type<'a>( ConversionError::UnknownObjectTypeField { object_type: object_type.name.to_string(), field_name: column.to_string(), + path: path_to_owned(path), } })?; Ok(&field_type.r#type) } -fn parse_single_document(documents: Vec) -> Result -where - T: for<'de> serde::Deserialize<'de>, -{ - let document = documents - .into_iter() - .next() - .ok_or(QueryResponseError::ExpectedSingleDocument)?; - let value = bson::from_document(document).map_err(|_| QueryResponseError::TODORemoveMe)?; - Ok(value) +// TODO: test response with nested ExtendedJSON +fn prune_type_to_field_selection<'a>( + query_context: &QueryContext<'_>, + path: &[&str], + field_type: &'a Type, + fields: Option<&NestedField>, +) -> Result<(Type, Vec<(String, ObjectType)>)> { + match (field_type, fields) { + (t, None) => Ok((t.clone(), Default::default())), + (t @ Type::Scalar(_) | t @ Type::ExtendedJSON, _) => Ok((t.clone(), Default::default())), + + (Type::Nullable(t), _) => { + let (underlying_type, object_types) = + prune_type_to_field_selection(query_context, path, t, fields)?; + Ok((Type::Nullable(Box::new(underlying_type)), object_types)) + } + (Type::ArrayOf(t), Some(NestedField::Array(nested))) => { + let (element_type, object_types) = + prune_type_to_field_selection(query_context, path, t, Some(&nested.fields))?; + Ok((Type::ArrayOf(Box::new(element_type)), object_types)) + } + (Type::Object(t), Some(NestedField::Object(nested))) => { + object_type_for_field_subset(query_context, path, t, nested) + } + + (_, Some(NestedField::Array(_))) => Err(QueryResponseError::ExpectedArray { + path: path_to_owned(path), + }), + (_, Some(NestedField::Object(_))) => Err(QueryResponseError::ExpectedObject { + path: path_to_owned(path), + }), + } +} + +// TODO: test query with aliased name for nested field +fn object_type_for_field_subset( + query_context: &QueryContext<'_>, + path: &[&str], + object_type_name: &str, + requested_fields: &NestedObject, +) -> Result<(Type, Vec<(String, ObjectType)>)> { + let object_type = query_context.find_object_type(object_type_name)?.value; + let (fields, object_type_sets): (_, Vec>) = requested_fields + .fields + .iter() + .map(|(name, requested_field)| { + let (object_field, object_types) = requested_field_definition( + query_context, + &append_to_path(path, [name.as_ref()]), + object_type_name, + object_type, + requested_field, + )?; + Ok(((name.clone(), object_field), object_types)) + }) + .process_results::<_, _, QueryResponseError, _>(|iter| iter.unzip())?; + + let pruned_object_type = ObjectType { + fields, + description: None, + }; + let pruned_object_type_name = path.into_iter().join("_"); + let pruned_type = Type::Object(pruned_object_type_name); + + let object_types = object_type_sets.into_iter().flatten().collect(); + + Ok((pruned_type, object_types)) +} + +/// Given an object type for a value, and a requested field from that value, produce an updated +/// object field definition to match the request. This must take into account aliasing where the +/// name of the requested field maps to a different name on the underlying type. +fn requested_field_definition( + query_context: &QueryContext<'_>, + path: &[&str], + object_type_name: &str, + object_type: &ObjectType, + requested_field: &Field, +) -> Result<(ObjectField, Vec<(String, ObjectType)>)> { + match requested_field { + Field::Column { column, fields } => { + let field_def = object_type.fields.get(column).ok_or_else(|| { + ConversionError::UnknownObjectTypeField { + object_type: object_type_name.to_owned(), + field_name: column.to_owned(), + path: path_to_owned(path), + } + })?; + let (field_type, object_types) = prune_type_to_field_selection( + query_context, + path, + &field_def.r#type, + fields.as_ref(), + )?; + let pruned_field = ObjectField { + r#type: field_type, + description: None, + }; + Ok((pruned_field, object_types)) + } + Field::Relationship { + query, + relationship, + arguments, + } => todo!(), + } } /// Check option result for a BSON value. If the value is missing but the expected type is nullable @@ -259,3 +388,23 @@ fn value_from_option( }), } } + +fn parse_single_document(documents: Vec) -> Result +where + T: for<'de> serde::Deserialize<'de>, +{ + let document = documents + .into_iter() + .next() + .ok_or(QueryResponseError::ExpectedSingleDocument)?; + let value = bson::from_document(document).map_err(|_| QueryResponseError::TODORemoveMe)?; + Ok(value) +} + +fn append_to_path<'a>(path: &[&'a str], elems: impl IntoIterator) -> Vec<&'a str> { + path.into_iter().map(|x| *x).chain(elems).collect() +} + +fn path_to_owned(path: &[&str]) -> Vec { + path.into_iter().map(|x| (*x).to_owned()).collect() +} diff --git a/crates/ndc-test-helpers/Cargo.toml b/crates/ndc-test-helpers/Cargo.toml index d42fcb22..b0d18672 100644 --- a/crates/ndc-test-helpers/Cargo.toml +++ b/crates/ndc-test-helpers/Cargo.toml @@ -5,6 +5,6 @@ edition = "2021" [dependencies] indexmap = "2" -itertools = "^0.10" +itertools = { workspace = true } ndc-models = { workspace = true } serde_json = "1" From bdb1c6fe50f1f76bc6a0670c7a15b6a06c591ef9 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Wed, 24 Apr 2024 15:22:06 -0700 Subject: [PATCH 09/25] split out make_nested_schema and make_flat_schema text fixtures --- .../src/api_type_conversions/query_request.rs | 234 +----------------- crates/mongodb-connector/src/main.rs | 3 + crates/mongodb-connector/src/test_helpers.rs | 227 +++++++++++++++++ 3 files changed, 235 insertions(+), 229 deletions(-) create mode 100644 crates/mongodb-connector/src/test_helpers.rs diff --git a/crates/mongodb-connector/src/api_type_conversions/query_request.rs b/crates/mongodb-connector/src/api_type_conversions/query_request.rs index 88a4785b..69acff43 100644 --- a/crates/mongodb-connector/src/api_type_conversions/query_request.rs +++ b/crates/mongodb-connector/src/api_type_conversions/query_request.rs @@ -907,23 +907,17 @@ fn v3_to_v2_relationship_arguments( #[cfg(test)] mod tests { - use std::{ - borrow::Cow, - collections::{BTreeMap, HashMap}, - }; + use std::collections::HashMap; - use configuration::schema; use dc_api_test_helpers::{self as v2, source, table_relationships, target}; - use mongodb_support::BsonScalarType; - use ndc_sdk::models::{ - self as v3, AggregateFunctionDefinition, ComparisonOperatorDefinition, OrderByElement, - OrderByTarget, OrderDirection, ScalarType, Type, TypeRepresentation, - }; + use ndc_sdk::models::{OrderByElement, OrderByTarget, OrderDirection}; use ndc_test_helpers::*; use pretty_assertions::assert_eq; use serde_json::json; - use super::{v3_to_v2_query_request, v3_to_v2_relationships, QueryContext}; + use crate::test_helpers::{make_flat_schema, make_nested_schema}; + + use super::{v3_to_v2_query_request, v3_to_v2_relationships}; #[test] fn translates_query_request_relationships() -> Result<(), anyhow::Error> { @@ -1267,222 +1261,4 @@ mod tests { assert_eq!(v2_request, expected); Ok(()) } - - fn make_scalar_types() -> BTreeMap { - BTreeMap::from([ - ( - "String".to_owned(), - ScalarType { - representation: Some(TypeRepresentation::String), - aggregate_functions: Default::default(), - comparison_operators: BTreeMap::from([ - ("_eq".to_owned(), ComparisonOperatorDefinition::Equal), - ( - "_regex".to_owned(), - ComparisonOperatorDefinition::Custom { - argument_type: Type::Named { - name: "String".to_owned(), - }, - }, - ), - ]), - }, - ), - ( - "Int".to_owned(), - ScalarType { - representation: Some(TypeRepresentation::Int32), - aggregate_functions: BTreeMap::from([( - "avg".into(), - AggregateFunctionDefinition { - result_type: Type::Named { - name: "Float".into(), // Different result type to the input scalar type - }, - }, - )]), - comparison_operators: BTreeMap::from([( - "_eq".to_owned(), - ComparisonOperatorDefinition::Equal, - )]), - }, - ), - ]) - } - - fn make_flat_schema() -> QueryContext<'static> { - QueryContext { - collections: Cow::Owned(BTreeMap::from([ - ( - "authors".into(), - v3::CollectionInfo { - name: "authors".to_owned(), - description: None, - collection_type: "Author".into(), - arguments: Default::default(), - uniqueness_constraints: make_primary_key_uniqueness_constraint("authors"), - foreign_keys: Default::default(), - }, - ), - ( - "articles".into(), - v3::CollectionInfo { - name: "articles".to_owned(), - description: None, - collection_type: "Article".into(), - arguments: Default::default(), - uniqueness_constraints: make_primary_key_uniqueness_constraint("articles"), - foreign_keys: Default::default(), - }, - ), - ])), - functions: Default::default(), - object_types: Cow::Owned(BTreeMap::from([ - ( - "Author".into(), - schema::ObjectType { - description: None, - fields: BTreeMap::from([ - ( - "id".into(), - schema::ObjectField { - description: None, - r#type: schema::Type::Scalar(BsonScalarType::Int), - }, - ), - ( - "last_name".into(), - schema::ObjectField { - description: None, - r#type: schema::Type::Scalar(BsonScalarType::String), - }, - ), - ]), - }, - ), - ( - "Article".into(), - schema::ObjectType { - description: None, - fields: BTreeMap::from([ - ( - "author_id".into(), - schema::ObjectField { - description: None, - r#type: schema::Type::Scalar(BsonScalarType::Int), - }, - ), - ( - "title".into(), - schema::ObjectField { - description: None, - r#type: schema::Type::Scalar(BsonScalarType::String), - }, - ), - ( - "year".into(), - schema::ObjectField { - description: None, - r#type: schema::Type::Nullable(Box::new(schema::Type::Scalar( - BsonScalarType::Int, - ))), - }, - ), - ]), - }, - ), - ])), - scalar_types: Cow::Owned(make_scalar_types()), - } - } - - fn make_nested_schema() -> QueryContext<'static> { - QueryContext { - collections: Cow::Owned(BTreeMap::from([( - "authors".into(), - v3::CollectionInfo { - name: "authors".into(), - description: None, - collection_type: "Author".into(), - arguments: Default::default(), - uniqueness_constraints: make_primary_key_uniqueness_constraint("authors"), - foreign_keys: Default::default(), - }, - )])), - functions: Default::default(), - object_types: Cow::Owned(BTreeMap::from([ - ( - "Author".into(), - schema::ObjectType { - description: None, - fields: BTreeMap::from([ - ( - "address".into(), - schema::ObjectField { - description: None, - r#type: schema::Type::Object("Address".into()), - }, - ), - ( - "articles".into(), - schema::ObjectField { - description: None, - r#type: schema::Type::ArrayOf(Box::new(schema::Type::Object( - "Article".into(), - ))), - }, - ), - ( - "array_of_arrays".into(), - schema::ObjectField { - description: None, - r#type: schema::Type::ArrayOf(Box::new(schema::Type::ArrayOf( - Box::new(schema::Type::Object("Article".into())), - ))), - }, - ), - ]), - }, - ), - ( - "Address".into(), - schema::ObjectType { - description: None, - fields: BTreeMap::from([( - "country".into(), - schema::ObjectField { - description: None, - r#type: schema::Type::Scalar(BsonScalarType::String), - }, - )]), - }, - ), - ( - "Article".into(), - schema::ObjectType { - description: None, - fields: BTreeMap::from([( - "title".into(), - schema::ObjectField { - description: None, - r#type: schema::Type::Scalar(BsonScalarType::String), - }, - )]), - }, - ), - ])), - scalar_types: Cow::Owned(make_scalar_types()), - } - } - - fn make_primary_key_uniqueness_constraint( - collection_name: &str, - ) -> BTreeMap { - [( - format!("{collection_name}_id"), - v3::UniquenessConstraint { - unique_columns: vec!["_id".to_owned()], - }, - )] - .into() - } } diff --git a/crates/mongodb-connector/src/main.rs b/crates/mongodb-connector/src/main.rs index 3f8b36fd..261a1185 100644 --- a/crates/mongodb-connector/src/main.rs +++ b/crates/mongodb-connector/src/main.rs @@ -7,6 +7,9 @@ mod query_context; mod query_response; mod schema; +#[cfg(test)] +mod test_helpers; + use std::error::Error; use mongo_connector::MongoConnector; diff --git a/crates/mongodb-connector/src/test_helpers.rs b/crates/mongodb-connector/src/test_helpers.rs new file mode 100644 index 00000000..1c1ade80 --- /dev/null +++ b/crates/mongodb-connector/src/test_helpers.rs @@ -0,0 +1,227 @@ +use std::{borrow::Cow, collections::BTreeMap}; + +use configuration::schema; +use mongodb_support::BsonScalarType; +use ndc_sdk::models::{ + AggregateFunctionDefinition, CollectionInfo, ComparisonOperatorDefinition, ScalarType, Type, TypeRepresentation, UniquenessConstraint +}; + +use crate::api_type_conversions::QueryContext; + +pub fn make_scalar_types() -> BTreeMap { + BTreeMap::from([ + ( + "String".to_owned(), + ScalarType { + representation: Some(TypeRepresentation::String), + aggregate_functions: Default::default(), + comparison_operators: BTreeMap::from([ + ("_eq".to_owned(), ComparisonOperatorDefinition::Equal), + ( + "_regex".to_owned(), + ComparisonOperatorDefinition::Custom { + argument_type: Type::Named { + name: "String".to_owned(), + }, + }, + ), + ]), + }, + ), + ( + "Int".to_owned(), + ScalarType { + representation: Some(TypeRepresentation::Int32), + aggregate_functions: BTreeMap::from([( + "avg".into(), + AggregateFunctionDefinition { + result_type: Type::Named { + name: "Float".into(), // Different result type to the input scalar type + }, + }, + )]), + comparison_operators: BTreeMap::from([( + "_eq".to_owned(), + ComparisonOperatorDefinition::Equal, + )]), + }, + ), + ]) +} + +pub fn make_flat_schema() -> QueryContext<'static> { + QueryContext { + collections: Cow::Owned(BTreeMap::from([ + ( + "authors".into(), + CollectionInfo { + name: "authors".to_owned(), + description: None, + collection_type: "Author".into(), + arguments: Default::default(), + uniqueness_constraints: make_primary_key_uniqueness_constraint("authors"), + foreign_keys: Default::default(), + }, + ), + ( + "articles".into(), + CollectionInfo { + name: "articles".to_owned(), + description: None, + collection_type: "Article".into(), + arguments: Default::default(), + uniqueness_constraints: make_primary_key_uniqueness_constraint("articles"), + foreign_keys: Default::default(), + }, + ), + ])), + functions: Default::default(), + object_types: Cow::Owned(BTreeMap::from([ + ( + "Author".into(), + schema::ObjectType { + description: None, + fields: BTreeMap::from([ + ( + "id".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Scalar(BsonScalarType::Int), + }, + ), + ( + "last_name".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Scalar(BsonScalarType::String), + }, + ), + ]), + }, + ), + ( + "Article".into(), + schema::ObjectType { + description: None, + fields: BTreeMap::from([ + ( + "author_id".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Scalar(BsonScalarType::Int), + }, + ), + ( + "title".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Scalar(BsonScalarType::String), + }, + ), + ( + "year".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Nullable(Box::new(schema::Type::Scalar( + BsonScalarType::Int, + ))), + }, + ), + ]), + }, + ), + ])), + scalar_types: Cow::Owned(make_scalar_types()), + } +} + +pub fn make_nested_schema() -> QueryContext<'static> { + QueryContext { + collections: Cow::Owned(BTreeMap::from([( + "authors".into(), + CollectionInfo { + name: "authors".into(), + description: None, + collection_type: "Author".into(), + arguments: Default::default(), + uniqueness_constraints: make_primary_key_uniqueness_constraint("authors"), + foreign_keys: Default::default(), + }, + )])), + functions: Default::default(), + object_types: Cow::Owned(BTreeMap::from([ + ( + "Author".into(), + schema::ObjectType { + description: None, + fields: BTreeMap::from([ + ( + "address".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Object("Address".into()), + }, + ), + ( + "articles".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::ArrayOf(Box::new(schema::Type::Object( + "Article".into(), + ))), + }, + ), + ( + "array_of_arrays".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::ArrayOf(Box::new(schema::Type::ArrayOf( + Box::new(schema::Type::Object("Article".into())), + ))), + }, + ), + ]), + }, + ), + ( + "Address".into(), + schema::ObjectType { + description: None, + fields: BTreeMap::from([( + "country".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Scalar(BsonScalarType::String), + }, + )]), + }, + ), + ( + "Article".into(), + schema::ObjectType { + description: None, + fields: BTreeMap::from([( + "title".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Scalar(BsonScalarType::String), + }, + )]), + }, + ), + ])), + scalar_types: Cow::Owned(make_scalar_types()), + } +} + +fn make_primary_key_uniqueness_constraint( + collection_name: &str, +) -> BTreeMap { + [( + format!("{collection_name}_id"), + UniquenessConstraint { + unique_columns: vec!["_id".to_owned()], + }, + )] + .into() +} From 87f2ceb27bbd6fee19a25d8389d81e96cf8e39d5 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Wed, 24 Apr 2024 15:50:18 -0700 Subject: [PATCH 10/25] fix a type reference, add tests --- ...ion_tests__tests__basic__runs_a_query.snap | 30 +++++--- .../mongodb-connector/src/query_response.rs | 73 +++++++++++++++++-- crates/mongodb-connector/src/test_helpers.rs | 66 +++++++++++++++-- 3 files changed, 144 insertions(+), 25 deletions(-) diff --git a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__basic__runs_a_query.snap b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__basic__runs_a_query.snap index cea7aa7f..a4fec50d 100644 --- a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__basic__runs_a_query.snap +++ b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__basic__runs_a_query.snap @@ -5,43 +5,53 @@ expression: "query(r#\"\n query Movies {\n movie data: movies: - imdb: - rating: 6.2 + rating: + $numberDouble: "6.2" votes: 1189 title: Blacksmith Scene - imdb: - rating: 7.4 + rating: + $numberDouble: "7.4" votes: 9847 title: The Great Train Robbery - imdb: - rating: 7.1 + rating: + $numberDouble: "7.1" votes: 448 title: The Land Beyond the Sunset - imdb: - rating: 6.6 + rating: + $numberDouble: "6.6" votes: 1375 title: A Corner in Wheat - imdb: - rating: 7.3 + rating: + $numberDouble: "7.3" votes: 1034 title: "Winsor McCay, the Famous Cartoonist of the N.Y. Herald and His Moving Comics" - imdb: - rating: 6 + rating: + $numberInt: "6" votes: 371 title: Traffic in Souls - imdb: - rating: 7.3 + rating: + $numberDouble: "7.3" votes: 1837 title: Gertie the Dinosaur - imdb: - rating: 5.8 + rating: + $numberDouble: "5.8" votes: 223 title: In the Land of the Head Hunters - imdb: - rating: 7.6 + rating: + $numberDouble: "7.6" votes: 744 title: The Perils of Pauline - imdb: - rating: 6.8 + rating: + $numberDouble: "6.8" votes: 15715 title: The Birth of a Nation errors: ~ diff --git a/crates/mongodb-connector/src/query_response.rs b/crates/mongodb-connector/src/query_response.rs index 2d686d31..ddab5f53 100644 --- a/crates/mongodb-connector/src/query_response.rs +++ b/crates/mongodb-connector/src/query_response.rs @@ -193,7 +193,7 @@ fn serialize_single_row( .map(|(field_name, field_definition)| { let value = serialize_field_value( query_context, - path, + &append_to_path(path, [field_name.as_ref()]), collection_name, field_definition, field_name, @@ -212,7 +212,7 @@ fn serialize_field_value( field_name: &str, input: &mut bson::Document, ) -> Result { - let (bson, field_type, object_types) = match field_definition { + let (bson, requested_type, object_types) = match field_definition { ndc::Field::Column { column, fields } => { let field_type = find_field_type(query_context, path, collection_name, column)?; @@ -227,13 +227,15 @@ fn serialize_field_value( Cow::Owned(configured_types) }; + println!("{requested_type:?}\n\n{object_types:?}"); + let value = value_from_option( collection_name, column, &requested_type, input.remove(field_name), )?; - (value, field_type, object_types) + (value, requested_type, object_types) } ndc::Field::Relationship { query, @@ -241,7 +243,7 @@ fn serialize_field_value( arguments, } => todo!(), }; - let json = bson_to_json(field_type, &object_types, bson)?; + let json = bson_to_json(&requested_type, &object_types, bson)?; Ok(json) } @@ -323,10 +325,12 @@ fn object_type_for_field_subset( fields, description: None, }; - let pruned_object_type_name = path.into_iter().join("_"); - let pruned_type = Type::Object(pruned_object_type_name); + let pruned_object_type_name = format!("requested_fields_{}", path.into_iter().join("_")); + let pruned_type = Type::Object(pruned_object_type_name.clone()); - let object_types = object_type_sets.into_iter().flatten().collect(); + let mut object_types: Vec<(String, ObjectType)> = + object_type_sets.into_iter().flatten().collect(); + object_types.push((pruned_object_type_name, pruned_object_type)); Ok((pruned_type, object_types)) } @@ -408,3 +412,58 @@ fn append_to_path<'a>(path: &[&'a str], elems: impl IntoIterator fn path_to_owned(path: &[&str]) -> Vec { path.into_iter().map(|x| (*x).to_owned()).collect() } + +#[cfg(test)] +mod tests { + use mongodb::bson; + use ndc_sdk::models::{QueryResponse, RowFieldValue, RowSet}; + use ndc_test_helpers::{field, object, query, query_request}; + use pretty_assertions::assert_eq; + use serde_json::json; + + use crate::test_helpers::make_nested_schema; + + use super::serialize_query_response; + + #[test] + fn serializes_response_with_nested_fields() -> anyhow::Result<()> { + let query_context = make_nested_schema(); + let request = query_request() + .collection("authors") + .query(query().fields([field!("address" => "address", object!([ + field!("street"), + field!("geocode" => "geocode", object!([ + field!("longitude"), + ])), + ]))])) + .into(); + + let response_documents = vec![bson::doc! { + "address": { + "street": "137 Maple Dr", + "geocode": { + "longitude": 122.4194, + }, + }, + }]; + + let response = serialize_query_response(&query_context, &request, response_documents)?; + assert_eq!( + response, + QueryResponse(vec![RowSet { + aggregates: Default::default(), + rows: Some(vec![[( + "address".into(), + RowFieldValue(json!({ + "street": "137 Maple Dr", + "geocode": { + "longitude": 122.4194, + }, + })) + )] + .into()]), + }]) + ); + Ok(()) + } +} diff --git a/crates/mongodb-connector/src/test_helpers.rs b/crates/mongodb-connector/src/test_helpers.rs index 1c1ade80..2c97abd3 100644 --- a/crates/mongodb-connector/src/test_helpers.rs +++ b/crates/mongodb-connector/src/test_helpers.rs @@ -3,7 +3,8 @@ use std::{borrow::Cow, collections::BTreeMap}; use configuration::schema; use mongodb_support::BsonScalarType; use ndc_sdk::models::{ - AggregateFunctionDefinition, CollectionInfo, ComparisonOperatorDefinition, ScalarType, Type, TypeRepresentation, UniquenessConstraint + AggregateFunctionDefinition, CollectionInfo, ComparisonOperatorDefinition, ScalarType, Type, + TypeRepresentation, UniquenessConstraint, }; use crate::api_type_conversions::QueryContext; @@ -187,13 +188,40 @@ pub fn make_nested_schema() -> QueryContext<'static> { "Address".into(), schema::ObjectType { description: None, - fields: BTreeMap::from([( - "country".into(), - schema::ObjectField { - description: None, - r#type: schema::Type::Scalar(BsonScalarType::String), - }, - )]), + fields: BTreeMap::from([ + ( + "country".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Scalar(BsonScalarType::String), + }, + ), + ( + "street".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Scalar(BsonScalarType::String), + }, + ), + ( + "apartment".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Nullable(Box::new(schema::Type::Scalar( + BsonScalarType::String, + ))), + }, + ), + ( + "geocode".into(), + schema::ObjectField { + description: Some("Lat/Long".to_owned()), + r#type: schema::Type::Nullable(Box::new(schema::Type::Object( + "Geocode".to_owned(), + ))), + }, + ), + ]), }, ), ( @@ -209,6 +237,28 @@ pub fn make_nested_schema() -> QueryContext<'static> { )]), }, ), + ( + "Geocode".into(), + schema::ObjectType { + description: None, + fields: BTreeMap::from([ + ( + "latitude".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Scalar(BsonScalarType::Double), + }, + ), + ( + "longitude".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Scalar(BsonScalarType::Double), + }, + ), + ]), + }, + ), ])), scalar_types: Cow::Owned(make_scalar_types()), } From cd793ab1d989c824d8a6458affa57dbdee14f46b Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Wed, 24 Apr 2024 17:51:41 -0700 Subject: [PATCH 11/25] test serializing decimal128 --- .../mongodb-connector/src/query_response.rs | 91 ++++++++++++++++++- crates/mongodb-connector/src/test_helpers.rs | 2 +- 2 files changed, 89 insertions(+), 4 deletions(-) diff --git a/crates/mongodb-connector/src/query_response.rs b/crates/mongodb-connector/src/query_response.rs index ddab5f53..2db07284 100644 --- a/crates/mongodb-connector/src/query_response.rs +++ b/crates/mongodb-connector/src/query_response.rs @@ -415,13 +415,21 @@ fn path_to_owned(path: &[&str]) -> Vec { #[cfg(test)] mod tests { - use mongodb::bson; - use ndc_sdk::models::{QueryResponse, RowFieldValue, RowSet}; + use std::{borrow::Cow, str::FromStr}; + + use configuration::schema::{ObjectField, ObjectType, Type}; + use mongodb::bson::{self, Bson}; + use ndc_sdk::models::{CollectionInfo, QueryResponse, RowFieldValue, RowSet}; use ndc_test_helpers::{field, object, query, query_request}; use pretty_assertions::assert_eq; use serde_json::json; - use crate::test_helpers::make_nested_schema; + use crate::{ + api_type_conversions::QueryContext, + test_helpers::{ + make_nested_schema, make_primary_key_uniqueness_constraint, make_scalar_types, + }, + }; use super::serialize_query_response; @@ -466,4 +474,81 @@ mod tests { ); Ok(()) } + + #[test] + fn serializes_response_with_decimal_128_fields() -> anyhow::Result<()> { + let query_context = QueryContext { + collections: Cow::Owned( + [( + "business".into(), + CollectionInfo { + name: "business".into(), + description: None, + collection_type: "Business".into(), + arguments: Default::default(), + uniqueness_constraints: make_primary_key_uniqueness_constraint("business"), + foreign_keys: Default::default(), + }, + )] + .into(), + ), + functions: Default::default(), + object_types: Cow::Owned( + [( + "Business".to_owned(), + ObjectType { + description: None, + fields: [ + ( + "price".to_owned(), + ObjectField { + description: None, + r#type: Type::Scalar(mongodb_support::BsonScalarType::Decimal), + }, + ), + ( + "price_extjson".to_owned(), + ObjectField { + description: None, + r#type: Type::ExtendedJSON, + }, + ), + ] + .into(), + }, + )] + .into(), + ), + scalar_types: Cow::Owned(make_scalar_types()), + }; + + let request = query_request() + .collection("business") + .query(query().fields([field!("price"), field!("price_extjson")])) + .into(); + + let response_documents = vec![bson::doc! { + "price": Bson::Decimal128(bson::Decimal128::from_str("127.6486654").unwrap()), + "price_extjson": Bson::Decimal128(bson::Decimal128::from_str("-4.9999999999").unwrap()), + }]; + + let response = serialize_query_response(&query_context, &request, response_documents)?; + assert_eq!( + response, + QueryResponse(vec![RowSet { + aggregates: Default::default(), + rows: Some(vec![[ + ("price".into(), RowFieldValue(json!("127.6486654"))), + ( + "price_extjson".into(), + RowFieldValue(json!({ + "$numberDecimal": "-4.9999999999" + })) + ), + ] + .into()]), + }]) + ); + Ok(()) + } } diff --git a/crates/mongodb-connector/src/test_helpers.rs b/crates/mongodb-connector/src/test_helpers.rs index 2c97abd3..9a1146b2 100644 --- a/crates/mongodb-connector/src/test_helpers.rs +++ b/crates/mongodb-connector/src/test_helpers.rs @@ -264,7 +264,7 @@ pub fn make_nested_schema() -> QueryContext<'static> { } } -fn make_primary_key_uniqueness_constraint( +pub fn make_primary_key_uniqueness_constraint( collection_name: &str, ) -> BTreeMap { [( From 202c960a086df07c585bb6c92e63216404f4a6d6 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Wed, 24 Apr 2024 18:35:06 -0700 Subject: [PATCH 12/25] test nested extjson --- crates/configuration/src/schema/mod.rs | 50 +++++++- .../mongodb-connector/src/query_response.rs | 112 +++++++++++------- crates/mongodb-connector/src/test_helpers.rs | 15 +-- .../ndc-test-helpers/src/collection_info.rs | 27 +++++ crates/ndc-test-helpers/src/lib.rs | 4 + crates/ndc-test-helpers/src/types.rs | 45 +++++++ 6 files changed, 198 insertions(+), 55 deletions(-) create mode 100644 crates/ndc-test-helpers/src/collection_info.rs create mode 100644 crates/ndc-test-helpers/src/types.rs diff --git a/crates/configuration/src/schema/mod.rs b/crates/configuration/src/schema/mod.rs index 4b7418ad..90f4e0f4 100644 --- a/crates/configuration/src/schema/mod.rs +++ b/crates/configuration/src/schema/mod.rs @@ -3,7 +3,7 @@ use std::collections::BTreeMap; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use mongodb_support::BsonScalarType; +use mongodb_support::{BsonScalarType, EXTENDED_JSON_TYPE_NAME}; use crate::{WithName, WithNameRef}; @@ -95,6 +95,30 @@ impl From for ndc_models::Type { } } +// Should only be used for testing +impl From for Type { + fn from(value: ndc_models::Type) -> Self { + match value { + ndc_models::Type::Named { name } => { + if name == EXTENDED_JSON_TYPE_NAME { + Type::ExtendedJSON + } else if let Ok(scalar_type) = BsonScalarType::from_bson_name(&name) { + Type::Scalar(scalar_type) + } else { + Type::Object(name.clone()) + } + } + ndc_models::Type::Nullable { underlying_type } => { + Type::Nullable(Box::new((*underlying_type).into())) + } + ndc_models::Type::Array { element_type } => { + Type::ArrayOf(Box::new((*element_type).into())) + } + ndc_models::Type::Predicate { .. } => panic!("not implemented"), + } + } +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct ObjectType { @@ -130,6 +154,20 @@ impl From for ndc_models::ObjectType { } } +// Should only be used for testing +impl From for ObjectType { + fn from(value: ndc_models::ObjectType) -> Self { + ObjectType { + fields: value + .fields + .into_iter() + .map(|(name, field)| (name, field.into())) + .collect(), + description: value.description, + } + } +} + /// Information about an object type field. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] @@ -159,3 +197,13 @@ impl From for ndc_models::ObjectField { } } } + +// Should only be used for testing +impl From for ObjectField { + fn from(value: ndc_models::ObjectField) -> Self { + ObjectField { + r#type: value.r#type.into(), + description: value.description, + } + } +} diff --git a/crates/mongodb-connector/src/query_response.rs b/crates/mongodb-connector/src/query_response.rs index 2db07284..0e84b956 100644 --- a/crates/mongodb-connector/src/query_response.rs +++ b/crates/mongodb-connector/src/query_response.rs @@ -417,18 +417,17 @@ fn path_to_owned(path: &[&str]) -> Vec { mod tests { use std::{borrow::Cow, str::FromStr}; - use configuration::schema::{ObjectField, ObjectType, Type}; + use configuration::schema::Type; use mongodb::bson::{self, Bson}; - use ndc_sdk::models::{CollectionInfo, QueryResponse, RowFieldValue, RowSet}; - use ndc_test_helpers::{field, object, query, query_request}; + use mongodb_support::BsonScalarType; + use ndc_sdk::models::{QueryResponse, RowFieldValue, RowSet}; + use ndc_test_helpers::{collection, field, object, object_type, query, query_request}; use pretty_assertions::assert_eq; use serde_json::json; use crate::{ api_type_conversions::QueryContext, - test_helpers::{ - make_nested_schema, make_primary_key_uniqueness_constraint, make_scalar_types, - }, + test_helpers::{make_nested_schema, make_scalar_types}, }; use super::serialize_query_response; @@ -478,44 +477,16 @@ mod tests { #[test] fn serializes_response_with_decimal_128_fields() -> anyhow::Result<()> { let query_context = QueryContext { - collections: Cow::Owned( - [( - "business".into(), - CollectionInfo { - name: "business".into(), - description: None, - collection_type: "Business".into(), - arguments: Default::default(), - uniqueness_constraints: make_primary_key_uniqueness_constraint("business"), - foreign_keys: Default::default(), - }, - )] - .into(), - ), + collections: Cow::Owned([collection("business")].into()), functions: Default::default(), object_types: Cow::Owned( [( - "Business".to_owned(), - ObjectType { - description: None, - fields: [ - ( - "price".to_owned(), - ObjectField { - description: None, - r#type: Type::Scalar(mongodb_support::BsonScalarType::Decimal), - }, - ), - ( - "price_extjson".to_owned(), - ObjectField { - description: None, - r#type: Type::ExtendedJSON, - }, - ), - ] - .into(), - }, + "business".to_owned(), + object_type([ + ("price", Type::Scalar(BsonScalarType::Decimal)), + ("price_extjson", Type::ExtendedJSON), + ]) + .into(), )] .into(), ), @@ -551,4 +522,63 @@ mod tests { ); Ok(()) } + + #[test] + fn serializes_response_with_nested_extjson() -> anyhow::Result<()> { + let query_context = QueryContext { + collections: Cow::Owned([collection("data")].into()), + functions: Default::default(), + object_types: Cow::Owned( + [( + "data".to_owned(), + object_type([("value", Type::ExtendedJSON)]).into(), + )] + .into(), + ), + scalar_types: Cow::Owned(make_scalar_types()), + }; + + let request = query_request() + .collection("data") + .query(query().fields([field!("value")])) + .into(); + + let response_documents = vec![bson::doc! { + "value": { + "array": [ + { "number": Bson::Int32(3) }, + { "number": Bson::Decimal128(bson::Decimal128::from_str("127.6486654").unwrap()) }, + ], + "string": "hello", + "object": { + "foo": 1, + "bar": 2, + }, + }, + }]; + + let response = serialize_query_response(&query_context, &request, response_documents)?; + assert_eq!( + response, + QueryResponse(vec![RowSet { + aggregates: Default::default(), + rows: Some(vec![[( + "value".into(), + RowFieldValue(json!({ + "array": [ + { "number": { "$numberInt": "3" } }, + { "number": { "$numberDecimal": "127.6486654" } }, + ], + "string": "hello", + "object": { + "foo": { "$numberInt": "1" }, + "bar": { "$numberInt": "2" }, + }, + })) + )] + .into()]), + }]) + ); + Ok(()) + } } diff --git a/crates/mongodb-connector/src/test_helpers.rs b/crates/mongodb-connector/src/test_helpers.rs index 9a1146b2..8b2aa1a2 100644 --- a/crates/mongodb-connector/src/test_helpers.rs +++ b/crates/mongodb-connector/src/test_helpers.rs @@ -4,8 +4,9 @@ use configuration::schema; use mongodb_support::BsonScalarType; use ndc_sdk::models::{ AggregateFunctionDefinition, CollectionInfo, ComparisonOperatorDefinition, ScalarType, Type, - TypeRepresentation, UniquenessConstraint, + TypeRepresentation, }; +use ndc_test_helpers::make_primary_key_uniqueness_constraint; use crate::api_type_conversions::QueryContext; @@ -263,15 +264,3 @@ pub fn make_nested_schema() -> QueryContext<'static> { scalar_types: Cow::Owned(make_scalar_types()), } } - -pub fn make_primary_key_uniqueness_constraint( - collection_name: &str, -) -> BTreeMap { - [( - format!("{collection_name}_id"), - UniquenessConstraint { - unique_columns: vec!["_id".to_owned()], - }, - )] - .into() -} diff --git a/crates/ndc-test-helpers/src/collection_info.rs b/crates/ndc-test-helpers/src/collection_info.rs new file mode 100644 index 00000000..4b41d802 --- /dev/null +++ b/crates/ndc-test-helpers/src/collection_info.rs @@ -0,0 +1,27 @@ +use std::{collections::BTreeMap, fmt::Display}; + +use ndc_models::{CollectionInfo, ObjectField, ObjectType, Type, UniquenessConstraint}; + +pub fn collection(name: impl Display + Clone) -> (String, CollectionInfo) { + let coll = CollectionInfo { + name: name.to_string(), + description: None, + arguments: Default::default(), + collection_type: name.to_string(), + uniqueness_constraints: make_primary_key_uniqueness_constraint(name.clone()), + foreign_keys: Default::default(), + }; + (name.to_string(), coll) +} + +pub fn make_primary_key_uniqueness_constraint( + collection_name: impl Display, +) -> BTreeMap { + [( + format!("{collection_name}_id"), + UniquenessConstraint { + unique_columns: vec!["_id".to_owned()], + }, + )] + .into() +} diff --git a/crates/ndc-test-helpers/src/lib.rs b/crates/ndc-test-helpers/src/lib.rs index 3d916a09..7be28e21 100644 --- a/crates/ndc-test-helpers/src/lib.rs +++ b/crates/ndc-test-helpers/src/lib.rs @@ -2,11 +2,13 @@ #![allow(unused_imports)] mod aggregates; +mod collection_info; mod comparison_target; mod comparison_value; mod exists_in_collection; mod expressions; mod field; +mod types; use std::collections::BTreeMap; @@ -16,11 +18,13 @@ use ndc_models::{ QueryRequest, Relationship, RelationshipArgument, RelationshipType, }; +pub use collection_info::*; pub use comparison_target::*; pub use comparison_value::*; pub use exists_in_collection::*; pub use expressions::*; pub use field::*; +pub use types::*; #[derive(Clone, Debug, Default)] pub struct QueryRequestBuilder { diff --git a/crates/ndc-test-helpers/src/types.rs b/crates/ndc-test-helpers/src/types.rs new file mode 100644 index 00000000..3f2921f4 --- /dev/null +++ b/crates/ndc-test-helpers/src/types.rs @@ -0,0 +1,45 @@ +use std::fmt::Display; + +use ndc_models::{ObjectField, ObjectType, Type}; + +// pub fn object_type( +// name: impl Display, +// fields: impl IntoIterator)>, +// ) -> (String, ObjectType) { +// let t = ObjectType { +// description: Default::default(), +// fields: fields +// .into_iter() +// .map(|(name, field_type)| { +// ( +// name.to_string(), +// ObjectField { +// description: Default::default(), +// r#type: field_type.into(), +// }, +// ) +// }) +// .collect(), +// }; +// (name.to_string(), t) +// } + +pub fn object_type( + fields: impl IntoIterator)>, +) -> ObjectType { + ObjectType { + description: Default::default(), + fields: fields + .into_iter() + .map(|(name, field_type)| { + ( + name.to_string(), + ObjectField { + description: Default::default(), + r#type: field_type.into(), + }, + ) + }) + .collect(), + } +} From 3dfcb32e042289102279435253e53b281a1cd5b5 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Wed, 24 Apr 2024 18:39:23 -0700 Subject: [PATCH 13/25] test field aliases --- .../mongodb-connector/src/query_response.rs | 58 ++++++++++++++++++- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/crates/mongodb-connector/src/query_response.rs b/crates/mongodb-connector/src/query_response.rs index 0e84b956..1bf34d51 100644 --- a/crates/mongodb-connector/src/query_response.rs +++ b/crates/mongodb-connector/src/query_response.rs @@ -264,7 +264,6 @@ fn find_field_type<'a>( Ok(&field_type.r#type) } -// TODO: test response with nested ExtendedJSON fn prune_type_to_field_selection<'a>( query_context: &QueryContext<'_>, path: &[&str], @@ -298,7 +297,6 @@ fn prune_type_to_field_selection<'a>( } } -// TODO: test query with aliased name for nested field fn object_type_for_field_subset( query_context: &QueryContext<'_>, path: &[&str], @@ -413,6 +411,7 @@ fn path_to_owned(path: &[&str]) -> Vec { path.into_iter().map(|x| (*x).to_owned()).collect() } +// TODO: test nested objects in arrays #[cfg(test)] mod tests { use std::{borrow::Cow, str::FromStr}; @@ -474,6 +473,61 @@ mod tests { Ok(()) } + #[test] + fn serializes_response_with_aliased_fields() -> anyhow::Result<()> { + let query_context = make_nested_schema(); + let request = query_request() + .collection("authors") + .query(query().fields([ + field!("address1" => "address", object!([ + field!("line1" => "street"), + ])), + field!("address2" => "address", object!([ + field!("latlong" => "geocode", object!([ + field!("long" => "longitude"), + ])), + ])), + ])) + .into(); + + let response_documents = vec![bson::doc! { + "address1": { + "line1": "137 Maple Dr", + }, + "address2": { + "latlong": { + "long": 122.4194, + }, + }, + }]; + + let response = serialize_query_response(&query_context, &request, response_documents)?; + assert_eq!( + response, + QueryResponse(vec![RowSet { + aggregates: Default::default(), + rows: Some(vec![[ + ( + "address1".into(), + RowFieldValue(json!({ + "line1": "137 Maple Dr", + })) + ), + ( + "address2".into(), + RowFieldValue(json!({ + "latlong": { + "long": 122.4194, + }, + })) + ) + ] + .into()]), + }]) + ); + Ok(()) + } + #[test] fn serializes_response_with_decimal_128_fields() -> anyhow::Result<()> { let query_context = QueryContext { From d2d8dca25ef194bed390d10b5ee6d8615b232d34 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Wed, 24 Apr 2024 18:44:06 -0700 Subject: [PATCH 14/25] delete commented code --- crates/ndc-test-helpers/src/types.rs | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/crates/ndc-test-helpers/src/types.rs b/crates/ndc-test-helpers/src/types.rs index 3f2921f4..dde51b96 100644 --- a/crates/ndc-test-helpers/src/types.rs +++ b/crates/ndc-test-helpers/src/types.rs @@ -2,28 +2,6 @@ use std::fmt::Display; use ndc_models::{ObjectField, ObjectType, Type}; -// pub fn object_type( -// name: impl Display, -// fields: impl IntoIterator)>, -// ) -> (String, ObjectType) { -// let t = ObjectType { -// description: Default::default(), -// fields: fields -// .into_iter() -// .map(|(name, field_type)| { -// ( -// name.to_string(), -// ObjectField { -// description: Default::default(), -// r#type: field_type.into(), -// }, -// ) -// }) -// .collect(), -// }; -// (name.to_string(), t) -// } - pub fn object_type( fields: impl IntoIterator)>, ) -> ObjectType { From 71ac9b05ae35f12324a4b6dbff75dc4553a540b0 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 25 Apr 2024 13:29:45 -0700 Subject: [PATCH 15/25] delete unused projection code --- .../mongodb-agent-common/src/mongodb/mod.rs | 2 - .../src/mongodb/projection.rs | 272 ------------------ .../mongodb-agent-common/src/mongodb/stage.rs | 11 +- 3 files changed, 1 insertion(+), 284 deletions(-) delete mode 100644 crates/mongodb-agent-common/src/mongodb/projection.rs diff --git a/crates/mongodb-agent-common/src/mongodb/mod.rs b/crates/mongodb-agent-common/src/mongodb/mod.rs index 2a8961cf..f311835e 100644 --- a/crates/mongodb-agent-common/src/mongodb/mod.rs +++ b/crates/mongodb-agent-common/src/mongodb/mod.rs @@ -2,7 +2,6 @@ mod accumulator; mod collection; mod database; mod pipeline; -mod projection; pub mod sanitize; mod selection; mod stage; @@ -15,7 +14,6 @@ pub use self::{ collection::CollectionTrait, database::DatabaseTrait, pipeline::Pipeline, - projection::{ProjectAs, Projection}, selection::Selection, stage::Stage, }; diff --git a/crates/mongodb-agent-common/src/mongodb/projection.rs b/crates/mongodb-agent-common/src/mongodb/projection.rs deleted file mode 100644 index 54dcbc2c..00000000 --- a/crates/mongodb-agent-common/src/mongodb/projection.rs +++ /dev/null @@ -1,272 +0,0 @@ -use std::collections::BTreeMap; - -use mongodb::bson::{self}; -use serde::Serialize; - -use dc_api_types::Field; - -use crate::mongodb::selection::serialized_null_checked_column_reference; - -/// A projection determines which fields to request from the result of a query. -/// -/// See https://www.mongodb.com/docs/manual/reference/operator/aggregation/project/#mongodb-pipeline-pipe.-project -#[derive(Clone, Debug, PartialEq, Serialize)] -#[serde(transparent)] -pub struct Projection { - pub field_projections: BTreeMap, -} - -impl Projection { - pub fn new(fields: T) -> Projection - where - T: IntoIterator, - K: Into, - { - Projection { - field_projections: fields.into_iter().map(|(k, v)| (k.into(), v)).collect(), - } - } - - pub fn for_field_selection(field_selection: T) -> Projection - where - T: IntoIterator, - K: Into, - { - for_field_selection_helper(&[], field_selection) - } -} - -fn for_field_selection_helper(parent_columns: &[&str], field_selection: T) -> Projection -where - T: IntoIterator, - K: Into, -{ - Projection::new( - field_selection - .into_iter() - .map(|(key, value)| (key.into(), project_field_as(parent_columns, &value))), - ) -} - -fn project_field_as(parent_columns: &[&str], field: &Field) -> ProjectAs { - match field { - Field::Column { - column, - column_type, - } => { - let col_path = match parent_columns { - [] => format!("${column}"), - _ => format!("${}.{}", parent_columns.join("."), column), - }; - let bson_col_path = serialized_null_checked_column_reference(col_path, column_type); - ProjectAs::Expression(bson_col_path) - } - Field::NestedObject { column, query } => { - let nested_parent_columns = append_to_path(parent_columns, column); - let fields = query.fields.clone().unwrap_or_default(); - ProjectAs::Nested(for_field_selection_helper(&nested_parent_columns, fields)) - } - Field::NestedArray { - field, - // NOTE: We can use a $slice in our projection to do offsets and limits: - // https://www.mongodb.com/docs/manual/reference/operator/projection/slice/#mongodb-projection-proj.-slice - limit: _, - offset: _, - r#where: _, - } => project_field_as(parent_columns, field), - Field::Relationship { - query, - relationship, - } => { - // TODO: Need to determine whether the relation type is "object" or "array" and project - // accordingly - let nested_parent_columns = append_to_path(parent_columns, relationship); - let fields = query.fields.clone().unwrap_or_default(); - ProjectAs::Nested(for_field_selection_helper(&nested_parent_columns, fields)) - } - } -} - -fn append_to_path<'a, 'b, 'c>(parent_columns: &'a [&'b str], column: &'c str) -> Vec<&'c str> -where - 'b: 'c, -{ - parent_columns.iter().copied().chain(Some(column)).collect() -} - -impl TryFrom<&Projection> for bson::Document { - type Error = bson::ser::Error; - fn try_from(value: &Projection) -> Result { - bson::to_document(value) - } -} - -impl TryFrom for bson::Document { - type Error = bson::ser::Error; - fn try_from(value: Projection) -> Result { - (&value).try_into() - } -} - -#[derive(Clone, Debug, PartialEq)] -pub enum ProjectAs { - #[allow(dead_code)] - Included, - Excluded, - Expression(bson::Bson), - Nested(Projection), -} - -impl Serialize for ProjectAs { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - match self { - ProjectAs::Included => serializer.serialize_u8(1), - ProjectAs::Excluded => serializer.serialize_u8(0), - ProjectAs::Expression(v) => v.serialize(serializer), - ProjectAs::Nested(projection) => projection.serialize(serializer), - } - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - - use mongodb::bson::{bson, doc, to_bson, to_document}; - use pretty_assertions::assert_eq; - use serde_json::{from_value, json}; - - use super::{ProjectAs, Projection}; - use dc_api_types::{Field, QueryRequest}; - - #[test] - fn serializes_a_projection() -> Result<(), anyhow::Error> { - let projection = Projection { - field_projections: [ - ("foo".to_owned(), ProjectAs::Included), - ( - "bar".to_owned(), - ProjectAs::Nested(Projection { - field_projections: [("baz".to_owned(), ProjectAs::Included)].into(), - }), - ), - ] - .into(), - }; - assert_eq!( - to_bson(&projection)?, - bson!({ - "foo": 1, - "bar": { - "baz": 1 - } - }) - ); - Ok(()) - } - - #[test] - fn calculates_projection_for_fields() -> Result<(), anyhow::Error> { - let fields: HashMap = from_value(json!({ - "foo": { "type": "column", "column": "foo", "column_type": "String" }, - "foo_again": { "type": "column", "column": "foo", "column_type": "String" }, - "bar": { - "type": "object", - "column": "bar", - "query": { - "fields": { - "baz": { "type": "column", "column": "baz", "column_type": "String" }, - "baz_again": { "type": "column", "column": "baz", "column_type": "String" }, - }, - }, - }, - "bar_again": { - "type": "object", - "column": "bar", - "query": { - "fields": { - "baz": { "type": "column", "column": "baz", "column_type": "String" }, - }, - }, - }, - "my_date": { "type": "column", "column": "my_date", "column_type": "date"}, - }))?; - let projection = Projection::for_field_selection(fields); - assert_eq!( - to_document(&projection)?, - doc! { - "foo": { "$ifNull": ["$foo", null] }, - "foo_again": { "$ifNull": ["$foo", null] }, - "bar": { - "baz": { "$ifNull": ["$bar.baz", null] }, - "baz_again": { "$ifNull": ["$bar.baz", null] } - }, - "bar_again": { - "baz": { "$ifNull": ["$bar.baz", null] } - }, - "my_date": { - "$dateToString": { - "date": { "$ifNull": ["$my_date", null] } - } - } - } - ); - Ok(()) - } - - #[test] - fn produces_projection_for_relation() -> Result<(), anyhow::Error> { - let query_request: QueryRequest = from_value(json!({ - "query": { - "fields": { - "class_students": { - "type": "relationship", - "query": { - "fields": { - "name": { "type": "column", "column": "name", "column_type": "string" }, - }, - }, - "relationship": "class_students", - }, - "students": { - "type": "relationship", - "query": { - "fields": { - "student_name": { "type": "column", "column": "name", "column_type": "string" }, - }, - }, - "relationship": "class_students", - }, - }, - }, - "target": { "name": ["classes"], "type": "table" }, - "relationships": [{ - "source_table": ["classes"], - "relationships": { - "class_students": { - "column_mapping": { "_id": "classId" }, - "relationship_type": "array", - "target": {"name": ["students"], "type": "table"}, - }, - }, - }], - }))?; - let projection = - Projection::for_field_selection(query_request.query.fields.flatten().unwrap()); - assert_eq!( - to_document(&projection)?, - doc! { - "class_students": { - "name": { "$ifNull": ["$class_students.name", null] }, - }, - "students": { - "student_name": { "$ifNull": ["$class_students.name", null] }, - }, - } - ); - Ok(()) - } -} diff --git a/crates/mongodb-agent-common/src/mongodb/stage.rs b/crates/mongodb-agent-common/src/mongodb/stage.rs index 7164046e..4be51550 100644 --- a/crates/mongodb-agent-common/src/mongodb/stage.rs +++ b/crates/mongodb-agent-common/src/mongodb/stage.rs @@ -3,7 +3,7 @@ use std::collections::BTreeMap; use mongodb::bson; use serde::Serialize; -use super::{accumulator::Accumulator, pipeline::Pipeline, projection::Projection, Selection}; +use super::{accumulator::Accumulator, pipeline::Pipeline, Selection}; /// Aggergation Pipeline Stage. This is a work-in-progress - we are adding enum variants to match /// MongoDB pipeline stage types as we need them in this app. For documentation on all stage types @@ -133,15 +133,6 @@ pub enum Stage { #[serde(rename = "$count")] Count(String), - /// Reshapes each document in the stream, such as by adding new fields or removing existing fields. For each input document, outputs one document. - /// - /// See also [`$unset`] for removing existing fields. - /// - /// See https://www.mongodb.com/docs/manual/reference/operator/aggregation/project/#mongodb-pipeline-pipe.-project - #[allow(dead_code)] - #[serde(rename = "$project")] - Project(Projection), - /// Replaces a document with the specified embedded document. The operation replaces all /// existing fields in the input document, including the _id field. Specify a document embedded /// in the input document to promote the embedded document to the top level. From e82eb4ae363c715ad80fb7150dde0c713319bf90 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 25 Apr 2024 13:50:53 -0700 Subject: [PATCH 16/25] serialize relation fields --- .../api_type_conversions/conversion_error.rs | 17 +- .../mongodb-connector/src/mongo_connector.rs | 16 +- .../mongodb-connector/src/query_response.rs | 203 +++++++++++++++--- 3 files changed, 192 insertions(+), 44 deletions(-) diff --git a/crates/mongodb-connector/src/api_type_conversions/conversion_error.rs b/crates/mongodb-connector/src/api_type_conversions/conversion_error.rs index 13b41c86..5553bf07 100644 --- a/crates/mongodb-connector/src/api_type_conversions/conversion_error.rs +++ b/crates/mongodb-connector/src/api_type_conversions/conversion_error.rs @@ -23,7 +23,7 @@ pub enum ConversionError { #[error( "Unknown field \"{field_name}\" in object type \"{object_type}\"{}", - if path.is_empty() { "".to_owned() } else { format!(" at path {}", path.join(".")) } + at_path(path) )] UnknownObjectTypeField { object_type: String, @@ -34,8 +34,11 @@ pub enum ConversionError { #[error("Unknown collection, \"{0}\"")] UnknownCollection(String), - #[error("Unknown relationship, \"{0}\"")] - UnknownRelationship(String), + #[error("Unknown relationship, \"{relationship_name}\"{}", at_path(path))] + UnknownRelationship { + relationship_name: String, + path: Vec, + }, #[error( "Unknown aggregate function, \"{aggregate_function}\" in scalar type \"{scalar_type}\"" @@ -69,3 +72,11 @@ impl From for ExplainError { } } } + +fn at_path(path: &Vec) -> String { + if path.is_empty() { + "".to_owned() + } else { + format!(" at path {}", path.join(".")) + } +} diff --git a/crates/mongodb-connector/src/mongo_connector.rs b/crates/mongodb-connector/src/mongo_connector.rs index f5194feb..892c8741 100644 --- a/crates/mongodb-connector/src/mongo_connector.rs +++ b/crates/mongodb-connector/src/mongo_connector.rs @@ -147,14 +147,12 @@ impl Connector for MongoConnector { let response_documents = handle_query_request(configuration, state, v2_request) .await .map_err(mongo_agent_error_to_query_error)?; - Ok( - serialize_query_response(&query_context, &request, response_documents) - .map_err(|err| { - QueryError::UnprocessableContent(format!( - "error converting MongoDB response to JSON: {err}" - )) - })? - .into(), - ) + let response = serialize_query_response(&query_context, &request, response_documents) + .map_err(|err| { + QueryError::UnprocessableContent(format!( + "error converting MongoDB response to JSON: {err}" + )) + })?; + Ok(response.into()) } } diff --git a/crates/mongodb-connector/src/query_response.rs b/crates/mongodb-connector/src/query_response.rs index 1bf34d51..2af4c414 100644 --- a/crates/mongodb-connector/src/query_response.rs +++ b/crates/mongodb-connector/src/query_response.rs @@ -6,8 +6,8 @@ use itertools::Itertools as _; use mongodb::bson::{self, from_bson, Bson}; use mongodb_agent_common::query::serialization::{bson_to_json, BsonToJsonError}; use ndc_sdk::models::{ - self as ndc, Aggregate, Field, NestedField, NestedObject, Query, QueryRequest, QueryResponse, - RowFieldValue, RowSet, + self as ndc, Aggregate, Field, NestedArray, NestedField, NestedObject, Query, QueryRequest, + QueryResponse, RowFieldValue, RowSet, }; use serde::Deserialize; use thiserror::Error; @@ -41,6 +41,9 @@ pub enum QueryResponseError { expected_type: Type, }, + #[error("results from relation are missing at path {}", path.join("."))] + MissingRelationData { path: Vec }, + #[error("placeholder")] TODORemoveMe, } @@ -82,6 +85,7 @@ pub fn serialize_query_response( .map(|docs| { serialize_row_set( query_context, + query_request, &vec![], collection_name, &query_request.query, @@ -93,6 +97,7 @@ pub fn serialize_query_response( // TODO: in an aggregation response we expect one document instead of a list of documents Ok(vec![serialize_row_set( query_context, + query_request, &vec![], collection_name, &query_request.query, @@ -106,6 +111,7 @@ pub fn serialize_query_response( fn serialize_row_set( query_context: &QueryContext<'_>, + query_request: &QueryRequest, path: &[&str], collection_name: &str, query: &Query, @@ -121,7 +127,16 @@ fn serialize_row_set( let rows = query .fields .as_ref() - .map(|fields| serialize_rows(query_context, path, collection_name, fields, docs)) + .map(|fields| { + serialize_rows( + query_context, + query_request, + path, + collection_name, + fields, + docs, + ) + }) .transpose()?; Ok(RowSet { aggregates: None, @@ -142,7 +157,14 @@ fn serialize_row_set( .fields .as_ref() .map(|fields| { - serialize_rows(query_context, path, collection_name, fields, row_set.rows) + serialize_rows( + query_context, + query_request, + path, + collection_name, + fields, + row_set.rows, + ) }) .transpose()?; @@ -171,18 +193,29 @@ fn serialize_aggregates( fn serialize_rows( query_context: &QueryContext<'_>, + query_request: &QueryRequest, path: &[&str], collection_name: &str, query_fields: &IndexMap, docs: Vec, ) -> Result>> { docs.into_iter() - .map(|doc| serialize_single_row(query_context, path, collection_name, query_fields, doc)) + .map(|doc| { + serialize_single_row( + query_context, + query_request, + path, + collection_name, + query_fields, + doc, + ) + }) .try_collect() } fn serialize_single_row( query_context: &QueryContext<'_>, + query_request: &QueryRequest, path: &[&str], collection_name: &str, query_fields: &IndexMap, @@ -193,6 +226,7 @@ fn serialize_single_row( .map(|(field_name, field_definition)| { let value = serialize_field_value( query_context, + query_request, &append_to_path(path, [field_name.as_ref()]), collection_name, field_definition, @@ -206,46 +240,56 @@ fn serialize_single_row( fn serialize_field_value( query_context: &QueryContext<'_>, + query_request: &QueryRequest, path: &[&str], collection_name: &str, field_definition: &ndc::Field, field_name: &str, input: &mut bson::Document, ) -> Result { - let (bson, requested_type, object_types) = match field_definition { + let value_option = input.remove(field_name); + + let (requested_type, value, temp_object_types) = match field_definition { ndc::Field::Column { column, fields } => { let field_type = find_field_type(query_context, path, collection_name, column)?; let (requested_type, temp_object_types) = - prune_type_to_field_selection(query_context, path, field_type, fields.as_ref())?; - - let object_types = if temp_object_types.is_empty() { - query_context.object_types.clone() // We're cloning a Cow, not a BTreeMap - } else { - let mut configured_types = query_context.object_types.clone().into_owned(); - configured_types.extend(temp_object_types); - Cow::Owned(configured_types) - }; + prune_type_to_field_selection(query_context, query_request, path, &field_type, fields.as_ref())?; - println!("{requested_type:?}\n\n{object_types:?}"); + let value = value_from_option(collection_name, column, &requested_type, value_option)?; - let value = value_from_option( - collection_name, - column, - &requested_type, - input.remove(field_name), - )?; - (value, requested_type, object_types) + (requested_type, value, temp_object_types) } + ndc::Field::Relationship { query, relationship, - arguments, - } => todo!(), + .. + } => { + let (requested_type, temp_object_types) = + type_for_relation_field(query_context, query_request, path, query, relationship)?; + + let value = value_option.ok_or_else(|| QueryResponseError::MissingRelationData { + path: path_to_owned(path), + })?; + + (requested_type, value, temp_object_types) + } + }; + + let object_types = if temp_object_types.is_empty() { + query_context.object_types.clone() // We're cloning a Cow, not a BTreeMap + } else { + let mut configured_types = query_context.object_types.clone().into_owned(); + configured_types.extend(temp_object_types); + Cow::Owned(configured_types) }; - let json = bson_to_json(&requested_type, &object_types, bson)?; + + let json = bson_to_json(&requested_type, &object_types, value)?; Ok(json) } +// TODO: test object relationship type +// TODO: test array relationship type fn find_field_type<'a>( query_context: &'a QueryContext<'a>, @@ -264,8 +308,17 @@ fn find_field_type<'a>( Ok(&field_type.r#type) } +/// Computes a new hierarchy of object types (if necessary) that select a subset of fields from +/// existing object types to match the fields requested by the query. Recurses into nested objects, +/// arrays, and nullable type references. +/// +/// Scalar types are returned without modification. +/// +/// Returns a reference to the pruned type, and a list of newly-computed object types with +/// generated names. fn prune_type_to_field_selection<'a>( query_context: &QueryContext<'_>, + query_request: &QueryRequest, path: &[&str], field_type: &'a Type, fields: Option<&NestedField>, @@ -276,16 +329,21 @@ fn prune_type_to_field_selection<'a>( (Type::Nullable(t), _) => { let (underlying_type, object_types) = - prune_type_to_field_selection(query_context, path, t, fields)?; + prune_type_to_field_selection(query_context, query_request, path, t, fields)?; Ok((Type::Nullable(Box::new(underlying_type)), object_types)) } (Type::ArrayOf(t), Some(NestedField::Array(nested))) => { - let (element_type, object_types) = - prune_type_to_field_selection(query_context, path, t, Some(&nested.fields))?; + let (element_type, object_types) = prune_type_to_field_selection( + query_context, + query_request, + path, + t, + Some(&nested.fields), + )?; Ok((Type::ArrayOf(Box::new(element_type)), object_types)) } (Type::Object(t), Some(NestedField::Object(nested))) => { - object_type_for_field_subset(query_context, path, t, nested) + object_type_for_field_subset(query_context, query_request, path, t, nested) } (_, Some(NestedField::Array(_))) => Err(QueryResponseError::ExpectedArray { @@ -297,8 +355,15 @@ fn prune_type_to_field_selection<'a>( } } +/// We have a configured object type for a collection, or for a nested object in a collection. But +/// the query may request a subset of fields from that object type. We need to compute a new object +/// type for that requested subset. +/// +/// Returns a reference to the newly-generated object type, and a list of all new object types with +/// generated names including the newly-generated object type, and types for any nested objects. fn object_type_for_field_subset( query_context: &QueryContext<'_>, + query_request: &QueryRequest, path: &[&str], object_type_name: &str, requested_fields: &NestedObject, @@ -310,6 +375,7 @@ fn object_type_for_field_subset( .map(|(name, requested_field)| { let (object_field, object_types) = requested_field_definition( query_context, + query_request, &append_to_path(path, [name.as_ref()]), object_type_name, object_type, @@ -338,6 +404,7 @@ fn object_type_for_field_subset( /// name of the requested field maps to a different name on the underlying type. fn requested_field_definition( query_context: &QueryContext<'_>, + query_request: &QueryRequest, path: &[&str], object_type_name: &str, object_type: &ObjectType, @@ -354,6 +421,7 @@ fn requested_field_definition( })?; let (field_type, object_types) = prune_type_to_field_selection( query_context, + query_request, path, &field_def.r#type, fields.as_ref(), @@ -367,11 +435,82 @@ fn requested_field_definition( Field::Relationship { query, relationship, - arguments, - } => todo!(), + .. + } => { + let (relation_type, temp_object_types) = + type_for_relation_field(query_context, query_request, path, query, relationship)?; + let relation_field = ObjectField { + r#type: relation_type, + description: None, + }; + Ok((relation_field, temp_object_types)) + } } } +/// We have a predefined object type for each collection, and for each nested object in +/// a collection. Those types don't have fields defined for joined relationships since such fields +/// are a query-time thing. When a query requests related data we have to create a new field +/// definition to merge with fields in the predefined object type. +fn type_for_relation_field( + query_context: &QueryContext<'_>, + query_request: &QueryRequest, + path: &[&str], + query: &Query, + relationship: &str, +) -> Result<(Type, Vec<(String, ObjectType)>)> { + let relationship_def = query_request + .collection_relationships + .get(relationship) + .ok_or_else(|| ConversionError::UnknownRelationship { + relationship_name: relationship.to_owned(), + path: path_to_owned(path), + })?; + let collection_name = &relationship_def.target_collection; + let collection = query_context.find_collection(collection_name)?; + + // Related data always comes back as an array, even if the relation type is "Object". + let relation_type = Type::ArrayOf(Box::new(Type::Object( + collection.collection_type.to_owned(), + ))); + + // Translate requested query fields into a `NestedField` value to match what we get for + // column fields. + let fields = query.fields.as_ref().map(|query_fields| { + NestedField::Array(NestedArray { + fields: Box::new(NestedField::Object(NestedObject { + fields: query_fields.clone(), + })), + }) + }); + + let (requested_relation_type, mut temp_object_types) = prune_type_to_field_selection( + query_context, + query_request, + path, + &relation_type, + fields.as_ref(), + )?; + + // Relation data is wrapped in an object with a `rows` property + let relation_object_type = ObjectType { + fields: [( + "rows".to_owned(), + ObjectField { + r#type: requested_relation_type, + description: None, + }, + )] + .into(), + description: Default::default(), + }; + let relation_object_type_name = format!("relation_{}", path.into_iter().join("_")); + temp_object_types.push((relation_object_type_name.clone(), relation_object_type)); + let requested_type = Type::Object(relation_object_type_name); + + Ok((requested_type, temp_object_types)) +} + /// Check option result for a BSON value. If the value is missing but the expected type is nullable /// then return null. Otherwise return an error. fn value_from_option( From 9ae36e9043fbab1c8af0f3eff7cd2efc39e0d39c Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 25 Apr 2024 15:58:18 -0700 Subject: [PATCH 17/25] remove response_shape --- crates/mongodb-agent-common/src/explain.rs | 2 +- .../src/query/execute_query_request.rs | 2 +- .../mongodb-agent-common/src/query/foreach.rs | 19 +++++++-------- .../src/query/pipeline.rs | 24 +++++-------------- .../src/query/relations.rs | 2 +- 5 files changed, 17 insertions(+), 32 deletions(-) diff --git a/crates/mongodb-agent-common/src/explain.rs b/crates/mongodb-agent-common/src/explain.rs index 3f1f9094..cad0d898 100644 --- a/crates/mongodb-agent-common/src/explain.rs +++ b/crates/mongodb-agent-common/src/explain.rs @@ -17,7 +17,7 @@ pub async fn explain_query( let db = state.database(); - let (pipeline, _) = query::pipeline_for_query_request(config, &query_request)?; + let pipeline = query::pipeline_for_query_request(config, &query_request)?; let pipeline_bson = to_bson(&pipeline)?; let aggregate_target = match QueryTarget::for_request(config, &query_request).input_collection() diff --git a/crates/mongodb-agent-common/src/query/execute_query_request.rs b/crates/mongodb-agent-common/src/query/execute_query_request.rs index cb06629f..cd7ea560 100644 --- a/crates/mongodb-agent-common/src/query/execute_query_request.rs +++ b/crates/mongodb-agent-common/src/query/execute_query_request.rs @@ -21,7 +21,7 @@ pub async fn execute_query_request( query_request: QueryRequest, ) -> Result, MongoAgentError> { let target = QueryTarget::for_request(config, &query_request); - let (pipeline, response_shape) = pipeline_for_query_request(config, &query_request)?; + let pipeline = pipeline_for_query_request(config, &query_request)?; tracing::debug!( ?query_request, ?target, diff --git a/crates/mongodb-agent-common/src/query/foreach.rs b/crates/mongodb-agent-common/src/query/foreach.rs index 9f5921eb..f51b77d6 100644 --- a/crates/mongodb-agent-common/src/query/foreach.rs +++ b/crates/mongodb-agent-common/src/query/foreach.rs @@ -8,7 +8,7 @@ use dc_api_types::{ }; use mongodb::bson::{doc, Bson}; -use super::pipeline::{pipeline_for_non_foreach, ResponseShape}; +use super::pipeline::pipeline_for_non_foreach; use crate::mongodb::Selection; use crate::{ interface_types::MongoAgentError, @@ -57,8 +57,8 @@ pub fn pipeline_for_foreach( foreach: Vec, config: &Configuration, query_request: &QueryRequest, -) -> Result<(Pipeline, ResponseShape), MongoAgentError> { - let pipelines_with_response_shapes: Vec<(String, (Pipeline, ResponseShape))> = foreach +) -> Result { + let pipelines_with_response_shapes: Vec<(String, Pipeline)> = foreach .into_iter() .enumerate() .map(|(index, foreach_variant)| { @@ -83,22 +83,19 @@ pub fn pipeline_for_foreach( .collect::>()?; let selection = Selection(doc! { - "row_sets": pipelines_with_response_shapes.iter().map(|(key, (_, response_shape))| + "row_sets": pipelines_with_response_shapes.iter().map(|(key, _)| Bson::String(format!("${key}")), ).collect::>() }); let queries = pipelines_with_response_shapes .into_iter() - .map(|(key, (pipeline, _))| (key, pipeline)) + .map(|(key, pipeline)| (key, pipeline)) .collect(); - Ok(( - Pipeline { - stages: vec![Stage::Facet(queries), Stage::ReplaceWith(selection)], - }, - ResponseShape::SingleObject, - )) + Ok(Pipeline { + stages: vec![Stage::Facet(queries), Stage::ReplaceWith(selection)], + }) } /// Fold a 'foreach' HashMap into an Expression. diff --git a/crates/mongodb-agent-common/src/query/pipeline.rs b/crates/mongodb-agent-common/src/query/pipeline.rs index 213b43e1..fba07a14 100644 --- a/crates/mongodb-agent-common/src/query/pipeline.rs +++ b/crates/mongodb-agent-common/src/query/pipeline.rs @@ -18,18 +18,6 @@ use super::{ relations::pipeline_for_relations, }; -/// Signals the shape of data that will be returned by MongoDB. -#[derive(Clone, Copy, Debug)] -pub enum ResponseShape { - /// Indicates that the response will be a stream of records that must be wrapped in an object - /// with a `rows` field to produce a valid `QueryResponse` for HGE. - ListOfRows, - - /// Indicates that the response has already been wrapped in a single object with `rows` and/or - /// `aggregates` fields. - SingleObject, -} - /// A query that includes aggregates will be run using a $facet pipeline stage, while a query /// without aggregates will not. The choice affects how result rows are mapped to a QueryResponse. /// @@ -50,7 +38,7 @@ pub fn is_response_faceted(query: &Query) -> bool { pub fn pipeline_for_query_request( config: &Configuration, query_request: &QueryRequest, -) -> Result<(Pipeline, ResponseShape), MongoAgentError> { +) -> Result { let foreach = foreach_variants(query_request); if let Some(foreach) = foreach { pipeline_for_foreach(foreach, config, query_request) @@ -68,7 +56,7 @@ pub fn pipeline_for_non_foreach( config: &Configuration, variables: Option<&VariableSet>, query_request: &QueryRequest, -) -> Result<(Pipeline, ResponseShape), MongoAgentError> { +) -> Result { let query = &*query_request.query; let Query { offset, @@ -100,19 +88,19 @@ pub fn pipeline_for_non_foreach( // `diverging_stages` includes either a $facet stage if the query includes aggregates, or the // sort and limit stages if we are requesting rows only. In both cases the last stage is // a $replaceWith. - let (diverging_stages, response_shape) = if is_response_faceted(query) { + let diverging_stages = if is_response_faceted(query) { let (facet_pipelines, select_facet_results) = facet_pipelines_for_query(query_request)?; let aggregation_stages = Stage::Facet(facet_pipelines); let replace_with_stage = Stage::ReplaceWith(select_facet_results); let stages = Pipeline::from_iter([aggregation_stages, replace_with_stage]); - (stages, ResponseShape::SingleObject) + stages } else { let stages = pipeline_for_fields_facet(query_request)?; - (stages, ResponseShape::ListOfRows) + stages }; pipeline.append(diverging_stages); - Ok((pipeline, response_shape)) + Ok(pipeline) } /// Generate a pipeline to select fields requested by the given query. This is intended to be used diff --git a/crates/mongodb-agent-common/src/query/relations.rs b/crates/mongodb-agent-common/src/query/relations.rs index 50407878..49df8aa0 100644 --- a/crates/mongodb-agent-common/src/query/relations.rs +++ b/crates/mongodb-agent-common/src/query/relations.rs @@ -138,7 +138,7 @@ fn lookups_for_field( let from = collection_reference(target.name())?; // Recursively build pipeline according to relation query - let (lookup_pipeline, _) = pipeline_for_non_foreach( + let lookup_pipeline = pipeline_for_non_foreach( config, variables, &QueryRequest { From 5efe9fa80fbc1e1de3072d75d32e709ba6ba0729 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 25 Apr 2024 16:48:57 -0700 Subject: [PATCH 18/25] update unit tests --- .../src/mongodb/selection.rs | 2 +- .../mongodb-agent-common/src/query/foreach.rs | 272 ++++++++---------- crates/mongodb-agent-common/src/query/mod.rs | 25 +- .../src/query/native_query.rs | 14 +- .../src/query/relations.rs | 158 +++++----- 5 files changed, 196 insertions(+), 275 deletions(-) diff --git a/crates/mongodb-agent-common/src/mongodb/selection.rs b/crates/mongodb-agent-common/src/mongodb/selection.rs index 231dfccd..db99df03 100644 --- a/crates/mongodb-agent-common/src/mongodb/selection.rs +++ b/crates/mongodb-agent-common/src/mongodb/selection.rs @@ -249,7 +249,7 @@ mod tests { let query_request = QueryRequest { query: Box::new(Query { - fields: Some(Some(fields)), + fields: Some(fields), ..Default::default() }), foreach: None, diff --git a/crates/mongodb-agent-common/src/query/foreach.rs b/crates/mongodb-agent-common/src/query/foreach.rs index f51b77d6..a7d9e504 100644 --- a/crates/mongodb-agent-common/src/query/foreach.rs +++ b/crates/mongodb-agent-common/src/query/foreach.rs @@ -130,10 +130,8 @@ fn facet_name(index: usize) -> String { #[cfg(test)] mod tests { - use dc_api_types::{ - BinaryComparisonOperator, ComparisonColumn, Field, Query, QueryRequest, QueryResponse, - }; - use mongodb::bson::{bson, Bson}; + use dc_api_types::{BinaryComparisonOperator, ComparisonColumn, Field, Query, QueryRequest}; + use mongodb::bson::{bson, doc, Bson}; use pretty_assertions::assert_eq; use serde_json::{from_value, json}; @@ -188,56 +186,40 @@ mod tests { }, { "$replaceWith": { - "rows": [ - { "query": { "rows": "$__FACET___0" } }, - { "query": { "rows": "$__FACET___1" } }, + "row_sets": [ + "$__FACET___0", + "$__FACET___1", ] }, } ]); - let expected_response: QueryResponse = from_value(json! ({ - "rows": [ - { - "query": { - "rows": [ - { "albumId": 1, "title": "For Those About To Rock We Salute You" }, - { "albumId": 4, "title": "Let There Be Rock" } - ] - } - }, - { - "query": { - "rows": [ - { "albumId": 2, "title": "Balls to the Wall" }, - { "albumId": 3, "title": "Restless and Wild" } - ] - } - } + let expected_response = vec![doc! { + "row_sets": [ + [ + { "albumId": 1, "title": "For Those About To Rock We Salute You" }, + { "albumId": 4, "title": "Let There Be Rock" }, + ], + [ + { "albumId": 2, "title": "Balls to the Wall" }, + { "albumId": 3, "title": "Restless and Wild" }, + ], ] - }))?; + }]; let db = mock_collection_aggregate_response_for_pipeline( "tracks", expected_pipeline, bson!([{ - "rows": [ - { - "query": { - "rows": [ - { "albumId": 1, "title": "For Those About To Rock We Salute You" }, - { "albumId": 4, "title": "Let There Be Rock" } - ] - } - }, - { - "query": { - "rows": [ - { "albumId": 2, "title": "Balls to the Wall" }, - { "albumId": 3, "title": "Restless and Wild" } - ] - } - } + "row_sets": [ + [ + { "albumId": 1, "title": "For Those About To Rock We Salute You" }, + { "albumId": 4, "title": "Let There Be Rock" } + ], + [ + { "albumId": 2, "title": "Balls to the Wall" }, + { "albumId": 3, "title": "Restless and Wild" } + ], ], }]), ); @@ -321,68 +303,60 @@ mod tests { }, { "$replaceWith": { - "rows": [ - { "query": "$__FACET___0" }, - { "query": "$__FACET___1" }, + "row_sets": [ + "$__FACET___0", + "$__FACET___1", ] }, } ]); - let expected_response: QueryResponse = from_value(json! ({ - "rows": [ + let expected_response = vec![doc! { + "row_sets": [ + { + "aggregates": { + "count": 2, + }, + "rows": [ + { "albumId": 1, "title": "For Those About To Rock We Salute You" }, + { "albumId": 4, "title": "Let There Be Rock" }, + ] + }, { - "query": { + "aggregates": { + "count": 2, + }, + "rows": [ + { "albumId": 2, "title": "Balls to the Wall" }, + { "albumId": 3, "title": "Restless and Wild" }, + ] + }, + ] + }]; + + let db = mock_collection_aggregate_response_for_pipeline( + "tracks", + expected_pipeline, + bson!([{ + "row_sets": [ + { "aggregates": { "count": 2, }, "rows": [ { "albumId": 1, "title": "For Those About To Rock We Salute You" }, - { "albumId": 4, "title": "Let There Be Rock" } + { "albumId": 4, "title": "Let There Be Rock" }, ] - } - }, - { - "query": { + }, + { "aggregates": { "count": 2, }, "rows": [ { "albumId": 2, "title": "Balls to the Wall" }, - { "albumId": 3, "title": "Restless and Wild" } + { "albumId": 3, "title": "Restless and Wild" }, ] - } - } - ] - }))?; - - let db = mock_collection_aggregate_response_for_pipeline( - "tracks", - expected_pipeline, - bson!([{ - "rows": [ - { - "query": { - "aggregates": { - "count": 2, - }, - "rows": [ - { "albumId": 1, "title": "For Those About To Rock We Salute You" }, - { "albumId": 4, "title": "Let There Be Rock" } - ] - } }, - { - "query": { - "aggregates": { - "count": 2, - }, - "rows": [ - { "albumId": 2, "title": "Balls to the Wall" }, - { "albumId": 3, "title": "Restless and Wild" } - ] - } - } ] }]), ); @@ -418,7 +392,7 @@ mod tests { name: "artistId".to_owned(), }, }), - fields: Some(Some( + fields: Some( [ ( "albumId".to_owned(), @@ -436,7 +410,7 @@ mod tests { ), ] .into(), - )), + ), aggregates: None, aggregates_limit: None, limit: None, @@ -474,88 +448,68 @@ mod tests { }, { "$replaceWith": { - "rows": [ - { "query": { "rows": "$__FACET___0" } }, - { "query": { "rows": "$__FACET___1" } }, - { "query": { "rows": "$__FACET___2" } }, - { "query": { "rows": "$__FACET___3" } }, - { "query": { "rows": "$__FACET___4" } }, - { "query": { "rows": "$__FACET___5" } }, - { "query": { "rows": "$__FACET___6" } }, - { "query": { "rows": "$__FACET___7" } }, - { "query": { "rows": "$__FACET___8" } }, - { "query": { "rows": "$__FACET___9" } }, - { "query": { "rows": "$__FACET___10" } }, - { "query": { "rows": "$__FACET___11" } }, + "row_sets": [ + "$__FACET___0", + "$__FACET___1", + "$__FACET___2", + "$__FACET___3", + "$__FACET___4", + "$__FACET___5", + "$__FACET___6", + "$__FACET___7", + "$__FACET___8", + "$__FACET___9", + "$__FACET___10", + "$__FACET___11", ] }, } ]); - let expected_response: QueryResponse = from_value(json! ({ - "rows": [ - { - "query": { - "rows": [ - { "albumId": 1, "title": "For Those About To Rock We Salute You" }, - { "albumId": 4, "title": "Let There Be Rock" } - ] - } - }, - { "query": { "rows": [] } }, - { - "query": { - "rows": [ - { "albumId": 2, "title": "Balls to the Wall" }, - { "albumId": 3, "title": "Restless and Wild" } - ] - } - }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, + let expected_response = vec![doc! { + "row_sets": [ + [ + { "albumId": 1, "title": "For Those About To Rock We Salute You" }, + { "albumId": 4, "title": "Let There Be Rock" } + ], + [], + [ + { "albumId": 2, "title": "Balls to the Wall" }, + { "albumId": 3, "title": "Restless and Wild" } + ], + [], + [], + [], + [], + [], + [], + [], + [], ] - }))?; + }]; let db = mock_collection_aggregate_response_for_pipeline( "tracks", expected_pipeline, bson!([{ - "rows": [ - { - "query": { - "rows": [ - { "albumId": 1, "title": "For Those About To Rock We Salute You" }, - { "albumId": 4, "title": "Let There Be Rock" } - ] - } - }, - { - "query": { - "rows": [] - } - }, - { - "query": { - "rows": [ - { "albumId": 2, "title": "Balls to the Wall" }, - { "albumId": 3, "title": "Restless and Wild" } - ] - } - }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, + "row_sets": [ + [ + { "albumId": 1, "title": "For Those About To Rock We Salute You" }, + { "albumId": 4, "title": "Let There Be Rock" } + ], + [], + [ + { "albumId": 2, "title": "Balls to the Wall" }, + { "albumId": 3, "title": "Restless and Wild" } + ], + [], + [], + [], + [], + [], + [], + [], + [], ], }]), ); diff --git a/crates/mongodb-agent-common/src/query/mod.rs b/crates/mongodb-agent-common/src/query/mod.rs index d9c551ae..c86a012a 100644 --- a/crates/mongodb-agent-common/src/query/mod.rs +++ b/crates/mongodb-agent-common/src/query/mod.rs @@ -37,7 +37,7 @@ pub async fn handle_query_request( #[cfg(test)] mod tests { - use dc_api_types::{QueryRequest, QueryResponse, RowSet}; + use dc_api_types::QueryRequest; use mongodb::bson::{self, bson, doc}; use pretty_assertions::assert_eq; use serde_json::{from_value, json}; @@ -65,12 +65,7 @@ mod tests { "relationships": [], }))?; - let expected_response = doc! { - "rows": [ - { "student_gpa": 3.1 }, - { "student_gpa": 3.6 }, - ], - }; + let expected_response = vec![doc! { "student_gpa": 3.1 }, doc! { "student_gpa": 3.6 }]; let expected_pipeline = bson!([ { "$match": { "gpa": { "$lt": 4.0 } } }, @@ -113,12 +108,12 @@ mod tests { "relationships": [], }))?; - let expected_response: QueryResponse = from_value(json!({ + let expected_response = vec![doc! { "aggregates": { "count": 11, "avg": 3, } - }))?; + }]; let expected_pipeline = bson!([ { @@ -192,14 +187,14 @@ mod tests { "relationships": [], }))?; - let expected_response: QueryResponse = from_value(json!({ + let expected_response = vec![doc! { "aggregates": { "avg": 3.1, }, "rows": [{ "gpa": 3.1, }], - }))?; + }]; let expected_pipeline = bson!([ { "$match": { "gpa": { "$lt": 4.0 } } }, @@ -269,11 +264,7 @@ mod tests { "relationships": [] }))?; - let expected_response: QueryResponse = from_value(json!({ - "rows": [{ - "date": "2018-08-14T15:05:03.142Z", - }] - }))?; + let expected_response = vec![doc! { "date": "2018-08-14T15:05:03.142Z" }]; let expected_pipeline = bson!([ { @@ -317,7 +308,7 @@ mod tests { "relationships": [], }))?; - let expected_response = QueryResponse::Single(RowSet::Rows { rows: vec![] }); + let expected_response: Vec = vec![]; let db = mock_collection_aggregate_response("comments", bson!([])); diff --git a/crates/mongodb-agent-common/src/query/native_query.rs b/crates/mongodb-agent-common/src/query/native_query.rs index d2b4b1c8..9657ce64 100644 --- a/crates/mongodb-agent-common/src/query/native_query.rs +++ b/crates/mongodb-agent-common/src/query/native_query.rs @@ -87,11 +87,11 @@ mod tests { Configuration, }; use dc_api_test_helpers::{column, query, query_request}; - use dc_api_types::{Argument, QueryResponse}; + use dc_api_types::Argument; use mongodb::bson::{bson, doc}; use mongodb_support::BsonScalarType as S; use pretty_assertions::assert_eq; - use serde_json::{from_value, json}; + use serde_json::json; use crate::{ mongodb::test_helpers::mock_aggregate_response_for_pipeline, query::execute_query_request, @@ -272,12 +272,10 @@ mod tests { }, ]); - let expected_response: QueryResponse = from_value(json!({ - "rows": [ - { "title": "Beau Geste", "year": 1926, "genres": ["Action", "Adventure", "Drama"] }, - { "title": "For Heaven's Sake", "year": 1926, "genres": ["Action", "Comedy", "Romance"] }, - ], - }))?; + let expected_response = vec![ + doc! { "title": "Beau Geste", "year": 1926, "genres": ["Action", "Adventure", "Drama"] }, + doc! { "title": "For Heaven's Sake", "year": 1926, "genres": ["Action", "Comedy", "Romance"] }, + ]; let db = mock_aggregate_response_for_pipeline( expected_pipeline, diff --git a/crates/mongodb-agent-common/src/query/relations.rs b/crates/mongodb-agent-common/src/query/relations.rs index 49df8aa0..206e603f 100644 --- a/crates/mongodb-agent-common/src/query/relations.rs +++ b/crates/mongodb-agent-common/src/query/relations.rs @@ -243,8 +243,8 @@ where #[cfg(test)] mod tests { - use dc_api_types::{QueryRequest, QueryResponse}; - use mongodb::bson::{bson, Bson}; + use dc_api_types::QueryRequest; + use mongodb::bson::{bson, doc, Bson}; use pretty_assertions::assert_eq; use serde_json::{from_value, json}; @@ -281,17 +281,13 @@ mod tests { }], }))?; - let expected_response: QueryResponse = from_value(json!({ - "rows": [ - { - "class_title": "MongoDB 101", - "students": [ - { "student_name": "Alice" }, - { "student_name": "Bob" }, - ], - }, - ], - }))?; + let expected_response = vec![doc! { + "class_title": "MongoDB 101", + "students": { "rows": [ + { "student_name": "Alice" }, + { "student_name": "Bob" }, + ] }, + }]; let expected_pipeline = bson!([ { @@ -332,10 +328,10 @@ mod tests { expected_pipeline, bson!([{ "class_title": "MongoDB 101", - "students": [ + "students": { "rows": [ { "student_name": "Alice" }, { "student_name": "Bob" }, - ], + ] }, }]), ); @@ -375,18 +371,16 @@ mod tests { }], }))?; - let expected_response: QueryResponse = from_value(json!({ - "rows": [ - { - "student_name": "Alice", - "class": { "class_title": "MongoDB 101" }, - }, - { - "student_name": "Bob", - "class": { "class_title": "MongoDB 101" }, - }, - ], - }))?; + let expected_response = vec![ + doc! { + "student_name": "Alice", + "class": { "rows": [{ "class_title": "MongoDB 101" }] }, + }, + doc! { + "student_name": "Bob", + "class": { "rows": [{ "class_title": "MongoDB 101" }] }, + }, + ]; let expected_pipeline = bson!([ { @@ -426,11 +420,11 @@ mod tests { bson!([ { "student_name": "Alice", - "class": { "class_title": "MongoDB 101" }, + "class": { "rows": [{ "class_title": "MongoDB 101" }] }, }, { "student_name": "Bob", - "class": { "class_title": "MongoDB 101" }, + "class": { "rows": [{ "class_title": "MongoDB 101" }] }, }, ]), ); @@ -471,17 +465,13 @@ mod tests { }], }))?; - let expected_response: QueryResponse = from_value(json!({ - "rows": [ - { - "class_title": "MongoDB 101", - "students": [ - { "student_name": "Alice" }, - { "student_name": "Bob" }, - ], - }, - ], - }))?; + let expected_response = vec![doc! { + "class_title": "MongoDB 101", + "students": { "rows": [ + { "student_name": "Alice" }, + { "student_name": "Bob" }, + ] }, + }]; let expected_pipeline = bson!([ { @@ -524,10 +514,10 @@ mod tests { expected_pipeline, bson!([{ "class_title": "MongoDB 101", - "students": [ - { "student_name": "Alice" }, - { "student_name": "Bob" }, - ], + "students": { "rows": [ + { "student_name": "Alice" }, + { "student_name": "Bob" }, + ] }, }]), ); @@ -589,28 +579,24 @@ mod tests { ], }))?; - let expected_response: QueryResponse = from_value(json!({ - "rows": [ + let expected_response = vec![doc! { + "class_title": "MongoDB 101", + "students": { "rows": [ { - "class_title": "MongoDB 101", - "students": { "rows": [ - { - "student_name": "Alice", - "assignments": { "rows": [ - { "assignment_title": "read chapter 2" }, - ]} - }, - { - "student_name": "Bob", - "assignments": { "rows": [ - { "assignment_title": "JSON Basics" }, - { "assignment_title": "read chapter 2" }, - ]} - }, - ]}, + "student_name": "Alice", + "assignments": { "rows": [ + { "assignment_title": "read chapter 2" }, + ]} }, - ], - }))?; + { + "student_name": "Bob", + "assignments": { "rows": [ + { "assignment_title": "JSON Basics" }, + { "assignment_title": "read chapter 2" }, + ]} + }, + ]}, + }]; let expected_pipeline = bson!([ { @@ -736,17 +722,13 @@ mod tests { }], }))?; - let expected_response: QueryResponse = from_value(json!({ - "rows": [ - { - "students_aggregate": { - "aggregates": { - "aggregate_count": 2, - }, - }, + let expected_response = vec![doc! { + "students_aggregate": { + "aggregates": { + "aggregate_count": 2, }, - ], - }))?; + }, + }]; let expected_pipeline = bson!([ { @@ -868,15 +850,13 @@ mod tests { ] }))?; - let expected_response: QueryResponse = from_value(json!({ - "rows": [{ + let expected_response = vec![doc! { "name": "Mercedes Tyler", "movie": { "rows": [{ - "title": "The Land Beyond the Sunset", - "year": 1912 + "title": "The Land Beyond the Sunset", + "year": 1912 }] }, - }] - }))?; + }]; let expected_pipeline = bson!([ { @@ -1004,16 +984,14 @@ mod tests { ] }))?; - let expected_response: QueryResponse = from_value(json!({ - "rows": [{ - "name": "Beric Dondarrion", - "movie": { "rows": [{ - "credits": { - "director": "Martin Scorsese", - } - }] }, - }] - }))?; + let expected_response = vec![doc! { + "name": "Beric Dondarrion", + "movie": { "rows": [{ + "credits": { + "director": "Martin Scorsese", + } + }] }, + }]; let expected_pipeline = bson!([ { From dd5104131073b11b9faa6326ead04d5d2e800a5e Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 25 Apr 2024 17:02:18 -0700 Subject: [PATCH 19/25] lint fixes --- .../src/query/execute_query_request.rs | 2 +- .../mongodb-agent-common/src/query/foreach.rs | 18 ++++++------------ .../src/query/pipeline.rs | 8 +++----- .../src/query/query_target.rs | 7 +++---- .../api_type_conversions/conversion_error.rs | 2 +- .../mongodb-connector/src/query_response.rs | 19 ++++++++++--------- 6 files changed, 24 insertions(+), 32 deletions(-) diff --git a/crates/mongodb-agent-common/src/query/execute_query_request.rs b/crates/mongodb-agent-common/src/query/execute_query_request.rs index cd7ea560..71c92a54 100644 --- a/crates/mongodb-agent-common/src/query/execute_query_request.rs +++ b/crates/mongodb-agent-common/src/query/execute_query_request.rs @@ -34,7 +34,7 @@ pub async fn execute_query_request( // the MongoDB API call `db..aggregate` we instead call `db.aggregate`. let documents = match target.input_collection() { Some(collection_name) => { - let collection = database.collection(&collection_name); + let collection = database.collection(collection_name); collect_from_cursor(collection.aggregate(pipeline, None).await?).await } None => collect_from_cursor(database.aggregate(pipeline, None).await?).await, diff --git a/crates/mongodb-agent-common/src/query/foreach.rs b/crates/mongodb-agent-common/src/query/foreach.rs index a7d9e504..e7bd9cf5 100644 --- a/crates/mongodb-agent-common/src/query/foreach.rs +++ b/crates/mongodb-agent-common/src/query/foreach.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use configuration::Configuration; use dc_api_types::comparison_column::ColumnSelector; @@ -58,7 +58,7 @@ pub fn pipeline_for_foreach( config: &Configuration, query_request: &QueryRequest, ) -> Result { - let pipelines_with_response_shapes: Vec<(String, Pipeline)> = foreach + let pipelines: BTreeMap = foreach .into_iter() .enumerate() .map(|(index, foreach_variant)| { @@ -76,25 +76,19 @@ pub fn pipeline_for_foreach( .into(); } - let pipeline_with_response_shape = - pipeline_for_non_foreach(config, variables.as_ref(), &q)?; - Ok((facet_name(index), pipeline_with_response_shape)) + let pipeline = pipeline_for_non_foreach(config, variables.as_ref(), &q)?; + Ok((facet_name(index), pipeline)) }) .collect::>()?; let selection = Selection(doc! { - "row_sets": pipelines_with_response_shapes.iter().map(|(key, _)| + "row_sets": pipelines.keys().map(|key| Bson::String(format!("${key}")), ).collect::>() }); - let queries = pipelines_with_response_shapes - .into_iter() - .map(|(key, pipeline)| (key, pipeline)) - .collect(); - Ok(Pipeline { - stages: vec![Stage::Facet(queries), Stage::ReplaceWith(selection)], + stages: vec![Stage::Facet(pipelines), Stage::ReplaceWith(selection)], }) } diff --git a/crates/mongodb-agent-common/src/query/pipeline.rs b/crates/mongodb-agent-common/src/query/pipeline.rs index fba07a14..ed67c2ac 100644 --- a/crates/mongodb-agent-common/src/query/pipeline.rs +++ b/crates/mongodb-agent-common/src/query/pipeline.rs @@ -92,11 +92,9 @@ pub fn pipeline_for_non_foreach( let (facet_pipelines, select_facet_results) = facet_pipelines_for_query(query_request)?; let aggregation_stages = Stage::Facet(facet_pipelines); let replace_with_stage = Stage::ReplaceWith(select_facet_results); - let stages = Pipeline::from_iter([aggregation_stages, replace_with_stage]); - stages + Pipeline::from_iter([aggregation_stages, replace_with_stage]) } else { - let stages = pipeline_for_fields_facet(query_request)?; - stages + pipeline_for_fields_facet(query_request)? }; pipeline.append(diverging_stages); @@ -147,7 +145,7 @@ fn facet_pipelines_for_query( }) .collect::, MongoAgentError>>()?; - if let Some(_) = fields { + if fields.is_some() { let fields_pipeline = pipeline_for_fields_facet(query_request)?; facet_pipelines.insert(ROWS_FIELD.to_owned(), fields_pipeline); } diff --git a/crates/mongodb-agent-common/src/query/query_target.rs b/crates/mongodb-agent-common/src/query/query_target.rs index ec4de4e0..25c62442 100644 --- a/crates/mongodb-agent-common/src/query/query_target.rs +++ b/crates/mongodb-agent-common/src/query/query_target.rs @@ -33,10 +33,9 @@ impl QueryTarget<'_> { pub fn input_collection(&self) -> Option<&str> { match self { QueryTarget::Collection(collection_name) => Some(collection_name), - QueryTarget::NativeQuery { native_query, .. } => native_query - .input_collection - .as_ref() - .map(|name| name.as_str()), + QueryTarget::NativeQuery { native_query, .. } => { + native_query.input_collection.as_deref() + } } } } diff --git a/crates/mongodb-connector/src/api_type_conversions/conversion_error.rs b/crates/mongodb-connector/src/api_type_conversions/conversion_error.rs index 5553bf07..b032f484 100644 --- a/crates/mongodb-connector/src/api_type_conversions/conversion_error.rs +++ b/crates/mongodb-connector/src/api_type_conversions/conversion_error.rs @@ -73,7 +73,7 @@ impl From for ExplainError { } } -fn at_path(path: &Vec) -> String { +fn at_path(path: &[String]) -> String { if path.is_empty() { "".to_owned() } else { diff --git a/crates/mongodb-connector/src/query_response.rs b/crates/mongodb-connector/src/query_response.rs index 2af4c414..ade11eb7 100644 --- a/crates/mongodb-connector/src/query_response.rs +++ b/crates/mongodb-connector/src/query_response.rs @@ -86,7 +86,7 @@ pub fn serialize_query_response( serialize_row_set( query_context, query_request, - &vec![], + &[], collection_name, &query_request.query, docs, @@ -98,7 +98,7 @@ pub fn serialize_query_response( Ok(vec![serialize_row_set( query_context, query_request, - &vec![], + &[], collection_name, &query_request.query, response_documents, @@ -254,7 +254,7 @@ fn serialize_field_value( let field_type = find_field_type(query_context, path, collection_name, column)?; let (requested_type, temp_object_types) = - prune_type_to_field_selection(query_context, query_request, path, &field_type, fields.as_ref())?; + prune_type_to_field_selection(query_context, query_request, path, field_type, fields.as_ref())?; let value = value_from_option(collection_name, column, &requested_type, value_option)?; @@ -316,11 +316,11 @@ fn find_field_type<'a>( /// /// Returns a reference to the pruned type, and a list of newly-computed object types with /// generated names. -fn prune_type_to_field_selection<'a>( +fn prune_type_to_field_selection( query_context: &QueryContext<'_>, query_request: &QueryRequest, path: &[&str], - field_type: &'a Type, + field_type: &Type, fields: Option<&NestedField>, ) -> Result<(Type, Vec<(String, ObjectType)>)> { match (field_type, fields) { @@ -389,7 +389,7 @@ fn object_type_for_field_subset( fields, description: None, }; - let pruned_object_type_name = format!("requested_fields_{}", path.into_iter().join("_")); + let pruned_object_type_name = format!("requested_fields_{}", path.iter().join("_")); let pruned_type = Type::Object(pruned_object_type_name.clone()); let mut object_types: Vec<(String, ObjectType)> = @@ -398,6 +398,7 @@ fn object_type_for_field_subset( Ok((pruned_type, object_types)) } +// TODO: why are objectIds serializing as extended JSON? /// Given an object type for a value, and a requested field from that value, produce an updated /// object field definition to match the request. This must take into account aliasing where the @@ -504,7 +505,7 @@ fn type_for_relation_field( .into(), description: Default::default(), }; - let relation_object_type_name = format!("relation_{}", path.into_iter().join("_")); + let relation_object_type_name = format!("relation_{}", path.iter().join("_")); temp_object_types.push((relation_object_type_name.clone(), relation_object_type)); let requested_type = Type::Object(relation_object_type_name); @@ -543,11 +544,11 @@ where } fn append_to_path<'a>(path: &[&'a str], elems: impl IntoIterator) -> Vec<&'a str> { - path.into_iter().map(|x| *x).chain(elems).collect() + path.iter().copied().chain(elems).collect() } fn path_to_owned(path: &[&str]) -> Vec { - path.into_iter().map(|x| (*x).to_owned()).collect() + path.iter().map(|x| (*x).to_owned()).collect() } // TODO: test nested objects in arrays From 0c53528b13e7e91088c58dc066add39fd562b065 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 25 Apr 2024 17:13:41 -0700 Subject: [PATCH 20/25] there was a reason that was a vec --- crates/mongodb-agent-common/src/query/foreach.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/mongodb-agent-common/src/query/foreach.rs b/crates/mongodb-agent-common/src/query/foreach.rs index e7bd9cf5..46e0a61d 100644 --- a/crates/mongodb-agent-common/src/query/foreach.rs +++ b/crates/mongodb-agent-common/src/query/foreach.rs @@ -58,7 +58,7 @@ pub fn pipeline_for_foreach( config: &Configuration, query_request: &QueryRequest, ) -> Result { - let pipelines: BTreeMap = foreach + let pipelines: Vec<(String, Pipeline)> = foreach .into_iter() .enumerate() .map(|(index, foreach_variant)| { @@ -82,13 +82,15 @@ pub fn pipeline_for_foreach( .collect::>()?; let selection = Selection(doc! { - "row_sets": pipelines.keys().map(|key| + "row_sets": pipelines.iter().map(|(key, _)| Bson::String(format!("${key}")), ).collect::>() }); + let queries = pipelines.into_iter().collect(); + Ok(Pipeline { - stages: vec![Stage::Facet(pipelines), Stage::ReplaceWith(selection)], + stages: vec![Stage::Facet(queries), Stage::ReplaceWith(selection)], }) } From b737a0a2773ac1437d660ff6d2f3bd6d65c58d2d Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 25 Apr 2024 17:18:33 -0700 Subject: [PATCH 21/25] fix objectId serialization --- .../src/tests/local_relationship.rs | 1 + ...lationship__joins_local_relationships.snap | 22 +++++++++++-------- .../mongodb-agent-common/src/query/foreach.rs | 2 +- .../src/query/serialization/bson_to_json.rs | 16 +++++++++++++- 4 files changed, 30 insertions(+), 11 deletions(-) diff --git a/crates/integration-tests/src/tests/local_relationship.rs b/crates/integration-tests/src/tests/local_relationship.rs index 6a897a58..151752c0 100644 --- a/crates/integration-tests/src/tests/local_relationship.rs +++ b/crates/integration-tests/src/tests/local_relationship.rs @@ -26,6 +26,7 @@ async fn joins_local_relationships() -> anyhow::Result<()> { user { email comments(limit: 2, order_by: {id: Asc}) { + id email } } diff --git a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__local_relationship__joins_local_relationships.snap b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__local_relationship__joins_local_relationships.snap index 9ed9a0ee..ac32decb 100644 --- a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__local_relationship__joins_local_relationships.snap +++ b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__local_relationship__joins_local_relationships.snap @@ -1,14 +1,13 @@ --- source: crates/integration-tests/src/tests/local_relationship.rs -expression: "query(r#\"\n query {\n movies(limit: 2, order_by: {title: Asc}, where: {title: {_iregex: \"Rear\"}}) {\n id\n title\n comments(limit: 2, order_by: {id: Asc}) {\n email\n text\n movie {\n id\n title\n }\n user {\n email\n comments(limit: 2, order_by: {id: Asc}) {\n email\n text\n user {\n email\n comments(limit: 2, order_by: {id: Asc}) {\n email\n }\n }\n }\n }\n }\n }\n }\n \"#).variables(json!({\n \"limit\": 11, \"movies_limit\": 2\n })).run().await?" +expression: "query(r#\"\n query {\n movies(limit: 2, order_by: {title: Asc}, where: {title: {_iregex: \"Rear\"}}) {\n id\n title\n comments(limit: 2, order_by: {id: Asc}) {\n email\n text\n movie {\n id\n title\n }\n user {\n email\n comments(limit: 2, order_by: {id: Asc}) {\n email\n text\n user {\n email\n comments(limit: 2, order_by: {id: Asc}) {\n id\n email\n }\n }\n }\n }\n }\n }\n }\n \"#).variables(json!({\n \"limit\": 11, \"movies_limit\": 2\n })).run().await?" --- data: movies: - comments: - email: iain_glen@gameofthron.es movie: - id: - $oid: 573a1398f29313caabceb0b1 + id: 573a1398f29313caabceb0b1 title: A Night in the Life of Jimmy Reardon text: Debitis tempore cum natus quaerat dolores quibusdam perferendis. Pariatur aspernatur officia libero quod pariatur nobis neque. Maiores non ipsam iste repellendus distinctio praesentium iure. user: @@ -18,24 +17,26 @@ data: user: comments: - email: iain_glen@gameofthron.es + id: 5a9427648b0beebeb69579f3 - email: iain_glen@gameofthron.es + id: 5a9427648b0beebeb6957b0f email: iain_glen@gameofthron.es - email: iain_glen@gameofthron.es text: Impedit consectetur ex cupiditate enim. Placeat assumenda reiciendis iste neque similique nesciunt aperiam. user: comments: - email: iain_glen@gameofthron.es + id: 5a9427648b0beebeb69579f3 - email: iain_glen@gameofthron.es + id: 5a9427648b0beebeb6957b0f email: iain_glen@gameofthron.es email: iain_glen@gameofthron.es - id: - $oid: 573a1398f29313caabceb0b1 + id: 573a1398f29313caabceb0b1 title: A Night in the Life of Jimmy Reardon - comments: - email: owen_teale@gameofthron.es movie: - id: - $oid: 573a1394f29313caabcdfa00 + id: 573a1394f29313caabcdfa00 title: Rear Window text: Nobis corporis rem hic ipsa cum impedit. Esse nihil cum est minima ducimus temporibus minima. Sed reprehenderit tempore similique nam. Ipsam nesciunt veniam aut amet ut. user: @@ -45,17 +46,20 @@ data: user: comments: - email: owen_teale@gameofthron.es + id: 5a9427648b0beebeb6957b44 - email: owen_teale@gameofthron.es + id: 5a9427648b0beebeb6957cf6 email: owen_teale@gameofthron.es - email: owen_teale@gameofthron.es text: Repudiandae repellat quia officiis. Quidem voluptatum vel id itaque et. Corrupti corporis magni voluptas quae itaque fugiat quae. user: comments: - email: owen_teale@gameofthron.es + id: 5a9427648b0beebeb6957b44 - email: owen_teale@gameofthron.es + id: 5a9427648b0beebeb6957cf6 email: owen_teale@gameofthron.es email: owen_teale@gameofthron.es - id: - $oid: 573a1394f29313caabcdfa00 + id: 573a1394f29313caabcdfa00 title: Rear Window errors: ~ diff --git a/crates/mongodb-agent-common/src/query/foreach.rs b/crates/mongodb-agent-common/src/query/foreach.rs index 46e0a61d..3541f4f3 100644 --- a/crates/mongodb-agent-common/src/query/foreach.rs +++ b/crates/mongodb-agent-common/src/query/foreach.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use configuration::Configuration; use dc_api_types::comparison_column::ColumnSelector; diff --git a/crates/mongodb-agent-common/src/query/serialization/bson_to_json.rs b/crates/mongodb-agent-common/src/query/serialization/bson_to_json.rs index f745634e..2d4adbc9 100644 --- a/crates/mongodb-agent-common/src/query/serialization/bson_to_json.rs +++ b/crates/mongodb-agent-common/src/query/serialization/bson_to_json.rs @@ -92,7 +92,7 @@ fn bson_scalar_to_json(expected_type: BsonScalarType, value: Bson) -> Result { Ok(to_value::(b.into())?) } - (BsonScalarType::ObjectId, Bson::ObjectId(oid)) => Ok(to_value(oid)?), + (BsonScalarType::ObjectId, Bson::ObjectId(oid)) => Ok(Value::String(oid.to_hex())), (BsonScalarType::DbPointer, v) => Ok(v.into_canonical_extjson()), (_, v) => Err(BsonToJsonError::TypeMismatch( Type::Scalar(expected_type), @@ -226,11 +226,25 @@ fn convert_small_number(expected_type: BsonScalarType, value: Bson) -> Result anyhow::Result<()> { + let expected_string = "573a1390f29313caabcd446f"; + let json = bson_to_json( + &Type::Scalar(BsonScalarType::ObjectId), + &Default::default(), + Bson::ObjectId(FromStr::from_str(expected_string)?), + )?; + assert_eq!(json, Value::String(expected_string.to_owned())); + Ok(()) + } + #[test] fn serializes_document_with_missing_nullable_field() -> anyhow::Result<()> { let expected_type = Type::Object("test_object".to_owned()); From d4c40f1911f933f96af3a16b4ea95c7f0759547d Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 25 Apr 2024 18:16:14 -0700 Subject: [PATCH 22/25] some cleanup --- .../mongodb-connector/src/query_response.rs | 65 +++++++++++++++---- 1 file changed, 51 insertions(+), 14 deletions(-) diff --git a/crates/mongodb-connector/src/query_response.rs b/crates/mongodb-connector/src/query_response.rs index ade11eb7..5e959652 100644 --- a/crates/mongodb-connector/src/query_response.rs +++ b/crates/mongodb-connector/src/query_response.rs @@ -16,6 +16,9 @@ use crate::api_type_conversions::{ConversionError, QueryContext}; #[derive(Debug, Error)] pub enum QueryResponseError { + #[error("{0}")] + BsonDeserialization(#[from] bson::de::Error), + #[error("{0}")] BsonToJson(#[from] BsonToJsonError), @@ -43,9 +46,6 @@ pub enum QueryResponseError { #[error("results from relation are missing at path {}", path.join("."))] MissingRelationData { path: Vec }, - - #[error("placeholder")] - TODORemoveMe, } type Result = std::result::Result; @@ -94,7 +94,6 @@ pub fn serialize_query_response( }) .try_collect() } else { - // TODO: in an aggregation response we expect one document instead of a list of documents Ok(vec![serialize_row_set( query_context, query_request, @@ -179,11 +178,11 @@ fn serialize_aggregates( query_aggregates .iter() .map( - |(key, aggregate_definition)| match aggregate_values.remove_entry(key) { + |(key, _aggregate_definition)| match aggregate_values.remove_entry(key) { Some((owned_key, value)) => Ok(( owned_key, // TODO: bson_to_json - from_bson(value).map_err(|_| QueryResponseError::TODORemoveMe)?, + from_bson(value)?, )), None => Err(QueryResponseError::MissingAggregateValue(key.clone())), }, @@ -253,8 +252,13 @@ fn serialize_field_value( ndc::Field::Column { column, fields } => { let field_type = find_field_type(query_context, path, collection_name, column)?; - let (requested_type, temp_object_types) = - prune_type_to_field_selection(query_context, query_request, path, field_type, fields.as_ref())?; + let (requested_type, temp_object_types) = prune_type_to_field_selection( + query_context, + query_request, + path, + field_type, + fields.as_ref(), + )?; let value = value_from_option(collection_name, column, &requested_type, value_option)?; @@ -288,8 +292,6 @@ fn serialize_field_value( let json = bson_to_json(&requested_type, &object_types, value)?; Ok(json) } -// TODO: test object relationship type -// TODO: test array relationship type fn find_field_type<'a>( query_context: &'a QueryContext<'a>, @@ -398,7 +400,6 @@ fn object_type_for_field_subset( Ok((pruned_type, object_types)) } -// TODO: why are objectIds serializing as extended JSON? /// Given an object type for a value, and a requested field from that value, produce an updated /// object field definition to match the request. This must take into account aliasing where the @@ -539,7 +540,7 @@ where .into_iter() .next() .ok_or(QueryResponseError::ExpectedSingleDocument)?; - let value = bson::from_document(document).map_err(|_| QueryResponseError::TODORemoveMe)?; + let value = bson::from_document(document)?; Ok(value) } @@ -551,7 +552,6 @@ fn path_to_owned(path: &[&str]) -> Vec { path.iter().map(|x| (*x).to_owned()).collect() } -// TODO: test nested objects in arrays #[cfg(test)] mod tests { use std::{borrow::Cow, str::FromStr}; @@ -560,7 +560,7 @@ mod tests { use mongodb::bson::{self, Bson}; use mongodb_support::BsonScalarType; use ndc_sdk::models::{QueryResponse, RowFieldValue, RowSet}; - use ndc_test_helpers::{collection, field, object, object_type, query, query_request}; + use ndc_test_helpers::{array, collection, field, object, object_type, query, query_request}; use pretty_assertions::assert_eq; use serde_json::json; @@ -613,6 +613,43 @@ mod tests { Ok(()) } + #[test] + fn serializes_response_with_nested_object_inside_array() -> anyhow::Result<()> { + let query_context = make_nested_schema(); + let request = query_request() + .collection("authors") + .query(query().fields([field!("articles" => "articles", array!( + object!([ + field!("title"), + ]) + ))])) + .into(); + + let response_documents = vec![bson::doc! { + "articles": [ + { "title": "Modeling MongoDB with relational model" }, + { "title": "NoSQL databases: MongoDB vs cassandra" }, + ], + }]; + + let response = serialize_query_response(&query_context, &request, response_documents)?; + assert_eq!( + response, + QueryResponse(vec![RowSet { + aggregates: Default::default(), + rows: Some(vec![[( + "articles".into(), + RowFieldValue(json!([ + { "title": "Modeling MongoDB with relational model" }, + { "title": "NoSQL databases: MongoDB vs cassandra" }, + ])) + )] + .into()]), + }]) + ); + Ok(()) + } + #[test] fn serializes_response_with_aliased_fields() -> anyhow::Result<()> { let query_context = make_nested_schema(); From af4b145a360fc9b1f08b23e48b7f08853c2c6c12 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Fri, 26 Apr 2024 13:34:21 -0700 Subject: [PATCH 23/25] hoist type processing to the row set level --- .../mongodb-connector/src/query_response.rs | 315 ++++++++++-------- 1 file changed, 170 insertions(+), 145 deletions(-) diff --git a/crates/mongodb-connector/src/query_response.rs b/crates/mongodb-connector/src/query_response.rs index 5e959652..4ffaeeec 100644 --- a/crates/mongodb-connector/src/query_response.rs +++ b/crates/mongodb-connector/src/query_response.rs @@ -2,12 +2,12 @@ use std::{borrow::Cow, collections::BTreeMap}; use configuration::schema::{ObjectField, ObjectType, Type}; use indexmap::IndexMap; -use itertools::Itertools as _; -use mongodb::bson::{self, from_bson, Bson}; +use itertools::Itertools; +use mongodb::bson::{self, Bson}; use mongodb_agent_common::query::serialization::{bson_to_json, BsonToJsonError}; use ndc_sdk::models::{ - self as ndc, Aggregate, Field, NestedArray, NestedField, NestedObject, Query, QueryRequest, - QueryResponse, RowFieldValue, RowSet, + self as ndc, Aggregate, Field, NestedField, NestedObject, Query, QueryRequest, QueryResponse, + RowFieldValue, RowSet, }; use serde::Deserialize; use thiserror::Error; @@ -16,6 +16,9 @@ use crate::api_type_conversions::{ConversionError, QueryContext}; #[derive(Debug, Error)] pub enum QueryResponseError { + #[error("expected aggregates to be an object at path {}", path.join("."))] + AggregatesNotObject { path: Vec }, + #[error("{0}")] BsonDeserialization(#[from] bson::de::Error), @@ -33,21 +36,9 @@ pub enum QueryResponseError { #[error("expected a single response document from MongoDB, but did not get one")] ExpectedSingleDocument, - - #[error("missing aggregate value in response: {0}")] - MissingAggregateValue(String), - - #[error("expected {collection_name} to have a field named {column} of type {expected_type:?}, but value is missing from database response")] - MissingColumnValue { - collection_name: String, - column: String, - expected_type: Type, - }, - - #[error("results from relation are missing at path {}", path.join("."))] - MissingRelationData { path: Vec }, } +type ObjectTypes = Vec<(String, ObjectType)>; type Result = std::result::Result; // These structs describe possible shapes of data returned by MongoDB query plans @@ -60,7 +51,7 @@ struct ResponsesForVariableSets { #[derive(Debug, Deserialize)] struct BsonRowSet { #[serde(default)] - aggregates: BTreeMap, + aggregates: Bson, #[serde(default)] rows: Vec, } @@ -86,7 +77,7 @@ pub fn serialize_query_response( serialize_row_set( query_context, query_request, - &[], + &[collection_name], collection_name, &query_request.query, docs, @@ -116,12 +107,7 @@ fn serialize_row_set( query: &Query, docs: Vec, ) -> Result { - if query - .aggregates - .as_ref() - .unwrap_or(&IndexMap::new()) - .is_empty() - { + if !has_aggregates(query) { // When there are no aggregates we expect a list of rows let rows = query .fields @@ -137,6 +123,7 @@ fn serialize_row_set( ) }) .transpose()?; + Ok(RowSet { aggregates: None, rows, @@ -149,7 +136,9 @@ fn serialize_row_set( let aggregates = query .aggregates .as_ref() - .map(|aggregates| serialize_aggregates(aggregates, row_set.aggregates)) + .map(|aggregates| { + serialize_aggregates(query_context, path, aggregates, row_set.aggregates) + }) .transpose()?; let rows = query @@ -172,22 +161,26 @@ fn serialize_row_set( } fn serialize_aggregates( - query_aggregates: &IndexMap, - mut aggregate_values: BTreeMap, + query_context: &QueryContext<'_>, + path: &[&str], + _query_aggregates: &IndexMap, + value: Bson, ) -> Result> { - query_aggregates - .iter() - .map( - |(key, _aggregate_definition)| match aggregate_values.remove_entry(key) { - Some((owned_key, value)) => Ok(( - owned_key, - // TODO: bson_to_json - from_bson(value)?, - )), - None => Err(QueryResponseError::MissingAggregateValue(key.clone())), - }, - ) - .try_collect() + let (aggregates_type, temp_object_types) = type_for_aggregates()?; + + let object_types = extend_configured_object_types(query_context, temp_object_types); + + let json = bson_to_json(&aggregates_type, &object_types, value)?; + + // The NDC type uses an IndexMap for aggregate values; we need to convert the map + // underlying the Value::Object value to an IndexMap + let aggregate_values = match json { + serde_json::Value::Object(obj) => obj.into_iter().collect(), + _ => Err(QueryResponseError::AggregatesNotObject { + path: path_to_owned(path), + })?, + }; + Ok(aggregate_values) } fn serialize_rows( @@ -198,57 +191,136 @@ fn serialize_rows( query_fields: &IndexMap, docs: Vec, ) -> Result>> { + let (row_type, temp_object_types) = type_for_row( + query_context, + query_request, + path, + collection_name, + query_fields, + )?; + + let object_types = extend_configured_object_types(query_context, temp_object_types); + docs.into_iter() .map(|doc| { - serialize_single_row( - query_context, - query_request, - path, - collection_name, - query_fields, - doc, - ) + let json = bson_to_json(&row_type, &object_types, doc.into())?; + // The NDC types use an IndexMap for each row value; we need to convert the map + // underlying the Value::Object value to an IndexMap + let index_map = match json { + serde_json::Value::Object(obj) => obj + .into_iter() + .map(|(key, value)| (key, RowFieldValue(value))) + .collect(), + _ => unreachable!(), + }; + Ok(index_map) }) .try_collect() } -fn serialize_single_row( +fn type_for_row_set( + query_context: &QueryContext<'_>, + query_request: &QueryRequest, + path: &[&str], + collection_name: &str, + query: &Query, +) -> Result<(Type, ObjectTypes)> { + let mut fields = BTreeMap::new(); + let mut object_types = vec![]; + + if has_aggregates(query) { + let (aggregates_type, nested_object_types) = type_for_aggregates()?; + fields.insert( + "aggregates".to_owned(), + ObjectField { + r#type: aggregates_type, + description: Default::default(), + }, + ); + object_types.extend(nested_object_types); + } + + if let Some(query_fields) = &query.fields { + let (row_type, nested_object_types) = type_for_row( + query_context, + query_request, + path, + collection_name, + query_fields, + )?; + fields.insert( + "rows".to_owned(), + ObjectField { + r#type: Type::ArrayOf(Box::new(row_type)), + description: Default::default(), + }, + ); + object_types.extend(nested_object_types); + } + + let (row_set_type_name, row_set_type) = named_type(path, "row_set"); + let object_type = ObjectType { + description: Default::default(), + fields, + }; + object_types.push((row_set_type_name, object_type)); + + Ok((row_set_type, object_types)) +} + +// TODO: infer response type for aggregates MDB-130 +fn type_for_aggregates() -> Result<(Type, ObjectTypes)> { + Ok((Type::ExtendedJSON, Default::default())) +} + +fn type_for_row( query_context: &QueryContext<'_>, query_request: &QueryRequest, path: &[&str], collection_name: &str, query_fields: &IndexMap, - mut doc: bson::Document, -) -> Result> { - query_fields +) -> Result<(Type, ObjectTypes)> { + let mut object_types = vec![]; + + let fields = query_fields .iter() .map(|(field_name, field_definition)| { - let value = serialize_field_value( + let (field_type, nested_object_types) = type_for_field( query_context, query_request, &append_to_path(path, [field_name.as_ref()]), collection_name, field_definition, - field_name, - &mut doc, )?; - Ok((field_name.clone(), RowFieldValue(value))) + object_types.extend(nested_object_types); + Ok(( + field_name.clone(), + ObjectField { + description: Default::default(), + r#type: field_type, + }, + )) }) - .try_collect() + .try_collect::<_, _, QueryResponseError>()?; + + let (row_type_name, row_type) = named_type(path, "row"); + let object_type = ObjectType { + description: Default::default(), + fields, + }; + object_types.push((row_type_name, object_type)); + + Ok((row_type, object_types)) } -fn serialize_field_value( +fn type_for_field( query_context: &QueryContext<'_>, query_request: &QueryRequest, path: &[&str], collection_name: &str, field_definition: &ndc::Field, - field_name: &str, - input: &mut bson::Document, -) -> Result { - let value_option = input.remove(field_name); - - let (requested_type, value, temp_object_types) = match field_definition { +) -> Result<(Type, ObjectTypes)> { + match field_definition { ndc::Field::Column { column, fields } => { let field_type = find_field_type(query_context, path, collection_name, column)?; @@ -260,9 +332,7 @@ fn serialize_field_value( fields.as_ref(), )?; - let value = value_from_option(collection_name, column, &requested_type, value_option)?; - - (requested_type, value, temp_object_types) + Ok((requested_type, temp_object_types)) } ndc::Field::Relationship { @@ -273,24 +343,9 @@ fn serialize_field_value( let (requested_type, temp_object_types) = type_for_relation_field(query_context, query_request, path, query, relationship)?; - let value = value_option.ok_or_else(|| QueryResponseError::MissingRelationData { - path: path_to_owned(path), - })?; - - (requested_type, value, temp_object_types) + Ok((requested_type, temp_object_types)) } - }; - - let object_types = if temp_object_types.is_empty() { - query_context.object_types.clone() // We're cloning a Cow, not a BTreeMap - } else { - let mut configured_types = query_context.object_types.clone().into_owned(); - configured_types.extend(temp_object_types); - Cow::Owned(configured_types) - }; - - let json = bson_to_json(&requested_type, &object_types, value)?; - Ok(json) + } } fn find_field_type<'a>( @@ -450,10 +505,6 @@ fn requested_field_definition( } } -/// We have a predefined object type for each collection, and for each nested object in -/// a collection. Those types don't have fields defined for joined relationships since such fields -/// are a query-time thing. When a query requests related data we have to create a new field -/// definition to merge with fields in the predefined object type. fn type_for_relation_field( query_context: &QueryContext<'_>, query_request: &QueryRequest, @@ -468,67 +519,28 @@ fn type_for_relation_field( relationship_name: relationship.to_owned(), path: path_to_owned(path), })?; - let collection_name = &relationship_def.target_collection; - let collection = query_context.find_collection(collection_name)?; - - // Related data always comes back as an array, even if the relation type is "Object". - let relation_type = Type::ArrayOf(Box::new(Type::Object( - collection.collection_type.to_owned(), - ))); - - // Translate requested query fields into a `NestedField` value to match what we get for - // column fields. - let fields = query.fields.as_ref().map(|query_fields| { - NestedField::Array(NestedArray { - fields: Box::new(NestedField::Object(NestedObject { - fields: query_fields.clone(), - })), - }) - }); - - let (requested_relation_type, mut temp_object_types) = prune_type_to_field_selection( + type_for_row_set( query_context, query_request, path, - &relation_type, - fields.as_ref(), - )?; - - // Relation data is wrapped in an object with a `rows` property - let relation_object_type = ObjectType { - fields: [( - "rows".to_owned(), - ObjectField { - r#type: requested_relation_type, - description: None, - }, - )] - .into(), - description: Default::default(), - }; - let relation_object_type_name = format!("relation_{}", path.iter().join("_")); - temp_object_types.push((relation_object_type_name.clone(), relation_object_type)); - let requested_type = Type::Object(relation_object_type_name); - - Ok((requested_type, temp_object_types)) + &relationship_def.target_collection, + query, + ) } -/// Check option result for a BSON value. If the value is missing but the expected type is nullable -/// then return null. Otherwise return an error. -fn value_from_option( - collection_name: &str, - column: &str, - expected_type: &Type, - value_option: Option, -) -> Result { - match (expected_type, value_option) { - (_, Some(value)) => Ok(value), - (Type::Nullable(_), None) => Ok(Bson::Null), - _ => Err(QueryResponseError::MissingColumnValue { - collection_name: collection_name.to_string(), - column: column.to_string(), - expected_type: expected_type.clone(), - }), +fn extend_configured_object_types<'a>( + query_context: &QueryContext<'a>, + object_types: ObjectTypes, +) -> Cow<'a, BTreeMap> { + if object_types.is_empty() { + // We're cloning a Cow, not a BTreeMap here. In production that will be a [Cow::Borrowed] + // variant so effectively that means we're cloning a wide pointer + query_context.object_types.clone() + } else { + // This time we're cloning the BTreeMap + let mut extended_object_types = query_context.object_types.clone().into_owned(); + extended_object_types.extend(object_types); + Cow::Owned(extended_object_types) } } @@ -544,6 +556,13 @@ where Ok(value) } +fn has_aggregates(query: &Query) -> bool { + match &query.aggregates { + Some(aggregates) => !aggregates.is_empty(), + None => false, + } +} + fn append_to_path<'a>(path: &[&'a str], elems: impl IntoIterator) -> Vec<&'a str> { path.iter().copied().chain(elems).collect() } @@ -552,6 +571,12 @@ fn path_to_owned(path: &[&str]) -> Vec { path.iter().map(|x| (*x).to_owned()).collect() } +fn named_type(path: &[&str], name_suffix: &str) -> (String, Type) { + let name = format!("{}_{name_suffix}", path.iter().join("_")); + let t = Type::Object(name.clone()); + (name, t) +} + #[cfg(test)] mod tests { use std::{borrow::Cow, str::FromStr}; From f3a89b59d133728b8fa84fa3ff6435617a713991 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Fri, 26 Apr 2024 15:15:41 -0700 Subject: [PATCH 24/25] test generated names look right, some cleanup --- crates/configuration/src/schema/mod.rs | 50 +------ .../mongodb-connector/src/query_response.rs | 132 ++++++++++++++++-- crates/mongodb-connector/src/test_helpers.rs | 51 +++++-- crates/ndc-test-helpers/src/lib.rs | 2 - crates/ndc-test-helpers/src/types.rs | 23 --- 5 files changed, 164 insertions(+), 94 deletions(-) delete mode 100644 crates/ndc-test-helpers/src/types.rs diff --git a/crates/configuration/src/schema/mod.rs b/crates/configuration/src/schema/mod.rs index 90f4e0f4..4b7418ad 100644 --- a/crates/configuration/src/schema/mod.rs +++ b/crates/configuration/src/schema/mod.rs @@ -3,7 +3,7 @@ use std::collections::BTreeMap; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use mongodb_support::{BsonScalarType, EXTENDED_JSON_TYPE_NAME}; +use mongodb_support::BsonScalarType; use crate::{WithName, WithNameRef}; @@ -95,30 +95,6 @@ impl From for ndc_models::Type { } } -// Should only be used for testing -impl From for Type { - fn from(value: ndc_models::Type) -> Self { - match value { - ndc_models::Type::Named { name } => { - if name == EXTENDED_JSON_TYPE_NAME { - Type::ExtendedJSON - } else if let Ok(scalar_type) = BsonScalarType::from_bson_name(&name) { - Type::Scalar(scalar_type) - } else { - Type::Object(name.clone()) - } - } - ndc_models::Type::Nullable { underlying_type } => { - Type::Nullable(Box::new((*underlying_type).into())) - } - ndc_models::Type::Array { element_type } => { - Type::ArrayOf(Box::new((*element_type).into())) - } - ndc_models::Type::Predicate { .. } => panic!("not implemented"), - } - } -} - #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct ObjectType { @@ -154,20 +130,6 @@ impl From for ndc_models::ObjectType { } } -// Should only be used for testing -impl From for ObjectType { - fn from(value: ndc_models::ObjectType) -> Self { - ObjectType { - fields: value - .fields - .into_iter() - .map(|(name, field)| (name, field.into())) - .collect(), - description: value.description, - } - } -} - /// Information about an object type field. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] @@ -197,13 +159,3 @@ impl From for ndc_models::ObjectField { } } } - -// Should only be used for testing -impl From for ObjectField { - fn from(value: ndc_models::ObjectField) -> Self { - ObjectField { - r#type: value.r#type.into(), - description: value.description, - } - } -} diff --git a/crates/mongodb-connector/src/query_response.rs b/crates/mongodb-connector/src/query_response.rs index 4ffaeeec..ff0bbf5c 100644 --- a/crates/mongodb-connector/src/query_response.rs +++ b/crates/mongodb-connector/src/query_response.rs @@ -14,6 +14,8 @@ use thiserror::Error; use crate::api_type_conversions::{ConversionError, QueryContext}; +const GEN_OBJECT_TYPE_PREFIX: &str = "__query__"; + #[derive(Debug, Error)] pub enum QueryResponseError { #[error("expected aggregates to be an object at path {}", path.join("."))] @@ -446,8 +448,7 @@ fn object_type_for_field_subset( fields, description: None, }; - let pruned_object_type_name = format!("requested_fields_{}", path.iter().join("_")); - let pruned_type = Type::Object(pruned_object_type_name.clone()); + let (pruned_object_type_name, pruned_type) = named_type(path, "fields"); let mut object_types: Vec<(String, ObjectType)> = object_type_sets.into_iter().flatten().collect(); @@ -572,29 +573,34 @@ fn path_to_owned(path: &[&str]) -> Vec { } fn named_type(path: &[&str], name_suffix: &str) -> (String, Type) { - let name = format!("{}_{name_suffix}", path.iter().join("_")); + let name = format!( + "{GEN_OBJECT_TYPE_PREFIX}{}_{name_suffix}", + path.iter().join("_") + ); let t = Type::Object(name.clone()); (name, t) } #[cfg(test)] mod tests { - use std::{borrow::Cow, str::FromStr}; + use std::{borrow::Cow, collections::BTreeMap, str::FromStr}; - use configuration::schema::Type; + use configuration::schema::{ObjectType, Type}; use mongodb::bson::{self, Bson}; use mongodb_support::BsonScalarType; use ndc_sdk::models::{QueryResponse, RowFieldValue, RowSet}; - use ndc_test_helpers::{array, collection, field, object, object_type, query, query_request}; + use ndc_test_helpers::{ + array, collection, field, object, query, query_request, relation_field, relationship, + }; use pretty_assertions::assert_eq; use serde_json::json; use crate::{ api_type_conversions::QueryContext, - test_helpers::{make_nested_schema, make_scalar_types}, + test_helpers::{make_nested_schema, make_scalar_types, object_type}, }; - use super::serialize_query_response; + use super::{serialize_query_response, type_for_row_set}; #[test] fn serializes_response_with_nested_fields() -> anyhow::Result<()> { @@ -837,4 +843,114 @@ mod tests { ); Ok(()) } + + #[test] + fn uses_field_path_to_guarantee_distinct_type_names() -> anyhow::Result<()> { + let query_context = make_nested_schema(); + let collection_name = "appearances"; + let request = query_request() + .collection(collection_name) + .relationships([("author", relationship("authors", [("authorId", "id")]))]) + .query( + query().fields([relation_field!("author" => "presenter", query().fields([ + field!("addr" => "address", object!([ + field!("street"), + field!("geocode" => "geocode", object!([ + field!("latitude"), + field!("long" => "longitude"), + ])) + ])), + field!("articles" => "articles", array!(object!([ + field!("article_title" => "title") + ]))), + ]))]), + ) + .into(); + let path = [collection_name]; + + let (row_set_type, object_types) = type_for_row_set( + &query_context, + &request, + &path, + collection_name, + &request.query, + )?; + + // Convert object types into a map so we can compare without worrying about order + let object_types: BTreeMap = object_types.into_iter().collect(); + + assert_eq!( + (row_set_type, object_types), + ( + Type::Object("__query__appearances_row_set".to_owned()), + [ + ( + "__query__appearances_row_set".to_owned(), + object_type([( + "rows".to_owned(), + Type::ArrayOf(Box::new(Type::Object( + "__query__appearances_row".to_owned() + ))) + )]), + ), + ( + "__query__appearances_row".to_owned(), + object_type([( + "presenter".to_owned(), + Type::Object("__query__appearances_presenter_row_set".to_owned()) + )]), + ), + ( + "__query__appearances_presenter_row_set".to_owned(), + object_type([( + "rows", + Type::ArrayOf(Box::new(Type::Object( + "__query__appearances_presenter_row".to_owned() + ))) + )]), + ), + ( + "__query__appearances_presenter_row".to_owned(), + object_type([ + ( + "addr", + Type::Object("__query__appearances_presenter_addr_fields".to_owned()) + ), + ( + "articles", + Type::ArrayOf(Box::new(Type::Object( + "__query__appearances_presenter_articles_fields".to_owned() + ))) + ), + ]), + ), + ( + "__query__appearances_presenter_addr_fields".to_owned(), + object_type([ + ( + "geocode", + Type::Nullable(Box::new(Type::Object( + "__query__appearances_presenter_addr_geocode_fields".to_owned() + ))) + ), + ("street", Type::Scalar(BsonScalarType::String)), + ]), + ), + ( + "__query__appearances_presenter_addr_geocode_fields".to_owned(), + object_type([ + ("latitude", Type::Scalar(BsonScalarType::Double)), + ("long", Type::Scalar(BsonScalarType::Double)), + ]), + ), + ( + "__query__appearances_presenter_articles_fields".to_owned(), + object_type([("article_title", Type::Scalar(BsonScalarType::String))]), + ), + ] + .into() + ) + ); + Ok(()) + } } diff --git a/crates/mongodb-connector/src/test_helpers.rs b/crates/mongodb-connector/src/test_helpers.rs index 8b2aa1a2..85c48d18 100644 --- a/crates/mongodb-connector/src/test_helpers.rs +++ b/crates/mongodb-connector/src/test_helpers.rs @@ -6,10 +6,30 @@ use ndc_sdk::models::{ AggregateFunctionDefinition, CollectionInfo, ComparisonOperatorDefinition, ScalarType, Type, TypeRepresentation, }; -use ndc_test_helpers::make_primary_key_uniqueness_constraint; +use ndc_test_helpers::{collection, make_primary_key_uniqueness_constraint}; use crate::api_type_conversions::QueryContext; +pub fn object_type( + fields: impl IntoIterator)>, +) -> schema::ObjectType { + schema::ObjectType { + description: Default::default(), + fields: fields + .into_iter() + .map(|(name, field_type)| { + ( + name.to_string(), + schema::ObjectField { + description: Default::default(), + r#type: field_type.into(), + }, + ) + }) + .collect(), + } +} + pub fn make_scalar_types() -> BTreeMap { BTreeMap::from([ ( @@ -139,17 +159,20 @@ pub fn make_flat_schema() -> QueryContext<'static> { pub fn make_nested_schema() -> QueryContext<'static> { QueryContext { - collections: Cow::Owned(BTreeMap::from([( - "authors".into(), - CollectionInfo { - name: "authors".into(), - description: None, - collection_type: "Author".into(), - arguments: Default::default(), - uniqueness_constraints: make_primary_key_uniqueness_constraint("authors"), - foreign_keys: Default::default(), - }, - )])), + collections: Cow::Owned(BTreeMap::from([ + ( + "authors".into(), + CollectionInfo { + name: "authors".into(), + description: None, + collection_type: "Author".into(), + arguments: Default::default(), + uniqueness_constraints: make_primary_key_uniqueness_constraint("authors"), + foreign_keys: Default::default(), + }, + ), + collection("appearances"), // new helper gives more concise syntax + ])), functions: Default::default(), object_types: Cow::Owned(BTreeMap::from([ ( @@ -260,6 +283,10 @@ pub fn make_nested_schema() -> QueryContext<'static> { ]), }, ), + ( + "appearances".to_owned(), + object_type([("authorId", schema::Type::Scalar(BsonScalarType::ObjectId))]).into(), + ), ])), scalar_types: Cow::Owned(make_scalar_types()), } diff --git a/crates/ndc-test-helpers/src/lib.rs b/crates/ndc-test-helpers/src/lib.rs index 7be28e21..c1fe9731 100644 --- a/crates/ndc-test-helpers/src/lib.rs +++ b/crates/ndc-test-helpers/src/lib.rs @@ -8,7 +8,6 @@ mod comparison_value; mod exists_in_collection; mod expressions; mod field; -mod types; use std::collections::BTreeMap; @@ -24,7 +23,6 @@ pub use comparison_value::*; pub use exists_in_collection::*; pub use expressions::*; pub use field::*; -pub use types::*; #[derive(Clone, Debug, Default)] pub struct QueryRequestBuilder { diff --git a/crates/ndc-test-helpers/src/types.rs b/crates/ndc-test-helpers/src/types.rs deleted file mode 100644 index dde51b96..00000000 --- a/crates/ndc-test-helpers/src/types.rs +++ /dev/null @@ -1,23 +0,0 @@ -use std::fmt::Display; - -use ndc_models::{ObjectField, ObjectType, Type}; - -pub fn object_type( - fields: impl IntoIterator)>, -) -> ObjectType { - ObjectType { - description: Default::default(), - fields: fields - .into_iter() - .map(|(name, field_type)| { - ( - name.to_string(), - ObjectField { - description: Default::default(), - r#type: field_type.into(), - }, - ) - }) - .collect(), - } -} From 0ac22ec624f77bda4bf0f5d6e32fa238cd3ac6b5 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Fri, 26 Apr 2024 15:28:04 -0700 Subject: [PATCH 25/25] lint fixes --- crates/mongodb-connector/src/query_response.rs | 9 +++++---- crates/mongodb-connector/src/test_helpers.rs | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/crates/mongodb-connector/src/query_response.rs b/crates/mongodb-connector/src/query_response.rs index ff0bbf5c..0643dc52 100644 --- a/crates/mongodb-connector/src/query_response.rs +++ b/crates/mongodb-connector/src/query_response.rs @@ -747,8 +747,7 @@ mod tests { object_type([ ("price", Type::Scalar(BsonScalarType::Decimal)), ("price_extjson", Type::ExtendedJSON), - ]) - .into(), + ]), )] .into(), ), @@ -793,7 +792,7 @@ mod tests { object_types: Cow::Owned( [( "data".to_owned(), - object_type([("value", Type::ExtendedJSON)]).into(), + object_type([("value", Type::ExtendedJSON)]), )] .into(), ), @@ -914,7 +913,9 @@ mod tests { object_type([ ( "addr", - Type::Object("__query__appearances_presenter_addr_fields".to_owned()) + Type::Object( + "__query__appearances_presenter_addr_fields".to_owned() + ) ), ( "articles", diff --git a/crates/mongodb-connector/src/test_helpers.rs b/crates/mongodb-connector/src/test_helpers.rs index 85c48d18..4c9a9918 100644 --- a/crates/mongodb-connector/src/test_helpers.rs +++ b/crates/mongodb-connector/src/test_helpers.rs @@ -285,7 +285,7 @@ pub fn make_nested_schema() -> QueryContext<'static> { ), ( "appearances".to_owned(), - object_type([("authorId", schema::Type::Scalar(BsonScalarType::ObjectId))]).into(), + object_type([("authorId", schema::Type::Scalar(BsonScalarType::ObjectId))]), ), ])), scalar_types: Cow::Owned(make_scalar_types()),