diff --git a/Cargo.lock b/Cargo.lock index d1dbd18f..04ad9b9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -627,7 +627,7 @@ name = "dc-api-test-helpers" version = "0.1.0" dependencies = [ "dc-api-types", - "itertools 0.10.5", + "itertools 0.12.1", ] [[package]] @@ -635,7 +635,7 @@ name = "dc-api-types" version = "0.1.0" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.12.1", "mongodb", "nonempty", "once_cell", @@ -1654,7 +1654,7 @@ dependencies = [ "http 0.2.9", "indent", "indexmap 1.9.3", - "itertools 0.10.5", + "itertools 0.12.1", "mockall", "mongodb", "mongodb-cli-plugin", @@ -1709,7 +1709,7 @@ dependencies = [ "futures", "http 0.2.9", "indexmap 2.2.5", - "itertools 0.10.5", + "itertools 0.12.1", "lazy_static", "mongodb", "mongodb-agent-common", @@ -1828,7 +1828,7 @@ name = "ndc-test-helpers" version = "0.1.0" dependencies = [ "indexmap 2.2.5", - "itertools 0.10.5", + "itertools 0.12.1", "ndc-models", "serde_json", ] diff --git a/Cargo.toml b/Cargo.toml index f0d32f10..e61ce41e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,8 @@ resolver = "2" ndc-sdk = { git = "https://github.com/hasura/ndc-sdk-rs.git" } ndc-models = { git = "http://github.com/hasura/ndc-spec.git", tag = "v0.1.2" } +itertools = "^0.12.1" + # We have a fork of the mongodb driver with a fix for reading metadata from time # series collections. 
# See the upstream PR: https://github.com/mongodb/mongo-rust-driver/pull/1003 diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index fb1da2ad..80f3268f 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -13,7 +13,7 @@ anyhow = "1.0.80" clap = { version = "4.5.1", features = ["derive", "env"] } futures-util = "0.3.28" indexmap = { version = "1", features = ["serde"] } # must match the version that ndc-client uses -itertools = "^0.12.1" +itertools = { workspace = true } serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0.113", features = ["raw_value"] } thiserror = "1.0.57" diff --git a/crates/configuration/Cargo.toml b/crates/configuration/Cargo.toml index 37d4af35..a4dcc197 100644 --- a/crates/configuration/Cargo.toml +++ b/crates/configuration/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] anyhow = "1" futures = "^0.3" -itertools = "^0.12" +itertools = { workspace = true } mongodb = "2.8" mongodb-support = { path = "../mongodb-support" } ndc-models = { workspace = true } diff --git a/crates/dc-api-test-helpers/Cargo.toml b/crates/dc-api-test-helpers/Cargo.toml index e1655489..2165ebe7 100644 --- a/crates/dc-api-test-helpers/Cargo.toml +++ b/crates/dc-api-test-helpers/Cargo.toml @@ -5,4 +5,4 @@ edition = "2021" [dependencies] dc-api-types = { path = "../dc-api-types" } -itertools = "^0.10" +itertools = { workspace = true } diff --git a/crates/dc-api-test-helpers/src/query.rs b/crates/dc-api-test-helpers/src/query.rs index 27604f58..4d73dccd 100644 --- a/crates/dc-api-test-helpers/src/query.rs +++ b/crates/dc-api-test-helpers/src/query.rs @@ -4,12 +4,12 @@ use dc_api_types::{Aggregate, Expression, Field, OrderBy, Query}; #[derive(Clone, Debug, Default)] pub struct QueryBuilder { - aggregates: Option>>, - aggregates_limit: Option>, - fields: Option>>, - limit: Option>, - offset: Option>, - order_by: Option>, + aggregates: Option>, + aggregates_limit: Option, + fields: Option>, + limit: Option, + 
offset: Option, + order_by: Option, predicate: Option, } @@ -22,7 +22,7 @@ impl QueryBuilder { where I: IntoIterator, { - self.fields = Some(Some(fields.into_iter().collect())); + self.fields = Some(fields.into_iter().collect()); self } @@ -30,7 +30,7 @@ impl QueryBuilder { where I: IntoIterator, { - self.aggregates = Some(Some(aggregates.into_iter().collect())); + self.aggregates = Some(aggregates.into_iter().collect()); self } @@ -40,7 +40,7 @@ impl QueryBuilder { } pub fn order_by(mut self, order_by: OrderBy) -> Self { - self.order_by = Some(Some(order_by)); + self.order_by = Some(order_by); self } } diff --git a/crates/dc-api-types/Cargo.toml b/crates/dc-api-types/Cargo.toml index 18349561..61cfa52f 100644 --- a/crates/dc-api-types/Cargo.toml +++ b/crates/dc-api-types/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -itertools = "^0.10" +itertools = { workspace = true } nonempty = { version = "0.8.1", features = ["serialize"] } once_cell = "1" regex = "1" diff --git a/crates/dc-api-types/src/query.rs b/crates/dc-api-types/src/query.rs index 529f907f..9d106123 100644 --- a/crates/dc-api-types/src/query.rs +++ b/crates/dc-api-types/src/query.rs @@ -16,49 +16,27 @@ pub struct Query { #[serde( rename = "aggregates", default, - with = "::serde_with::rust::double_option", skip_serializing_if = "Option::is_none" )] - pub aggregates: Option>>, + pub aggregates: Option<::std::collections::HashMap>, /// Optionally limit the maximum number of rows considered while applying aggregations. This limit does not apply to returned rows. 
#[serde( rename = "aggregates_limit", default, - with = "::serde_with::rust::double_option", skip_serializing_if = "Option::is_none" )] - pub aggregates_limit: Option>, + pub aggregates_limit: Option, /// Fields of the query - #[serde( - rename = "fields", - default, - with = "::serde_with::rust::double_option", - skip_serializing_if = "Option::is_none" - )] - pub fields: Option>>, + #[serde(rename = "fields", default, skip_serializing_if = "Option::is_none")] + pub fields: Option<::std::collections::HashMap>, /// Optionally limit the maximum number of returned rows. This limit does not apply to records considered while apply aggregations. - #[serde( - rename = "limit", - default, - with = "::serde_with::rust::double_option", - skip_serializing_if = "Option::is_none" - )] - pub limit: Option>, + #[serde(rename = "limit", default, skip_serializing_if = "Option::is_none")] + pub limit: Option, /// Optionally offset from the Nth result. This applies to both row and aggregation results. - #[serde( - rename = "offset", - default, - with = "::serde_with::rust::double_option", - skip_serializing_if = "Option::is_none" - )] - pub offset: Option>, - #[serde( - rename = "order_by", - default, - with = "::serde_with::rust::double_option", - skip_serializing_if = "Option::is_none" - )] - pub order_by: Option>, + #[serde(rename = "offset", default, skip_serializing_if = "Option::is_none")] + pub offset: Option, + #[serde(rename = "order_by", default, skip_serializing_if = "Option::is_none")] + pub order_by: Option, #[serde(rename = "where", skip_serializing_if = "Option::is_none")] pub r#where: Option, } diff --git a/crates/dc-api-types/src/query_response.rs b/crates/dc-api-types/src/query_response.rs index 8b1d9ca6..0c48d215 100644 --- a/crates/dc-api-types/src/query_response.rs +++ b/crates/dc-api-types/src/query_response.rs @@ -42,29 +42,13 @@ pub struct ForEachRow { pub query: RowSet, } -/// A row set must contain either rows, or aggregates, or possibly both 
#[skip_serializing_none] -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -#[serde(untagged)] -pub enum RowSet { - Aggregate { - /// The results of the aggregates returned by the query - aggregates: HashMap, - /// The rows returned by the query, corresponding to the query's fields - rows: Option>>, - }, - Rows { - /// Rows returned by a query that did not request aggregates. - rows: Vec>, - }, -} - -impl Default for RowSet { - fn default() -> Self { - RowSet::Rows { - rows: Default::default(), - } - } +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] +pub struct RowSet { + /// The results of the aggregates returned by the query + pub aggregates: Option>, + /// The rows returned by the query, corresponding to the query's fields + pub rows: Option>>, } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] diff --git a/crates/integration-tests/src/tests/local_relationship.rs b/crates/integration-tests/src/tests/local_relationship.rs index 6a897a58..151752c0 100644 --- a/crates/integration-tests/src/tests/local_relationship.rs +++ b/crates/integration-tests/src/tests/local_relationship.rs @@ -26,6 +26,7 @@ async fn joins_local_relationships() -> anyhow::Result<()> { user { email comments(limit: 2, order_by: {id: Asc}) { + id email } } diff --git a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__basic__runs_a_query.snap b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__basic__runs_a_query.snap index cea7aa7f..a4fec50d 100644 --- a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__basic__runs_a_query.snap +++ b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__basic__runs_a_query.snap @@ -5,43 +5,53 @@ expression: "query(r#\"\n query Movies {\n movie data: movies: - imdb: - rating: 6.2 + rating: + $numberDouble: "6.2" votes: 1189 title: Blacksmith Scene - imdb: - rating: 7.4 + rating: + $numberDouble: "7.4" votes: 9847 title: The Great Train Robbery 
- imdb: - rating: 7.1 + rating: + $numberDouble: "7.1" votes: 448 title: The Land Beyond the Sunset - imdb: - rating: 6.6 + rating: + $numberDouble: "6.6" votes: 1375 title: A Corner in Wheat - imdb: - rating: 7.3 + rating: + $numberDouble: "7.3" votes: 1034 title: "Winsor McCay, the Famous Cartoonist of the N.Y. Herald and His Moving Comics" - imdb: - rating: 6 + rating: + $numberInt: "6" votes: 371 title: Traffic in Souls - imdb: - rating: 7.3 + rating: + $numberDouble: "7.3" votes: 1837 title: Gertie the Dinosaur - imdb: - rating: 5.8 + rating: + $numberDouble: "5.8" votes: 223 title: In the Land of the Head Hunters - imdb: - rating: 7.6 + rating: + $numberDouble: "7.6" votes: 744 title: The Perils of Pauline - imdb: - rating: 6.8 + rating: + $numberDouble: "6.8" votes: 15715 title: The Birth of a Nation errors: ~ diff --git a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__local_relationship__joins_local_relationships.snap b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__local_relationship__joins_local_relationships.snap index 9ed9a0ee..ac32decb 100644 --- a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__local_relationship__joins_local_relationships.snap +++ b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__local_relationship__joins_local_relationships.snap @@ -1,14 +1,13 @@ --- source: crates/integration-tests/src/tests/local_relationship.rs -expression: "query(r#\"\n query {\n movies(limit: 2, order_by: {title: Asc}, where: {title: {_iregex: \"Rear\"}}) {\n id\n title\n comments(limit: 2, order_by: {id: Asc}) {\n email\n text\n movie {\n id\n title\n }\n user {\n email\n comments(limit: 2, order_by: {id: Asc}) {\n email\n text\n user {\n email\n comments(limit: 2, order_by: {id: Asc}) {\n email\n }\n }\n }\n }\n }\n }\n }\n \"#).variables(json!({\n \"limit\": 11, \"movies_limit\": 2\n })).run().await?" 
+expression: "query(r#\"\n query {\n movies(limit: 2, order_by: {title: Asc}, where: {title: {_iregex: \"Rear\"}}) {\n id\n title\n comments(limit: 2, order_by: {id: Asc}) {\n email\n text\n movie {\n id\n title\n }\n user {\n email\n comments(limit: 2, order_by: {id: Asc}) {\n email\n text\n user {\n email\n comments(limit: 2, order_by: {id: Asc}) {\n id\n email\n }\n }\n }\n }\n }\n }\n }\n \"#).variables(json!({\n \"limit\": 11, \"movies_limit\": 2\n })).run().await?" --- data: movies: - comments: - email: iain_glen@gameofthron.es movie: - id: - $oid: 573a1398f29313caabceb0b1 + id: 573a1398f29313caabceb0b1 title: A Night in the Life of Jimmy Reardon text: Debitis tempore cum natus quaerat dolores quibusdam perferendis. Pariatur aspernatur officia libero quod pariatur nobis neque. Maiores non ipsam iste repellendus distinctio praesentium iure. user: @@ -18,24 +17,26 @@ data: user: comments: - email: iain_glen@gameofthron.es + id: 5a9427648b0beebeb69579f3 - email: iain_glen@gameofthron.es + id: 5a9427648b0beebeb6957b0f email: iain_glen@gameofthron.es - email: iain_glen@gameofthron.es text: Impedit consectetur ex cupiditate enim. Placeat assumenda reiciendis iste neque similique nesciunt aperiam. user: comments: - email: iain_glen@gameofthron.es + id: 5a9427648b0beebeb69579f3 - email: iain_glen@gameofthron.es + id: 5a9427648b0beebeb6957b0f email: iain_glen@gameofthron.es email: iain_glen@gameofthron.es - id: - $oid: 573a1398f29313caabceb0b1 + id: 573a1398f29313caabceb0b1 title: A Night in the Life of Jimmy Reardon - comments: - email: owen_teale@gameofthron.es movie: - id: - $oid: 573a1394f29313caabcdfa00 + id: 573a1394f29313caabcdfa00 title: Rear Window text: Nobis corporis rem hic ipsa cum impedit. Esse nihil cum est minima ducimus temporibus minima. Sed reprehenderit tempore similique nam. Ipsam nesciunt veniam aut amet ut. 
user: @@ -45,17 +46,20 @@ data: user: comments: - email: owen_teale@gameofthron.es + id: 5a9427648b0beebeb6957b44 - email: owen_teale@gameofthron.es + id: 5a9427648b0beebeb6957cf6 email: owen_teale@gameofthron.es - email: owen_teale@gameofthron.es text: Repudiandae repellat quia officiis. Quidem voluptatum vel id itaque et. Corrupti corporis magni voluptas quae itaque fugiat quae. user: comments: - email: owen_teale@gameofthron.es + id: 5a9427648b0beebeb6957b44 - email: owen_teale@gameofthron.es + id: 5a9427648b0beebeb6957cf6 email: owen_teale@gameofthron.es email: owen_teale@gameofthron.es - id: - $oid: 573a1394f29313caabcdfa00 + id: 573a1394f29313caabcdfa00 title: Rear Window errors: ~ diff --git a/crates/mongodb-agent-common/Cargo.toml b/crates/mongodb-agent-common/Cargo.toml index d61d7284..e6a9ab7e 100644 --- a/crates/mongodb-agent-common/Cargo.toml +++ b/crates/mongodb-agent-common/Cargo.toml @@ -20,7 +20,7 @@ futures-util = "0.3.28" http = "^0.2" indexmap = { version = "1", features = ["serde"] } # must match the version that ndc-client uses indent = "^0.1" -itertools = "^0.10" +itertools = { workspace = true } mongodb = "2.8" once_cell = "1" regex = "1" diff --git a/crates/mongodb-agent-common/src/explain.rs b/crates/mongodb-agent-common/src/explain.rs index 259629c3..cad0d898 100644 --- a/crates/mongodb-agent-common/src/explain.rs +++ b/crates/mongodb-agent-common/src/explain.rs @@ -17,19 +17,13 @@ pub async fn explain_query( let db = state.database(); - let (pipeline, _) = query::pipeline_for_query_request(config, &query_request)?; + let pipeline = query::pipeline_for_query_request(config, &query_request)?; let pipeline_bson = to_bson(&pipeline)?; - let aggregate_target = match QueryTarget::for_request(config, &query_request) { - QueryTarget::Collection(collection_name) => Bson::String(collection_name), - QueryTarget::NativeQuery { native_query, .. 
} => { - match &native_query.input_collection { - Some(collection_name) => Bson::String(collection_name.to_string()), - // 1 means aggregation without a collection target - as in `db.aggregate()` instead of - // `db..aggregate()` - None => Bson::Int32(1) - } - } + let aggregate_target = match QueryTarget::for_request(config, &query_request).input_collection() + { + Some(collection_name) => Bson::String(collection_name.to_owned()), + None => Bson::Int32(1), }; let query_command = doc! { diff --git a/crates/mongodb-agent-common/src/mongodb/mod.rs b/crates/mongodb-agent-common/src/mongodb/mod.rs index 2a8961cf..f311835e 100644 --- a/crates/mongodb-agent-common/src/mongodb/mod.rs +++ b/crates/mongodb-agent-common/src/mongodb/mod.rs @@ -2,7 +2,6 @@ mod accumulator; mod collection; mod database; mod pipeline; -mod projection; pub mod sanitize; mod selection; mod stage; @@ -15,7 +14,6 @@ pub use self::{ collection::CollectionTrait, database::DatabaseTrait, pipeline::Pipeline, - projection::{ProjectAs, Projection}, selection::Selection, stage::Stage, }; diff --git a/crates/mongodb-agent-common/src/mongodb/projection.rs b/crates/mongodb-agent-common/src/mongodb/projection.rs deleted file mode 100644 index 2cf57f41..00000000 --- a/crates/mongodb-agent-common/src/mongodb/projection.rs +++ /dev/null @@ -1,272 +0,0 @@ -use std::collections::BTreeMap; - -use mongodb::bson::{self}; -use serde::Serialize; - -use dc_api_types::Field; - -use crate::mongodb::selection::serialized_null_checked_column_reference; - -/// A projection determines which fields to request from the result of a query. 
-/// -/// See https://www.mongodb.com/docs/manual/reference/operator/aggregation/project/#mongodb-pipeline-pipe.-project -#[derive(Clone, Debug, PartialEq, Serialize)] -#[serde(transparent)] -pub struct Projection { - pub field_projections: BTreeMap, -} - -impl Projection { - pub fn new(fields: T) -> Projection - where - T: IntoIterator, - K: Into, - { - Projection { - field_projections: fields.into_iter().map(|(k, v)| (k.into(), v)).collect(), - } - } - - pub fn for_field_selection(field_selection: T) -> Projection - where - T: IntoIterator, - K: Into, - { - for_field_selection_helper(&[], field_selection) - } -} - -fn for_field_selection_helper(parent_columns: &[&str], field_selection: T) -> Projection -where - T: IntoIterator, - K: Into, -{ - Projection::new( - field_selection - .into_iter() - .map(|(key, value)| (key.into(), project_field_as(parent_columns, &value))), - ) -} - -fn project_field_as(parent_columns: &[&str], field: &Field) -> ProjectAs { - match field { - Field::Column { - column, - column_type, - } => { - let col_path = match parent_columns { - [] => format!("${column}"), - _ => format!("${}.{}", parent_columns.join("."), column), - }; - let bson_col_path = serialized_null_checked_column_reference(col_path, column_type); - ProjectAs::Expression(bson_col_path) - } - Field::NestedObject { column, query } => { - let nested_parent_columns = append_to_path(parent_columns, column); - let fields = query.fields.clone().flatten().unwrap_or_default(); - ProjectAs::Nested(for_field_selection_helper(&nested_parent_columns, fields)) - } - Field::NestedArray { - field, - // NOTE: We can use a $slice in our projection to do offsets and limits: - // https://www.mongodb.com/docs/manual/reference/operator/projection/slice/#mongodb-projection-proj.-slice - limit: _, - offset: _, - r#where: _, - } => project_field_as(parent_columns, field), - Field::Relationship { - query, - relationship, - } => { - // TODO: Need to determine whether the relation type is "object" or 
"array" and project - // accordingly - let nested_parent_columns = append_to_path(parent_columns, relationship); - let fields = query.fields.clone().flatten().unwrap_or_default(); - ProjectAs::Nested(for_field_selection_helper(&nested_parent_columns, fields)) - } - } -} - -fn append_to_path<'a, 'b, 'c>(parent_columns: &'a [&'b str], column: &'c str) -> Vec<&'c str> -where - 'b: 'c, -{ - parent_columns.iter().copied().chain(Some(column)).collect() -} - -impl TryFrom<&Projection> for bson::Document { - type Error = bson::ser::Error; - fn try_from(value: &Projection) -> Result { - bson::to_document(value) - } -} - -impl TryFrom for bson::Document { - type Error = bson::ser::Error; - fn try_from(value: Projection) -> Result { - (&value).try_into() - } -} - -#[derive(Clone, Debug, PartialEq)] -pub enum ProjectAs { - #[allow(dead_code)] - Included, - Excluded, - Expression(bson::Bson), - Nested(Projection), -} - -impl Serialize for ProjectAs { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - match self { - ProjectAs::Included => serializer.serialize_u8(1), - ProjectAs::Excluded => serializer.serialize_u8(0), - ProjectAs::Expression(v) => v.serialize(serializer), - ProjectAs::Nested(projection) => projection.serialize(serializer), - } - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - - use mongodb::bson::{bson, doc, to_bson, to_document}; - use pretty_assertions::assert_eq; - use serde_json::{from_value, json}; - - use super::{ProjectAs, Projection}; - use dc_api_types::{Field, QueryRequest}; - - #[test] - fn serializes_a_projection() -> Result<(), anyhow::Error> { - let projection = Projection { - field_projections: [ - ("foo".to_owned(), ProjectAs::Included), - ( - "bar".to_owned(), - ProjectAs::Nested(Projection { - field_projections: [("baz".to_owned(), ProjectAs::Included)].into(), - }), - ), - ] - .into(), - }; - assert_eq!( - to_bson(&projection)?, - bson!({ - "foo": 1, - "bar": { - "baz": 1 - } - }) - ); - 
Ok(()) - } - - #[test] - fn calculates_projection_for_fields() -> Result<(), anyhow::Error> { - let fields: HashMap = from_value(json!({ - "foo": { "type": "column", "column": "foo", "column_type": "String" }, - "foo_again": { "type": "column", "column": "foo", "column_type": "String" }, - "bar": { - "type": "object", - "column": "bar", - "query": { - "fields": { - "baz": { "type": "column", "column": "baz", "column_type": "String" }, - "baz_again": { "type": "column", "column": "baz", "column_type": "String" }, - }, - }, - }, - "bar_again": { - "type": "object", - "column": "bar", - "query": { - "fields": { - "baz": { "type": "column", "column": "baz", "column_type": "String" }, - }, - }, - }, - "my_date": { "type": "column", "column": "my_date", "column_type": "date"}, - }))?; - let projection = Projection::for_field_selection(fields); - assert_eq!( - to_document(&projection)?, - doc! { - "foo": { "$ifNull": ["$foo", null] }, - "foo_again": { "$ifNull": ["$foo", null] }, - "bar": { - "baz": { "$ifNull": ["$bar.baz", null] }, - "baz_again": { "$ifNull": ["$bar.baz", null] } - }, - "bar_again": { - "baz": { "$ifNull": ["$bar.baz", null] } - }, - "my_date": { - "$dateToString": { - "date": { "$ifNull": ["$my_date", null] } - } - } - } - ); - Ok(()) - } - - #[test] - fn produces_projection_for_relation() -> Result<(), anyhow::Error> { - let query_request: QueryRequest = from_value(json!({ - "query": { - "fields": { - "class_students": { - "type": "relationship", - "query": { - "fields": { - "name": { "type": "column", "column": "name", "column_type": "string" }, - }, - }, - "relationship": "class_students", - }, - "students": { - "type": "relationship", - "query": { - "fields": { - "student_name": { "type": "column", "column": "name", "column_type": "string" }, - }, - }, - "relationship": "class_students", - }, - }, - }, - "target": { "name": ["classes"], "type": "table" }, - "relationships": [{ - "source_table": ["classes"], - "relationships": { - "class_students": 
{ - "column_mapping": { "_id": "classId" }, - "relationship_type": "array", - "target": {"name": ["students"], "type": "table"}, - }, - }, - }], - }))?; - let projection = - Projection::for_field_selection(query_request.query.fields.flatten().unwrap()); - assert_eq!( - to_document(&projection)?, - doc! { - "class_students": { - "name": { "$ifNull": ["$class_students.name", null] }, - }, - "students": { - "student_name": { "$ifNull": ["$class_students.name", null] }, - }, - } - ); - Ok(()) - } -} diff --git a/crates/mongodb-agent-common/src/mongodb/selection.rs b/crates/mongodb-agent-common/src/mongodb/selection.rs index d9e5dfd3..db99df03 100644 --- a/crates/mongodb-agent-common/src/mongodb/selection.rs +++ b/crates/mongodb-agent-common/src/mongodb/selection.rs @@ -29,7 +29,7 @@ impl Selection { pub fn from_query_request(query_request: &QueryRequest) -> Result { // let fields = (&query_request.query.fields).flatten().unwrap_or_default(); let empty_map = HashMap::new(); - let fields = if let Some(Some(fs)) = &query_request.query.fields { + let fields = if let Some(fs) = &query_request.query.fields { fs } else { &empty_map @@ -92,7 +92,7 @@ fn selection_for_field( Field::NestedObject { column, query } => { let nested_parent_columns = append_to_path(parent_columns, column); let nested_parent_col_path = format!("${}", nested_parent_columns.join(".")); - let fields = query.fields.clone().flatten().unwrap_or_default(); + let fields = query.fields.clone().unwrap_or_default(); let nested_selection = from_query_request_helper(table_relationships, &nested_parent_columns, &fields)?; Ok(doc! 
{"$cond": {"if": nested_parent_col_path, "then": nested_selection, "else": Bson::Null}}.into()) @@ -126,7 +126,7 @@ fn selection_for_array( Field::NestedObject { column, query } => { let nested_parent_columns = append_to_path(parent_columns, column); let nested_parent_col_path = format!("${}", nested_parent_columns.join(".")); - let fields = query.fields.clone().flatten().unwrap_or_default(); + let fields = query.fields.clone().unwrap_or_default(); let mut nested_selection = from_query_request_helper(table_relationships, &["$this"], &fields)?; for _ in 0..array_nesting_level { @@ -249,7 +249,7 @@ mod tests { let query_request = QueryRequest { query: Box::new(Query { - fields: Some(Some(fields)), + fields: Some(fields), ..Default::default() }), foreach: None, diff --git a/crates/mongodb-agent-common/src/mongodb/stage.rs b/crates/mongodb-agent-common/src/mongodb/stage.rs index 7164046e..4be51550 100644 --- a/crates/mongodb-agent-common/src/mongodb/stage.rs +++ b/crates/mongodb-agent-common/src/mongodb/stage.rs @@ -3,7 +3,7 @@ use std::collections::BTreeMap; use mongodb::bson; use serde::Serialize; -use super::{accumulator::Accumulator, pipeline::Pipeline, projection::Projection, Selection}; +use super::{accumulator::Accumulator, pipeline::Pipeline, Selection}; /// Aggergation Pipeline Stage. This is a work-in-progress - we are adding enum variants to match /// MongoDB pipeline stage types as we need them in this app. For documentation on all stage types @@ -133,15 +133,6 @@ pub enum Stage { #[serde(rename = "$count")] Count(String), - /// Reshapes each document in the stream, such as by adding new fields or removing existing fields. For each input document, outputs one document. - /// - /// See also [`$unset`] for removing existing fields. 
- /// - /// See https://www.mongodb.com/docs/manual/reference/operator/aggregation/project/#mongodb-pipeline-pipe.-project - #[allow(dead_code)] - #[serde(rename = "$project")] - Project(Projection), - /// Replaces a document with the specified embedded document. The operation replaces all /// existing fields in the input document, including the _id field. Specify a document embedded /// in the input document to promote the embedded document to the top level. diff --git a/crates/mongodb-agent-common/src/query/execute_query_request.rs b/crates/mongodb-agent-common/src/query/execute_query_request.rs index b49cb58d..71c92a54 100644 --- a/crates/mongodb-agent-common/src/query/execute_query_request.rs +++ b/crates/mongodb-agent-common/src/query/execute_query_request.rs @@ -1,16 +1,14 @@ -use anyhow::anyhow; use configuration::Configuration; -use dc_api_types::{QueryRequest, QueryResponse, RowSet}; +use dc_api_types::QueryRequest; use futures::Stream; -use futures_util::TryStreamExt; -use itertools::Itertools as _; -use mongodb::bson::{self, Document}; +use futures_util::TryStreamExt as _; +use mongodb::bson; -use super::pipeline::{pipeline_for_query_request, ResponseShape}; +use super::pipeline::pipeline_for_query_request; use crate::{ interface_types::MongoAgentError, mongodb::{CollectionTrait as _, DatabaseTrait}, - query::{foreach::foreach_variants, QueryTarget}, + query::QueryTarget, }; /// Execute a query request against the given collection. 
@@ -21,9 +19,9 @@ pub async fn execute_query_request( database: impl DatabaseTrait, config: &Configuration, query_request: QueryRequest, -) -> Result { +) -> Result, MongoAgentError> { let target = QueryTarget::for_request(config, &query_request); - let (pipeline, response_shape) = pipeline_for_query_request(config, &query_request)?; + let pipeline = pipeline_for_query_request(config, &query_request)?; tracing::debug!( ?query_request, ?target, @@ -34,60 +32,24 @@ pub async fn execute_query_request( // The target of a query request might be a collection, or it might be a native query. In the // latter case there is no collection to perform the aggregation against. So instead of sending // the MongoDB API call `db..aggregate` we instead call `db.aggregate`. - let documents = match target { - QueryTarget::Collection(collection_name) => { - let collection = database.collection(&collection_name); + let documents = match target.input_collection() { + Some(collection_name) => { + let collection = database.collection(collection_name); collect_from_cursor(collection.aggregate(pipeline, None).await?).await } - QueryTarget::NativeQuery { native_query, .. 
} => { - match &native_query.input_collection { - Some(collection_name) => { - let collection = database.collection(collection_name); - collect_from_cursor(collection.aggregate(pipeline, None).await?).await - }, - None => collect_from_cursor(database.aggregate(pipeline, None).await?).await - } - } + None => collect_from_cursor(database.aggregate(pipeline, None).await?).await, }?; - tracing::debug!(response_documents = %serde_json::to_string(&documents).unwrap(), "response from MongoDB"); - let response = match (foreach_variants(&query_request), response_shape) { - (Some(_), _) => parse_single_document(documents)?, - (None, ResponseShape::ListOfRows) => QueryResponse::Single(RowSet::Rows { - rows: documents - .into_iter() - .map(bson::from_document) - .try_collect()?, - }), - (None, ResponseShape::SingleObject) => { - QueryResponse::Single(parse_single_document(documents)?) - } - }; - tracing::debug!(response = %serde_json::to_string(&response).unwrap(), "query response"); - - Ok(response) + Ok(documents) } async fn collect_from_cursor( - document_cursor: impl Stream>, -) -> Result, MongoAgentError> { + document_cursor: impl Stream>, +) -> Result, MongoAgentError> { document_cursor .into_stream() .map_err(MongoAgentError::MongoDB) .try_collect::>() .await } - -fn parse_single_document(documents: Vec) -> Result -where - T: for<'de> serde::Deserialize<'de>, -{ - let document = documents.into_iter().next().ok_or_else(|| { - MongoAgentError::AdHoc(anyhow!( - "Expected a response document from MongoDB, but did not get one" - )) - })?; - let value = bson::from_document(document)?; - Ok(value) -} diff --git a/crates/mongodb-agent-common/src/query/foreach.rs b/crates/mongodb-agent-common/src/query/foreach.rs index d347537e..3541f4f3 100644 --- a/crates/mongodb-agent-common/src/query/foreach.rs +++ b/crates/mongodb-agent-common/src/query/foreach.rs @@ -8,7 +8,7 @@ use dc_api_types::{ }; use mongodb::bson::{doc, Bson}; -use super::pipeline::{pipeline_for_non_foreach, 
ResponseShape}; +use super::pipeline::pipeline_for_non_foreach; use crate::mongodb::Selection; use crate::{ interface_types::MongoAgentError, @@ -57,8 +57,8 @@ pub fn pipeline_for_foreach( foreach: Vec, config: &Configuration, query_request: &QueryRequest, -) -> Result<(Pipeline, ResponseShape), MongoAgentError> { - let pipelines_with_response_shapes: Vec<(String, (Pipeline, ResponseShape))> = foreach +) -> Result { + let pipelines: Vec<(String, Pipeline)> = foreach .into_iter() .enumerate() .map(|(index, foreach_variant)| { @@ -76,32 +76,22 @@ pub fn pipeline_for_foreach( .into(); } - let pipeline_with_response_shape = - pipeline_for_non_foreach(config, variables.as_ref(), &q)?; - Ok((facet_name(index), pipeline_with_response_shape)) + let pipeline = pipeline_for_non_foreach(config, variables.as_ref(), &q)?; + Ok((facet_name(index), pipeline)) }) .collect::>()?; let selection = Selection(doc! { - "rows": pipelines_with_response_shapes.iter().map(|(key, (_, response_shape))| doc! { - "query": match response_shape { - ResponseShape::ListOfRows => doc! { "rows": format!("${key}") }.into(), - ResponseShape::SingleObject => Bson::String(format!("${key}")), - } - }).collect::>() + "row_sets": pipelines.iter().map(|(key, _)| + Bson::String(format!("${key}")), + ).collect::>() }); - let queries = pipelines_with_response_shapes - .into_iter() - .map(|(key, (pipeline, _))| (key, pipeline)) - .collect(); + let queries = pipelines.into_iter().collect(); - Ok(( - Pipeline { - stages: vec![Stage::Facet(queries), Stage::ReplaceWith(selection)], - }, - ResponseShape::SingleObject, - )) + Ok(Pipeline { + stages: vec![Stage::Facet(queries), Stage::ReplaceWith(selection)], + }) } /// Fold a 'foreach' HashMap into an Expression. 
@@ -136,10 +126,8 @@ fn facet_name(index: usize) -> String { #[cfg(test)] mod tests { - use dc_api_types::{ - BinaryComparisonOperator, ComparisonColumn, Field, Query, QueryRequest, QueryResponse, - }; - use mongodb::bson::{bson, Bson}; + use dc_api_types::{BinaryComparisonOperator, ComparisonColumn, Field, Query, QueryRequest}; + use mongodb::bson::{bson, doc, Bson}; use pretty_assertions::assert_eq; use serde_json::{from_value, json}; @@ -194,56 +182,40 @@ mod tests { }, { "$replaceWith": { - "rows": [ - { "query": { "rows": "$__FACET___0" } }, - { "query": { "rows": "$__FACET___1" } }, + "row_sets": [ + "$__FACET___0", + "$__FACET___1", ] }, } ]); - let expected_response: QueryResponse = from_value(json! ({ - "rows": [ - { - "query": { - "rows": [ - { "albumId": 1, "title": "For Those About To Rock We Salute You" }, - { "albumId": 4, "title": "Let There Be Rock" } - ] - } - }, - { - "query": { - "rows": [ - { "albumId": 2, "title": "Balls to the Wall" }, - { "albumId": 3, "title": "Restless and Wild" } - ] - } - } + let expected_response = vec![doc! 
{ + "row_sets": [ + [ + { "albumId": 1, "title": "For Those About To Rock We Salute You" }, + { "albumId": 4, "title": "Let There Be Rock" }, + ], + [ + { "albumId": 2, "title": "Balls to the Wall" }, + { "albumId": 3, "title": "Restless and Wild" }, + ], ] - }))?; + }]; let db = mock_collection_aggregate_response_for_pipeline( "tracks", expected_pipeline, bson!([{ - "rows": [ - { - "query": { - "rows": [ - { "albumId": 1, "title": "For Those About To Rock We Salute You" }, - { "albumId": 4, "title": "Let There Be Rock" } - ] - } - }, - { - "query": { - "rows": [ - { "albumId": 2, "title": "Balls to the Wall" }, - { "albumId": 3, "title": "Restless and Wild" } - ] - } - } + "row_sets": [ + [ + { "albumId": 1, "title": "For Those About To Rock We Salute You" }, + { "albumId": 4, "title": "Let There Be Rock" } + ], + [ + { "albumId": 2, "title": "Balls to the Wall" }, + { "albumId": 3, "title": "Restless and Wild" } + ], ], }]), ); @@ -327,68 +299,60 @@ mod tests { }, { "$replaceWith": { - "rows": [ - { "query": "$__FACET___0" }, - { "query": "$__FACET___1" }, + "row_sets": [ + "$__FACET___0", + "$__FACET___1", ] }, } ]); - let expected_response: QueryResponse = from_value(json! ({ - "rows": [ + let expected_response = vec![doc! 
{ + "row_sets": [ { - "query": { + "aggregates": { + "count": 2, + }, + "rows": [ + { "albumId": 1, "title": "For Those About To Rock We Salute You" }, + { "albumId": 4, "title": "Let There Be Rock" }, + ] + }, + { + "aggregates": { + "count": 2, + }, + "rows": [ + { "albumId": 2, "title": "Balls to the Wall" }, + { "albumId": 3, "title": "Restless and Wild" }, + ] + }, + ] + }]; + + let db = mock_collection_aggregate_response_for_pipeline( + "tracks", + expected_pipeline, + bson!([{ + "row_sets": [ + { "aggregates": { "count": 2, }, "rows": [ { "albumId": 1, "title": "For Those About To Rock We Salute You" }, - { "albumId": 4, "title": "Let There Be Rock" } + { "albumId": 4, "title": "Let There Be Rock" }, ] - } - }, - { - "query": { + }, + { "aggregates": { "count": 2, }, "rows": [ { "albumId": 2, "title": "Balls to the Wall" }, - { "albumId": 3, "title": "Restless and Wild" } + { "albumId": 3, "title": "Restless and Wild" }, ] - } - } - ] - }))?; - - let db = mock_collection_aggregate_response_for_pipeline( - "tracks", - expected_pipeline, - bson!([{ - "rows": [ - { - "query": { - "aggregates": { - "count": 2, - }, - "rows": [ - { "albumId": 1, "title": "For Those About To Rock We Salute You" }, - { "albumId": 4, "title": "Let There Be Rock" } - ] - } }, - { - "query": { - "aggregates": { - "count": 2, - }, - "rows": [ - { "albumId": 2, "title": "Balls to the Wall" }, - { "albumId": 3, "title": "Restless and Wild" } - ] - } - } ] }]), ); @@ -424,7 +388,7 @@ mod tests { name: "artistId".to_owned(), }, }), - fields: Some(Some( + fields: Some( [ ( "albumId".to_owned(), @@ -442,7 +406,7 @@ mod tests { ), ] .into(), - )), + ), aggregates: None, aggregates_limit: None, limit: None, @@ -480,88 +444,68 @@ mod tests { }, { "$replaceWith": { - "rows": [ - { "query": { "rows": "$__FACET___0" } }, - { "query": { "rows": "$__FACET___1" } }, - { "query": { "rows": "$__FACET___2" } }, - { "query": { "rows": "$__FACET___3" } }, - { "query": { "rows": "$__FACET___4" } }, - { 
"query": { "rows": "$__FACET___5" } }, - { "query": { "rows": "$__FACET___6" } }, - { "query": { "rows": "$__FACET___7" } }, - { "query": { "rows": "$__FACET___8" } }, - { "query": { "rows": "$__FACET___9" } }, - { "query": { "rows": "$__FACET___10" } }, - { "query": { "rows": "$__FACET___11" } }, + "row_sets": [ + "$__FACET___0", + "$__FACET___1", + "$__FACET___2", + "$__FACET___3", + "$__FACET___4", + "$__FACET___5", + "$__FACET___6", + "$__FACET___7", + "$__FACET___8", + "$__FACET___9", + "$__FACET___10", + "$__FACET___11", ] }, } ]); - let expected_response: QueryResponse = from_value(json! ({ - "rows": [ - { - "query": { - "rows": [ - { "albumId": 1, "title": "For Those About To Rock We Salute You" }, - { "albumId": 4, "title": "Let There Be Rock" } - ] - } - }, - { "query": { "rows": [] } }, - { - "query": { - "rows": [ - { "albumId": 2, "title": "Balls to the Wall" }, - { "albumId": 3, "title": "Restless and Wild" } - ] - } - }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, + let expected_response = vec![doc! 
{ + "row_sets": [ + [ + { "albumId": 1, "title": "For Those About To Rock We Salute You" }, + { "albumId": 4, "title": "Let There Be Rock" } + ], + [], + [ + { "albumId": 2, "title": "Balls to the Wall" }, + { "albumId": 3, "title": "Restless and Wild" } + ], + [], + [], + [], + [], + [], + [], + [], + [], ] - }))?; + }]; let db = mock_collection_aggregate_response_for_pipeline( "tracks", expected_pipeline, bson!([{ - "rows": [ - { - "query": { - "rows": [ - { "albumId": 1, "title": "For Those About To Rock We Salute You" }, - { "albumId": 4, "title": "Let There Be Rock" } - ] - } - }, - { - "query": { - "rows": [] - } - }, - { - "query": { - "rows": [ - { "albumId": 2, "title": "Balls to the Wall" }, - { "albumId": 3, "title": "Restless and Wild" } - ] - } - }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, + "row_sets": [ + [ + { "albumId": 1, "title": "For Those About To Rock We Salute You" }, + { "albumId": 4, "title": "Let There Be Rock" } + ], + [], + [ + { "albumId": 2, "title": "Balls to the Wall" }, + { "albumId": 3, "title": "Restless and Wild" } + ], + [], + [], + [], + [], + [], + [], + [], + [], ], }]), ); diff --git a/crates/mongodb-agent-common/src/query/mod.rs b/crates/mongodb-agent-common/src/query/mod.rs index 08498435..c86a012a 100644 --- a/crates/mongodb-agent-common/src/query/mod.rs +++ b/crates/mongodb-agent-common/src/query/mod.rs @@ -12,7 +12,8 @@ mod relations; pub mod serialization; use configuration::Configuration; -use dc_api_types::{QueryRequest, QueryResponse}; +use dc_api_types::QueryRequest; +use mongodb::bson; use self::execute_query_request::execute_query_request; pub use self::{ @@ -27,7 +28,7 @@ pub async fn handle_query_request( config: &Configuration, state: &ConnectorState, query_request: QueryRequest, -) -> Result { +) -> 
Result, MongoAgentError> { let database = state.database(); // This function delegates to another function which gives is a point to inject a mock database // implementation for testing. @@ -36,8 +37,8 @@ pub async fn handle_query_request( #[cfg(test)] mod tests { - use dc_api_types::{QueryRequest, QueryResponse, RowSet}; - use mongodb::bson::{self, bson}; + use dc_api_types::QueryRequest; + use mongodb::bson::{self, bson, doc}; use pretty_assertions::assert_eq; use serde_json::{from_value, json}; @@ -64,12 +65,7 @@ mod tests { "relationships": [], }))?; - let expected_response: QueryResponse = from_value(json!({ - "rows": [ - { "student_gpa": 3.1 }, - { "student_gpa": 3.6 }, - ], - }))?; + let expected_response = vec![doc! { "student_gpa": 3.1 }, doc! { "student_gpa": 3.6 }]; let expected_pipeline = bson!([ { "$match": { "gpa": { "$lt": 4.0 } } }, @@ -112,12 +108,12 @@ mod tests { "relationships": [], }))?; - let expected_response: QueryResponse = from_value(json!({ + let expected_response = vec![doc! { "aggregates": { "count": 11, "avg": 3, } - }))?; + }]; let expected_pipeline = bson!([ { @@ -191,14 +187,14 @@ mod tests { "relationships": [], }))?; - let expected_response: QueryResponse = from_value(json!({ + let expected_response = vec![doc! { "aggregates": { "avg": 3.1, }, "rows": [{ "gpa": 3.1, }], - }))?; + }]; let expected_pipeline = bson!([ { "$match": { "gpa": { "$lt": 4.0 } } }, @@ -268,11 +264,7 @@ mod tests { "relationships": [] }))?; - let expected_response: QueryResponse = from_value(json!({ - "rows": [{ - "date": "2018-08-14T15:05:03.142Z", - }] - }))?; + let expected_response = vec![doc! 
{ "date": "2018-08-14T15:05:03.142Z" }]; let expected_pipeline = bson!([ { @@ -316,7 +308,7 @@ mod tests { "relationships": [], }))?; - let expected_response = QueryResponse::Single(RowSet::Rows { rows: vec![] }); + let expected_response: Vec = vec![]; let db = mock_collection_aggregate_response("comments", bson!([])); diff --git a/crates/mongodb-agent-common/src/query/native_query.rs b/crates/mongodb-agent-common/src/query/native_query.rs index d2b4b1c8..9657ce64 100644 --- a/crates/mongodb-agent-common/src/query/native_query.rs +++ b/crates/mongodb-agent-common/src/query/native_query.rs @@ -87,11 +87,11 @@ mod tests { Configuration, }; use dc_api_test_helpers::{column, query, query_request}; - use dc_api_types::{Argument, QueryResponse}; + use dc_api_types::Argument; use mongodb::bson::{bson, doc}; use mongodb_support::BsonScalarType as S; use pretty_assertions::assert_eq; - use serde_json::{from_value, json}; + use serde_json::json; use crate::{ mongodb::test_helpers::mock_aggregate_response_for_pipeline, query::execute_query_request, @@ -272,12 +272,10 @@ mod tests { }, ]); - let expected_response: QueryResponse = from_value(json!({ - "rows": [ - { "title": "Beau Geste", "year": 1926, "genres": ["Action", "Adventure", "Drama"] }, - { "title": "For Heaven's Sake", "year": 1926, "genres": ["Action", "Comedy", "Romance"] }, - ], - }))?; + let expected_response = vec![ + doc! { "title": "Beau Geste", "year": 1926, "genres": ["Action", "Adventure", "Drama"] }, + doc! 
{ "title": "For Heaven's Sake", "year": 1926, "genres": ["Action", "Comedy", "Romance"] }, + ]; let db = mock_aggregate_response_for_pipeline( expected_pipeline, diff --git a/crates/mongodb-agent-common/src/query/pipeline.rs b/crates/mongodb-agent-common/src/query/pipeline.rs index d105b1d9..ed67c2ac 100644 --- a/crates/mongodb-agent-common/src/query/pipeline.rs +++ b/crates/mongodb-agent-common/src/query/pipeline.rs @@ -18,18 +18,6 @@ use super::{ relations::pipeline_for_relations, }; -/// Signals the shape of data that will be returned by MongoDB. -#[derive(Clone, Copy, Debug)] -pub enum ResponseShape { - /// Indicates that the response will be a stream of records that must be wrapped in an object - /// with a `rows` field to produce a valid `QueryResponse` for HGE. - ListOfRows, - - /// Indicates that the response has already been wrapped in a single object with `rows` and/or - /// `aggregates` fields. - SingleObject, -} - /// A query that includes aggregates will be run using a $facet pipeline stage, while a query /// without aggregates will not. The choice affects how result rows are mapped to a QueryResponse. /// @@ -38,7 +26,7 @@ pub enum ResponseShape { /// can instead be appended to `pipeline`. 
pub fn is_response_faceted(query: &Query) -> bool { match &query.aggregates { - Some(Some(aggregates)) => !aggregates.is_empty(), + Some(aggregates) => !aggregates.is_empty(), _ => false, } } @@ -50,7 +38,7 @@ pub fn is_response_faceted(query: &Query) -> bool { pub fn pipeline_for_query_request( config: &Configuration, query_request: &QueryRequest, -) -> Result<(Pipeline, ResponseShape), MongoAgentError> { +) -> Result { let foreach = foreach_variants(query_request); if let Some(foreach) = foreach { pipeline_for_foreach(foreach, config, query_request) @@ -68,7 +56,7 @@ pub fn pipeline_for_non_foreach( config: &Configuration, variables: Option<&VariableSet>, query_request: &QueryRequest, -) -> Result<(Pipeline, ResponseShape), MongoAgentError> { +) -> Result { let query = &*query_request.query; let Query { offset, @@ -89,12 +77,8 @@ pub fn pipeline_for_non_foreach( .map(|expression| make_selector(variables, expression)) .transpose()? .map(Stage::Match); - let sort_stage: Option = order_by - .iter() - .flatten() - .map(|o| Stage::Sort(make_sort(o))) - .next(); - let skip_stage = offset.flatten().map(Stage::Skip); + let sort_stage: Option = order_by.iter().map(|o| Stage::Sort(make_sort(o))).next(); + let skip_stage = offset.map(Stage::Skip); [match_stage, sort_stage, skip_stage] .into_iter() @@ -104,19 +88,17 @@ pub fn pipeline_for_non_foreach( // `diverging_stages` includes either a $facet stage if the query includes aggregates, or the // sort and limit stages if we are requesting rows only. In both cases the last stage is // a $replaceWith. 
- let (diverging_stages, response_shape) = if is_response_faceted(query) { + let diverging_stages = if is_response_faceted(query) { let (facet_pipelines, select_facet_results) = facet_pipelines_for_query(query_request)?; let aggregation_stages = Stage::Facet(facet_pipelines); let replace_with_stage = Stage::ReplaceWith(select_facet_results); - let stages = Pipeline::from_iter([aggregation_stages, replace_with_stage]); - (stages, ResponseShape::SingleObject) + Pipeline::from_iter([aggregation_stages, replace_with_stage]) } else { - let stages = pipeline_for_fields_facet(query_request)?; - (stages, ResponseShape::ListOfRows) + pipeline_for_fields_facet(query_request)? }; pipeline.append(diverging_stages); - Ok((pipeline, response_shape)) + Ok(pipeline) } /// Generate a pipeline to select fields requested by the given query. This is intended to be used @@ -128,7 +110,7 @@ pub fn pipeline_for_fields_facet( ) -> Result { let Query { limit, .. } = &*query_request.query; - let limit_stage = limit.flatten().map(Stage::Limit); + let limit_stage = limit.map(Stage::Limit); let replace_with_stage: Stage = Stage::ReplaceWith(Selection::from_query_request(query_request)?); @@ -155,16 +137,15 @@ fn facet_pipelines_for_query( let mut facet_pipelines = aggregates .iter() .flatten() - .flatten() .map(|(key, aggregate)| { Ok(( key.clone(), - pipeline_for_aggregate(aggregate.clone(), aggregates_limit.flatten())?, + pipeline_for_aggregate(aggregate.clone(), *aggregates_limit)?, )) }) .collect::, MongoAgentError>>()?; - if let Some(Some(_)) = fields { + if fields.is_some() { let fields_pipeline = pipeline_for_fields_facet(query_request)?; facet_pipelines.insert(ROWS_FIELD.to_owned(), fields_pipeline); } @@ -174,7 +155,6 @@ fn facet_pipelines_for_query( let aggregate_selections: bson::Document = aggregates .iter() .flatten() - .flatten() .map(|(key, _aggregate)| { // The facet result for each aggregate is an array containing a single document which // has a field called `result`. 
This code selects each facet result by name, and pulls @@ -201,7 +181,7 @@ fn facet_pipelines_for_query( }; let select_rows = match fields { - Some(Some(_)) => Some(("rows".to_owned(), Bson::String(format!("${ROWS_FIELD}")))), + Some(_) => Some(("rows".to_owned(), Bson::String(format!("${ROWS_FIELD}")))), _ => None, }; diff --git a/crates/mongodb-agent-common/src/query/query_target.rs b/crates/mongodb-agent-common/src/query/query_target.rs index 937365ec..25c62442 100644 --- a/crates/mongodb-agent-common/src/query/query_target.rs +++ b/crates/mongodb-agent-common/src/query/query_target.rs @@ -29,6 +29,15 @@ impl QueryTarget<'_> { None => QueryTarget::Collection(target_name), } } + + pub fn input_collection(&self) -> Option<&str> { + match self { + QueryTarget::Collection(collection_name) => Some(collection_name), + QueryTarget::NativeQuery { native_query, .. } => { + native_query.input_collection.as_deref() + } + } + } } impl Display for QueryTarget<'_> { diff --git a/crates/mongodb-agent-common/src/query/relations.rs b/crates/mongodb-agent-common/src/query/relations.rs index c6bc918c..206e603f 100644 --- a/crates/mongodb-agent-common/src/query/relations.rs +++ b/crates/mongodb-agent-common/src/query/relations.rs @@ -29,7 +29,7 @@ pub fn pipeline_for_relations( } = query_request; let empty_field_map = HashMap::new(); - let fields = if let Some(Some(fs)) = &query.fields { + let fields = if let Some(fs) = &query.fields { fs } else { &empty_field_map @@ -94,7 +94,7 @@ fn lookups_for_field( Field::Column { .. 
} => Ok(vec![]), Field::NestedObject { column, query } => { let nested_parent_columns = append_to_path(parent_columns, column); - let fields = query.fields.clone().flatten().unwrap_or_default(); + let fields = query.fields.clone().unwrap_or_default(); lookups_for_fields( config, query_request, @@ -138,7 +138,7 @@ fn lookups_for_field( let from = collection_reference(target.name())?; // Recursively build pipeline according to relation query - let (lookup_pipeline, _) = pipeline_for_non_foreach( + let lookup_pipeline = pipeline_for_non_foreach( config, variables, &QueryRequest { @@ -243,8 +243,8 @@ where #[cfg(test)] mod tests { - use dc_api_types::{QueryRequest, QueryResponse}; - use mongodb::bson::{bson, Bson}; + use dc_api_types::QueryRequest; + use mongodb::bson::{bson, doc, Bson}; use pretty_assertions::assert_eq; use serde_json::{from_value, json}; @@ -281,17 +281,13 @@ mod tests { }], }))?; - let expected_response: QueryResponse = from_value(json!({ - "rows": [ - { - "class_title": "MongoDB 101", - "students": [ - { "student_name": "Alice" }, - { "student_name": "Bob" }, - ], - }, - ], - }))?; + let expected_response = vec![doc! { + "class_title": "MongoDB 101", + "students": { "rows": [ + { "student_name": "Alice" }, + { "student_name": "Bob" }, + ] }, + }]; let expected_pipeline = bson!([ { @@ -332,10 +328,10 @@ mod tests { expected_pipeline, bson!([{ "class_title": "MongoDB 101", - "students": [ + "students": { "rows": [ { "student_name": "Alice" }, { "student_name": "Bob" }, - ], + ] }, }]), ); @@ -375,18 +371,16 @@ mod tests { }], }))?; - let expected_response: QueryResponse = from_value(json!({ - "rows": [ - { - "student_name": "Alice", - "class": { "class_title": "MongoDB 101" }, - }, - { - "student_name": "Bob", - "class": { "class_title": "MongoDB 101" }, - }, - ], - }))?; + let expected_response = vec![ + doc! { + "student_name": "Alice", + "class": { "rows": [{ "class_title": "MongoDB 101" }] }, + }, + doc! 
{ + "student_name": "Bob", + "class": { "rows": [{ "class_title": "MongoDB 101" }] }, + }, + ]; let expected_pipeline = bson!([ { @@ -426,11 +420,11 @@ mod tests { bson!([ { "student_name": "Alice", - "class": { "class_title": "MongoDB 101" }, + "class": { "rows": [{ "class_title": "MongoDB 101" }] }, }, { "student_name": "Bob", - "class": { "class_title": "MongoDB 101" }, + "class": { "rows": [{ "class_title": "MongoDB 101" }] }, }, ]), ); @@ -471,17 +465,13 @@ mod tests { }], }))?; - let expected_response: QueryResponse = from_value(json!({ - "rows": [ - { - "class_title": "MongoDB 101", - "students": [ - { "student_name": "Alice" }, - { "student_name": "Bob" }, - ], - }, - ], - }))?; + let expected_response = vec![doc! { + "class_title": "MongoDB 101", + "students": { "rows": [ + { "student_name": "Alice" }, + { "student_name": "Bob" }, + ] }, + }]; let expected_pipeline = bson!([ { @@ -524,10 +514,10 @@ mod tests { expected_pipeline, bson!([{ "class_title": "MongoDB 101", - "students": [ - { "student_name": "Alice" }, - { "student_name": "Bob" }, - ], + "students": { "rows": [ + { "student_name": "Alice" }, + { "student_name": "Bob" }, + ] }, }]), ); @@ -589,28 +579,24 @@ mod tests { ], }))?; - let expected_response: QueryResponse = from_value(json!({ - "rows": [ + let expected_response = vec![doc! 
{ + "class_title": "MongoDB 101", + "students": { "rows": [ { - "class_title": "MongoDB 101", - "students": { "rows": [ - { - "student_name": "Alice", - "assignments": { "rows": [ - { "assignment_title": "read chapter 2" }, - ]} - }, - { - "student_name": "Bob", - "assignments": { "rows": [ - { "assignment_title": "JSON Basics" }, - { "assignment_title": "read chapter 2" }, - ]} - }, - ]}, + "student_name": "Alice", + "assignments": { "rows": [ + { "assignment_title": "read chapter 2" }, + ]} }, - ], - }))?; + { + "student_name": "Bob", + "assignments": { "rows": [ + { "assignment_title": "JSON Basics" }, + { "assignment_title": "read chapter 2" }, + ]} + }, + ]}, + }]; let expected_pipeline = bson!([ { @@ -736,17 +722,13 @@ mod tests { }], }))?; - let expected_response: QueryResponse = from_value(json!({ - "rows": [ - { - "students_aggregate": { - "aggregates": { - "aggregate_count": 2, - }, - }, + let expected_response = vec![doc! { + "students_aggregate": { + "aggregates": { + "aggregate_count": 2, }, - ], - }))?; + }, + }]; let expected_pipeline = bson!([ { @@ -868,15 +850,13 @@ mod tests { ] }))?; - let expected_response: QueryResponse = from_value(json!({ - "rows": [{ + let expected_response = vec![doc! { "name": "Mercedes Tyler", "movie": { "rows": [{ - "title": "The Land Beyond the Sunset", - "year": 1912 + "title": "The Land Beyond the Sunset", + "year": 1912 }] }, - }] - }))?; + }]; let expected_pipeline = bson!([ { @@ -1004,16 +984,14 @@ mod tests { ] }))?; - let expected_response: QueryResponse = from_value(json!({ - "rows": [{ - "name": "Beric Dondarrion", - "movie": { "rows": [{ - "credits": { - "director": "Martin Scorsese", - } - }] }, - }] - }))?; + let expected_response = vec![doc! 
{ + "name": "Beric Dondarrion", + "movie": { "rows": [{ + "credits": { + "director": "Martin Scorsese", + } + }] }, + }]; let expected_pipeline = bson!([ { diff --git a/crates/mongodb-agent-common/src/query/serialization/bson_to_json.rs b/crates/mongodb-agent-common/src/query/serialization/bson_to_json.rs index f745634e..2d4adbc9 100644 --- a/crates/mongodb-agent-common/src/query/serialization/bson_to_json.rs +++ b/crates/mongodb-agent-common/src/query/serialization/bson_to_json.rs @@ -92,7 +92,7 @@ fn bson_scalar_to_json(expected_type: BsonScalarType, value: Bson) -> Result { Ok(to_value::(b.into())?) } - (BsonScalarType::ObjectId, Bson::ObjectId(oid)) => Ok(to_value(oid)?), + (BsonScalarType::ObjectId, Bson::ObjectId(oid)) => Ok(Value::String(oid.to_hex())), (BsonScalarType::DbPointer, v) => Ok(v.into_canonical_extjson()), (_, v) => Err(BsonToJsonError::TypeMismatch( Type::Scalar(expected_type), @@ -226,11 +226,25 @@ fn convert_small_number(expected_type: BsonScalarType, value: Bson) -> Result anyhow::Result<()> { + let expected_string = "573a1390f29313caabcd446f"; + let json = bson_to_json( + &Type::Scalar(BsonScalarType::ObjectId), + &Default::default(), + Bson::ObjectId(FromStr::from_str(expected_string)?), + )?; + assert_eq!(json, Value::String(expected_string.to_owned())); + Ok(()) + } + #[test] fn serializes_document_with_missing_nullable_field() -> anyhow::Result<()> { let expected_type = Type::Object("test_object".to_owned()); diff --git a/crates/mongodb-agent-common/src/query/serialization/mod.rs b/crates/mongodb-agent-common/src/query/serialization/mod.rs index 31e63af4..be3becd0 100644 --- a/crates/mongodb-agent-common/src/query/serialization/mod.rs +++ b/crates/mongodb-agent-common/src/query/serialization/mod.rs @@ -5,5 +5,5 @@ mod json_to_bson; #[cfg(test)] mod tests; -pub use self::bson_to_json::bson_to_json; +pub use self::bson_to_json::{bson_to_json, BsonToJsonError}; pub use self::json_to_bson::{json_to_bson, json_to_bson_scalar, 
JsonToBsonError}; diff --git a/crates/mongodb-connector/Cargo.toml b/crates/mongodb-connector/Cargo.toml index 2ab44609..1c39372f 100644 --- a/crates/mongodb-connector/Cargo.toml +++ b/crates/mongodb-connector/Cargo.toml @@ -13,7 +13,7 @@ enum-iterator = "^2.0.0" futures = "^0.3" http = "^0.2" indexmap = { version = "2.1.0", features = ["serde"] } -itertools = "^0.10" +itertools = { workspace = true } lazy_static = "^1.4.0" mongodb = "2.8" mongodb-agent-common = { path = "../mongodb-agent-common" } diff --git a/crates/mongodb-connector/src/api_type_conversions/conversion_error.rs b/crates/mongodb-connector/src/api_type_conversions/conversion_error.rs index 4f34c8ca..b032f484 100644 --- a/crates/mongodb-connector/src/api_type_conversions/conversion_error.rs +++ b/crates/mongodb-connector/src/api_type_conversions/conversion_error.rs @@ -21,17 +21,32 @@ pub enum ConversionError { #[error("Unknown object type, \"{0}\"")] UnknownObjectType(String), - #[error("Unknown field \"{field_name}\" in object type \"{object_type}\"")] - UnknownObjectTypeField { object_type: String, field_name: String }, + #[error( + "Unknown field \"{field_name}\" in object type \"{object_type}\"{}", + at_path(path) + )] + UnknownObjectTypeField { + object_type: String, + field_name: String, + path: Vec, + }, #[error("Unknown collection, \"{0}\"")] UnknownCollection(String), - #[error("Unknown relationship, \"{0}\"")] - UnknownRelationship(String), + #[error("Unknown relationship, \"{relationship_name}\"{}", at_path(path))] + UnknownRelationship { + relationship_name: String, + path: Vec, + }, - #[error("Unknown aggregate function, \"{aggregate_function}\" in scalar type \"{scalar_type}\"")] - UnknownAggregateFunction { scalar_type: String, aggregate_function: String }, + #[error( + "Unknown aggregate function, \"{aggregate_function}\" in scalar type \"{scalar_type}\"" + )] + UnknownAggregateFunction { + scalar_type: String, + aggregate_function: String, + }, #[error("Query referenced a function, 
\"{0}\", but it has not been defined")] UnspecifiedFunction(String), @@ -57,3 +72,11 @@ impl From for ExplainError { } } } + +fn at_path(path: &[String]) -> String { + if path.is_empty() { + "".to_owned() + } else { + format!(" at path {}", path.join(".")) + } +} diff --git a/crates/mongodb-connector/src/api_type_conversions/mod.rs b/crates/mongodb-connector/src/api_type_conversions/mod.rs index 4b77162e..87386b60 100644 --- a/crates/mongodb-connector/src/api_type_conversions/mod.rs +++ b/crates/mongodb-connector/src/api_type_conversions/mod.rs @@ -8,5 +8,5 @@ mod query_traversal; pub use self::{ conversion_error::ConversionError, query_request::{v3_to_v2_query_request, QueryContext}, - query_response::{v2_to_v3_explain_response, v2_to_v3_query_response}, + query_response::v2_to_v3_explain_response, }; diff --git a/crates/mongodb-connector/src/api_type_conversions/query_request.rs b/crates/mongodb-connector/src/api_type_conversions/query_request.rs index 24e1d6ad..69acff43 100644 --- a/crates/mongodb-connector/src/api_type_conversions/query_request.rs +++ b/crates/mongodb-connector/src/api_type_conversions/query_request.rs @@ -24,7 +24,7 @@ pub struct QueryContext<'a> { } impl QueryContext<'_> { - fn find_collection( + pub fn find_collection( &self, collection_name: &str, ) -> Result<&v3::CollectionInfo, ConversionError> { @@ -40,7 +40,7 @@ impl QueryContext<'_> { )) } - fn find_collection_object_type( + pub fn find_collection_object_type( &self, collection_name: &str, ) -> Result, ConversionError> { @@ -48,7 +48,7 @@ impl QueryContext<'_> { self.find_object_type(&collection.collection_type) } - fn find_object_type<'a>( + pub fn find_object_type<'a>( &'a self, object_type_name: &'a str, ) -> Result, ConversionError> { @@ -105,6 +105,7 @@ fn find_object_field<'a>( ConversionError::UnknownObjectTypeField { object_type: object_type.name.to_string(), field_name: field_name.to_string(), + path: Default::default(), // TODO: set a path for more helpful error reporting } 
}) } @@ -144,7 +145,7 @@ fn v3_to_v2_query( query: v3::Query, collection_object_type: &WithNameRef, ) -> Result { - let aggregates: Option>> = query + let aggregates: Option> = query .aggregates .map(|aggregates| -> Result<_, ConversionError> { aggregates @@ -157,8 +158,7 @@ fn v3_to_v2_query( }) .collect() }) - .transpose()? - .map(Some); + .transpose()?; let fields = v3_to_v2_fields( context, @@ -168,7 +168,7 @@ fn v3_to_v2_query( query.fields, )?; - let order_by: Option> = query + let order_by: Option = query .order_by .map(|order_by| -> Result<_, ConversionError> { let (elements, relations) = order_by @@ -201,8 +201,7 @@ fn v3_to_v2_query( relations, }) }) - .transpose()? - .map(Some); + .transpose()?; let limit = optional_32bit_number_to_64bit(query.limit); let offset = optional_32bit_number_to_64bit(query.offset); @@ -293,8 +292,8 @@ fn v3_to_v2_fields( root_collection_object_type: &WithNameRef, object_type: &WithNameRef, v3_fields: Option>, -) -> Result>>, ConversionError> { - let v2_fields: Option>> = v3_fields +) -> Result>, ConversionError> { + let v2_fields: Option> = v3_fields .map(|fields| { fields .into_iter() @@ -312,8 +311,7 @@ fn v3_to_v2_fields( }) .collect::>() }) - .transpose()? 
- .map(Some); + .transpose()?; Ok(v2_fields) } @@ -871,11 +869,11 @@ fn v3_to_v2_comparison_value( } #[inline] -fn optional_32bit_number_to_64bit(n: Option) -> Option> +fn optional_32bit_number_to_64bit(n: Option) -> Option where B: From, { - n.map(|input| Some(input.into())) + n.map(|input| input.into()) } fn v3_to_v2_arguments(arguments: BTreeMap) -> HashMap { @@ -909,23 +907,17 @@ fn v3_to_v2_relationship_arguments( #[cfg(test)] mod tests { - use std::{ - borrow::Cow, - collections::{BTreeMap, HashMap}, - }; + use std::collections::HashMap; - use configuration::schema; use dc_api_test_helpers::{self as v2, source, table_relationships, target}; - use mongodb_support::BsonScalarType; - use ndc_sdk::models::{ - self as v3, AggregateFunctionDefinition, ComparisonOperatorDefinition, OrderByElement, - OrderByTarget, OrderDirection, ScalarType, Type, TypeRepresentation, - }; + use ndc_sdk::models::{OrderByElement, OrderByTarget, OrderDirection}; use ndc_test_helpers::*; use pretty_assertions::assert_eq; use serde_json::json; - use super::{v3_to_v2_query_request, v3_to_v2_relationships, QueryContext}; + use crate::test_helpers::{make_flat_schema, make_nested_schema}; + + use super::{v3_to_v2_query_request, v3_to_v2_relationships}; #[test] fn translates_query_request_relationships() -> Result<(), anyhow::Error> { @@ -1269,222 +1261,4 @@ mod tests { assert_eq!(v2_request, expected); Ok(()) } - - fn make_scalar_types() -> BTreeMap { - BTreeMap::from([ - ( - "String".to_owned(), - ScalarType { - representation: Some(TypeRepresentation::String), - aggregate_functions: Default::default(), - comparison_operators: BTreeMap::from([ - ("_eq".to_owned(), ComparisonOperatorDefinition::Equal), - ( - "_regex".to_owned(), - ComparisonOperatorDefinition::Custom { - argument_type: Type::Named { - name: "String".to_owned(), - }, - }, - ), - ]), - }, - ), - ( - "Int".to_owned(), - ScalarType { - representation: Some(TypeRepresentation::Int32), - aggregate_functions: BTreeMap::from([( - 
"avg".into(), - AggregateFunctionDefinition { - result_type: Type::Named { - name: "Float".into(), // Different result type to the input scalar type - }, - }, - )]), - comparison_operators: BTreeMap::from([( - "_eq".to_owned(), - ComparisonOperatorDefinition::Equal, - )]), - }, - ), - ]) - } - - fn make_flat_schema() -> QueryContext<'static> { - QueryContext { - collections: Cow::Owned(BTreeMap::from([ - ( - "authors".into(), - v3::CollectionInfo { - name: "authors".to_owned(), - description: None, - collection_type: "Author".into(), - arguments: Default::default(), - uniqueness_constraints: make_primary_key_uniqueness_constraint("authors"), - foreign_keys: Default::default(), - }, - ), - ( - "articles".into(), - v3::CollectionInfo { - name: "articles".to_owned(), - description: None, - collection_type: "Article".into(), - arguments: Default::default(), - uniqueness_constraints: make_primary_key_uniqueness_constraint("articles"), - foreign_keys: Default::default(), - }, - ), - ])), - functions: Default::default(), - object_types: Cow::Owned(BTreeMap::from([ - ( - "Author".into(), - schema::ObjectType { - description: None, - fields: BTreeMap::from([ - ( - "id".into(), - schema::ObjectField { - description: None, - r#type: schema::Type::Scalar(BsonScalarType::Int), - }, - ), - ( - "last_name".into(), - schema::ObjectField { - description: None, - r#type: schema::Type::Scalar(BsonScalarType::String), - }, - ), - ]), - }, - ), - ( - "Article".into(), - schema::ObjectType { - description: None, - fields: BTreeMap::from([ - ( - "author_id".into(), - schema::ObjectField { - description: None, - r#type: schema::Type::Scalar(BsonScalarType::Int), - }, - ), - ( - "title".into(), - schema::ObjectField { - description: None, - r#type: schema::Type::Scalar(BsonScalarType::String), - }, - ), - ( - "year".into(), - schema::ObjectField { - description: None, - r#type: schema::Type::Nullable(Box::new(schema::Type::Scalar( - BsonScalarType::Int, - ))), - }, - ), - ]), - }, - ), - 
])), - scalar_types: Cow::Owned(make_scalar_types()), - } - } - - fn make_nested_schema() -> QueryContext<'static> { - QueryContext { - collections: Cow::Owned(BTreeMap::from([( - "authors".into(), - v3::CollectionInfo { - name: "authors".into(), - description: None, - collection_type: "Author".into(), - arguments: Default::default(), - uniqueness_constraints: make_primary_key_uniqueness_constraint("authors"), - foreign_keys: Default::default(), - }, - )])), - functions: Default::default(), - object_types: Cow::Owned(BTreeMap::from([ - ( - "Author".into(), - schema::ObjectType { - description: None, - fields: BTreeMap::from([ - ( - "address".into(), - schema::ObjectField { - description: None, - r#type: schema::Type::Object("Address".into()), - }, - ), - ( - "articles".into(), - schema::ObjectField { - description: None, - r#type: schema::Type::ArrayOf(Box::new(schema::Type::Object( - "Article".into(), - ))), - }, - ), - ( - "array_of_arrays".into(), - schema::ObjectField { - description: None, - r#type: schema::Type::ArrayOf(Box::new(schema::Type::ArrayOf( - Box::new(schema::Type::Object("Article".into())), - ))), - }, - ), - ]), - }, - ), - ( - "Address".into(), - schema::ObjectType { - description: None, - fields: BTreeMap::from([( - "country".into(), - schema::ObjectField { - description: None, - r#type: schema::Type::Scalar(BsonScalarType::String), - }, - )]), - }, - ), - ( - "Article".into(), - schema::ObjectType { - description: None, - fields: BTreeMap::from([( - "title".into(), - schema::ObjectField { - description: None, - r#type: schema::Type::Scalar(BsonScalarType::String), - }, - )]), - }, - ), - ])), - scalar_types: Cow::Owned(make_scalar_types()), - } - } - - fn make_primary_key_uniqueness_constraint( - collection_name: &str, - ) -> BTreeMap { - [( - format!("{collection_name}_id"), - v3::UniquenessConstraint { - unique_columns: vec!["_id".to_owned()], - }, - )] - .into() - } } diff --git 
a/crates/mongodb-connector/src/api_type_conversions/query_response.rs b/crates/mongodb-connector/src/api_type_conversions/query_response.rs index f1cc2791..1985f8c9 100644 --- a/crates/mongodb-connector/src/api_type_conversions/query_response.rs +++ b/crates/mongodb-connector/src/api_type_conversions/query_response.rs @@ -3,50 +3,6 @@ use std::collections::BTreeMap; use dc_api_types::{self as v2}; use ndc_sdk::models::{self as v3}; -pub fn v2_to_v3_query_response(response: v2::QueryResponse) -> v3::QueryResponse { - let rows: Vec = match response { - v2::QueryResponse::ForEach { rows } => rows - .into_iter() - .map(|foreach| v2_to_v3_row_set(foreach.query)) - .collect(), - v2::QueryResponse::Single(row_set) => vec![v2_to_v3_row_set(row_set)], - }; - v3::QueryResponse(rows) -} - -fn v2_to_v3_row_set(row_set: v2::RowSet) -> v3::RowSet { - let (aggregates, rows) = match row_set { - v2::RowSet::Aggregate { aggregates, rows } => (Some(aggregates), rows), - v2::RowSet::Rows { rows } => (None, Some(rows)), - }; - - v3::RowSet { - aggregates: aggregates.map(hash_map_to_index_map), - rows: rows.map(|xs| { - xs.into_iter() - .map(|field_values| { - field_values - .into_iter() - .map(|(name, value)| (name, v2_to_v3_field_value(value))) - .collect() - }) - .collect() - }), - } -} - -fn v2_to_v3_field_value(field_value: v2::ResponseFieldValue) -> v3::RowFieldValue { - v3::RowFieldValue(serde_json::to_value(field_value).expect("serializing result field value")) -} - -fn hash_map_to_index_map(xs: InputMap) -> OutputMap -where - InputMap: IntoIterator, - OutputMap: FromIterator<(K, V)>, -{ - xs.into_iter().collect::() -} - pub fn v2_to_v3_explain_response(response: v2::ExplainResponse) -> v3::ExplainResponse { v3::ExplainResponse { details: BTreeMap::from_iter([ diff --git a/crates/mongodb-connector/src/main.rs b/crates/mongodb-connector/src/main.rs index 00071bc7..261a1185 100644 --- a/crates/mongodb-connector/src/main.rs +++ b/crates/mongodb-connector/src/main.rs @@ -4,8 +4,12 
@@ mod error_mapping; mod mongo_connector; mod mutation; mod query_context; +mod query_response; mod schema; +#[cfg(test)] +mod test_helpers; + use std::error::Error; use mongo_connector::MongoConnector; diff --git a/crates/mongodb-connector/src/mongo_connector.rs b/crates/mongodb-connector/src/mongo_connector.rs index 8705c132..892c8741 100644 --- a/crates/mongodb-connector/src/mongo_connector.rs +++ b/crates/mongodb-connector/src/mongo_connector.rs @@ -21,11 +21,10 @@ use ndc_sdk::{ use tracing::instrument; use crate::{ - api_type_conversions::{ - v2_to_v3_explain_response, v2_to_v3_query_response, v3_to_v2_query_request, - }, + api_type_conversions::{v2_to_v3_explain_response, v3_to_v2_query_request}, error_mapping::{mongo_agent_error_to_explain_error, mongo_agent_error_to_query_error}, query_context::get_query_context, + query_response::serialize_query_response, }; use crate::{capabilities::mongo_capabilities_response, mutation::handle_mutation_request}; @@ -143,10 +142,17 @@ impl Connector for MongoConnector { request: QueryRequest, ) -> Result, QueryError> { tracing::debug!(query_request = %serde_json::to_string(&request).unwrap(), "received query request"); - let v2_request = v3_to_v2_query_request(&get_query_context(configuration), request)?; - let response = handle_query_request(configuration, state, v2_request) + let query_context = get_query_context(configuration); + let v2_request = v3_to_v2_query_request(&query_context, request.clone())?; + let response_documents = handle_query_request(configuration, state, v2_request) .await .map_err(mongo_agent_error_to_query_error)?; - Ok(v2_to_v3_query_response(response).into()) + let response = serialize_query_response(&query_context, &request, response_documents) + .map_err(|err| { + QueryError::UnprocessableContent(format!( + "error converting MongoDB response to JSON: {err}" + )) + })?; + Ok(response.into()) } } diff --git a/crates/mongodb-connector/src/query_response.rs 
b/crates/mongodb-connector/src/query_response.rs new file mode 100644 index 00000000..0643dc52 --- /dev/null +++ b/crates/mongodb-connector/src/query_response.rs @@ -0,0 +1,957 @@ +use std::{borrow::Cow, collections::BTreeMap}; + +use configuration::schema::{ObjectField, ObjectType, Type}; +use indexmap::IndexMap; +use itertools::Itertools; +use mongodb::bson::{self, Bson}; +use mongodb_agent_common::query::serialization::{bson_to_json, BsonToJsonError}; +use ndc_sdk::models::{ + self as ndc, Aggregate, Field, NestedField, NestedObject, Query, QueryRequest, QueryResponse, + RowFieldValue, RowSet, +}; +use serde::Deserialize; +use thiserror::Error; + +use crate::api_type_conversions::{ConversionError, QueryContext}; + +const GEN_OBJECT_TYPE_PREFIX: &str = "__query__"; + +#[derive(Debug, Error)] +pub enum QueryResponseError { + #[error("expected aggregates to be an object at path {}", path.join("."))] + AggregatesNotObject { path: Vec }, + + #[error("{0}")] + BsonDeserialization(#[from] bson::de::Error), + + #[error("{0}")] + BsonToJson(#[from] BsonToJsonError), + + #[error("{0}")] + Conversion(#[from] ConversionError), + + #[error("expected an array at path {}", path.join("."))] + ExpectedArray { path: Vec }, + + #[error("expected an object at path {}", path.join("."))] + ExpectedObject { path: Vec }, + + #[error("expected a single response document from MongoDB, but did not get one")] + ExpectedSingleDocument, +} + +type ObjectTypes = Vec<(String, ObjectType)>; +type Result = std::result::Result; + +// These structs describe possible shapes of data returned by MongoDB query plans + +#[derive(Debug, Deserialize)] +struct ResponsesForVariableSets { + row_sets: Vec>, +} + +#[derive(Debug, Deserialize)] +struct BsonRowSet { + #[serde(default)] + aggregates: Bson, + #[serde(default)] + rows: Vec, +} + +pub fn serialize_query_response( + query_context: &QueryContext<'_>, + query_request: &QueryRequest, + response_documents: Vec, +) -> Result { + 
tracing::debug!(response_documents = %serde_json::to_string(&response_documents).unwrap(), "response from MongoDB"); + + let collection_info = query_context.find_collection(&query_request.collection)?; + let collection_name = &collection_info.name; + + // If the query request specified variable sets then we should have gotten a single document + // from MongoDB with fields for multiple sets of results - one for each set of variables. + let row_sets = if query_request.variables.is_some() { + let responses: ResponsesForVariableSets = parse_single_document(response_documents)?; + responses + .row_sets + .into_iter() + .map(|docs| { + serialize_row_set( + query_context, + query_request, + &[collection_name], + collection_name, + &query_request.query, + docs, + ) + }) + .try_collect() + } else { + Ok(vec![serialize_row_set( + query_context, + query_request, + &[], + collection_name, + &query_request.query, + response_documents, + )?]) + }?; + let response = QueryResponse(row_sets); + tracing::debug!(query_response = %serde_json::to_string(&response).unwrap()); + Ok(response) +} + +fn serialize_row_set( + query_context: &QueryContext<'_>, + query_request: &QueryRequest, + path: &[&str], + collection_name: &str, + query: &Query, + docs: Vec, +) -> Result { + if !has_aggregates(query) { + // When there are no aggregates we expect a list of rows + let rows = query + .fields + .as_ref() + .map(|fields| { + serialize_rows( + query_context, + query_request, + path, + collection_name, + fields, + docs, + ) + }) + .transpose()?; + + Ok(RowSet { + aggregates: None, + rows, + }) + } else { + // When there are aggregates we expect a single document with `rows` and `aggregates` + // fields + let row_set: BsonRowSet = parse_single_document(docs)?; + + let aggregates = query + .aggregates + .as_ref() + .map(|aggregates| { + serialize_aggregates(query_context, path, aggregates, row_set.aggregates) + }) + .transpose()?; + + let rows = query + .fields + .as_ref() + .map(|fields| { + 
serialize_rows( + query_context, + query_request, + path, + collection_name, + fields, + row_set.rows, + ) + }) + .transpose()?; + + Ok(RowSet { aggregates, rows }) + } +} + +fn serialize_aggregates( + query_context: &QueryContext<'_>, + path: &[&str], + _query_aggregates: &IndexMap, + value: Bson, +) -> Result> { + let (aggregates_type, temp_object_types) = type_for_aggregates()?; + + let object_types = extend_configured_object_types(query_context, temp_object_types); + + let json = bson_to_json(&aggregates_type, &object_types, value)?; + + // The NDC type uses an IndexMap for aggregate values; we need to convert the map + // underlying the Value::Object value to an IndexMap + let aggregate_values = match json { + serde_json::Value::Object(obj) => obj.into_iter().collect(), + _ => Err(QueryResponseError::AggregatesNotObject { + path: path_to_owned(path), + })?, + }; + Ok(aggregate_values) +} + +fn serialize_rows( + query_context: &QueryContext<'_>, + query_request: &QueryRequest, + path: &[&str], + collection_name: &str, + query_fields: &IndexMap, + docs: Vec, +) -> Result>> { + let (row_type, temp_object_types) = type_for_row( + query_context, + query_request, + path, + collection_name, + query_fields, + )?; + + let object_types = extend_configured_object_types(query_context, temp_object_types); + + docs.into_iter() + .map(|doc| { + let json = bson_to_json(&row_type, &object_types, doc.into())?; + // The NDC types use an IndexMap for each row value; we need to convert the map + // underlying the Value::Object value to an IndexMap + let index_map = match json { + serde_json::Value::Object(obj) => obj + .into_iter() + .map(|(key, value)| (key, RowFieldValue(value))) + .collect(), + _ => unreachable!(), + }; + Ok(index_map) + }) + .try_collect() +} + +fn type_for_row_set( + query_context: &QueryContext<'_>, + query_request: &QueryRequest, + path: &[&str], + collection_name: &str, + query: &Query, +) -> Result<(Type, ObjectTypes)> { + let mut fields = 
BTreeMap::new(); + let mut object_types = vec![]; + + if has_aggregates(query) { + let (aggregates_type, nested_object_types) = type_for_aggregates()?; + fields.insert( + "aggregates".to_owned(), + ObjectField { + r#type: aggregates_type, + description: Default::default(), + }, + ); + object_types.extend(nested_object_types); + } + + if let Some(query_fields) = &query.fields { + let (row_type, nested_object_types) = type_for_row( + query_context, + query_request, + path, + collection_name, + query_fields, + )?; + fields.insert( + "rows".to_owned(), + ObjectField { + r#type: Type::ArrayOf(Box::new(row_type)), + description: Default::default(), + }, + ); + object_types.extend(nested_object_types); + } + + let (row_set_type_name, row_set_type) = named_type(path, "row_set"); + let object_type = ObjectType { + description: Default::default(), + fields, + }; + object_types.push((row_set_type_name, object_type)); + + Ok((row_set_type, object_types)) +} + +// TODO: infer response type for aggregates MDB-130 +fn type_for_aggregates() -> Result<(Type, ObjectTypes)> { + Ok((Type::ExtendedJSON, Default::default())) +} + +fn type_for_row( + query_context: &QueryContext<'_>, + query_request: &QueryRequest, + path: &[&str], + collection_name: &str, + query_fields: &IndexMap, +) -> Result<(Type, ObjectTypes)> { + let mut object_types = vec![]; + + let fields = query_fields + .iter() + .map(|(field_name, field_definition)| { + let (field_type, nested_object_types) = type_for_field( + query_context, + query_request, + &append_to_path(path, [field_name.as_ref()]), + collection_name, + field_definition, + )?; + object_types.extend(nested_object_types); + Ok(( + field_name.clone(), + ObjectField { + description: Default::default(), + r#type: field_type, + }, + )) + }) + .try_collect::<_, _, QueryResponseError>()?; + + let (row_type_name, row_type) = named_type(path, "row"); + let object_type = ObjectType { + description: Default::default(), + fields, + }; + 
object_types.push((row_type_name, object_type)); + + Ok((row_type, object_types)) +} + +fn type_for_field( + query_context: &QueryContext<'_>, + query_request: &QueryRequest, + path: &[&str], + collection_name: &str, + field_definition: &ndc::Field, +) -> Result<(Type, ObjectTypes)> { + match field_definition { + ndc::Field::Column { column, fields } => { + let field_type = find_field_type(query_context, path, collection_name, column)?; + + let (requested_type, temp_object_types) = prune_type_to_field_selection( + query_context, + query_request, + path, + field_type, + fields.as_ref(), + )?; + + Ok((requested_type, temp_object_types)) + } + + ndc::Field::Relationship { + query, + relationship, + .. + } => { + let (requested_type, temp_object_types) = + type_for_relation_field(query_context, query_request, path, query, relationship)?; + + Ok((requested_type, temp_object_types)) + } + } +} + +fn find_field_type<'a>( + query_context: &'a QueryContext<'a>, + path: &[&str], + collection_name: &str, + column: &str, +) -> Result<&'a Type> { + let object_type = query_context.find_collection_object_type(collection_name)?; + let field_type = object_type.value.fields.get(column).ok_or_else(|| { + ConversionError::UnknownObjectTypeField { + object_type: object_type.name.to_string(), + field_name: column.to_string(), + path: path_to_owned(path), + } + })?; + Ok(&field_type.r#type) +} + +/// Computes a new hierarchy of object types (if necessary) that select a subset of fields from +/// existing object types to match the fields requested by the query. Recurses into nested objects, +/// arrays, and nullable type references. +/// +/// Scalar types are returned without modification. +/// +/// Returns a reference to the pruned type, and a list of newly-computed object types with +/// generated names. 
+fn prune_type_to_field_selection( + query_context: &QueryContext<'_>, + query_request: &QueryRequest, + path: &[&str], + field_type: &Type, + fields: Option<&NestedField>, +) -> Result<(Type, Vec<(String, ObjectType)>)> { + match (field_type, fields) { + (t, None) => Ok((t.clone(), Default::default())), + (t @ Type::Scalar(_) | t @ Type::ExtendedJSON, _) => Ok((t.clone(), Default::default())), + + (Type::Nullable(t), _) => { + let (underlying_type, object_types) = + prune_type_to_field_selection(query_context, query_request, path, t, fields)?; + Ok((Type::Nullable(Box::new(underlying_type)), object_types)) + } + (Type::ArrayOf(t), Some(NestedField::Array(nested))) => { + let (element_type, object_types) = prune_type_to_field_selection( + query_context, + query_request, + path, + t, + Some(&nested.fields), + )?; + Ok((Type::ArrayOf(Box::new(element_type)), object_types)) + } + (Type::Object(t), Some(NestedField::Object(nested))) => { + object_type_for_field_subset(query_context, query_request, path, t, nested) + } + + (_, Some(NestedField::Array(_))) => Err(QueryResponseError::ExpectedArray { + path: path_to_owned(path), + }), + (_, Some(NestedField::Object(_))) => Err(QueryResponseError::ExpectedObject { + path: path_to_owned(path), + }), + } +} + +/// We have a configured object type for a collection, or for a nested object in a collection. But +/// the query may request a subset of fields from that object type. We need to compute a new object +/// type for that requested subset. +/// +/// Returns a reference to the newly-generated object type, and a list of all new object types with +/// generated names including the newly-generated object type, and types for any nested objects. 
+fn object_type_for_field_subset( + query_context: &QueryContext<'_>, + query_request: &QueryRequest, + path: &[&str], + object_type_name: &str, + requested_fields: &NestedObject, +) -> Result<(Type, Vec<(String, ObjectType)>)> { + let object_type = query_context.find_object_type(object_type_name)?.value; + let (fields, object_type_sets): (_, Vec>) = requested_fields + .fields + .iter() + .map(|(name, requested_field)| { + let (object_field, object_types) = requested_field_definition( + query_context, + query_request, + &append_to_path(path, [name.as_ref()]), + object_type_name, + object_type, + requested_field, + )?; + Ok(((name.clone(), object_field), object_types)) + }) + .process_results::<_, _, QueryResponseError, _>(|iter| iter.unzip())?; + + let pruned_object_type = ObjectType { + fields, + description: None, + }; + let (pruned_object_type_name, pruned_type) = named_type(path, "fields"); + + let mut object_types: Vec<(String, ObjectType)> = + object_type_sets.into_iter().flatten().collect(); + object_types.push((pruned_object_type_name, pruned_object_type)); + + Ok((pruned_type, object_types)) +} + +/// Given an object type for a value, and a requested field from that value, produce an updated +/// object field definition to match the request. This must take into account aliasing where the +/// name of the requested field maps to a different name on the underlying type. 
+fn requested_field_definition( + query_context: &QueryContext<'_>, + query_request: &QueryRequest, + path: &[&str], + object_type_name: &str, + object_type: &ObjectType, + requested_field: &Field, +) -> Result<(ObjectField, Vec<(String, ObjectType)>)> { + match requested_field { + Field::Column { column, fields } => { + let field_def = object_type.fields.get(column).ok_or_else(|| { + ConversionError::UnknownObjectTypeField { + object_type: object_type_name.to_owned(), + field_name: column.to_owned(), + path: path_to_owned(path), + } + })?; + let (field_type, object_types) = prune_type_to_field_selection( + query_context, + query_request, + path, + &field_def.r#type, + fields.as_ref(), + )?; + let pruned_field = ObjectField { + r#type: field_type, + description: None, + }; + Ok((pruned_field, object_types)) + } + Field::Relationship { + query, + relationship, + .. + } => { + let (relation_type, temp_object_types) = + type_for_relation_field(query_context, query_request, path, query, relationship)?; + let relation_field = ObjectField { + r#type: relation_type, + description: None, + }; + Ok((relation_field, temp_object_types)) + } + } +} + +fn type_for_relation_field( + query_context: &QueryContext<'_>, + query_request: &QueryRequest, + path: &[&str], + query: &Query, + relationship: &str, +) -> Result<(Type, Vec<(String, ObjectType)>)> { + let relationship_def = query_request + .collection_relationships + .get(relationship) + .ok_or_else(|| ConversionError::UnknownRelationship { + relationship_name: relationship.to_owned(), + path: path_to_owned(path), + })?; + type_for_row_set( + query_context, + query_request, + path, + &relationship_def.target_collection, + query, + ) +} + +fn extend_configured_object_types<'a>( + query_context: &QueryContext<'a>, + object_types: ObjectTypes, +) -> Cow<'a, BTreeMap> { + if object_types.is_empty() { + // We're cloning a Cow, not a BTreeMap here. 
In production that will be a [Cow::Borrowed] + // variant so effectively that means we're cloning a wide pointer + query_context.object_types.clone() + } else { + // This time we're cloning the BTreeMap + let mut extended_object_types = query_context.object_types.clone().into_owned(); + extended_object_types.extend(object_types); + Cow::Owned(extended_object_types) + } +} + +fn parse_single_document(documents: Vec) -> Result +where + T: for<'de> serde::Deserialize<'de>, +{ + let document = documents + .into_iter() + .next() + .ok_or(QueryResponseError::ExpectedSingleDocument)?; + let value = bson::from_document(document)?; + Ok(value) +} + +fn has_aggregates(query: &Query) -> bool { + match &query.aggregates { + Some(aggregates) => !aggregates.is_empty(), + None => false, + } +} + +fn append_to_path<'a>(path: &[&'a str], elems: impl IntoIterator) -> Vec<&'a str> { + path.iter().copied().chain(elems).collect() +} + +fn path_to_owned(path: &[&str]) -> Vec { + path.iter().map(|x| (*x).to_owned()).collect() +} + +fn named_type(path: &[&str], name_suffix: &str) -> (String, Type) { + let name = format!( + "{GEN_OBJECT_TYPE_PREFIX}{}_{name_suffix}", + path.iter().join("_") + ); + let t = Type::Object(name.clone()); + (name, t) +} + +#[cfg(test)] +mod tests { + use std::{borrow::Cow, collections::BTreeMap, str::FromStr}; + + use configuration::schema::{ObjectType, Type}; + use mongodb::bson::{self, Bson}; + use mongodb_support::BsonScalarType; + use ndc_sdk::models::{QueryResponse, RowFieldValue, RowSet}; + use ndc_test_helpers::{ + array, collection, field, object, query, query_request, relation_field, relationship, + }; + use pretty_assertions::assert_eq; + use serde_json::json; + + use crate::{ + api_type_conversions::QueryContext, + test_helpers::{make_nested_schema, make_scalar_types, object_type}, + }; + + use super::{serialize_query_response, type_for_row_set}; + + #[test] + fn serializes_response_with_nested_fields() -> anyhow::Result<()> { + let query_context = 
make_nested_schema(); + let request = query_request() + .collection("authors") + .query(query().fields([field!("address" => "address", object!([ + field!("street"), + field!("geocode" => "geocode", object!([ + field!("longitude"), + ])), + ]))])) + .into(); + + let response_documents = vec![bson::doc! { + "address": { + "street": "137 Maple Dr", + "geocode": { + "longitude": 122.4194, + }, + }, + }]; + + let response = serialize_query_response(&query_context, &request, response_documents)?; + assert_eq!( + response, + QueryResponse(vec![RowSet { + aggregates: Default::default(), + rows: Some(vec![[( + "address".into(), + RowFieldValue(json!({ + "street": "137 Maple Dr", + "geocode": { + "longitude": 122.4194, + }, + })) + )] + .into()]), + }]) + ); + Ok(()) + } + + #[test] + fn serializes_response_with_nested_object_inside_array() -> anyhow::Result<()> { + let query_context = make_nested_schema(); + let request = query_request() + .collection("authors") + .query(query().fields([field!("articles" => "articles", array!( + object!([ + field!("title"), + ]) + ))])) + .into(); + + let response_documents = vec![bson::doc! 
{ + "articles": [ + { "title": "Modeling MongoDB with relational model" }, + { "title": "NoSQL databases: MongoDB vs cassandra" }, + ], + }]; + + let response = serialize_query_response(&query_context, &request, response_documents)?; + assert_eq!( + response, + QueryResponse(vec![RowSet { + aggregates: Default::default(), + rows: Some(vec![[( + "articles".into(), + RowFieldValue(json!([ + { "title": "Modeling MongoDB with relational model" }, + { "title": "NoSQL databases: MongoDB vs cassandra" }, + ])) + )] + .into()]), + }]) + ); + Ok(()) + } + + #[test] + fn serializes_response_with_aliased_fields() -> anyhow::Result<()> { + let query_context = make_nested_schema(); + let request = query_request() + .collection("authors") + .query(query().fields([ + field!("address1" => "address", object!([ + field!("line1" => "street"), + ])), + field!("address2" => "address", object!([ + field!("latlong" => "geocode", object!([ + field!("long" => "longitude"), + ])), + ])), + ])) + .into(); + + let response_documents = vec![bson::doc! 
{ + "address1": { + "line1": "137 Maple Dr", + }, + "address2": { + "latlong": { + "long": 122.4194, + }, + }, + }]; + + let response = serialize_query_response(&query_context, &request, response_documents)?; + assert_eq!( + response, + QueryResponse(vec![RowSet { + aggregates: Default::default(), + rows: Some(vec![[ + ( + "address1".into(), + RowFieldValue(json!({ + "line1": "137 Maple Dr", + })) + ), + ( + "address2".into(), + RowFieldValue(json!({ + "latlong": { + "long": 122.4194, + }, + })) + ) + ] + .into()]), + }]) + ); + Ok(()) + } + + #[test] + fn serializes_response_with_decimal_128_fields() -> anyhow::Result<()> { + let query_context = QueryContext { + collections: Cow::Owned([collection("business")].into()), + functions: Default::default(), + object_types: Cow::Owned( + [( + "business".to_owned(), + object_type([ + ("price", Type::Scalar(BsonScalarType::Decimal)), + ("price_extjson", Type::ExtendedJSON), + ]), + )] + .into(), + ), + scalar_types: Cow::Owned(make_scalar_types()), + }; + + let request = query_request() + .collection("business") + .query(query().fields([field!("price"), field!("price_extjson")])) + .into(); + + let response_documents = vec![bson::doc! 
{ + "price": Bson::Decimal128(bson::Decimal128::from_str("127.6486654").unwrap()), + "price_extjson": Bson::Decimal128(bson::Decimal128::from_str("-4.9999999999").unwrap()), + }]; + + let response = serialize_query_response(&query_context, &request, response_documents)?; + assert_eq!( + response, + QueryResponse(vec![RowSet { + aggregates: Default::default(), + rows: Some(vec![[ + ("price".into(), RowFieldValue(json!("127.6486654"))), + ( + "price_extjson".into(), + RowFieldValue(json!({ + "$numberDecimal": "-4.9999999999" + })) + ), + ] + .into()]), + }]) + ); + Ok(()) + } + + #[test] + fn serializes_response_with_nested_extjson() -> anyhow::Result<()> { + let query_context = QueryContext { + collections: Cow::Owned([collection("data")].into()), + functions: Default::default(), + object_types: Cow::Owned( + [( + "data".to_owned(), + object_type([("value", Type::ExtendedJSON)]), + )] + .into(), + ), + scalar_types: Cow::Owned(make_scalar_types()), + }; + + let request = query_request() + .collection("data") + .query(query().fields([field!("value")])) + .into(); + + let response_documents = vec![bson::doc! 
{ + "value": { + "array": [ + { "number": Bson::Int32(3) }, + { "number": Bson::Decimal128(bson::Decimal128::from_str("127.6486654").unwrap()) }, + ], + "string": "hello", + "object": { + "foo": 1, + "bar": 2, + }, + }, + }]; + + let response = serialize_query_response(&query_context, &request, response_documents)?; + assert_eq!( + response, + QueryResponse(vec![RowSet { + aggregates: Default::default(), + rows: Some(vec![[( + "value".into(), + RowFieldValue(json!({ + "array": [ + { "number": { "$numberInt": "3" } }, + { "number": { "$numberDecimal": "127.6486654" } }, + ], + "string": "hello", + "object": { + "foo": { "$numberInt": "1" }, + "bar": { "$numberInt": "2" }, + }, + })) + )] + .into()]), + }]) + ); + Ok(()) + } + + #[test] + fn uses_field_path_to_guarantee_distinct_type_names() -> anyhow::Result<()> { + let query_context = make_nested_schema(); + let collection_name = "appearances"; + let request = query_request() + .collection(collection_name) + .relationships([("author", relationship("authors", [("authorId", "id")]))]) + .query( + query().fields([relation_field!("author" => "presenter", query().fields([ + field!("addr" => "address", object!([ + field!("street"), + field!("geocode" => "geocode", object!([ + field!("latitude"), + field!("long" => "longitude"), + ])) + ])), + field!("articles" => "articles", array!(object!([ + field!("article_title" => "title") + ]))), + ]))]), + ) + .into(); + let path = [collection_name]; + + let (row_set_type, object_types) = type_for_row_set( + &query_context, + &request, + &path, + collection_name, + &request.query, + )?; + + // Convert object types into a map so we can compare without worrying about order + let object_types: BTreeMap = object_types.into_iter().collect(); + + assert_eq!( + (row_set_type, object_types), + ( + Type::Object("__query__appearances_row_set".to_owned()), + [ + ( + "__query__appearances_row_set".to_owned(), + object_type([( + "rows".to_owned(), + Type::ArrayOf(Box::new(Type::Object( + 
"__query__appearances_row".to_owned() + ))) + )]), + ), + ( + "__query__appearances_row".to_owned(), + object_type([( + "presenter".to_owned(), + Type::Object("__query__appearances_presenter_row_set".to_owned()) + )]), + ), + ( + "__query__appearances_presenter_row_set".to_owned(), + object_type([( + "rows", + Type::ArrayOf(Box::new(Type::Object( + "__query__appearances_presenter_row".to_owned() + ))) + )]), + ), + ( + "__query__appearances_presenter_row".to_owned(), + object_type([ + ( + "addr", + Type::Object( + "__query__appearances_presenter_addr_fields".to_owned() + ) + ), + ( + "articles", + Type::ArrayOf(Box::new(Type::Object( + "__query__appearances_presenter_articles_fields".to_owned() + ))) + ), + ]), + ), + ( + "__query__appearances_presenter_addr_fields".to_owned(), + object_type([ + ( + "geocode", + Type::Nullable(Box::new(Type::Object( + "__query__appearances_presenter_addr_geocode_fields".to_owned() + ))) + ), + ("street", Type::Scalar(BsonScalarType::String)), + ]), + ), + ( + "__query__appearances_presenter_addr_geocode_fields".to_owned(), + object_type([ + ("latitude", Type::Scalar(BsonScalarType::Double)), + ("long", Type::Scalar(BsonScalarType::Double)), + ]), + ), + ( + "__query__appearances_presenter_articles_fields".to_owned(), + object_type([("article_title", Type::Scalar(BsonScalarType::String))]), + ), + ] + .into() + ) + ); + Ok(()) + } +} diff --git a/crates/mongodb-connector/src/test_helpers.rs b/crates/mongodb-connector/src/test_helpers.rs new file mode 100644 index 00000000..4c9a9918 --- /dev/null +++ b/crates/mongodb-connector/src/test_helpers.rs @@ -0,0 +1,293 @@ +use std::{borrow::Cow, collections::BTreeMap}; + +use configuration::schema; +use mongodb_support::BsonScalarType; +use ndc_sdk::models::{ + AggregateFunctionDefinition, CollectionInfo, ComparisonOperatorDefinition, ScalarType, Type, + TypeRepresentation, +}; +use ndc_test_helpers::{collection, make_primary_key_uniqueness_constraint}; + +use 
crate::api_type_conversions::QueryContext; + +pub fn object_type( + fields: impl IntoIterator)>, +) -> schema::ObjectType { + schema::ObjectType { + description: Default::default(), + fields: fields + .into_iter() + .map(|(name, field_type)| { + ( + name.to_string(), + schema::ObjectField { + description: Default::default(), + r#type: field_type.into(), + }, + ) + }) + .collect(), + } +} + +pub fn make_scalar_types() -> BTreeMap { + BTreeMap::from([ + ( + "String".to_owned(), + ScalarType { + representation: Some(TypeRepresentation::String), + aggregate_functions: Default::default(), + comparison_operators: BTreeMap::from([ + ("_eq".to_owned(), ComparisonOperatorDefinition::Equal), + ( + "_regex".to_owned(), + ComparisonOperatorDefinition::Custom { + argument_type: Type::Named { + name: "String".to_owned(), + }, + }, + ), + ]), + }, + ), + ( + "Int".to_owned(), + ScalarType { + representation: Some(TypeRepresentation::Int32), + aggregate_functions: BTreeMap::from([( + "avg".into(), + AggregateFunctionDefinition { + result_type: Type::Named { + name: "Float".into(), // Different result type to the input scalar type + }, + }, + )]), + comparison_operators: BTreeMap::from([( + "_eq".to_owned(), + ComparisonOperatorDefinition::Equal, + )]), + }, + ), + ]) +} + +pub fn make_flat_schema() -> QueryContext<'static> { + QueryContext { + collections: Cow::Owned(BTreeMap::from([ + ( + "authors".into(), + CollectionInfo { + name: "authors".to_owned(), + description: None, + collection_type: "Author".into(), + arguments: Default::default(), + uniqueness_constraints: make_primary_key_uniqueness_constraint("authors"), + foreign_keys: Default::default(), + }, + ), + ( + "articles".into(), + CollectionInfo { + name: "articles".to_owned(), + description: None, + collection_type: "Article".into(), + arguments: Default::default(), + uniqueness_constraints: make_primary_key_uniqueness_constraint("articles"), + foreign_keys: Default::default(), + }, + ), + ])), + functions: 
Default::default(), + object_types: Cow::Owned(BTreeMap::from([ + ( + "Author".into(), + schema::ObjectType { + description: None, + fields: BTreeMap::from([ + ( + "id".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Scalar(BsonScalarType::Int), + }, + ), + ( + "last_name".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Scalar(BsonScalarType::String), + }, + ), + ]), + }, + ), + ( + "Article".into(), + schema::ObjectType { + description: None, + fields: BTreeMap::from([ + ( + "author_id".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Scalar(BsonScalarType::Int), + }, + ), + ( + "title".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Scalar(BsonScalarType::String), + }, + ), + ( + "year".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Nullable(Box::new(schema::Type::Scalar( + BsonScalarType::Int, + ))), + }, + ), + ]), + }, + ), + ])), + scalar_types: Cow::Owned(make_scalar_types()), + } +} + +pub fn make_nested_schema() -> QueryContext<'static> { + QueryContext { + collections: Cow::Owned(BTreeMap::from([ + ( + "authors".into(), + CollectionInfo { + name: "authors".into(), + description: None, + collection_type: "Author".into(), + arguments: Default::default(), + uniqueness_constraints: make_primary_key_uniqueness_constraint("authors"), + foreign_keys: Default::default(), + }, + ), + collection("appearances"), // new helper gives more concise syntax + ])), + functions: Default::default(), + object_types: Cow::Owned(BTreeMap::from([ + ( + "Author".into(), + schema::ObjectType { + description: None, + fields: BTreeMap::from([ + ( + "address".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Object("Address".into()), + }, + ), + ( + "articles".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::ArrayOf(Box::new(schema::Type::Object( + "Article".into(), + ))), + }, + ), + ( 
+ "array_of_arrays".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::ArrayOf(Box::new(schema::Type::ArrayOf( + Box::new(schema::Type::Object("Article".into())), + ))), + }, + ), + ]), + }, + ), + ( + "Address".into(), + schema::ObjectType { + description: None, + fields: BTreeMap::from([ + ( + "country".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Scalar(BsonScalarType::String), + }, + ), + ( + "street".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Scalar(BsonScalarType::String), + }, + ), + ( + "apartment".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Nullable(Box::new(schema::Type::Scalar( + BsonScalarType::String, + ))), + }, + ), + ( + "geocode".into(), + schema::ObjectField { + description: Some("Lat/Long".to_owned()), + r#type: schema::Type::Nullable(Box::new(schema::Type::Object( + "Geocode".to_owned(), + ))), + }, + ), + ]), + }, + ), + ( + "Article".into(), + schema::ObjectType { + description: None, + fields: BTreeMap::from([( + "title".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Scalar(BsonScalarType::String), + }, + )]), + }, + ), + ( + "Geocode".into(), + schema::ObjectType { + description: None, + fields: BTreeMap::from([ + ( + "latitude".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Scalar(BsonScalarType::Double), + }, + ), + ( + "longitude".into(), + schema::ObjectField { + description: None, + r#type: schema::Type::Scalar(BsonScalarType::Double), + }, + ), + ]), + }, + ), + ( + "appearances".to_owned(), + object_type([("authorId", schema::Type::Scalar(BsonScalarType::ObjectId))]), + ), + ])), + scalar_types: Cow::Owned(make_scalar_types()), + } +} diff --git a/crates/ndc-test-helpers/Cargo.toml b/crates/ndc-test-helpers/Cargo.toml index d42fcb22..b0d18672 100644 --- a/crates/ndc-test-helpers/Cargo.toml +++ b/crates/ndc-test-helpers/Cargo.toml @@ -5,6 +5,6 @@ edition 
= "2021"

 [dependencies]
 indexmap = "2"
-itertools = "^0.10"
+itertools = { workspace = true }
 ndc-models = { workspace = true }
 serde_json = "1"
diff --git a/crates/ndc-test-helpers/src/collection_info.rs b/crates/ndc-test-helpers/src/collection_info.rs
new file mode 100644
index 00000000..4b41d802
--- /dev/null
+++ b/crates/ndc-test-helpers/src/collection_info.rs
@@ -0,0 +1,27 @@
use std::{collections::BTreeMap, fmt::Display};

use ndc_models::{CollectionInfo, ObjectField, ObjectType, Type, UniquenessConstraint};

/// Builds a `(name, CollectionInfo)` entry for a test schema. The collection
/// type is assumed to share the collection's name, and a primary-key
/// uniqueness constraint on `_id` is attached.
pub fn collection(name: impl Display + Clone) -> (String, CollectionInfo) {
    let coll = CollectionInfo {
        name: name.to_string(),
        description: None,
        arguments: Default::default(),
        collection_type: name.to_string(),
        uniqueness_constraints: make_primary_key_uniqueness_constraint(name.clone()),
        foreign_keys: Default::default(),
    };
    (name.to_string(), coll)
}

/// Single-entry constraint map keyed `"{collection_name}_id"`, declaring the
/// `_id` column unique — the conventional MongoDB primary key.
// NOTE(review): the map's type arguments were reconstructed (keys are built
// with `format!`, values are `UniquenessConstraint`) — confirm against upstream.
pub fn make_primary_key_uniqueness_constraint(
    collection_name: impl Display,
) -> BTreeMap<String, UniquenessConstraint> {
    [(
        format!("{collection_name}_id"),
        UniquenessConstraint {
            unique_columns: vec!["_id".to_owned()],
        },
    )]
    .into()
}
diff --git a/crates/ndc-test-helpers/src/lib.rs b/crates/ndc-test-helpers/src/lib.rs
index 3d916a09..c1fe9731 100644
--- a/crates/ndc-test-helpers/src/lib.rs
+++ b/crates/ndc-test-helpers/src/lib.rs
@@ -2,6 +2,7 @@
 #![allow(unused_imports)]

 mod aggregates;
+mod collection_info;
 mod comparison_target;
 mod comparison_value;
 mod exists_in_collection;
@@ -16,6 +17,7 @@ use ndc_models::{
     QueryRequest, Relationship, RelationshipArgument, RelationshipType,
 };

+pub use collection_info::*;
 pub use comparison_target::*;
 pub use comparison_value::*;
 pub use exists_in_collection::*;