diff --git a/Cargo.lock b/Cargo.lock index 1aad36ee..8e4570ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -432,6 +432,7 @@ dependencies = [ "itertools 0.12.1", "mongodb", "mongodb-support", + "ndc-models", "schemars", "serde", "serde_json", @@ -658,12 +659,6 @@ version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8" -[[package]] -name = "difflib" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" - [[package]] name = "digest" version = "0.10.7" @@ -762,15 +757,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6" -[[package]] -name = "float-cmp" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" -dependencies = [ - "num-traits", -] - [[package]] name = "fnv" version = "1.0.7" @@ -1402,9 +1388,9 @@ dependencies = [ [[package]] name = "mockall" -version = "0.11.4" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c84490118f2ee2d74570d114f3d0493cbf02790df303d2707606c3e14e07c96" +checksum = "43766c2b5203b10de348ffe19f7e54564b64f3d6018ff7648d1e2d6d3a0f0a48" dependencies = [ "cfg-if", "downcast", @@ -1417,14 +1403,14 @@ dependencies = [ [[package]] name = "mockall_derive" -version = "0.11.4" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22ce75669015c4f47b289fd4d4f56e894e4c96003ffdf3ac51313126f94c6cbb" +checksum = "af7cbce79ec385a1d4f54baa90a76401eb15d9cab93685f62e7e9f942aa00ae2" dependencies = [ "cfg-if", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.52", ] [[package]] @@ -1483,6 +1469,7 @@ dependencies = [ "bytes", "configuration", "dc-api", + "dc-api-test-helpers", "dc-api-types", "enum-iterator", "futures", @@ -1677,12 +1664,6 @@ dependencies = [ "serde", ] -[[package]] -name = "normalize-line-endings" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" - [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1995,16 +1976,12 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "predicates" -version = "2.1.5" +version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59230a63c37f3e18569bdb90e4a89cbf5bf8b06fea0b84e65ea10cc4df47addd" +checksum = "68b87bfd4605926cdfefc1c3b5f8fe560e3feca9d5552cf68c466d3d8236c7e8" dependencies = [ - "difflib", - "float-cmp", - "itertools 0.10.5", - "normalize-line-endings", + "anstyle", "predicates-core", - "regex", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 7a4df658..7c6ceb00 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,12 @@ members = [ ] resolver = "2" +# The tag or rev of ndc-models must match the locked tag or rev of the +# ndc-models dependency of ndc-sdk +[workspace.dependencies] +ndc-sdk = { git = "https://github.com/hasura/ndc-sdk-rs.git" } +ndc-models = { git = "http://github.com/hasura/ndc-spec.git", tag = "v0.1.2" } + # We have a fork of the mongodb driver with a fix for reading metadata from time # series collections. 
# See the upstream PR: https://github.com/mongodb/mongo-rust-driver/pull/1003 diff --git a/crates/cli/src/introspection/sampling.rs b/crates/cli/src/introspection/sampling.rs index b2adf101..86bce3c4 100644 --- a/crates/cli/src/introspection/sampling.rs +++ b/crates/cli/src/introspection/sampling.rs @@ -7,7 +7,7 @@ use configuration::{ }; use futures_util::TryStreamExt; use mongodb::bson::{doc, Bson, Document}; -use mongodb_agent_common::interface_types::MongoConfig; +use mongodb_agent_common::state::ConnectorState; use mongodb_support::BsonScalarType::{self, *}; type ObjectField = WithName; @@ -19,18 +19,18 @@ type ObjectType = WithName; /// are not unifiable. pub async fn sample_schema_from_db( sample_size: u32, - config: &MongoConfig, + state: &ConnectorState, existing_schemas: &HashSet, ) -> anyhow::Result> { let mut schemas = BTreeMap::new(); - let db = config.client.database(&config.database); + let db = state.database(); let mut collections_cursor = db.list_collections(None, None).await?; while let Some(collection_spec) = collections_cursor.try_next().await? { let collection_name = collection_spec.name; if !existing_schemas.contains(&collection_name) { let collection_schema = - sample_schema_from_collection(&collection_name, sample_size, config).await?; + sample_schema_from_collection(&collection_name, sample_size, state).await?; schemas.insert(collection_name, collection_schema); } } @@ -40,9 +40,9 @@ pub async fn sample_schema_from_db( async fn sample_schema_from_collection( collection_name: &str, sample_size: u32, - config: &MongoConfig, + state: &ConnectorState, ) -> anyhow::Result { - let db = config.client.database(&config.database); + let db = state.database(); let options = None; let mut cursor = db .collection::(collection_name) diff --git a/crates/cli/src/introspection/validation_schema.rs b/crates/cli/src/introspection/validation_schema.rs index f9f47724..2ff37ce8 100644 --- a/crates/cli/src/introspection/validation_schema.rs +++ b/crates/cli/src/introspection/validation_schema.rs @@ -6,19 +6,22 @@ use configuration::{ }; use futures_util::TryStreamExt; use mongodb::bson::from_bson; -use mongodb_agent_common::schema::{get_property_description, Property, ValidatorSchema}; +use mongodb_agent_common::{ + schema::{get_property_description, Property, ValidatorSchema}, + state::ConnectorState, +}; use mongodb_support::BsonScalarType; -use mongodb_agent_common::interface_types::{MongoAgentError, MongoConfig}; +use mongodb_agent_common::interface_types::MongoAgentError; type Collection = WithName; type ObjectType = WithName; type ObjectField = WithName; pub async fn get_metadata_from_validation_schema( - config: &MongoConfig, + state: &ConnectorState, ) -> Result, MongoAgentError> { - let db = config.client.database(&config.database); + let db = state.database(); let mut collections_cursor = db.list_collections(None, None).await?; let mut schemas: Vec> = vec![]; diff --git a/crates/cli/src/lib.rs b/crates/cli/src/lib.rs index b0f30cac..139db0e9 100644 --- a/crates/cli/src/lib.rs +++ b/crates/cli/src/lib.rs @@ -6,10 +6,9 @@ use std::path::PathBuf; use clap::{Parser, Subcommand}; -use mongodb_agent_common::interface_types::MongoConfig; - // Exported for use in tests pub use introspection::type_from_bson; +use mongodb_agent_common::state::ConnectorState; #[derive(Debug, Clone, Parser)] pub struct UpdateArgs { @@ -29,7 +28,7 @@ pub enum Command { pub struct Context { pub path: PathBuf, - pub mongo_config: MongoConfig, + pub connector_state: ConnectorState, } /// Run a command in a 
given directory. @@ -44,14 +43,14 @@ pub async fn run(command: Command, context: &Context) -> anyhow::Result<()> { async fn update(context: &Context, args: &UpdateArgs) -> anyhow::Result<()> { if !args.no_validator_schema { let schemas_from_json_validation = - introspection::get_metadata_from_validation_schema(&context.mongo_config).await?; + introspection::get_metadata_from_validation_schema(&context.connector_state).await?; configuration::write_schema_directory(&context.path, schemas_from_json_validation).await?; } let existing_schemas = configuration::list_existing_schemas(&context.path).await?; let schemas_from_sampling = introspection::sample_schema_from_db( args.sample_size, - &context.mongo_config, + &context.connector_state, &existing_schemas, ) .await?; diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index ea4de0cb..2c4b4af3 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -45,10 +45,13 @@ pub async fn main() -> anyhow::Result<()> { Some(path) => path, None => env::current_dir()?, }; - let mongo_config = try_init_state_from_uri(&args.connection_uri, &Default::default()) + let connector_state = try_init_state_from_uri(&args.connection_uri) .await .map_err(|e| anyhow!("Error initializing MongoDB state {}", e))?; - let context = Context { path, mongo_config }; + let context = Context { + path, + connector_state, + }; run(args.subcommand, &context).await?; Ok(()) } diff --git a/crates/configuration/Cargo.toml b/crates/configuration/Cargo.toml index 8db65e2e..37d4af35 100644 --- a/crates/configuration/Cargo.toml +++ b/crates/configuration/Cargo.toml @@ -9,6 +9,7 @@ futures = "^0.3" itertools = "^0.12" mongodb = "2.8" mongodb-support = { path = "../mongodb-support" } +ndc-models = { workspace = true } schemars = "^0.8.12" serde = { version = "1", features = ["derive"] } serde_json = { version = "1" } diff --git a/crates/configuration/src/configuration.rs b/crates/configuration/src/configuration.rs index 1bcd622d..808eff82 100644 --- a/crates/configuration/src/configuration.rs +++ b/crates/configuration/src/configuration.rs @@ -1,52 +1,163 @@ use std::{collections::BTreeMap, path::Path}; -use anyhow::ensure; +use anyhow::{anyhow, ensure}; use itertools::Itertools; -use schemars::JsonSchema; -use serde::Deserialize; +use mongodb_support::BsonScalarType; +use ndc_models as ndc; -use crate::{native_procedure::NativeProcedure, read_directory, schema::ObjectType, Schema}; +use crate::{ + native_procedure::NativeProcedure, + native_query::{NativeQuery, NativeQueryRepresentation}, + read_directory, schema, serialized, +}; -#[derive(Clone, Debug, Default, Deserialize, JsonSchema)] -#[serde(rename_all = "camelCase")] +#[derive(Clone, Debug, Default)] pub struct Configuration { - /// Descriptions of collections and types used in the database - pub schema: Schema, + /// Tracked collections from the configured MongoDB database. This includes real collections as + /// well as virtual collections defined by native queries using + /// [NativeQueryRepresentation::Collection] representation. + pub collections: BTreeMap, + + /// Functions are based on native queries using [NativeQueryRepresentation::Function] + /// representation. + /// + /// In query requests functions and collections are treated as the same, but in schema + /// responses they are separate concepts. So we want a set of [CollectionInfo] values for + /// functions for query processing, and we want it separate from `collections` for the schema + /// response. 
+    pub functions: BTreeMap<String, (ndc::FunctionInfo, ndc::CollectionInfo)>,
+
+    /// Procedures are based on native procedures.
+    pub procedures: BTreeMap<String, ndc::ProcedureInfo>,
 
     /// Native procedures allow arbitrary MongoDB commands where types of results are
     /// specified via user configuration.
-    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
     pub native_procedures: BTreeMap<String, NativeProcedure>,
+
+    /// Native queries allow arbitrary aggregation pipelines that can be included in a query plan.
+    pub native_queries: BTreeMap<String, NativeQuery>,
+
+    /// Object types defined for this connector include types of documents in each collection,
+    /// types for objects inside collection documents, types for native query and native procedure
+    /// arguments and results.
+    ///
+    /// The object types here combine object types defined in files in the `schema/`,
+    /// `native_queries/`, and `native_procedures/` subdirectories in the connector configuration
+    /// directory.
+    pub object_types: BTreeMap<String, schema::ObjectType>,
 }
 
 impl Configuration {
     pub fn validate(
-        schema: Schema,
-        native_procedures: BTreeMap<String, NativeProcedure>,
+        schema: serialized::Schema,
+        native_procedures: BTreeMap<String, serialized::NativeProcedure>,
+        native_queries: BTreeMap<String, serialized::NativeQuery>,
     ) -> anyhow::Result<Self> {
-        let config = Configuration {
-            schema,
-            native_procedures,
-        };
-
-        {
-            let duplicate_type_names: Vec<&str> = config
-                .object_types()
+        let object_types_iter = || merge_object_types(&schema, &native_procedures, &native_queries);
+        let object_type_errors = {
+            let duplicate_type_names: Vec<&str> = object_types_iter()
                 .map(|(name, _)| name.as_ref())
                 .duplicates()
                 .collect();
-            ensure!(
-                duplicate_type_names.is_empty(),
-                "configuration contains multiple definitions for these object type names: {}",
-                duplicate_type_names.join(", ")
+            if duplicate_type_names.is_empty() {
+                None
+            } else {
+                Some(anyhow!(
+                    "configuration contains multiple definitions for these object type names: {}",
+                    duplicate_type_names.join(", ")
+                ))
+            }
+        };
+        let object_types = object_types_iter()
+            .map(|(name, ot)| (name.to_owned(), ot.clone()))
+            .collect();
+
+        let internal_native_queries: BTreeMap<_, _> = native_queries
+            .into_iter()
+            .map(|(name, nq)| (name, nq.into()))
+            .collect();
+
+        let internal_native_procedures: BTreeMap<_, _> = native_procedures
+            .into_iter()
+            .map(|(name, np)| (name, np.into()))
+            .collect();
+
+        let collections = {
+            let regular_collections = schema.collections.into_iter().map(|(name, collection)| {
+                (
+                    name.clone(),
+                    collection_to_collection_info(&object_types, name, collection),
+                )
+            });
+            let native_query_collections = internal_native_queries.iter().filter_map(
+                |(name, native_query): (&String, &NativeQuery)| {
+                    if native_query.representation == NativeQueryRepresentation::Collection {
+                        Some((
+                            name.to_owned(),
+                            native_query_to_collection_info(&object_types, name, native_query),
+                        ))
+                    } else {
+                        None
+                    }
+                },
             );
-        }
+            regular_collections
+                .chain(native_query_collections)
+                .collect()
+        };
+
+        let (functions, function_errors): (BTreeMap<_, _>, Vec<_>) = internal_native_queries
+            .iter()
+            .filter_map(|(name, native_query)| {
+                if native_query.representation == NativeQueryRepresentation::Function {
+                    Some((
+                        name,
+                        native_query_to_function_info(&object_types, name, native_query),
+                        native_query_to_collection_info(&object_types, name, native_query),
+                    ))
+                } else {
+                    None
+                }
+            })
+            .map(|(name, function_result, collection_info)| {
+                Ok((name.to_owned(), (function_result?, collection_info)))
+                    as Result<_, anyhow::Error>
+            })
+            .partition_result();
+
+        let procedures = internal_native_procedures
+            .iter()
+            .map(|(name, native_procedure)| {
+                (
+                    name.to_owned(),
+                    native_procedure_to_procedure_info(name, native_procedure),
+                )
+            })
+            .collect();
 
-        Ok(config)
+        let errors: Vec<String> = object_type_errors
+            .into_iter()
+            .chain(function_errors)
+            .map(|e| e.to_string())
+            .collect();
+        ensure!(
+            errors.is_empty(),
+            "connector configuration has errors:\n - {}",
+            errors.join("\n - ")
+        );
+
+        Ok(Configuration {
+            collections,
+            functions,
+            procedures,
+            native_procedures: internal_native_procedures,
+            native_queries: internal_native_queries,
+            object_types,
+        })
     }
 
-    pub fn from_schema(schema: Schema) -> anyhow::Result<Self> {
-        Self::validate(schema, Default::default())
+    pub fn from_schema(schema: serialized::Schema) -> anyhow::Result<Self> {
+        Self::validate(schema, Default::default(), Default::default())
     }
 
     pub async fn parse_configuration(
@@ -54,24 +165,155 @@ impl Configuration {
     ) -> anyhow::Result<Self> {
         read_directory(configuration_dir).await
     }
+}
+
+fn merge_object_types<'a>(
+    schema: &'a serialized::Schema,
+    native_procedures: &'a BTreeMap<String, serialized::NativeProcedure>,
+    native_queries: &'a BTreeMap<String, serialized::NativeQuery>,
+) -> impl Iterator<Item = (&'a String, &'a schema::ObjectType)> {
+    let object_types_from_schema = schema.object_types.iter();
+    let object_types_from_native_procedures = native_procedures
+        .values()
+        .flat_map(|native_procedure| &native_procedure.object_types);
+    let object_types_from_native_queries = native_queries
+        .values()
+        .flat_map(|native_query| &native_query.object_types);
+    object_types_from_schema
+        .chain(object_types_from_native_procedures)
+        .chain(object_types_from_native_queries)
+}
+
+fn collection_to_collection_info(
+    object_types: &BTreeMap<String, schema::ObjectType>,
+    name: String,
+    collection: schema::Collection,
+) -> ndc::CollectionInfo {
+    let pk_constraint =
+        get_primary_key_uniqueness_constraint(object_types, &name, &collection.r#type);
+
+    ndc::CollectionInfo {
+        name,
+        collection_type: collection.r#type,
+        description: collection.description,
+        arguments: Default::default(),
+        foreign_keys: Default::default(),
+        uniqueness_constraints: BTreeMap::from_iter(pk_constraint),
+    }
+}
+
+fn native_query_to_collection_info(
+    object_types: &BTreeMap<String, schema::ObjectType>,
+    name: &str,
+    native_query: &NativeQuery,
+) -> ndc::CollectionInfo {
+    let pk_constraint = get_primary_key_uniqueness_constraint(
+        object_types,
+        name,
+        &native_query.result_document_type,
+    );
 
-    /// Returns object types collected from schema and native procedures
-    pub fn object_types(&self) -> impl Iterator<Item = (&String, &ObjectType)> {
-        let object_types_from_schema = self.schema.object_types.iter();
-        let object_types_from_native_procedures = self
-            .native_procedures
-            .values()
-            .flat_map(|native_procedure| &native_procedure.object_types);
-        object_types_from_schema.chain(object_types_from_native_procedures)
+    // TODO: recursively verify that all referenced object types exist
+    ndc::CollectionInfo {
+        name: name.to_owned(),
+        collection_type: native_query.result_document_type.clone(),
+        description: native_query.description.clone(),
+        arguments: arguments_to_ndc_arguments(native_query.arguments.clone()),
+        foreign_keys: Default::default(),
+        uniqueness_constraints: BTreeMap::from_iter(pk_constraint),
     }
 }
 
+fn get_primary_key_uniqueness_constraint(
+    object_types: &BTreeMap<String, schema::ObjectType>,
+    name: &str,
+    collection_type: &str,
+) -> Option<(String, ndc::UniquenessConstraint)> {
+    // Check to make sure our collection's object type contains the _id objectid field
+    // If it doesn't (should never happen, all collections need an _id column), don't generate the constraint
+    let object_type = object_types.get(collection_type)?;
+    let id_field = object_type.fields.get("_id")?;
+    match &id_field.r#type {
+        schema::Type::Scalar(BsonScalarType::ObjectId) => Some(()),
+        _ => None,
+    }?;
+    let uniqueness_constraint = ndc::UniquenessConstraint {
+        unique_columns: vec!["_id".into()],
+    };
+    let constraint_name = format!("{}_id", name);
+    Some((constraint_name, uniqueness_constraint))
+}
+
+fn native_query_to_function_info(
+    object_types: &BTreeMap<String, schema::ObjectType>,
+    name: &str,
+    native_query: &NativeQuery,
+) -> anyhow::Result<ndc::FunctionInfo> {
+    Ok(ndc::FunctionInfo {
+        name: name.to_owned(),
+        description: native_query.description.clone(),
+        arguments: arguments_to_ndc_arguments(native_query.arguments.clone()),
+        result_type: function_result_type(object_types, name, &native_query.result_document_type)?,
+    })
+}
+
+fn function_result_type(
+    object_types: &BTreeMap<String, schema::ObjectType>,
+    function_name: &str,
+    object_type_name: &str,
+) -> anyhow::Result<ndc::Type> {
+    let object_type = find_object_type(object_types, object_type_name)?;
+    let value_field = object_type.fields.get("__value").ok_or_else(|| {
+        anyhow!("the type of the native query, {function_name}, is not valid: the type of a native query that is represented as a function must be an object type with a single field named \"__value\"")
+    })?;
+    Ok(value_field.r#type.clone().into())
+}
+
+fn native_procedure_to_procedure_info(
+    procedure_name: &str,
+    procedure: &NativeProcedure,
+) -> ndc::ProcedureInfo {
+    ndc::ProcedureInfo {
+        name: procedure_name.to_owned(),
+        description: procedure.description.clone(),
+        arguments: arguments_to_ndc_arguments(procedure.arguments.clone()),
+        result_type: procedure.result_type.clone().into(),
+    }
+}
+
+fn arguments_to_ndc_arguments(
+    configured_arguments: BTreeMap<String, schema::ObjectField>,
+) -> BTreeMap<String, ndc::ArgumentInfo> {
+    configured_arguments
+        .into_iter()
+        .map(|(name, field)| {
+            (
+                name,
+                ndc::ArgumentInfo {
+                    argument_type: field.r#type.into(),
+                    description: field.description,
+                },
+            )
+        })
+        .collect()
+}
+
+fn find_object_type<'a>(
+    object_types: &'a BTreeMap<String, schema::ObjectType>,
+    object_type_name: &str,
+) -> anyhow::Result<&'a schema::ObjectType> {
+    object_types
+        .get(object_type_name)
+        .ok_or_else(|| anyhow!("configuration references an object type named {object_type_name}, but it is not defined"))
+}
+
 #[cfg(test)]
 mod tests {
     use mongodb::bson::doc;
 
     use super::*;
-    use crate::{schema::Type, Schema};
+    use crate::{schema::Type, serialized::Schema};
 
     #[test]
     fn fails_with_duplicate_object_types() {
@@ -79,7 +321,7 @@ mod tests {
             collections: Default::default(),
             object_types: [(
                 "Album".to_owned(),
-                ObjectType {
+                schema::ObjectType {
                     fields: Default::default(),
                     description: Default::default(),
                 },
@@ -89,10 +331,10 @@ mod tests {
         };
         let native_procedures = [(
             "hello".to_owned(),
-            NativeProcedure {
+            serialized::NativeProcedure {
                 object_types: [(
                     "Album".to_owned(),
-                    ObjectType {
+                    schema::ObjectType {
                         fields: Default::default(),
                         description: Default::default(),
                     },
@@ -108,7 +350,7 @@ mod tests {
         )]
         .into_iter()
         .collect();
-        let result = Configuration::validate(schema, native_procedures);
+        let result = Configuration::validate(schema, native_procedures, Default::default());
         let error_msg = result.unwrap_err().to_string();
         assert!(error_msg.contains("multiple definitions"));
         assert!(error_msg.contains("Album"));
diff --git a/crates/configuration/src/directory.rs b/crates/configuration/src/directory.rs
index 035a5488..aa1b9871 100644
--- a/crates/configuration/src/directory.rs
+++ b/crates/configuration/src/directory.rs
@@ -9,10 +9,11 @@ use std::{
 };
 
 use tokio::fs;
 use tokio_stream::wrappers::ReadDirStream;
 
-use crate::{with_name::WithName, Configuration, Schema};
+use crate::{serialized::Schema, with_name::WithName, Configuration};
 
 pub const SCHEMA_DIRNAME: &str = "schema";
 pub const NATIVE_PROCEDURES_DIRNAME: &str = "native_procedures";
+pub const NATIVE_QUERIES_DIRNAME: &str = "native_queries";
 
 pub const CONFIGURATION_EXTENSIONS: [(&str, FileFormat); 3] =
     [("json", JSON), ("yaml", YAML), ("yml", YAML)];
@@ -42,7 +43,11 @@ pub async fn read_directory(
         .await?
         .unwrap_or_default();
 
-    Configuration::validate(schema, native_procedures)
+    let native_queries = read_subdir_configs(&dir.join(NATIVE_QUERIES_DIRNAME))
+        .await?
+        .unwrap_or_default();
+
+    Configuration::validate(schema, native_procedures, native_queries)
 }
 
 /// Parse all files in a directory with one of the allowed configuration extensions according to
diff --git a/crates/configuration/src/lib.rs b/crates/configuration/src/lib.rs
index bbd87477..c7c13e4f 100644
--- a/crates/configuration/src/lib.rs
+++ b/crates/configuration/src/lib.rs
@@ -1,12 +1,14 @@
 mod configuration;
 mod directory;
 pub mod native_procedure;
+pub mod native_query;
 pub mod schema;
+mod serialized;
 mod with_name;
 
 pub use crate::configuration::Configuration;
 pub use crate::directory::list_existing_schemas;
 pub use crate::directory::read_directory;
 pub use crate::directory::write_schema_directory;
-pub use crate::schema::Schema;
+pub use crate::serialized::Schema;
 pub use crate::with_name::{WithName, WithNameRef};
diff --git a/crates/configuration/src/native_procedure.rs b/crates/configuration/src/native_procedure.rs
index 3aff80ba..8062fb75 100644
--- a/crates/configuration/src/native_procedure.rs
+++ b/crates/configuration/src/native_procedure.rs
@@ -1,83 +1,35 @@
 use std::collections::BTreeMap;
 
 use mongodb::{bson, options::SelectionCriteria};
-use schemars::JsonSchema;
-use serde::Deserialize;
 
-use crate::schema::{ObjectField, ObjectType, Type};
+use crate::{
+    schema::{ObjectField, Type},
+    serialized::{self},
+};
 
-/// An arbitrary database command using MongoDB's runCommand API.
-/// See https://www.mongodb.com/docs/manual/reference/method/db.runCommand/
+/// Internal representation of Native Procedures. For doc comments see
+/// [crate::serialized::NativeProcedure]
 ///
-/// Native Procedures appear as "procedures" in your data graph.
-#[derive(Clone, Debug, Deserialize, JsonSchema)]
-#[serde(rename_all = "camelCase")]
+/// Note: this type excludes `name` and `object_types` from the serialized type. Object types are
+/// intended to be merged into one big map so should not be accessed through values of this type.
+/// Native procedure values are stored in maps so names should be taken from map keys.
+#[derive(Clone, Debug)]
 pub struct NativeProcedure {
-    /// You may define object types here to reference in `result_type`. Any types defined here will
-    /// be merged with the definitions in `schema.json`. This allows you to maintain hand-written
-    /// types for native procedures without having to edit a generated `schema.json` file.
-    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
-    pub object_types: BTreeMap<String, ObjectType>,
-
-    /// Type of data returned by the procedure. You may reference object types defined in the
-    /// `object_types` list in this definition, or you may reference object types from
-    /// `schema.json`.
     pub result_type: Type,
-
-    /// Arguments to be supplied for each procedure invocation. These will be substituted into the
-    /// given `command`.
-    ///
-    /// Argument values are standard JSON mapped from GraphQL input types, not Extended JSON.
-    /// Values will be converted to BSON according to the types specified here.
- #[serde(default)] pub arguments: BTreeMap, - - /// Command to run via MongoDB's `runCommand` API. For details on how to write commands see - /// https://www.mongodb.com/docs/manual/reference/method/db.runCommand/ - /// - /// The command is read as Extended JSON. It may be in canonical or relaxed format, or - /// a mixture of both. - /// See https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/ - /// - /// Keys and values in the command may contain placeholders of the form `{{variableName}}` - /// which will be substituted when the native procedure is executed according to the given - /// arguments. - /// - /// Placeholders must be inside quotes so that the command can be stored in JSON format. If the - /// command includes a string whose only content is a placeholder, when the variable is - /// substituted the string will be replaced by the type of the variable. For example in this - /// command, - /// - /// ```json - /// json!({ - /// "insert": "posts", - /// "documents": "{{ documents }}" - /// }) - /// ``` - /// - /// If the type of the `documents` argument is an array then after variable substitution the - /// command will expand to: - /// - /// ```json - /// json!({ - /// "insert": "posts", - /// "documents": [/* array of documents */] - /// }) - /// ``` - /// - #[schemars(with = "Object")] pub command: bson::Document, - // TODO: test extjson deserialization - - /// Determines which servers in a cluster to read from by specifying read preference, or - /// a predicate to apply to candidate servers. - #[serde(default, skip_serializing_if = "Option::is_none")] - #[schemars(with = "OptionalObject")] pub selection_criteria: Option, - - #[serde(default, skip_serializing_if = "Option::is_none")] pub description: Option, } -type Object = serde_json::Map; -type OptionalObject = Option; +impl From for NativeProcedure { + fn from(value: serialized::NativeProcedure) -> Self { + NativeProcedure { + result_type: value.result_type, + arguments: value.arguments, + command: value.command, + selection_criteria: value.selection_criteria, + description: value.description, + } + } +} diff --git a/crates/configuration/src/native_query.rs b/crates/configuration/src/native_query.rs new file mode 100644 index 00000000..ef6291e9 --- /dev/null +++ b/crates/configuration/src/native_query.rs @@ -0,0 +1,41 @@ +use std::collections::BTreeMap; + +use mongodb::bson; +use schemars::JsonSchema; +use serde::Deserialize; + +use crate::{schema::ObjectField, serialized}; + +/// Internal representation of Native Queries. For doc comments see +/// [crate::serialized::NativeQuery] +/// +/// Note: this type excludes `name` and `object_types` from the serialized type. Object types are +/// intended to be merged into one big map so should not be accessed through values of this type. +/// Native query values are stored in maps so names should be taken from map keys. 
+#[derive(Clone, Debug)]
+pub struct NativeQuery {
+    pub representation: NativeQueryRepresentation,
+    pub arguments: BTreeMap<String, ObjectField>,
+    pub result_document_type: String,
+    pub pipeline: Vec<bson::Document>,
+    pub description: Option<String>,
+}
+
+impl From<serialized::NativeQuery> for NativeQuery {
+    fn from(value: serialized::NativeQuery) -> Self {
+        NativeQuery {
+            representation: value.representation,
+            arguments: value.arguments,
+            result_document_type: value.result_document_type,
+            pipeline: value.pipeline,
+            description: value.description,
+        }
+    }
+}
+
+#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Hash, JsonSchema)]
+#[serde(rename_all = "camelCase")]
+pub enum NativeQueryRepresentation {
+    Collection,
+    Function,
+}
diff --git a/crates/configuration/src/schema/database.rs b/crates/configuration/src/schema/database.rs
deleted file mode 100644
index 91043619..00000000
--- a/crates/configuration/src/schema/database.rs
+++ /dev/null
@@ -1,113 +0,0 @@
-use std::collections::BTreeMap;
-
-use schemars::JsonSchema;
-use serde::{Deserialize, Serialize};
-
-use mongodb_support::BsonScalarType;
-
-use crate::{WithName, WithNameRef};
-
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
-#[serde(rename_all = "camelCase")]
-pub struct Collection {
-    /// The name of a type declared in `objectTypes` that describes the fields of this collection.
-    /// The type name may be the same as the collection name.
-    pub r#type: String,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub description: Option<String>,
-}
-
-/// The type of values that a column, field, or argument may take.
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
-#[serde(rename_all = "camelCase")]
-pub enum Type {
-    /// Any BSON value, represented as Extended JSON.
-    /// To be used when we don't have any more information
-    /// about the types of values that a column, field or argument can take.
-    /// Also used when we unifying two incompatible types in schemas derived
-    /// from sample documents.
- ExtendedJSON, - /// One of the predefined BSON scalar types - Scalar(BsonScalarType), - /// The name of an object type declared in `objectTypes` - Object(String), - ArrayOf(Box), - /// A nullable form of any of the other types - Nullable(Box), -} - -impl Type { - pub fn is_nullable(&self) -> bool { - matches!( - self, - Type::ExtendedJSON | Type::Nullable(_) | Type::Scalar(BsonScalarType::Null) - ) - } - - pub fn normalize_type(self) -> Type { - match self { - Type::ExtendedJSON => Type::ExtendedJSON, - Type::Scalar(s) => Type::Scalar(s), - Type::Object(o) => Type::Object(o), - Type::ArrayOf(a) => Type::ArrayOf(Box::new((*a).normalize_type())), - Type::Nullable(n) => match *n { - Type::ExtendedJSON => Type::ExtendedJSON, - Type::Scalar(BsonScalarType::Null) => Type::Scalar(BsonScalarType::Null), - Type::Nullable(t) => Type::Nullable(t).normalize_type(), - t => Type::Nullable(Box::new(t.normalize_type())), - }, - } - } - - pub fn make_nullable(self) -> Type { - match self { - Type::ExtendedJSON => Type::ExtendedJSON, - Type::Nullable(t) => Type::Nullable(t), - Type::Scalar(BsonScalarType::Null) => Type::Scalar(BsonScalarType::Null), - t => Type::Nullable(Box::new(t)), - } - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] -#[serde(rename_all = "camelCase")] -pub struct ObjectType { - pub fields: BTreeMap, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub description: Option, -} - -impl ObjectType { - pub fn named_fields(&self) -> impl Iterator> { - self.fields - .iter() - .map(|(name, field)| WithNameRef::named(name, field)) - } - - pub fn into_named_fields(self) -> impl Iterator> { - self.fields - .into_iter() - .map(|(name, field)| WithName::named(name, field)) - } -} - -/// Information about an object type field. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] -#[serde(rename_all = "camelCase")] -pub struct ObjectField { - pub r#type: Type, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub description: Option, -} - -impl ObjectField { - pub fn new(name: impl ToString, r#type: Type) -> (String, Self) { - ( - name.to_string(), - ObjectField { - r#type, - description: Default::default(), - }, - ) - } -} diff --git a/crates/configuration/src/schema/mod.rs b/crates/configuration/src/schema/mod.rs index 163b9945..4b7418ad 100644 --- a/crates/configuration/src/schema/mod.rs +++ b/crates/configuration/src/schema/mod.rs @@ -1,64 +1,161 @@ -mod database; - use std::collections::BTreeMap; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use mongodb_support::BsonScalarType; + use crate::{WithName, WithNameRef}; -pub use self::database::{Collection, ObjectField, ObjectType, Type}; +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct Collection { + /// The name of a type declared in `objectTypes` that describes the fields of this collection. + /// The type name may be the same as the collection name. + pub r#type: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub description: Option, +} -#[derive(Clone, Debug, Default, Serialize, Deserialize, JsonSchema)] +/// The type of values that a column, field, or argument may take. 
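+/// For example (an illustrative composition, not part of this change), an optional
+/// array of strings is written as
+/// `Type::Nullable(Box::new(Type::ArrayOf(Box::new(Type::Scalar(BsonScalarType::String)))))`.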
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
 #[serde(rename_all = "camelCase")]
-pub struct Schema {
-    #[serde(default)]
-    pub collections: BTreeMap<String, Collection>,
-    #[serde(default)]
-    pub object_types: BTreeMap<String, ObjectType>,
+pub enum Type {
+    /// Any BSON value, represented as Extended JSON.
+    /// To be used when we don't have any more information
+    /// about the types of values that a column, field or argument can take.
+    /// Also used when unifying two incompatible types in schemas derived
+    /// from sample documents.
+    ExtendedJSON,
+    /// One of the predefined BSON scalar types
+    Scalar(BsonScalarType),
+    /// The name of an object type declared in `objectTypes`
+    Object(String),
+    ArrayOf(Box<Type>),
+    /// A nullable form of any of the other types
+    Nullable(Box<Type>),
 }
 
-impl Schema {
-    pub fn into_named_collections(self) -> impl Iterator<Item = WithName<Collection>> {
-        self.collections
-            .into_iter()
-            .map(|(name, field)| WithName::named(name, field))
+impl Type {
+    pub fn is_nullable(&self) -> bool {
+        matches!(
+            self,
+            Type::ExtendedJSON | Type::Nullable(_) | Type::Scalar(BsonScalarType::Null)
+        )
     }
 
-    pub fn into_named_object_types(self) -> impl Iterator<Item = WithName<ObjectType>> {
-        self.object_types
-            .into_iter()
-            .map(|(name, field)| WithName::named(name, field))
+    pub fn normalize_type(self) -> Type {
+        match self {
+            Type::ExtendedJSON => Type::ExtendedJSON,
+            Type::Scalar(s) => Type::Scalar(s),
+            Type::Object(o) => Type::Object(o),
+            Type::ArrayOf(a) => Type::ArrayOf(Box::new((*a).normalize_type())),
+            Type::Nullable(n) => match *n {
+                Type::ExtendedJSON => Type::ExtendedJSON,
+                Type::Scalar(BsonScalarType::Null) => Type::Scalar(BsonScalarType::Null),
+                Type::Nullable(t) => Type::Nullable(t).normalize_type(),
+                t => Type::Nullable(Box::new(t.normalize_type())),
+            },
+        }
     }
 
-    pub fn named_collections(&self) -> impl Iterator<Item = WithNameRef<'_, Collection>> {
-        self.collections
-            .iter()
-            .map(|(name, field)| WithNameRef::named(name, field))
+    pub fn make_nullable(self) -> Type {
+        match self {
+            Type::ExtendedJSON => Type::ExtendedJSON,
+            Type::Nullable(t) => Type::Nullable(t),
+            Type::Scalar(BsonScalarType::Null) => Type::Scalar(BsonScalarType::Null),
+            t => Type::Nullable(Box::new(t)),
+        }
+    }
+}
+
+impl From<Type> for ndc_models::Type {
+    fn from(t: Type) -> Self {
+        fn map_normalized_type(t: Type) -> ndc_models::Type {
+            match t {
+                // ExtendedJSON can represent any BSON value, including null, so it is always nullable
+                Type::ExtendedJSON => ndc_models::Type::Nullable {
+                    underlying_type: Box::new(ndc_models::Type::Named {
+                        name: mongodb_support::EXTENDED_JSON_TYPE_NAME.to_owned(),
+                    }),
+                },
+                Type::Scalar(t) => ndc_models::Type::Named {
+                    name: t.graphql_name(),
+                },
+                Type::Object(t) => ndc_models::Type::Named { name: t.clone() },
+                Type::ArrayOf(t) => ndc_models::Type::Array {
+                    element_type: Box::new(map_normalized_type(*t)),
+                },
+                Type::Nullable(t) => ndc_models::Type::Nullable {
+                    underlying_type: Box::new(map_normalized_type(*t)),
+                },
+            }
+        }
+        map_normalized_type(t.normalize_type())
     }
+}
 
-    pub fn named_object_types(&self) -> impl Iterator<Item = WithNameRef<'_, ObjectType>> {
-        self.object_types
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ObjectType {
+    pub fields: BTreeMap<String, ObjectField>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub description: Option<String>,
+}
+
+impl ObjectType {
+    pub fn named_fields(&self) -> impl Iterator<Item = WithNameRef<'_, ObjectField>> {
+        self.fields
             .iter()
             .map(|(name, field)| WithNameRef::named(name, field))
     }
 
-    /// Unify two schemas. Assumes that the schemas describe mutually exclusive sets of collections.
-    pub fn merge(schema_a: Schema, schema_b: Schema) -> Schema {
-        let collections = schema_a
-            .collections
+    pub fn into_named_fields(self) -> impl Iterator<Item = WithName<ObjectField>> {
+        self.fields
             .into_iter()
-            .chain(schema_b.collections)
-            .collect();
-        let object_types = schema_a
-            .object_types
-            .into_iter()
-            .chain(schema_b.object_types)
-            .collect();
-        Schema {
-            collections,
-            object_types,
+            .map(|(name, field)| WithName::named(name, field))
+    }
+}
+
+impl From<ObjectType> for ndc_models::ObjectType {
+    fn from(object_type: ObjectType) -> Self {
+        ndc_models::ObjectType {
+            description: object_type.description,
+            fields: object_type
+                .fields
+                .into_iter()
+                .map(|(name, field)| (name, field.into()))
+                .collect(),
         }
     }
+}
+/// Information about an object type field.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ObjectField {
+    pub r#type: Type,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub description: Option<String>,
+}
+
+impl ObjectField {
+    pub fn new(name: impl ToString, r#type: Type) -> (String, Self) {
+        (
+            name.to_string(),
+            ObjectField {
+                r#type,
+                description: Default::default(),
+            },
+        )
+    }
+}
+
+impl From<ObjectField> for ndc_models::ObjectField {
+    fn from(field: ObjectField) -> Self {
+        ndc_models::ObjectField {
+            description: field.description,
+            r#type: field.r#type.into(),
+        }
+    }
 }
diff --git a/crates/configuration/src/serialized/mod.rs b/crates/configuration/src/serialized/mod.rs
new file mode 100644
index 00000000..87ade19f
--- /dev/null
+++ b/crates/configuration/src/serialized/mod.rs
@@ -0,0 +1,5 @@
+mod native_procedure;
+mod native_query;
+mod schema;
+
+pub use self::{native_procedure::NativeProcedure, native_query::NativeQuery, schema::Schema};
diff --git a/crates/configuration/src/serialized/native_procedure.rs b/crates/configuration/src/serialized/native_procedure.rs
new file mode 100644
index 00000000..74dfa9fe
--- /dev/null
+++ b/crates/configuration/src/serialized/native_procedure.rs
@@ -0,0 +1,82 @@
+use std::collections::BTreeMap;
+
+use mongodb::{bson, options::SelectionCriteria};
+use schemars::JsonSchema;
+use serde::Deserialize;
+
+use crate::schema::{ObjectField, ObjectType, Type};
+
+/// An arbitrary database command using MongoDB's runCommand API.
+/// See https://www.mongodb.com/docs/manual/reference/method/db.runCommand/
+///
+/// Native Procedures appear as "procedures" in your data graph.
+#[derive(Clone, Debug, Deserialize, JsonSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct NativeProcedure {
+    /// You may define object types here to reference in `result_type`. Any types defined here will
+    /// be merged with the definitions in `schema.json`. This allows you to maintain hand-written
+    /// types for native procedures without having to edit a generated `schema.json` file.
+    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
+    pub object_types: BTreeMap<String, ObjectType>,
+
+    /// Type of data returned by the procedure. You may reference object types defined in the
+    /// `object_types` list in this definition, or you may reference object types from
+    /// `schema.json`.
+    pub result_type: Type,
+
+    /// Arguments to be supplied for each procedure invocation. These will be substituted into the
+    /// given `command`.
+    ///
+    /// Argument values are standard JSON mapped from GraphQL input types, not Extended JSON.
+    /// Values will be converted to BSON according to the types specified here.
+    #[serde(default)]
+    pub arguments: BTreeMap<String, ObjectField>,
+
+    /// Command to run via MongoDB's `runCommand` API. For details on how to write commands see
+    /// https://www.mongodb.com/docs/manual/reference/method/db.runCommand/
+    ///
+    /// The command is read as Extended JSON. It may be in canonical or relaxed format, or
+    /// a mixture of both.
+    /// See https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/
+    ///
+    /// Keys and values in the command may contain placeholders of the form `{{variableName}}`
+    /// which will be substituted when the native procedure is executed according to the given
+    /// arguments.
+    ///
+    /// Placeholders must be inside quotes so that the command can be stored in JSON format. If the
+    /// command includes a string whose only content is a placeholder, when the variable is
+    /// substituted the string will be replaced by the value of the variable. For example in this
+    /// command,
+    ///
+    /// ```json
+    /// json!({
+    ///     "insert": "posts",
+    ///     "documents": "{{ documents }}"
+    /// })
+    /// ```
+    ///
+    /// If the type of the `documents` argument is an array then after variable substitution the
+    /// command will expand to:
+    ///
+    /// ```json
+    /// json!({
+    ///     "insert": "posts",
+    ///     "documents": [/* array of documents */]
+    /// })
+    /// ```
+    ///
+    #[schemars(with = "Object")]
+    pub command: bson::Document,
+    // TODO: test extjson deserialization
+
+    /// Determines which servers in a cluster to read from by specifying read preference, or
+    /// a predicate to apply to candidate servers.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    #[schemars(with = "OptionalObject")]
+    pub selection_criteria: Option<SelectionCriteria>,
+
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub description: Option<String>,
+}
+
+type Object = serde_json::Map<String, serde_json::Value>;
+type OptionalObject = Option<Object>;
diff --git a/crates/configuration/src/serialized/native_query.rs b/crates/configuration/src/serialized/native_query.rs
new file mode 100644
index 00000000..623fa4fe
--- /dev/null
+++ b/crates/configuration/src/serialized/native_query.rs
@@ -0,0 +1,93 @@
+use std::collections::BTreeMap;
+
+use mongodb::bson;
+use schemars::JsonSchema;
+use serde::Deserialize;
+
+use crate::{
+    native_query::NativeQueryRepresentation,
+    schema::{ObjectField, ObjectType},
+};
+
+/// Define an arbitrary MongoDB aggregation pipeline that can be referenced in your data graph. For
+/// details on aggregation pipelines see https://www.mongodb.com/docs/manual/core/aggregation-pipeline/
+#[derive(Clone, Debug, Deserialize, JsonSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct NativeQuery {
+    /// Representation may be either "collection" or "function". If you choose "collection" then
+    /// the native query acts as a virtual collection, or in other words a view. This implies
+    /// a list of documents that can be filtered and sorted using the GraphQL arguments like
+    /// `where` and `limit` that are available to regular collections. (These arguments are added
+    /// to your GraphQL API automatically - there is no need to list them in the `arguments` for
+    /// the native query.)
+    ///
+    /// Choose "function" if you want to produce data that is not a list of documents, or if
+    /// filtering and sorting are not sensible operations for this native query. A native query
+    /// represented as a function may return any type of data. If you choose "function" then the
+    /// native query pipeline *must* produce a single document with a single field named `__value`,
+    /// and the `resultType` for the native query *must* be an object type with a single field
+    /// named `__value`. The value of that `__value` field is returned as the value of the
+    /// function in GraphQL responses.
+    ///
+    /// This setting determines whether the native query appears as a "collection" or as
+    /// a "function" in your DDN configuration.
+    pub representation: NativeQueryRepresentation,
+
+    /// Arguments to be supplied for each query invocation. These will be available to the given
+    /// pipeline as variables. For information about variables in MongoDB aggregation expressions
+    /// see https://www.mongodb.com/docs/manual/reference/aggregation-variables/
+    ///
+    /// Argument values are standard JSON mapped from GraphQL input types, not Extended JSON.
+    /// Values will be converted to BSON according to the types specified here.
+    #[serde(default)]
+    pub arguments: BTreeMap<String, ObjectField>,
+
+    /// The name of an object type that describes documents produced by the given pipeline. MongoDB
+    /// aggregation pipelines always produce a list of documents. This type describes the type of
+    /// each of those individual documents.
+    ///
+    /// You may reference object types defined in the `object_types` list in this definition, or
+    /// you may reference object types from `schema.json`.
+    pub result_document_type: String,
+
+    /// You may define object types here to reference in `result_type`. Any types defined here will
+    /// be merged with the definitions in `schema.json`. This allows you to maintain hand-written
+    /// types for native queries without having to edit a generated `schema.json` file.
+    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
+    pub object_types: BTreeMap<String, ObjectType>,
+
+    /// Pipeline to include in MongoDB queries. For details on how to write an aggregation pipeline
+    /// see https://www.mongodb.com/docs/manual/core/aggregation-pipeline/
+    ///
+    /// The pipeline may include Extended JSON.
+    ///
+    /// Keys and values in the pipeline may contain placeholders of the form `{{variableName}}`
+    /// which will be substituted when the native query is executed according to the given
+    /// arguments.
+    ///
+    /// Placeholders must be inside quotes so that the pipeline can be stored in JSON format. If
+    /// the pipeline includes a string whose only content is a placeholder, when the variable is
+    /// substituted the string will be replaced by the value of the variable. For example in this
+    /// pipeline,
+    ///
+    /// ```json
+    /// json!([{
+    ///     "$documents": "{{ documents }}"
+    /// }])
+    /// ```
+    ///
+    /// If the type of the `documents` argument is an array then after variable substitution the
+    /// pipeline will expand to:
+    ///
+    /// ```json
+    /// json!([{
+    ///     "$documents": [/* array of documents */]
+    /// }])
+    /// ```
+    ///
+    #[schemars(with = "Vec<Object>")]
+    pub pipeline: Vec<bson::Document>,
+
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub description: Option<String>,
+}
diff --git a/crates/configuration/src/serialized/schema.rs b/crates/configuration/src/serialized/schema.rs
new file mode 100644
index 00000000..c3143c81
--- /dev/null
+++ b/crates/configuration/src/serialized/schema.rs
@@ -0,0 +1,62 @@
+use std::collections::BTreeMap;
+
+use schemars::JsonSchema;
+use serde::{Deserialize, Serialize};
+
+use crate::{
+    schema::{Collection, ObjectType},
+    WithName, WithNameRef,
+};
+
+#[derive(Clone, Debug, Default, Serialize, Deserialize, JsonSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct Schema {
+    #[serde(default)]
+    pub collections: BTreeMap<String, Collection>,
+    #[serde(default)]
+    pub object_types: BTreeMap<String, ObjectType>,
+}
+
+impl Schema {
+    pub fn into_named_collections(self) -> impl Iterator<Item = WithName<Collection>> {
+        self.collections
+            .into_iter()
+            .map(|(name, field)| WithName::named(name, field))
+    }
+
+    pub fn into_named_object_types(self) -> impl Iterator<Item = WithName<ObjectType>> {
+        self.object_types
+            .into_iter()
+            .map(|(name, field)| WithName::named(name, field))
+    }
+
+    pub fn named_collections(&self) -> impl Iterator<Item = WithNameRef<'_, Collection>> {
+        self.collections
+            .iter()
+            .map(|(name, field)| WithNameRef::named(name, field))
+    }
+
+    pub fn named_object_types(&self) -> impl Iterator<Item = WithNameRef<'_, ObjectType>> {
+        self.object_types
+            .iter()
+            .map(|(name, field)| WithNameRef::named(name, field))
+    }
+
+    /// Unify two schemas. Assumes that the schemas describe mutually exclusive sets of collections.
+ pub fn merge(schema_a: Schema, schema_b: Schema) -> Schema { + let collections = schema_a + .collections + .into_iter() + .chain(schema_b.collections) + .collect(); + let object_types = schema_a + .object_types + .into_iter() + .chain(schema_b.object_types) + .collect(); + Schema { + collections, + object_types, + } + } +} diff --git a/crates/dc-api-test-helpers/src/lib.rs b/crates/dc-api-test-helpers/src/lib.rs index 75b42e84..e00cd7b6 100644 --- a/crates/dc-api-test-helpers/src/lib.rs +++ b/crates/dc-api-test-helpers/src/lib.rs @@ -76,6 +76,7 @@ pub fn source(name: &str) -> Vec { pub fn target(name: &str) -> Target { Target::TTable { name: vec![name.to_owned()], + arguments: Default::default(), } } diff --git a/crates/dc-api-test-helpers/src/query_request.rs b/crates/dc-api-test-helpers/src/query_request.rs index fe398f2a..47437e5a 100644 --- a/crates/dc-api-test-helpers/src/query_request.rs +++ b/crates/dc-api-test-helpers/src/query_request.rs @@ -1,6 +1,8 @@ use std::collections::HashMap; -use dc_api_types::{Query, QueryRequest, ScalarValue, TableRelationships, Target, VariableSet}; +use dc_api_types::{ + Argument, Query, QueryRequest, ScalarValue, TableRelationships, Target, VariableSet, +}; #[derive(Clone, Debug, Default)] pub struct QueryRequestBuilder { @@ -23,6 +25,23 @@ impl QueryRequestBuilder { { self.target = Some(Target::TTable { name: name.into_iter().map(|v| v.to_string()).collect(), + arguments: Default::default(), + }); + self + } + + pub fn target_with_arguments(mut self, name: I, arguments: Args) -> Self + where + I: IntoIterator, + S: ToString, + Args: IntoIterator, + { + self.target = Some(Target::TTable { + name: name.into_iter().map(|v| v.to_string()).collect(), + arguments: arguments + .into_iter() + .map(|(name, arg)| (name.to_string(), arg)) + .collect(), }); self } diff --git a/crates/dc-api-types/src/lib.rs b/crates/dc-api-types/src/lib.rs index ce33695b..04de9b21 100644 --- a/crates/dc-api-types/src/lib.rs +++ b/crates/dc-api-types/src/lib.rs @@ -186,7 +186,7 @@ pub use self::table_relationships::TableRelationships; pub mod table_type; pub use self::table_type::TableType; pub mod target; -pub use self::target::Target; +pub use self::target::{Argument, Target}; pub mod unary_comparison_operator; pub use self::unary_comparison_operator::UnaryComparisonOperator; pub mod unique_identifier_generation_strategy; diff --git a/crates/dc-api-types/src/target.rs b/crates/dc-api-types/src/target.rs index 4e0593dd..3888ae22 100644 --- a/crates/dc-api-types/src/target.rs +++ b/crates/dc-api-types/src/target.rs @@ -1,5 +1,6 @@ use serde::de::{self, MapAccess, Visitor}; use serde::{Deserialize, Deserializer, Serialize}; +use std::collections::HashMap; use std::fmt; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -10,13 +11,25 @@ pub enum Target { /// The fully qualified name of a table, where the last item in the array is the table name and any earlier items represent the namespacing of the table name #[serde(rename = "name")] name: Vec, + + /// This field is not part of the v2 DC Agent API - it is included to support queries + /// translated from the v3 NDC API. These arguments correspond to `arguments` fields on the + /// v3 `QueryRequest` and `Relationship` types. 
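+        ///
+        /// For example (hypothetical values, not part of this change), a literal argument
+        /// translated from a v3 request would be stored here as:
+        ///
+        /// ```ignore
+        /// ("limit".to_owned(), Argument::Literal { value: serde_json::json!(10) })
+        /// ```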
+        #[serde(skip, default)]
+        arguments: HashMap<String, Argument>,
    },
    // TODO: variants TInterpolated and TFunction should be implemented if/when we add support for (interpolated) native queries and functions
}

impl Target {
    pub fn name(&self) -> &Vec<String> {
        match self {
-            Target::TTable { name } => name,
+            Target::TTable { name, .. } => name,
+        }
+    }
+
+    pub fn arguments(&self) -> &HashMap<String, Argument> {
+        match self {
+            Target::TTable { arguments, .. } => arguments,
        }
    }
}
@@ -41,7 +54,10 @@
    where
        A: de::SeqAccess<'de>,
    {
        let name = Deserialize::deserialize(de::value::SeqAccessDeserializer::new(seq))?;
-        Ok(Target::TTable { name })
+        Ok(Target::TTable {
+            name,
+            arguments: Default::default(),
+        })
    }

    fn visit_map<M>(self, map: M) -> Result<Self::Value, M::Error>
@@ -54,3 +70,21 @@ where
    deserializer.deserialize_any(TargetOrTableName)
}
+
+/// Optional arguments to the target of a query request or a relationship. This is a v3 feature
+/// which corresponds to the `Argument` and `RelationshipArgument` ndc-client types.
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub enum Argument {
+    /// The argument is provided by reference to a variable
+    Variable {
+        name: String,
+    },
+    /// The argument is provided as a literal value
+    Literal {
+        value: serde_json::Value,
+    },
+    /// The argument is provided based on a column of the source collection
+    Column {
+        name: String,
+    },
+}
diff --git a/crates/mongodb-agent-common/Cargo.toml b/crates/mongodb-agent-common/Cargo.toml
index aaab9fcd..d61d7284 100644
--- a/crates/mongodb-agent-common/Cargo.toml
+++ b/crates/mongodb-agent-common/Cargo.toml
@@ -33,10 +33,11 @@ time = { version = "0.3.29", features = ["formatting", "parsing", "serde"] }
 tracing = "0.1"
 
 [dev-dependencies]
+dc-api-test-helpers = { path = "../dc-api-test-helpers" }
 mongodb-cli-plugin = { path = "../cli" }
 test-helpers = { path = "../test-helpers" }
 
-mockall = "0.11.4"
+mockall = "^0.12.1"
 pretty_assertions = "1"
 proptest = "1"
 tokio = { version = "1", features = ["full"] }
diff --git a/crates/mongodb-agent-common/src/explain.rs b/crates/mongodb-agent-common/src/explain.rs
index c2aa3985..40d5185d 100644
--- a/crates/mongodb-agent-common/src/explain.rs
+++ b/crates/mongodb-agent-common/src/explain.rs
@@ -1,24 +1,34 @@
+use configuration::Configuration;
 use dc_api_types::{ExplainResponse, QueryRequest};
-use mongodb::bson::{doc, to_bson};
+use mongodb::bson::{doc, to_bson, Bson};
 
 use crate::{
-    interface_types::{MongoAgentError, MongoConfig},
-    query::{self, collection_name},
+    interface_types::MongoAgentError,
+    query::{self, QueryTarget},
+    state::ConnectorState,
 };
 
 pub async fn explain_query(
-    config: &MongoConfig,
+    config: &Configuration,
+    state: &ConnectorState,
     query_request: QueryRequest,
 ) -> Result<ExplainResponse, MongoAgentError> {
     tracing::debug!(query_request = %serde_json::to_string(&query_request).unwrap());
 
-    let db = config.client.database(&config.database);
+    let db = state.database();
 
-    let (pipeline, _) = query::pipeline_for_query_request(&query_request)?;
+    let (pipeline, _) = query::pipeline_for_query_request(config, &query_request)?;
     let pipeline_bson = to_bson(&pipeline)?;
 
+    let aggregate_target = match QueryTarget::for_request(config, &query_request) {
+        QueryTarget::Collection(collection_name) => Bson::String(collection_name),
+        // 1 means aggregation without a collection target - as in `db.aggregate()` instead of
+        // `db.<collection>.aggregate()`
+        QueryTarget::NativeQuery { .. } => Bson::Int32(1),
+    };
+
     let query_command = doc! {
-        "aggregate": collection_name(&query_request.target),
+        "aggregate": aggregate_target,
         "pipeline": pipeline_bson,
         "cursor": {},
     };
diff --git a/crates/mongodb-agent-common/src/health.rs b/crates/mongodb-agent-common/src/health.rs
index f927311b..fd1d064b 100644
--- a/crates/mongodb-agent-common/src/health.rs
+++ b/crates/mongodb-agent-common/src/health.rs
@@ -1,10 +1,10 @@
 use http::StatusCode;
 use mongodb::bson::{doc, Document};
 
-use crate::interface_types::{MongoAgentError, MongoConfig};
+use crate::{interface_types::MongoAgentError, state::ConnectorState};
 
-pub async fn check_health(config: &MongoConfig) -> Result<StatusCode, MongoAgentError> {
-    let db = config.client.database(&config.database);
+pub async fn check_health(state: &ConnectorState) -> Result<StatusCode, MongoAgentError> {
+    let db = state.database();
 
     let status: Result<Document, _> = db.run_command(doc! { "ping": 1 }, None).await;
diff --git a/crates/mongodb-agent-common/src/interface_types/mod.rs b/crates/mongodb-agent-common/src/interface_types/mod.rs
index 35a40515..bd9e5d35 100644
--- a/crates/mongodb-agent-common/src/interface_types/mod.rs
+++ b/crates/mongodb-agent-common/src/interface_types/mod.rs
@@ -1,5 +1,3 @@
 mod mongo_agent_error;
-mod mongo_config;
 
 pub use self::mongo_agent_error::MongoAgentError;
-pub use self::mongo_config::MongoConfig;
diff --git a/crates/mongodb-agent-common/src/interface_types/mongo_agent_error.rs b/crates/mongodb-agent-common/src/interface_types/mongo_agent_error.rs
index ad5ea4fa..d36f8f3e 100644
--- a/crates/mongodb-agent-common/src/interface_types/mongo_agent_error.rs
+++ b/crates/mongodb-agent-common/src/interface_types/mongo_agent_error.rs
@@ -6,6 +6,8 @@ use http::StatusCode;
 use mongodb::bson;
 use thiserror::Error;
 
+use crate::procedure::ProcedureError;
+
 /// A superset of the DC-API `AgentError` type. This enum adds error cases specific to the MongoDB
 /// agent.
#[derive(Debug, Error)] @@ -18,6 +20,7 @@ pub enum MongoAgentError { MongoDBSerialization(#[from] mongodb::bson::ser::Error), MongoDBSupport(#[from] mongodb_support::error::Error), NotImplemented(&'static str), + ProcedureError(#[from] ProcedureError), Serialization(serde_json::Error), UnknownAggregationFunction(String), UnspecifiedRelation(String), @@ -68,6 +71,7 @@ impl MongoAgentError { } MongoDBSupport(err) => (StatusCode::BAD_REQUEST, ErrorResponse::new(&err)), NotImplemented(missing_feature) => (StatusCode::BAD_REQUEST, ErrorResponse::new(&format!("The MongoDB agent does not yet support {missing_feature}"))), + ProcedureError(err) => (StatusCode::BAD_REQUEST, ErrorResponse::new(err)), Serialization(err) => (StatusCode::INTERNAL_SERVER_ERROR, ErrorResponse::new(&err)), UnknownAggregationFunction(function) => ( StatusCode::BAD_REQUEST, diff --git a/crates/mongodb-agent-common/src/interface_types/mongo_config.rs b/crates/mongodb-agent-common/src/interface_types/mongo_config.rs deleted file mode 100644 index e4a43c11..00000000 --- a/crates/mongodb-agent-common/src/interface_types/mongo_config.rs +++ /dev/null @@ -1,15 +0,0 @@ -use std::collections::BTreeMap; - -use configuration::{native_procedure::NativeProcedure, schema::ObjectType}; -use mongodb::Client; - -#[derive(Clone, Debug)] -pub struct MongoConfig { - pub client: Client, - - /// Name of the database to connect to - pub database: String, - - pub native_procedures: BTreeMap, - pub object_types: BTreeMap, -} diff --git a/crates/mongodb-agent-common/src/mongodb/accumulator.rs b/crates/mongodb-agent-common/src/mongodb/accumulator.rs index 726a7969..467c3e73 100644 --- a/crates/mongodb-agent-common/src/mongodb/accumulator.rs +++ b/crates/mongodb-agent-common/src/mongodb/accumulator.rs @@ -30,6 +30,9 @@ pub enum Accumulator { #[serde(rename = "$max")] Max(bson::Bson), + #[serde(rename = "$push")] + Push(bson::Bson), + /// Returns a sum of numerical values. Ignores non-numeric values. /// /// See https://www.mongodb.com/docs/manual/reference/operator/aggregation/sum/#mongodb-group-grp.-sum diff --git a/crates/mongodb-agent-common/src/mongodb/collection.rs b/crates/mongodb-agent-common/src/mongodb/collection.rs index 0b5523ad..090dc66a 100644 --- a/crates/mongodb-agent-common/src/mongodb/collection.rs +++ b/crates/mongodb-agent-common/src/mongodb/collection.rs @@ -13,14 +13,8 @@ use mockall::automock; use super::Pipeline; -// In MockCollectionTrait the cursor types are implemented using `Iter` which is a struct that -// wraps around and iterator, and implements `Stream` (and by extension implements `TryStreamExt`). -// I didn't know how to allow any Iterator type here, so I specified the type that is produced when -// calling `into_iter` on a `Vec`. - Jesse H. -// -// To produce a mock stream use the `mock_stream` function in the `test_helpers` module. #[cfg(test)] -type MockCursor = futures::stream::Iter<> as IntoIterator>::IntoIter>; +use super::test_helpers::MockCursor; /// Abstract MongoDB collection methods. This lets us mock a database connection in tests. The /// automock attribute generates a struct called MockCollectionTrait that implements this trait. 
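A quick illustration of the new `$push` accumulator added above — a minimal sketch, not part of the diff itself, assuming `Accumulator` is re-exported at `mongodb_agent_common::mongodb`: serde's externally tagged enum representation combined with `#[serde(rename = "$push")]` serializes the variant to a standard MongoDB `$push` expression.

```rust
use mongodb::bson::{bson, to_bson};
use mongodb_agent_common::mongodb::Accumulator;

fn main() {
    // An externally tagged newtype variant serializes as { "<tag>": <value> },
    // so Push("$title") becomes the usual { "$push": "$title" } accumulator.
    let accumulator = Accumulator::Push(bson!("$title"));
    assert_eq!(
        to_bson(&accumulator).unwrap(),
        bson!({ "$push": "$title" })
    );
}
```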
diff --git a/crates/mongodb-agent-common/src/mongodb/database.rs b/crates/mongodb-agent-common/src/mongodb/database.rs new file mode 100644 index 00000000..ce56a06f --- /dev/null +++ b/crates/mongodb-agent-common/src/mongodb/database.rs @@ -0,0 +1,63 @@ +use async_trait::async_trait; +use futures_util::Stream; +use mongodb::{bson::Document, error::Error, options::AggregateOptions, Database}; + +#[cfg(test)] +use mockall::automock; + +use super::{CollectionTrait, Pipeline}; + +#[cfg(test)] +use super::MockCollectionTrait; + +#[cfg(test)] +use super::test_helpers::MockCursor; + +/// Abstract MongoDB database methods. This lets us mock a database connection in tests. The +/// automock attribute generates a struct called MockDatabaseTrait that implements this trait. The +/// mock provides a variety of methods for mocking and spying on database behavior in tests. See +/// https://docs.rs/mockall/latest/mockall/ +/// +/// I haven't figured out how to make generic associated types work with automock, so the type +/// argument for `Collection` values produced via `DatabaseTrait::collection` is fixed to +/// `Document`. That's the way we're using collections in this app anyway. +#[cfg_attr(test, automock( + type Collection = MockCollectionTrait<Document>; + type DocumentCursor = MockCursor<Document>; +))] +#[async_trait] +pub trait DatabaseTrait { + type Collection: CollectionTrait<Document>; + type DocumentCursor: Stream<Item = Result<Document, Error>>; + + async fn aggregate<Options>( + &self, + pipeline: Pipeline, + options: Options, + ) -> Result<Self::DocumentCursor, Error> + where + Options: Into<Option<AggregateOptions>> + Send + 'static; + + fn collection(&self, name: &str) -> Self::Collection; +} + +#[async_trait] +impl DatabaseTrait for Database { + type Collection = mongodb::Collection<Document>; + type DocumentCursor = mongodb::Cursor<Document>; + + async fn aggregate<Options>( + &self, + pipeline: Pipeline, + options: Options, + ) -> Result<Self::DocumentCursor, Error> + where + Options: Into<Option<AggregateOptions>> + Send + 'static, + { + Database::aggregate(self, pipeline, options).await + } + + fn collection(&self, name: &str) -> Self::Collection { + Database::collection::<Document>(self, name) + } +} diff --git a/crates/mongodb-agent-common/src/mongodb/mod.rs b/crates/mongodb-agent-common/src/mongodb/mod.rs index c0261d68..2a8961cf 100644 --- a/crates/mongodb-agent-common/src/mongodb/mod.rs +++ b/crates/mongodb-agent-common/src/mongodb/mod.rs @@ -1,5 +1,6 @@ mod accumulator; mod collection; +mod database; mod pipeline; mod projection; pub mod sanitize; @@ -12,12 +13,17 @@ pub mod test_helpers; pub use self::{ accumulator::Accumulator, collection::CollectionTrait, + database::DatabaseTrait, pipeline::Pipeline, projection::{ProjectAs, Projection}, selection::Selection, stage::Stage, }; -// MockQueryableCollection is generated by automock when the test flag is active. +// MockCollectionTrait is generated by automock when the test flag is active. #[cfg(test)] pub use self::collection::MockCollectionTrait; + +// MockDatabaseTrait is generated by automock when the test flag is active.
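+// Tests normally don't build it by hand: the canned constructors added in `test_helpers` +// (`mock_collection_aggregate_response`, `mock_collection_aggregate_response_for_pipeline`, and +// `mock_aggregate_response_for_pipeline`) each return a pre-configured `MockDatabaseTrait`.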
+#[cfg(test)] +pub use self::database::MockDatabaseTrait; diff --git a/crates/mongodb-agent-common/src/mongodb/pipeline.rs b/crates/mongodb-agent-common/src/mongodb/pipeline.rs index 9b684e0f..3b728477 100644 --- a/crates/mongodb-agent-common/src/mongodb/pipeline.rs +++ b/crates/mongodb-agent-common/src/mongodb/pipeline.rs @@ -19,6 +19,14 @@ impl Pipeline { self.stages.append(&mut other.stages); } + pub fn empty() -> Pipeline { + Pipeline { stages: vec![] } + } + + pub fn is_empty(&self) -> bool { + self.stages.is_empty() + } + pub fn push(&mut self, stage: Stage) { self.stages.push(stage); } diff --git a/crates/mongodb-agent-common/src/mongodb/selection.rs b/crates/mongodb-agent-common/src/mongodb/selection.rs index 966350d7..d9e5dfd3 100644 --- a/crates/mongodb-agent-common/src/mongodb/selection.rs +++ b/crates/mongodb-agent-common/src/mongodb/selection.rs @@ -256,6 +256,7 @@ mod tests { variables: None, target: Target::TTable { name: vec!["test".to_owned()], + arguments: Default::default(), }, relationships: vec![], }; diff --git a/crates/mongodb-agent-common/src/mongodb/stage.rs b/crates/mongodb-agent-common/src/mongodb/stage.rs index 8a7ff60a..7164046e 100644 --- a/crates/mongodb-agent-common/src/mongodb/stage.rs +++ b/crates/mongodb-agent-common/src/mongodb/stage.rs @@ -151,4 +151,9 @@ pub enum Stage { /// See https://www.mongodb.com/docs/manual/reference/operator/aggregation/replaceWith/#mongodb-pipeline-pipe.-replaceWith #[serde(rename = "$replaceWith")] ReplaceWith(Selection), + + /// For cases where we receive pipeline stages from an external source, such as a native query, + /// and we don't want to attempt to parse them, we store the stage BSON document unaltered. + #[serde(untagged)] + Other(bson::Document), } diff --git a/crates/mongodb-agent-common/src/mongodb/test_helpers.rs b/crates/mongodb-agent-common/src/mongodb/test_helpers.rs index f58ea75e..473db605 100644 --- a/crates/mongodb-agent-common/src/mongodb/test_helpers.rs +++ b/crates/mongodb-agent-common/src/mongodb/test_helpers.rs @@ -1,5 +1,21 @@ use futures_util::stream::{iter, Iter}; -use mongodb::error::Error; +use mongodb::{ + bson::{to_bson, Bson}, + error::Error, + options::AggregateOptions, +}; +use pretty_assertions::assert_eq; + +use super::{MockCollectionTrait, MockDatabaseTrait}; + +// In MockCollectionTrait and MockDatabaseTrait the cursor types are implemented using `Iter` which +// is a struct that wraps around an iterator, and implements `Stream` (and by extension implements +// `TryStreamExt`). I didn't know how to allow any Iterator type here, so I specified the type that +// is produced when calling `into_iter` on a `Vec`. - Jesse H. +// +// To produce a mock stream use the `mock_stream` function in this module. +#[cfg(test)] +pub type MockCursor<T> = futures::stream::Iter<<Vec<Result<T, Error>> as IntoIterator>::IntoIter>; /// Create a stream that can be returned from mock implementations for /// CollectionTrait::aggregate or CollectionTrait::find. @@ -8,3 +24,127 @@ pub fn mock_stream( ) -> Iter<<Vec<Result<T, Error>> as IntoIterator>::IntoIter> { iter(items) } + +/// Mocks the result of an aggregate call on a given collection. +pub fn mock_collection_aggregate_response( + collection: impl ToString, + result: Bson, +) -> MockDatabaseTrait { + let collection_name = collection.to_string(); + + let mut db = MockDatabaseTrait::new(); + db.expect_collection().returning(move |name| { + assert_eq!( + name, collection_name, + "unexpected target for mock aggregate" + ); + + // Make some clones to work around ownership issues.
These closures are `FnMut`, not + // `FnOnce`, so the type checker can't just move ownership into the closure. + let per_collection_result = result.clone(); + + let mut mock_collection = MockCollectionTrait::new(); + mock_collection.expect_aggregate().returning( + move |_pipeline, _: Option<AggregateOptions>| { + let result_docs = { + let items = match per_collection_result.clone() { + Bson::Array(xs) => xs, + _ => panic!("mock pipeline result should be an array of documents"), + }; + items + .into_iter() + .map(|x| match x { + Bson::Document(doc) => Ok(doc), + _ => panic!("mock pipeline result should be an array of documents"), + }) + .collect() + }; + Ok(mock_stream(result_docs)) + }, + ); + mock_collection + }); + db +} + +/// Mocks the result of an aggregate call on a given collection. Asserts that the pipeline that the +/// aggregate call receives matches the given pipeline. +pub fn mock_collection_aggregate_response_for_pipeline( + collection: impl ToString, + expected_pipeline: Bson, + result: Bson, +) -> MockDatabaseTrait { + let collection_name = collection.to_string(); + + let mut db = MockDatabaseTrait::new(); + db.expect_collection().returning(move |name| { + assert_eq!( + name, collection_name, + "unexpected target for mock aggregate" + ); + + // Make some clones to work around ownership issues. These closures are `FnMut`, not + // `FnOnce`, so the type checker can't just move ownership into the closure. + let per_collection_pipeline = expected_pipeline.clone(); + let per_collection_result = result.clone(); + + let mut mock_collection = MockCollectionTrait::new(); + mock_collection.expect_aggregate().returning( + move |pipeline, _: Option<AggregateOptions>| { + assert_eq!( + to_bson(&pipeline).unwrap(), + per_collection_pipeline, + "actual pipeline (left) did not match expected (right)" + ); + let result_docs = { + let items = match per_collection_result.clone() { + Bson::Array(xs) => xs, + _ => panic!("mock pipeline result should be an array of documents"), + }; + items + .into_iter() + .map(|x| match x { + Bson::Document(doc) => Ok(doc), + _ => panic!("mock pipeline result should be an array of documents"), + }) + .collect() + }; + Ok(mock_stream(result_docs)) + }, + ); + mock_collection + }); + db +} + +/// Mocks the result of an aggregate call without a specified collection. Asserts that the pipeline +/// that the aggregate call receives matches the given pipeline.
+pub fn mock_aggregate_response_for_pipeline( + expected_pipeline: Bson, + result: Bson, +) -> MockDatabaseTrait { + let mut db = MockDatabaseTrait::new(); + db.expect_aggregate() + .returning(move |pipeline, _: Option<AggregateOptions>| { + assert_eq!( + to_bson(&pipeline).unwrap(), + expected_pipeline, + "actual pipeline (left) did not match expected (right)" + ); + let result_docs = { + let items = match result.clone() { + Bson::Array(xs) => xs, + _ => panic!("mock pipeline result should be an array of documents"), + }; + items + .into_iter() + .map(|x| match x { + Bson::Document(doc) => Ok(doc), + _ => panic!("mock pipeline result should be an array of documents"), + }) + .collect() + }; + Ok(mock_stream(result_docs)) + }); + db +} diff --git a/crates/mongodb-agent-common/src/procedure/interpolated_command.rs b/crates/mongodb-agent-common/src/procedure/interpolated_command.rs index a2a6354a..d644480d 100644 --- a/crates/mongodb-agent-common/src/procedure/interpolated_command.rs +++ b/crates/mongodb-agent-common/src/procedure/interpolated_command.rs @@ -135,7 +135,12 @@ fn parse_native_procedure(string: &str) -> Vec { #[cfg(test)] mod tests { - use configuration::native_procedure::NativeProcedure; + use configuration::{ + native_procedure::NativeProcedure, + schema::{ObjectField, ObjectType, Type}, + }; + use mongodb::bson::doc; + use mongodb_support::BsonScalarType as S; use pretty_assertions::assert_eq; use serde_json::json; @@ -148,20 +153,36 @@ #[test] fn interpolates_non_string_type() -> anyhow::Result<()> { - let native_procedure_input = json!({ - "resultType": { "object": "InsertArtist" }, - "arguments": { - "id": { "type": { "scalar": "int" } }, - "name": { "type": { "scalar": "string" } }, - }, - "command": { + let native_procedure = NativeProcedure { + result_type: Type::Object("InsertArtist".to_owned()), + arguments: [ + ( + "id".to_owned(), + ObjectField { + r#type: Type::Scalar(S::Int), + description: Default::default(), + }, + ), + ( + "name".to_owned(), + ObjectField { + r#type: Type::Scalar(S::String), + description: Default::default(), + }, + ), + ] + .into(), + command: doc! { "insert": "Artist", "documents": [{ "ArtistId": "{{ id }}", "Name": "{{name }}", }], }, - }); + selection_criteria: Default::default(), + description: Default::default(), + }; + let input_arguments = [ ("id".to_owned(), json!(1001)), ("name".to_owned(), json!("Regina Spektor")), ] .into_iter() .collect(); - let native_procedure: NativeProcedure = serde_json::from_value(native_procedure_input)?; let arguments = resolve_arguments( - &native_procedure.object_types, + &Default::default(), &native_procedure.arguments, input_arguments, )?; @@ -192,25 +212,49 @@ #[test] fn interpolates_array_argument() -> anyhow::Result<()> { - let native_procedure_input = json!({ - "name": "insertArtist", - "resultType": { "object": "InsertArtist" }, - "objectTypes": { - "ArtistInput": { - "fields": { - "ArtistId": { "type": { "scalar": "int" } }, - "Name": { "type": { "scalar": "string" } }, - }, - } - }, - "arguments": { - "documents": { "type": { "arrayOf": { "object": "ArtistInput" } } }, - }, - "command": { + let native_procedure = NativeProcedure { + result_type: Type::Object("InsertArtist".to_owned()), + arguments: [( + "documents".to_owned(), + ObjectField { + r#type: Type::ArrayOf(Box::new(Type::Object("ArtistInput".to_owned()))), + description: Default::default(), + }, + )] + .into(), + command: doc!
{ "insert": "Artist", "documents": "{{ documents }}", }, - }); + selection_criteria: Default::default(), + description: Default::default(), + }; + + let object_types = [( + "ArtistInput".to_owned(), + ObjectType { + fields: [ + ( + "ArtistId".to_owned(), + ObjectField { + r#type: Type::Scalar(S::Int), + description: Default::default(), + }, + ), + ( + "Name".to_owned(), + ObjectField { + r#type: Type::Scalar(S::String), + description: Default::default(), + }, + ), + ] + .into(), + description: Default::default(), + }, + )] + .into(); + let input_arguments = [( "documents".to_owned(), json!([ @@ -221,12 +265,8 @@ mod tests { .into_iter() .collect(); - let native_procedure: NativeProcedure = serde_json::from_value(native_procedure_input)?; - let arguments = resolve_arguments( - &native_procedure.object_types, - &native_procedure.arguments, - input_arguments, - )?; + let arguments = + resolve_arguments(&object_types, &native_procedure.arguments, input_arguments)?; let command = interpolated_command(&native_procedure.command, &arguments)?; assert_eq!( @@ -250,18 +290,33 @@ mod tests { #[test] fn interpolates_arguments_within_string() -> anyhow::Result<()> { - let native_procedure_input = json!({ - "name": "insert", - "resultType": { "object": "Insert" }, - "arguments": { - "prefix": { "type": { "scalar": "string" } }, - "basename": { "type": { "scalar": "string" } }, - }, - "command": { + let native_procedure = NativeProcedure { + result_type: Type::Object("Insert".to_owned()), + arguments: [ + ( + "prefix".to_owned(), + ObjectField { + r#type: Type::Scalar(S::String), + description: Default::default(), + }, + ), + ( + "basename".to_owned(), + ObjectField { + r#type: Type::Scalar(S::String), + description: Default::default(), + }, + ), + ] + .into(), + command: doc! { "insert": "{{prefix}}-{{basename}}", "empty": "", }, - }); + selection_criteria: Default::default(), + description: Default::default(), + }; + let input_arguments = [ ("prefix".to_owned(), json!("current")), ("basename".to_owned(), json!("some-coll")), @@ -269,9 +324,8 @@ mod tests { .into_iter() .collect(); - let native_procedure: NativeProcedure = serde_json::from_value(native_procedure_input)?; let arguments = resolve_arguments( - &native_procedure.object_types, + &Default::default(), &native_procedure.arguments, input_arguments, )?; diff --git a/crates/mongodb-agent-common/src/query/execute_query_request.rs b/crates/mongodb-agent-common/src/query/execute_query_request.rs index 5d1da3c5..d56bb03c 100644 --- a/crates/mongodb-agent-common/src/query/execute_query_request.rs +++ b/crates/mongodb-agent-common/src/query/execute_query_request.rs @@ -1,32 +1,48 @@ use anyhow::anyhow; +use configuration::Configuration; use dc_api_types::{QueryRequest, QueryResponse, RowSet}; +use futures::Stream; use futures_util::TryStreamExt; use itertools::Itertools as _; use mongodb::bson::{self, Document}; use super::pipeline::{pipeline_for_query_request, ResponseShape}; use crate::{ - interface_types::MongoAgentError, mongodb::CollectionTrait, query::foreach::foreach_variants, + interface_types::MongoAgentError, + mongodb::{CollectionTrait as _, DatabaseTrait}, + query::{foreach::foreach_variants, QueryTarget}, }; +/// Execute a query request against the given collection. +/// +/// The use of `DatabaseTrait` lets us inject a mock implementation of the MongoDB driver for +/// testing. 
pub async fn execute_query_request( - collection: &impl CollectionTrait<Document>, + database: impl DatabaseTrait, + config: &Configuration, query_request: QueryRequest, ) -> Result<QueryResponse, MongoAgentError> { - let (pipeline, response_shape) = pipeline_for_query_request(&query_request)?; + let target = QueryTarget::for_request(config, &query_request); + let (pipeline, response_shape) = pipeline_for_query_request(config, &query_request)?; tracing::debug!( ?query_request, + ?target, pipeline = %serde_json::to_string(&pipeline).unwrap(), "executing query" ); - let document_cursor = collection.aggregate(pipeline, None).await?; - - let documents = document_cursor - .into_stream() - .map_err(MongoAgentError::MongoDB) - .try_collect::<Vec<_>>() - .await?; + // The target of a query request might be a collection, or it might be a native query. In the + // latter case there is no collection to perform the aggregation against. So instead of sending + // the MongoDB API call `db.<collection>.aggregate` we call `db.aggregate`. + let documents = match target { + QueryTarget::Collection(collection_name) => { + let collection = database.collection(&collection_name); + collect_from_cursor(collection.aggregate(pipeline, None).await?).await + } + QueryTarget::NativeQuery { .. } => { + collect_from_cursor(database.aggregate(pipeline, None).await?).await + } + }?; tracing::debug!(response_documents = %serde_json::to_string(&documents).unwrap(), "response from MongoDB"); @@ -47,6 +63,16 @@ pub async fn execute_query_request( Ok(response) } +async fn collect_from_cursor( + document_cursor: impl Stream<Item = Result<Document, mongodb::error::Error>>, +) -> Result<Vec<Document>, MongoAgentError> { + document_cursor + .into_stream() + .map_err(MongoAgentError::MongoDB) + .try_collect::<Vec<_>>() + .await +} + fn parse_single_document<T>(documents: Vec<Document>) -> Result<T, MongoAgentError> where T: for<'de> serde::Deserialize<'de>, diff --git a/crates/mongodb-agent-common/src/query/foreach.rs b/crates/mongodb-agent-common/src/query/foreach.rs index fea797c5..d347537e 100644 --- a/crates/mongodb-agent-common/src/query/foreach.rs +++ b/crates/mongodb-agent-common/src/query/foreach.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use configuration::Configuration; use dc_api_types::comparison_column::ColumnSelector; use dc_api_types::{ BinaryComparisonOperator, ComparisonColumn, ComparisonValue, Expression, QueryRequest, @@ -54,6 +55,7 @@ pub fn foreach_variants(query_request: &QueryRequest) -> Option pub fn pipeline_for_foreach( foreach: Vec, + config: &Configuration, query_request: &QueryRequest, ) -> Result<(Pipeline, ResponseShape), MongoAgentError> { let pipelines_with_response_shapes: Vec<(String, (Pipeline, ResponseShape))> = foreach @@ -74,7 +76,8 @@ .into(); } - let pipeline_with_response_shape = pipeline_for_non_foreach(variables.as_ref(), &q)?; + let pipeline_with_response_shape = + pipeline_for_non_foreach(config, variables.as_ref(), &q)?; Ok((facet_name(index), pipeline_with_response_shape)) }) .collect::<Result<_, MongoAgentError>>()?; @@ -136,15 +139,12 @@ mod tests { use dc_api_types::{ BinaryComparisonOperator, ComparisonColumn, Field, Query, QueryRequest, QueryResponse, }; - use mongodb::{ - bson::{doc, from_document}, - options::AggregateOptions, - }; + use mongodb::bson::{bson, Bson}; use pretty_assertions::assert_eq; - use serde_json::{from_value, json, to_value}; + use serde_json::{from_value, json}; use crate::{ - mongodb::{test_helpers::mock_stream, MockCollectionTrait}, + mongodb::test_helpers::mock_collection_aggregate_response_for_pipeline, query::execute_query_request::execute_query_request, }; @@ -173,7 +173,7 @@ ] }))?; - let expected_pipeline = json!([ +
let expected_pipeline = bson!([ { "$facet": { "__FACET___0": [ @@ -223,34 +223,32 @@ mod tests { ] }))?; - let mut collection = MockCollectionTrait::new(); - collection - .expect_aggregate() - .returning(move |pipeline, _: Option| { - assert_eq!(expected_pipeline, to_value(pipeline).unwrap()); - Ok(mock_stream(vec![Ok(from_document(doc! { - "rows": [ - { - "query": { - "rows": [ - { "albumId": 1, "title": "For Those About To Rock We Salute You" }, - { "albumId": 4, "title": "Let There Be Rock" } - ] - } - }, - { - "query": { - "rows": [ - { "albumId": 2, "title": "Balls to the Wall" }, - { "albumId": 3, "title": "Restless and Wild" } - ] - } + let db = mock_collection_aggregate_response_for_pipeline( + "tracks", + expected_pipeline, + bson!([{ + "rows": [ + { + "query": { + "rows": [ + { "albumId": 1, "title": "For Those About To Rock We Salute You" }, + { "albumId": 4, "title": "Let There Be Rock" } + ] } - ], - })?)])) - }); + }, + { + "query": { + "rows": [ + { "albumId": 2, "title": "Balls to the Wall" }, + { "albumId": 3, "title": "Restless and Wild" } + ] + } + } + ], + }]), + ); - let result = execute_query_request(&collection, query_request).await?; + let result = execute_query_request(db, &Default::default(), query_request).await?; assert_eq!(expected_response, result); Ok(()) @@ -284,7 +282,7 @@ mod tests { ] }))?; - let expected_pipeline = json!([ + let expected_pipeline = bson!([ { "$facet": { "__FACET___0": [ @@ -364,40 +362,38 @@ mod tests { ] }))?; - let mut collection = MockCollectionTrait::new(); - collection - .expect_aggregate() - .returning(move |pipeline, _: Option| { - assert_eq!(expected_pipeline, to_value(pipeline).unwrap()); - Ok(mock_stream(vec![Ok(from_document(doc! { - "rows": [ - { - "query": { - "aggregates": { - "count": 2, - }, - "rows": [ - { "albumId": 1, "title": "For Those About To Rock We Salute You" }, - { "albumId": 4, "title": "Let There Be Rock" } - ] - } - }, - { - "query": { - "aggregates": { - "count": 2, - }, - "rows": [ - { "albumId": 2, "title": "Balls to the Wall" }, - { "albumId": 3, "title": "Restless and Wild" } - ] - } + let db = mock_collection_aggregate_response_for_pipeline( + "tracks", + expected_pipeline, + bson!([{ + "rows": [ + { + "query": { + "aggregates": { + "count": 2, + }, + "rows": [ + { "albumId": 1, "title": "For Those About To Rock We Salute You" }, + { "albumId": 4, "title": "Let There Be Rock" } + ] } - ], - })?)])) - }); + }, + { + "query": { + "aggregates": { + "count": 2, + }, + "rows": [ + { "albumId": 2, "title": "Balls to the Wall" }, + { "albumId": 3, "title": "Restless and Wild" } + ] + } + } + ] + }]), + ); - let result = execute_query_request(&collection, query_request).await?; + let result = execute_query_request(db, &Default::default(), query_request).await?; assert_eq!(expected_response, result); Ok(()) @@ -414,6 +410,7 @@ mod tests { ), target: dc_api_types::Target::TTable { name: vec!["tracks".to_owned()], + arguments: Default::default(), }, relationships: Default::default(), query: Box::new(Query { @@ -454,8 +451,8 @@ mod tests { }), }; - fn facet(artist_id: i32) -> serde_json::Value { - json!([ + fn facet(artist_id: i32) -> Bson { + bson!([ { "$match": { "artistId": {"$eq": artist_id } } }, { "$replaceWith": { "albumId": { "$ifNull": ["$albumId", null] }, @@ -464,7 +461,7 @@ mod tests { ]) } - let expected_pipeline = json!([ + let expected_pipeline = bson!([ { "$facet": { "__FACET___0": facet(1), @@ -531,47 +528,45 @@ mod tests { ] }))?; - let mut collection = MockCollectionTrait::new(); - collection - 
.expect_aggregate() - .returning(move |pipeline, _: Option<AggregateOptions>| { - assert_eq!(expected_pipeline, to_value(pipeline).unwrap()); - Ok(mock_stream(vec![Ok(from_document(doc! { - "rows": [ - { - "query": { - "rows": [ - { "albumId": 1, "title": "For Those About To Rock We Salute You" }, - { "albumId": 4, "title": "Let There Be Rock" } - ] - } - }, - { - "query": { - "rows": [] - } - }, - { - "query": { - "rows": [ - { "albumId": 2, "title": "Balls to the Wall" }, - { "albumId": 3, "title": "Restless and Wild" } - ] - } - }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - { "query": { "rows": [] } }, - ], - })?)])) - }); - - let result = execute_query_request(&collection, query_request).await?; + let db = mock_collection_aggregate_response_for_pipeline( + "tracks", + expected_pipeline, + bson!([{ + "rows": [ + { + "query": { + "rows": [ + { "albumId": 1, "title": "For Those About To Rock We Salute You" }, + { "albumId": 4, "title": "Let There Be Rock" } + ] + } + }, + { + "query": { + "rows": [] + } + }, + { + "query": { + "rows": [ + { "albumId": 2, "title": "Balls to the Wall" }, + { "albumId": 3, "title": "Restless and Wild" } + ] + } + }, + { "query": { "rows": [] } }, + { "query": { "rows": [] } }, + { "query": { "rows": [] } }, + { "query": { "rows": [] } }, + { "query": { "rows": [] } }, + { "query": { "rows": [] } }, + { "query": { "rows": [] } }, + { "query": { "rows": [] } }, + ], + }]), + ); + + let result = execute_query_request(db, &Default::default(), query_request).await?; assert_eq!(expected_response, result); Ok(()) diff --git a/crates/mongodb-agent-common/src/query/mod.rs b/crates/mongodb-agent-common/src/query/mod.rs index 3a3bbb7c..08498435 100644 --- a/crates/mongodb-agent-common/src/query/mod.rs +++ b/crates/mongodb-agent-common/src/query/mod.rs @@ -5,47 +5,46 @@ mod execute_query_request; mod foreach; mod make_selector; mod make_sort; +mod native_query; mod pipeline; +mod query_target; mod relations; pub mod serialization; -use dc_api_types::{QueryRequest, QueryResponse, Target}; -use mongodb::bson::Document; +use configuration::Configuration; +use dc_api_types::{QueryRequest, QueryResponse}; use self::execute_query_request::execute_query_request; pub use self::{ make_selector::make_selector, make_sort::make_sort, pipeline::{is_response_faceted, pipeline_for_non_foreach, pipeline_for_query_request}, + query_target::QueryTarget, }; -use crate::interface_types::{MongoAgentError, MongoConfig}; - -pub fn collection_name(query_request_target: &Target) -> String { - query_request_target.name().join(".") -} +use crate::{interface_types::MongoAgentError, state::ConnectorState}; pub async fn handle_query_request( - config: &MongoConfig, + config: &Configuration, + state: &ConnectorState, query_request: QueryRequest, ) -> Result<QueryResponse, MongoAgentError> { - let database = config.client.database(&config.database); - let collection = database.collection::<Document>(&collection_name(&query_request.target)); - - execute_query_request(&collection, query_request).await + let database = state.database(); + // This function delegates to another function which gives us a point to inject a mock database + // implementation for testing.
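+ // Tests exercise this seam with the canned mocks from `mongodb::test_helpers`, for example + // `mock_collection_aggregate_response_for_pipeline`.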
+ execute_query_request(database, config, query_request).await } #[cfg(test)] mod tests { use dc_api_types::{QueryRequest, QueryResponse, RowSet}; - use mongodb::{ - bson::{self, bson, doc, from_document, to_bson}, - options::AggregateOptions, - }; + use mongodb::bson::{self, bson}; use pretty_assertions::assert_eq; - use serde_json::{from_value, json, to_value}; + use serde_json::{from_value, json}; use super::execute_query_request; - use crate::mongodb::{test_helpers::mock_stream, MockCollectionTrait}; + use crate::mongodb::test_helpers::{ + mock_collection_aggregate_response, mock_collection_aggregate_response_for_pipeline, + }; #[tokio::test] async fn executes_query() -> Result<(), anyhow::Error> { @@ -72,23 +71,21 @@ mod tests { ], }))?; - let expected_pipeline = json!([ + let expected_pipeline = bson!([ { "$match": { "gpa": { "$lt": 4.0 } } }, { "$replaceWith": { "student_gpa": { "$ifNull": ["$gpa", null] } } }, ]); - let mut collection = MockCollectionTrait::new(); - collection - .expect_aggregate() - .returning(move |pipeline, _: Option| { - assert_eq!(expected_pipeline, to_value(pipeline).unwrap()); - Ok(mock_stream(vec![ - Ok(from_document(doc! { "student_gpa": 3.1, })?), - Ok(from_document(doc! { "student_gpa": 3.6, })?), - ])) - }); + let db = mock_collection_aggregate_response_for_pipeline( + "students", + expected_pipeline, + bson!([ + { "student_gpa": 3.1, }, + { "student_gpa": 3.6, }, + ]), + ); - let result = execute_query_request(&collection, query_request).await?; + let result = execute_query_request(db, &Default::default(), query_request).await?; assert_eq!(expected_response, result); Ok(()) } @@ -122,7 +119,7 @@ mod tests { } }))?; - let expected_pipeline = json!([ + let expected_pipeline = bson!([ { "$facet": { "avg": [ @@ -152,20 +149,18 @@ mod tests { }, ]); - let mut collection = MockCollectionTrait::new(); - collection - .expect_aggregate() - .returning(move |pipeline, _: Option| { - assert_eq!(expected_pipeline, to_value(pipeline).unwrap()); - Ok(mock_stream(vec![Ok(from_document(doc! { - "aggregates": { - "count": 11, - "avg": 3, - }, - })?)])) - }); + let db = mock_collection_aggregate_response_for_pipeline( + "students", + expected_pipeline, + bson!([{ + "aggregates": { + "count": 11, + "avg": 3, + }, + }]), + ); - let result = execute_query_request(&collection, query_request).await?; + let result = execute_query_request(db, &Default::default(), query_request).await?; assert_eq!(expected_response, result); Ok(()) } @@ -205,7 +200,7 @@ mod tests { }], }))?; - let expected_pipeline = json!([ + let expected_pipeline = bson!([ { "$match": { "gpa": { "$lt": 4.0 } } }, { "$facet": { @@ -233,22 +228,20 @@ mod tests { }, ]); - let mut collection = MockCollectionTrait::new(); - collection - .expect_aggregate() - .returning(move |pipeline, _: Option| { - assert_eq!(expected_pipeline, to_value(pipeline).unwrap()); - Ok(mock_stream(vec![Ok(from_document(doc! 
{ - "aggregates": { - "avg": 3.1, - }, - "rows": [{ - "gpa": 3.1, - }], - })?)])) - }); + let db = mock_collection_aggregate_response_for_pipeline( + "students", + expected_pipeline, + bson!([{ + "aggregates": { + "avg": 3.1, + }, + "rows": [{ + "gpa": 3.1, + }], + }]), + ); - let result = execute_query_request(&collection, query_request).await?; + let result = execute_query_request(db, &Default::default(), query_request).await?; assert_eq!(expected_response, result); Ok(()) } @@ -298,17 +291,15 @@ }, ]); - let mut collection = MockCollectionTrait::new(); - collection - .expect_aggregate() - .returning(move |pipeline, _: Option<AggregateOptions>| { - assert_eq!(expected_pipeline, to_bson(&pipeline).unwrap()); - Ok(mock_stream(vec![Ok(from_document(doc! { - "date": "2018-08-14T15:05:03.142Z", - })?)])) - }); + let db = mock_collection_aggregate_response_for_pipeline( + "comments", + expected_pipeline, + bson!([{ + "date": "2018-08-14T15:05:03.142Z", + }]), + ); - let result = execute_query_request(&collection, query_request).await?; + let result = execute_query_request(db, &Default::default(), query_request).await?; assert_eq!(expected_response, result); Ok(()) } @@ -327,12 +318,9 @@ let expected_response = QueryResponse::Single(RowSet::Rows { rows: vec![] }); - let mut collection = MockCollectionTrait::new(); - collection - .expect_aggregate() - .returning(move |_pipeline, _: Option<AggregateOptions>| Ok(mock_stream(vec![]))); + let db = mock_collection_aggregate_response("comments", bson!([])); - let result = execute_query_request(&collection, query_request).await?; + let result = execute_query_request(db, &Default::default(), query_request).await?; assert_eq!(expected_response, result); Ok(()) } } diff --git a/crates/mongodb-agent-common/src/query/native_query.rs b/crates/mongodb-agent-common/src/query/native_query.rs new file mode 100644 index 00000000..ca2cc84d --- /dev/null +++ b/crates/mongodb-agent-common/src/query/native_query.rs @@ -0,0 +1,293 @@ +use std::collections::HashMap; + +use configuration::{native_query::NativeQuery, Configuration}; +use dc_api_types::{Argument, QueryRequest, VariableSet}; +use itertools::Itertools as _; + +use crate::{ + interface_types::MongoAgentError, + mongodb::{Pipeline, Stage}, + procedure::{interpolated_command, ProcedureError}, +}; + +use super::{arguments::resolve_arguments, query_target::QueryTarget}; + +/// Returns either the pipeline defined by a native query with variable bindings for arguments, or +/// an empty pipeline if the query request target is not a native query. +pub fn pipeline_for_native_query( + config: &Configuration, + variables: Option<&VariableSet>, + query_request: &QueryRequest, +) -> Result<Pipeline, MongoAgentError> { + match QueryTarget::for_request(config, query_request) { + QueryTarget::Collection(_) => Ok(Pipeline::empty()), + QueryTarget::NativeQuery { + native_query, + arguments, + ..
+ } => make_pipeline(config, variables, native_query, arguments), + } +} + +fn make_pipeline( + config: &Configuration, + variables: Option<&VariableSet>, + native_query: &NativeQuery, + arguments: &HashMap<String, Argument>, +) -> Result<Pipeline, MongoAgentError> { + let expressions = arguments + .iter() + .map(|(name, argument)| { + Ok(( + name.to_owned(), + argument_to_mongodb_expression(argument, variables)?, + )) as Result<_, MongoAgentError> + }) + .try_collect()?; + + let bson_arguments = + resolve_arguments(&config.object_types, &native_query.arguments, expressions) + .map_err(ProcedureError::UnresolvableArguments)?; + + // Replace argument placeholders with resolved expressions, convert document list to + // a `Pipeline` value + let stages = native_query + .pipeline + .iter() + .map(|document| interpolated_command(document, &bson_arguments)) + .map_ok(Stage::Other) + .try_collect()?; + + Ok(Pipeline::new(stages)) +} + +fn argument_to_mongodb_expression( + argument: &Argument, + variables: Option<&VariableSet>, +) -> Result<serde_json::Value, MongoAgentError> { + match argument { + Argument::Variable { name } => variables + .and_then(|vs| vs.get(name)) + .ok_or_else(|| MongoAgentError::VariableNotDefined(name.to_owned())) + .cloned(), + Argument::Literal { value } => Ok(value.clone()), + // TODO: Column references are needed for native queries that are a target of a relation. + // MDB-106 + Argument::Column { .. } => Err(MongoAgentError::NotImplemented( + "column references in native queries are not currently implemented", + )), + } +} + +#[cfg(test)] +mod tests { + use configuration::{ + native_query::{NativeQuery, NativeQueryRepresentation}, + schema::{ObjectField, ObjectType, Type}, + Configuration, + }; + use dc_api_test_helpers::{column, query, query_request}; + use dc_api_types::{Argument, QueryResponse}; + use mongodb::bson::{bson, doc}; + use mongodb_support::BsonScalarType as S; + use pretty_assertions::assert_eq; + use serde_json::{from_value, json}; + + use crate::{ + mongodb::test_helpers::mock_aggregate_response_for_pipeline, query::execute_query_request, + }; + + #[tokio::test] + async fn executes_native_query() -> Result<(), anyhow::Error> { + let native_query = NativeQuery { + representation: NativeQueryRepresentation::Collection, + arguments: [ + ( + "filter".to_string(), + ObjectField { + r#type: Type::ExtendedJSON, + description: None, + }, + ), + ( + "queryVector".to_string(), + ObjectField { + r#type: Type::ArrayOf(Box::new(Type::Scalar(S::Double))), + description: None, + }, + ), + ( + "numCandidates".to_string(), + ObjectField { + r#type: Type::Scalar(S::Int), + description: None, + }, + ), + ( + "limit".to_string(), + ObjectField { + r#type: Type::Scalar(S::Int), + description: None, + }, + ), + ] + .into(), + result_document_type: "VectorResult".to_owned(), + pipeline: vec![doc!
{ + "$vectorSearch": { + "index": "movie-vector-index", + "path": "plot_embedding", + "filter": "{{ filter }}", + "queryVector": "{{ queryVector }}", + "numCandidates": "{{ numCandidates }}", + "limit": "{{ limit }}" + } + }], + description: None, + }; + + let object_types = [( + "VectorResult".to_owned(), + ObjectType { + description: None, + fields: [ + ( + "_id".to_owned(), + ObjectField { + r#type: Type::Scalar(S::ObjectId), + description: None, + }, + ), + ( + "title".to_owned(), + ObjectField { + r#type: Type::Scalar(S::ObjectId), + description: None, + }, + ), + ( + "genres".to_owned(), + ObjectField { + r#type: Type::ArrayOf(Box::new(Type::Scalar(S::String))), + description: None, + }, + ), + ( + "year".to_owned(), + ObjectField { + r#type: Type::Scalar(S::Int), + description: None, + }, + ), + ] + .into(), + }, + )] + .into(); + + let config = Configuration { + native_queries: [("vectorSearch".to_owned(), native_query.clone())].into(), + object_types, + collections: Default::default(), + functions: Default::default(), + procedures: Default::default(), + native_procedures: Default::default(), + }; + + let request = query_request() + .target_with_arguments( + ["vectorSearch"], + [ + ( + "filter", + Argument::Literal { + value: json!({ + "$and": [ + { + "genres": { + "$nin": [ + "Drama", "Western", "Crime" + ], + "$in": [ + "Action", "Adventure", "Family" + ] + } + }, { + "year": { "$gte": 1960, "$lte": 2000 } + } + ] + }), + }, + ), + ( + "queryVector", + Argument::Literal { + value: json!([-0.020156775, -0.024996493, 0.010778184]), + }, + ), + ("numCandidates", Argument::Literal { value: json!(200) }), + ("limit", Argument::Literal { value: json!(10) }), + ], + ) + .query(query().fields([ + column!("title": "String"), + column!("genres": "String"), + column!("year": "String"), + ])) + .into(); + + let expected_pipeline = bson!([ + { + "$vectorSearch": { + "index": "movie-vector-index", + "path": "plot_embedding", + "filter": { + "$and": [ + { + "genres": { + "$nin": [ + "Drama", "Western", "Crime" + ], + "$in": [ + "Action", "Adventure", "Family" + ] + } + }, { + "year": { "$gte": 1960, "$lte": 2000 } + } + ] + }, + "queryVector": [-0.020156775, -0.024996493, 0.010778184], + "numCandidates": 200, + "limit": 10, + } + }, + { + "$replaceWith": { + "title": { "$ifNull": ["$title", null] }, + "year": { "$ifNull": ["$year", null] }, + "genres": { "$ifNull": ["$genres", null] }, + } + }, + ]); + + let expected_response: QueryResponse = from_value(json!({ + "rows": [ + { "title": "Beau Geste", "year": 1926, "genres": ["Action", "Adventure", "Drama"] }, + { "title": "For Heaven's Sake", "year": 1926, "genres": ["Action", "Comedy", "Romance"] }, + ], + }))?; + + let db = mock_aggregate_response_for_pipeline( + expected_pipeline, + bson!([ + { "title": "Beau Geste", "year": 1926, "genres": ["Action", "Adventure", "Drama"] }, + { "title": "For Heaven's Sake", "year": 1926, "genres": ["Action", "Comedy", "Romance"] }, + ]), + ); + + let result = execute_query_request(db, &config, request).await?; + assert_eq!(expected_response, result); + Ok(()) + } +} diff --git a/crates/mongodb-agent-common/src/query/pipeline.rs b/crates/mongodb-agent-common/src/query/pipeline.rs index 246bd554..d105b1d9 100644 --- a/crates/mongodb-agent-common/src/query/pipeline.rs +++ b/crates/mongodb-agent-common/src/query/pipeline.rs @@ -1,5 +1,6 @@ use std::collections::BTreeMap; +use configuration::Configuration; use dc_api_types::{Aggregate, Query, QueryRequest, VariableSet}; use mongodb::bson::{self, doc, Bson}; @@ 
-13,6 +14,7 @@ use super::{ constants::{RESULT_FIELD, ROWS_FIELD}, foreach::{foreach_variants, pipeline_for_foreach}, make_selector, make_sort, + native_query::pipeline_for_native_query, relations::pipeline_for_relations, }; @@ -46,13 +48,14 @@ pub fn is_response_faceted(query: &Query) -> bool { /// Returns a pipeline paired with a value that indicates whether the response requires /// post-processing in the agent. pub fn pipeline_for_query_request( + config: &Configuration, query_request: &QueryRequest, ) -> Result<(Pipeline, ResponseShape), MongoAgentError> { let foreach = foreach_variants(query_request); if let Some(foreach) = foreach { - pipeline_for_foreach(foreach, query_request) + pipeline_for_foreach(foreach, config, query_request) } else { - pipeline_for_non_foreach(None, query_request) + pipeline_for_non_foreach(config, None, query_request) } } @@ -62,6 +65,7 @@ pub fn pipeline_for_query_request( /// Returns a pipeline paired with a value that indicates whether the response requires /// post-processing in the agent. pub fn pipeline_for_non_foreach( + config: &Configuration, variables: Option<&VariableSet>, query_request: &QueryRequest, ) -> Result<(Pipeline, ResponseShape), MongoAgentError> { @@ -72,8 +76,13 @@ r#where, .. } = query; + let mut pipeline = Pipeline::empty(); + + // If this is a native query then we start with the native query's pipeline + pipeline.append(pipeline_for_native_query(config, variables, query_request)?); + // Stages common to aggregate and row queries. - let mut pipeline = pipeline_for_relations(variables, query_request)?; + pipeline.append(pipeline_for_relations(config, variables, query_request)?); let match_stage = r#where .as_ref() diff --git a/crates/mongodb-agent-common/src/query/query_target.rs b/crates/mongodb-agent-common/src/query/query_target.rs new file mode 100644 index 00000000..937365ec --- /dev/null +++ b/crates/mongodb-agent-common/src/query/query_target.rs @@ -0,0 +1,41 @@ +use std::{collections::HashMap, fmt::Display}; + +use configuration::{native_query::NativeQuery, Configuration}; +use dc_api_types::{Argument, QueryRequest}; + +#[derive(Clone, Debug)] +pub enum QueryTarget<'a> { + Collection(String), + NativeQuery { + name: String, + native_query: &'a NativeQuery, + arguments: &'a HashMap<String, Argument>, + }, +} + +impl QueryTarget<'_> { + pub fn for_request<'a>( + config: &'a Configuration, + query_request: &'a QueryRequest, + ) -> QueryTarget<'a> { + let target = &query_request.target; + let target_name = target.name().join("."); + match config.native_queries.get(&target_name) { + Some(native_query) => QueryTarget::NativeQuery { + name: target_name, + native_query, + arguments: target.arguments(), + }, + None => QueryTarget::Collection(target_name), + } + } +} + +impl Display for QueryTarget<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + QueryTarget::Collection(collection_name) => write!(f, "Collection({collection_name})"), + QueryTarget::NativeQuery { name, ..
} => write!(f, "NativeQuery({name})"), + } + } +} diff --git a/crates/mongodb-agent-common/src/query/relations.rs b/crates/mongodb-agent-common/src/query/relations.rs index 9cb11481..c6bc918c 100644 --- a/crates/mongodb-agent-common/src/query/relations.rs +++ b/crates/mongodb-agent-common/src/query/relations.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use anyhow::anyhow; +use configuration::Configuration; use dc_api_types::comparison_column::ColumnSelector; use dc_api_types::relationship::ColumnMapping; use dc_api_types::{Field, QueryRequest, Relationship, VariableSet}; @@ -16,6 +17,7 @@ use crate::{ use super::pipeline::pipeline_for_non_foreach; pub fn pipeline_for_relations( + config: &Configuration, variables: Option<&VariableSet>, query_request: &QueryRequest, ) -> Result { @@ -45,12 +47,13 @@ pub fn pipeline_for_relations( }) .unwrap_or(&empty_relation_map); - let stages = lookups_for_fields(query_request, variables, relationships, &[], fields)?; + let stages = lookups_for_fields(config, query_request, variables, relationships, &[], fields)?; Ok(Pipeline::new(stages)) } /// Produces $lookup stages for any necessary joins fn lookups_for_fields( + config: &Configuration, query_request: &QueryRequest, variables: Option<&VariableSet>, relationships: &HashMap, @@ -61,6 +64,7 @@ fn lookups_for_fields( .iter() .map(|(field_name, field)| { lookups_for_field( + config, query_request, variables, relationships, @@ -78,6 +82,7 @@ fn lookups_for_fields( /// Produces $lookup stages for any necessary joins fn lookups_for_field( + config: &Configuration, query_request: &QueryRequest, variables: Option<&VariableSet>, relationships: &HashMap, @@ -91,6 +96,7 @@ fn lookups_for_field( let nested_parent_columns = append_to_path(parent_columns, column); let fields = query.fields.clone().flatten().unwrap_or_default(); lookups_for_fields( + config, query_request, variables, relationships, @@ -107,6 +113,7 @@ fn lookups_for_field( offset: _, r#where: _, } => lookups_for_field( + config, query_request, variables, relationships, @@ -132,6 +139,7 @@ fn lookups_for_field( // Recursively build pipeline according to relation query let (lookup_pipeline, _) = pipeline_for_non_foreach( + config, variables, &QueryRequest { query: query.clone(), @@ -236,12 +244,12 @@ where #[cfg(test)] mod tests { use dc_api_types::{QueryRequest, QueryResponse}; - use mongodb::{bson::doc, options::AggregateOptions}; + use mongodb::bson::{bson, Bson}; use pretty_assertions::assert_eq; - use serde_json::{from_value, json, to_value}; + use serde_json::{from_value, json}; use super::super::execute_query_request; - use crate::mongodb::{test_helpers::mock_stream, MockCollectionTrait}; + use crate::mongodb::test_helpers::mock_collection_aggregate_response_for_pipeline; #[tokio::test] async fn looks_up_an_array_relation() -> Result<(), anyhow::Error> { @@ -285,7 +293,7 @@ mod tests { ], }))?; - let expected_pipeline = json!([ + let expected_pipeline = bson!([ { "$lookup": { "from": "students", @@ -319,21 +327,19 @@ mod tests { }, ]); - let mut collection = MockCollectionTrait::new(); - collection - .expect_aggregate() - .returning(move |pipeline, _: Option| { - assert_eq!(expected_pipeline, to_value(pipeline).unwrap()); - Ok(mock_stream(vec![Ok(doc! 
{ - "class_title": "MongoDB 101", - "students": [ - { "student_name": "Alice" }, - { "student_name": "Bob" }, - ], - })])) - }); + let db = mock_collection_aggregate_response_for_pipeline( + "classes", + expected_pipeline, + bson!([{ + "class_title": "MongoDB 101", + "students": [ + { "student_name": "Alice" }, + { "student_name": "Bob" }, + ], + }]), + ); - let result = execute_query_request(&collection, query_request).await?; + let result = execute_query_request(db, &Default::default(), query_request).await?; assert_eq!(expected_response, result); Ok(()) @@ -382,7 +388,7 @@ mod tests { ], }))?; - let expected_pipeline = json!([ + let expected_pipeline = bson!([ { "$lookup": { "from": "classes", @@ -414,24 +420,22 @@ mod tests { }, ]); - let mut collection = MockCollectionTrait::new(); - collection - .expect_aggregate() - .returning(move |pipeline, _: Option| { - assert_eq!(expected_pipeline, to_value(pipeline).unwrap()); - Ok(mock_stream(vec![ - Ok(doc! { - "student_name": "Alice", - "class": { "class_title": "MongoDB 101" }, - }), - Ok(doc! { - "student_name": "Bob", - "class": { "class_title": "MongoDB 101" }, - }), - ])) - }); - - let result = execute_query_request(&collection, query_request).await?; + let db = mock_collection_aggregate_response_for_pipeline( + "students", + expected_pipeline, + bson!([ + { + "student_name": "Alice", + "class": { "class_title": "MongoDB 101" }, + }, + { + "student_name": "Bob", + "class": { "class_title": "MongoDB 101" }, + }, + ]), + ); + + let result = execute_query_request(db, &Default::default(), query_request).await?; assert_eq!(expected_response, result); Ok(()) @@ -479,7 +483,7 @@ mod tests { ], }))?; - let expected_pipeline = json!([ + let expected_pipeline = bson!([ { "$lookup": { "from": "students", @@ -515,21 +519,19 @@ mod tests { }, ]); - let mut collection = MockCollectionTrait::new(); - collection - .expect_aggregate() - .returning(move |pipeline, _: Option| { - assert_eq!(expected_pipeline, to_value(pipeline).unwrap()); - Ok(mock_stream(vec![Ok(doc! { - "class_title": "MongoDB 101", - "students": [ - { "student_name": "Alice" }, - { "student_name": "Bob" }, - ], - })])) - }); + let db = mock_collection_aggregate_response_for_pipeline( + "classes", + expected_pipeline, + bson!([{ + "class_title": "MongoDB 101", + "students": [ + { "student_name": "Alice" }, + { "student_name": "Bob" }, + ], + }]), + ); - let result = execute_query_request(&collection, query_request).await?; + let result = execute_query_request(db, &Default::default(), query_request).await?; assert_eq!(expected_response, result); Ok(()) @@ -610,7 +612,7 @@ mod tests { ], }))?; - let expected_pipeline = json!([ + let expected_pipeline = bson!([ { "$lookup": { "from": "students", @@ -670,38 +672,36 @@ mod tests { }, ]); - let mut collection = MockCollectionTrait::new(); - collection - .expect_aggregate() - .returning(move |pipeline, _: Option| { - assert_eq!(expected_pipeline, to_value(pipeline).unwrap()); - Ok(mock_stream(vec![Ok(doc! 
{ - "class_title": "MongoDB 101", - "students": { - "rows": [ - { - "student_name": "Alice", - "assignments": { - "rows": [ - { "assignment_title": "read chapter 2" }, - ], - } - }, - { - "student_name": "Bob", - "assignments": { - "rows": [ - { "assignment_title": "JSON Basics" }, - { "assignment_title": "read chapter 2" }, - ], - } - }, - ] - }, - })])) - }); + let db = mock_collection_aggregate_response_for_pipeline( + "classes", + expected_pipeline, + bson!([{ + "class_title": "MongoDB 101", + "students": { + "rows": [ + { + "student_name": "Alice", + "assignments": { + "rows": [ + { "assignment_title": "read chapter 2" }, + ], + } + }, + { + "student_name": "Bob", + "assignments": { + "rows": [ + { "assignment_title": "JSON Basics" }, + { "assignment_title": "read chapter 2" }, + ], + } + }, + ] + }, + }]), + ); - let result = execute_query_request(&collection, query_request).await?; + let result = execute_query_request(db, &Default::default(), query_request).await?; assert_eq!(expected_response, result); Ok(()) @@ -748,7 +748,7 @@ mod tests { ], }))?; - let expected_pipeline = json!([ + let expected_pipeline = bson!([ { "$lookup": { "from": "students", @@ -793,21 +793,19 @@ mod tests { }, ]); - let mut collection = MockCollectionTrait::new(); - collection - .expect_aggregate() - .returning(move |pipeline, _: Option| { - assert_eq!(expected_pipeline, to_value(pipeline).unwrap()); - Ok(mock_stream(vec![Ok(doc! { - "students_aggregate": { - "aggregates": { - "aggregate_count": 2, - }, + let db = mock_collection_aggregate_response_for_pipeline( + "classes", + expected_pipeline, + bson!([{ + "students_aggregate": { + "aggregates": { + "aggregate_count": 2, }, - })])) - }); + }, + }]), + ); - let result = execute_query_request(&collection, query_request).await?; + let result = execute_query_request(db, &Default::default(), query_request).await?; assert_eq!(expected_response, result); Ok(()) @@ -880,7 +878,7 @@ mod tests { }] }))?; - let expected_pipeline = json!([ + let expected_pipeline = bson!([ { "$lookup": { "from": "movies", @@ -911,7 +909,7 @@ mod tests { } }, { - "$limit": 50 + "$limit": Bson::Int64(50), }, { "$replaceWith": { @@ -927,21 +925,19 @@ mod tests { }, ]); - let mut collection = MockCollectionTrait::new(); - collection - .expect_aggregate() - .returning(move |pipeline, _: Option| { - assert_eq!(expected_pipeline, to_value(pipeline).unwrap()); - Ok(mock_stream(vec![Ok(doc! { - "name": "Mercedes Tyler", - "movie": { "rows": [{ - "title": "The Land Beyond the Sunset", - "year": 1912 - }] }, - })])) - }); - - let result = execute_query_request(&collection, query_request).await?; + let db = mock_collection_aggregate_response_for_pipeline( + "comments", + expected_pipeline, + bson!([{ + "name": "Mercedes Tyler", + "movie": { "rows": [{ + "title": "The Land Beyond the Sunset", + "year": 1912 + }] }, + }]), + ); + + let result = execute_query_request(db, &Default::default(), query_request).await?; assert_eq!(expected_response, result); Ok(()) @@ -1019,7 +1015,7 @@ mod tests { }] }))?; - let expected_pipeline = json!([ + let expected_pipeline = bson!([ { "$lookup": { "from": "movies", @@ -1055,7 +1051,7 @@ mod tests { } }, { - "$limit": 50 + "$limit": Bson::Int64(50), }, { "$replaceWith": { @@ -1071,22 +1067,20 @@ mod tests { }, ]); - let mut collection = MockCollectionTrait::new(); - collection - .expect_aggregate() - .returning(move |pipeline, _: Option| { - assert_eq!(expected_pipeline, to_value(pipeline).unwrap()); - Ok(mock_stream(vec![Ok(doc! 
{ - "name": "Beric Dondarrion", - "movie": { "rows": [{ - "credits": { - "director": "Martin Scorsese" - } - }] }, - })])) - }); + let db = mock_collection_aggregate_response_for_pipeline( + "comments", + expected_pipeline, + bson!([{ + "name": "Beric Dondarrion", + "movie": { "rows": [{ + "credits": { + "director": "Martin Scorsese" + } + }] }, + }]), + ); - let result = execute_query_request(&collection, query_request).await?; + let result = execute_query_request(db, &Default::default(), query_request).await?; assert_eq!(expected_response, result); Ok(()) diff --git a/crates/mongodb-agent-common/src/schema.rs b/crates/mongodb-agent-common/src/schema.rs index a1acd963..26fd6845 100644 --- a/crates/mongodb-agent-common/src/schema.rs +++ b/crates/mongodb-agent-common/src/schema.rs @@ -1,205 +1,7 @@ -use dc_api_types::GqlName; -use dc_api_types::{ - ColumnInfo, ColumnType, ObjectTypeDefinition, SchemaResponse, TableInfo, TableType, -}; -use futures_util::{StreamExt, TryStreamExt}; use indexmap::IndexMap; -use mongodb::bson::from_bson; -use mongodb::results::CollectionType; use mongodb_support::{BsonScalarType, BsonType}; use serde::Deserialize; -use crate::interface_types::{MongoAgentError, MongoConfig}; - -pub async fn get_schema(config: &MongoConfig) -> Result { - tracing::debug!(?config, "get_schema"); - - let db = config.client.database(&config.database); - let collections_cursor = db.list_collections(None, None).await?; - - let (object_types, tables) = collections_cursor - .into_stream() - .map( - |collection_spec| -> Result<(Vec, TableInfo), MongoAgentError> { - let collection_spec_value = collection_spec?; - let name = &collection_spec_value.name; - let collection_type = &collection_spec_value.collection_type; - let schema_bson_option = collection_spec_value - .options - .validator - .as_ref() - .and_then(|x| x.get("$jsonSchema")); - - let table_info = match schema_bson_option { - Some(schema_bson) => { - from_bson::(schema_bson.clone()).map_err(|err| { - MongoAgentError::BadCollectionSchema( - name.to_owned(), - schema_bson.clone(), - err, - ) - }) - } - None => Ok(ValidatorSchema { - bson_type: BsonType::Object, - description: None, - required: Vec::new(), - properties: IndexMap::new(), - }), - } - .map(|validator_schema| { - make_table_info(name, collection_type, &validator_schema) - }); - tracing::debug!( - validator = %serde_json::to_string(&schema_bson_option).unwrap(), - table_info = %table_info.as_ref().map(|(_, info)| serde_json::to_string(&info).unwrap()).unwrap_or("null".to_owned()), - ); - table_info - }, - ) - .try_collect::<(Vec>, Vec)>() - .await?; - - Ok(SchemaResponse { - tables, - object_types: object_types.concat(), - }) -} - -fn make_table_info( - collection_name: &str, - collection_type: &CollectionType, - validator_schema: &ValidatorSchema, -) -> (Vec, TableInfo) { - let properties = &validator_schema.properties; - let required_labels = &validator_schema.required; - - let (object_type_defs, column_infos) = { - let type_prefix = format!("{collection_name}_"); - let id_column = ColumnInfo { - name: "_id".to_string(), - r#type: ColumnType::Scalar(BsonScalarType::ObjectId.graphql_name()), - nullable: false, - description: Some(Some("primary key _id".to_string())), - insertable: Some(false), - updatable: Some(false), - value_generated: None, - }; - let (object_type_defs, mut columns_infos): ( - Vec>, - Vec, - ) = properties - .iter() - .map(|prop| make_column_info(&type_prefix, required_labels, prop)) - .unzip(); - if !columns_infos.iter().any(|info| info.name == 
"_id") { - // There should always be an _id column, so add it unless it was already specified in - // the validator. - columns_infos.push(id_column); - } - (object_type_defs.concat(), columns_infos) - }; - - let table_info = TableInfo { - name: vec![collection_name.to_string()], - r#type: if collection_type == &CollectionType::View { - Some(TableType::View) - } else { - Some(TableType::Table) - }, - columns: column_infos, - primary_key: Some(vec!["_id".to_string()]), - foreign_keys: None, - description: validator_schema.description.clone().map(Some), - // Since we don't support mutations nothing is insertable, updatable, or deletable - insertable: Some(false), - updatable: Some(false), - deletable: Some(false), - }; - (object_type_defs, table_info) -} - -fn make_column_info( - type_prefix: &str, - required_labels: &[String], - (column_name, column_schema): (&String, &Property), -) -> (Vec, ColumnInfo) { - let description = get_property_description(column_schema); - - let object_type_name = format!("{type_prefix}{column_name}"); - let (collected_otds, column_type) = make_column_type(&object_type_name, column_schema); - - let column_info = ColumnInfo { - name: column_name.clone(), - r#type: column_type, - nullable: !required_labels.contains(column_name), - description: description.map(Some), - // Since we don't support mutations nothing is insertable, updatable, or deletable - insertable: Some(false), - updatable: Some(false), - value_generated: None, - }; - - (collected_otds, column_info) -} - -fn make_column_type( - object_type_name: &str, - column_schema: &Property, -) -> (Vec, ColumnType) { - let mut collected_otds: Vec = vec![]; - - match column_schema { - Property::Object { - bson_type: _, - description: _, - required, - properties, - } => { - let type_prefix = format!("{object_type_name}_"); - let (otds, otd_columns): (Vec>, Vec) = properties - .iter() - .map(|prop| make_column_info(&type_prefix, required, prop)) - .unzip(); - - let object_type_definition = ObjectTypeDefinition { - name: GqlName::from(object_type_name).into_owned(), - description: Some("generated from MongoDB validation schema".to_string()), - columns: otd_columns, - }; - - collected_otds.append(&mut otds.concat()); - collected_otds.push(object_type_definition); - - ( - collected_otds, - ColumnType::Object(GqlName::from(object_type_name).into_owned()), - ) - } - Property::Array { - bson_type: _, - description: _, - items, - } => { - let item_schemas = *items.clone(); - - let (mut otds, element_type) = make_column_type(object_type_name, &item_schemas); - let column_type = ColumnType::Array { - element_type: Box::new(element_type), - nullable: false, - }; - - collected_otds.append(&mut otds); - - (collected_otds, column_type) - } - Property::Scalar { - bson_type, - description: _, - } => (collected_otds, ColumnType::Scalar(bson_type.graphql_name())), - } -} - #[derive(Debug, Deserialize)] #[cfg_attr(test, derive(PartialEq))] pub struct ValidatorSchema { diff --git a/crates/mongodb-agent-common/src/state.rs b/crates/mongodb-agent-common/src/state.rs index 692fcbbb..7875c7ab 100644 --- a/crates/mongodb-agent-common/src/state.rs +++ b/crates/mongodb-agent-common/src/state.rs @@ -1,25 +1,36 @@ use std::{env, error::Error}; use anyhow::anyhow; -use configuration::Configuration; +use mongodb::{Client, Database}; -use crate::{interface_types::MongoConfig, mongodb_connection::get_mongodb_client}; +use crate::mongodb_connection::get_mongodb_client; pub const DATABASE_URI_ENV_VAR: &str = "MONGODB_DATABASE_URI"; +#[derive(Clone, 
Debug)] +pub struct ConnectorState { + client: Client, + + /// Name of the database to connect to + database: String, +} + +impl ConnectorState { + pub fn database(&self) -> Database { + self.client.database(&self.database) + } +} + /// Reads database connection URI from environment variable -pub async fn try_init_state( - configuration: &Configuration, -) -> Result> { +pub async fn try_init_state() -> Result> { // Splitting this out of the `Connector` impl makes error translation easier let database_uri = env::var(DATABASE_URI_ENV_VAR)?; - try_init_state_from_uri(&database_uri, configuration).await + try_init_state_from_uri(&database_uri).await } pub async fn try_init_state_from_uri( database_uri: &str, - configuration: &Configuration, -) -> Result> { +) -> Result> { let client = get_mongodb_client(database_uri).await?; let database_name = match client.default_database() { Some(database) => Ok(database.name().to_owned()), @@ -27,13 +38,8 @@ pub async fn try_init_state_from_uri( "${DATABASE_URI_ENV_VAR} environment variable must include a database" )), }?; - Ok(MongoConfig { + Ok(ConnectorState { client, database: database_name, - native_procedures: configuration.native_procedures.clone(), - object_types: configuration - .object_types() - .map(|(name, object_type)| (name.clone(), object_type.clone())) - .collect(), }) } diff --git a/crates/mongodb-connector/Cargo.toml b/crates/mongodb-connector/Cargo.toml index e89e8392..2ab44609 100644 --- a/crates/mongodb-connector/Cargo.toml +++ b/crates/mongodb-connector/Cargo.toml @@ -18,7 +18,7 @@ lazy_static = "^1.4.0" mongodb = "2.8" mongodb-agent-common = { path = "../mongodb-agent-common" } mongodb-support = { path = "../mongodb-support" } -ndc-sdk = { git = "https://github.com/hasura/ndc-sdk-rs.git" } +ndc-sdk = { workspace = true } prometheus = "*" # share version from ndc-sdk serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0", features = ["preserve_order"] } diff --git a/crates/mongodb-connector/src/api_type_conversions/conversion_error.rs b/crates/mongodb-connector/src/api_type_conversions/conversion_error.rs index 2ef88b9a..4f34c8ca 100644 --- a/crates/mongodb-connector/src/api_type_conversions/conversion_error.rs +++ b/crates/mongodb-connector/src/api_type_conversions/conversion_error.rs @@ -6,6 +6,9 @@ pub enum ConversionError { #[error("The connector does not yet support {0}")] NotImplemented(&'static str), + #[error("The target of the query, {0}, is a function whose result type is not an object type")] + RootTypeIsNotObject(String), + #[error("{0}")] TypeMismatch(String), diff --git a/crates/mongodb-connector/src/api_type_conversions/query_request.rs b/crates/mongodb-connector/src/api_type_conversions/query_request.rs index 7a9c4759..24e1d6ad 100644 --- a/crates/mongodb-connector/src/api_type_conversions/query_request.rs +++ b/crates/mongodb-connector/src/api_type_conversions/query_request.rs @@ -1,9 +1,12 @@ -use std::collections::{BTreeMap, HashMap}; +use std::{ + borrow::Cow, + collections::{BTreeMap, HashMap}, +}; -use configuration::{schema, Schema, WithNameRef}; +use configuration::{schema, WithNameRef}; use dc_api_types::{self as v2, ColumnSelector, Target}; use indexmap::IndexMap; -use itertools::Itertools; +use itertools::Itertools as _; use ndc_sdk::models::{self as v3}; use super::{ @@ -14,20 +17,35 @@ use super::{ #[derive(Clone, Debug)] pub struct QueryContext<'a> { - pub functions: Vec, - pub scalar_types: &'a BTreeMap, - pub schema: &'a Schema, + pub collections: Cow<'a, BTreeMap>, + pub 
functions: Cow<'a, BTreeMap>, + pub object_types: Cow<'a, BTreeMap>, + pub scalar_types: Cow<'a, BTreeMap>, } impl QueryContext<'_> { fn find_collection( &self, collection_name: &str, - ) -> Result<&schema::Collection, ConversionError> { - self.schema - .collections - .get(collection_name) - .ok_or_else(|| ConversionError::UnknownCollection(collection_name.to_string())) + ) -> Result<&v3::CollectionInfo, ConversionError> { + if let Some(collection) = self.collections.get(collection_name) { + return Ok(collection); + } + if let Some((_, function)) = self.functions.get(collection_name) { + return Ok(function); + } + + Err(ConversionError::UnknownCollection( + collection_name.to_string(), + )) + } + + fn find_collection_object_type( + &self, + collection_name: &str, + ) -> Result, ConversionError> { + let collection = self.find_collection(collection_name)?; + self.find_object_type(&collection.collection_type) } fn find_object_type<'a>( @@ -35,7 +53,6 @@ impl QueryContext<'_> { object_type_name: &'a str, ) -> Result, ConversionError> { let object_type = self - .schema .object_types .get(object_type_name) .ok_or_else(|| ConversionError::UnknownObjectType(object_type_name.to_string()))?; @@ -96,13 +113,13 @@ pub fn v3_to_v2_query_request( context: &QueryContext, request: v3::QueryRequest, ) -> Result { - let collection = context.find_collection(&request.collection)?; - let collection_object_type = context.find_object_type(&collection.r#type)?; + let collection_object_type = context.find_collection_object_type(&request.collection)?; Ok(v2::QueryRequest { relationships: v3_to_v2_relationships(&request)?, target: Target::TTable { name: vec![request.collection], + arguments: v3_to_v2_arguments(request.arguments.clone()), }, query: Box::new(v3_to_v2_query( context, @@ -325,8 +342,8 @@ fn v3_to_v2_field( arguments: _, } => { let v3_relationship = lookup_relationship(collection_relationships, &relationship)?; - let collection = context.find_collection(&v3_relationship.target_collection)?; - let collection_object_type = context.find_object_type(&collection.r#type)?; + let collection_object_type = + context.find_collection_object_type(&v3_relationship.target_collection)?; Ok(v2::Field::Relationship { query: Box::new(v3_to_v2_query( context, @@ -427,9 +444,7 @@ fn v3_to_v2_order_by_element( collection_relationships, &last_path_element.relationship, )?; - let target_collection = - context.find_collection(&relationship.target_collection)?; - context.find_object_type(&target_collection.r#type) + context.find_collection_object_type(&relationship.target_collection) }) .transpose()?; let target_object_type = end_of_relationship_path_object_type @@ -502,9 +517,8 @@ fn v3_to_v2_target_path_step>( .map(|expression| { let v3_relationship = lookup_relationship(collection_relationships, &path_element.relationship)?; - let target_collection = - context.find_collection(&v3_relationship.target_collection)?; - let target_object_type = context.find_object_type(&target_collection.r#type)?; + let target_object_type = + context.find_collection_object_type(&v3_relationship.target_collection)?; let v2_expression = v3_to_v2_expression( context, collection_relationships, @@ -571,16 +585,17 @@ fn v3_to_v2_relationships( }) => Some((collection, relationship, arguments)), _ => None, }) - .map_ok(|(collection_name, relationship_name, _arguments)| { + .map_ok(|(collection_name, relationship_name, arguments)| { let v3_relationship = lookup_relationship( &query_request.collection_relationships, relationship_name, )?; - // TODO: Add an 
`arguments` field to v2::Relationship and populate it here. (MVC-3) - // I think it's possible that the same relationship might appear multiple time with - // different arguments, so we may want to make some change to relationship names to - // avoid overwriting in such a case. -Jesse + // TODO: Functions (native queries) may be referenced multiple times in a query + // request with different arguments. To accommodate that we will need to record + // separate v2 relations for each reference with different names. In the current + // implementation one set of arguments will override arguments to all occurrences of + // a given function. MDB-106 let v2_relationship = v2::Relationship { column_mapping: v2::ColumnMapping( v3_relationship @@ -600,6 +615,7 @@ fn v3_to_v2_relationships( }, target: v2::Target::TTable { name: vec![v3_relationship.target_collection.clone()], + arguments: v3_to_v2_relationship_arguments(arguments.clone()), }, }; @@ -713,9 +729,8 @@ fn v3_to_v2_expression( } => { let v3_relationship = lookup_relationship(collection_relationships, &relationship)?; - let v3_collection = - context.find_collection(&v3_relationship.target_collection)?; - let collection_object_type = context.find_object_type(&v3_collection.r#type)?; + let collection_object_type = + context.find_collection_object_type(&v3_relationship.target_collection)?; let in_table = v2::ExistsInTable::RelatedTable { relationship }; Ok((in_table, collection_object_type)) } @@ -723,8 +738,8 @@ fn v3_to_v2_expression( collection, arguments: _, } => { - let v3_collection = context.find_collection(&collection)?; - let collection_object_type = context.find_object_type(&v3_collection.r#type)?; + let collection_object_type = + context.find_collection_object_type(&collection)?; let in_table = v2::ExistsInTable::UnrelatedTable { table: vec![collection], }; @@ -863,16 +878,48 @@ where n.map(|input| Some(input.into())) } +fn v3_to_v2_arguments(arguments: BTreeMap) -> HashMap { + arguments + .into_iter() + .map(|(argument_name, argument)| match argument { + v3::Argument::Variable { name } => (argument_name, v2::Argument::Variable { name }), + v3::Argument::Literal { value } => (argument_name, v2::Argument::Literal { value }), + }) + .collect() +} + +fn v3_to_v2_relationship_arguments( + arguments: BTreeMap, +) -> HashMap { + arguments + .into_iter() + .map(|(argument_name, argument)| match argument { + v3::RelationshipArgument::Variable { name } => { + (argument_name, v2::Argument::Variable { name }) + } + v3::RelationshipArgument::Literal { value } => { + (argument_name, v2::Argument::Literal { value }) + } + v3::RelationshipArgument::Column { name } => { + (argument_name, v2::Argument::Column { name }) + } + }) + .collect() +} + #[cfg(test)] mod tests { - use std::collections::{BTreeMap, HashMap}; + use std::{ + borrow::Cow, + collections::{BTreeMap, HashMap}, + }; - use configuration::{schema, Schema}; + use configuration::schema; use dc_api_test_helpers::{self as v2, source, table_relationships, target}; use mongodb_support::BsonScalarType; use ndc_sdk::models::{ - AggregateFunctionDefinition, ComparisonOperatorDefinition, OrderByElement, OrderByTarget, - OrderDirection, ScalarType, Type, TypeRepresentation, + self as v3, AggregateFunctionDefinition, ComparisonOperatorDefinition, OrderByElement, + OrderByTarget, OrderDirection, ScalarType, Type, TypeRepresentation, }; use ndc_test_helpers::*; use pretty_assertions::assert_eq; @@ -1022,13 +1069,7 @@ mod tests { #[test] fn translates_root_column_references() -> Result<(), 
anyhow::Error> { - let scalar_types = make_scalar_types(); - let schema = make_flat_schema(); - let query_context = QueryContext { - functions: vec![], - scalar_types: &scalar_types, - schema: &schema, - }; + let query_context = make_flat_schema(); let query = query_request() .collection("authors") .query(query().fields([field!("last_name")]).predicate(exists( @@ -1069,13 +1110,7 @@ mod tests { #[test] fn translates_aggregate_selections() -> Result<(), anyhow::Error> { - let scalar_types = make_scalar_types(); - let schema = make_flat_schema(); - let query_context = QueryContext { - functions: vec![], - scalar_types: &scalar_types, - schema: &schema, - }; + let query_context = make_flat_schema(); let query = query_request() .collection("authors") .query(query().aggregates([ @@ -1101,13 +1136,7 @@ mod tests { #[test] fn translates_relationships_in_fields_predicates_and_orderings() -> Result<(), anyhow::Error> { - let scalar_types = make_scalar_types(); - let schema = make_flat_schema(); - let query_context = QueryContext { - functions: vec![], - scalar_types: &scalar_types, - schema: &schema, - }; + let query_context = make_flat_schema(); let query = query_request() .collection("authors") .query( @@ -1217,13 +1246,7 @@ mod tests { #[test] fn translates_nested_fields() -> Result<(), anyhow::Error> { - let scalar_types = make_scalar_types(); - let schema = make_nested_schema(); - let query_context = QueryContext { - functions: vec![], - scalar_types: &scalar_types, - schema: &schema, - }; + let query_context = make_nested_schema(); let query_request = query_request() .collection("authors") .query(query().fields([ @@ -1288,25 +1311,34 @@ mod tests { ]) } - fn make_flat_schema() -> Schema { - Schema { - collections: BTreeMap::from([ + fn make_flat_schema() -> QueryContext<'static> { + QueryContext { + collections: Cow::Owned(BTreeMap::from([ ( "authors".into(), - schema::Collection { + v3::CollectionInfo { + name: "authors".to_owned(), description: None, - r#type: "Author".into(), + collection_type: "Author".into(), + arguments: Default::default(), + uniqueness_constraints: make_primary_key_uniqueness_constraint("authors"), + foreign_keys: Default::default(), }, ), ( "articles".into(), - schema::Collection { + v3::CollectionInfo { + name: "articles".to_owned(), description: None, - r#type: "Article".into(), + collection_type: "Article".into(), + arguments: Default::default(), + uniqueness_constraints: make_primary_key_uniqueness_constraint("articles"), + foreign_keys: Default::default(), }, ), - ]), - object_types: BTreeMap::from([ + ])), + functions: Default::default(), + object_types: Cow::Owned(BTreeMap::from([ ( "Author".into(), schema::ObjectType { @@ -1360,20 +1392,26 @@ mod tests { ]), }, ), - ]), + ])), + scalar_types: Cow::Owned(make_scalar_types()), } } - fn make_nested_schema() -> Schema { - Schema { - collections: BTreeMap::from([( + fn make_nested_schema() -> QueryContext<'static> { + QueryContext { + collections: Cow::Owned(BTreeMap::from([( "authors".into(), - schema::Collection { + v3::CollectionInfo { + name: "authors".into(), description: None, - r#type: "Author".into(), + collection_type: "Author".into(), + arguments: Default::default(), + uniqueness_constraints: make_primary_key_uniqueness_constraint("authors"), + foreign_keys: Default::default(), }, - )]), - object_types: BTreeMap::from([ + )])), + functions: Default::default(), + object_types: Cow::Owned(BTreeMap::from([ ( "Author".into(), schema::ObjectType { @@ -1433,7 +1471,20 @@ mod tests { )]), }, ), - ]), + ])), + 
scalar_types: Cow::Owned(make_scalar_types()), } } + + fn make_primary_key_uniqueness_constraint( + collection_name: &str, + ) -> BTreeMap { + [( + format!("{collection_name}_id"), + v3::UniquenessConstraint { + unique_columns: vec!["_id".to_owned()], + }, + )] + .into() + } } diff --git a/crates/mongodb-connector/src/main.rs b/crates/mongodb-connector/src/main.rs index aadcefad..00071bc7 100644 --- a/crates/mongodb-connector/src/main.rs +++ b/crates/mongodb-connector/src/main.rs @@ -3,6 +3,7 @@ mod capabilities; mod error_mapping; mod mongo_connector; mod mutation; +mod query_context; mod schema; use std::error::Error; diff --git a/crates/mongodb-connector/src/mongo_connector.rs b/crates/mongodb-connector/src/mongo_connector.rs index 9479f947..8705c132 100644 --- a/crates/mongodb-connector/src/mongo_connector.rs +++ b/crates/mongodb-connector/src/mongo_connector.rs @@ -4,8 +4,8 @@ use anyhow::anyhow; use async_trait::async_trait; use configuration::Configuration; use mongodb_agent_common::{ - explain::explain_query, health::check_health, interface_types::MongoConfig, - query::handle_query_request, + explain::explain_query, health::check_health, query::handle_query_request, + state::ConnectorState, }; use ndc_sdk::{ connector::{ @@ -22,10 +22,10 @@ use tracing::instrument; use crate::{ api_type_conversions::{ - v2_to_v3_explain_response, v2_to_v3_query_response, v3_to_v2_query_request, QueryContext, + v2_to_v3_explain_response, v2_to_v3_query_response, v3_to_v2_query_request, }, error_mapping::{mongo_agent_error_to_explain_error, mongo_agent_error_to_query_error}, - schema, + query_context::get_query_context, }; use crate::{capabilities::mongo_capabilities_response, mutation::handle_mutation_request}; @@ -55,10 +55,10 @@ impl ConnectorSetup for MongoConnector { // - `skip_all` omits arguments from the trace async fn try_init_state( &self, - configuration: &Configuration, + _configuration: &Configuration, _metrics: &mut prometheus::Registry, - ) -> Result { - let state = mongodb_agent_common::state::try_init_state(configuration).await?; + ) -> Result { + let state = mongodb_agent_common::state::try_init_state().await?; Ok(state) } } @@ -67,7 +67,7 @@ impl ConnectorSetup for MongoConnector { #[async_trait] impl Connector for MongoConnector { type Configuration = Configuration; - type State = MongoConfig; + type State = ConnectorState; #[instrument(err, skip_all)] fn fetch_metrics( @@ -109,15 +109,8 @@ impl Connector for MongoConnector { state: &Self::State, request: QueryRequest, ) -> Result, ExplainError> { - let v2_request = v3_to_v2_query_request( - &QueryContext { - functions: vec![], - scalar_types: &schema::SCALAR_TYPES, - schema: &configuration.schema, - }, - request, - )?; - let response = explain_query(state, v2_request) + let v2_request = v3_to_v2_query_request(&get_query_context(configuration), request)?; + let response = explain_query(configuration, state, v2_request) .await .map_err(mongo_agent_error_to_explain_error)?; Ok(v2_to_v3_explain_response(response).into()) @@ -136,11 +129,11 @@ impl Connector for MongoConnector { #[instrument(err, skip_all)] async fn mutation( - _configuration: &Self::Configuration, + configuration: &Self::Configuration, state: &Self::State, request: MutationRequest, ) -> Result, MutationError> { - handle_mutation_request(state, request).await + handle_mutation_request(configuration, state, request).await } #[instrument(err, skip_all)] @@ -150,15 +143,8 @@ impl Connector for MongoConnector { request: QueryRequest, ) -> Result, QueryError> { 
tracing::debug!(query_request = %serde_json::to_string(&request).unwrap(), "received query request"); - let v2_request = v3_to_v2_query_request( - &QueryContext { - functions: vec![], - scalar_types: &schema::SCALAR_TYPES, - schema: &configuration.schema, - }, - request, - )?; - let response = handle_query_request(state, v2_request) + let v2_request = v3_to_v2_query_request(&get_query_context(configuration), request)?; + let response = handle_query_request(configuration, state, v2_request) .await .map_err(mongo_agent_error_to_query_error)?; Ok(v2_to_v3_query_response(response).into()) diff --git a/crates/mongodb-connector/src/mutation.rs b/crates/mongodb-connector/src/mutation.rs index 5525dcb6..9a6ec86e 100644 --- a/crates/mongodb-connector/src/mutation.rs +++ b/crates/mongodb-connector/src/mutation.rs @@ -1,11 +1,11 @@ use std::collections::BTreeMap; -use configuration::schema::ObjectType; +use configuration::{schema::ObjectType, Configuration}; use futures::future::try_join_all; use itertools::Itertools; use mongodb::Database; use mongodb_agent_common::{ - interface_types::MongoConfig, procedure::Procedure, query::serialization::bson_to_json, + procedure::Procedure, query::serialization::bson_to_json, state::ConnectorState, }; use ndc_sdk::{ connector::MutationError, @@ -14,11 +14,12 @@ use ndc_sdk::{ }; pub async fn handle_mutation_request( - config: &MongoConfig, + config: &Configuration, + state: &ConnectorState, mutation_request: MutationRequest, ) -> Result, MutationError> { tracing::debug!(?config, mutation_request = %serde_json::to_string(&mutation_request).unwrap(), "executing mutation"); - let database = config.client.database(&config.database); + let database = state.database(); let jobs = look_up_procedures(config, mutation_request)?; let operation_results = try_join_all( jobs.into_iter() @@ -31,7 +32,7 @@ pub async fn handle_mutation_request( /// Looks up procedures according to the names given in the mutation request, and pairs them with /// arguments and requested fields. Returns an error if any procedures cannot be found. 
fn look_up_procedures( - config: &MongoConfig, + config: &Configuration, mutation_request: MutationRequest, ) -> Result>, MutationError> { let (procedures, not_found): (Vec, Vec) = mutation_request diff --git a/crates/mongodb-connector/src/query_context.rs b/crates/mongodb-connector/src/query_context.rs new file mode 100644 index 00000000..9ab3ac08 --- /dev/null +++ b/crates/mongodb-connector/src/query_context.rs @@ -0,0 +1,14 @@ +use std::borrow::Cow; + +use crate::{api_type_conversions::QueryContext, schema::SCALAR_TYPES}; +use configuration::Configuration; + +/// Produce a query context from the connector configuration to direct query request processing +pub fn get_query_context(configuration: &Configuration) -> QueryContext<'_> { + QueryContext { + collections: Cow::Borrowed(&configuration.collections), + functions: Cow::Borrowed(&configuration.functions), + object_types: Cow::Borrowed(&configuration.object_types), + scalar_types: Cow::Borrowed(&SCALAR_TYPES), + } +} diff --git a/crates/mongodb-connector/src/schema.rs b/crates/mongodb-connector/src/schema.rs index 368488c2..c843b352 100644 --- a/crates/mongodb-connector/src/schema.rs +++ b/crates/mongodb-connector/src/schema.rs @@ -1,145 +1,25 @@ use lazy_static::lazy_static; -use mongodb_support::BsonScalarType; use std::collections::BTreeMap; -use configuration::{native_procedure::NativeProcedure, schema, Configuration}; -use ndc_sdk::{connector, models}; +use configuration::Configuration; +use ndc_sdk::{connector::SchemaError, models as ndc}; use crate::capabilities; lazy_static! { - pub static ref SCALAR_TYPES: BTreeMap = - capabilities::scalar_types(); -} - -pub async fn get_schema( - config: &Configuration, -) -> Result { - let schema = &config.schema; - let object_types = config.object_types().map(map_object_type).collect(); - let collections = schema.collections.iter().map(|(collection_name, collection)| map_collection(&object_types, collection_name, collection)).collect(); - - let procedures = config - .native_procedures - .iter() - .map(native_procedure_to_procedure) - .collect(); - - Ok(models::SchemaResponse { - collections, - object_types, + pub static ref SCALAR_TYPES: BTreeMap = capabilities::scalar_types(); +} + +pub async fn get_schema(config: &Configuration) -> Result { + Ok(ndc::SchemaResponse { + collections: config.collections.values().cloned().collect(), + functions: config.functions.values().map(|(f, _)| f).cloned().collect(), + procedures: config.procedures.values().cloned().collect(), + object_types: config + .object_types + .iter() + .map(|(name, object_type)| (name.clone(), object_type.clone().into())) + .collect(), scalar_types: SCALAR_TYPES.clone(), - functions: Default::default(), - procedures, }) } - -fn map_object_type( - (name, object_type): (&String, &schema::ObjectType), -) -> (String, models::ObjectType) { - ( - name.clone(), - models::ObjectType { - fields: map_field_infos(&object_type.fields), - description: object_type.description.clone(), - }, - ) -} - -fn map_field_infos( - fields: &BTreeMap, -) -> BTreeMap { - fields - .iter() - .map(|(name, field)| { - ( - name.clone(), - models::ObjectField { - r#type: map_type(&field.r#type), - description: field.description.clone(), - }, - ) - }) - .collect() -} - -fn map_type(t: &schema::Type) -> models::Type { - fn map_normalized_type(t: &schema::Type) -> models::Type { - match t { - // ExtendedJSON can respresent any BSON value, including null, so it is always nullable - schema::Type::ExtendedJSON => models::Type::Nullable { - underlying_type: 
Box::new(models::Type::Named { - name: mongodb_support::EXTENDED_JSON_TYPE_NAME.to_owned(), - }), - }, - schema::Type::Scalar(t) => models::Type::Named { - name: t.graphql_name(), - }, - schema::Type::Object(t) => models::Type::Named { name: t.clone() }, - schema::Type::ArrayOf(t) => models::Type::Array { - element_type: Box::new(map_normalized_type(t)), - }, - schema::Type::Nullable(t) => models::Type::Nullable { - underlying_type: Box::new(map_normalized_type(t)), - }, - } - } - map_normalized_type(&t.clone().normalize_type()) -} - -fn get_primary_key_uniqueness_constraint(object_types: &BTreeMap, name: &str, collection: &schema::Collection) -> Option<(String, models::UniquenessConstraint)> { - // Check to make sure our collection's object type contains the _id objectid field - // If it doesn't (should never happen, all collections need an _id column), don't generate the constraint - let object_type = object_types.get(&collection.r#type)?; - let id_field = object_type.fields.get("_id")?; - match &id_field.r#type { - models::Type::Named { name } => { - if *name == BsonScalarType::ObjectId.graphql_name() { Some(()) } else { None } - }, - models::Type::Nullable { .. } => None, - models::Type::Array { .. } => None, - models::Type::Predicate { .. } => None, - }?; - let uniqueness_constraint = models::UniquenessConstraint { - unique_columns: vec!["_id".into()] - }; - let constraint_name = format!("{}_id", name); - Some((constraint_name, uniqueness_constraint)) -} - -fn map_collection(object_types: &BTreeMap, name: &str, collection: &schema::Collection) -> models::CollectionInfo { - let pk_constraint = get_primary_key_uniqueness_constraint(object_types, name, collection); - - models::CollectionInfo { - name: name.to_owned(), - collection_type: collection.r#type.clone(), - description: collection.description.clone(), - arguments: Default::default(), - foreign_keys: Default::default(), - uniqueness_constraints: BTreeMap::from_iter(pk_constraint), - } -} - -fn native_procedure_to_procedure( - (procedure_name, procedure): (&String, &NativeProcedure), -) -> models::ProcedureInfo { - let arguments = procedure - .arguments - .iter() - .map(|(name, field)| { - ( - name.clone(), - models::ArgumentInfo { - argument_type: map_type(&field.r#type), - description: field.description.clone(), - }, - ) - }) - .collect(); - models::ProcedureInfo { - name: procedure_name.clone(), - description: procedure.description.clone(), - arguments, - result_type: map_type(&procedure.result_type), - } -} diff --git a/crates/mongodb-support/src/bson_type.rs b/crates/mongodb-support/src/bson_type.rs index c504bc89..f92f70ef 100644 --- a/crates/mongodb-support/src/bson_type.rs +++ b/crates/mongodb-support/src/bson_type.rs @@ -142,10 +142,7 @@ impl BsonScalarType { } pub fn graphql_name(self) -> String { - match self.graphql_type() { - Some(gql_type) => gql_type.to_string(), - None => capitalize(self.bson_name()), - } + capitalize(self.bson_name()) } pub fn graphql_type(self) -> Option { diff --git a/crates/ndc-test-helpers/Cargo.toml b/crates/ndc-test-helpers/Cargo.toml index db6cce79..d5e76dd3 100644 --- a/crates/ndc-test-helpers/Cargo.toml +++ b/crates/ndc-test-helpers/Cargo.toml @@ -6,5 +6,5 @@ edition = "2021" [dependencies] indexmap = "2" itertools = "^0.10" -ndc-sdk = { git = "https://github.com/hasura/ndc-sdk-rs.git" } +ndc-sdk = { workspace = true } serde_json = "1" diff --git a/fixtures/connector/sample_mflix/native_queries/hello.json b/fixtures/connector/sample_mflix/native_queries/hello.json new file mode 100644 
index 00000000..3e0a1dc1 --- /dev/null +++ b/fixtures/connector/sample_mflix/native_queries/hello.json @@ -0,0 +1,21 @@ +{ + "name": "hello", + "representation": "function", + "description": "Basic test of native queries", + "arguments": { + "name": { "type": { "scalar": "string" } } + }, + "resultDocumentType": "Hello", + "objectTypes": { + "Hello": { + "fields": { + "__value": { "type": { "scalar": "string" } } + } + } + }, + "pipeline": [{ + "$documents": [{ + "__value": "{{ name }}" + }] + }] +} diff --git a/fixtures/ddn/sample_mflix/commands/Hello.hml b/fixtures/ddn/sample_mflix/commands/Hello.hml new file mode 100644 index 00000000..9e58d38c --- /dev/null +++ b/fixtures/ddn/sample_mflix/commands/Hello.hml @@ -0,0 +1,27 @@ +kind: Command +version: v1 +definition: + name: hello + description: Basic test of native queries + outputType: String + arguments: + - name: name + type: String! + source: + dataConnectorName: sample_mflix + dataConnectorCommand: + function: hello + argumentMapping: + name: name + graphql: + rootFieldName: hello + rootFieldKind: Query + +--- +kind: CommandPermissions +version: v1 +definition: + commandName: hello + permissions: + - role: admin + allowExecution: true diff --git a/fixtures/ddn/sample_mflix/dataconnectors/sample_mflix.hml b/fixtures/ddn/sample_mflix/dataconnectors/sample_mflix.hml index 27bd7bfa..3ffc1172 100644 --- a/fixtures/ddn/sample_mflix/dataconnectors/sample_mflix.hml +++ b/fixtures/ddn/sample_mflix/dataconnectors/sample_mflix.hml @@ -891,7 +891,12 @@ definition: unique_columns: - _id foreign_keys: {} - functions: [] + functions: + - name: hello + description: Basic test of native queries + result_type: { type: named, name: String } + arguments: + name: { type: { type: named, name: String } } procedures: [] capabilities: version: 0.1.1 diff --git a/nix/mongodb-connector-workspace.nix b/nix/mongodb-connector-workspace.nix index ac155579..ddf415cc 100644 --- a/nix/mongodb-connector-workspace.nix +++ b/nix/mongodb-connector-workspace.nix @@ -51,13 +51,20 @@ let # for a `musl` target. inherit (boilerplate) craneLib; - src = - let - jsonFilter = path: _type: builtins.match ".*json" path != null; - cargoOrJson = path: type: - (jsonFilter path type) || (craneLib.filterCargoSources path type); - in - lib.cleanSourceWith { src = craneLib.path ./..; filter = cargoOrJson; }; + # Filters the source directory to select only the files required to build Rust crates. + # This avoids unnecessary rebuilds when other files in the repo change. + src = craneLib.cleanCargoSource (craneLib.path ./..); + + # If you need to modify the filter to include some files that are being filtered + # out, you can change the assignment of `src` to something like this: + # + # src = let + # jsonFilter = path: _type: builtins.match ".*json" path != null; + # cargoOrJson = path: type: + # (jsonFilter path type) || (craneLib.filterCargoSources path type); + # in + # lib.cleanSourceWith { src = craneLib.path ./..; filter = cargoOrJson; }; + # buildArgs = recursiveMerge [ boilerplate.buildArgs
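
The test rewrite at the start of this section replaces a hand-rolled mock with `mock_collection_aggregate_response_for_pipeline` and passes the database handle straight into `execute_query_request`. A stripped-down sketch of that canned-response pattern; the trait and names here are illustrative stand-ins, not the actual dc-api-test-helpers API:

```rust
use std::collections::BTreeMap;

// Illustrative stand-in for a handle that can run an aggregation pipeline.
trait RunPipeline {
    fn aggregate(&self, collection: &str, pipeline: &str) -> Vec<String>;
}

// Canned-response double: returns fixed documents when it sees the expected
// (collection, pipeline) pair, and nothing otherwise.
struct MockDb {
    responses: BTreeMap<(String, String), Vec<String>>,
}

impl RunPipeline for MockDb {
    fn aggregate(&self, collection: &str, pipeline: &str) -> Vec<String> {
        self.responses
            .get(&(collection.to_owned(), pipeline.to_owned()))
            .cloned()
            .unwrap_or_default()
    }
}

// Query execution receives the handle as an argument rather than pulling it
// from shared state, so a test can inject the double directly.
fn execute_query(db: &impl RunPipeline, collection: &str, pipeline: &str) -> Vec<String> {
    db.aggregate(collection, pipeline)
}
```

Passing the handle as a parameter is what lets the test drop the shared mock setup entirely.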
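
With `MongoConfig` gone, `try_init_state` no longer takes the configuration: connection details come entirely from the `MONGODB_DATABASE_URI` environment variable, and the database name from the connection string's default database. A condensed sketch of that flow, where `Client::with_uri_str` stands in for the crate's own `get_mongodb_client` helper:

```rust
use std::env;

use mongodb::Client;

const DATABASE_URI_ENV_VAR: &str = "MONGODB_DATABASE_URI";

/// Connect and record the database named in the connection string: the two
/// pieces that ConnectorState keeps. Sketch of the try_init_state flow.
async fn init_state() -> anyhow::Result<(Client, String)> {
    let uri = env::var(DATABASE_URI_ENV_VAR)?;
    let client = Client::with_uri_str(&uri).await?;
    let database = client
        .default_database()
        .map(|db| db.name().to_owned())
        .ok_or_else(|| {
            anyhow::anyhow!("{DATABASE_URI_ENV_VAR} environment variable must include a database")
        })?;
    Ok((client, database))
}
```

Because `Client::database` is cheap, `ConnectorState::database()` can construct a fresh `Database` handle on demand instead of storing one.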
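
`QueryContext` now holds `Cow` maps, so the connector can borrow straight from its long-lived `Configuration` while tests build fully owned contexts (the `QueryContext<'static>` returned by `make_flat_schema` and `make_nested_schema`). A minimal sketch of why `Cow` fits here, with map values simplified to the collection's object type name:

```rust
use std::borrow::Cow;
use std::collections::BTreeMap;

struct QueryContext<'a> {
    // Borrowed in production, owned in tests; callers read it the same way
    // either way because Cow derefs to the underlying map.
    collections: Cow<'a, BTreeMap<String, String>>,
}

/// Production path: a zero-copy view of configuration, one per request.
fn from_config(config: &BTreeMap<String, String>) -> QueryContext<'_> {
    QueryContext {
        collections: Cow::Borrowed(config),
    }
}

/// Test path: the context owns its fixture data and can outlive any scope.
fn for_tests() -> QueryContext<'static> {
    let mut collections = BTreeMap::new();
    collections.insert("authors".to_owned(), "Author".to_owned());
    QueryContext {
        collections: Cow::Owned(collections),
    }
}
```

This is the same trade the new `get_query_context` makes: production borrows from `Configuration`, tests own their data.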
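
With functions (native queries) listed alongside collections in the context, `find_collection` resolves a query target by checking the collections map first and falling back to functions, and `find_collection_object_type` chains that with an object type lookup. A reduced sketch of the fallback, with the real `(FunctionInfo, CollectionInfo)` pairing simplified away:

```rust
use std::collections::BTreeMap;

// Stand-in: in the real code this is the ndc models CollectionInfo; here we
// keep just the object type name that find_collection_object_type needs.
type CollectionInfo = String;

/// Resolve a query target by name, falling back from collections to
/// functions in the same order as the new find_collection.
fn find_collection<'a>(
    collections: &'a BTreeMap<String, CollectionInfo>,
    functions: &'a BTreeMap<String, CollectionInfo>,
    name: &str,
) -> Result<&'a CollectionInfo, String> {
    collections
        .get(name)
        .or_else(|| functions.get(name))
        .ok_or_else(|| format!("unknown collection: {name}"))
}
```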
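
The new `v3_to_v2_arguments` and `v3_to_v2_relationship_arguments` functions are mechanical enum translations; the only wrinkle is that relationship arguments carry an extra `Column` variant, which maps across unchanged. A reduced sketch of the pattern with stand-in types (serde_json for literal values):

```rust
use std::collections::{BTreeMap, HashMap};

// Stand-ins for the v3 and v2 argument types converted in the diff.
enum V3Argument {
    Variable { name: String },
    Literal { value: serde_json::Value },
}

enum V2Argument {
    Variable { name: String },
    Literal { value: serde_json::Value },
}

// Field-for-field translation: each v3 variant has a direct v2 counterpart,
// so the match is exhaustive and the output map has the same keys.
fn v3_to_v2_arguments(args: BTreeMap<String, V3Argument>) -> HashMap<String, V2Argument> {
    args.into_iter()
        .map(|(name, arg)| {
            let v2 = match arg {
                V3Argument::Variable { name } => V2Argument::Variable { name },
                V3Argument::Literal { value } => V2Argument::Literal { value },
            };
            (name, v2)
        })
        .collect()
}
```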
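
The rewritten TODO (MDB-106) notes that when the same function is referenced multiple times with different arguments, one argument set currently overwrites the others. One conceivable shape for the fix it proposes, purely speculative and not part of this patch, is to fold the argument set into the generated v2 relationship name so each reference gets its own relation:

```rust
use std::collections::BTreeMap;

// Hypothetical helper: derive a distinct v2 relationship name per
// (relationship, arguments) pair so two references to the same function
// with different arguments no longer collide.
fn disambiguated_name(relationship: &str, arguments: &BTreeMap<String, String>) -> String {
    if arguments.is_empty() {
        relationship.to_owned()
    } else {
        // BTreeMap iteration order is stable, so equal argument sets always
        // produce the same suffix.
        let suffix: Vec<String> = arguments.iter().map(|(k, v)| format!("{k}={v}")).collect();
        format!("{relationship}[{}]", suffix.join(","))
    }
}
```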
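
The tests gain `make_primary_key_uniqueness_constraint`, producing the `<collection>_id` constraint on `_id` that the deleted `get_primary_key_uniqueness_constraint` used to derive from the object type. A self-contained version with a usage check, using a stand-in for `v3::UniquenessConstraint`:

```rust
use std::collections::BTreeMap;

#[derive(Debug, PartialEq)]
struct UniquenessConstraint {
    unique_columns: Vec<String>,
}

// Every collection gets a "<name>_id" constraint on its _id column.
fn make_primary_key_uniqueness_constraint(
    collection_name: &str,
) -> BTreeMap<String, UniquenessConstraint> {
    [(
        format!("{collection_name}_id"),
        UniquenessConstraint {
            unique_columns: vec!["_id".to_owned()],
        },
    )]
    .into()
}

fn main() {
    let constraints = make_primary_key_uniqueness_constraint("authors");
    assert!(constraints.contains_key("authors_id"));
}
```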
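
The new `get_schema` collapses to projections of the configuration. The one non-obvious step is `functions.values().map(|(f, _)| f)`: configuration pairs each function's schema-facing info with its native-query definition, and only the first half is exported. A sketch of that projection with stand-in types:

```rust
use std::collections::BTreeMap;

// Stand-ins: configuration stores functions as (schema info, definition)
// pairs; only the schema info goes into the schema response.
#[derive(Clone)]
struct FunctionInfo {
    name: String,
}
struct NativeQuery;

struct SchemaResponse {
    functions: Vec<FunctionInfo>,
}

fn get_schema(functions: &BTreeMap<String, (FunctionInfo, NativeQuery)>) -> SchemaResponse {
    SchemaResponse {
        // map(|(f, _)| f) drops the native-query definition, keeping only
        // the FunctionInfo half, mirroring the projection in the diff.
        functions: functions.values().map(|(f, _)| f).cloned().collect(),
    }
}
```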
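
The `graphql_name` change drops the per-type GraphQL overrides and always capitalizes the BSON name. The real `capitalize` helper lives elsewhere in mongodb-support; a sketch of its assumed behavior, shown for illustration:

```rust
/// Uppercase the first character: "double" becomes "Double", "objectId"
/// becomes "ObjectId". Assumed behavior of the capitalize helper that
/// graphql_name calls.
fn capitalize(s: &str) -> String {
    let mut chars = s.chars();
    match chars.next() {
        None => String::new(),
        Some(first) => first.to_uppercase().chain(chars).collect(),
    }
}
```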
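
The new hello.json fixture defines a native query whose pipeline emits a single document via `$documents`, with `{{ name }}` standing in for the argument. Assuming straightforward placeholder substitution, the stage the connector would run for `name = "world"` might look like this, built with the driver's `bson!` macro:

```rust
use mongodb::bson::bson;

fn main() {
    let name = "world";
    // $documents produces literal documents without reading any collection,
    // which is why this fixture works as a standalone function.
    let stage = bson!({
        "$documents": [{ "__value": name }]
    });
    println!("{stage}");
}
```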