diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6fb3354d..91b3edb0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,9 +4,15 @@ This changelog documents the changes between release versions.
 
 ## [Unreleased v2]
 
+### Added
+
+- You can now group documents for aggregation according to multiple grouping criteria ([#144](https://github.com/hasura/ndc-mongodb/pull/144))
+
 ### Changed
 
 - **BREAKING:** Update to ndc-spec v0.2 ([#139](https://github.com/hasura/ndc-mongodb/pull/139))
+- **BREAKING:** Remove the custom count aggregation; use the standard count instead ([#144](https://github.com/hasura/ndc-mongodb/pull/144))
+- Results for `avg` and `sum` aggregations are coerced to consistent result types ([#144](https://github.com/hasura/ndc-mongodb/pull/144))
 
 #### ndc-spec v0.2
 
@@ -26,7 +32,23 @@ changelog](https://hasura.github.io/ndc-spec/specification/changelog.html#020).
 Use of the new spec requires a version of GraphQL Engine that supports ndc-spec
 v0.2, and there are required metadata changes.
 
+#### Removed custom count aggregation
+
+Previously there were two aggregation operations for getting document counts:
+`count` and `_count`. These did the same thing. The custom `count` aggregation
+has been removed; use `_count` instead.
+
+#### Results for `avg` and `sum` aggregations are coerced to consistent result types
+
+This change is required for compliance with ndc-spec.
+
+Results for `avg` are always coerced to `double`.
+
+Results for `sum` are coerced to `double` if the summed inputs use a fractional
+numeric type, or to `long` if the inputs use an integral numeric type.
+
 ## [Unreleased v1]
+
 ### Added
 
 - Add uuid scalar type ([#148](https://github.com/hasura/ndc-mongodb/pull/148))
diff --git a/crates/configuration/src/mongo_scalar_type.rs b/crates/configuration/src/mongo_scalar_type.rs
index 1876c260..38c3532f 100644
--- a/crates/configuration/src/mongo_scalar_type.rs
+++ b/crates/configuration/src/mongo_scalar_type.rs
@@ -1,7 +1,9 @@
+use std::fmt::Display;
+
 use mongodb_support::{BsonScalarType, EXTENDED_JSON_TYPE_NAME};
 use ndc_query_plan::QueryPlanError;
 
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 pub enum MongoScalarType {
     /// One of the predefined BSON scalar types
     Bson(BsonScalarType),
@@ -40,3 +42,14 @@ impl TryFrom<&ndc_models::ScalarTypeName> for MongoScalarType {
         }
     }
 }
+
+impl Display for MongoScalarType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            MongoScalarType::ExtendedJSON => write!(f, "extendedJSON"),
+            MongoScalarType::Bson(bson_scalar_type) => {
+                write!(f, "{}", bson_scalar_type.bson_name())
+            }
+        }
+    }
+}
diff --git a/crates/integration-tests/src/tests/aggregation.rs b/crates/integration-tests/src/tests/aggregation.rs
index dedfad6a..86d6a180 100644
--- a/crates/integration-tests/src/tests/aggregation.rs
+++ b/crates/integration-tests/src/tests/aggregation.rs
@@ -131,7 +131,7 @@ async fn returns_zero_when_counting_empty_result_set() -> anyhow::Result<()> {
                 moviesAggregate(filter_input: {where: {title: {_eq: "no such movie"}}}) {
                     _count
                     title {
-                        count
+                        _count
                     }
                 }
             }
@@ -152,7 +152,6 @@ async fn returns_zero_when_counting_nested_fields_in_empty_result_set() -> anyho
                 moviesAggregate(filter_input: {where: {title: {_eq: "no such movie"}}}) {
                     awards {
                         nominations {
-                            count
                             _count
                         }
                     }
diff --git a/crates/integration-tests/src/tests/grouping.rs b/crates/integration-tests/src/tests/grouping.rs
new file mode 100644
index 00000000..b15b7cde
--- /dev/null
+++ b/crates/integration-tests/src/tests/grouping.rs
@@ -0,0 +1,134 @@
+use 
insta::assert_yaml_snapshot;
+use ndc_test_helpers::{
+    asc, binop, column_aggregate, dimension_column, field, grouping, or, ordered_dimensions, query,
+    query_request, target, value,
+};
+
+use crate::{connector::Connector, run_connector_query};
+
+#[tokio::test]
+async fn runs_single_column_aggregate_on_groups() -> anyhow::Result<()> {
+    assert_yaml_snapshot!(
+        run_connector_query(
+            Connector::SampleMflix,
+            query_request().collection("movies").query(
+                query()
+                    // The predicate avoids an error when encountering documents where `year` is
+                    // a string instead of a number.
+                    .predicate(or([
+                        binop("_gt", target!("year"), value!(0)),
+                        binop("_lte", target!("year"), value!(0)),
+                    ]))
+                    .order_by([asc!("_id")])
+                    .limit(10)
+                    .groups(
+                        grouping()
+                            .dimensions([dimension_column("year")])
+                            .aggregates([
+                                (
+                                    "average_viewer_rating",
+                                    column_aggregate("tomatoes.viewer.rating", "avg"),
+                                ),
+                                ("max_runtime", column_aggregate("runtime", "max")),
+                            ])
+                            .order_by(ordered_dimensions()),
+                    ),
+            ),
+        )
+        .await?
+    );
+    Ok(())
+}
+
+#[tokio::test]
+async fn groups_by_multiple_dimensions() -> anyhow::Result<()> {
+    assert_yaml_snapshot!(
+        run_connector_query(
+            Connector::SampleMflix,
+            query_request().collection("movies").query(
+                query()
+                    .predicate(binop("_lt", target!("year"), value!(1950)))
+                    .order_by([asc!("_id")])
+                    .limit(10)
+                    .groups(
+                        grouping()
+                            .dimensions([
+                                dimension_column("year"),
+                                dimension_column("languages"),
+                                dimension_column("rated"),
+                            ])
+                            .aggregates([(
+                                "average_viewer_rating",
+                                column_aggregate("tomatoes.viewer.rating", "avg"),
+                            )])
+                            .order_by(ordered_dimensions()),
+                    ),
+            ),
+        )
+        .await?
+    );
+    Ok(())
+}
+
+#[tokio::test]
+async fn combines_aggregates_and_groups_in_one_query() -> anyhow::Result<()> {
+    assert_yaml_snapshot!(
+        run_connector_query(
+            Connector::SampleMflix,
+            query_request().collection("movies").query(
+                query()
+                    .predicate(binop("_gte", target!("year"), value!(2000)))
+                    .order_by([asc!("_id")])
+                    .limit(10)
+                    .aggregates([(
+                        "average_viewer_rating",
+                        column_aggregate("tomatoes.viewer.rating", "avg")
+                    )])
+                    .groups(
+                        grouping()
+                            .dimensions([dimension_column("year"),])
+                            .aggregates([(
+                                "average_viewer_rating_by_year",
+                                column_aggregate("tomatoes.viewer.rating", "avg"),
+                            )])
+                            .order_by(ordered_dimensions()),
+                    ),
+            ),
+        )
+        .await?
+    );
+    Ok(())
+}
+
+#[tokio::test]
+async fn combines_fields_and_groups_in_one_query() -> anyhow::Result<()> {
+    assert_yaml_snapshot!(
+        run_connector_query(
+            Connector::SampleMflix,
+            query_request().collection("movies").query(
+                query()
+                    // The predicate avoids an error when encountering documents where `year` is
+                    // a string instead of a number.
+                    .predicate(or([
+                        binop("_gt", target!("year"), value!(0)),
+                        binop("_lte", target!("year"), value!(0)),
+                    ]))
+                    .order_by([asc!("_id")])
+                    .limit(3)
+                    .fields([field!("title"), field!("year")])
+                    .groups(
+                        grouping()
+                            .dimensions([dimension_column("year")])
+                            .aggregates([(
+                                "average_viewer_rating_by_year",
+                                column_aggregate("tomatoes.viewer.rating", "avg"),
+                            )])
+                            .order_by(ordered_dimensions()),
+                    )
+            ),
+        )
+        .await?
+ ); + Ok(()) +} diff --git a/crates/integration-tests/src/tests/local_relationship.rs b/crates/integration-tests/src/tests/local_relationship.rs index 5906d8eb..4bfc31aa 100644 --- a/crates/integration-tests/src/tests/local_relationship.rs +++ b/crates/integration-tests/src/tests/local_relationship.rs @@ -1,9 +1,10 @@ use crate::{connector::Connector, graphql_query, run_connector_query}; use insta::assert_yaml_snapshot; use ndc_test_helpers::{ - asc, binop, exists, field, query, query_request, related, relation_field, - relationship, target, value, + asc, binop, column, column_aggregate, dimension_column, exists, field, grouping, is_in, + ordered_dimensions, query, query_request, related, relation_field, relationship, target, value, }; +use serde_json::json; #[tokio::test] async fn joins_local_relationships() -> anyhow::Result<()> { @@ -243,3 +244,117 @@ async fn joins_relationships_on_nested_key() -> anyhow::Result<()> { ); Ok(()) } + +#[tokio::test] +async fn groups_by_related_field() -> anyhow::Result<()> { + assert_yaml_snapshot!( + run_connector_query( + Connector::Chinook, + query_request() + .collection("Track") + .query( + query() + // avoid albums that are modified in mutation tests + .predicate(is_in( + target!("AlbumId"), + [json!(15), json!(91), json!(227)] + )) + .groups( + grouping() + .dimensions([dimension_column( + column("Name").from_relationship("track_genre") + )]) + .aggregates([( + "average_price", + column_aggregate("UnitPrice", "avg") + )]) + .order_by(ordered_dimensions()) + ) + ) + .relationships([( + "track_genre", + relationship("Genre", [("GenreId", &["GenreId"])]).object_type() + )]) + ) + .await? + ); + Ok(()) +} + +#[tokio::test] +async fn gets_groups_through_relationship() -> anyhow::Result<()> { + assert_yaml_snapshot!( + run_connector_query( + Connector::Chinook, + query_request() + .collection("Album") + .query( + query() + // avoid albums that are modified in mutation tests + .predicate(is_in(target!("AlbumId"), [json!(15), json!(91), json!(227)])) + .order_by([asc!("_id")]) + .fields([field!("AlbumId"), relation_field!("tracks" => "album_tracks", query() + .groups(grouping() + .dimensions([dimension_column(column("Name").from_relationship("track_genre"))]) + .aggregates([ + ("AlbumId", column_aggregate("AlbumId", "avg")), + ("average_price", column_aggregate("UnitPrice", "avg")), + ]) + .order_by(ordered_dimensions()), + ) + )]) + ) + .relationships([ + ( + "album_tracks", + relationship("Track", [("AlbumId", &["AlbumId"])]) + ), + ( + "track_genre", + relationship("Genre", [("GenreId", &["GenreId"])]).object_type() + ) + ]) + ) + .await? 
+ ); + Ok(()) +} + +#[tokio::test] +async fn gets_fields_and_groups_through_relationship() -> anyhow::Result<()> { + assert_yaml_snapshot!( + run_connector_query( + Connector::Chinook, + query_request() + .collection("Album") + .query( + query() + .predicate(is_in(target!("AlbumId"), [json!(15), json!(91), json!(227)])) + .order_by([asc!("_id")]) + .fields([field!("AlbumId"), relation_field!("tracks" => "album_tracks", query() + .order_by([asc!("_id")]) + .fields([field!("AlbumId"), field!("Name"), field!("UnitPrice")]) + .groups(grouping() + .dimensions([dimension_column(column("Name").from_relationship("track_genre"))]) + .aggregates([( + "average_price", column_aggregate("UnitPrice", "avg") + )]) + .order_by(ordered_dimensions()), + ) + )]) + ) + .relationships([ + ( + "album_tracks", + relationship("Track", [("AlbumId", &["AlbumId"])]) + ), + ( + "track_genre", + relationship("Genre", [("GenreId", &["GenreId"])]).object_type() + ) + ]) + ) + .await? + ); + Ok(()) +} diff --git a/crates/integration-tests/src/tests/mod.rs b/crates/integration-tests/src/tests/mod.rs index de65332f..6533de72 100644 --- a/crates/integration-tests/src/tests/mod.rs +++ b/crates/integration-tests/src/tests/mod.rs @@ -11,6 +11,7 @@ mod aggregation; mod basic; mod expressions; mod filtering; +mod grouping; mod local_relationship; mod native_mutation; mod native_query; diff --git a/crates/integration-tests/src/tests/remote_relationship.rs b/crates/integration-tests/src/tests/remote_relationship.rs index c607b30b..a1570732 100644 --- a/crates/integration-tests/src/tests/remote_relationship.rs +++ b/crates/integration-tests/src/tests/remote_relationship.rs @@ -1,6 +1,9 @@ use crate::{connector::Connector, graphql_query, run_connector_query}; use insta::assert_yaml_snapshot; -use ndc_test_helpers::{and, asc, binop, field, query, query_request, target, variable}; +use ndc_test_helpers::{ + and, asc, binop, column_aggregate, dimension_column, field, grouping, ordered_dimensions, + query, query_request, target, variable, +}; use serde_json::json; #[tokio::test] @@ -74,3 +77,60 @@ async fn variable_used_in_multiple_type_contexts() -> anyhow::Result<()> { ); Ok(()) } + +#[tokio::test] +async fn provides_groups_for_variable_set() -> anyhow::Result<()> { + assert_yaml_snapshot!( + run_connector_query( + Connector::SampleMflix, + query_request() + .collection("movies") + .variables([[("year", json!(2014))]]) + .query( + query() + .predicate(binop("_eq", target!("year"), variable!(year))) + .groups( + grouping() + .dimensions([dimension_column("rated")]) + .aggregates([( + "average_viewer_rating", + column_aggregate("tomatoes.viewer.rating", "avg"), + ),]) + .order_by(ordered_dimensions()), + ), + ), + ) + .await? + ); + Ok(()) +} + +#[tokio::test] +async fn provides_fields_combined_with_groups_for_variable_set() -> anyhow::Result<()> { + assert_yaml_snapshot!( + run_connector_query( + Connector::SampleMflix, + query_request() + .collection("movies") + .variables([[("year", json!(2014))]]) + .query( + query() + .predicate(binop("_eq", target!("year"), variable!(year))) + .fields([field!("title"), field!("rated")]) + .order_by([asc!("_id")]) + .groups( + grouping() + .dimensions([dimension_column("rated")]) + .aggregates([( + "average_viewer_rating", + column_aggregate("tomatoes.viewer.rating", "avg"), + ),]) + .order_by(ordered_dimensions()), + ) + .limit(3), + ), + ) + .await? 
+ ); + Ok(()) +} diff --git a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__aggregation__aggregates_extended_json_representing_mixture_of_numeric_types.snap b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__aggregation__aggregates_extended_json_representing_mixture_of_numeric_types.snap index c4a039c5..bcaa082a 100644 --- a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__aggregation__aggregates_extended_json_representing_mixture_of_numeric_types.snap +++ b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__aggregation__aggregates_extended_json_representing_mixture_of_numeric_types.snap @@ -6,14 +6,14 @@ data: extendedJsonTestDataAggregate: value: avg: - $numberDecimal: "4.5" + $numberDouble: "4.5" _count: 8 max: $numberLong: "8" min: $numberDecimal: "1" sum: - $numberDecimal: "36" + $numberDouble: "36.0" _count_distinct: 8 extendedJsonTestData: - type: decimal diff --git a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__aggregation__returns_zero_when_counting_empty_result_set.snap b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__aggregation__returns_zero_when_counting_empty_result_set.snap index 61d3c939..f436ce34 100644 --- a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__aggregation__returns_zero_when_counting_empty_result_set.snap +++ b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__aggregation__returns_zero_when_counting_empty_result_set.snap @@ -1,10 +1,10 @@ --- source: crates/integration-tests/src/tests/aggregation.rs -expression: "graphql_query(r#\"\n query {\n moviesAggregate(filter_input: {where: {title: {_eq: \"no such movie\"}}}) {\n _count\n title {\n count\n }\n }\n }\n \"#).run().await?" +expression: "graphql_query(r#\"\n query {\n moviesAggregate(filter_input: {where: {title: {_eq: \"no such movie\"}}}) {\n _count\n title {\n _count\n }\n }\n }\n \"#).run().await?" --- data: moviesAggregate: _count: 0 title: - count: 0 + _count: 0 errors: ~ diff --git a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__aggregation__returns_zero_when_counting_nested_fields_in_empty_result_set.snap b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__aggregation__returns_zero_when_counting_nested_fields_in_empty_result_set.snap index c621c020..f7d33a3c 100644 --- a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__aggregation__returns_zero_when_counting_nested_fields_in_empty_result_set.snap +++ b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__aggregation__returns_zero_when_counting_nested_fields_in_empty_result_set.snap @@ -1,11 +1,10 @@ --- source: crates/integration-tests/src/tests/aggregation.rs -expression: "graphql_query(r#\"\n query {\n moviesAggregate(filter_input: {where: {title: {_eq: \"no such movie\"}}}) {\n awards {\n nominations {\n count\n _count\n }\n }\n }\n }\n \"#).run().await?" +expression: "graphql_query(r#\"\n query {\n moviesAggregate(filter_input: {where: {title: {_eq: \"no such movie\"}}}) {\n awards {\n nominations {\n _count\n }\n }\n }\n }\n \"#).run().await?" 
--- data: moviesAggregate: awards: nominations: - count: 0 _count: 0 errors: ~ diff --git a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__grouping__combines_aggregates_and_groups_in_one_query.snap b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__grouping__combines_aggregates_and_groups_in_one_query.snap new file mode 100644 index 00000000..efff0c4f --- /dev/null +++ b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__grouping__combines_aggregates_and_groups_in_one_query.snap @@ -0,0 +1,27 @@ +--- +source: crates/integration-tests/src/tests/grouping.rs +expression: "run_connector_query(Connector::SampleMflix,\nquery_request().collection(\"movies\").query(query().predicate(binop(\"_gte\",\ntarget!(\"year\"),\nvalue!(2000))).limit(10).aggregates([(\"average_viewer_rating\",\ncolumn_aggregate(\"tomatoes.viewer.rating\",\n\"avg\"))]).groups(grouping().dimensions([dimension_column(\"year\"),]).aggregates([(\"average_viewer_rating_by_year\",\ncolumn_aggregate(\"tomatoes.viewer.rating\",\n\"avg\"),)]).order_by(ordered_dimensions()),),),).await?" +--- +- aggregates: + average_viewer_rating: 3.05 + groups: + - dimensions: + - 2000 + aggregates: + average_viewer_rating_by_year: 3.825 + - dimensions: + - 2001 + aggregates: + average_viewer_rating_by_year: 2.55 + - dimensions: + - 2002 + aggregates: + average_viewer_rating_by_year: 1.8 + - dimensions: + - 2003 + aggregates: + average_viewer_rating_by_year: 3 + - dimensions: + - 2005 + aggregates: + average_viewer_rating_by_year: 3.5 diff --git a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__grouping__combines_fields_and_groups_in_one_query.snap b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__grouping__combines_fields_and_groups_in_one_query.snap new file mode 100644 index 00000000..236aadae --- /dev/null +++ b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__grouping__combines_fields_and_groups_in_one_query.snap @@ -0,0 +1,24 @@ +--- +source: crates/integration-tests/src/tests/grouping.rs +expression: "run_connector_query(Connector::SampleMflix,\nquery_request().collection(\"movies\").query(query().predicate(or([binop(\"_gt\",\ntarget!(\"year\"), value!(0)),\nbinop(\"_lte\", target!(\"year\"),\nvalue!(0)),])).fields([field!(\"title\"),\nfield!(\"year\")]).order_by([asc!(\"_id\")]).groups(grouping().dimensions([dimension_column(\"year\")]).aggregates([(\"average_viewer_rating_by_year\",\ncolumn_aggregate(\"tomatoes.viewer.rating\",\n\"avg\"),)]).order_by(ordered_dimensions()),).limit(3),),).await?" 
+--- +- rows: + - title: Blacksmith Scene + year: 1893 + - title: The Great Train Robbery + year: 1903 + - title: The Land Beyond the Sunset + year: 1912 + groups: + - dimensions: + - 1893 + aggregates: + average_viewer_rating_by_year: 3 + - dimensions: + - 1903 + aggregates: + average_viewer_rating_by_year: 3.7 + - dimensions: + - 1912 + aggregates: + average_viewer_rating_by_year: 3.7 diff --git a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__grouping__groups_by_multiple_dimensions.snap b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__grouping__groups_by_multiple_dimensions.snap new file mode 100644 index 00000000..f2f0d486 --- /dev/null +++ b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__grouping__groups_by_multiple_dimensions.snap @@ -0,0 +1,53 @@ +--- +source: crates/integration-tests/src/tests/grouping.rs +expression: "run_connector_query(Connector::SampleMflix,\nquery_request().collection(\"movies\").query(query().predicate(binop(\"_lt\",\ntarget!(\"year\"),\nvalue!(1950))).order_by([asc!(\"_id\")]).limit(10).groups(grouping().dimensions([dimension_column(\"year\"),\ndimension_column(\"languages\"),\ndimension_column(\"rated\"),]).aggregates([(\"average_viewer_rating\",\ncolumn_aggregate(\"tomatoes.viewer.rating\",\n\"avg\"),)]).order_by(ordered_dimensions()),),),).await?" +--- +- groups: + - dimensions: + - 1893 + - ~ + - UNRATED + aggregates: + average_viewer_rating: 3 + - dimensions: + - 1903 + - - English + - TV-G + aggregates: + average_viewer_rating: 3.7 + - dimensions: + - 1909 + - - English + - G + aggregates: + average_viewer_rating: 3.6 + - dimensions: + - 1911 + - - English + - ~ + aggregates: + average_viewer_rating: 3.4 + - dimensions: + - 1912 + - - English + - UNRATED + aggregates: + average_viewer_rating: 3.7 + - dimensions: + - 1913 + - - English + - TV-PG + aggregates: + average_viewer_rating: 3 + - dimensions: + - 1914 + - - English + - ~ + aggregates: + average_viewer_rating: 3.0666666666666664 + - dimensions: + - 1915 + - ~ + - NOT RATED + aggregates: + average_viewer_rating: 3.2 diff --git a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__grouping__runs_single_column_aggregate_on_groups.snap b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__grouping__runs_single_column_aggregate_on_groups.snap new file mode 100644 index 00000000..4b3177a1 --- /dev/null +++ b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__grouping__runs_single_column_aggregate_on_groups.snap @@ -0,0 +1,45 @@ +--- +source: crates/integration-tests/src/tests/grouping.rs +expression: "run_connector_query(Connector::SampleMflix,\nquery_request().collection(\"movies\").query(query().predicate(or([binop(\"_gt\",\ntarget!(\"year\"), value!(0)),\nbinop(\"_lte\", target!(\"year\"),\nvalue!(0)),])).order_by([asc!(\"_id\")]).limit(10).groups(grouping().dimensions([dimension_column(\"year\")]).aggregates([(\"average_viewer_rating\",\ncolumn_aggregate(\"tomatoes.viewer.rating\", \"avg\"),),\n(\"max_runtime\",\ncolumn_aggregate(\"runtime\",\n\"max\")),]).order_by(ordered_dimensions()),),),).await?" 
+--- +- groups: + - dimensions: + - 1893 + aggregates: + average_viewer_rating: 3 + max_runtime: 1 + - dimensions: + - 1903 + aggregates: + average_viewer_rating: 3.7 + max_runtime: 11 + - dimensions: + - 1909 + aggregates: + average_viewer_rating: 3.6 + max_runtime: 14 + - dimensions: + - 1911 + aggregates: + average_viewer_rating: 3.4 + max_runtime: 7 + - dimensions: + - 1912 + aggregates: + average_viewer_rating: 3.7 + max_runtime: 14 + - dimensions: + - 1913 + aggregates: + average_viewer_rating: 3 + max_runtime: 88 + - dimensions: + - 1914 + aggregates: + average_viewer_rating: 3.0666666666666664 + max_runtime: 199 + - dimensions: + - 1915 + aggregates: + average_viewer_rating: 3.2 + max_runtime: 165 diff --git a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__local_relationship__gets_fields_and_groups_through_relationship.snap b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__local_relationship__gets_fields_and_groups_through_relationship.snap new file mode 100644 index 00000000..f3aaa8ea --- /dev/null +++ b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__local_relationship__gets_fields_and_groups_through_relationship.snap @@ -0,0 +1,152 @@ +--- +source: crates/integration-tests/src/tests/local_relationship.rs +expression: "run_connector_query(Connector::Chinook,\nquery_request().collection(\"Album\").query(query().predicate(is_in(target!(\"AlbumId\"),\n[json!(15), json!(91),\njson!(227)])).order_by([asc!(\"_id\")]).fields([field!(\"AlbumId\"),\nrelation_field!(\"tracks\" => \"album_tracks\",\nquery().order_by([asc!(\"_id\")]).fields([field!(\"AlbumId\"), field!(\"Name\"),\nfield!(\"UnitPrice\")]).groups(grouping().dimensions([dimension_column(column(\"Name\").from_relationship(\"track_genre\"))]).aggregates([(\"average_price\",\ncolumn_aggregate(\"UnitPrice\",\n\"avg\"))]).order_by(ordered_dimensions()),))])).relationships([(\"album_tracks\",\nrelationship(\"Track\", [(\"AlbumId\", &[\"AlbumId\"])])),\n(\"track_genre\",\nrelationship(\"Genre\", [(\"GenreId\", &[\"GenreId\"])]).object_type())])).await?" 
+--- +- rows: + - AlbumId: 15 + tracks: + groups: + - average_price: 0.99 + dimensions: + - - Metal + rows: + - AlbumId: 15 + Name: Heart Of Gold + UnitPrice: "0.99" + - AlbumId: 15 + Name: Snowblind + UnitPrice: "0.99" + - AlbumId: 15 + Name: Like A Bird + UnitPrice: "0.99" + - AlbumId: 15 + Name: Blood In The Wall + UnitPrice: "0.99" + - AlbumId: 15 + Name: The Beginning...At Last + UnitPrice: "0.99" + - AlbumId: 91 + tracks: + groups: + - average_price: 0.99 + dimensions: + - - Rock + rows: + - AlbumId: 91 + Name: Right Next Door to Hell + UnitPrice: "0.99" + - AlbumId: 91 + Name: "Dust N' Bones" + UnitPrice: "0.99" + - AlbumId: 91 + Name: Live and Let Die + UnitPrice: "0.99" + - AlbumId: 91 + Name: "Don't Cry (Original)" + UnitPrice: "0.99" + - AlbumId: 91 + Name: Perfect Crime + UnitPrice: "0.99" + - AlbumId: 91 + Name: "You Ain't the First" + UnitPrice: "0.99" + - AlbumId: 91 + Name: Bad Obsession + UnitPrice: "0.99" + - AlbumId: 91 + Name: Back off Bitch + UnitPrice: "0.99" + - AlbumId: 91 + Name: "Double Talkin' Jive" + UnitPrice: "0.99" + - AlbumId: 91 + Name: November Rain + UnitPrice: "0.99" + - AlbumId: 91 + Name: The Garden + UnitPrice: "0.99" + - AlbumId: 91 + Name: Garden of Eden + UnitPrice: "0.99" + - AlbumId: 91 + Name: "Don't Damn Me" + UnitPrice: "0.99" + - AlbumId: 91 + Name: Bad Apples + UnitPrice: "0.99" + - AlbumId: 91 + Name: Dead Horse + UnitPrice: "0.99" + - AlbumId: 91 + Name: Coma + UnitPrice: "0.99" + - AlbumId: 227 + tracks: + groups: + - average_price: 1.99 + dimensions: + - - Sci Fi & Fantasy + - average_price: 1.99 + dimensions: + - - Science Fiction + - average_price: 1.99 + dimensions: + - - TV Shows + rows: + - AlbumId: 227 + Name: Occupation / Precipice + UnitPrice: "1.99" + - AlbumId: 227 + Name: "Exodus, Pt. 1" + UnitPrice: "1.99" + - AlbumId: 227 + Name: "Exodus, Pt. 2" + UnitPrice: "1.99" + - AlbumId: 227 + Name: Collaborators + UnitPrice: "1.99" + - AlbumId: 227 + Name: Torn + UnitPrice: "1.99" + - AlbumId: 227 + Name: A Measure of Salvation + UnitPrice: "1.99" + - AlbumId: 227 + Name: Hero + UnitPrice: "1.99" + - AlbumId: 227 + Name: Unfinished Business + UnitPrice: "1.99" + - AlbumId: 227 + Name: The Passage + UnitPrice: "1.99" + - AlbumId: 227 + Name: The Eye of Jupiter + UnitPrice: "1.99" + - AlbumId: 227 + Name: Rapture + UnitPrice: "1.99" + - AlbumId: 227 + Name: Taking a Break from All Your Worries + UnitPrice: "1.99" + - AlbumId: 227 + Name: The Woman King + UnitPrice: "1.99" + - AlbumId: 227 + Name: A Day In the Life + UnitPrice: "1.99" + - AlbumId: 227 + Name: Dirty Hands + UnitPrice: "1.99" + - AlbumId: 227 + Name: Maelstrom + UnitPrice: "1.99" + - AlbumId: 227 + Name: The Son Also Rises + UnitPrice: "1.99" + - AlbumId: 227 + Name: "Crossroads, Pt. 1" + UnitPrice: "1.99" + - AlbumId: 227 + Name: "Crossroads, Pt. 
2" + UnitPrice: "1.99" diff --git a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__local_relationship__gets_groups_through_relationship.snap b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__local_relationship__gets_groups_through_relationship.snap new file mode 100644 index 00000000..9d6719e1 --- /dev/null +++ b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__local_relationship__gets_groups_through_relationship.snap @@ -0,0 +1,34 @@ +--- +source: crates/integration-tests/src/tests/local_relationship.rs +expression: "run_connector_query(Connector::Chinook,\nquery_request().collection(\"Album\").query(query().predicate(is_in(target!(\"AlbumId\"),\n[json!(15), json!(91),\njson!(227)])).order_by([asc!(\"_id\")]).fields([field!(\"AlbumId\"),\nrelation_field!(\"tracks\" => \"album_tracks\",\nquery().groups(grouping().dimensions([dimension_column(column(\"Name\").from_relationship(\"track_genre\"))]).aggregates([(\"AlbumId\",\ncolumn_aggregate(\"AlbumId\", \"avg\")),\n(\"average_price\",\ncolumn_aggregate(\"UnitPrice\",\n\"avg\")),]).order_by(ordered_dimensions()),))])).relationships([(\"album_tracks\",\nrelationship(\"Track\", [(\"AlbumId\", &[\"AlbumId\"])])),\n(\"track_genre\",\nrelationship(\"Genre\", [(\"GenreId\", &[\"GenreId\"])]).object_type())])).await?" +--- +- rows: + - AlbumId: 15 + tracks: + groups: + - AlbumId: 15 + average_price: 0.99 + dimensions: + - - Metal + - AlbumId: 91 + tracks: + groups: + - AlbumId: 91 + average_price: 0.99 + dimensions: + - - Rock + - AlbumId: 227 + tracks: + groups: + - AlbumId: 227 + average_price: 1.99 + dimensions: + - - Sci Fi & Fantasy + - AlbumId: 227 + average_price: 1.99 + dimensions: + - - Science Fiction + - AlbumId: 227 + average_price: 1.99 + dimensions: + - - TV Shows diff --git a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__local_relationship__groups_by_related_field.snap b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__local_relationship__groups_by_related_field.snap new file mode 100644 index 00000000..5e960c98 --- /dev/null +++ b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__local_relationship__groups_by_related_field.snap @@ -0,0 +1,25 @@ +--- +source: crates/integration-tests/src/tests/local_relationship.rs +expression: "run_connector_query(Connector::Chinook,\nquery_request().collection(\"Track\").query(query().predicate(is_in(target!(\"AlbumId\"),\n[json!(15), json!(91),\njson!(227)])).groups(grouping().dimensions([dimension_column(column(\"Name\").from_relationship(\"track_genre\"))]).aggregates([(\"average_price\",\ncolumn_aggregate(\"UnitPrice\",\n\"avg\"))]).order_by(ordered_dimensions()))).relationships([(\"track_genre\",\nrelationship(\"Genre\", [(\"GenreId\", &[\"GenreId\"])]).object_type())])).await?" 
+--- +- groups: + - dimensions: + - - Metal + aggregates: + average_price: 0.99 + - dimensions: + - - Rock + aggregates: + average_price: 0.99 + - dimensions: + - - Sci Fi & Fantasy + aggregates: + average_price: 1.99 + - dimensions: + - - Science Fiction + aggregates: + average_price: 1.99 + - dimensions: + - - TV Shows + aggregates: + average_price: 1.99 diff --git a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__remote_relationship__provides_fields_combined_with_groups_for_variable_set.snap b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__remote_relationship__provides_fields_combined_with_groups_for_variable_set.snap new file mode 100644 index 00000000..37d2867c --- /dev/null +++ b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__remote_relationship__provides_fields_combined_with_groups_for_variable_set.snap @@ -0,0 +1,24 @@ +--- +source: crates/integration-tests/src/tests/remote_relationship.rs +expression: "run_connector_query(Connector::SampleMflix,\nquery_request().collection(\"movies\").variables([[(\"year\",\njson!(2014))]]).query(query().predicate(binop(\"_eq\", target!(\"year\"),\nvariable!(year))).fields([field!(\"title\"),\nfield!(\"rated\")]).order_by([asc!(\"_id\")]).groups(grouping().dimensions([dimension_column(\"rated\")]).aggregates([(\"average_viewer_rating\",\ncolumn_aggregate(\"tomatoes.viewer.rating\",\n\"avg\"),),]).order_by(ordered_dimensions()),).limit(3),),).await?" +--- +- rows: + - rated: ~ + title: Action Jackson + - rated: PG-13 + title: The Giver + - rated: R + title: The Equalizer + groups: + - dimensions: + - ~ + aggregates: + average_viewer_rating: 2.3 + - dimensions: + - PG-13 + aggregates: + average_viewer_rating: 3.4 + - dimensions: + - R + aggregates: + average_viewer_rating: 3.9 diff --git a/crates/integration-tests/src/tests/snapshots/integration_tests__tests__remote_relationship__provides_groups_for_variable_set.snap b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__remote_relationship__provides_groups_for_variable_set.snap new file mode 100644 index 00000000..fad8a471 --- /dev/null +++ b/crates/integration-tests/src/tests/snapshots/integration_tests__tests__remote_relationship__provides_groups_for_variable_set.snap @@ -0,0 +1,49 @@ +--- +source: crates/integration-tests/src/tests/remote_relationship.rs +expression: "run_connector_query(Connector::SampleMflix,\nquery_request().collection(\"movies\").variables([[(\"year\",\njson!(2014))]]).query(query().predicate(binop(\"_eq\", target!(\"year\"),\nvariable!(year))).groups(grouping().dimensions([dimension_column(\"rated\")]).aggregates([(\"average_viewer_rating\",\ncolumn_aggregate(\"tomatoes.viewer.rating\",\n\"avg\"),),]).order_by(ordered_dimensions()),),),).await?" 
+---
+- groups:
+    - dimensions:
+        - ~
+      aggregates:
+        average_viewer_rating: 3.1320754716981134
+    - dimensions:
+        - G
+      aggregates:
+        average_viewer_rating: 3.8
+    - dimensions:
+        - NOT RATED
+      aggregates:
+        average_viewer_rating: 2.824242424242424
+    - dimensions:
+        - PG
+      aggregates:
+        average_viewer_rating: 3.7096774193548385
+    - dimensions:
+        - PG-13
+      aggregates:
+        average_viewer_rating: 3.470707070707071
+    - dimensions:
+        - R
+      aggregates:
+        average_viewer_rating: 3.3283783783783787
+    - dimensions:
+        - TV-14
+      aggregates:
+        average_viewer_rating: 3.233333333333333
+    - dimensions:
+        - TV-G
+      aggregates:
+        average_viewer_rating: ~
+    - dimensions:
+        - TV-MA
+      aggregates:
+        average_viewer_rating: 4.2
+    - dimensions:
+        - TV-PG
+      aggregates:
+        average_viewer_rating: ~
+    - dimensions:
+        - UNRATED
+      aggregates:
+        average_viewer_rating: 3.06875
diff --git a/crates/mongodb-agent-common/src/aggregation_function.rs b/crates/mongodb-agent-common/src/aggregation_function.rs
index 54cb0c0f..9c637dd6 100644
--- a/crates/mongodb-agent-common/src/aggregation_function.rs
+++ b/crates/mongodb-agent-common/src/aggregation_function.rs
@@ -1,23 +1,24 @@
+use configuration::MongoScalarType;
 use enum_iterator::{all, Sequence};
 
-// TODO: How can we unify this with the Accumulator type in the mongodb module?
-#[derive(Copy, Clone, Debug, PartialEq, Eq, Sequence)]
+#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, Sequence)]
 pub enum AggregationFunction {
     Avg,
-    Count,
     Min,
     Max,
     Sum,
 }
 
+use mongodb_support::BsonScalarType;
 use ndc_query_plan::QueryPlanError;
 use AggregationFunction as A;
+
+use crate::mongo_query_plan::Type;
+
 impl AggregationFunction {
     pub fn graphql_name(self) -> &'static str {
         match self {
             A::Avg => "avg",
-            A::Count => "count",
             A::Min => "min",
             A::Max => "max",
             A::Sum => "sum",
         }
     }
@@ -32,13 +33,28 @@
         })
     }
 
-    pub fn is_count(self) -> bool {
+    /// Returns the result type that is declared for this function in the schema.
+    pub fn expected_result_type(self, argument_type: &Type) -> Option<BsonScalarType> {
         match self {
-            A::Avg => false,
-            A::Count => true,
-            A::Min => false,
-            A::Max => false,
-            A::Sum => false,
+            A::Avg => Some(BsonScalarType::Double),
+            A::Min => None,
+            A::Max => None,
+            A::Sum => Some(if is_fractional(argument_type) {
+                BsonScalarType::Double
+            } else {
+                BsonScalarType::Long
+            }),
         }
     }
 }
+
+fn is_fractional(t: &Type) -> bool {
+    match t {
+        Type::Scalar(MongoScalarType::Bson(s)) => s.is_fractional(),
+        Type::Scalar(MongoScalarType::ExtendedJSON) => true,
+        Type::Object(_) => false,
+        Type::ArrayOf(_) => false,
+        Type::Tuple(ts) => ts.iter().all(is_fractional),
+        Type::Nullable(t) => is_fractional(t),
+    }
+}
diff --git a/crates/mongodb-agent-common/src/comparison_function.rs b/crates/mongodb-agent-common/src/comparison_function.rs
index 5ed5ca82..f6357687 100644
--- a/crates/mongodb-agent-common/src/comparison_function.rs
+++ b/crates/mongodb-agent-common/src/comparison_function.rs
@@ -5,7 +5,7 @@ use ndc_models as ndc;
 /// Supported binary comparison operators. This type provides GraphQL names, MongoDB operator
 /// names, and aggregation pipeline code for each operator. Argument types are defined in
 /// mongodb-agent-common/src/scalar_types_capabilities.rs.
-#[derive(Copy, Clone, Debug, PartialEq, Eq, Sequence)]
+#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, Sequence)]
 pub enum ComparisonFunction {
     LessThan,
     LessThanOrEqual,
diff --git a/crates/mongodb-agent-common/src/constants.rs b/crates/mongodb-agent-common/src/constants.rs
new file mode 100644
index 00000000..0d26f41c
--- /dev/null
+++ b/crates/mongodb-agent-common/src/constants.rs
@@ -0,0 +1,26 @@
+use mongodb::bson::{self, Bson};
+use serde::Deserialize;
+
+pub const RESULT_FIELD: &str = "result";
+
+/// Value must match the field name in [BsonRowSet]
+pub const ROW_SET_AGGREGATES_KEY: &str = "aggregates";
+
+/// Value must match the field name in [BsonRowSet]
+pub const ROW_SET_GROUPS_KEY: &str = "groups";
+
+/// Value must match the field name in [BsonRowSet]
+pub const ROW_SET_ROWS_KEY: &str = "rows";
+
+#[derive(Debug, Deserialize)]
+pub struct BsonRowSet {
+    #[serde(default)]
+    pub aggregates: Bson, // name matches ROW_SET_AGGREGATES_KEY
+    #[serde(default)]
+    pub groups: Vec<bson::Document>, // name matches ROW_SET_GROUPS_KEY
+    #[serde(default)]
+    pub rows: Vec<bson::Document>, // name matches ROW_SET_ROWS_KEY
+}
+
+/// Value must match the field name in [ndc_models::Group]
+pub const GROUP_DIMENSIONS_KEY: &str = "dimensions";
diff --git a/crates/mongodb-agent-common/src/interface_types/mongo_agent_error.rs b/crates/mongodb-agent-common/src/interface_types/mongo_agent_error.rs
index fe285960..ede7be2c 100644
--- a/crates/mongodb-agent-common/src/interface_types/mongo_agent_error.rs
+++ b/crates/mongodb-agent-common/src/interface_types/mongo_agent_error.rs
@@ -8,7 +8,7 @@ use mongodb::bson;
 use ndc_query_plan::QueryPlanError;
 use thiserror::Error;
 
-use crate::{procedure::ProcedureError, query::QueryResponseError};
+use crate::{mongo_query_plan::Dimension, procedure::ProcedureError, query::QueryResponseError};
 
 /// A superset of the DC-API `AgentError` type. This enum adds error cases specific to the MongoDB
 /// agent.
@@ -16,6 +16,7 @@ use crate::{procedure::ProcedureError, query::QueryResponseError};
 pub enum MongoAgentError {
     BadCollectionSchema(Box<(String, bson::Bson, bson::de::Error)>), // boxed to avoid an excessively-large stack value
     BadQuery(anyhow::Error),
+    InvalidGroupDimension(Dimension),
     InvalidVariableName(String),
     InvalidScalarTypeName(String),
     MongoDB(#[from] mongodb::error::Error),
@@ -66,6 +67,9 @@
             )
         },
         BadQuery(err) => (StatusCode::BAD_REQUEST, ErrorResponse::new(&err)),
+        InvalidGroupDimension(dimension) => (
+            StatusCode::BAD_REQUEST, ErrorResponse::new(&format!("Cannot express grouping dimension as a MongoDB query document expression: {dimension:?}"))
+        ),
         InvalidVariableName(name) => (
             StatusCode::BAD_REQUEST,
             ErrorResponse::new(&format!("Column identifier includes characters that are not permitted in a MongoDB variable name: {name}"))
diff --git a/crates/mongodb-agent-common/src/lib.rs b/crates/mongodb-agent-common/src/lib.rs
index ff8e8132..02819e93 100644
--- a/crates/mongodb-agent-common/src/lib.rs
+++ b/crates/mongodb-agent-common/src/lib.rs
@@ -1,5 +1,6 @@
 pub mod aggregation_function;
 pub mod comparison_function;
+mod constants;
 pub mod explain;
 pub mod interface_types;
 pub mod mongo_query_plan;
diff --git a/crates/mongodb-agent-common/src/mongo_query_plan/mod.rs b/crates/mongodb-agent-common/src/mongo_query_plan/mod.rs
index 8c6e128e..2ce94cf6 100644
--- a/crates/mongodb-agent-common/src/mongo_query_plan/mod.rs
+++ b/crates/mongodb-agent-common/src/mongo_query_plan/mod.rs
@@ -116,6 +116,10 @@ pub type ComparisonValue = ndc_query_plan::ComparisonValue<MongoConfiguration>;
 pub type ExistsInCollection = ndc_query_plan::ExistsInCollection<MongoConfiguration>;
 pub type Expression = ndc_query_plan::Expression<MongoConfiguration>;
 pub type Field = ndc_query_plan::Field<MongoConfiguration>;
+pub type Dimension = ndc_query_plan::Dimension<MongoConfiguration>;
+pub type Grouping = ndc_query_plan::Grouping<MongoConfiguration>;
+pub type GroupOrderBy = ndc_query_plan::GroupOrderBy<MongoConfiguration>;
+pub type GroupOrderByTarget = ndc_query_plan::GroupOrderByTarget<MongoConfiguration>;
 pub type MutationOperation = ndc_query_plan::MutationOperation<MongoConfiguration>;
 pub type MutationPlan = ndc_query_plan::MutationPlan<MongoConfiguration>;
 pub type MutationProcedureArgument = ndc_query_plan::MutationProcedureArgument<MongoConfiguration>;
diff --git a/crates/mongodb-agent-common/src/mongodb/mod.rs b/crates/mongodb-agent-common/src/mongodb/mod.rs
index 48f16304..2e489234 100644
--- a/crates/mongodb-agent-common/src/mongodb/mod.rs
+++ b/crates/mongodb-agent-common/src/mongodb/mod.rs
@@ -1,14 +1,11 @@
 mod collection;
 mod database;
 pub mod sanitize;
-mod selection;
 #[cfg(any(test, feature = "test-helpers"))]
 pub mod test_helpers;
 
-pub use self::{
-    collection::CollectionTrait, database::DatabaseTrait, selection::selection_from_query_request,
-};
+pub use self::{collection::CollectionTrait, database::DatabaseTrait};
 
 // MockCollectionTrait is generated by automock when the test flag is active.
 #[cfg(any(test, feature = "test-helpers"))]
diff --git a/crates/mongodb-agent-common/src/query/aggregates.rs b/crates/mongodb-agent-common/src/query/aggregates.rs
new file mode 100644
index 00000000..c34ba1e4
--- /dev/null
+++ b/crates/mongodb-agent-common/src/query/aggregates.rs
@@ -0,0 +1,529 @@
+use std::collections::BTreeMap;
+
+use configuration::MongoScalarType;
+use mongodb::bson::{self, doc, Bson};
+use mongodb_support::{
+    aggregate::{Accumulator, Pipeline, Selection, Stage},
+    BsonScalarType,
+};
+use ndc_models::FieldName;
+
+use crate::{
+    aggregation_function::AggregationFunction,
+    comparison_function::ComparisonFunction,
+    constants::RESULT_FIELD,
+    constants::{ROW_SET_AGGREGATES_KEY, ROW_SET_GROUPS_KEY, ROW_SET_ROWS_KEY},
+    interface_types::MongoAgentError,
+    mongo_query_plan::{
+        Aggregate, ComparisonTarget, ComparisonValue, Expression, Query, QueryPlan, Type,
+    },
+    mongodb::sanitize::get_field,
+};
+
+use super::{
+    column_ref::ColumnRef, groups::pipeline_for_groups, make_selector,
+    pipeline::pipeline_for_fields_facet, query_level::QueryLevel,
+};
+
+type Result<T> = std::result::Result<T, MongoAgentError>;
+
+/// Returns a map of pipelines for evaluating each aggregate independently, paired with
+/// a `Selection` that converts results of each pipeline to a format compatible with
+/// `QueryResponse`.
+pub fn facet_pipelines_for_query(
+    query_plan: &QueryPlan,
+    query_level: QueryLevel,
+) -> Result<(BTreeMap<String, Pipeline>, Selection)> {
+    let query = &query_plan.query;
+    let Query {
+        aggregates,
+        fields,
+        groups,
+        ..
+    } = query;
+    let mut facet_pipelines = aggregates
+        .iter()
+        .flatten()
+        .map(|(key, aggregate)| Ok((key.to_string(), pipeline_for_aggregate(aggregate.clone())?)))
+        .collect::<Result<BTreeMap<_, _>>>()?;
+
+    // This builds a map that feeds into a `$replaceWith` pipeline stage to build a map of
+    // aggregation results.
+    let aggregate_selections: bson::Document = aggregates
+        .iter()
+        .flatten()
+        .map(|(key, aggregate)| {
+            // The facet result for each aggregate is an array containing a single document which
+            // has a field called `result`. This code selects each facet result by name, and pulls
+            // out the `result` value.
+            let value_expr = doc! {
+                "$getField": {
+                    "field": RESULT_FIELD, // evaluates to the value of this field
+                    "input": { "$first": get_field(key.as_str()) }, // field is accessed from this document
+                },
+            };
+
+            // Matching SQL semantics, if a **count** aggregation does not match any rows we want
+            // to return zero. Other aggregations should return null.
+            let value_expr = if is_count(aggregate) {
+                doc! {
+                    "$ifNull": [value_expr, 0],
+                }
+            // Otherwise if the aggregate value is missing because the aggregation applied to an
+            // empty document set then provide an explicit `null` value.
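+            // For illustration (hypothetical values, not from a real response): an `avg`
+            // facet result of `{ "avg": [{ "result": 3.5 }] }` selects to `3.5` and is then
+            // coerced with `$convert` to `double`; an empty facet result `{ "avg": [] }`
+            // leaves the input missing, which `$convert` implicitly turns into `null`.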
+            } else {
+                convert_aggregate_result_type(value_expr, aggregate)
+            };
+
+            (key.to_string(), value_expr.into())
+        })
+        .collect();
+
+    let select_aggregates = if !aggregate_selections.is_empty() {
+        Some((
+            ROW_SET_AGGREGATES_KEY.to_string(),
+            aggregate_selections.into(),
+        ))
+    } else {
+        None
+    };
+
+    let (groups_pipeline_facet, select_groups) = match groups {
+        Some(grouping) => {
+            let internal_key = "__GROUPS__";
+            let groups_pipeline = pipeline_for_groups(grouping)?;
+            let facet = (internal_key.to_string(), groups_pipeline);
+            let selection = (
+                ROW_SET_GROUPS_KEY.to_string(),
+                Bson::String(format!("${internal_key}")),
+            );
+            (Some(facet), Some(selection))
+        }
+        None => (None, None),
+    };
+
+    let (rows_pipeline_facet, select_rows) = match fields {
+        Some(_) => {
+            let internal_key = "__ROWS__";
+            let rows_pipeline = pipeline_for_fields_facet(query_plan, query_level)?;
+            let facet = (internal_key.to_string(), rows_pipeline);
+            let selection = (
+                ROW_SET_ROWS_KEY.to_string(),
+                Bson::String(format!("${internal_key}")),
+            );
+            (Some(facet), Some(selection))
+        }
+        None => (None, None),
+    };
+
+    for (key, pipeline) in [groups_pipeline_facet, rows_pipeline_facet]
+        .into_iter()
+        .flatten()
+    {
+        facet_pipelines.insert(key, pipeline);
+    }
+
+    let selection = Selection::new(
+        [select_aggregates, select_groups, select_rows]
+            .into_iter()
+            .flatten()
+            .collect(),
+    );
+
+    Ok((facet_pipelines, selection))
+}
+
+fn is_count(aggregate: &Aggregate) -> bool {
+    match aggregate {
+        Aggregate::ColumnCount { .. } => true,
+        Aggregate::StarCount { .. } => true,
+        Aggregate::SingleColumn { .. } => false,
+    }
+}
+
+/// The system expects specific return types for specific aggregates. That means we may need
+/// to do a numeric type conversion here. The conversion applies to the aggregated result,
+/// not to input values.
+pub fn convert_aggregate_result_type(
+    column_ref: impl Into<Bson>,
+    aggregate: &Aggregate,
+) -> bson::Document {
+    let convert_to = match aggregate {
+        Aggregate::ColumnCount { .. } => None,
+        Aggregate::SingleColumn {
+            column_type,
+            function,
+            ..
+        } => function.expected_result_type(column_type),
+        Aggregate::StarCount => None,
+    };
+    match convert_to {
+        // $convert implicitly fills `null` if input value is missing
+        Some(scalar_type) => doc! {
+            "$convert": {
+                "input": column_ref,
+                "to": scalar_type.bson_name(),
+            }
+        },
+        None => doc! {
+            "$ifNull": [column_ref, null]
+        },
+    }
+}
+
+// TODO: We can probably combine some aggregates in the same group stage:
+// - single column
+// - star count
+// - column count, non-distinct
+//
+// We might still need separate facets for
+// - column count, distinct
+//
+// The issue with non-distinct column count is we want to exclude null and non-existent values.
+// That could probably be done with an accumulator like,
+//
+//     count: if $exists: ["$column", true] then 1 else 0
+//
+// Distinct counts need a group by the target column AFAIK so they need a facet.
+fn pipeline_for_aggregate(aggregate: Aggregate) -> Result<Pipeline> {
+    let pipeline = match aggregate {
+        Aggregate::ColumnCount {
+            column,
+            field_path,
+            distinct,
+            ..
+        } if distinct => {
+            let target_field = mk_target_field(column, field_path);
+            Pipeline::new(vec![
+                filter_to_documents_with_value(target_field.clone())?,
+                Stage::Group {
+                    key_expression: ColumnRef::from_comparison_target(&target_field)
+                        .into_aggregate_expression()
+                        .into_bson(),
+                    accumulators: [].into(),
+                },
+                Stage::Count(RESULT_FIELD.to_string()),
+            ])
+        }
+
+        // TODO: ENG-1465 count by distinct
+        Aggregate::ColumnCount {
+            column,
+            field_path,
+            distinct: _,
+            ..
+        } => Pipeline::new(vec![
+            filter_to_documents_with_value(mk_target_field(column, field_path))?,
+            Stage::Count(RESULT_FIELD.to_string()),
+        ]),
+
+        Aggregate::SingleColumn {
+            column,
+            field_path,
+            function,
+            ..
+        } => {
+            use AggregationFunction as A;
+
+            let field_ref = ColumnRef::from_column_and_field_path(&column, field_path.as_ref())
+                .into_aggregate_expression()
+                .into_bson();
+
+            let accumulator = match function {
+                A::Avg => Accumulator::Avg(field_ref),
+                A::Min => Accumulator::Min(field_ref),
+                A::Max => Accumulator::Max(field_ref),
+                A::Sum => Accumulator::Sum(field_ref),
+            };
+            Pipeline::new(vec![Stage::Group {
+                key_expression: Bson::Null,
+                accumulators: [(RESULT_FIELD.to_string(), accumulator)].into(),
+            }])
+        }
+
+        Aggregate::StarCount {} => Pipeline::new(vec![Stage::Count(RESULT_FIELD.to_string())]),
+    };
+    Ok(pipeline)
+}
+
+fn mk_target_field(name: FieldName, field_path: Option<Vec<FieldName>>) -> ComparisonTarget {
+    ComparisonTarget::Column {
+        name,
+        arguments: Default::default(),
+        field_path,
+        field_type: Type::Scalar(MongoScalarType::ExtendedJSON), // type does not matter here
+    }
+}
+
+fn filter_to_documents_with_value(target_field: ComparisonTarget) -> Result<Stage> {
+    Ok(Stage::Match(make_selector(
+        &Expression::BinaryComparisonOperator {
+            column: target_field,
+            operator: ComparisonFunction::NotEqual,
+            value: ComparisonValue::Scalar {
+                value: serde_json::Value::Null,
+                value_type: Type::Scalar(MongoScalarType::Bson(BsonScalarType::Null)),
+            },
+        },
+    )?))
+}
+
+#[cfg(test)]
+mod tests {
+    use configuration::Configuration;
+    use mongodb::bson::bson;
+    use ndc_test_helpers::{
+        binop, collection, column_aggregate, column_count_aggregate, dimension_column, field,
+        group, grouping, named_type, object_type, query, query_request, row_set, target, value,
+    };
+    use serde_json::json;
+
+    use crate::{
+        mongo_query_plan::MongoConfiguration,
+        mongodb::test_helpers::mock_collection_aggregate_response_for_pipeline,
+        query::execute_query_request::execute_query_request, test_helpers::mflix_config,
+    };
+
+    #[tokio::test]
+    async fn executes_aggregation() -> Result<(), anyhow::Error> {
+        let query_request = query_request()
+            .collection("students")
+            .query(query().aggregates([
+                column_count_aggregate!("count" => "gpa", distinct: true),
+                ("avg", column_aggregate("gpa", "avg").into()),
+            ]))
+            .into();
+
+        let expected_response = row_set()
+            .aggregates([("count", json!(11)), ("avg", json!(3))])
+            .into_response();
+
+        let expected_pipeline = bson!([
+            {
+                "$facet": {
+                    "avg": [
+                        { "$group": { "_id": null, "result": { "$avg": "$gpa" } } },
+                    ],
+                    "count": [
+                        { "$match": { "gpa": { "$ne": null } } },
+                        { "$group": { "_id": "$gpa" } },
+                        { "$count": "result" },
+                    ],
+                },
+            },
+            {
+                "$replaceWith": {
+                    "aggregates": {
+                        "avg": {
+                            "$convert": {
+                                "input": {
+                                    "$getField": {
+                                        "field": "result",
+                                        "input": { "$first": { "$getField": { "$literal": "avg" } } },
+                                    }
+                                },
+                                "to": "double",
+                            }
+                        },
+                        "count": {
+                            "$ifNull": [
+                                {
+                                    "$getField": {
+                                        "field": "result",
+                                        "input": { "$first": { "$getField": {
"$literal": "count" } } }, + } + }, + 0, + ] + }, + }, + }, + }, + ]); + + let db = mock_collection_aggregate_response_for_pipeline( + "students", + expected_pipeline, + bson!([{ + "aggregates": { + "count": 11, + "avg": 3, + }, + }]), + ); + + let result = execute_query_request(db, &students_config(), query_request).await?; + assert_eq!(result, expected_response); + Ok(()) + } + + #[tokio::test] + async fn executes_aggregation_with_fields() -> Result<(), anyhow::Error> { + let query_request = query_request() + .collection("students") + .query( + query() + .aggregates([("avg", column_aggregate("gpa", "avg"))]) + .fields([field!("student_gpa" => "gpa")]) + .predicate(binop("_lt", target!("gpa"), value!(4.0))), + ) + .into(); + + let expected_response = row_set() + .aggregates([("avg", json!(3.1))]) + .row([("student_gpa", 3.1)]) + .into_response(); + + let expected_pipeline = bson!([ + { "$match": { "gpa": { "$lt": 4.0 } } }, + { + "$facet": { + "__ROWS__": [{ + "$replaceWith": { + "student_gpa": { "$ifNull": ["$gpa", null] }, + }, + }], + "avg": [ + { "$group": { "_id": null, "result": { "$avg": "$gpa" } } }, + ], + }, + }, + { + "$replaceWith": { + "aggregates": { + "avg": { + "$convert": { + "input": { + "$getField": { + "field": "result", + "input": { "$first": { "$getField": { "$literal": "avg" } } }, + } + }, + "to": "double", + } + }, + }, + "rows": "$__ROWS__", + }, + }, + ]); + + let db = mock_collection_aggregate_response_for_pipeline( + "students", + expected_pipeline, + bson!([{ + "aggregates": { + "avg": 3.1, + }, + "rows": [{ + "student_gpa": 3.1, + }], + }]), + ); + + let result = execute_query_request(db, &students_config(), query_request).await?; + assert_eq!(result, expected_response); + Ok(()) + } + + #[tokio::test] + async fn executes_query_with_groups_with_single_column_aggregates() -> Result<(), anyhow::Error> + { + let query_request = query_request() + .collection("movies") + .query( + query().groups( + grouping() + .dimensions([dimension_column("year")]) + .aggregates([ + ( + "average_viewer_rating", + column_aggregate("tomatoes.viewer.rating", "avg"), + ), + ("max.runtime", column_aggregate("runtime", "max")), + ]), + ), + ) + .into(); + + let expected_response = row_set() + .groups([ + group( + [2007], + [ + ("average_viewer_rating", json!(7.5)), + ("max.runtime", json!(207)), + ], + ), + group( + [2015], + [ + ("average_viewer_rating", json!(6.9)), + ("max.runtime", json!(412)), + ], + ), + ]) + .into_response(); + + let expected_pipeline = bson!([ + { + "$group": { + "_id": ["$year"], + "average_viewer_rating": { "$avg": "$tomatoes.viewer.rating" }, + "max.runtime": { "$max": "$runtime" }, + } + }, + { + "$replaceWith": { + "dimensions": "$_id", + "average_viewer_rating": { "$convert": { "input": "$average_viewer_rating", "to": "double" } }, + "max.runtime": { "$ifNull": [{ "$getField": { "$literal": "max.runtime" } }, null] }, + } + }, + ]); + + let db = mock_collection_aggregate_response_for_pipeline( + "movies", + expected_pipeline, + bson!([ + { + "dimensions": [2007], + "average_viewer_rating": 7.5, + "max.runtime": 207, + }, + { + "dimensions": [2015], + "average_viewer_rating": 6.9, + "max.runtime": 412, + }, + ]), + ); + + let result = execute_query_request(db, &mflix_config(), query_request).await?; + assert_eq!(result, expected_response); + Ok(()) + } + + // TODO: Test: + // - fields & group by + // - group by & aggregates + // - various counts on groups + // - groups and variables + // - groups and relationships + + fn students_config() -> 
MongoConfiguration {
+        MongoConfiguration(Configuration {
+            collections: [collection("students")].into(),
+            object_types: [(
+                "students".into(),
+                object_type([("gpa", named_type("Double"))]),
+            )]
+            .into(),
+            functions: Default::default(),
+            procedures: Default::default(),
+            native_mutations: Default::default(),
+            native_queries: Default::default(),
+            options: Default::default(),
+        })
+    }
+}
diff --git a/crates/mongodb-agent-common/src/query/column_ref.rs b/crates/mongodb-agent-common/src/query/column_ref.rs
index 43f26ca4..5ca17693 100644
--- a/crates/mongodb-agent-common/src/query/column_ref.rs
+++ b/crates/mongodb-agent-common/src/query/column_ref.rs
@@ -60,7 +60,15 @@
         name: &'b FieldName,
         field_path: Option<&'b Vec<FieldName>>,
     ) -> ColumnRef<'b> {
-        from_column_and_field_path(name, field_path)
+        from_column_and_field_path(&[], name, field_path)
+    }
+
+    pub fn from_relationship_path_column_and_field_path<'b>(
+        relationship_path: &'b [ndc_models::RelationshipName],
+        name: &'b FieldName,
+        field_path: Option<&'b Vec<FieldName>>,
+    ) -> ColumnRef<'b> {
+        from_column_and_field_path(relationship_path, name, field_path)
     }
 
     /// TODO: This will hopefully become infallible once ENG-1011 & ENG-1010 are implemented.
@@ -120,21 +128,26 @@
 fn from_comparison_target(column: &ComparisonTarget) -> ColumnRef<'_> {
     match column {
         ComparisonTarget::Column {
             name, field_path, ..
-        } => from_column_and_field_path(name, field_path.as_ref()),
+        } => from_column_and_field_path(&[], name, field_path.as_ref()),
     }
 }
 
 fn from_column_and_field_path<'a>(
+    relationship_path: &'a [ndc_models::RelationshipName],
     name: &'a FieldName,
     field_path: Option<&'a Vec<FieldName>>,
 ) -> ColumnRef<'a> {
-    let name_and_path = once(name.as_ref() as &str).chain(
-        field_path
-            .iter()
-            .copied()
-            .flatten()
-            .map(|field_name| field_name.as_ref() as &str),
-    );
+    let name_and_path = relationship_path
+        .iter()
+        .map(|r| r.as_ref() as &str)
+        .chain(once(name.as_ref() as &str))
+        .chain(
+            field_path
+                .iter()
+                .copied()
+                .flatten()
+                .map(|field_name| field_name.as_ref() as &str),
+        );
     // The None case won't come up if the input to [from_target_helper] has at least
     // one element, and we know it does because we start the iterable with `name`
     from_path(None, name_and_path).unwrap()
diff --git a/crates/mongodb-agent-common/src/query/constants.rs b/crates/mongodb-agent-common/src/query/constants.rs
deleted file mode 100644
index a8569fc0..00000000
--- a/crates/mongodb-agent-common/src/query/constants.rs
+++ /dev/null
@@ -1,3 +0,0 @@
-// TODO: check for collision with aggregation field names
-pub const ROWS_FIELD: &str = "__ROWS__";
-pub const RESULT_FIELD: &str = "result";
diff --git a/crates/mongodb-agent-common/src/query/foreach.rs b/crates/mongodb-agent-common/src/query/foreach.rs
index 4995eb40..75fd3c26 100644
--- a/crates/mongodb-agent-common/src/query/foreach.rs
+++ b/crates/mongodb-agent-common/src/query/foreach.rs
@@ -1,14 +1,16 @@
 use anyhow::anyhow;
 use itertools::Itertools as _;
-use mongodb::bson::{self, doc, Bson};
+use mongodb::bson::{self, bson, doc, Bson};
 use mongodb_support::aggregate::{Pipeline, Selection, Stage};
 use ndc_query_plan::VariableSet;
 
+use super::is_response_faceted::ResponseFacets;
 use super::pipeline::pipeline_for_non_foreach;
 use super::query_level::QueryLevel;
 use super::query_variable_name::query_variable_name;
 use super::serialization::json_to_bson;
 use super::QueryTarget;
+use crate::constants::{ROW_SET_AGGREGATES_KEY, ROW_SET_GROUPS_KEY, ROW_SET_ROWS_KEY};
 use
crate::interface_types::MongoAgentError;
 use crate::mongo_query_plan::{MongoConfiguration, QueryPlan, Type, VariableTypes};
 
@@ -45,18 +47,36 @@ pub fn pipeline_for_foreach(
         r#as: "query".to_string(),
     };
 
-    let selection = if query_request.query.has_aggregates() && query_request.query.has_fields() {
-        doc! {
-            "aggregates": { "$getField": { "input": { "$first": "$query" }, "field": "aggregates" } },
-            "rows": { "$getField": { "input": { "$first": "$query" }, "field": "rows" } },
+    let selection = match ResponseFacets::from_query(&query_request.query) {
+        ResponseFacets::Combination {
+            aggregates,
+            fields,
+            groups,
+        } => {
+            let mut keys = vec![];
+            if aggregates.is_some() {
+                keys.push(ROW_SET_AGGREGATES_KEY);
+            }
+            if fields.is_some() {
+                keys.push(ROW_SET_ROWS_KEY);
+            }
+            if groups.is_some() {
+                keys.push(ROW_SET_GROUPS_KEY)
+            }
+            keys.into_iter()
+                .map(|key| {
+                    (
+                        key.to_string(),
+                        bson!({ "$getField": { "input": { "$first": "$query" }, "field": key } }),
+                    )
+                })
+                .collect()
         }
-    } else if query_request.query.has_aggregates() {
-        doc! {
-            "aggregates": { "$getField": { "input": { "$first": "$query" }, "field": "aggregates" } },
+        ResponseFacets::FieldsOnly(_) => {
+            doc! { ROW_SET_ROWS_KEY: "$query" }
         }
-    } else {
-        doc! {
-            "rows": "$query"
+        ResponseFacets::GroupsOnly(_) => {
+            doc! { ROW_SET_GROUPS_KEY: "$query" }
         }
     };
     let selection_stage = Stage::ReplaceWith(Selection::new(selection));
diff --git a/crates/mongodb-agent-common/src/query/groups.rs b/crates/mongodb-agent-common/src/query/groups.rs
new file mode 100644
index 00000000..8e370bb8
--- /dev/null
+++ b/crates/mongodb-agent-common/src/query/groups.rs
@@ -0,0 +1,162 @@
+use std::{borrow::Cow, collections::BTreeMap};
+
+use indexmap::IndexMap;
+use mongodb::bson::{self, bson};
+use mongodb_support::aggregate::{Accumulator, Pipeline, Selection, SortDocument, Stage};
+use ndc_models::{FieldName, OrderDirection};
+
+use crate::{
+    aggregation_function::AggregationFunction,
+    constants::GROUP_DIMENSIONS_KEY,
+    interface_types::MongoAgentError,
+    mongo_query_plan::{Aggregate, Dimension, GroupOrderBy, GroupOrderByTarget, Grouping},
+};
+
+use super::{aggregates::convert_aggregate_result_type, column_ref::ColumnRef};
+
+type Result<T> = std::result::Result<T, MongoAgentError>;
+
+// TODO: This function can be infallible once ENG-1562 is implemented.
+pub fn pipeline_for_groups(grouping: &Grouping) -> Result<Pipeline> {
+    let group_stage = Stage::Group {
+        key_expression: dimensions_to_expression(&grouping.dimensions).into(),
+        accumulators: accumulators_for_aggregates(&grouping.aggregates)?,
+    };
+
+    // TODO: ENG-1562 This implementation does not fully implement the
+    // 'query.aggregates.group_by.order' capability! This only orders by dimensions. Before
+    // enabling the capability we also need to be able to order by aggregates. We need partial
+    // support for order by to get consistent integration test snapshots.
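+    // As a sketch (hypothetical dimension/aggregate names), a grouping on "year" with an
+    // "avg" aggregate named "avg_rating" assembles stages along these lines:
+    //
+    //     { "$group": { "_id": ["$year"], "avg_rating": { "$avg": "$rating" } } }
+    //     { "$sort": { "_id.0": 1 } }
+    //     { "$replaceWith": {
+    //         "dimensions": "$_id",
+    //         "avg_rating": { "$convert": { "input": "$avg_rating", "to": "double" } },
+    //     } }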
+ let sort_groups_stage = grouping + .order_by + .as_ref() + .map(sort_stage_for_grouping) + .transpose()?; + + // TODO: ENG-1563 to implement 'query.aggregates.group_by.paginate' apply grouping.limit and + // grouping.offset **after** group stage because those options count groups, not documents + + let replace_with_stage = Stage::ReplaceWith(selection_for_grouping_internal(grouping, "_id")); + + Ok(Pipeline::new( + [ + Some(group_stage), + sort_groups_stage, + Some(replace_with_stage), + ] + .into_iter() + .flatten() + .collect(), + )) +} + +/// Converts each dimension to a MongoDB aggregate expression that evaluates to the appropriate +/// value when applied to each input document. The array of expressions can be used directly as the +/// group stage key expression. +fn dimensions_to_expression(dimensions: &[Dimension]) -> bson::Array { + dimensions + .iter() + .map(|dimension| { + let column_ref = match dimension { + Dimension::Column { + path, + column_name, + field_path, + .. + } => ColumnRef::from_relationship_path_column_and_field_path( + path, + column_name, + field_path.as_ref(), + ), + }; + column_ref.into_aggregate_expression().into_bson() + }) + .collect() +} + +// TODO: This function can be infallible once counts are implemented +fn accumulators_for_aggregates( + aggregates: &IndexMap<FieldName, Aggregate>, +) -> Result<BTreeMap<String, Accumulator>> { + aggregates + .into_iter() + .map(|(name, aggregate)| Ok((name.to_string(), aggregate_to_accumulator(aggregate)?))) + .collect() +} + +// TODO: This function can be infallible once counts are implemented +fn aggregate_to_accumulator(aggregate: &Aggregate) -> Result<Accumulator> { + use Aggregate as A; + match aggregate { + A::ColumnCount { .. } => Err(MongoAgentError::NotImplemented(Cow::Borrowed( + "count aggregates in groups", + ))), + A::SingleColumn { + column, + field_path, + function, + .. + } => { + use AggregationFunction as A; + + let field_ref = ColumnRef::from_column_and_field_path(column, field_path.as_ref()) + .into_aggregate_expression() + .into_bson(); + + Ok(match function { + A::Avg => Accumulator::Avg(field_ref), + A::Min => Accumulator::Min(field_ref), + A::Max => Accumulator::Max(field_ref), + A::Sum => Accumulator::Sum(field_ref), + }) + } + A::StarCount => Err(MongoAgentError::NotImplemented(Cow::Borrowed( + "count aggregates in groups", + ))), + } +} + +pub fn selection_for_grouping(grouping: &Grouping) -> Selection { + // This function is called externally to propagate groups from relationship lookups. In that + // case the group has already gone through [selection_for_grouping_internal] once so we want to + // reference the dimensions key as "dimensions".
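+ //
+ // Illustration (hypothetical shapes): straight out of the $group stage a group looks
+ // like { "_id": [2001], "avg_rating": 3.2 }, so the first selection pass reads "$_id";
+ // after that pass the group looks like { "dimensions": [2001], "avg_rating": 3.2 }, so
+ // any later pass must read "$dimensions" instead.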
+ selection_for_grouping_internal(grouping, GROUP_DIMENSIONS_KEY) +} + +fn selection_for_grouping_internal(grouping: &Grouping, dimensions_field_name: &str) -> Selection { + let dimensions = ( + GROUP_DIMENSIONS_KEY.to_string(), + bson!(format!("${dimensions_field_name}")), + ); + let selected_aggregates = grouping.aggregates.iter().map(|(key, aggregate)| { + let column_ref = ColumnRef::from_field(key).into_aggregate_expression(); + let selection = convert_aggregate_result_type(column_ref, aggregate); + (key.to_string(), selection.into()) + }); + let selection_doc = std::iter::once(dimensions) + .chain(selected_aggregates) + .collect(); + Selection::new(selection_doc) +} + +// TODO: ENG-1562 This is where we need to implement sorting by aggregates +fn sort_stage_for_grouping(order_by: &GroupOrderBy) -> Result<Stage> { + let sort_doc = order_by + .elements + .iter() + .map(|element| match element.target { + GroupOrderByTarget::Dimension { index } => { + let key = format!("_id.{index}"); + let direction = match element.order_direction { + OrderDirection::Asc => bson!(1), + OrderDirection::Desc => bson!(-1), + }; + Ok((key, direction)) + } + GroupOrderByTarget::Aggregate { .. } => Err(MongoAgentError::NotImplemented( + Cow::Borrowed("sorting groups by aggregate"), + )), + }) + .collect::<Result<bson::Document>>()?; + Ok(Stage::Sort(SortDocument::from_doc(sort_doc))) +} diff --git a/crates/mongodb-agent-common/src/query/is_response_faceted.rs b/crates/mongodb-agent-common/src/query/is_response_faceted.rs new file mode 100644 index 00000000..92050097 --- /dev/null +++ b/crates/mongodb-agent-common/src/query/is_response_faceted.rs @@ -0,0 +1,103 @@ +//! Centralized logic for query response packing. + +use indexmap::IndexMap; +use lazy_static::lazy_static; +use ndc_models::FieldName; + +use crate::mongo_query_plan::{Aggregate, Field, Grouping, Query}; + +lazy_static! { + static ref DEFAULT_FIELDS: IndexMap<FieldName, Field> = IndexMap::new(); +} + +/// In some queries we may need to "fork" the query to provide data that requires incompatible +/// pipelines. For example, queries that combine two or more of rows, groups, and aggregates, or +/// queries that use multiple aggregates that use different buckets. In these cases we use the +/// `$facet` aggregation stage, which runs multiple sub-pipelines, and stores the results of +/// each in fields of the output pipeline document with array values. +/// +/// In other queries we don't need to fork - instead of providing data in a nested array, the stream +/// of pipeline output documents is itself the requested data. +/// +/// Depending on whether or not a pipeline needs to use `$facet` to fork, response processing needs +/// to be done differently. +pub enum ResponseFacets<'a> { + /// When matching on the Combination variant, assume that requested data has already been checked to make sure that maps are not empty.
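+ ///
+ /// Illustrative mapping for non-empty requests: aggregates alone, or any two of
+ /// aggregates, fields, and groups together, map to Combination; fields alone map to
+ /// FieldsOnly; groups alone map to GroupsOnly.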
+ Combination { + aggregates: Option<&'a IndexMap<FieldName, Aggregate>>, + fields: Option<&'a IndexMap<FieldName, Field>>, + groups: Option<&'a Grouping>, + }, + FieldsOnly(&'a IndexMap<FieldName, Field>), + GroupsOnly(&'a Grouping), +} + +impl ResponseFacets<'_> { + pub fn from_parameters<'a>( + aggregates: Option<&'a IndexMap<FieldName, Aggregate>>, + fields: Option<&'a IndexMap<FieldName, Field>>, + groups: Option<&'a Grouping>, + ) -> ResponseFacets<'a> { + let aggregates_score = if has_aggregates(aggregates) { 2 } else { 0 }; + let fields_score = if has_fields(fields) { 1 } else { 0 }; + let groups_score = if has_groups(groups) { 1 } else { 0 }; + + if aggregates_score + fields_score + groups_score > 1 { + ResponseFacets::Combination { + aggregates: if has_aggregates(aggregates) { + aggregates + } else { + None + }, + fields: if has_fields(fields) { fields } else { None }, + groups: if has_groups(groups) { groups } else { None }, + } + } else if let Some(grouping) = groups { + ResponseFacets::GroupsOnly(grouping) + } else { + ResponseFacets::FieldsOnly(fields.unwrap_or(&DEFAULT_FIELDS)) + } + } + + pub fn from_query(query: &Query) -> ResponseFacets<'_> { + Self::from_parameters( + query.aggregates.as_ref(), + query.fields.as_ref(), + query.groups.as_ref(), + ) + } +} + +/// A query that includes aggregates will be run using a $facet pipeline stage. A query that +/// combines two or more of rows, groups, and aggregates will also use facets. The choice affects +/// how result rows are mapped to a QueryResponse. +/// +/// If we have aggregate pipelines, they should be combined with the fields pipeline (if there is +/// one) in a single facet stage. If we have fields and no aggregates, then the fields pipeline +/// can instead be appended to `pipeline`. +pub fn is_response_faceted(query: &Query) -> bool { + matches!( ResponseFacets::from_query(query), ResponseFacets::Combination { ..
} + ) +} + +fn has_aggregates(aggregates: Option<&IndexMap>) -> bool { + if let Some(aggregates) = aggregates { + !aggregates.is_empty() + } else { + false + } +} + +fn has_fields(fields: Option<&IndexMap>) -> bool { + if let Some(fields) = fields { + !fields.is_empty() + } else { + false + } +} + +fn has_groups(groups: Option<&Grouping>) -> bool { + groups.is_some() +} diff --git a/crates/mongodb-agent-common/src/query/mod.rs b/crates/mongodb-agent-common/src/query/mod.rs index 8d5b5372..6bc505af 100644 --- a/crates/mongodb-agent-common/src/query/mod.rs +++ b/crates/mongodb-agent-common/src/query/mod.rs @@ -1,7 +1,9 @@ +mod aggregates; pub mod column_ref; -mod constants; mod execute_query_request; mod foreach; +mod groups; +mod is_response_faceted; mod make_selector; mod make_sort; mod native_query; @@ -11,6 +13,7 @@ mod query_target; mod query_variable_name; mod relations; pub mod response; +mod selection; pub mod serialization; use ndc_models::{QueryRequest, QueryResponse}; @@ -19,7 +22,7 @@ use self::execute_query_request::execute_query_request; pub use self::{ make_selector::make_selector, make_sort::make_sort_stages, - pipeline::{is_response_faceted, pipeline_for_non_foreach, pipeline_for_query_request}, + pipeline::{pipeline_for_non_foreach, pipeline_for_query_request}, query_target::QueryTarget, response::QueryResponseError, }; @@ -44,11 +47,10 @@ mod tests { use mongodb::bson::{self, bson}; use ndc_models::{QueryResponse, RowSet}; use ndc_test_helpers::{ - binop, collection, column_aggregate, column_count_aggregate, field, named_type, - object_type, query, query_request, row_set, target, value, + binop, collection, field, named_type, object_type, query, query_request, row_set, target, + value, }; use pretty_assertions::assert_eq; - use serde_json::json; use super::execute_query_request; use crate::{ @@ -92,150 +94,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn executes_aggregation() -> Result<(), anyhow::Error> { - let query_request = query_request() - .collection("students") - .query(query().aggregates([ - column_count_aggregate!("count" => "gpa", distinct: true), - column_aggregate!("avg" => "gpa", "avg"), - ])) - .into(); - - let expected_response = row_set() - .aggregates([("count", json!(11)), ("avg", json!(3))]) - .into_response(); - - let expected_pipeline = bson!([ - { - "$facet": { - "avg": [ - { "$match": { "gpa": { "$ne": null } } }, - { "$group": { "_id": null, "result": { "$avg": "$gpa" } } }, - ], - "count": [ - { "$match": { "gpa": { "$ne": null } } }, - { "$group": { "_id": "$gpa" } }, - { "$count": "result" }, - ], - }, - }, - { - "$replaceWith": { - "aggregates": { - "avg": { - "$ifNull": [ - { - "$getField": { - "field": "result", - "input": { "$first": { "$getField": { "$literal": "avg" } } }, - } - }, - null - ] - }, - "count": { - "$ifNull": [ - { - "$getField": { - "field": "result", - "input": { "$first": { "$getField": { "$literal": "count" } } }, - } - }, - 0, - ] - }, - }, - }, - }, - ]); - - let db = mock_collection_aggregate_response_for_pipeline( - "students", - expected_pipeline, - bson!([{ - "aggregates": { - "count": 11, - "avg": 3, - }, - }]), - ); - - let result = execute_query_request(db, &students_config(), query_request).await?; - assert_eq!(result, expected_response); - Ok(()) - } - - #[tokio::test] - async fn executes_aggregation_with_fields() -> Result<(), anyhow::Error> { - let query_request = query_request() - .collection("students") - .query( - query() - .aggregates([column_aggregate!("avg" => "gpa", "avg")]) - 
.fields([field!("student_gpa" => "gpa")]) - .predicate(binop("_lt", target!("gpa"), value!(4.0))), - ) - .into(); - - let expected_response = row_set() - .aggregates([("avg", json!(3.1))]) - .row([("student_gpa", 3.1)]) - .into_response(); - - let expected_pipeline = bson!([ - { "$match": { "gpa": { "$lt": 4.0 } } }, - { - "$facet": { - "__ROWS__": [{ - "$replaceWith": { - "student_gpa": { "$ifNull": ["$gpa", null] }, - }, - }], - "avg": [ - { "$match": { "gpa": { "$ne": null } } }, - { "$group": { "_id": null, "result": { "$avg": "$gpa" } } }, - ], - }, - }, - { - "$replaceWith": { - "aggregates": { - "avg": { - "$ifNull": [ - { - "$getField": { - "field": "result", - "input": { "$first": { "$getField": { "$literal": "avg" } } }, - } - }, - null - ] - }, - }, - "rows": "$__ROWS__", - }, - }, - ]); - - let db = mock_collection_aggregate_response_for_pipeline( - "students", - expected_pipeline, - bson!([{ - "aggregates": { - "avg": 3.1, - }, - "rows": [{ - "student_gpa": 3.1, - }], - }]), - ); - - let result = execute_query_request(db, &students_config(), query_request).await?; - assert_eq!(result, expected_response); - Ok(()) - } - #[tokio::test] async fn converts_date_inputs_to_bson() -> Result<(), anyhow::Error> { let query_request = query_request() diff --git a/crates/mongodb-agent-common/src/query/pipeline.rs b/crates/mongodb-agent-common/src/query/pipeline.rs index 6174de15..c532610f 100644 --- a/crates/mongodb-agent-common/src/query/pipeline.rs +++ b/crates/mongodb-agent-common/src/query/pipeline.rs @@ -1,47 +1,20 @@ -use std::collections::BTreeMap; - -use configuration::MongoScalarType; use itertools::Itertools; -use mongodb::bson::{self, doc, Bson}; -use mongodb_support::{ - aggregate::{Accumulator, Pipeline, Selection, Stage}, - BsonScalarType, -}; -use ndc_models::FieldName; +use mongodb_support::aggregate::{Pipeline, Stage}; use tracing::instrument; use crate::{ - aggregation_function::AggregationFunction, - comparison_function::ComparisonFunction, interface_types::MongoAgentError, - mongo_query_plan::{ - Aggregate, ComparisonTarget, ComparisonValue, Expression, MongoConfiguration, Query, - QueryPlan, Type, - }, - mongodb::{sanitize::get_field, selection_from_query_request}, + mongo_query_plan::{MongoConfiguration, Query, QueryPlan}, + mongodb::sanitize::get_field, }; use super::{ - column_ref::ColumnRef, - constants::{RESULT_FIELD, ROWS_FIELD}, - foreach::pipeline_for_foreach, - make_selector, - make_sort::make_sort_stages, - native_query::pipeline_for_native_query, - query_level::QueryLevel, - relations::pipeline_for_relations, + aggregates::facet_pipelines_for_query, foreach::pipeline_for_foreach, + groups::pipeline_for_groups, is_response_faceted::is_response_faceted, make_selector, + make_sort::make_sort_stages, native_query::pipeline_for_native_query, query_level::QueryLevel, + relations::pipeline_for_relations, selection::selection_for_fields, }; -/// A query that includes aggregates will be run using a $facet pipeline stage, while a query -/// without aggregates will not. The choice affects how result rows are mapped to a QueryResponse. -/// -/// If we have aggregate pipelines they should be combined with the fields pipeline (if there is -/// one) in a single facet stage. If we have fields, and no aggregates then the fields pipeline -/// can instead be appended to `pipeline`. -pub fn is_response_faceted(query: &Query) -> bool { - query.has_aggregates() -} - /// Shared logic to produce a MongoDB aggregation pipeline for a query request. 
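///
/// As an orientation sketch (stage shapes only, details elided), a non-foreach query that
/// requests both fields and aggregates is expected to produce a pipeline roughly like:
///
///   [ { "$match": ... }, { "$sort": ... }, { "$skip": ... }, { "$limit": ... },
///     { "$facet": { <one sub-pipeline per facet> } },
///     { "$replaceWith": { "aggregates": ..., "rows": ..., "groups": ... } } ]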
#[instrument(name = "Build Query Pipeline" skip_all, fields(internal.visibility = "user"))] pub fn pipeline_for_query_request( @@ -65,6 +38,7 @@ pub fn pipeline_for_non_foreach( ) -> Result { let query = &query_plan.query; let Query { + limit, offset, order_by, predicate, @@ -88,23 +62,24 @@ pub fn pipeline_for_non_foreach( .map(make_sort_stages) .flatten_ok() .collect::, _>>()?; + let limit_stage = limit.map(Into::into).map(Stage::Limit); let skip_stage = offset.map(Into::into).map(Stage::Skip); match_stage .into_iter() .chain(sort_stages) .chain(skip_stage) + .chain(limit_stage) .for_each(|stage| pipeline.push(stage)); - // `diverging_stages` includes either a $facet stage if the query includes aggregates, or the - // sort and limit stages if we are requesting rows only. In both cases the last stage is - // a $replaceWith. let diverging_stages = if is_response_faceted(query) { let (facet_pipelines, select_facet_results) = facet_pipelines_for_query(query_plan, query_level)?; let aggregation_stages = Stage::Facet(facet_pipelines); let replace_with_stage = Stage::ReplaceWith(select_facet_results); Pipeline::from_iter([aggregation_stages, replace_with_stage]) + } else if let Some(grouping) = &query.groups { + pipeline_for_groups(grouping)? } else { pipeline_for_fields_facet(query_plan, query_level)? }; @@ -114,20 +89,16 @@ pub fn pipeline_for_non_foreach( } /// Generate a pipeline to select fields requested by the given query. This is intended to be used -/// within a $facet stage. We assume that the query's `where`, `order_by`, `offset` criteria (which -/// are shared with aggregates) have already been applied, and that we have already joined -/// relations. +/// within a $facet stage. We assume that the query's `where`, `order_by`, `offset`, `limit` +/// criteria (which are shared with aggregates) have already been applied, and that we have already +/// joined relations. pub fn pipeline_for_fields_facet( query_plan: &QueryPlan, query_level: QueryLevel, ) -> Result { - let Query { - limit, - relationships, - .. - } = &query_plan.query; + let Query { relationships, .. } = &query_plan.query; - let mut selection = selection_from_query_request(query_plan)?; + let mut selection = selection_for_fields(query_plan.query.fields.as_ref())?; if query_level != QueryLevel::Top { // Queries higher up the chain might need to reference relationships from this query. So we // forward relationship arrays if this is not the top-level query. @@ -142,227 +113,6 @@ pub fn pipeline_for_fields_facet( } } - let limit_stage = limit.map(Into::into).map(Stage::Limit); let replace_with_stage: Stage = Stage::ReplaceWith(selection); - - Ok(Pipeline::from_iter( - [limit_stage, replace_with_stage.into()] - .into_iter() - .flatten(), - )) -} - -/// Returns a map of pipelines for evaluating each aggregate independently, paired with -/// a `Selection` that converts results of each pipeline to a format compatible with -/// `QueryResponse`. -fn facet_pipelines_for_query( - query_plan: &QueryPlan, - query_level: QueryLevel, -) -> Result<(BTreeMap, Selection), MongoAgentError> { - let query = &query_plan.query; - let Query { - aggregates, - aggregates_limit, - fields, - .. 
- } = query; - let mut facet_pipelines = aggregates - .iter() - .flatten() - .map(|(key, aggregate)| { - Ok(( - key.to_string(), - pipeline_for_aggregate(aggregate.clone(), *aggregates_limit)?, - )) - }) - .collect::, MongoAgentError>>()?; - - if fields.is_some() { - let fields_pipeline = pipeline_for_fields_facet(query_plan, query_level)?; - facet_pipelines.insert(ROWS_FIELD.to_owned(), fields_pipeline); - } - - // This builds a map that feeds into a `$replaceWith` pipeline stage to build a map of - // aggregation results. - let aggregate_selections: bson::Document = aggregates - .iter() - .flatten() - .map(|(key, aggregate)| { - // The facet result for each aggregate is an array containing a single document which - // has a field called `result`. This code selects each facet result by name, and pulls - // out the `result` value. - let value_expr = doc! { - "$getField": { - "field": RESULT_FIELD, // evaluates to the value of this field - "input": { "$first": get_field(key.as_str()) }, // field is accessed from this document - }, - }; - - // Matching SQL semantics, if a **count** aggregation does not match any rows we want - // to return zero. Other aggregations should return null. - let value_expr = if is_count(aggregate) { - doc! { - "$ifNull": [value_expr, 0], - } - // Otherwise if the aggregate value is missing because the aggregation applied to an - // empty document set then provide an explicit `null` value. - } else { - doc! { - "$ifNull": [value_expr, null] - } - }; - - (key.to_string(), value_expr.into()) - }) - .collect(); - - let select_aggregates = if !aggregate_selections.is_empty() { - Some(("aggregates".to_owned(), aggregate_selections.into())) - } else { - None - }; - - let select_rows = match fields { - Some(_) => Some(("rows".to_owned(), Bson::String(format!("${ROWS_FIELD}")))), - _ => None, - }; - - let selection = Selection::new( - [select_aggregates, select_rows] - .into_iter() - .flatten() - .collect(), - ); - - Ok((facet_pipelines, selection)) -} - -fn is_count(aggregate: &Aggregate) -> bool { - match aggregate { - Aggregate::ColumnCount { .. } => true, - Aggregate::StarCount { .. } => true, - Aggregate::SingleColumn { function, .. } => function.is_count(), - } -} - -fn pipeline_for_aggregate( - aggregate: Aggregate, - limit: Option, -) -> Result { - fn mk_target_field(name: FieldName, field_path: Option>) -> ComparisonTarget { - ComparisonTarget::Column { - name, - arguments: Default::default(), - field_path, - field_type: Type::Scalar(MongoScalarType::ExtendedJSON), // type does not matter here - } - } - - fn filter_to_documents_with_value( - target_field: ComparisonTarget, - ) -> Result { - Ok(Stage::Match(make_selector( - &Expression::BinaryComparisonOperator { - column: target_field, - operator: ComparisonFunction::NotEqual, - value: ComparisonValue::Scalar { - value: serde_json::Value::Null, - value_type: Type::Scalar(MongoScalarType::Bson(BsonScalarType::Null)), - }, - }, - )?)) - } - - let pipeline = match aggregate { - Aggregate::ColumnCount { - column, - field_path, - distinct, - .. 
- } if distinct => { - let target_field = mk_target_field(column, field_path); - Pipeline::from_iter( - [ - Some(filter_to_documents_with_value(target_field.clone())?), - limit.map(Into::into).map(Stage::Limit), - Some(Stage::Group { - key_expression: ColumnRef::from_comparison_target(&target_field) - .into_aggregate_expression() - .into_bson(), - accumulators: [].into(), - }), - Some(Stage::Count(RESULT_FIELD.to_string())), - ] - .into_iter() - .flatten(), - ) - } - - // TODO: ENG-1465 count by distinct - Aggregate::ColumnCount { - column, - field_path, - distinct: _, - .. - } => Pipeline::from_iter( - [ - Some(filter_to_documents_with_value(mk_target_field( - column, field_path, - ))?), - limit.map(Into::into).map(Stage::Limit), - Some(Stage::Count(RESULT_FIELD.to_string())), - ] - .into_iter() - .flatten(), - ), - - Aggregate::SingleColumn { - column, - field_path, - function, - .. - } => { - use AggregationFunction::*; - - let target_field = ComparisonTarget::Column { - name: column.clone(), - arguments: Default::default(), - field_path: field_path.clone(), - field_type: Type::Scalar(MongoScalarType::Bson(BsonScalarType::Null)), // type does not matter here - }; - let field_ref = ColumnRef::from_column_and_field_path(&column, field_path.as_ref()) - .into_aggregate_expression() - .into_bson(); - - let accumulator = match function { - Avg => Accumulator::Avg(field_ref), - Count => Accumulator::Count, - Min => Accumulator::Min(field_ref), - Max => Accumulator::Max(field_ref), - Sum => Accumulator::Sum(field_ref), - }; - Pipeline::from_iter( - [ - Some(filter_to_documents_with_value(target_field)?), - limit.map(Into::into).map(Stage::Limit), - Some(Stage::Group { - key_expression: Bson::Null, - accumulators: [(RESULT_FIELD.to_string(), accumulator)].into(), - }), - ] - .into_iter() - .flatten(), - ) - } - - Aggregate::StarCount {} => Pipeline::from_iter( - [ - limit.map(Into::into).map(Stage::Limit), - Some(Stage::Count(RESULT_FIELD.to_string())), - ] - .into_iter() - .flatten(), - ), - }; - Ok(pipeline) + Ok(Pipeline::new(vec![replace_with_stage])) } diff --git a/crates/mongodb-agent-common/src/query/query_variable_name.rs b/crates/mongodb-agent-common/src/query/query_variable_name.rs index ee910b34..66589962 100644 --- a/crates/mongodb-agent-common/src/query/query_variable_name.rs +++ b/crates/mongodb-agent-common/src/query/query_variable_name.rs @@ -1,6 +1,7 @@ use std::borrow::Cow; use configuration::MongoScalarType; +use itertools::Itertools; use crate::{ mongo_query_plan::{ObjectType, Type}, @@ -28,6 +29,7 @@ fn type_name(input_type: &Type) -> Cow<'static, str> { Type::Object(obj) => object_type_name(obj).into(), Type::ArrayOf(t) => format!("[{}]", type_name(t)).into(), Type::Nullable(t) => format!("nullable({})", type_name(t)).into(), + Type::Tuple(ts) => format!("({})", ts.iter().map(type_name).join(", ")).into(), } } diff --git a/crates/mongodb-agent-common/src/query/response.rs b/crates/mongodb-agent-common/src/query/response.rs index 714b4559..66daad94 100644 --- a/crates/mongodb-agent-common/src/query/response.rs +++ b/crates/mongodb-agent-common/src/query/response.rs @@ -1,21 +1,24 @@ -use std::collections::BTreeMap; +use std::{borrow::Cow, collections::BTreeMap}; use configuration::MongoScalarType; use indexmap::IndexMap; use itertools::Itertools; use mongodb::bson::{self, Bson}; use mongodb_support::ExtendedJsonMode; -use ndc_models::{QueryResponse, RowFieldValue, RowSet}; -use serde::Deserialize; +use ndc_models::{Group, QueryResponse, RowFieldValue, RowSet}; use 
thiserror::Error; use tracing::instrument; use crate::{ + constants::{BsonRowSet, GROUP_DIMENSIONS_KEY, ROW_SET_AGGREGATES_KEY, ROW_SET_GROUPS_KEY, ROW_SET_ROWS_KEY}, mongo_query_plan::{ - Aggregate, Field, NestedArray, NestedField, NestedObject, ObjectField, ObjectType, Query, - QueryPlan, Type, + Aggregate, Dimension, Field, Grouping, NestedArray, NestedField, NestedObject, ObjectField, + ObjectType, Query, QueryPlan, Type, + }, + query::{ + is_response_faceted::is_response_faceted, + serialization::{bson_to_json, BsonToJsonError}, }, - query::serialization::{bson_to_json, BsonToJsonError}, }; use super::serialization::is_nullable; @@ -31,6 +34,9 @@ pub enum QueryResponseError { #[error("{0}")] BsonToJson(#[from] BsonToJsonError), + #[error("a group response is missing its '{GROUP_DIMENSIONS_KEY}' field")] + GroupMissingDimensions { path: Vec }, + #[error("expected a single response document from MongoDB, but did not get one")] ExpectedSingleDocument, @@ -40,14 +46,6 @@ pub enum QueryResponseError { type Result = std::result::Result; -#[derive(Debug, Deserialize)] -struct BsonRowSet { - #[serde(default)] - aggregates: Bson, - #[serde(default)] - rows: Vec, -} - #[instrument(name = "Serialize Query Response", skip_all, fields(internal.visibility = "user"))] pub fn serialize_query_response( mode: ExtendedJsonMode, @@ -61,7 +59,7 @@ pub fn serialize_query_response( .into_iter() .map(|document| { let row_set = bson::from_document(document)?; - serialize_row_set_with_aggregates( + serialize_row_set( mode, &[collection_name.as_str()], &query_plan.query, @@ -69,14 +67,21 @@ pub fn serialize_query_response( ) }) .try_collect() - } else if query_plan.query.has_aggregates() { + } else if is_response_faceted(&query_plan.query) { let row_set = parse_single_document(response_documents)?; - Ok(vec![serialize_row_set_with_aggregates( + Ok(vec![serialize_row_set( mode, &[], &query_plan.query, row_set, )?]) + } else if let Some(grouping) = &query_plan.query.groups { + Ok(vec![serialize_row_set_groups_only( + mode, + &[], + grouping, + response_documents, + )?]) } else { Ok(vec![serialize_row_set_rows_only( mode, @@ -90,7 +95,7 @@ pub fn serialize_query_response( Ok(response) } -// When there are no aggregates we expect a list of rows +// When there are no aggregates or groups we expect a list of rows fn serialize_row_set_rows_only( mode: ExtendedJsonMode, path: &[&str], @@ -106,13 +111,27 @@ fn serialize_row_set_rows_only( Ok(RowSet { aggregates: None, rows, - groups: None, // TODO: ENG-1486 implement group by + groups: None, + }) +} + +fn serialize_row_set_groups_only( + mode: ExtendedJsonMode, + path: &[&str], + grouping: &Grouping, + docs: Vec, +) -> Result { + Ok(RowSet { + aggregates: None, + rows: None, + groups: Some(serialize_groups(mode, path, grouping, docs)?), }) } -// When there are aggregates we expect a single document with `rows` and `aggregates` -// fields -fn serialize_row_set_with_aggregates( +// When a query includes aggregates, or some combination of aggregates, rows, or groups then the +// response is "faceted" to give us a single document with `rows`, `aggregates`, and `groups` +// fields. 
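+//
+// Illustrative shape of one such faceted document (values invented):
+//
+//   { "aggregates": { "avg_rating": 3.5 },
+//     "rows": [{ "title": "..." }],
+//     "groups": [{ "dimensions": [2001], "avg_rating": 3.2 }] }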
+fn serialize_row_set( mode: ExtendedJsonMode, path: &[&str], query: &Query, @@ -124,6 +143,12 @@ fn serialize_row_set_with_aggregates( .map(|aggregates| serialize_aggregates(mode, path, aggregates, row_set.aggregates)) .transpose()?; + let groups = query + .groups + .as_ref() + .map(|grouping| serialize_groups(mode, path, grouping, row_set.groups)) + .transpose()?; + let rows = query .fields .as_ref() @@ -133,7 +158,7 @@ fn serialize_row_set_with_aggregates( Ok(RowSet { aggregates, rows, - groups: None, // TODO: ENG-1486 implement group by + groups, }) } @@ -144,7 +169,7 @@ fn serialize_aggregates( value: Bson, ) -> Result> { let aggregates_type = type_for_aggregates(query_aggregates); - let json = bson_to_json(mode, &aggregates_type, value)?; + let json = bson_to_json(mode, &Type::Object(aggregates_type), value)?; // The NDC type uses an IndexMap for aggregate values; we need to convert the map // underlying the Value::Object value to an IndexMap @@ -182,18 +207,68 @@ fn serialize_rows( .try_collect() } +fn serialize_groups( + mode: ExtendedJsonMode, + path: &[&str], + grouping: &Grouping, + docs: Vec, +) -> Result> { + docs.into_iter() + .map(|doc| { + let dimensions_field_value = doc.get(GROUP_DIMENSIONS_KEY).ok_or_else(|| { + QueryResponseError::GroupMissingDimensions { + path: path_to_owned(path), + } + })?; + + let dimensions_array = match dimensions_field_value { + Bson::Array(vec) => Cow::Borrowed(vec), + other_bson_value => Cow::Owned(vec![other_bson_value.clone()]), + }; + + let dimensions = grouping + .dimensions + .iter() + .zip(dimensions_array.iter()) + .map(|(dimension_definition, dimension_value)| { + Ok(bson_to_json( + mode, + dimension_definition.value_type(), + dimension_value.clone(), + )?) + }) + .collect::>()?; + + let aggregates = serialize_aggregates(mode, path, &grouping.aggregates, doc.into())?; + + // TODO: This conversion step can be removed when the aggregates map key type is + // changed from String to FieldName + let aggregates = aggregates + .into_iter() + .map(|(key, value)| (key.to_string(), value)) + .collect(); + + Ok(Group { + dimensions, + aggregates, + }) + }) + .try_collect() +} + fn type_for_row_set( path: &[&str], aggregates: &Option>, fields: &Option>, + groups: &Option, ) -> Result { let mut object_fields = BTreeMap::new(); if let Some(aggregates) = aggregates { object_fields.insert( - "aggregates".into(), + ROW_SET_AGGREGATES_KEY.into(), ObjectField { - r#type: type_for_aggregates(aggregates), + r#type: Type::Object(type_for_aggregates(aggregates)), parameters: Default::default(), }, ); @@ -202,7 +277,7 @@ fn type_for_row_set( if let Some(query_fields) = fields { let row_type = type_for_row(path, query_fields)?; object_fields.insert( - "rows".into(), + ROW_SET_ROWS_KEY.into(), ObjectField { r#type: Type::ArrayOf(Box::new(row_type)), parameters: Default::default(), @@ -210,13 +285,36 @@ fn type_for_row_set( ); } + if let Some(grouping) = groups { + let dimension_types = grouping + .dimensions + .iter() + .map(Dimension::value_type) + .cloned() + .collect(); + let dimension_tuple_type = Type::Tuple(dimension_types); + let mut group_object_type = type_for_aggregates(&grouping.aggregates); + group_object_type + .fields + .insert(GROUP_DIMENSIONS_KEY.into(), dimension_tuple_type.into()); + object_fields.insert( + ROW_SET_GROUPS_KEY.into(), + ObjectField { + r#type: Type::array_of(Type::Object(group_object_type)), + parameters: Default::default(), + }, + ); + } + Ok(Type::Object(ObjectType { fields: object_fields, name: None, })) } -fn 
type_for_aggregates(query_aggregates: &IndexMap) -> Type { +fn type_for_aggregates( + query_aggregates: &IndexMap, +) -> ObjectType { let fields = query_aggregates .iter() .map(|(field_name, aggregate)| { @@ -238,7 +336,7 @@ fn type_for_aggregates(query_aggregates: &IndexMap Result { .. } => type_for_nested_field(path, column_type, nested_field)?, Field::Relationship { - aggregates, fields, .. - } => type_for_row_set(path, aggregates, fields)?, + aggregates, + fields, + groups, + .. + } => type_for_row_set(path, aggregates, fields, groups)?, }; Ok(field_type) } @@ -715,6 +816,7 @@ mod tests { &path, &query_plan.query.aggregates, &query_plan.query.fields, + &query_plan.query.groups, )?; let expected = Type::object([( diff --git a/crates/mongodb-agent-common/src/mongodb/selection.rs b/crates/mongodb-agent-common/src/query/selection.rs similarity index 71% rename from crates/mongodb-agent-common/src/mongodb/selection.rs rename to crates/mongodb-agent-common/src/query/selection.rs index fbc3f0bf..d97b042a 100644 --- a/crates/mongodb-agent-common/src/mongodb/selection.rs +++ b/crates/mongodb-agent-common/src/query/selection.rs @@ -5,27 +5,31 @@ use ndc_models::FieldName; use nonempty::NonEmpty; use crate::{ + constants::{ROW_SET_AGGREGATES_KEY, ROW_SET_GROUPS_KEY, ROW_SET_ROWS_KEY}, interface_types::MongoAgentError, - mongo_query_plan::{Field, NestedArray, NestedField, NestedObject, QueryPlan}, + mongo_query_plan::{Field, NestedArray, NestedField, NestedObject}, mongodb::sanitize::get_field, - query::column_ref::ColumnRef, + query::{column_ref::ColumnRef, groups::selection_for_grouping}, }; -pub fn selection_from_query_request( - query_request: &QueryPlan, +use super::is_response_faceted::ResponseFacets; + +/// Creates a document to use in a $replaceWith stage to limit query results to the specific fields +/// requested. Assumes that only fields are requested. +pub fn selection_for_fields( + fields: Option<&IndexMap>, ) -> Result { - // let fields = (&query_request.query.fields).flatten().unwrap_or_default(); let empty_map = IndexMap::new(); - let fields = if let Some(fs) = &query_request.query.fields { + let fields = if let Some(fs) = fields { fs } else { &empty_map }; - let doc = from_query_request_helper(None, fields)?; + let doc = for_fields_helper(None, fields)?; Ok(Selection::new(doc)) } -fn from_query_request_helper( +fn for_fields_helper( parent: Option>, field_selection: &IndexMap, ) -> Result { @@ -62,7 +66,7 @@ fn selection_for_field( .. } => { let col_ref = nested_column_reference(parent, column); - let nested_selection = from_query_request_helper(Some(col_ref.clone()), fields)?; + let nested_selection = for_fields_helper(Some(col_ref.clone()), fields)?; Ok(doc! {"$cond": {"if": col_ref.into_aggregate_expression(), "then": nested_selection, "else": Bson::Null}}.into()) } Field::Column { @@ -77,13 +81,22 @@ fn selection_for_field( relationship, aggregates, fields, + groups, .. } => { + // TODO: ENG-1569 If we get a unification of two relationship references where one + // selects only fields, and the other selects only groups, we may end up in a broken + // state where the response should be faceted but is not. Data will be populated + // correctly - the issue is only here where we need to figure out whether to write + // a selection for faceted data or not. Instead of referencing the + // [Field::Relationship] value to determine faceting we need to reference the + // [Relationship] attached to the [Query] that populated it. 
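+ //
+ // (Hypothetical illustration of the hazard: one reference to the relationship asks only
+ // for fields while another asks only for groups; the unified relationship query then
+ // returns a faceted row set, but matching on this single [Field::Relationship] value
+ // could pick a non-faceted selection.)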
+ // The pipeline for the relationship has already selected the requested fields with the // appropriate aliases. At this point all we need to do is to prune the selection down // to requested fields, omitting fields of the relationship that were selected for // filtering and sorting. - let field_selection: Option = fields.as_ref().map(|fields| { + let field_selection = |fields: &IndexMap| -> Document { fields .iter() .map(|(field_name, _)| { @@ -96,52 +109,90 @@ fn selection_for_field( ) }) .collect() - }); + }; - if let Some(aggregates) = aggregates { - let aggregate_selecion: Document = aggregates - .iter() - .map(|(aggregate_name, _)| { - ( - aggregate_name.to_string(), - format!("$$row_set.aggregates.{aggregate_name}").into(), - ) - }) - .collect(); - let mut new_row_set = doc! { "aggregates": aggregate_selecion }; + // Field of the incoming pipeline document that contains data fetched for the + // relationship. + let relationship_field = get_field(relationship.as_str()); - if let Some(field_selection) = field_selection { - new_row_set.insert( - "rows", - doc! { - "$map": { - "input": "$$row_set.rows", - "in": field_selection, - } - }, - ); - } + let doc = match ResponseFacets::from_parameters( + aggregates.as_ref(), + fields.as_ref(), + groups.as_ref(), + ) { + ResponseFacets::Combination { + aggregates, + fields, + groups, + } => { + let aggregate_selection: Document = aggregates + .into_iter() + .flatten() + .map(|(aggregate_name, _)| { + ( + aggregate_name.to_string(), + format!("$$row_set.{ROW_SET_AGGREGATES_KEY}.{aggregate_name}") + .into(), + ) + }) + .collect(); + let mut new_row_set = doc! { ROW_SET_AGGREGATES_KEY: aggregate_selection }; + + if let Some(fields) = fields { + new_row_set.insert( + ROW_SET_ROWS_KEY, + doc! { + "$map": { + "input": format!("$$row_set.{ROW_SET_ROWS_KEY}"), + "in": field_selection(fields), + } + }, + ); + } - Ok(doc! { - "$let": { - "vars": { "row_set": { "$first": get_field(relationship.as_str()) } }, - "in": new_row_set, + if let Some(grouping) = groups { + new_row_set.insert( + ROW_SET_GROUPS_KEY, + doc! { + "$map": { + "input": format!("$$row_set.{ROW_SET_GROUPS_KEY}"), + "as": "CURRENT", // implicitly changes the document root in `in` to be the array element + "in": selection_for_grouping(grouping), + } + }, + ); + } + + doc! { + "$let": { + "vars": { "row_set": { "$first": relationship_field } }, + "in": new_row_set, + } } } - .into()) - } else if let Some(field_selection) = field_selection { - Ok(doc! { - "rows": { + ResponseFacets::FieldsOnly(fields) => doc! { + ROW_SET_ROWS_KEY: { "$map": { - "input": get_field(relationship.as_str()), - "in": field_selection, + "input": relationship_field, + "in": field_selection(fields), } } - } - .into()) - } else { - Ok(doc! { "rows": [] }.into()) - } + }, + ResponseFacets::GroupsOnly(grouping) => doc! { + // We can reuse the grouping selection logic instead of writing a custom one + // like with `field_selection` because `selection_for_grouping` only selects + // top-level keys - it doesn't have logic that we don't want to duplicate like + // `selection_for_field` does. 
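+ //
+ // Sketch of the selection this arm produces (the relationship name is illustrative):
+ //
+ //   { "groups": { "$map": { "input": "$some_relationship", "as": "CURRENT",
+ //                           "in": { "dimensions": "$dimensions", ... } } } }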
+ ROW_SET_GROUPS_KEY: { + "$map": { + "input": relationship_field, + "as": "CURRENT", // implicitly changes the document root in `in` to be the array element + "in": selection_for_grouping(grouping), + } + } + }, + }; + Ok(doc.into()) } } } @@ -154,7 +205,7 @@ fn selection_for_array( match field { NestedField::Object(NestedObject { fields }) => { let mut nested_selection = - from_query_request_helper(Some(ColumnRef::variable("this")), fields)?; + for_fields_helper(Some(ColumnRef::variable("this")), fields)?; for _ in 0..array_nesting_level { nested_selection = doc! {"$map": {"input": "$$this", "in": nested_selection}} } @@ -188,7 +239,9 @@ mod tests { }; use pretty_assertions::assert_eq; - use crate::{mongo_query_plan::MongoConfiguration, mongodb::selection_from_query_request}; + use crate::mongo_query_plan::MongoConfiguration; + + use super::*; #[test] fn calculates_selection_for_query_request() -> Result<(), anyhow::Error> { @@ -216,7 +269,7 @@ mod tests { let query_plan = plan_for_query_request(&foo_config(), query_request)?; - let selection = selection_from_query_request(&query_plan)?; + let selection = selection_for_fields(query_plan.query.fields.as_ref())?; assert_eq!( Into::::into(selection), doc! { @@ -308,7 +361,7 @@ mod tests { // twice (once with the key `class_students`, and then with the key `class_students_0`). // This is because the queries on the two relationships have different scope names. The // query would work with just one lookup. Can we do that optimization? - let selection = selection_from_query_request(&query_plan)?; + let selection = selection_for_fields(query_plan.query.fields.as_ref())?; assert_eq!( Into::::into(selection), doc! { diff --git a/crates/mongodb-agent-common/src/query/serialization/bson_to_json.rs b/crates/mongodb-agent-common/src/query/serialization/bson_to_json.rs index d7321927..7cc80e02 100644 --- a/crates/mongodb-agent-common/src/query/serialization/bson_to_json.rs +++ b/crates/mongodb-agent-common/src/query/serialization/bson_to_json.rs @@ -21,14 +21,14 @@ pub enum BsonToJsonError { #[error("error converting UUID from BSON to JSON: {0}")] UuidConversion(#[from] bson::uuid::Error), - #[error("input object of type {0:?} is missing a field, \"{1}\"")] + #[error("input object of type {0} is missing a field, \"{1}\"")] MissingObjectField(Type, String), #[error("error converting value to JSON: {0}")] Serde(#[from] serde_json::Error), // TODO: It would be great if we could capture a path into the larger BSON value here - #[error("expected a value of type {0:?}, but got {1}")] + #[error("expected a value of type {0}, but got {1}")] TypeMismatch(Type, Bson), #[error("unknown object type, \"{0}\"")] @@ -52,6 +52,7 @@ pub fn bson_to_json(mode: ExtendedJsonMode, expected_type: &Type, value: Bson) - } Type::Object(object_type) => convert_object(mode, object_type, value), Type::ArrayOf(element_type) => convert_array(mode, element_type, value), + Type::Tuple(element_types) => convert_tuple(mode, element_types, value), Type::Nullable(t) => convert_nullable(mode, t, value), } } @@ -118,6 +119,22 @@ fn convert_array(mode: ExtendedJsonMode, element_type: &Type, value: Bson) -> Re Ok(Value::Array(json_array)) } +fn convert_tuple(mode: ExtendedJsonMode, element_types: &[Type], value: Bson) -> Result { + let values = match value { + Bson::Array(values) => Ok(values), + _ => Err(BsonToJsonError::TypeMismatch( + Type::Tuple(element_types.to_vec()), + value, + )), + }?; + let json_array = element_types + .iter() + .zip(values) + .map(|(element_type, value)| 
bson_to_json(mode, element_type, value)) + .try_collect()?; + Ok(Value::Array(json_array)) +} + fn convert_object(mode: ExtendedJsonMode, object_type: &ObjectType, value: Bson) -> Result { let input_doc = match value { Bson::Document(fields) => Ok(fields), diff --git a/crates/mongodb-agent-common/src/query/serialization/json_to_bson.rs b/crates/mongodb-agent-common/src/query/serialization/json_to_bson.rs index 4faa26cd..7c04b91a 100644 --- a/crates/mongodb-agent-common/src/query/serialization/json_to_bson.rs +++ b/crates/mongodb-agent-common/src/query/serialization/json_to_bson.rs @@ -66,6 +66,7 @@ pub fn json_to_bson(expected_type: &Type, value: Value) -> Result { Type::Object(object_type) => convert_object(object_type, value), Type::ArrayOf(element_type) => convert_array(element_type, value), Type::Nullable(t) => convert_nullable(t, value), + Type::Tuple(element_types) => convert_tuple(element_types, value), } } @@ -130,6 +131,16 @@ fn convert_array(element_type: &Type, value: Value) -> Result { Ok(Bson::Array(bson_array)) } +fn convert_tuple(element_types: &[Type], value: Value) -> Result { + let input_elements: Vec = serde_json::from_value(value)?; + let bson_array = element_types + .iter() + .zip(input_elements) + .map(|(element_type, v)| json_to_bson(element_type, v)) + .try_collect()?; + Ok(Bson::Array(bson_array)) +} + fn convert_object(object_type: &ObjectType, value: Value) -> Result { let input_fields: BTreeMap = serde_json::from_value(value)?; let bson_doc: bson::Document = object_type diff --git a/crates/mongodb-agent-common/src/scalar_types_capabilities.rs b/crates/mongodb-agent-common/src/scalar_types_capabilities.rs index daf29984..3140217d 100644 --- a/crates/mongodb-agent-common/src/scalar_types_capabilities.rs +++ b/crates/mongodb-agent-common/src/scalar_types_capabilities.rs @@ -10,6 +10,7 @@ use ndc_models::{ use crate::aggregation_function::{AggregationFunction, AggregationFunction as A}; use crate::comparison_function::{ComparisonFunction, ComparisonFunction as C}; +use crate::mongo_query_plan as plan; use BsonScalarType as S; @@ -53,9 +54,6 @@ fn extended_json_scalar_type() -> (ndc_models::ScalarTypeName, ScalarType) { name: mongodb_support::EXTENDED_JSON_TYPE_NAME.into(), }, }, - Plan::Count => NDC::Custom { - result_type: bson_to_named_type(S::Int), - }, Plan::Min => NDC::Min, Plan::Max => NDC::Max, Plan::Sum => NDC::Custom { @@ -164,34 +162,31 @@ fn aggregate_functions( scalar_type: BsonScalarType, ) -> impl Iterator { use AggregateFunctionDefinition as NDC; - [( - A::Count, - NDC::Custom { - result_type: bson_to_named_type(S::Int), - }, - )] - .into_iter() - .chain(iter_if( + iter_if( scalar_type.is_orderable(), [(A::Min, NDC::Min), (A::Max, NDC::Max)].into_iter(), - )) + ) .chain(iter_if( scalar_type.is_numeric(), [ ( A::Avg, NDC::Average { - result_type: bson_to_scalar_type_name(S::Double), + result_type: bson_to_scalar_type_name( + A::expected_result_type(A::Avg, &plan::Type::scalar(scalar_type)) + .expect("average result type is defined"), + // safety: this expect is checked in integration tests + ), }, ), ( A::Sum, NDC::Sum { - result_type: bson_to_scalar_type_name(if scalar_type.is_fractional() { - S::Double - } else { - S::Long - }), + result_type: bson_to_scalar_type_name( + A::expected_result_type(A::Sum, &plan::Type::scalar(scalar_type)) + .expect("sum result type is defined"), + // safety: this expect is checked in integration tests + ), }, ), ] diff --git a/crates/mongodb-connector/src/capabilities.rs b/crates/mongodb-connector/src/capabilities.rs 
index 5ab5f8ea..6e7a5724 100644 --- a/crates/mongodb-connector/src/capabilities.rs +++ b/crates/mongodb-connector/src/capabilities.rs @@ -1,5 +1,5 @@ use ndc_sdk::models::{ - AggregateCapabilities, Capabilities, ExistsCapabilities, LeafCapability, + AggregateCapabilities, Capabilities, ExistsCapabilities, GroupByCapabilities, LeafCapability, NestedArrayFilterByCapabilities, NestedFieldCapabilities, NestedFieldFilterByCapabilities, QueryCapabilities, RelationshipCapabilities, }; @@ -9,7 +9,11 @@ pub fn mongo_capabilities() -> Capabilities { query: QueryCapabilities { aggregates: Some(AggregateCapabilities { filter_by: None, - group_by: None, + group_by: Some(GroupByCapabilities { + filter: None, + order: None, + paginate: None, + }), }), variables: Some(LeafCapability {}), explain: Some(LeafCapability {}), diff --git a/crates/mongodb-support/src/aggregate/selection.rs b/crates/mongodb-support/src/aggregate/selection.rs index faa04b0d..8d6fbf28 100644 --- a/crates/mongodb-support/src/aggregate/selection.rs +++ b/crates/mongodb-support/src/aggregate/selection.rs @@ -36,6 +36,12 @@ impl Extend<(String, Bson)> for Selection { } } +impl From for Bson { + fn from(value: Selection) -> Self { + value.0.into() + } +} + impl From for bson::Document { fn from(value: Selection) -> Self { value.0 diff --git a/crates/ndc-query-plan/src/lib.rs b/crates/ndc-query-plan/src/lib.rs index 3af97eca..000e7e5b 100644 --- a/crates/ndc-query-plan/src/lib.rs +++ b/crates/ndc-query-plan/src/lib.rs @@ -6,7 +6,8 @@ pub mod vec_set; pub use mutation_plan::*; pub use plan_for_query_request::{ - plan_for_mutation_request, plan_for_query_request, + plan_for_mutation_request::plan_for_mutation_request, + plan_for_query_request, query_context::QueryContext, query_plan_error::QueryPlanError, type_annotated_field::{type_annotated_field, type_annotated_nested_field}, diff --git a/crates/ndc-query-plan/src/plan_for_query_request/helpers.rs b/crates/ndc-query-plan/src/plan_for_query_request/helpers.rs index e8503f07..11abe277 100644 --- a/crates/ndc-query-plan/src/plan_for_query_request/helpers.rs +++ b/crates/ndc-query-plan/src/plan_for_query_request/helpers.rs @@ -1,5 +1,6 @@ use std::collections::BTreeMap; +use itertools::Itertools as _; use ndc_models::{self as ndc}; use crate::{self as plan}; @@ -66,6 +67,21 @@ fn find_object_type<'a, S>( }), crate::Type::Nullable(t) => find_object_type(t, parent_type, field_name), crate::Type::Object(object_type) => Ok(object_type), + crate::Type::Tuple(ts) => { + let object_types = ts + .iter() + .flat_map(|t| find_object_type(t, parent_type, field_name)) + .collect_vec(); + if object_types.len() == 1 { + Ok(object_types[0]) + } else { + Err(QueryPlanError::ExpectedObjectTypeAtField { + parent_type: parent_type.to_owned(), + field_name: field_name.to_owned(), + got: "array".to_owned(), + }) + } + } } } diff --git a/crates/ndc-query-plan/src/plan_for_query_request/mod.rs b/crates/ndc-query-plan/src/plan_for_query_request/mod.rs index 71020d93..f5d87585 100644 --- a/crates/ndc-query-plan/src/plan_for_query_request/mod.rs +++ b/crates/ndc-query-plan/src/plan_for_query_request/mod.rs @@ -1,6 +1,9 @@ mod helpers; mod plan_for_arguments; -mod plan_for_mutation_request; +mod plan_for_expression; +mod plan_for_grouping; +pub mod plan_for_mutation_request; +mod plan_for_relationship; pub mod query_context; pub mod query_plan_error; mod query_plan_state; @@ -12,23 +15,18 @@ mod plan_test_helpers; #[cfg(test)] mod tests; -use std::{collections::VecDeque, iter::once}; - -use crate::{self as plan, 
type_annotated_field, ObjectType, QueryPlan, Scope}; -use helpers::find_nested_collection_type; +use crate::{self as plan, type_annotated_field, QueryPlan, Scope}; use indexmap::IndexMap; use itertools::Itertools; -use ndc::{ExistsInCollection, QueryRequest}; -use ndc_models::{self as ndc}; +use ndc_models::{self as ndc, QueryRequest}; +use plan_for_relationship::plan_for_relationship_path; use query_plan_state::QueryPlanInfo; -pub use self::plan_for_mutation_request::plan_for_mutation_request; use self::{ - helpers::{ - find_nested_collection_object_type, find_object_field, get_object_field_by_path, - lookup_relationship, - }, + helpers::{find_object_field, get_object_field_by_path}, plan_for_arguments::{plan_arguments_from_plan_parameters, plan_for_arguments}, + plan_for_expression::plan_for_expression, + plan_for_grouping::plan_for_grouping, query_context::QueryContext, query_plan_error::QueryPlanError, query_plan_state::QueryPlanState, @@ -102,8 +100,10 @@ pub fn plan_for_query( ) -> Result> { let mut plan_state = plan_state.state_for_subquery(); - let aggregates = - plan_for_aggregates(&mut plan_state, collection_object_type, query.aggregates)?; + let aggregates = query + .aggregates + .map(|aggregates| plan_for_aggregates(&mut plan_state, collection_object_type, aggregates)) + .transpose()?; let fields = plan_for_fields( &mut plan_state, root_collection_object_type, @@ -138,14 +138,26 @@ pub fn plan_for_query( }) .transpose()?; + let groups = query + .groups + .map(|grouping| { + plan_for_grouping( + &mut plan_state, + root_collection_object_type, + collection_object_type, + grouping, + ) + }) + .transpose()?; + Ok(plan::Query { aggregates, - aggregates_limit: limit, fields, order_by, limit, offset, predicate, + groups, relationships: plan_state.into_relationships(), scope: None, }) @@ -154,21 +166,17 @@ pub fn plan_for_query( fn plan_for_aggregates( plan_state: &mut QueryPlanState<'_, T>, collection_object_type: &plan::ObjectType, - ndc_aggregates: Option>, -) -> Result>>> { + ndc_aggregates: IndexMap, +) -> Result>> { ndc_aggregates - .map(|aggregates| -> Result<_> { - aggregates - .into_iter() - .map(|(name, aggregate)| { - Ok(( - name, - plan_for_aggregate(plan_state, collection_object_type, aggregate)?, - )) - }) - .collect() + .into_iter() + .map(|(name, aggregate)| { + Ok(( + name, + plan_for_aggregate(plan_state, collection_object_type, aggregate)?, + )) }) - .transpose() + .collect() } fn plan_for_aggregate( @@ -204,6 +212,7 @@ fn plan_for_aggregate( } => { let nested_object_field = get_object_field_by_path(collection_object_type, &column, field_path.as_deref())?; + let column_type = &nested_object_field.r#type; let object_field = collection_object_type.get(&column)?; let plan_arguments = plan_arguments_from_plan_parameters( plan_state, @@ -212,9 +221,10 @@ fn plan_for_aggregate( )?; let (function, definition) = plan_state .context - .find_aggregation_function_definition(&nested_object_field.r#type, &function)?; + .find_aggregation_function_definition(column_type, &function)?; Ok(plan::Aggregate::SingleColumn { column, + column_type: column_type.clone(), arguments: plan_arguments, field_path, function, @@ -371,14 +381,16 @@ fn plan_for_order_by_element( )?; let object_field = find_object_field(&collection_object_type, &column)?; + let column_type = &object_field.r#type; let (function, function_definition) = plan_state .context - .find_aggregation_function_definition(&object_field.r#type, &function)?; + .find_aggregation_function_definition(column_type, &function)?; 
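+ // Note: the planned aggregate now carries `column_type`; a plausible reading is that
+ // downstream result-type handling for avg/sum coercion needs the input column's type
+ // alongside the function, though see the surrounding code for the authoritative use.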
plan::OrderByTarget::Aggregate { path: plan_path, aggregate: plan::Aggregate::SingleColumn { column, + column_type: column_type.clone(), arguments: plan_arguments, field_path, function, @@ -409,540 +421,3 @@ fn plan_for_order_by_element( target, }) } - -/// Returns list of aliases for joins to traverse, plus the object type of the final collection in -/// the path. -fn plan_for_relationship_path( - plan_state: &mut QueryPlanState<'_, T>, - root_collection_object_type: &plan::ObjectType, - object_type: &plan::ObjectType, - relationship_path: Vec, - requested_columns: Vec, // columns to select from last path element -) -> Result<(Vec, ObjectType)> { - let end_of_relationship_path_object_type = relationship_path - .last() - .map(|last_path_element| { - let relationship = lookup_relationship( - plan_state.collection_relationships, - &last_path_element.relationship, - )?; - plan_state - .context - .find_collection_object_type(&relationship.target_collection) - }) - .transpose()?; - let target_object_type = end_of_relationship_path_object_type.unwrap_or(object_type.clone()); - - let reversed_relationship_path = { - let mut path = relationship_path; - path.reverse(); - path - }; - - let vec_deque = plan_for_relationship_path_helper( - plan_state, - root_collection_object_type, - reversed_relationship_path, - requested_columns, - )?; - let aliases = vec_deque.into_iter().collect(); - - Ok((aliases, target_object_type)) -} - -fn plan_for_relationship_path_helper( - plan_state: &mut QueryPlanState<'_, T>, - root_collection_object_type: &plan::ObjectType, - mut reversed_relationship_path: Vec, - requested_columns: Vec, // columns to select from last path element -) -> Result> { - if reversed_relationship_path.is_empty() { - return Ok(VecDeque::new()); - } - - // safety: we just made an early return if the path is empty - let head = reversed_relationship_path.pop().unwrap(); - let tail = reversed_relationship_path; - let is_last = tail.is_empty(); - - let ndc::PathElement { - field_path: _, // TODO: ENG-1458 support nested relationships - relationship, - arguments, - predicate, - } = head; - - let relationship_def = lookup_relationship(plan_state.collection_relationships, &relationship)?; - let related_collection_type = plan_state - .context - .find_collection_object_type(&relationship_def.target_collection)?; - let mut nested_state = plan_state.state_for_subquery(); - - // If this is the last path element then we need to apply the requested fields to the - // relationship query. Otherwise we need to recursively process the rest of the path. Both - // cases take ownership of `requested_columns` so we group them together. 
- let (mut rest_path, fields) = if is_last { - let fields = requested_columns - .into_iter() - .map(|column_name| { - let object_field = - find_object_field(&related_collection_type, &column_name)?.clone(); - Ok(( - column_name.clone(), - plan::Field::Column { - column: column_name, - fields: None, - column_type: object_field.r#type, - }, - )) - }) - .collect::>()?; - (VecDeque::new(), Some(fields)) - } else { - let rest = plan_for_relationship_path_helper( - &mut nested_state, - root_collection_object_type, - tail, - requested_columns, - )?; - (rest, None) - }; - - let predicate_plan = predicate - .map(|p| { - plan_for_expression( - &mut nested_state, - root_collection_object_type, - &related_collection_type, - *p, - ) - }) - .transpose()?; - - let nested_relationships = nested_state.into_relationships(); - - let relationship_query = plan::Query { - predicate: predicate_plan, - relationships: nested_relationships, - fields, - ..Default::default() - }; - - let relation_key = - plan_state.register_relationship(relationship, arguments, relationship_query)?; - - rest_path.push_front(relation_key); - Ok(rest_path) -} - -fn plan_for_expression( - plan_state: &mut QueryPlanState, - root_collection_object_type: &plan::ObjectType, - object_type: &plan::ObjectType, - expression: ndc::Expression, -) -> Result> { - match expression { - ndc::Expression::And { expressions } => Ok(plan::Expression::And { - expressions: expressions - .into_iter() - .map(|expr| { - plan_for_expression(plan_state, root_collection_object_type, object_type, expr) - }) - .collect::>()?, - }), - ndc::Expression::Or { expressions } => Ok(plan::Expression::Or { - expressions: expressions - .into_iter() - .map(|expr| { - plan_for_expression(plan_state, root_collection_object_type, object_type, expr) - }) - .collect::>()?, - }), - ndc::Expression::Not { expression } => Ok(plan::Expression::Not { - expression: Box::new(plan_for_expression( - plan_state, - root_collection_object_type, - object_type, - *expression, - )?), - }), - ndc::Expression::UnaryComparisonOperator { column, operator } => { - Ok(plan::Expression::UnaryComparisonOperator { - column: plan_for_comparison_target(plan_state, object_type, column)?, - operator, - }) - } - ndc::Expression::BinaryComparisonOperator { - column, - operator, - value, - } => plan_for_binary_comparison( - plan_state, - root_collection_object_type, - object_type, - column, - operator, - value, - ), - ndc::Expression::ArrayComparison { column, comparison } => plan_for_array_comparison( - plan_state, - root_collection_object_type, - object_type, - column, - comparison, - ), - ndc::Expression::Exists { - in_collection, - predicate, - } => plan_for_exists( - plan_state, - root_collection_object_type, - in_collection, - predicate, - ), - } -} - -fn plan_for_binary_comparison( - plan_state: &mut QueryPlanState<'_, T>, - root_collection_object_type: &plan::ObjectType, - object_type: &plan::ObjectType, - column: ndc::ComparisonTarget, - operator: ndc::ComparisonOperatorName, - value: ndc::ComparisonValue, -) -> Result> { - let comparison_target = plan_for_comparison_target(plan_state, object_type, column)?; - let (operator, operator_definition) = plan_state - .context - .find_comparison_operator(comparison_target.target_type(), &operator)?; - let value_type = operator_definition.argument_type(comparison_target.target_type()); - Ok(plan::Expression::BinaryComparisonOperator { - operator, - value: plan_for_comparison_value( - plan_state, - root_collection_object_type, - object_type, - value_type, - 
value, - )?, - column: comparison_target, - }) -} - -fn plan_for_array_comparison( - plan_state: &mut QueryPlanState<'_, T>, - root_collection_object_type: &plan::ObjectType, - object_type: &plan::ObjectType, - column: ndc::ComparisonTarget, - comparison: ndc::ArrayComparison, -) -> Result> { - let comparison_target = plan_for_comparison_target(plan_state, object_type, column)?; - let plan_comparison = match comparison { - ndc::ArrayComparison::Contains { value } => { - let array_element_type = comparison_target - .target_type() - .clone() - .into_array_element_type()?; - let value = plan_for_comparison_value( - plan_state, - root_collection_object_type, - object_type, - array_element_type, - value, - )?; - plan::ArrayComparison::Contains { value } - } - ndc::ArrayComparison::IsEmpty => plan::ArrayComparison::IsEmpty, - }; - Ok(plan::Expression::ArrayComparison { - column: comparison_target, - comparison: plan_comparison, - }) -} - -fn plan_for_comparison_target( - plan_state: &mut QueryPlanState<'_, T>, - object_type: &plan::ObjectType, - target: ndc::ComparisonTarget, -) -> Result> { - match target { - ndc::ComparisonTarget::Column { - name, - arguments, - field_path, - } => { - let object_field = - get_object_field_by_path(object_type, &name, field_path.as_deref())?.clone(); - let plan_arguments = plan_arguments_from_plan_parameters( - plan_state, - &object_field.parameters, - arguments, - )?; - Ok(plan::ComparisonTarget::Column { - name, - arguments: plan_arguments, - field_path, - field_type: object_field.r#type, - }) - } - ndc::ComparisonTarget::Aggregate { .. } => { - // TODO: ENG-1457 implement query.aggregates.filter_by - Err(QueryPlanError::NotImplemented( - "filter by aggregate".to_string(), - )) - } - } -} - -fn plan_for_comparison_value( - plan_state: &mut QueryPlanState<'_, T>, - root_collection_object_type: &plan::ObjectType, - object_type: &plan::ObjectType, - expected_type: plan::Type, - value: ndc::ComparisonValue, -) -> Result> { - match value { - ndc::ComparisonValue::Column { - path, - name, - arguments, - field_path, - scope, - } => { - let (plan_path, collection_object_type) = plan_for_relationship_path( - plan_state, - root_collection_object_type, - object_type, - path, - vec![name.clone()], - )?; - let object_field = collection_object_type.get(&name)?; - let plan_arguments = plan_arguments_from_plan_parameters( - plan_state, - &object_field.parameters, - arguments, - )?; - Ok(plan::ComparisonValue::Column { - path: plan_path, - name, - arguments: plan_arguments, - field_path, - field_type: object_field.r#type.clone(), - scope, - }) - } - ndc::ComparisonValue::Scalar { value } => Ok(plan::ComparisonValue::Scalar { - value, - value_type: expected_type, - }), - ndc::ComparisonValue::Variable { name } => { - plan_state.register_variable_use(&name, expected_type.clone()); - Ok(plan::ComparisonValue::Variable { - name, - variable_type: expected_type, - }) - } - } -} - -fn plan_for_exists( - plan_state: &mut QueryPlanState<'_, T>, - root_collection_object_type: &plan::ObjectType, - in_collection: ExistsInCollection, - predicate: Option>, -) -> Result> { - let mut nested_state = plan_state.state_for_subquery(); - - let (in_collection, predicate) = match in_collection { - ndc::ExistsInCollection::Related { - relationship, - arguments, - field_path: _, // TODO: ENG-1490 requires propagating this, probably through the `register_relationship` call - } => { - let ndc_relationship = - lookup_relationship(plan_state.collection_relationships, &relationship)?; - let 
collection_object_type = plan_state - .context - .find_collection_object_type(&ndc_relationship.target_collection)?; - - let predicate = predicate - .map(|expression| { - plan_for_expression( - &mut nested_state, - root_collection_object_type, - &collection_object_type, - *expression, - ) - }) - .transpose()?; - - // TODO: ENG-1457 When we implement query.aggregates.filter_by we'll need to collect aggregates - // here as well as fields. - let fields = predicate.as_ref().map(|p| { - let mut fields = IndexMap::new(); - for comparison_target in p.query_local_comparison_targets() { - match comparison_target.into_owned() { - plan::ComparisonTarget::Column { - name, - arguments: _, - field_type, - .. - } => fields.insert( - name.clone(), - plan::Field::Column { - column: name, - fields: None, - column_type: field_type, - }, - ), - }; - } - fields - }); - - let relationship_query = plan::Query { - fields, - relationships: nested_state.into_relationships(), - ..Default::default() - }; - - let relationship_key = - plan_state.register_relationship(relationship, arguments, relationship_query)?; - - let in_collection = plan::ExistsInCollection::Related { - relationship: relationship_key, - }; - - Ok((in_collection, predicate)) as Result<_> - } - ndc::ExistsInCollection::Unrelated { - collection, - arguments, - } => { - let collection_object_type = plan_state - .context - .find_collection_object_type(&collection)?; - - let predicate = predicate - .map(|expression| { - plan_for_expression( - &mut nested_state, - root_collection_object_type, - &collection_object_type, - *expression, - ) - }) - .transpose()?; - - let join_query = plan::Query { - predicate: predicate.clone(), - relationships: nested_state.into_relationships(), - ..Default::default() - }; - - let join_key = plan_state.register_unrelated_join(collection, arguments, join_query)?; - - let in_collection = plan::ExistsInCollection::Unrelated { - unrelated_collection: join_key, - }; - Ok((in_collection, predicate)) - } - ndc::ExistsInCollection::NestedCollection { - column_name, - arguments, - field_path, - } => { - let object_field = root_collection_object_type.get(&column_name)?; - let plan_arguments = plan_arguments_from_plan_parameters( - &mut nested_state, - &object_field.parameters, - arguments, - )?; - - let nested_collection_type = find_nested_collection_object_type( - root_collection_object_type.clone(), - &field_path - .clone() - .into_iter() - .chain(once(column_name.clone())) - .collect_vec(), - )?; - - let in_collection = plan::ExistsInCollection::NestedCollection { - column_name, - arguments: plan_arguments, - field_path, - }; - - let predicate = predicate - .map(|expression| { - plan_for_expression( - &mut nested_state, - root_collection_object_type, - &nested_collection_type, - *expression, - ) - }) - .transpose()?; - - Ok((in_collection, predicate)) - } - ExistsInCollection::NestedScalarCollection { - column_name, - arguments, - field_path, - } => { - let object_field = root_collection_object_type.get(&column_name)?; - let plan_arguments = plan_arguments_from_plan_parameters( - &mut nested_state, - &object_field.parameters, - arguments, - )?; - - let nested_collection_type = find_nested_collection_type( - root_collection_object_type.clone(), - &field_path - .clone() - .into_iter() - .chain(once(column_name.clone())) - .collect_vec(), - )?; - - let virtual_object_type = plan::ObjectType { - name: None, - fields: [( - "__value".into(), - plan::ObjectField { - r#type: nested_collection_type, - parameters: Default::default(), - }, - 
)] - .into(), - }; - - let in_collection = plan::ExistsInCollection::NestedScalarCollection { - column_name, - arguments: plan_arguments, - field_path, - }; - - let predicate = predicate - .map(|expression| { - plan_for_expression( - &mut nested_state, - root_collection_object_type, - &virtual_object_type, - *expression, - ) - }) - .transpose()?; - - Ok((in_collection, predicate)) - } - }?; - - Ok(plan::Expression::Exists { - in_collection, - predicate: predicate.map(Box::new), - }) -} diff --git a/crates/ndc-query-plan/src/plan_for_query_request/plan_for_expression.rs b/crates/ndc-query-plan/src/plan_for_query_request/plan_for_expression.rs new file mode 100644 index 00000000..8c30d984 --- /dev/null +++ b/crates/ndc-query-plan/src/plan_for_query_request/plan_for_expression.rs @@ -0,0 +1,431 @@ +use std::iter::once; + +use indexmap::IndexMap; +use itertools::Itertools as _; +use ndc_models::{self as ndc, ExistsInCollection}; + +use crate::{self as plan, QueryContext, QueryPlanError}; + +use super::{ + helpers::{ + find_nested_collection_object_type, find_nested_collection_type, + get_object_field_by_path, lookup_relationship, + }, + plan_for_arguments::plan_arguments_from_plan_parameters, + plan_for_relationship::plan_for_relationship_path, + query_plan_state::QueryPlanState, +}; + +type Result = std::result::Result; + +pub fn plan_for_expression( + plan_state: &mut QueryPlanState, + root_collection_object_type: &plan::ObjectType, + object_type: &plan::ObjectType, + expression: ndc::Expression, +) -> Result> { + match expression { + ndc::Expression::And { expressions } => Ok(plan::Expression::And { + expressions: expressions + .into_iter() + .map(|expr| { + plan_for_expression(plan_state, root_collection_object_type, object_type, expr) + }) + .collect::>()?, + }), + ndc::Expression::Or { expressions } => Ok(plan::Expression::Or { + expressions: expressions + .into_iter() + .map(|expr| { + plan_for_expression(plan_state, root_collection_object_type, object_type, expr) + }) + .collect::>()?, + }), + ndc::Expression::Not { expression } => Ok(plan::Expression::Not { + expression: Box::new(plan_for_expression( + plan_state, + root_collection_object_type, + object_type, + *expression, + )?), + }), + ndc::Expression::UnaryComparisonOperator { column, operator } => { + Ok(plan::Expression::UnaryComparisonOperator { + column: plan_for_comparison_target(plan_state, object_type, column)?, + operator, + }) + } + ndc::Expression::BinaryComparisonOperator { + column, + operator, + value, + } => plan_for_binary_comparison( + plan_state, + root_collection_object_type, + object_type, + column, + operator, + value, + ), + ndc::Expression::ArrayComparison { column, comparison } => plan_for_array_comparison( + plan_state, + root_collection_object_type, + object_type, + column, + comparison, + ), + ndc::Expression::Exists { + in_collection, + predicate, + } => plan_for_exists( + plan_state, + root_collection_object_type, + in_collection, + predicate, + ), + } +} + +fn plan_for_binary_comparison( + plan_state: &mut QueryPlanState<'_, T>, + root_collection_object_type: &plan::ObjectType, + object_type: &plan::ObjectType, + column: ndc::ComparisonTarget, + operator: ndc::ComparisonOperatorName, + value: ndc::ComparisonValue, +) -> Result> { + let comparison_target = plan_for_comparison_target(plan_state, object_type, column)?; + let (operator, operator_definition) = plan_state + .context + .find_comparison_operator(comparison_target.target_type(), &operator)?; + let value_type = 
operator_definition.argument_type(comparison_target.target_type()); + Ok(plan::Expression::BinaryComparisonOperator { + operator, + value: plan_for_comparison_value( + plan_state, + root_collection_object_type, + object_type, + value_type, + value, + )?, + column: comparison_target, + }) +} + +fn plan_for_array_comparison( + plan_state: &mut QueryPlanState<'_, T>, + root_collection_object_type: &plan::ObjectType, + object_type: &plan::ObjectType, + column: ndc::ComparisonTarget, + comparison: ndc::ArrayComparison, +) -> Result> { + let comparison_target = plan_for_comparison_target(plan_state, object_type, column)?; + let plan_comparison = match comparison { + ndc::ArrayComparison::Contains { value } => { + let array_element_type = comparison_target + .target_type() + .clone() + .into_array_element_type()?; + let value = plan_for_comparison_value( + plan_state, + root_collection_object_type, + object_type, + array_element_type, + value, + )?; + plan::ArrayComparison::Contains { value } + } + ndc::ArrayComparison::IsEmpty => plan::ArrayComparison::IsEmpty, + }; + Ok(plan::Expression::ArrayComparison { + column: comparison_target, + comparison: plan_comparison, + }) +} + +fn plan_for_comparison_target( + plan_state: &mut QueryPlanState<'_, T>, + object_type: &plan::ObjectType, + target: ndc::ComparisonTarget, +) -> Result> { + match target { + ndc::ComparisonTarget::Column { + name, + arguments, + field_path, + } => { + let object_field = + get_object_field_by_path(object_type, &name, field_path.as_deref())?.clone(); + let plan_arguments = plan_arguments_from_plan_parameters( + plan_state, + &object_field.parameters, + arguments, + )?; + Ok(plan::ComparisonTarget::Column { + name, + arguments: plan_arguments, + field_path, + field_type: object_field.r#type, + }) + } + ndc::ComparisonTarget::Aggregate { .. 
} => { + // TODO: ENG-1457 implement query.aggregates.filter_by + Err(QueryPlanError::NotImplemented( + "filter by aggregate".to_string(), + )) + } + } +} + +fn plan_for_comparison_value( + plan_state: &mut QueryPlanState<'_, T>, + root_collection_object_type: &plan::ObjectType, + object_type: &plan::ObjectType, + expected_type: plan::Type, + value: ndc::ComparisonValue, +) -> Result> { + match value { + ndc::ComparisonValue::Column { + path, + name, + arguments, + field_path, + scope, + } => { + let (plan_path, collection_object_type) = plan_for_relationship_path( + plan_state, + root_collection_object_type, + object_type, + path, + vec![name.clone()], + )?; + let object_field = collection_object_type.get(&name)?; + let plan_arguments = plan_arguments_from_plan_parameters( + plan_state, + &object_field.parameters, + arguments, + )?; + Ok(plan::ComparisonValue::Column { + path: plan_path, + name, + arguments: plan_arguments, + field_path, + field_type: object_field.r#type.clone(), + scope, + }) + } + ndc::ComparisonValue::Scalar { value } => Ok(plan::ComparisonValue::Scalar { + value, + value_type: expected_type, + }), + ndc::ComparisonValue::Variable { name } => { + plan_state.register_variable_use(&name, expected_type.clone()); + Ok(plan::ComparisonValue::Variable { + name, + variable_type: expected_type, + }) + } + } +} + +fn plan_for_exists( + plan_state: &mut QueryPlanState<'_, T>, + root_collection_object_type: &plan::ObjectType, + in_collection: ExistsInCollection, + predicate: Option>, +) -> Result> { + let mut nested_state = plan_state.state_for_subquery(); + + let (in_collection, predicate) = match in_collection { + ndc::ExistsInCollection::Related { + relationship, + arguments, + field_path: _, // TODO: ENG-1490 requires propagating this, probably through the `register_relationship` call + } => { + let ndc_relationship = + lookup_relationship(plan_state.collection_relationships, &relationship)?; + let collection_object_type = plan_state + .context + .find_collection_object_type(&ndc_relationship.target_collection)?; + + let predicate = predicate + .map(|expression| { + plan_for_expression( + &mut nested_state, + root_collection_object_type, + &collection_object_type, + *expression, + ) + }) + .transpose()?; + + // TODO: ENG-1457 When we implement query.aggregates.filter_by we'll need to collect aggregates + // here as well as fields. + let fields = predicate.as_ref().map(|p| { + let mut fields = IndexMap::new(); + for comparison_target in p.query_local_comparison_targets() { + match comparison_target.into_owned() { + plan::ComparisonTarget::Column { + name, + arguments: _, + field_type, + .. 
+ } => fields.insert( + name.clone(), + plan::Field::Column { + column: name, + fields: None, + column_type: field_type, + }, + ), + }; + } + fields + }); + + let relationship_query = plan::Query { + fields, + relationships: nested_state.into_relationships(), + ..Default::default() + }; + + let relationship_key = + plan_state.register_relationship(relationship, arguments, relationship_query)?; + + let in_collection = plan::ExistsInCollection::Related { + relationship: relationship_key, + }; + + Ok((in_collection, predicate)) as Result<_> + } + ndc::ExistsInCollection::Unrelated { + collection, + arguments, + } => { + let collection_object_type = plan_state + .context + .find_collection_object_type(&collection)?; + + let predicate = predicate + .map(|expression| { + plan_for_expression( + &mut nested_state, + root_collection_object_type, + &collection_object_type, + *expression, + ) + }) + .transpose()?; + + let join_query = plan::Query { + predicate: predicate.clone(), + relationships: nested_state.into_relationships(), + ..Default::default() + }; + + let join_key = plan_state.register_unrelated_join(collection, arguments, join_query)?; + + let in_collection = plan::ExistsInCollection::Unrelated { + unrelated_collection: join_key, + }; + Ok((in_collection, predicate)) + } + ndc::ExistsInCollection::NestedCollection { + column_name, + arguments, + field_path, + } => { + let object_field = root_collection_object_type.get(&column_name)?; + let plan_arguments = plan_arguments_from_plan_parameters( + &mut nested_state, + &object_field.parameters, + arguments, + )?; + + let nested_collection_type = find_nested_collection_object_type( + root_collection_object_type.clone(), + &field_path + .clone() + .into_iter() + .chain(once(column_name.clone())) + .collect_vec(), + )?; + + let in_collection = plan::ExistsInCollection::NestedCollection { + column_name, + arguments: plan_arguments, + field_path, + }; + + let predicate = predicate + .map(|expression| { + plan_for_expression( + &mut nested_state, + root_collection_object_type, + &nested_collection_type, + *expression, + ) + }) + .transpose()?; + + Ok((in_collection, predicate)) + } + ExistsInCollection::NestedScalarCollection { + column_name, + arguments, + field_path, + } => { + let object_field = root_collection_object_type.get(&column_name)?; + let plan_arguments = plan_arguments_from_plan_parameters( + &mut nested_state, + &object_field.parameters, + arguments, + )?; + + let nested_collection_type = find_nested_collection_type( + root_collection_object_type.clone(), + &field_path + .clone() + .into_iter() + .chain(once(column_name.clone())) + .collect_vec(), + )?; + + let virtual_object_type = plan::ObjectType { + name: None, + fields: [( + "__value".into(), + plan::ObjectField { + r#type: nested_collection_type, + parameters: Default::default(), + }, + )] + .into(), + }; + + let in_collection = plan::ExistsInCollection::NestedScalarCollection { + column_name, + arguments: plan_arguments, + field_path, + }; + + let predicate = predicate + .map(|expression| { + plan_for_expression( + &mut nested_state, + root_collection_object_type, + &virtual_object_type, + *expression, + ) + }) + .transpose()?; + + Ok((in_collection, predicate)) + } + }?; + + Ok(plan::Expression::Exists { + in_collection, + predicate: predicate.map(Box::new), + }) +} diff --git a/crates/ndc-query-plan/src/plan_for_query_request/plan_for_grouping.rs b/crates/ndc-query-plan/src/plan_for_query_request/plan_for_grouping.rs new file mode 100644 index 00000000..6d848e67 --- 
/dev/null +++ b/crates/ndc-query-plan/src/plan_for_query_request/plan_for_grouping.rs @@ -0,0 +1,241 @@ +use ndc_models::{self as ndc}; + +use crate::{self as plan, ConnectorTypes, QueryContext, QueryPlanError}; + +use super::{ + helpers::get_object_field_by_path, plan_for_aggregate, plan_for_aggregates, + plan_for_arguments::plan_arguments_from_plan_parameters, + plan_for_relationship::plan_for_relationship_path, query_plan_state::QueryPlanState, +}; + +type Result = std::result::Result; + +pub fn plan_for_grouping( + plan_state: &mut QueryPlanState<'_, T>, + root_collection_object_type: &plan::ObjectType, + collection_object_type: &plan::ObjectType, + grouping: ndc::Grouping, +) -> Result> { + let dimensions = grouping + .dimensions + .into_iter() + .map(|d| { + plan_for_dimension( + plan_state, + root_collection_object_type, + collection_object_type, + d, + ) + }) + .collect::>()?; + + let aggregates = plan_for_aggregates( + plan_state, + collection_object_type, + grouping + .aggregates + .into_iter() + .map(|(key, aggregate)| (key.into(), aggregate)) + .collect(), + )?; + + let predicate = grouping + .predicate + .map(|predicate| plan_for_group_expression(plan_state, collection_object_type, predicate)) + .transpose()?; + + let order_by = grouping + .order_by + .map(|order_by| plan_for_group_order_by(plan_state, collection_object_type, order_by)) + .transpose()?; + + let plan_grouping = plan::Grouping { + dimensions, + aggregates, + predicate, + order_by, + limit: grouping.limit, + offset: grouping.offset, + }; + Ok(plan_grouping) +} + +fn plan_for_dimension( + plan_state: &mut QueryPlanState<'_, T>, + root_collection_object_type: &plan::ObjectType, + collection_object_type: &plan::ObjectType, + dimension: ndc::Dimension, +) -> Result> { + let plan_dimension = match dimension { + ndc_models::Dimension::Column { + path, + column_name, + arguments, + field_path, + } => { + let (relationship_path, collection_type) = plan_for_relationship_path( + plan_state, + root_collection_object_type, + collection_object_type, + path, + vec![column_name.clone()], + )?; + + let plan_arguments = plan_arguments_from_plan_parameters( + plan_state, + &collection_type.get(&column_name)?.parameters, + arguments, + )?; + + let object_field = + get_object_field_by_path(&collection_type, &column_name, field_path.as_deref())? 
+ .clone(); + + let references_relationship = !relationship_path.is_empty(); + let field_type = if references_relationship { + plan::Type::array_of(object_field.r#type) + } else { + object_field.r#type + }; + + plan::Dimension::Column { + path: relationship_path, + column_name, + arguments: plan_arguments, + field_path, + field_type, + } + } + }; + Ok(plan_dimension) +} + +fn plan_for_group_expression( + plan_state: &mut QueryPlanState, + object_type: &plan::ObjectType, + expression: ndc::GroupExpression, +) -> Result> { + match expression { + ndc::GroupExpression::And { expressions } => Ok(plan::GroupExpression::And { + expressions: expressions + .into_iter() + .map(|expr| plan_for_group_expression(plan_state, object_type, expr)) + .collect::>()?, + }), + ndc::GroupExpression::Or { expressions } => Ok(plan::GroupExpression::Or { + expressions: expressions + .into_iter() + .map(|expr| plan_for_group_expression(plan_state, object_type, expr)) + .collect::>()?, + }), + ndc::GroupExpression::Not { expression } => Ok(plan::GroupExpression::Not { + expression: Box::new(plan_for_group_expression( + plan_state, + object_type, + *expression, + )?), + }), + ndc::GroupExpression::UnaryComparisonOperator { target, operator } => { + Ok(plan::GroupExpression::UnaryComparisonOperator { + target: plan_for_group_comparison_target(plan_state, object_type, target)?, + operator, + }) + } + ndc::GroupExpression::BinaryComparisonOperator { + target, + operator, + value, + } => { + let target = plan_for_group_comparison_target(plan_state, object_type, target)?; + let (operator, operator_definition) = plan_state + .context + .find_comparison_operator(&target.result_type(), &operator)?; + let value_type = operator_definition.argument_type(&target.result_type()); + Ok(plan::GroupExpression::BinaryComparisonOperator { + target, + operator, + value: plan_for_group_comparison_value(plan_state, value_type, value)?, + }) + } + } +} + +fn plan_for_group_comparison_target( + plan_state: &mut QueryPlanState, + object_type: &plan::ObjectType, + target: ndc::GroupComparisonTarget, +) -> Result> { + let plan_target = match target { + ndc::GroupComparisonTarget::Aggregate { aggregate } => { + let target_aggregate = plan_for_aggregate(plan_state, object_type, aggregate)?; + plan::GroupComparisonTarget::Aggregate { + aggregate: target_aggregate, + } + } + }; + Ok(plan_target) +} + +fn plan_for_group_comparison_value( + plan_state: &mut QueryPlanState, + expected_type: plan::Type, + value: ndc::GroupComparisonValue, +) -> Result> { + match value { + ndc::GroupComparisonValue::Scalar { value } => Ok(plan::GroupComparisonValue::Scalar { + value, + value_type: expected_type, + }), + ndc::GroupComparisonValue::Variable { name } => { + plan_state.register_variable_use(&name, expected_type.clone()); + Ok(plan::GroupComparisonValue::Variable { + name, + variable_type: expected_type, + }) + } + } +} + +fn plan_for_group_order_by( + plan_state: &mut QueryPlanState<'_, T>, + collection_object_type: &plan::ObjectType, + order_by: ndc::GroupOrderBy, +) -> Result> { + Ok(plan::GroupOrderBy { + elements: order_by + .elements + .into_iter() + .map(|elem| plan_for_group_order_by_element(plan_state, collection_object_type, elem)) + .collect::>()?, + }) +} + +fn plan_for_group_order_by_element( + plan_state: &mut QueryPlanState<'_, T>, + collection_object_type: &plan::ObjectType<::ScalarType>, + element: ndc::GroupOrderByElement, +) -> Result> { + Ok(plan::GroupOrderByElement { + order_direction: element.order_direction, + target: 
plan_for_group_order_by_target(plan_state, collection_object_type, element.target)?, + }) +} + +fn plan_for_group_order_by_target( + plan_state: &mut QueryPlanState<'_, T>, + collection_object_type: &plan::ObjectType, + target: ndc::GroupOrderByTarget, +) -> Result> { + match target { + ndc::GroupOrderByTarget::Dimension { index } => { + Ok(plan::GroupOrderByTarget::Dimension { index }) + } + ndc::GroupOrderByTarget::Aggregate { aggregate } => { + let target_aggregate = + plan_for_aggregate(plan_state, collection_object_type, aggregate)?; + Ok(plan::GroupOrderByTarget::Aggregate { + aggregate: target_aggregate, + }) + } + } +} diff --git a/crates/ndc-query-plan/src/plan_for_query_request/plan_for_relationship.rs b/crates/ndc-query-plan/src/plan_for_query_request/plan_for_relationship.rs new file mode 100644 index 00000000..de98e178 --- /dev/null +++ b/crates/ndc-query-plan/src/plan_for_query_request/plan_for_relationship.rs @@ -0,0 +1,137 @@ +use std::collections::VecDeque; + +use crate::{self as plan, ObjectType, QueryContext, QueryPlanError}; +use ndc_models::{self as ndc}; + +use super::{ + helpers::{find_object_field, lookup_relationship}, + plan_for_expression, + query_plan_state::QueryPlanState, +}; + +type Result = std::result::Result; + +/// Returns list of aliases for joins to traverse, plus the object type of the final collection in +/// the path. +pub fn plan_for_relationship_path( + plan_state: &mut QueryPlanState<'_, T>, + root_collection_object_type: &plan::ObjectType, + object_type: &plan::ObjectType, + relationship_path: Vec, + requested_columns: Vec, // columns to select from last path element +) -> Result<(Vec, ObjectType)> { + let end_of_relationship_path_object_type = relationship_path + .last() + .map(|last_path_element| { + let relationship = lookup_relationship( + plan_state.collection_relationships, + &last_path_element.relationship, + )?; + plan_state + .context + .find_collection_object_type(&relationship.target_collection) + }) + .transpose()?; + let target_object_type = end_of_relationship_path_object_type.unwrap_or(object_type.clone()); + + let reversed_relationship_path = { + let mut path = relationship_path; + path.reverse(); + path + }; + + let vec_deque = plan_for_relationship_path_helper( + plan_state, + root_collection_object_type, + reversed_relationship_path, + requested_columns, + )?; + let aliases = vec_deque.into_iter().collect(); + + Ok((aliases, target_object_type)) +} + +fn plan_for_relationship_path_helper( + plan_state: &mut QueryPlanState<'_, T>, + root_collection_object_type: &plan::ObjectType, + mut reversed_relationship_path: Vec, + requested_columns: Vec, // columns to select from last path element +) -> Result> { + if reversed_relationship_path.is_empty() { + return Ok(VecDeque::new()); + } + + // safety: we just made an early return if the path is empty + let head = reversed_relationship_path.pop().unwrap(); + let tail = reversed_relationship_path; + let is_last = tail.is_empty(); + + let ndc::PathElement { + field_path: _, // TODO: ENG-1458 support nested relationships + relationship, + arguments, + predicate, + } = head; + + let relationship_def = lookup_relationship(plan_state.collection_relationships, &relationship)?; + let related_collection_type = plan_state + .context + .find_collection_object_type(&relationship_def.target_collection)?; + let mut nested_state = plan_state.state_for_subquery(); + + // If this is the last path element then we need to apply the requested fields to the + // relationship query. 
Otherwise we need to recursively process the rest of the path. Both + // cases take ownership of `requested_columns` so we group them together. + let (mut rest_path, fields) = if is_last { + let fields = requested_columns + .into_iter() + .map(|column_name| { + let object_field = + find_object_field(&related_collection_type, &column_name)?.clone(); + Ok(( + column_name.clone(), + plan::Field::Column { + column: column_name, + fields: None, + column_type: object_field.r#type, + }, + )) + }) + .collect::>()?; + (VecDeque::new(), Some(fields)) + } else { + let rest = plan_for_relationship_path_helper( + &mut nested_state, + root_collection_object_type, + tail, + requested_columns, + )?; + (rest, None) + }; + + let predicate_plan = predicate + .map(|p| { + plan_for_expression( + &mut nested_state, + root_collection_object_type, + &related_collection_type, + *p, + ) + }) + .transpose()?; + + let nested_relationships = nested_state.into_relationships(); + + let relationship_query = plan::Query { + predicate: predicate_plan, + relationships: nested_relationships, + fields, + ..Default::default() + }; + + let relation_key = + plan_state.register_relationship(relationship, arguments, relationship_query)?; + + rest_path.push_front(relation_key); + Ok(rest_path) +} diff --git a/crates/ndc-query-plan/src/plan_for_query_request/plan_test_helpers/mod.rs b/crates/ndc-query-plan/src/plan_for_query_request/plan_test_helpers/mod.rs index 8f5895af..970f4d34 100644 --- a/crates/ndc-query-plan/src/plan_for_query_request/plan_test_helpers/mod.rs +++ b/crates/ndc-query-plan/src/plan_for_query_request/plan_test_helpers/mod.rs @@ -102,7 +102,7 @@ impl QueryContext for TestContext { } } -#[derive(Clone, Copy, Debug, PartialEq, Sequence)] +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, Sequence)] pub enum AggregateFunction { Average, } @@ -115,7 +115,7 @@ impl NamedEnum for AggregateFunction { } } -#[derive(Clone, Copy, Debug, PartialEq, Sequence)] +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, Sequence)] pub enum ComparisonOperator { Equal, Regex, @@ -130,7 +130,7 @@ impl NamedEnum for ComparisonOperator { } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Sequence)] +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, Sequence)] pub enum ScalarType { Bool, Date, diff --git a/crates/ndc-query-plan/src/plan_for_query_request/plan_test_helpers/query.rs b/crates/ndc-query-plan/src/plan_for_query_request/plan_test_helpers/query.rs index ddb9df8c..444870b4 100644 --- a/crates/ndc-query-plan/src/plan_for_query_request/plan_test_helpers/query.rs +++ b/crates/ndc-query-plan/src/plan_for_query_request/plan_test_helpers/query.rs @@ -1,8 +1,7 @@ use indexmap::IndexMap; use crate::{ - Aggregate, ConnectorTypes, Expression, Field, OrderBy, OrderByElement, Query, Relationships, - Scope, + Aggregate, ConnectorTypes, Expression, Field, Grouping, OrderBy, OrderByElement, Query, Relationships, Scope }; #[derive(Clone, Debug, Default)] @@ -10,10 +9,10 @@ pub struct QueryBuilder { aggregates: Option>>, fields: Option>>, limit: Option, - aggregates_limit: Option, offset: Option, order_by: Option>, predicate: Option>, + groups: Option>, relationships: Relationships, scope: Option, } @@ -29,10 +28,10 @@ impl QueryBuilder { fields: None, aggregates: Default::default(), limit: None, - aggregates_limit: None, offset: None, order_by: None, predicate: None, + groups: None, relationships: Default::default(), scope: None, } @@ -88,10 +87,10 @@ impl From> for Query { aggregates: value.aggregates, fields: value.fields, limit: value.limit, - 
aggregates_limit: value.aggregates_limit, offset: value.offset, order_by: value.order_by, predicate: value.predicate, + groups: value.groups, relationships: value.relationships, scope: value.scope, } diff --git a/crates/ndc-query-plan/src/plan_for_query_request/tests.rs b/crates/ndc-query-plan/src/plan_for_query_request/tests.rs index a9a4f17a..6e2251b8 100644 --- a/crates/ndc-query-plan/src/plan_for_query_request/tests.rs +++ b/crates/ndc-query-plan/src/plan_for_query_request/tests.rs @@ -6,7 +6,7 @@ use pretty_assertions::assert_eq; use crate::{ self as plan, plan_for_query_request::plan_test_helpers::{self, make_flat_schema, make_nested_schema}, - QueryContext, QueryPlan, + QueryContext, QueryPlan, Type, }; use super::plan_for_query_request; @@ -521,7 +521,7 @@ fn translates_aggregate_selections() -> Result<(), anyhow::Error> { .query(query().aggregates([ star_count_aggregate!("count_star"), column_count_aggregate!("count_id" => "last_name", distinct: true), - column_aggregate!("avg_id" => "id", "Average"), + ("avg_id", column_aggregate("id", "Average").into()), ])) .into(); let query_plan = plan_for_query_request(&query_context, query)?; @@ -545,6 +545,7 @@ fn translates_aggregate_selections() -> Result<(), anyhow::Error> { "avg_id".into(), plan::Aggregate::SingleColumn { column: "id".into(), + column_type: Type::scalar(plan_test_helpers::ScalarType::Int), arguments: Default::default(), field_path: None, function: plan_test_helpers::AggregateFunction::Average, @@ -644,6 +645,7 @@ fn translates_relationships_in_fields_predicates_and_orderings() -> Result<(), a path: vec!["author_articles".into()], aggregate: plan::Aggregate::SingleColumn { column: "year".into(), + column_type: Type::scalar(plan_test_helpers::ScalarType::Int).into_nullable(), arguments: Default::default(), field_path: Default::default(), function: plan_test_helpers::AggregateFunction::Average, @@ -680,6 +682,7 @@ fn translates_relationships_in_fields_predicates_and_orderings() -> Result<(), a plan::Field::Relationship { relationship: "author_articles".into(), aggregates: None, + groups: None, fields: Some( [ ( @@ -915,6 +918,7 @@ fn translates_predicate_referencing_field_of_related_collection() -> anyhow::Res plan::Field::Relationship { relationship: "author".into(), aggregates: None, + groups: None, fields: Some( [( "name".into(), diff --git a/crates/ndc-query-plan/src/plan_for_query_request/type_annotated_field.rs b/crates/ndc-query-plan/src/plan_for_query_request/type_annotated_field.rs index 70140626..2fca802f 100644 --- a/crates/ndc-query-plan/src/plan_for_query_request/type_annotated_field.rs +++ b/crates/ndc-query-plan/src/plan_for_query_request/type_annotated_field.rs @@ -90,6 +90,7 @@ fn type_annotated_field_helper( // with fields and aggregates from other references to the same relationship. 
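For context on why the clones just below are needed: `register_relationship` may unify this relationship query with other references to the same relationship, while each `Field::Relationship` still needs its own copies of the fields, aggregates, and groups it asked for in order to shape its slice of the response. A hedged, self-contained illustration — `MiniQuery` and `register_relationship` here are invented stand-ins, not the real `QueryPlanState` API:

```rust
use std::collections::{BTreeMap, HashMap};

#[derive(Clone, Debug, Default)]
struct MiniQuery {
    // Stand-in for `plan::Query`: field alias -> column name.
    fields: BTreeMap<String, String>,
}

/// Registering the same relationship twice unifies the two queries into one
/// join that fetches the union of the requested fields.
fn register_relationship(registry: &mut HashMap<String, MiniQuery>, key: &str, query: MiniQuery) {
    let entry = registry.entry(key.to_string()).or_default();
    entry.fields.extend(query.fields);
}

fn main() {
    let mut registry = HashMap::new();

    let q1 = MiniQuery { fields: BTreeMap::from([("name".to_string(), "name".to_string())]) };
    let q2 = MiniQuery { fields: BTreeMap::from([("year".to_string(), "year".to_string())]) };

    // The caller keeps its own copy before handing the query over...
    let fields_for_this_reference = q1.fields.clone();
    register_relationship(&mut registry, "author_articles", q1);
    register_relationship(&mut registry, "author_articles", q2);

    // ...because the registered query now holds the union of both requests.
    assert_eq!(registry["author_articles"].fields.len(), 2);
    assert_eq!(fields_for_this_reference.len(), 1);
}
```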
let aggregates = query_plan.aggregates.clone(); let fields = query_plan.fields.clone(); + let groups = query_plan.groups.clone(); let relationship_key = plan_state.register_relationship(relationship, arguments, query_plan)?; @@ -97,6 +98,7 @@ fn type_annotated_field_helper( relationship: relationship_key, aggregates, fields, + groups, } } }; diff --git a/crates/ndc-query-plan/src/plan_for_query_request/unify_relationship_references.rs b/crates/ndc-query-plan/src/plan_for_query_request/unify_relationship_references.rs index 0f5c4527..be2bae6c 100644 --- a/crates/ndc-query-plan/src/plan_for_query_request/unify_relationship_references.rs +++ b/crates/ndc-query-plan/src/plan_for_query_request/unify_relationship_references.rs @@ -7,8 +7,8 @@ use ndc_models as ndc; use thiserror::Error; use crate::{ - Aggregate, ConnectorTypes, Expression, Field, NestedArray, NestedField, NestedObject, Query, - Relationship, RelationshipArgument, Relationships, + Aggregate, ConnectorTypes, Expression, Field, GroupExpression, Grouping, NestedArray, + NestedField, NestedObject, Query, Relationship, RelationshipArgument, Relationships, }; #[derive(Debug, Error)] @@ -95,7 +95,6 @@ where let mismatching_fields = [ (a.limit != b.limit, "limit"), - (a.aggregates_limit != b.aggregates_limit, "aggregates_limit"), (a.offset != b.offset, "offset"), (a.order_by != b.order_by, "order_by"), (predicate_a != predicate_b, "predicate"), @@ -117,13 +116,13 @@ where })?; let query = Query { - aggregates: unify_aggregates(a.aggregates, b.aggregates)?, + aggregates: unify_options(a.aggregates, b.aggregates, unify_aggregates)?, fields: unify_fields(a.fields, b.fields)?, limit: a.limit, - aggregates_limit: a.aggregates_limit, offset: a.offset, order_by: a.order_by, predicate: predicate_a, + groups: unify_options(a.groups, b.groups, unify_groups)?, relationships: unify_nested_relationships(a.relationships, b.relationships)?, scope, }; @@ -131,9 +130,9 @@ where } fn unify_aggregates( - a: Option>>, - b: Option>>, -) -> Result>>> + a: IndexMap>, + b: IndexMap>, +) -> Result>> where T: ConnectorTypes, { @@ -210,11 +209,13 @@ where relationship: relationship_a, aggregates: aggregates_a, fields: fields_a, + groups: groups_a, }, Field::Relationship { relationship: relationship_b, aggregates: aggregates_b, fields: fields_b, + groups: groups_b, }, ) => { if relationship_a != relationship_b { @@ -224,8 +225,9 @@ where } else { Ok(Field::Relationship { relationship: relationship_b, - aggregates: unify_aggregates(aggregates_a, aggregates_b)?, + aggregates: unify_options(aggregates_a, aggregates_b, unify_aggregates)?, fields: unify_fields(fields_a, fields_b)?, + groups: unify_options(groups_a, groups_b, unify_groups)?, }) } } @@ -284,6 +286,39 @@ where .try_collect() } +fn unify_groups(a: Grouping, b: Grouping) -> Result> +where + T: ConnectorTypes, +{ + let predicate_a = a.predicate.and_then(GroupExpression::simplify); + let predicate_b = b.predicate.and_then(GroupExpression::simplify); + + let mismatching_fields = [ + (a.dimensions != b.dimensions, "dimensions"), + (predicate_a != predicate_b, "predicate"), + (a.order_by != b.order_by, "order_by"), + (a.limit != b.limit, "limit"), + (a.offset != b.offset, "offset"), + ] + .into_iter() + .filter_map(|(is_mismatch, field_name)| if is_mismatch { Some(field_name) } else { None }) + .collect_vec(); + + if !mismatching_fields.is_empty() { + return Err(RelationshipUnificationError::Mismatch(mismatching_fields)); + } + + let unified = Grouping { + dimensions: a.dimensions, + aggregates: 
unify_aggregates(a.aggregates, b.aggregates)?, + predicate: predicate_a, + order_by: a.order_by, + limit: a.limit, + offset: a.offset, + }; + Ok(unified) +} + /// In some cases we receive the predicate expression `Some(Expression::And [])` which does not /// filter out anything, but fails equality checks with `None`. Simplifying that expression to /// `None` allows us to unify relationship references that we wouldn't otherwise be able to. diff --git a/crates/ndc-query-plan/src/query_plan.rs b/crates/ndc-query-plan/src/query_plan.rs deleted file mode 100644 index 84f5c2f1..00000000 --- a/crates/ndc-query-plan/src/query_plan.rs +++ /dev/null @@ -1,623 +0,0 @@ -use std::{borrow::Cow, collections::BTreeMap, fmt::Debug, iter}; - -use derivative::Derivative; -use indexmap::IndexMap; -use itertools::Either; -use ndc_models::{ - self as ndc, ArgumentName, FieldName, OrderDirection, RelationshipType, UnaryComparisonOperator, -}; -use nonempty::NonEmpty; - -use crate::{vec_set::VecSet, Type}; - -pub trait ConnectorTypes { - type ScalarType: Clone + Debug + PartialEq + Eq; - type AggregateFunction: Clone + Debug + PartialEq; - type ComparisonOperator: Clone + Debug + PartialEq; - - /// Result type for count aggregations - fn count_aggregate_type() -> Type; - - fn string_type() -> Type; -} - -#[derive(Derivative)] -#[derivative( - Clone(bound = ""), - Debug(bound = ""), - PartialEq(bound = "T::ScalarType: PartialEq") -)] -pub struct QueryPlan { - pub collection: ndc::CollectionName, - pub query: Query, - pub arguments: BTreeMap>, - pub variables: Option>, - - /// Types for values from the `variables` map as inferred by usages in the query request. It is - /// possible for the same variable to be used in multiple contexts with different types. This - /// map provides sets of all observed types. - /// - /// The observed type may be `None` if the type of a variable use could not be inferred. - pub variable_types: VariableTypes, - - // TODO: type for unrelated collection - pub unrelated_collections: BTreeMap>, -} - -impl QueryPlan { - pub fn has_variables(&self) -> bool { - self.variables.is_some() - } -} - -pub type Arguments = BTreeMap>; -pub type Relationships = BTreeMap>; -pub type VariableSet = BTreeMap; -pub type VariableTypes = BTreeMap>>; - -#[derive(Derivative)] -#[derivative( - Clone(bound = ""), - Debug(bound = ""), - Default(bound = ""), - PartialEq(bound = "") -)] -pub struct Query { - pub aggregates: Option>>, - pub fields: Option>>, - pub limit: Option, - pub aggregates_limit: Option, - pub offset: Option, - pub order_by: Option>, - pub predicate: Option>, - - /// Relationships referenced by fields and expressions in this query or sub-query. Does not - /// include relationships in sub-queries nested under this one. - pub relationships: Relationships, - - /// Some relationship references may introduce a named "scope" so that other parts of the query - /// request can reference fields of documents in the related collection. The connector must - /// introduce a variable, or something similar, for such references. 
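As a purely hypothetical illustration of that "introduce a variable" requirement (the binding scheme below is invented for this sketch; it is not the connector's actual mechanism), a connector might derive a variable name from each scope, for example to populate a MongoDB `$lookup` `let` clause:

```rust
// Mirrors the `Scope` enum defined in this module; the naming scheme is an
// assumption made for illustration only.
#[derive(Clone, Debug, PartialEq, Eq)]
enum Scope {
    Root,
    Named(String),
}

fn scope_binding_name(scope: &Scope) -> String {
    match scope {
        Scope::Root => "scope_root".to_string(),
        Scope::Named(name) => format!("scope_{name}"),
    }
}

fn main() {
    assert_eq!(scope_binding_name(&Scope::Root), "scope_root");
    assert_eq!(scope_binding_name(&Scope::Named("0".into())), "scope_0");
}
```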
- pub scope: Option, -} - -impl Query { - pub fn has_aggregates(&self) -> bool { - if let Some(aggregates) = &self.aggregates { - !aggregates.is_empty() - } else { - false - } - } - - pub fn has_fields(&self) -> bool { - if let Some(fields) = &self.fields { - !fields.is_empty() - } else { - false - } - } -} - -#[derive(Derivative)] -#[derivative( - Clone(bound = ""), - Debug(bound = ""), - PartialEq(bound = "T::ScalarType: PartialEq") -)] -pub enum Argument { - /// The argument is provided by reference to a variable - Variable { - name: ndc::VariableName, - argument_type: Type, - }, - /// The argument is provided as a literal value - Literal { - value: serde_json::Value, - argument_type: Type, - }, - /// The argument was a literal value that has been parsed as an [Expression] - Predicate { expression: Expression }, -} - -#[derive(Derivative)] -#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] -pub struct Relationship { - /// A mapping between columns on the source row to columns on the target collection. - /// The column on the target collection is specified via a field path (ie. an array of field - /// names that descend through nested object fields). The field path will only contain a single item, - /// meaning a column on the target collection's type, unless the 'relationships.nested' - /// capability is supported, in which case multiple items denotes a nested object field. - pub column_mapping: BTreeMap>, - pub relationship_type: RelationshipType, - /// The name of a collection - pub target_collection: ndc::CollectionName, - /// Values to be provided to any collection arguments - pub arguments: BTreeMap>, - pub query: Query, -} - -#[derive(Derivative)] -#[derivative( - Clone(bound = ""), - Debug(bound = ""), - PartialEq(bound = "T::ScalarType: PartialEq") -)] -pub enum RelationshipArgument { - /// The argument is provided by reference to a variable - Variable { - name: ndc::VariableName, - argument_type: Type, - }, - /// The argument is provided as a literal value - Literal { - value: serde_json::Value, - argument_type: Type, - }, - // The argument is provided based on a column of the source collection - Column { - name: ndc::FieldName, - argument_type: Type, - }, - /// The argument was a literal value that has been parsed as an [Expression] - Predicate { expression: Expression }, -} - -#[derive(Derivative)] -#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] -pub struct UnrelatedJoin { - pub target_collection: ndc::CollectionName, - pub arguments: BTreeMap>, - pub query: Query, -} - -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum Scope { - Root, - Named(String), -} - -#[derive(Derivative)] -#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] -pub enum Aggregate { - ColumnCount { - /// The column to apply the count aggregate function to - column: ndc::FieldName, - /// Arguments to satisfy the column specified by 'column' - arguments: BTreeMap>, - /// Path to a nested field within an object column - field_path: Option>, - /// Whether or not only distinct items should be counted - distinct: bool, - }, - SingleColumn { - /// The column to apply the aggregation function to - column: ndc::FieldName, - /// Arguments to satisfy the column specified by 'column' - arguments: BTreeMap>, - /// Path to a nested field within an object column - field_path: Option>, - /// Single column aggregate function name. 
- function: T::AggregateFunction, - result_type: Type, - }, - StarCount, -} - -impl Aggregate { - pub fn result_type(&self) -> Cow> { - match self { - Aggregate::ColumnCount { .. } => Cow::Owned(T::count_aggregate_type()), - Aggregate::SingleColumn { result_type, .. } => Cow::Borrowed(result_type), - Aggregate::StarCount => Cow::Owned(T::count_aggregate_type()), - } - } -} - -#[derive(Derivative)] -#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] -pub struct NestedObject { - pub fields: IndexMap>, -} - -#[derive(Derivative)] -#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] -pub struct NestedArray { - pub fields: Box>, -} - -#[derive(Derivative)] -#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] -pub enum NestedField { - Object(NestedObject), - Array(NestedArray), - // TODO: ENG-1464 add `Collection(NestedCollection)` variant -} - -#[derive(Derivative)] -#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] -pub enum Field { - Column { - column: ndc::FieldName, - - /// When the type of the column is a (possibly-nullable) array or object, - /// the caller can request a subset of the complete column data, - /// by specifying fields to fetch here. - /// If omitted, the column data will be fetched in full. - fields: Option>, - - column_type: Type, - }, - Relationship { - /// The name of the relationship to follow for the subquery - this is the key in the - /// [Query] relationships map in this module, it is **not** the key in the - /// [ndc::QueryRequest] collection_relationships map. - relationship: ndc::RelationshipName, - aggregates: Option>>, - fields: Option>>, - }, -} - -#[derive(Derivative)] -#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] -pub enum Expression { - And { - expressions: Vec>, - }, - Or { - expressions: Vec>, - }, - Not { - expression: Box>, - }, - UnaryComparisonOperator { - column: ComparisonTarget, - operator: UnaryComparisonOperator, - }, - BinaryComparisonOperator { - column: ComparisonTarget, - operator: T::ComparisonOperator, - value: ComparisonValue, - }, - /// A comparison against a nested array column. - /// Only used if the 'query.nested_fields.filter_by.nested_arrays' capability is supported. - ArrayComparison { - column: ComparisonTarget, - comparison: ArrayComparison, - }, - Exists { - in_collection: ExistsInCollection, - predicate: Option>>, - }, -} - -impl Expression { - /// Get an iterator of columns referenced by the expression, not including columns of related - /// collections. This is used to build a plan for joining the referenced collection - we need - /// to include fields in the join that the expression needs to access. - // - // TODO: ENG-1457 When we implement query.aggregates.filter_by we'll need to collect aggregates - // references. That's why this function returns [ComparisonTarget] instead of [Field]. - pub fn query_local_comparison_targets<'a>( - &'a self, - ) -> Box>> + 'a> { - match self { - Expression::And { expressions } => Box::new( - expressions - .iter() - .flat_map(|e| e.query_local_comparison_targets()), - ), - Expression::Or { expressions } => Box::new( - expressions - .iter() - .flat_map(|e| e.query_local_comparison_targets()), - ), - Expression::Not { expression } => expression.query_local_comparison_targets(), - Expression::UnaryComparisonOperator { column, .. } => { - Box::new(std::iter::once(Cow::Borrowed(column))) - } - Expression::BinaryComparisonOperator { column, value, .. 
} => Box::new( - std::iter::once(Cow::Borrowed(column)) - .chain(Self::local_targets_from_comparison_value(value).map(Cow::Owned)), - ), - Expression::ArrayComparison { column, comparison } => { - let value_targets = match comparison { - ArrayComparison::Contains { value } => Either::Left( - Self::local_targets_from_comparison_value(value).map(Cow::Owned), - ), - ArrayComparison::IsEmpty => Either::Right(std::iter::empty()), - }; - Box::new(std::iter::once(Cow::Borrowed(column)).chain(value_targets)) - } - Expression::Exists { .. } => Box::new(iter::empty()), - } - } - - fn local_targets_from_comparison_value( - value: &ComparisonValue, - ) -> impl Iterator> { - match value { - ComparisonValue::Column { - path, - name, - arguments, - field_path, - field_type, - .. - } => { - if path.is_empty() { - Either::Left(iter::once(ComparisonTarget::Column { - name: name.clone(), - arguments: arguments.clone(), - field_path: field_path.clone(), - field_type: field_type.clone(), - })) - } else { - Either::Right(iter::empty()) - } - } - _ => Either::Right(std::iter::empty()), - } - } -} - -#[derive(Derivative)] -#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] -pub enum ArrayComparison { - /// Check if the array contains the specified value. - /// Only used if the 'query.nested_fields.filter_by.nested_arrays.contains' capability is supported. - Contains { value: ComparisonValue }, - /// Check if the array is empty. - /// Only used if the 'query.nested_fields.filter_by.nested_arrays.is_empty' capability is supported. - IsEmpty, -} - -#[derive(Derivative)] -#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] -pub struct OrderBy { - /// The elements to order by, in priority order - pub elements: Vec>, -} - -#[derive(Derivative)] -#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] -pub struct OrderByElement { - pub order_direction: OrderDirection, - pub target: OrderByTarget, -} - -#[derive(Derivative)] -#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] -pub enum OrderByTarget { - Column { - /// Any relationships to traverse to reach this column. These are translated from - /// [ndc::OrderByElement] values in the [ndc::QueryRequest] to names of relation - /// fields for the [QueryPlan]. - path: Vec, - - /// The name of the column - name: ndc::FieldName, - - /// Arguments to satisfy the column specified by 'name' - arguments: BTreeMap>, - - /// Path to a nested field within an object column - field_path: Option>, - }, - Aggregate { - /// Non-empty collection of relationships to traverse - path: Vec, - /// The aggregation method to use - aggregate: Aggregate, - }, -} - -#[derive(Derivative)] -#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] -pub enum ComparisonTarget { - /// The comparison targets a column. - Column { - /// The name of the column - name: ndc::FieldName, - - /// Arguments to satisfy the column specified by 'name' - arguments: BTreeMap>, - - /// Path to a nested field within an object column - field_path: Option>, - - /// Type of the field that you get *after* following `field_path` to a possibly-nested - /// field. - field_type: Type, - }, - // TODO: ENG-1457 Add this variant to support query.aggregates.filter_by - // /// The comparison targets the result of aggregation. - // /// Only used if the 'query.aggregates.filter_by' capability is supported.
- // Aggregate { - // /// Non-empty collection of relationships to traverse - // path: Vec, - // /// The aggregation method to use - // aggregate: Aggregate, - // }, -} - -impl ComparisonTarget { - pub fn column(name: impl Into, field_type: Type) -> Self { - Self::Column { - name: name.into(), - arguments: Default::default(), - field_path: Default::default(), - field_type, - } - } - - pub fn target_type(&self) -> &Type { - match self { - ComparisonTarget::Column { field_type, .. } => field_type, - // TODO: ENG-1457 - // ComparisonTarget::Aggregate { aggregate, .. } => aggregate.result_type, - } - } -} - -#[derive(Derivative)] -#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] -pub enum ComparisonValue { - Column { - /// Any relationships to traverse to reach this column. - /// Only non-empty if the 'relationships.relation_comparisons' is supported. - path: Vec, - /// The name of the column - name: ndc::FieldName, - /// Arguments to satisfy the column specified by 'name' - arguments: BTreeMap>, - /// Path to a nested field within an object column. - /// Only non-empty if the 'query.nested_fields.filter_by' capability is supported. - field_path: Option>, - /// Type of the field that you get *after* following `field_path` to a possibly-nested - /// field. - field_type: Type, - /// The scope in which this column exists, identified - /// by a top-down index into the stack of scopes. - /// The stack grows inside each `Expression::Exists`, - /// so scope 0 (the default) refers to the current collection, - /// and each subsequent index refers to the collection outside - /// its predecessor's immediately enclosing `Expression::Exists` - /// expression. - /// Only used if the 'query.exists.named_scopes' capability is supported. - scope: Option, - }, - Scalar { - value: serde_json::Value, - value_type: Type, - }, - Variable { - name: ndc::VariableName, - variable_type: Type, - }, -} - -impl ComparisonValue { - pub fn column(name: impl Into, field_type: Type) -> Self { - Self::Column { - path: Default::default(), - name: name.into(), - arguments: Default::default(), - field_path: Default::default(), - field_type, - scope: Default::default(), - } - } -} - -#[derive(Derivative)] -#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] -pub struct AggregateFunctionDefinition { - /// The scalar or object type of the result of this function - pub result_type: Type, -} - -#[derive(Derivative)] -#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] -pub enum ComparisonOperatorDefinition { - Equal, - In, - LessThan, - LessThanOrEqual, - GreaterThan, - GreaterThanOrEqual, - Contains, - ContainsInsensitive, - StartsWith, - StartsWithInsensitive, - EndsWith, - EndsWithInsensitive, - Custom { - /// The type of the argument to this operator - argument_type: Type, - }, -} - -impl ComparisonOperatorDefinition { - pub fn argument_type(self, left_operand_type: &Type) -> Type { - use ComparisonOperatorDefinition as C; - match self { - C::In => Type::ArrayOf(Box::new(left_operand_type.clone())), - C::Equal - | C::LessThan - | C::LessThanOrEqual - | C::GreaterThan - | C::GreaterThanOrEqual => left_operand_type.clone(), - C::Contains - | C::ContainsInsensitive - | C::StartsWith - | C::StartsWithInsensitive - | C::EndsWith - | C::EndsWithInsensitive => T::string_type(), - C::Custom { argument_type } => argument_type, - } - } - - pub fn from_ndc_definition( - ndc_definition: &ndc::ComparisonOperatorDefinition, - map_type: impl FnOnce(&ndc::Type) -> Result, E>,
- ) -> Result { - use ndc::ComparisonOperatorDefinition as NDC; - let definition = match ndc_definition { - NDC::Equal => Self::Equal, - NDC::In => Self::In, - NDC::LessThan => Self::LessThan, - NDC::LessThanOrEqual => Self::LessThanOrEqual, - NDC::GreaterThan => Self::GreaterThan, - NDC::GreaterThanOrEqual => Self::GreaterThanOrEqual, - NDC::Contains => Self::Contains, - NDC::ContainsInsensitive => Self::ContainsInsensitive, - NDC::StartsWith => Self::StartsWith, - NDC::StartsWithInsensitive => Self::StartsWithInsensitive, - NDC::EndsWith => Self::EndsWith, - NDC::EndsWithInsensitive => Self::EndsWithInsensitive, - NDC::Custom { argument_type } => Self::Custom { - argument_type: map_type(argument_type)?, - }, - }; - Ok(definition) - } -} - -#[derive(Derivative)] -#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] -pub enum ExistsInCollection { - /// The rows to evaluate the exists predicate against come from a related collection. - /// Only used if the 'relationships' capability is supported. - Related { - /// Key of the relation in the [Query] joins map. Relationships are scoped to the sub-query - /// that defines the relation source. - relationship: ndc::RelationshipName, - }, - /// The rows to evaluate the exists predicate against come from an unrelated collection - /// Only used if the 'query.exists.unrelated' capability is supported. - Unrelated { - /// Key of the relation in the [QueryPlan] joins map. Unrelated collections are not scoped - /// to a sub-query, instead they are given in the root [QueryPlan]. - unrelated_collection: String, - }, - /// The rows to evaluate the exists predicate against come from a nested array field. - /// Only used if the 'query.exists.nested_collections' capability is supported. - NestedCollection { - column_name: ndc::FieldName, - arguments: BTreeMap>, - /// Path to a nested collection via object columns - field_path: Vec, - }, - /// Specifies a column that contains a nested array of scalars. The - /// array will be brought into scope of the nested expression where - /// each element becomes an object with one '__value' column that - /// contains the element value. - /// Only used if the 'query.exists.nested_scalar_collections' capability is supported. 
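To make that `__value` wrapping concrete, here is a small sketch of the data shape the comment describes, using `serde_json` values as stand-ins for BSON documents (the function name is invented for illustration):

```rust
use serde_json::{json, Value};

/// Wrap each scalar array element as a one-field object so an ordinary
/// column predicate can be evaluated against the `__value` column.
fn as_virtual_collection(scalars: &[Value]) -> Vec<Value> {
    scalars.iter().map(|element| json!({ "__value": element })).collect()
}

fn main() {
    let tags = vec![json!("a"), json!("b")];
    let rows = as_virtual_collection(&tags);
    assert_eq!(rows[0], json!({ "__value": "a" }));
    // A predicate like `__value == "a"` now matches the first row only.
}
```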
- NestedScalarCollection { - column_name: FieldName, - arguments: BTreeMap>, - /// Path to a nested collection via object columns - field_path: Vec, - }, -} diff --git a/crates/ndc-query-plan/src/query_plan/aggregation.rs b/crates/ndc-query-plan/src/query_plan/aggregation.rs new file mode 100644 index 00000000..2b6e2087 --- /dev/null +++ b/crates/ndc-query-plan/src/query_plan/aggregation.rs @@ -0,0 +1,205 @@ +use std::{borrow::Cow, collections::BTreeMap}; + +use derivative::Derivative; +use indexmap::IndexMap; +use ndc_models::{self as ndc, ArgumentName, FieldName}; + +use crate::Type; + +use super::{Argument, ConnectorTypes}; + +pub type Arguments = BTreeMap>; + +#[derive(Derivative)] +#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] +pub enum Aggregate { + ColumnCount { + /// The column to apply the count aggregate function to + column: ndc::FieldName, + /// Arguments to satisfy the column specified by 'column' + arguments: BTreeMap>, + /// Path to a nested field within an object column + field_path: Option>, + /// Whether or not only distinct items should be counted + distinct: bool, + }, + SingleColumn { + /// The column to apply the aggregation function to + column: ndc::FieldName, + column_type: Type, + /// Arguments to satisfy the column specified by 'column' + arguments: BTreeMap>, + /// Path to a nested field within an object column + field_path: Option>, + /// Single column aggregate function name. + function: T::AggregateFunction, + result_type: Type, + }, + StarCount, +} + +impl Aggregate { + pub fn result_type(&self) -> Cow> { + match self { + Aggregate::ColumnCount { .. } => Cow::Owned(T::count_aggregate_type()), + Aggregate::SingleColumn { result_type, .. } => Cow::Borrowed(result_type), + Aggregate::StarCount => Cow::Owned(T::count_aggregate_type()), + } + } +} + +#[derive(Derivative)] +#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] +pub struct Grouping { + /// Dimensions along which to partition the data + pub dimensions: Vec>, + /// Aggregates to compute in each group + pub aggregates: IndexMap>, + /// Optionally specify a predicate to apply after grouping rows. + /// Only used if the 'query.aggregates.group_by.filter' capability is supported. + pub predicate: Option>, + /// Optionally specify how groups should be ordered + /// Only used if the 'query.aggregates.group_by.order' capability is supported. + pub order_by: Option>, + /// Optionally limit to N groups + /// Only used if the 'query.aggregates.group_by.paginate' capability is supported. + pub limit: Option, + /// Optionally offset from the Nth group + /// Only used if the 'query.aggregates.group_by.paginate' capability is supported. + pub offset: Option, +} + +/// [GroupExpression] is like [Expression] but without [Expression::ArrayComparison] or +/// [Expression::Exists] variants. +#[derive(Derivative)] +#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] +pub enum GroupExpression { + And { + expressions: Vec>, + }, + Or { + expressions: Vec>, + }, + Not { + expression: Box>, + }, + UnaryComparisonOperator { + target: GroupComparisonTarget, + operator: ndc::UnaryComparisonOperator, + }, + BinaryComparisonOperator { + target: GroupComparisonTarget, + operator: T::ComparisonOperator, + value: GroupComparisonValue, + }, +} + +impl GroupExpression { + /// In some cases we receive the predicate expression `Some(Expression::And [])` which does not + /// filter out anything, but fails equality checks with `None`. 
Simplifying that expression to
+    /// `None` allows us to unify relationship references that we wouldn't otherwise be able to.
+    pub fn simplify(self) -> Option<Self> {
+        match self {
+            GroupExpression::And { expressions } if expressions.is_empty() => None,
+            e => Some(e),
+        }
+    }
+}
+
+#[derive(Derivative)]
+#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))]
+pub enum GroupComparisonTarget<T: ConnectorTypes> {
+    Aggregate { aggregate: Aggregate<T> },
+}
+
+impl<T: ConnectorTypes> GroupComparisonTarget<T> {
+    pub fn result_type(&self) -> Cow<'_, Type<T::ScalarType>> {
+        match self {
+            GroupComparisonTarget::Aggregate { aggregate } => aggregate.result_type(),
+        }
+    }
+}
+
+#[derive(Derivative)]
+#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))]
+pub enum GroupComparisonValue<T: ConnectorTypes> {
+    /// A scalar value to compare against
+    Scalar {
+        value: serde_json::Value,
+        value_type: Type<T::ScalarType>,
+    },
+    /// A value to compare against that is to be drawn from the query's variables.
+    /// Only used if the 'query.variables' capability is supported.
+    Variable {
+        name: ndc::VariableName,
+        variable_type: Type<T::ScalarType>,
+    },
+}
+
+#[derive(Derivative)]
+#[derivative(
+    Clone(bound = ""),
+    Debug(bound = ""),
+    Hash(bound = ""),
+    PartialEq(bound = ""),
+    Eq(bound = "")
+)]
+pub enum Dimension<T: ConnectorTypes> {
+    Column {
+        /// Any (object) relationships to traverse to reach this column.
+        /// Only non-empty if the 'relationships' capability is supported.
+        ///
+        /// These are translated from [ndc::PathElement] values in the [ndc::QueryRequest] to
+        /// names of relation fields for the [crate::QueryPlan].
+        path: Vec<ndc::RelationshipName>,
+        /// The name of the column
+        column_name: FieldName,
+        /// Arguments to satisfy the column specified by 'column_name'
+        arguments: BTreeMap<ArgumentName, Argument<T>>,
+        /// Path to a nested field within an object column
+        field_path: Option<Vec<ndc::FieldName>>,
+        /// Type of the field that you get **after** following `field_path` to a possibly-nested
+        /// field.
+        ///
+        /// If this column references a field in a related collection then this type will be an
+        /// array type whose element type is the type of the related field. The array type wrapper
+        /// applies regardless of whether the relationship is an array or an object relationship.
+        field_type: Type<T::ScalarType>,
+    },
+}
+
+impl<T: ConnectorTypes> Dimension<T> {
+    pub fn value_type(&self) -> &Type<T::ScalarType> {
+        match self {
+            Dimension::Column { field_type, .. } => field_type,
+        }
+    }
+}
+
+#[derive(Derivative)]
+#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))]
+pub struct GroupOrderBy<T: ConnectorTypes> {
+    /// The elements to order by, in priority order
+    pub elements: Vec<GroupOrderByElement<T>>,
+}
+
+#[derive(Derivative)]
+#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))]
+pub struct GroupOrderByElement<T: ConnectorTypes> {
+    pub order_direction: ndc::OrderDirection,
+    pub target: GroupOrderByTarget<T>,
+}
+
+#[derive(Derivative)]
+#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))]
+pub enum GroupOrderByTarget<T: ConnectorTypes> {
+    Dimension {
+        /// The index of the dimension to order by, selected from the
+        /// dimensions provided in the `Grouping` request.
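+        ///
+        /// For example (illustrative, not from this changeset): with dimensions
+        /// `[region, year]`, index `1` orders groups by the `year` dimension.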
+ index: usize, + }, + Aggregate { + /// Aggregation method to apply + aggregate: Aggregate, + }, +} diff --git a/crates/ndc-query-plan/src/query_plan/connector_types.rs b/crates/ndc-query-plan/src/query_plan/connector_types.rs new file mode 100644 index 00000000..94b65b4e --- /dev/null +++ b/crates/ndc-query-plan/src/query_plan/connector_types.rs @@ -0,0 +1,15 @@ +use std::fmt::Debug; +use std::hash::Hash; + +use crate::Type; + +pub trait ConnectorTypes { + type ScalarType: Clone + Debug + Hash + PartialEq + Eq; + type AggregateFunction: Clone + Debug + Hash + PartialEq + Eq; + type ComparisonOperator: Clone + Debug + Hash + PartialEq + Eq; + + /// Result type for count aggregations + fn count_aggregate_type() -> Type; + + fn string_type() -> Type; +} diff --git a/crates/ndc-query-plan/src/query_plan/expression.rs b/crates/ndc-query-plan/src/query_plan/expression.rs new file mode 100644 index 00000000..5f854259 --- /dev/null +++ b/crates/ndc-query-plan/src/query_plan/expression.rs @@ -0,0 +1,299 @@ +use std::{borrow::Cow, collections::BTreeMap, iter}; + +use derivative::Derivative; +use itertools::Either; +use ndc_models::{self as ndc, ArgumentName, FieldName}; + +use crate::Type; + +use super::{Argument, ConnectorTypes}; + +#[derive(Derivative)] +#[derivative( + Clone(bound = ""), + Debug(bound = ""), + Hash(bound = ""), + PartialEq(bound = ""), + Eq(bound = "") +)] +pub enum Expression { + And { + expressions: Vec>, + }, + Or { + expressions: Vec>, + }, + Not { + expression: Box>, + }, + UnaryComparisonOperator { + column: ComparisonTarget, + operator: ndc::UnaryComparisonOperator, + }, + BinaryComparisonOperator { + column: ComparisonTarget, + operator: T::ComparisonOperator, + value: ComparisonValue, + }, + /// A comparison against a nested array column. + /// Only used if the 'query.nested_fields.filter_by.nested_arrays' capability is supported. + ArrayComparison { + column: ComparisonTarget, + comparison: ArrayComparison, + }, + Exists { + in_collection: ExistsInCollection, + predicate: Option>>, + }, +} + +impl Expression { + /// In some cases we receive the predicate expression `Some(Expression::And [])` which does not + /// filter out anything, but fails equality checks with `None`. Simplifying that expression to + /// `None` allows us to unify relationship references that we wouldn't otherwise be able to. + pub fn simplify(self) -> Option { + match self { + Expression::And { expressions } if expressions.is_empty() => None, + e => Some(e), + } + } + + /// Get an iterator of columns referenced by the expression, not including columns of related + /// collections. This is used to build a plan for joining the referenced collection - we need + /// to include fields in the join that the expression needs to access. + // + // TODO: ENG-1457 When we implement query.aggregates.filter_by we'll need to collect aggregates + // references. That's why this function returns [ComparisonTarget] instead of [Field]. + pub fn query_local_comparison_targets<'a>( + &'a self, + ) -> Box>> + 'a> { + match self { + Expression::And { expressions } => Box::new( + expressions + .iter() + .flat_map(|e| e.query_local_comparison_targets()), + ), + Expression::Or { expressions } => Box::new( + expressions + .iter() + .flat_map(|e| e.query_local_comparison_targets()), + ), + Expression::Not { expression } => expression.query_local_comparison_targets(), + Expression::UnaryComparisonOperator { column, .. 
} => {
+                Box::new(std::iter::once(Cow::Borrowed(column)))
+            }
+            Expression::BinaryComparisonOperator { column, value, .. } => Box::new(
+                std::iter::once(Cow::Borrowed(column))
+                    .chain(Self::local_targets_from_comparison_value(value).map(Cow::Owned)),
+            ),
+            Expression::ArrayComparison { column, comparison } => {
+                let value_targets = match comparison {
+                    ArrayComparison::Contains { value } => Either::Left(
+                        Self::local_targets_from_comparison_value(value).map(Cow::Owned),
+                    ),
+                    ArrayComparison::IsEmpty => Either::Right(std::iter::empty()),
+                };
+                Box::new(std::iter::once(Cow::Borrowed(column)).chain(value_targets))
+            }
+            Expression::Exists { .. } => Box::new(iter::empty()),
+        }
+    }
+
+    fn local_targets_from_comparison_value(
+        value: &ComparisonValue<T>,
+    ) -> impl Iterator<Item = ComparisonTarget<T>> {
+        match value {
+            ComparisonValue::Column {
+                path,
+                name,
+                arguments,
+                field_path,
+                field_type,
+                ..
+            } => {
+                if path.is_empty() {
+                    Either::Left(iter::once(ComparisonTarget::Column {
+                        name: name.clone(),
+                        arguments: arguments.clone(),
+                        field_path: field_path.clone(),
+                        field_type: field_type.clone(),
+                    }))
+                } else {
+                    Either::Right(iter::empty())
+                }
+            }
+            _ => Either::Right(std::iter::empty()),
+        }
+    }
+}
+
+#[derive(Derivative)]
+#[derivative(
+    Clone(bound = ""),
+    Debug(bound = ""),
+    Hash(bound = ""),
+    PartialEq(bound = ""),
+    Eq(bound = "")
+)]
+pub enum ArrayComparison<T: ConnectorTypes> {
+    /// Check if the array contains the specified value.
+    /// Only used if the 'query.nested_fields.filter_by.nested_arrays.contains' capability is supported.
+    Contains { value: ComparisonValue<T> },
+    /// Check if the array is empty.
+    /// Only used if the 'query.nested_fields.filter_by.nested_arrays.is_empty' capability is supported.
+    IsEmpty,
+}
+
+#[derive(Derivative)]
+#[derivative(
+    Clone(bound = ""),
+    Debug(bound = ""),
+    Hash(bound = ""),
+    PartialEq(bound = ""),
+    Eq(bound = "")
+)]
+pub enum ComparisonTarget<T: ConnectorTypes> {
+    /// The comparison targets a column.
+    Column {
+        /// The name of the column
+        name: ndc::FieldName,
+
+        /// Arguments to satisfy the column specified by 'name'
+        arguments: BTreeMap<ArgumentName, Argument<T>>,
+
+        /// Path to a nested field within an object column
+        field_path: Option<Vec<ndc::FieldName>>,
+
+        /// Type of the field that you get *after* following `field_path` to a possibly-nested
+        /// field.
+        field_type: Type<T::ScalarType>,
+    },
+    // TODO: ENG-1457 Add this variant to support query.aggregates.filter_by
+    // /// The comparison targets the result of aggregation.
+    // /// Only used if the 'query.aggregates.filter_by' capability is supported.
+    // Aggregate {
+    //     /// Non-empty collection of relationships to traverse
+    //     path: Vec<ndc::RelationshipName>,
+    //     /// The aggregation method to use
+    //     aggregate: Aggregate<T>,
+    // },
+}
+
+impl<T: ConnectorTypes> ComparisonTarget<T> {
+    pub fn column(name: impl Into<ndc::FieldName>, field_type: Type<T::ScalarType>) -> Self {
+        Self::Column {
+            name: name.into(),
+            arguments: Default::default(),
+            field_path: Default::default(),
+            field_type,
+        }
+    }
+
+    pub fn target_type(&self) -> &Type<T::ScalarType> {
+        match self {
+            ComparisonTarget::Column { field_type, .. } => field_type,
+            // TODO: ENG-1457
+            // ComparisonTarget::Aggregate { aggregate, .. } => aggregate.result_type,
+        }
+    }
+}
+
+#[derive(Derivative)]
+#[derivative(
+    Clone(bound = ""),
+    Debug(bound = ""),
+    Hash(bound = ""),
+    PartialEq(bound = ""),
+    Eq(bound = "")
+)]
+pub enum ComparisonValue<T: ConnectorTypes> {
+    Column {
+        /// Any relationships to traverse to reach this column.
+        /// Only non-empty if the 'relationships.relation_comparisons' capability is supported.
+        path: Vec<ndc::RelationshipName>,
+        /// The name of the column
+        name: ndc::FieldName,
+        /// Arguments to satisfy the column specified by 'name'
+        arguments: BTreeMap<ArgumentName, Argument<T>>,
+        /// Path to a nested field within an object column.
+        /// Only non-empty if the 'query.nested_fields.filter_by' capability is supported.
+        field_path: Option<Vec<ndc::FieldName>>,
+        /// Type of the field that you get *after* following `field_path` to a possibly-nested
+        /// field.
+        field_type: Type<T::ScalarType>,
+        /// The scope in which this column exists, identified
+        /// by a top-down index into the stack of scopes.
+        /// The stack grows inside each `Expression::Exists`,
+        /// so scope 0 (the default) refers to the current collection,
+        /// and each subsequent index refers to the collection outside
+        /// its predecessor's immediately enclosing `Expression::Exists`
+        /// expression.
+        /// Only used if the 'query.exists.named_scopes' capability is supported.
+        scope: Option<usize>,
+    },
+    Scalar {
+        value: serde_json::Value,
+        value_type: Type<T::ScalarType>,
+    },
+    Variable {
+        name: ndc::VariableName,
+        variable_type: Type<T::ScalarType>,
+    },
+}
+
+impl<T: ConnectorTypes> ComparisonValue<T> {
+    pub fn column(name: impl Into<ndc::FieldName>, field_type: Type<T::ScalarType>) -> Self {
+        Self::Column {
+            path: Default::default(),
+            name: name.into(),
+            arguments: Default::default(),
+            field_path: Default::default(),
+            field_type,
+            scope: Default::default(),
+        }
+    }
+}
+
+#[derive(Derivative)]
+#[derivative(
+    Clone(bound = ""),
+    Debug(bound = ""),
+    Hash(bound = ""),
+    PartialEq(bound = ""),
+    Eq(bound = "")
+)]
+pub enum ExistsInCollection<T: ConnectorTypes> {
+    /// The rows to evaluate the exists predicate against come from a related collection.
+    /// Only used if the 'relationships' capability is supported.
+    Related {
+        /// Key of the relation in the [Query] joins map. Relationships are scoped to the sub-query
+        /// that defines the relation source.
+        relationship: ndc::RelationshipName,
+    },
+    /// The rows to evaluate the exists predicate against come from an unrelated collection.
+    /// Only used if the 'query.exists.unrelated' capability is supported.
+    Unrelated {
+        /// Key of the relation in the [QueryPlan] joins map. Unrelated collections are not scoped
+        /// to a sub-query; instead they are given in the root [QueryPlan].
+        unrelated_collection: String,
+    },
+    /// The rows to evaluate the exists predicate against come from a nested array field.
+    /// Only used if the 'query.exists.nested_collections' capability is supported.
+    NestedCollection {
+        column_name: ndc::FieldName,
+        arguments: BTreeMap<ArgumentName, Argument<T>>,
+        /// Path to a nested collection via object columns
+        field_path: Vec<ndc::FieldName>,
+    },
+    /// Specifies a column that contains a nested array of scalars. The
+    /// array will be brought into scope of the nested expression where
+    /// each element becomes an object with one '__value' column that
+    /// contains the element value.
+    /// Only used if the 'query.exists.nested_scalar_collections' capability is supported.
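+    ///
+    /// For example (illustrative): a column `tags` holding `["a", "b"]` is exposed to the
+    /// nested predicate as the rows `{ "__value": "a" }` and `{ "__value": "b" }`.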
+ NestedScalarCollection { + column_name: FieldName, + arguments: BTreeMap>, + /// Path to a nested collection via object columns + field_path: Vec, + }, +} diff --git a/crates/ndc-query-plan/src/query_plan/fields.rs b/crates/ndc-query-plan/src/query_plan/fields.rs new file mode 100644 index 00000000..c2f88957 --- /dev/null +++ b/crates/ndc-query-plan/src/query_plan/fields.rs @@ -0,0 +1,54 @@ +use derivative::Derivative; +use indexmap::IndexMap; +use ndc_models as ndc; + +use crate::Type; + +use super::{Aggregate, ConnectorTypes, Grouping}; + +#[derive(Derivative)] +#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] +pub enum Field { + Column { + column: ndc::FieldName, + + /// When the type of the column is a (possibly-nullable) array or object, + /// the caller can request a subset of the complete column data, + /// by specifying fields to fetch here. + /// If omitted, the column data will be fetched in full. + fields: Option>, + + column_type: Type, + }, + Relationship { + /// The name of the relationship to follow for the subquery - this is the key in the + /// [Query] relationships map in this module, it is **not** the key in the + /// [ndc::QueryRequest] collection_relationships map. + relationship: ndc::RelationshipName, + aggregates: Option>>, + fields: Option>>, + groups: Option>, + }, +} + +#[derive(Derivative)] +#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] +pub struct NestedObject { + pub fields: IndexMap>, +} + +#[derive(Derivative)] +#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] +pub struct NestedArray { + pub fields: Box>, +} + +// TODO: ENG-1464 define NestedCollection struct + +#[derive(Derivative)] +#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] +pub enum NestedField { + Object(NestedObject), + Array(NestedArray), + // TODO: ENG-1464 add `Collection(NestedCollection)` variant +} diff --git a/crates/ndc-query-plan/src/query_plan/mod.rs b/crates/ndc-query-plan/src/query_plan/mod.rs new file mode 100644 index 00000000..1ba7757c --- /dev/null +++ b/crates/ndc-query-plan/src/query_plan/mod.rs @@ -0,0 +1,14 @@ +mod aggregation; +pub use aggregation::*; +mod connector_types; +pub use connector_types::*; +mod expression; +pub use expression::*; +mod fields; +pub use fields::*; +mod ordering; +pub use ordering::*; +mod requests; +pub use requests::*; +mod schema; +pub use schema::*; diff --git a/crates/ndc-query-plan/src/query_plan/ordering.rs b/crates/ndc-query-plan/src/query_plan/ordering.rs new file mode 100644 index 00000000..2e2cb0b7 --- /dev/null +++ b/crates/ndc-query-plan/src/query_plan/ordering.rs @@ -0,0 +1,46 @@ +use std::collections::BTreeMap; + +use derivative::Derivative; +use ndc_models::{self as ndc, ArgumentName, OrderDirection}; + +use super::{Aggregate, Argument, ConnectorTypes}; + +#[derive(Derivative)] +#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] +pub struct OrderBy { + /// The elements to order by, in priority order + pub elements: Vec>, +} + +#[derive(Derivative)] +#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] +pub struct OrderByElement { + pub order_direction: OrderDirection, + pub target: OrderByTarget, +} + +#[derive(Derivative)] +#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] +pub enum OrderByTarget { + Column { + /// Any relationships to traverse to reach this column. 
These are translated from + /// [ndc::OrderByElement] values in the [ndc::QueryRequest] to names of relation + /// fields for the [crate::QueryPlan]. + path: Vec, + + /// The name of the column + name: ndc::FieldName, + + /// Arguments to satisfy the column specified by 'name' + arguments: BTreeMap>, + + /// Path to a nested field within an object column + field_path: Option>, + }, + Aggregate { + /// Non-empty collection of relationships to traverse + path: Vec, + /// The aggregation method to use + aggregate: Aggregate, + }, +} diff --git a/crates/ndc-query-plan/src/query_plan/requests.rs b/crates/ndc-query-plan/src/query_plan/requests.rs new file mode 100644 index 00000000..a5dc7ed6 --- /dev/null +++ b/crates/ndc-query-plan/src/query_plan/requests.rs @@ -0,0 +1,171 @@ +use std::collections::BTreeMap; + +use derivative::Derivative; +use indexmap::IndexMap; +use ndc_models::{self as ndc, RelationshipType}; +use nonempty::NonEmpty; + +use crate::{vec_set::VecSet, Type}; + +use super::{Aggregate, ConnectorTypes, Expression, Field, Grouping, OrderBy}; + +#[derive(Derivative)] +#[derivative( + Clone(bound = ""), + Debug(bound = ""), + PartialEq(bound = "T::ScalarType: PartialEq") +)] +pub struct QueryPlan { + pub collection: ndc::CollectionName, + pub query: Query, + pub arguments: BTreeMap>, + pub variables: Option>, + + /// Types for values from the `variables` map as inferred by usages in the query request. It is + /// possible for the same variable to be used in multiple contexts with different types. This + /// map provides sets of all observed types. + /// + /// The observed type may be `None` if the type of a variable use could not be inferred. + pub variable_types: VariableTypes, + + // TODO: type for unrelated collection + pub unrelated_collections: BTreeMap>, +} + +impl QueryPlan { + pub fn has_variables(&self) -> bool { + self.variables.is_some() + } +} + +pub type Relationships = BTreeMap>; +pub type VariableSet = BTreeMap; +pub type VariableTypes = BTreeMap>>; + +#[derive(Derivative)] +#[derivative( + Clone(bound = ""), + Debug(bound = ""), + Default(bound = ""), + PartialEq(bound = "") +)] +pub struct Query { + pub aggregates: Option>>, + pub fields: Option>>, + pub limit: Option, + pub offset: Option, + pub order_by: Option>, + pub predicate: Option>, + pub groups: Option>, + + /// Relationships referenced by fields and expressions in this query or sub-query. Does not + /// include relationships in sub-queries nested under this one. + pub relationships: Relationships, + + /// Some relationship references may introduce a named "scope" so that other parts of the query + /// request can reference fields of documents in the related collection. The connector must + /// introduce a variable, or something similar, for such references. 
+    pub scope: Option<Scope>,
+}
+
+impl<T: ConnectorTypes> Query<T> {
+    pub fn has_aggregates(&self) -> bool {
+        if let Some(aggregates) = &self.aggregates {
+            !aggregates.is_empty()
+        } else {
+            false
+        }
+    }
+
+    pub fn has_fields(&self) -> bool {
+        if let Some(fields) = &self.fields {
+            !fields.is_empty()
+        } else {
+            false
+        }
+    }
+
+    pub fn has_groups(&self) -> bool {
+        self.groups.is_some()
+    }
+}
+
+#[derive(Derivative)]
+#[derivative(
+    Clone(bound = ""),
+    Debug(bound = ""),
+    Hash(bound = ""),
+    PartialEq(bound = ""),
+    Eq(bound = "")
+)]
+pub enum Argument<T: ConnectorTypes> {
+    /// The argument is provided by reference to a variable
+    Variable {
+        name: ndc::VariableName,
+        argument_type: Type<T::ScalarType>,
+    },
+    /// The argument is provided as a literal value
+    Literal {
+        value: serde_json::Value,
+        argument_type: Type<T::ScalarType>,
+    },
+    /// The argument was a literal value that has been parsed as an [Expression]
+    Predicate { expression: Expression<T> },
+}
+
+#[derive(Derivative)]
+#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))]
+pub struct Relationship<T: ConnectorTypes> {
+    /// A mapping between columns on the source row to columns on the target collection.
+    /// The column on the target collection is specified via a field path (i.e. an array of field
+    /// names that descend through nested object fields). The field path will only contain a single item,
+    /// meaning a column on the target collection's type, unless the 'relationships.nested'
+    /// capability is supported, in which case multiple items denote a nested object field.
+    pub column_mapping: BTreeMap<ndc::FieldName, NonEmpty<ndc::FieldName>>,
+    pub relationship_type: RelationshipType,
+    /// The name of a collection
+    pub target_collection: ndc::CollectionName,
+    /// Values to be provided to any collection arguments
+    pub arguments: BTreeMap<ndc::ArgumentName, RelationshipArgument<T>>,
+    pub query: Query<T>,
+}
+
+#[derive(Derivative)]
+#[derivative(
+    Clone(bound = ""),
+    Debug(bound = ""),
+    PartialEq(bound = "T::ScalarType: PartialEq")
+)]
+pub enum RelationshipArgument<T: ConnectorTypes> {
+    /// The argument is provided by reference to a variable
+    Variable {
+        name: ndc::VariableName,
+        argument_type: Type<T::ScalarType>,
+    },
+    /// The argument is provided as a literal value
+    Literal {
+        value: serde_json::Value,
+        argument_type: Type<T::ScalarType>,
+    },
+    /// The argument is provided based on a column of the source collection
+    Column {
+        name: ndc::FieldName,
+        argument_type: Type<T::ScalarType>,
+    },
+    /// The argument was a literal value that has been parsed as an [Expression]
+    Predicate { expression: Expression<T> },
+}
+
+#[derive(Derivative)]
+#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))]
+pub struct UnrelatedJoin<T: ConnectorTypes> {
+    pub target_collection: ndc::CollectionName,
+    pub arguments: BTreeMap<ndc::ArgumentName, RelationshipArgument<T>>,
+    pub query: Query<T>,
+}
+
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub enum Scope {
+    Root,
+    Named(String),
+}
diff --git a/crates/ndc-query-plan/src/query_plan/schema.rs b/crates/ndc-query-plan/src/query_plan/schema.rs
new file mode 100644
index 00000000..36ee6dc2
--- /dev/null
+++ b/crates/ndc-query-plan/src/query_plan/schema.rs
@@ -0,0 +1,80 @@
+use derivative::Derivative;
+use ndc_models as ndc;
+
+use crate::Type;
+
+use super::ConnectorTypes;
+
+#[derive(Derivative)]
+#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))]
+pub enum ComparisonOperatorDefinition<T: ConnectorTypes> {
+    Equal,
+    In,
+    LessThan,
+    LessThanOrEqual,
+    GreaterThan,
+    GreaterThanOrEqual,
+    Contains,
+    ContainsInsensitive,
+    StartsWith,
+    StartsWithInsensitive,
+    EndsWith,
+    EndsWithInsensitive,
+    Custom {
+        /// The type of the argument to this operator
+        argument_type: Type<T::ScalarType>,
+    },
+}
+
+impl<T: ConnectorTypes> ComparisonOperatorDefinition<T> {
+ pub fn argument_type(self, left_operand_type: &Type) -> Type { + use ComparisonOperatorDefinition as C; + match self { + C::In => Type::ArrayOf(Box::new(left_operand_type.clone())), + C::Equal + | C::LessThan + | C::LessThanOrEqual + | C::GreaterThan + | C::GreaterThanOrEqual => left_operand_type.clone(), + C::Contains + | C::ContainsInsensitive + | C::StartsWith + | C::StartsWithInsensitive + | C::EndsWith + | C::EndsWithInsensitive => T::string_type(), + C::Custom { argument_type } => argument_type, + } + } + + pub fn from_ndc_definition( + ndc_definition: &ndc::ComparisonOperatorDefinition, + map_type: impl FnOnce(&ndc::Type) -> Result, E>, + ) -> Result { + use ndc::ComparisonOperatorDefinition as NDC; + let definition = match ndc_definition { + NDC::Equal => Self::Equal, + NDC::In => Self::In, + NDC::LessThan => Self::LessThan, + NDC::LessThanOrEqual => Self::LessThanOrEqual, + NDC::GreaterThan => Self::GreaterThan, + NDC::GreaterThanOrEqual => Self::GreaterThanOrEqual, + NDC::Contains => Self::Contains, + NDC::ContainsInsensitive => Self::ContainsInsensitive, + NDC::StartsWith => Self::StartsWith, + NDC::StartsWithInsensitive => Self::StartsWithInsensitive, + NDC::EndsWith => Self::EndsWith, + NDC::EndsWithInsensitive => Self::EndsWithInsensitive, + NDC::Custom { argument_type } => Self::Custom { + argument_type: map_type(argument_type)?, + }, + }; + Ok(definition) + } +} + +#[derive(Derivative)] +#[derivative(Clone(bound = ""), Debug(bound = ""), PartialEq(bound = ""))] +pub struct AggregateFunctionDefinition { + /// The scalar or object type of the result of this function + pub result_type: Type, +} diff --git a/crates/ndc-query-plan/src/type_system.rs b/crates/ndc-query-plan/src/type_system.rs index 922b52c4..dce58f1d 100644 --- a/crates/ndc-query-plan/src/type_system.rs +++ b/crates/ndc-query-plan/src/type_system.rs @@ -1,5 +1,5 @@ use ref_cast::RefCast; -use std::collections::BTreeMap; +use std::{collections::BTreeMap, fmt::Display}; use itertools::Itertools as _; use ndc_models::{self as ndc, ArgumentName, ObjectTypeName}; @@ -9,7 +9,7 @@ use crate::{self as plan, QueryPlanError}; type Result = std::result::Result; /// The type of values that a column, field, or argument may take. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, Hash, PartialEq, Eq)] pub enum Type { Scalar(ScalarType), /// The name of an object type declared in `objectTypes` @@ -17,6 +17,8 @@ pub enum Type { ArrayOf(Box>), /// A nullable form of any of the other types Nullable(Box>), + /// Used internally + Tuple(Vec>), } impl Type { @@ -87,7 +89,41 @@ impl Type { } } -#[derive(Debug, Clone, PartialEq, Eq)] +impl Display for Type { + /// Display types using GraphQL-style syntax + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn helper(t: &Type, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result + where + S: Display, + { + match t { + Type::Scalar(s) => write!(f, "{}", s), + Type::Object(ot) => write!(f, "{ot}"), + Type::ArrayOf(t) => write!(f, "[{t}]"), + Type::Nullable(t) => write!(f, "{t}"), + Type::Tuple(ts) => { + write!(f, "(")?; + for (index, t) in ts.iter().enumerate() { + write!(f, "{t}")?; + if index < ts.len() - 1 { + write!(f, ", ")?; + } + } + write!(f, ")") + } + } + } + match self { + Type::Nullable(t) => helper(t, f), + t => { + helper(t, f)?; + write!(f, "!") + } + } + } +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq)] pub struct ObjectType { /// A type name may be tracked for error reporting. 
The name does not affect how query plans /// are generated. @@ -130,7 +166,21 @@ impl ObjectType { } } -#[derive(Clone, Debug, PartialEq, Eq)] +impl Display for ObjectType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{{ ")?; + for (index, (name, field)) in self.fields.iter().enumerate() { + write!(f, "{name}: {}", field.r#type)?; + if index < self.fields.len() - 1 { + write!(f, ", ")?; + } + } + write!(f, " }}")?; + Ok(()) + } +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] pub struct ObjectField { pub r#type: Type, /// The arguments available to the field - Matches implementation from CollectionInfo diff --git a/crates/ndc-test-helpers/src/aggregates.rs b/crates/ndc-test-helpers/src/aggregates.rs index 894a823a..16c1eb75 100644 --- a/crates/ndc-test-helpers/src/aggregates.rs +++ b/crates/ndc-test-helpers/src/aggregates.rs @@ -1,16 +1,48 @@ -#[macro_export()] -macro_rules! column_aggregate { - ($name:literal => $column:literal, $function:literal) => { - ( - $name, - $crate::ndc_models::Aggregate::SingleColumn { - column: $column.into(), - arguments: Default::default(), - function: $function.into(), - field_path: None, - }, - ) - }; +use std::collections::BTreeMap; + +use ndc_models::{Aggregate, AggregateFunctionName, Argument, ArgumentName, FieldName}; + +use crate::column::Column; + +pub struct AggregateColumnBuilder { + column: FieldName, + arguments: BTreeMap, + field_path: Option>, + function: AggregateFunctionName, +} + +pub fn column_aggregate( + column: impl Into, + function: impl Into, +) -> AggregateColumnBuilder { + let column = column.into(); + AggregateColumnBuilder { + column: column.column, + function: function.into(), + arguments: column.arguments, + field_path: column.field_path, + } +} + +impl AggregateColumnBuilder { + pub fn field_path( + mut self, + field_path: impl IntoIterator>, + ) -> Self { + self.field_path = Some(field_path.into_iter().map(Into::into).collect()); + self + } +} + +impl From for Aggregate { + fn from(builder: AggregateColumnBuilder) -> Self { + Aggregate::SingleColumn { + column: builder.column, + arguments: builder.arguments, + function: builder.function, + field_path: builder.field_path, + } + } } #[macro_export()] diff --git a/crates/ndc-test-helpers/src/column.rs b/crates/ndc-test-helpers/src/column.rs new file mode 100644 index 00000000..ce492ab6 --- /dev/null +++ b/crates/ndc-test-helpers/src/column.rs @@ -0,0 +1,63 @@ +use std::collections::BTreeMap; + +use itertools::Itertools as _; +use ndc_models::{Argument, ArgumentName, FieldName, PathElement, RelationshipName}; + +use crate::path_element; + +/// An intermediate struct that can be used to populate ComparisonTarget::Column, +/// Dimension::Column, etc. 
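+///
+/// A `&str` converts to a `Column` by splitting on `.` (see the `From<&str>` impl below). For
+/// example (illustrative), `"store.address.city"` yields `column: "store"` with
+/// `field_path: Some(["address", "city"])`.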
+pub struct Column { + pub path: Vec, + pub column: FieldName, + pub arguments: BTreeMap, + pub field_path: Option>, +} + +impl Column { + pub fn path(mut self, elements: impl IntoIterator>) -> Self { + self.path = elements.into_iter().map(Into::into).collect(); + self + } + + pub fn from_relationship(mut self, name: impl Into) -> Self { + self.path = vec![path_element(name).into()]; + self + } +} + +pub fn column(name: impl Into) -> Column { + Column { + path: Default::default(), + column: name.into(), + arguments: Default::default(), + field_path: Default::default(), + } +} + +impl From<&str> for Column { + fn from(input: &str) -> Self { + let mut parts = input.split("."); + let column = parts + .next() + .expect("a column reference must not be an empty string") + .into(); + let field_path = parts.map(Into::into).collect_vec(); + Column { + path: Default::default(), + column, + arguments: Default::default(), + field_path: if field_path.is_empty() { + None + } else { + Some(field_path) + }, + } + } +} + +impl From for Column { + fn from(name: FieldName) -> Self { + column(name) + } +} diff --git a/crates/ndc-test-helpers/src/groups.rs b/crates/ndc-test-helpers/src/groups.rs new file mode 100644 index 00000000..4899f3b2 --- /dev/null +++ b/crates/ndc-test-helpers/src/groups.rs @@ -0,0 +1,144 @@ +use std::collections::BTreeMap; + +use indexmap::IndexMap; +use ndc_models::{ + Aggregate, Argument, ArgumentName, Dimension, FieldName, GroupExpression, GroupOrderBy, + GroupOrderByElement, Grouping, OrderBy, OrderDirection, PathElement, +}; + +use crate::column::Column; + +#[derive(Clone, Debug, Default)] +pub struct GroupingBuilder { + dimensions: Vec, + aggregates: IndexMap, + predicate: Option, + order_by: Option, + limit: Option, + offset: Option, +} + +pub fn grouping() -> GroupingBuilder { + Default::default() +} + +impl GroupingBuilder { + pub fn dimensions( + mut self, + dimensions: impl IntoIterator>, + ) -> Self { + self.dimensions = dimensions.into_iter().map(Into::into).collect(); + self + } + + pub fn aggregates( + mut self, + aggregates: impl IntoIterator, impl Into)>, + ) -> Self { + self.aggregates = aggregates + .into_iter() + .map(|(name, aggregate)| (name.into(), aggregate.into())) + .collect(); + self + } + + pub fn predicate(mut self, predicate: impl Into) -> Self { + self.predicate = Some(predicate.into()); + self + } + + pub fn order_by(mut self, order_by: impl Into) -> Self { + self.order_by = Some(order_by.into()); + self + } + + pub fn limit(mut self, limit: u32) -> Self { + self.limit = Some(limit); + self + } + + pub fn offset(mut self, offset: u32) -> Self { + self.offset = Some(offset); + self + } +} + +impl From for Grouping { + fn from(value: GroupingBuilder) -> Self { + Grouping { + dimensions: value.dimensions, + aggregates: value.aggregates, + predicate: value.predicate, + order_by: value.order_by, + limit: value.limit, + offset: value.offset, + } + } +} + +#[derive(Clone, Debug)] +pub struct DimensionColumnBuilder { + path: Vec, + column_name: FieldName, + arguments: BTreeMap, + field_path: Option>, +} + +pub fn dimension_column(column: impl Into) -> DimensionColumnBuilder { + let column = column.into(); + DimensionColumnBuilder { + path: column.path, + column_name: column.column, + arguments: column.arguments, + field_path: column.field_path, + } +} + +impl DimensionColumnBuilder { + pub fn path(mut self, path: impl IntoIterator>) -> Self { + self.path = path.into_iter().map(Into::into).collect(); + self + } + + pub fn arguments( + mut self, + arguments: impl 
IntoIterator, impl Into)>, + ) -> Self { + self.arguments = arguments + .into_iter() + .map(|(name, argument)| (name.into(), argument.into())) + .collect(); + self + } + + pub fn field_path( + mut self, + field_path: impl IntoIterator>, + ) -> Self { + self.field_path = Some(field_path.into_iter().map(Into::into).collect()); + self + } +} + +impl From for Dimension { + fn from(value: DimensionColumnBuilder) -> Self { + Dimension::Column { + path: value.path, + column_name: value.column_name, + arguments: value.arguments, + field_path: value.field_path, + } + } +} + +/// Produces a consistent ordering for up to 10 dimensions +pub fn ordered_dimensions() -> GroupOrderBy { + GroupOrderBy { + elements: (0..10) + .map(|index| GroupOrderByElement { + order_direction: OrderDirection::Asc, + target: ndc_models::GroupOrderByTarget::Dimension { index }, + }) + .collect(), + } +} diff --git a/crates/ndc-test-helpers/src/lib.rs b/crates/ndc-test-helpers/src/lib.rs index 299c346a..1d79d525 100644 --- a/crates/ndc-test-helpers/src/lib.rs +++ b/crates/ndc-test-helpers/src/lib.rs @@ -2,12 +2,16 @@ #![allow(unused_imports)] mod aggregates; +pub use aggregates::*; mod collection_info; +mod column; +pub use column::*; mod comparison_target; mod comparison_value; mod exists_in_collection; mod expressions; mod field; +mod groups; mod object_type; mod order_by; mod path_element; @@ -19,7 +23,7 @@ use std::collections::BTreeMap; use indexmap::IndexMap; use ndc_models::{ - Aggregate, Argument, Expression, Field, OrderBy, OrderByElement, PathElement, Query, + Aggregate, Argument, Expression, Field, FieldName, OrderBy, OrderByElement, PathElement, Query, QueryRequest, Relationship, RelationshipArgument, RelationshipType, }; @@ -33,6 +37,7 @@ pub use comparison_value::*; pub use exists_in_collection::*; pub use expressions::*; pub use field::*; +pub use groups::*; pub use object_type::*; pub use order_by::*; pub use path_element::*; @@ -47,7 +52,6 @@ pub struct QueryRequestBuilder { arguments: Option>, collection_relationships: Option>, variables: Option>>, - groups: Option, } pub fn query_request() -> QueryRequestBuilder { @@ -62,7 +66,6 @@ impl QueryRequestBuilder { arguments: None, collection_relationships: None, variables: None, - groups: None, } } @@ -118,11 +121,6 @@ impl QueryRequestBuilder { ); self } - - pub fn groups(mut self, groups: impl Into) -> Self { - self.groups = Some(groups.into()); - self - } } impl From for QueryRequest { @@ -179,11 +177,14 @@ impl QueryBuilder { self } - pub fn aggregates(mut self, aggregates: [(&str, Aggregate); S]) -> Self { + pub fn aggregates( + mut self, + aggregates: impl IntoIterator, impl Into)>, + ) -> Self { self.aggregates = Some( aggregates .into_iter() - .map(|(name, aggregate)| (name.to_owned().into(), aggregate)) + .map(|(name, aggregate)| (name.into(), aggregate.into())) .collect(), ); self @@ -208,6 +209,11 @@ impl QueryBuilder { self.predicate = Some(expression); self } + + pub fn groups(mut self, groups: impl Into) -> Self { + self.groups = Some(groups.into()); + self + } } impl From for Query { diff --git a/crates/ndc-test-helpers/src/query_response.rs b/crates/ndc-test-helpers/src/query_response.rs index 3c94378f..6b87f5c6 100644 --- a/crates/ndc-test-helpers/src/query_response.rs +++ b/crates/ndc-test-helpers/src/query_response.rs @@ -1,5 +1,5 @@ use indexmap::IndexMap; -use ndc_models::{QueryResponse, RowFieldValue, RowSet}; +use ndc_models::{FieldName, Group, QueryResponse, RowFieldValue, RowSet}; #[derive(Clone, Debug, Default)] pub struct 
QueryResponseBuilder { @@ -56,13 +56,10 @@ impl RowSetBuilder { pub fn aggregates( mut self, - aggregates: impl IntoIterator)>, + aggregates: impl IntoIterator, impl Into)>, ) -> Self { - self.aggregates.extend( - aggregates - .into_iter() - .map(|(k, v)| (k.to_string().into(), v.into())), - ); + self.aggregates + .extend(aggregates.into_iter().map(|(k, v)| (k.into(), v.into()))); self } @@ -134,3 +131,16 @@ pub fn query_response() -> QueryResponseBuilder { pub fn row_set() -> RowSetBuilder { Default::default() } + +pub fn group( + dimensions: impl IntoIterator>, + aggregates: impl IntoIterator)>, +) -> Group { + Group { + dimensions: dimensions.into_iter().map(Into::into).collect(), + aggregates: aggregates + .into_iter() + .map(|(name, value)| (name.to_string(), value.into())) + .collect(), + } +} diff --git a/flake.lock b/flake.lock index b0b135c2..79c8ca2f 100644 --- a/flake.lock +++ b/flake.lock @@ -110,11 +110,11 @@ "graphql-engine-source": { "flake": false, "locked": { - "lastModified": 1736343392, - "narHash": "sha256-qv7MPD9NhZE1q7yFbGuqkoRF1igV0hCfn16DzhgZSUs=", + "lastModified": 1738870584, + "narHash": "sha256-YYp1IJpEv+MIsIVQ25rw2/aKHWZZ9avIW7GMXYJPkJU=", "owner": "hasura", "repo": "graphql-engine", - "rev": "48910e25ef253f033b80b487381f0e94e5f1ea27", + "rev": "249552b0ea8669d37b77da205abac2c2b41e5b34", "type": "github" }, "original": { @@ -145,11 +145,11 @@ "nixpkgs": "nixpkgs" }, "locked": { - "lastModified": 1733604522, - "narHash": "sha256-9XNxIgOGq8MJ3a1GPE1lGaMBSz6Ossgv/Ec+KhyaC68=", + "lastModified": 1738802037, + "narHash": "sha256-2rFnj+lf9ecXH+/qFA2ncyz/+mH/ho+XftUgVXrLjBQ=", "owner": "hasura", "repo": "ddn-cli-nix", - "rev": "8e9695beabd6d111a69ae288f8abba6ebf8d1c82", + "rev": "d439eab6b2254977234261081191f5d83bce49fd", "type": "github" }, "original": {
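
A sketch of the GraphQL-style rendering produced by the new `Display` impl for `Type`,
assuming a scalar type whose own `Display` output is `int`, and eliding the `Box::new`
wrappers for readability (not part of the diff above):

    Type::Scalar(int)                               // renders as: int!
    Type::Nullable(Type::Scalar(int))               // renders as: int
    Type::ArrayOf(Type::Scalar(int))                // renders as: [int!]!
    Type::Nullable(Type::ArrayOf(
        Type::Nullable(Type::Scalar(int))))         // renders as: [int]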
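
A minimal sketch of how the new ndc-test-helpers builders compose into an
`ndc_models::Grouping`; the field names `category` and `price` are hypothetical:

    use ndc_models::Grouping;
    use ndc_test_helpers::{column_aggregate, dimension_column, grouping, ordered_dimensions};

    let g: Grouping = grouping()
        .dimensions([dimension_column("category")])
        .aggregates([("average_price", column_aggregate("price", "avg"))])
        .order_by(ordered_dimensions())
        .limit(10)
        .into();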