diff --git a/CHANGELOG.md b/CHANGELOG.md
index fd736888..651f7189 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,10 +6,47 @@ This changelog documents the changes between release versions.
 
 ### Added
 
+- Add option to skip rows on response type mismatch ([#162](https://github.com/hasura/ndc-mongodb/pull/162))
+
 ### Changed
 
 ### Fixed
 
+### Option to skip rows on response type mismatch
+
+When sending response data for a query, if the connector encounters a value that does not match the type declared in
+the connector schema, the default behavior is to respond with an error. That prevents the user from getting any data.
+This change adds an option to silently skip rows that contain type mismatches so that the user can get a partial result set.
+
+This can come up if, for example, you have database documents with a field that nearly always contains an `int` value,
+but in a handful of cases that field contains a `string`. Introspection may determine that the type of the field is
+`int` if the random document sampling does not happen to check one of the documents with a `string`. Then when you run
+a query that _does_ read one of those documents, the query fails because the connector refuses to return a value of an
+unexpected type.
+
+The new option, `onResponseTypeMismatch`, has two possible values: `fail` (the existing, default behavior) or `skipRow`
+(the new, opt-in behavior). If you set the option to `skipRow` in the example case above, the connector will silently
+exclude documents with unexpected `string` values from the response. This allows you to get access to the "good" data.
+This is opt-in because we don't want to exclude data without users being aware that it might be happening.
+
+The option is set in connector configuration in `configuration.json`. Here is an example configuration:
+
+```json
+{
+  "introspectionOptions": {
+    "sampleSize": 1000,
+    "noValidatorSchema": false,
+    "allSchemaNullable": false
+  },
+  "serializationOptions": {
+    "extendedJsonMode": "relaxed",
+    "onResponseTypeMismatch": "skipRow"
+  }
+}
+```
+
+The `skipRow` behavior does not affect aggregations or queries that do not request the field with the unexpected
+type.
+
 ## [1.7.2] - 2025-04-16
 
 ### Fixed
@@ -22,10 +59,6 @@ This changelog documents the changes between release versions.
 
 - Add watch command while initializing metadata ([#157](https://github.com/hasura/ndc-mongodb/pull/157))
 
-### Changed
-
-### Fixed
-
 ## [1.7.0] - 2025-03-10
 
 ### Added
diff --git a/crates/configuration/src/configuration.rs b/crates/configuration/src/configuration.rs
index 729b680b..2880057a 100644
--- a/crates/configuration/src/configuration.rs
+++ b/crates/configuration/src/configuration.rs
@@ -244,6 +244,26 @@ pub struct ConfigurationSerializationOptions {
     /// used for output. This setting has no effect on inputs (query arguments, etc.).
     #[serde(default)]
     pub extended_json_mode: ExtendedJsonMode,
+
+    /// When sending response data, the connector may encounter data in a field that does not match
+    /// the type declared for that field in the connector schema. This option specifies what the
+    /// connector should do in this situation.
+    #[serde(default)]
+    pub on_response_type_mismatch: OnResponseTypeMismatch,
+}
+
+/// Options for connector behavior on encountering a type mismatch between query response data and
+/// declared types in schema.
+#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Deserialize, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub enum OnResponseTypeMismatch {
+    /// On a type mismatch, send an error instead of response data. Fails the entire query.
+    #[default]
+    Fail,
+
+    /// If any field in a response row contains data of an incorrect type, exclude that row from
+    /// the response.
+    SkipRow,
 }
 
 fn merge_object_types<'a>(
diff --git a/crates/configuration/src/lib.rs b/crates/configuration/src/lib.rs
index 9e0402a2..2e229594 100644
--- a/crates/configuration/src/lib.rs
+++ b/crates/configuration/src/lib.rs
@@ -7,7 +7,10 @@ pub mod schema;
 pub mod serialized;
 mod with_name;
 
-pub use crate::configuration::Configuration;
+pub use crate::configuration::{
+    Configuration, ConfigurationIntrospectionOptions, ConfigurationOptions,
+    ConfigurationSerializationOptions, OnResponseTypeMismatch,
+};
 pub use crate::directory::parse_configuration_options_file;
 pub use crate::directory::read_existing_schemas;
 pub use crate::directory::write_schema_directory;
diff --git a/crates/mongodb-agent-common/src/mongo_query_plan/mod.rs b/crates/mongodb-agent-common/src/mongo_query_plan/mod.rs
index f3312356..e2339955 100644
--- a/crates/mongodb-agent-common/src/mongo_query_plan/mod.rs
+++ b/crates/mongodb-agent-common/src/mongo_query_plan/mod.rs
@@ -1,9 +1,10 @@
 use std::collections::BTreeMap;
 
+use configuration::ConfigurationSerializationOptions;
 use configuration::{
     native_mutation::NativeMutation, native_query::NativeQuery, Configuration, MongoScalarType,
 };
-use mongodb_support::{ExtendedJsonMode, EXTENDED_JSON_TYPE_NAME};
+use mongodb_support::EXTENDED_JSON_TYPE_NAME;
 use ndc_models as ndc;
 use ndc_query_plan::{ConnectorTypes, QueryContext, QueryPlanError};
 
@@ -15,8 +16,8 @@ use crate::scalar_types_capabilities::SCALAR_TYPES;
 pub struct MongoConfiguration(pub Configuration);
 
 impl MongoConfiguration {
-    pub fn extended_json_mode(&self) -> ExtendedJsonMode {
-        self.0.options.serialization_options.extended_json_mode
+    pub fn serialization_options(&self) -> &ConfigurationSerializationOptions {
+        &self.0.options.serialization_options
     }
 
     pub fn native_queries(&self) -> &BTreeMap<ndc::FunctionName, NativeQuery> {
diff --git a/crates/mongodb-agent-common/src/query/execute_query_request.rs b/crates/mongodb-agent-common/src/query/execute_query_request.rs
index aa1b4551..1a3a961f 100644
--- a/crates/mongodb-agent-common/src/query/execute_query_request.rs
+++ b/crates/mongodb-agent-common/src/query/execute_query_request.rs
@@ -33,7 +33,8 @@ pub async fn execute_query_request(
     tracing::debug!(?query_plan, "abstract query plan");
     let pipeline = pipeline_for_query_request(config, &query_plan)?;
     let documents = execute_query_pipeline(database, config, &query_plan, pipeline).await?;
-    let response = serialize_query_response(config.extended_json_mode(), &query_plan, documents)?;
+    let response =
+        serialize_query_response(config.serialization_options(), &query_plan, documents)?;
     Ok(response)
 }
 
diff --git a/crates/mongodb-agent-common/src/query/response.rs b/crates/mongodb-agent-common/src/query/response.rs
index cec6f1b8..0b31b82a 100644
--- a/crates/mongodb-agent-common/src/query/response.rs
+++ b/crates/mongodb-agent-common/src/query/response.rs
@@ -1,10 +1,9 @@
 use std::collections::BTreeMap;
 
-use configuration::MongoScalarType;
+use configuration::{ConfigurationSerializationOptions, MongoScalarType, OnResponseTypeMismatch};
 use indexmap::IndexMap;
 use itertools::Itertools;
 use mongodb::bson::{self, Bson};
-use mongodb_support::ExtendedJsonMode;
 use ndc_models::{QueryResponse, RowFieldValue, RowSet};
 use serde::Deserialize;
 use thiserror::Error;
 
@@ -50,7 +49,7 @@ struct BsonRowSet {
 
 #[instrument(name = "Serialize Query Response", skip_all, fields(internal.visibility = "user"))]
 pub fn serialize_query_response(
-    mode: ExtendedJsonMode,
+    options: &ConfigurationSerializationOptions,
     query_plan: &QueryPlan,
     response_documents: Vec<bson::Document>,
 ) -> Result<QueryResponse> {
@@ -62,7 +61,7 @@ pub fn serialize_query_response(
         .map(|document| {
             let row_set = bson::from_document(document)?;
             serialize_row_set_with_aggregates(
-                mode,
+                options,
                 &[collection_name.as_str()],
                 &query_plan.query,
                 row_set,
@@ -72,14 +71,14 @@
     } else if query_plan.query.has_aggregates() {
         let row_set = parse_single_document(response_documents)?;
         Ok(vec![serialize_row_set_with_aggregates(
-            mode,
+            options,
             &[],
             &query_plan.query,
             row_set,
         )?])
     } else {
         Ok(vec![serialize_row_set_rows_only(
-            mode,
+            options,
             &[],
             &query_plan.query,
             response_documents,
@@ -92,7 +91,7 @@
 
 // When there are no aggregates we expect a list of rows
 fn serialize_row_set_rows_only(
-    mode: ExtendedJsonMode,
+    options: &ConfigurationSerializationOptions,
     path: &[&str],
     query: &Query,
     docs: Vec<bson::Document>,
@@ -100,7 +99,7 @@
     let rows = query
         .fields
         .as_ref()
-        .map(|fields| serialize_rows(mode, path, fields, docs))
+        .map(|fields| serialize_rows(options, path, fields, docs))
         .transpose()?;
 
     Ok(RowSet {
@@ -112,7 +111,7 @@
 // When there are aggregates we expect a single document with `rows` and `aggregates`
 // fields
 fn serialize_row_set_with_aggregates(
-    mode: ExtendedJsonMode,
+    options: &ConfigurationSerializationOptions,
     path: &[&str],
     query: &Query,
     row_set: BsonRowSet,
@@ -120,26 +119,26 @@
     let aggregates = query
         .aggregates
         .as_ref()
-        .map(|aggregates| serialize_aggregates(mode, path, aggregates, row_set.aggregates))
+        .map(|aggregates| serialize_aggregates(options, path, aggregates, row_set.aggregates))
         .transpose()?;
 
     let rows = query
         .fields
         .as_ref()
-        .map(|fields| serialize_rows(mode, path, fields, row_set.rows))
+        .map(|fields| serialize_rows(options, path, fields, row_set.rows))
         .transpose()?;
 
     Ok(RowSet { aggregates, rows })
 }
 
 fn serialize_aggregates(
-    mode: ExtendedJsonMode,
+    options: &ConfigurationSerializationOptions,
     path: &[&str],
     query_aggregates: &IndexMap<ndc_models::FieldName, Aggregate>,
     value: Bson,
 ) -> Result<IndexMap<ndc_models::FieldName, serde_json::Value>> {
     let aggregates_type = type_for_aggregates(query_aggregates);
-    let json = bson_to_json(mode, &aggregates_type, value)?;
+    let json = bson_to_json(options.extended_json_mode, &aggregates_type, value)?;
 
     // The NDC type uses an IndexMap for aggregate values; we need to convert the map
     // underlying the Value::Object value to an IndexMap
@@ -153,28 +152,39 @@
 }
 
 fn serialize_rows(
-    mode: ExtendedJsonMode,
+    options: &ConfigurationSerializationOptions,
     path: &[&str],
     query_fields: &IndexMap<ndc_models::FieldName, Field>,
     docs: Vec<bson::Document>,
 ) -> Result<Vec<IndexMap<ndc_models::FieldName, RowFieldValue>>> {
     let row_type = type_for_row(path, query_fields)?;
 
-    docs.into_iter()
-        .map(|doc| {
-            let json = bson_to_json(mode, &row_type, doc.into())?;
+    let rows = docs
+        .into_iter()
+        .filter_map(
+            |doc| match bson_to_json(options.extended_json_mode, &row_type, doc.into()) {
+                Ok(json) => Some(Ok(json)),
+                Err(BsonToJsonError::TypeMismatch(_, _))
+                    if options.on_response_type_mismatch == OnResponseTypeMismatch::SkipRow =>
+                {
+                    None
+                }
+                Err(error) => Some(Err(error)),
+            },
+        )
+        .map_ok(|json| {
             // The NDC types use an IndexMap for each row value; we need to convert the map
             // underlying the Value::Object value to an IndexMap
-            let index_map = match json {
+            match json {
                 serde_json::Value::Object(obj) => obj
                     .into_iter()
                     .map(|(key, value)| (key.into(), RowFieldValue(value)))
                     .collect(),
                 _ => unreachable!(),
-            };
-            Ok(index_map)
+            }
         })
-        .try_collect()
+        .try_collect()?;
+    Ok(rows)
 }
 
 fn type_for_row_set(
@@ -322,9 +332,12 @@ fn path_to_owned(path: &[&str]) -> Vec<String> {
 mod tests {
     use std::str::FromStr;
 
-    use configuration::{Configuration, MongoScalarType};
+    use configuration::{
+        Configuration, ConfigurationOptions, ConfigurationSerializationOptions, MongoScalarType,
+        OnResponseTypeMismatch,
+    };
     use mongodb::bson::{self, Bson};
-    use mongodb_support::{BsonScalarType, ExtendedJsonMode};
+    use mongodb_support::BsonScalarType;
     use ndc_models::{QueryRequest, QueryResponse, RowFieldValue, RowSet};
     use ndc_query_plan::plan_for_query_request;
     use ndc_test_helpers::{
@@ -336,7 +349,7 @@ mod tests {
 
     use crate::{
         mongo_query_plan::{MongoConfiguration, ObjectType, Type},
-        test_helpers::make_nested_schema,
+        test_helpers::{chinook_config, chinook_relationships, make_nested_schema},
     };
 
     use super::{serialize_query_response, type_for_row_set};
@@ -364,7 +377,7 @@
         }];
 
         let response =
-            serialize_query_response(ExtendedJsonMode::Canonical, &query_plan, response_documents)?;
+            serialize_query_response(&Default::default(), &query_plan, response_documents)?;
         assert_eq!(
             response,
             QueryResponse(vec![RowSet {
@@ -404,7 +417,7 @@
         }];
 
         let response =
-            serialize_query_response(ExtendedJsonMode::Canonical, &query_plan, response_documents)?;
+            serialize_query_response(&Default::default(), &query_plan, response_documents)?;
         assert_eq!(
             response,
             QueryResponse(vec![RowSet {
@@ -451,7 +464,7 @@
         }];
 
         let response =
-            serialize_query_response(ExtendedJsonMode::Canonical, &query_plan, response_documents)?;
+            serialize_query_response(&Default::default(), &query_plan, response_documents)?;
         assert_eq!(
             response,
             QueryResponse(vec![RowSet {
@@ -509,8 +522,11 @@
             "price_extjson": Bson::Decimal128(bson::Decimal128::from_str("-4.9999999999").unwrap()),
         }];
 
-        let response =
-            serialize_query_response(ExtendedJsonMode::Canonical, &query_plan, response_documents)?;
+        let response = serialize_query_response(
+            query_context.serialization_options(),
+            &query_plan,
+            response_documents,
+        )?;
         assert_eq!(
             response,
             QueryResponse(vec![RowSet {
@@ -567,8 +583,11 @@
             },
         }];
 
-        let response =
-            serialize_query_response(ExtendedJsonMode::Canonical, &query_plan, response_documents)?;
+        let response = serialize_query_response(
+            query_context.serialization_options(),
+            &query_plan,
+            response_documents,
+        )?;
         assert_eq!(
             response,
             QueryResponse(vec![RowSet {
@@ -602,11 +621,14 @@
                 object_type([("value", named_type("ExtendedJSON"))]),
             )]
             .into(),
-            functions: Default::default(),
-            procedures: Default::default(),
-            native_mutations: Default::default(),
-            native_queries: Default::default(),
-            options: Default::default(),
+            options: ConfigurationOptions {
+                serialization_options: ConfigurationSerializationOptions {
+                    extended_json_mode: mongodb_support::ExtendedJsonMode::Relaxed,
+                    ..Default::default()
+                },
+                ..Default::default()
+            },
+            ..Default::default()
         });
 
         let request = query_request()
@@ -630,8 +652,11 @@
             },
         }];
 
-        let response =
-            serialize_query_response(ExtendedJsonMode::Relaxed, &query_plan, response_documents)?;
+        let response = serialize_query_response(
+            query_context.serialization_options(),
+            &query_plan,
+            response_documents,
+        )?;
         assert_eq!(
             response,
             QueryResponse(vec![RowSet {
@@ -729,4 +754,135 @@ mod tests {
         assert_eq!(row_set_type, expected);
         Ok(())
     }
+
+    #[test]
+    fn fails_on_response_type_mismatch() -> anyhow::Result<()> {
+        let options = ConfigurationSerializationOptions {
+            on_response_type_mismatch: OnResponseTypeMismatch::Fail,
+            ..Default::default()
+        };
+
+        let request = query_request()
+            .collection("Track")
+            .query(query().fields([field!("Milliseconds")]))
+            .into();
+
+        let query_plan = plan_for_query_request(&chinook_config(), request)?;
+
+        let response_documents = vec![
+            bson::doc! { "Milliseconds": 1 },
+            bson::doc! { "Milliseconds": "two" },
+            bson::doc! { "Milliseconds": 3 },
+        ];
+
+        let response_result = serialize_query_response(&options, &query_plan, response_documents);
+        assert!(
+            response_result.is_err(),
+            "serialize_query_response returns an error"
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn skips_rows_with_unexpected_data_type() -> anyhow::Result<()> {
+        let options = ConfigurationSerializationOptions {
+            on_response_type_mismatch: OnResponseTypeMismatch::SkipRow,
+            ..Default::default()
+        };
+
+        let request = query_request()
+            .collection("Track")
+            .query(query().fields([field!("Milliseconds")]))
+            .into();
+
+        let query_plan = plan_for_query_request(&chinook_config(), request)?;
+
+        let response_documents = vec![
+            bson::doc! { "Milliseconds": 1 },
+            bson::doc! { "Milliseconds": "two" },
+            bson::doc! { "Milliseconds": 3 },
+        ];
+
+        let response = serialize_query_response(&options, &query_plan, response_documents)?;
+        assert_eq!(
+            response,
+            QueryResponse(vec![RowSet {
+                aggregates: Default::default(),
+                rows: Some(vec![
+                    [("Milliseconds".into(), RowFieldValue(json!(1)))].into(),
+                    [("Milliseconds".into(), RowFieldValue(json!(3)))].into(),
+                ])
+            }])
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn fails_on_response_type_mismatch_in_related_collection() -> anyhow::Result<()> {
+        let options = ConfigurationSerializationOptions {
+            on_response_type_mismatch: OnResponseTypeMismatch::Fail,
+            ..Default::default()
+        };
+
+        let request = query_request()
+            .collection("Album")
+            .query(
+                query().fields([relation_field!("Tracks" => "Tracks", query().fields([
+                    field!("Milliseconds")
+                ]))]),
+            )
+            .relationships(chinook_relationships())
+            .into();
+
+        let query_plan = plan_for_query_request(&chinook_config(), request)?;
+
+        let response_documents = vec![bson::doc! { "Tracks": { "rows": [
+            bson::doc! { "Milliseconds": 1 },
+            bson::doc! { "Milliseconds": "two" },
+            bson::doc! { "Milliseconds": 3 },
+        ] } }];
+
+        let response_result = serialize_query_response(&options, &query_plan, response_documents);
+        assert!(
+            response_result.is_err(),
+            "serialize_query_response returns an error"
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn skips_rows_with_unexpected_data_type_in_related_collection() -> anyhow::Result<()> {
+        let options = ConfigurationSerializationOptions {
+            on_response_type_mismatch: OnResponseTypeMismatch::SkipRow,
+            ..Default::default()
+        };
+
+        let request = query_request()
+            .collection("Album")
+            .query(
+                query().fields([relation_field!("Tracks" => "Tracks", query().fields([
+                    field!("Milliseconds")
+                ]))]),
+            )
+            .relationships(chinook_relationships())
+            .into();
+
+        let query_plan = plan_for_query_request(&chinook_config(), request)?;
+
+        let response_documents = vec![bson::doc! { "Tracks": { "rows": [
+            bson::doc! { "Milliseconds": 1 },
+            bson::doc! { "Milliseconds": "two" },
+            bson::doc! { "Milliseconds": 3 },
{ "Milliseconds": 3 }, + ] } }]; + + let response = serialize_query_response(&options, &query_plan, response_documents)?; + assert_eq!( + response, + QueryResponse(vec![RowSet { + aggregates: Default::default(), + rows: Some(vec![]) + }]) + ); + Ok(()) + } } diff --git a/crates/mongodb-connector/src/mutation.rs b/crates/mongodb-connector/src/mutation.rs index 7b932fbd..7082f9e2 100644 --- a/crates/mongodb-connector/src/mutation.rs +++ b/crates/mongodb-connector/src/mutation.rs @@ -109,7 +109,7 @@ async fn execute_procedure( }; let json_result = bson_to_json( - config.extended_json_mode(), + config.serialization_options().extended_json_mode, &requested_result_type, rewritten_result, )