From 1ef17e1850ef9a3c2a855d3de1391a92830c6b32 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 14 Mar 2024 11:57:49 -0700 Subject: [PATCH 01/33] add configuration for native queries; report queries in schema response --- Cargo.lock | 1 + crates/configuration/Cargo.toml | 1 + crates/configuration/src/configuration.rs | 8 ++++- crates/configuration/src/lib.rs | 1 + .../src/interface_types/mongo_config.rs | 3 ++ .../mongodb-connector/src/mongo_connector.rs | 4 +-- crates/mongodb-connector/src/schema.rs | 32 +++++++++++++++++-- crates/mongodb-connector/src/state.rs | 6 +++- rust-toolchain.toml | 2 +- 9 files changed, 50 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dbce74e4..df48f696 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -411,6 +411,7 @@ name = "configuration" version = "0.1.0" dependencies = [ "itertools 0.12.1", + "mongodb", "mongodb-support", "schemars", "serde", diff --git a/crates/configuration/Cargo.toml b/crates/configuration/Cargo.toml index 6824690f..ee99cfd0 100644 --- a/crates/configuration/Cargo.toml +++ b/crates/configuration/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] itertools = "^0.12" +mongodb = "2.8" mongodb-support = { path = "../mongodb-support" } schemars = "^0.8.12" serde = { version = "1", features = ["derive"] } diff --git a/crates/configuration/src/configuration.rs b/crates/configuration/src/configuration.rs index c38671b1..20c49a40 100644 --- a/crates/configuration/src/configuration.rs +++ b/crates/configuration/src/configuration.rs @@ -3,12 +3,18 @@ use std::{io, path::Path}; use schemars::JsonSchema; use serde::Deserialize; -use crate::{read_directory, Metadata}; +use crate::{read_directory, Metadata, native_queries::NativeQuery}; #[derive(Clone, Debug, Default, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct Configuration { + /// Descriptions of collections and types used in the database pub metadata: Metadata, + + /// Native queries allow arbitrary MongoDB aggregation pipelines where types of results are + /// specified via user configuration. 
+ #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub native_queries: Vec, } impl Configuration { diff --git a/crates/configuration/src/lib.rs b/crates/configuration/src/lib.rs index ba88399d..88abc394 100644 --- a/crates/configuration/src/lib.rs +++ b/crates/configuration/src/lib.rs @@ -1,5 +1,6 @@ mod configuration; pub mod metadata; +pub mod native_queries; mod read_directory; pub use crate::configuration::Configuration; diff --git a/crates/mongodb-agent-common/src/interface_types/mongo_config.rs b/crates/mongodb-agent-common/src/interface_types/mongo_config.rs index 2e6815ed..e5cdae13 100644 --- a/crates/mongodb-agent-common/src/interface_types/mongo_config.rs +++ b/crates/mongodb-agent-common/src/interface_types/mongo_config.rs @@ -1,3 +1,4 @@ +use configuration::native_queries::NativeQuery; use mongodb::Client; #[derive(Clone, Debug)] @@ -6,4 +7,6 @@ pub struct MongoConfig { /// Name of the database to connect to pub database: String, + + pub native_queries: Vec, } diff --git a/crates/mongodb-connector/src/mongo_connector.rs b/crates/mongodb-connector/src/mongo_connector.rs index 6a15e319..691575cc 100644 --- a/crates/mongodb-connector/src/mongo_connector.rs +++ b/crates/mongodb-connector/src/mongo_connector.rs @@ -45,10 +45,10 @@ impl Connector for MongoConnector { /// Reads database connection URI from environment variable async fn try_init_state( - _configuration: &Self::Configuration, + configuration: &Self::Configuration, _metrics: &mut prometheus::Registry, ) -> Result { - let state = crate::state::try_init_state().await?; + let state = crate::state::try_init_state(configuration).await?; Ok(state) } diff --git a/crates/mongodb-connector/src/schema.rs b/crates/mongodb-connector/src/schema.rs index 965c3f6d..32c98e92 100644 --- a/crates/mongodb-connector/src/schema.rs +++ b/crates/mongodb-connector/src/schema.rs @@ -1,6 +1,6 @@ use std::collections::BTreeMap; -use configuration::{metadata, Configuration}; +use configuration::{metadata, native_queries::NativeQuery, Configuration}; use ndc_sdk::{connector, models}; use crate::capabilities; @@ -10,9 +10,11 @@ pub async fn get_schema( ) -> Result { let metadata = &config.metadata; let object_types = map_object_types(&metadata.object_types); - let collections = metadata.collections.iter().map(map_collection).collect(); + let configured_collections = metadata.collections.iter().map(map_collection); + let native_queries = config.native_queries.iter().map(map_native_query); + Ok(models::SchemaResponse { - collections, + collections: configured_collections.chain(native_queries).collect(), object_types, scalar_types: capabilities::scalar_types(), functions: Default::default(), @@ -75,3 +77,27 @@ fn map_collection(collection: &metadata::Collection) -> models::CollectionInfo { uniqueness_constraints: Default::default(), } } + +fn map_native_query(query: &NativeQuery) -> models::CollectionInfo { + let arguments = query + .arguments + .iter() + .map(|field| { + ( + field.name.clone(), + models::ArgumentInfo { + argument_type: map_type(&field.r#type), + description: field.description.clone(), + }, + ) + }) + .collect(); + models::CollectionInfo { + name: query.name.clone(), + collection_type: query.result_type.clone(), + uniqueness_constraints: Default::default(), + foreign_keys: Default::default(), + description: query.description.clone(), + arguments, + } +} diff --git a/crates/mongodb-connector/src/state.rs b/crates/mongodb-connector/src/state.rs index 912bcd96..c7c3938a 100644 --- a/crates/mongodb-connector/src/state.rs +++ 
b/crates/mongodb-connector/src/state.rs @@ -1,12 +1,15 @@ use std::{env, error::Error}; use anyhow::anyhow; +use configuration::Configuration; use mongodb_agent_common::{interface_types::MongoConfig, mongodb_connection::get_mongodb_client}; pub const DATABASE_URI_ENV_VAR: &str = "MONGODB_DATABASE_URI"; /// Reads database connection URI from environment variable -pub async fn try_init_state() -> Result> { +pub async fn try_init_state( + configuration: &Configuration, +) -> Result> { // Splitting this out of the `Connector` impl makes error translation easier let database_uri = env::var(DATABASE_URI_ENV_VAR)?; let client = get_mongodb_client(&database_uri).await?; @@ -19,5 +22,6 @@ pub async fn try_init_state() -> Result Date: Thu, 14 Mar 2024 13:38:54 -0700 Subject: [PATCH 02/33] read schema and native queries from separate files --- Cargo.lock | 2 + crates/configuration/Cargo.toml | 2 + crates/configuration/src/read_directory.rs | 71 +++++++++++++++++----- 3 files changed, 61 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index df48f696..cd14b4fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -410,6 +410,7 @@ dependencies = [ name = "configuration" version = "0.1.0" dependencies = [ + "futures", "itertools 0.12.1", "mongodb", "mongodb-support", @@ -418,6 +419,7 @@ dependencies = [ "serde_json", "serde_yaml", "tokio", + "tokio-stream", ] [[package]] diff --git a/crates/configuration/Cargo.toml b/crates/configuration/Cargo.toml index ee99cfd0..bea1d0e8 100644 --- a/crates/configuration/Cargo.toml +++ b/crates/configuration/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +futures = "^0.3" itertools = "^0.12" mongodb = "2.8" mongodb-support = { path = "../mongodb-support" } @@ -12,3 +13,4 @@ serde = { version = "1", features = ["derive"] } serde_json = { version = "1" } serde_yaml = "^0.9" tokio = "1" +tokio-stream = { version = "^0.1", features = ["fs"] } diff --git a/crates/configuration/src/read_directory.rs b/crates/configuration/src/read_directory.rs index 3a72c5a6..01508bd9 100644 --- a/crates/configuration/src/read_directory.rs +++ b/crates/configuration/src/read_directory.rs @@ -1,3 +1,4 @@ +use futures::stream::TryStreamExt as _; use itertools::Itertools as _; use serde::Deserialize; use std::{ @@ -5,10 +6,13 @@ use std::{ path::{Path, PathBuf}, }; use tokio::fs; +use tokio_stream::wrappers::ReadDirStream; -use crate::Configuration; +use crate::{native_queries::NativeQuery, Configuration}; + +pub const METADATA_FILENAME: &str = "metadata"; +pub const NATIVE_QUERIES_DIRNAME: &str = "native_queries"; -pub const CONFIGURATION_FILENAME: &str = "configuration"; pub const CONFIGURATION_EXTENSIONS: [(&str, FileFormat); 3] = [("json", JSON), ("yaml", YAML), ("yml", YAML)]; @@ -25,29 +29,68 @@ const YAML: FileFormat = FileFormat::Yaml; pub async fn read_directory( configuration_dir: impl AsRef + Send, ) -> io::Result { - parse_file(configuration_dir, CONFIGURATION_FILENAME).await + let dir = configuration_dir.as_ref(); + + let metadata = parse_json_or_yaml(dir, METADATA_FILENAME).await?; + + let native_queries: Vec = + read_subdir_configs(&dir.join(NATIVE_QUERIES_DIRNAME)).await?; + + Ok(Configuration { + metadata, + native_queries, + }) +} + +/// Parse all files in a directory with one of the allowed configuration extensions according to +/// the given type argument. For example if `T` is `NativeQuery` this function assumes that all +/// json and yaml files in the given directory should be parsed as native query configurations. 
+async fn read_subdir_configs(subdir: &Path) -> io::Result> +where + for<'a> T: Deserialize<'a>, +{ + let dir_stream = ReadDirStream::new(fs::read_dir(subdir).await?); + dir_stream + .try_filter_map(|dir_entry| async move { + // Permits regular files and symlinks, does not filter out symlinks to directories. + let is_file = !(dir_entry.file_type().await?.is_dir()); + if !is_file { + return Ok(None); + } + + let path = dir_entry.path(); + let extension = path.extension().and_then(|ext| ext.to_str()); + + let format_option = extension + .and_then(|ext| { + CONFIGURATION_EXTENSIONS + .iter() + .find(|(expected_ext, _)| ext == *expected_ext) + }) + .map(|(_, format)| *format); + + Ok(format_option.map(|format| (path, format))) + }) + .and_then(|(path, format)| async move { parse_config_file::(path, format).await }) + .try_collect::>() + .await } /// Given a base name, like "connection", looks for files of the form "connection.json", /// "connection.yaml", etc; reads the file; and parses it according to its extension. -async fn parse_file(configuration_dir: impl AsRef, basename: &str) -> io::Result +async fn parse_json_or_yaml(configuration_dir: &Path, basename: &str) -> io::Result where for<'a> T: Deserialize<'a>, { let (path, format) = find_file(configuration_dir, basename).await?; - read_file(path, format).await + parse_config_file(path, format).await } /// Given a base name, like "connection", looks for files of the form "connection.json", /// "connection.yaml", etc, and returns the found path with its file format. -async fn find_file( - configuration_dir: impl AsRef, - basename: &str, -) -> io::Result<(PathBuf, FileFormat)> { - let dir = configuration_dir.as_ref(); - +async fn find_file(configuration_dir: &Path, basename: &str) -> io::Result<(PathBuf, FileFormat)> { for (extension, format) in CONFIGURATION_EXTENSIONS { - let path = dir.join(format!("{basename}.{extension}")); + let path = configuration_dir.join(format!("{basename}.{extension}")); if fs::try_exists(&path).await? 
{ return Ok((path, format)); } @@ -57,7 +100,7 @@ async fn find_file( io::ErrorKind::NotFound, format!( "could not find file, {:?}", - dir.join(format!( + configuration_dir.join(format!( "{basename}.{{{}}}", CONFIGURATION_EXTENSIONS .into_iter() @@ -68,7 +111,7 @@ async fn find_file( )) } -async fn read_file(path: impl AsRef, format: FileFormat) -> io::Result +async fn parse_config_file(path: impl AsRef, format: FileFormat) -> io::Result where for<'a> T: Deserialize<'a>, { From 9ebfc75260919fcde85607b10dbd3d29b8d70f22 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 14 Mar 2024 13:45:25 -0700 Subject: [PATCH 03/33] rename metadata to schema --- crates/configuration/src/configuration.rs | 4 ++-- crates/configuration/src/lib.rs | 4 ++-- crates/configuration/src/native_queries.rs | 4 ++-- crates/configuration/src/read_directory.rs | 6 ++--- .../src/{metadata => schema}/database.rs | 0 .../src/{metadata => schema}/mod.rs | 2 +- crates/mongodb-connector/src/schema.rs | 24 +++++++++---------- 7 files changed, 22 insertions(+), 22 deletions(-) rename crates/configuration/src/{metadata => schema}/database.rs (100%) rename crates/configuration/src/{metadata => schema}/mod.rs (94%) diff --git a/crates/configuration/src/configuration.rs b/crates/configuration/src/configuration.rs index 20c49a40..a8b76d48 100644 --- a/crates/configuration/src/configuration.rs +++ b/crates/configuration/src/configuration.rs @@ -3,13 +3,13 @@ use std::{io, path::Path}; use schemars::JsonSchema; use serde::Deserialize; -use crate::{read_directory, Metadata, native_queries::NativeQuery}; +use crate::{read_directory, Schema, native_queries::NativeQuery}; #[derive(Clone, Debug, Default, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct Configuration { /// Descriptions of collections and types used in the database - pub metadata: Metadata, + pub schema: Schema, /// Native queries allow arbitrary MongoDB aggregation pipelines where types of results are /// specified via user configuration. diff --git a/crates/configuration/src/lib.rs b/crates/configuration/src/lib.rs index 88abc394..0e49b042 100644 --- a/crates/configuration/src/lib.rs +++ b/crates/configuration/src/lib.rs @@ -1,8 +1,8 @@ mod configuration; -pub mod metadata; +pub mod schema; pub mod native_queries; mod read_directory; pub use crate::configuration::Configuration; -pub use crate::metadata::Metadata; +pub use crate::schema::Schema; pub use crate::read_directory::read_directory; diff --git a/crates/configuration/src/native_queries.rs b/crates/configuration/src/native_queries.rs index 954d5abc..ad91c525 100644 --- a/crates/configuration/src/native_queries.rs +++ b/crates/configuration/src/native_queries.rs @@ -2,7 +2,7 @@ use mongodb::{bson, options::SelectionCriteria}; use schemars::JsonSchema; use serde::Deserialize; -use crate::metadata::ObjectField; +use crate::schema::ObjectField; /// An arbitrary database command using MongoDB's runCommand API. /// See https://www.mongodb.com/docs/manual/reference/method/db.runCommand/ @@ -13,7 +13,7 @@ pub struct NativeQuery { pub name: String, /// The name of an object type that specifies the type of data returned from the query. This - /// must correspond to a configuration definition in `metadata.objectTypes`. + /// must correspond to a configuration definition in `schema.objectTypes`. 
pub result_type: String, /// Arguments for per-query customization diff --git a/crates/configuration/src/read_directory.rs b/crates/configuration/src/read_directory.rs index 01508bd9..70eda016 100644 --- a/crates/configuration/src/read_directory.rs +++ b/crates/configuration/src/read_directory.rs @@ -10,7 +10,7 @@ use tokio_stream::wrappers::ReadDirStream; use crate::{native_queries::NativeQuery, Configuration}; -pub const METADATA_FILENAME: &str = "metadata"; +pub const SCHEMA_FILENAME: &str = "schema"; pub const NATIVE_QUERIES_DIRNAME: &str = "native_queries"; pub const CONFIGURATION_EXTENSIONS: [(&str, FileFormat); 3] = @@ -31,13 +31,13 @@ pub async fn read_directory( ) -> io::Result { let dir = configuration_dir.as_ref(); - let metadata = parse_json_or_yaml(dir, METADATA_FILENAME).await?; + let schema = parse_json_or_yaml(dir, SCHEMA_FILENAME).await?; let native_queries: Vec = read_subdir_configs(&dir.join(NATIVE_QUERIES_DIRNAME)).await?; Ok(Configuration { - metadata, + schema, native_queries, }) } diff --git a/crates/configuration/src/metadata/database.rs b/crates/configuration/src/schema/database.rs similarity index 100% rename from crates/configuration/src/metadata/database.rs rename to crates/configuration/src/schema/database.rs diff --git a/crates/configuration/src/metadata/mod.rs b/crates/configuration/src/schema/mod.rs similarity index 94% rename from crates/configuration/src/metadata/mod.rs rename to crates/configuration/src/schema/mod.rs index 28751944..8418a210 100644 --- a/crates/configuration/src/metadata/mod.rs +++ b/crates/configuration/src/schema/mod.rs @@ -7,7 +7,7 @@ pub use self::database::{Collection, ObjectField, ObjectType, Type}; #[derive(Clone, Debug, Default, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] -pub struct Metadata { +pub struct Schema { #[serde(default)] pub collections: Vec, #[serde(default)] diff --git a/crates/mongodb-connector/src/schema.rs b/crates/mongodb-connector/src/schema.rs index 32c98e92..577bc76f 100644 --- a/crates/mongodb-connector/src/schema.rs +++ b/crates/mongodb-connector/src/schema.rs @@ -1,6 +1,6 @@ use std::collections::BTreeMap; -use configuration::{metadata, native_queries::NativeQuery, Configuration}; +use configuration::{native_queries::NativeQuery, schema, Configuration}; use ndc_sdk::{connector, models}; use crate::capabilities; @@ -8,9 +8,9 @@ use crate::capabilities; pub async fn get_schema( config: &Configuration, ) -> Result { - let metadata = &config.metadata; - let object_types = map_object_types(&metadata.object_types); - let configured_collections = metadata.collections.iter().map(map_collection); + let schema = &config.schema; + let object_types = map_object_types(&schema.object_types); + let configured_collections = schema.collections.iter().map(map_collection); let native_queries = config.native_queries.iter().map(map_native_query); Ok(models::SchemaResponse { @@ -22,7 +22,7 @@ pub async fn get_schema( }) } -fn map_object_types(object_types: &[metadata::ObjectType]) -> BTreeMap { +fn map_object_types(object_types: &[schema::ObjectType]) -> BTreeMap { object_types .iter() .map(|t| { @@ -37,7 +37,7 @@ fn map_object_types(object_types: &[metadata::ObjectType]) -> BTreeMap BTreeMap { +fn map_field_infos(fields: &[schema::ObjectField]) -> BTreeMap { fields .iter() .map(|f| { @@ -52,22 +52,22 @@ fn map_field_infos(fields: &[metadata::ObjectField]) -> BTreeMap models::Type { +fn map_type(t: &schema::Type) -> models::Type { match t { - metadata::Type::Scalar(t) => models::Type::Named { + 
schema::Type::Scalar(t) => models::Type::Named { name: t.graphql_name(), }, - metadata::Type::Object(t) => models::Type::Named { name: t.clone() }, - metadata::Type::ArrayOf(t) => models::Type::Array { + schema::Type::Object(t) => models::Type::Named { name: t.clone() }, + schema::Type::ArrayOf(t) => models::Type::Array { element_type: Box::new(map_type(t)), }, - metadata::Type::Nullable(t) => models::Type::Nullable { + schema::Type::Nullable(t) => models::Type::Nullable { underlying_type: Box::new(map_type(t)), }, } } -fn map_collection(collection: &metadata::Collection) -> models::CollectionInfo { +fn map_collection(collection: &schema::Collection) -> models::CollectionInfo { models::CollectionInfo { name: collection.name.clone(), collection_type: collection.r#type.clone(), From b6f625849861e4980869b2411251d8e59ca66f9c Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 14 Mar 2024 13:46:42 -0700 Subject: [PATCH 04/33] update fixture configuration --- fixtures/connector/chinook/configuration.yaml | 16 ---------------- fixtures/connector/chinook/schema.yaml | 15 +++++++++++++++ 2 files changed, 15 insertions(+), 16 deletions(-) delete mode 100644 fixtures/connector/chinook/configuration.yaml create mode 100644 fixtures/connector/chinook/schema.yaml diff --git a/fixtures/connector/chinook/configuration.yaml b/fixtures/connector/chinook/configuration.yaml deleted file mode 100644 index e6aec4d6..00000000 --- a/fixtures/connector/chinook/configuration.yaml +++ /dev/null @@ -1,16 +0,0 @@ -metadata: - collections: - - name: Album - type: Album - - objectTypes: - - name: Album - fields: - - name: _id - type: !scalar objectId - - name: AlbumId - type: !scalar int - - name: ArtistId - type: !scalar int - - name: Title - type: !scalar string diff --git a/fixtures/connector/chinook/schema.yaml b/fixtures/connector/chinook/schema.yaml new file mode 100644 index 00000000..bbb4a52c --- /dev/null +++ b/fixtures/connector/chinook/schema.yaml @@ -0,0 +1,15 @@ +collections: + - name: Album + type: Album + +objectTypes: + - name: Album + fields: + - name: _id + type: !scalar objectId + - name: AlbumId + type: !scalar int + - name: ArtistId + type: !scalar int + - name: Title + type: !scalar string From 039c8408ca11d46926f82aaadeccee0dfd7050d4 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 14 Mar 2024 14:02:20 -0700 Subject: [PATCH 05/33] succeed parsing configuration if no native_queries are present --- crates/configuration/src/read_directory.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/crates/configuration/src/read_directory.rs b/crates/configuration/src/read_directory.rs index 70eda016..4f214ad3 100644 --- a/crates/configuration/src/read_directory.rs +++ b/crates/configuration/src/read_directory.rs @@ -33,8 +33,9 @@ pub async fn read_directory( let schema = parse_json_or_yaml(dir, SCHEMA_FILENAME).await?; - let native_queries: Vec = - read_subdir_configs(&dir.join(NATIVE_QUERIES_DIRNAME)).await?; + let native_queries: Vec = read_subdir_configs(&dir.join(NATIVE_QUERIES_DIRNAME)) + .await? + .unwrap_or_default(); Ok(Configuration { schema, @@ -45,12 +46,16 @@ pub async fn read_directory( /// Parse all files in a directory with one of the allowed configuration extensions according to /// the given type argument. For example if `T` is `NativeQuery` this function assumes that all /// json and yaml files in the given directory should be parsed as native query configurations. 
-async fn read_subdir_configs(subdir: &Path) -> io::Result> +async fn read_subdir_configs(subdir: &Path) -> io::Result>> where for<'a> T: Deserialize<'a>, { + if !(fs::try_exists(subdir).await?) { + return Ok(None); + } + let dir_stream = ReadDirStream::new(fs::read_dir(subdir).await?); - dir_stream + let configs = dir_stream .try_filter_map(|dir_entry| async move { // Permits regular files and symlinks, does not filter out symlinks to directories. let is_file = !(dir_entry.file_type().await?.is_dir()); @@ -73,7 +78,9 @@ where }) .and_then(|(path, format)| async move { parse_config_file::(path, format).await }) .try_collect::>() - .await + .await?; + + Ok(Some(configs)) } /// Given a base name, like "connection", looks for files of the form "connection.json", From 12e7b331cbf864556117314e704aa22a2516c4a9 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 14 Mar 2024 16:49:00 -0700 Subject: [PATCH 06/33] add mode property to NativeQuery --- crates/configuration/src/native_queries.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/crates/configuration/src/native_queries.rs b/crates/configuration/src/native_queries.rs index ad91c525..93a84ccf 100644 --- a/crates/configuration/src/native_queries.rs +++ b/crates/configuration/src/native_queries.rs @@ -31,6 +31,18 @@ pub struct NativeQuery { #[serde(default, skip_serializing_if = "Option::is_none")] pub description: Option, + + /// Set to `readWrite` if this native query might modify data in the database. + #[serde(default)] + pub mode: Mode, +} + +#[derive(Clone, Default, Debug, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub enum Mode { + #[default] + ReadOnly, + ReadWrite, } type Object = serde_json::Map; From 5962ae5c9886a2d2a0f86180cd17c2e23de2af13 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 14 Mar 2024 17:29:01 -0700 Subject: [PATCH 07/33] represent native queries as functions and procedures in schema --- crates/configuration/src/native_queries.rs | 9 ++-- crates/mongodb-connector/src/schema.rs | 60 ++++++++++++++++++---- 2 files changed, 53 insertions(+), 16 deletions(-) diff --git a/crates/configuration/src/native_queries.rs b/crates/configuration/src/native_queries.rs index 93a84ccf..633c9ead 100644 --- a/crates/configuration/src/native_queries.rs +++ b/crates/configuration/src/native_queries.rs @@ -2,7 +2,7 @@ use mongodb::{bson, options::SelectionCriteria}; use schemars::JsonSchema; use serde::Deserialize; -use crate::schema::ObjectField; +use crate::schema::{ObjectField, Type}; /// An arbitrary database command using MongoDB's runCommand API. /// See https://www.mongodb.com/docs/manual/reference/method/db.runCommand/ @@ -12,9 +12,8 @@ pub struct NativeQuery { /// Name that will be used to identify the query in your data graph pub name: String, - /// The name of an object type that specifies the type of data returned from the query. This - /// must correspond to a configuration definition in `schema.objectTypes`. - pub result_type: String, + /// Type of data returned by the query. 
+ pub result_type: Type, /// Arguments for per-query customization pub arguments: Vec, @@ -37,7 +36,7 @@ pub struct NativeQuery { pub mode: Mode, } -#[derive(Clone, Default, Debug, Deserialize, JsonSchema)] +#[derive(Clone, Copy, Default, Debug, PartialEq, Eq, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] pub enum Mode { #[default] diff --git a/crates/mongodb-connector/src/schema.rs b/crates/mongodb-connector/src/schema.rs index 577bc76f..5cb4a7a5 100644 --- a/crates/mongodb-connector/src/schema.rs +++ b/crates/mongodb-connector/src/schema.rs @@ -1,6 +1,9 @@ use std::collections::BTreeMap; -use configuration::{native_queries::NativeQuery, schema, Configuration}; +use configuration::{ + native_queries::{self, NativeQuery}, + schema, Configuration, +}; use ndc_sdk::{connector, models}; use crate::capabilities; @@ -10,15 +13,28 @@ pub async fn get_schema( ) -> Result { let schema = &config.schema; let object_types = map_object_types(&schema.object_types); - let configured_collections = schema.collections.iter().map(map_collection); - let native_queries = config.native_queries.iter().map(map_native_query); + let collections = schema.collections.iter().map(map_collection).collect(); + + let functions = config + .native_queries + .iter() + .filter(|q| q.mode == native_queries::Mode::ReadOnly) + .map(native_query_to_function) + .collect(); + + let procedures = config + .native_queries + .iter() + .filter(|q| q.mode == native_queries::Mode::ReadWrite) + .map(native_query_to_procedure) + .collect(); Ok(models::SchemaResponse { - collections: configured_collections.chain(native_queries).collect(), + collections, object_types, scalar_types: capabilities::scalar_types(), - functions: Default::default(), - procedures: Default::default(), + functions, + procedures, }) } @@ -78,7 +94,8 @@ fn map_collection(collection: &schema::Collection) -> models::CollectionInfo { } } -fn map_native_query(query: &NativeQuery) -> models::CollectionInfo { +/// For read-only native queries +fn native_query_to_function(query: &NativeQuery) -> models::FunctionInfo { let arguments = query .arguments .iter() @@ -92,12 +109,33 @@ fn map_native_query(query: &NativeQuery) -> models::CollectionInfo { ) }) .collect(); - models::CollectionInfo { + models::FunctionInfo { + name: query.name.clone(), + description: query.description.clone(), + arguments, + result_type: map_type(&query.result_type), + } +} + +/// For read-write native queries +fn native_query_to_procedure(query: &NativeQuery) -> models::ProcedureInfo { + let arguments = query + .arguments + .iter() + .map(|field| { + ( + field.name.clone(), + models::ArgumentInfo { + argument_type: map_type(&field.r#type), + description: field.description.clone(), + }, + ) + }) + .collect(); + models::ProcedureInfo { name: query.name.clone(), - collection_type: query.result_type.clone(), - uniqueness_constraints: Default::default(), - foreign_keys: Default::default(), description: query.description.clone(), arguments, + result_type: map_type(&query.result_type), } } From 178695ad78e606d5f4cfef112ec0b33d6174d357 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Sun, 17 Mar 2024 17:40:09 -0700 Subject: [PATCH 08/33] execute native query --- .../src/query/execute_native_query_request.rs | 31 +++++++++++++++++++ crates/mongodb-agent-common/src/query/mod.rs | 21 ++++++++++--- 2 files changed, 47 insertions(+), 5 deletions(-) create mode 100644 crates/mongodb-agent-common/src/query/execute_native_query_request.rs diff --git 
a/crates/mongodb-agent-common/src/query/execute_native_query_request.rs b/crates/mongodb-agent-common/src/query/execute_native_query_request.rs new file mode 100644 index 00000000..ff603dd0 --- /dev/null +++ b/crates/mongodb-agent-common/src/query/execute_native_query_request.rs @@ -0,0 +1,31 @@ +use configuration::native_queries::NativeQuery; +use dc_api::JsonResponse; +use dc_api_types::{QueryResponse, ResponseFieldValue, RowSet}; +use mongodb::Database; + +use crate::interface_types::MongoAgentError; + +pub async fn handle_native_query_request( + native_query: NativeQuery, + database: Database, +) -> Result, MongoAgentError> { + let result = database + .run_command(native_query.command, native_query.selection_criteria) + .await?; + let result_json = + serde_json::to_value(result).map_err(|err| MongoAgentError::AdHoc(err.into()))?; + + // A function returs a single row with a single column called `__value` + // https://hasura.github.io/ndc-spec/specification/queries/functions.html + let response_row = [( + "__value".to_owned(), + ResponseFieldValue::Column(result_json), + )] + .into_iter() + .collect(); + + Ok(JsonResponse::Value(QueryResponse::Single(RowSet { + aggregates: None, + rows: Some(vec![response_row]), + }))) +} diff --git a/crates/mongodb-agent-common/src/query/mod.rs b/crates/mongodb-agent-common/src/query/mod.rs index ed0abc68..5f32bee9 100644 --- a/crates/mongodb-agent-common/src/query/mod.rs +++ b/crates/mongodb-agent-common/src/query/mod.rs @@ -1,5 +1,6 @@ mod column_ref; mod constants; +mod execute_native_query_request; mod execute_query_request; mod foreach; mod make_selector; @@ -17,7 +18,10 @@ pub use self::{ make_sort::make_sort, pipeline::{is_response_faceted, pipeline_for_non_foreach, pipeline_for_query_request}, }; -use crate::interface_types::{MongoAgentError, MongoConfig}; +use crate::{ + interface_types::{MongoAgentError, MongoConfig}, + query::execute_native_query_request::handle_native_query_request, +}; pub fn collection_name(query_request_target: &Target) -> String { query_request_target.name().join(".") @@ -29,10 +33,17 @@ pub async fn handle_query_request( ) -> Result, MongoAgentError> { tracing::debug!(?config, query_request = %serde_json::to_string(&query_request).unwrap(), "executing query"); - let collection = config - .client - .database(&config.database) - .collection::(&collection_name(&query_request.target)); + let database = config.client.database(&config.database); + + let target = &query_request.target; + if let Some(native_query) = config.native_queries.iter().find(|query| { + let target_name = target.name(); + target_name.len() == 1 && target_name[0] == query.name + }) { + return handle_native_query_request(native_query.clone(), database).await; + } + + let collection = database.collection::(&collection_name(&query_request.target)); execute_query_request(&collection, query_request).await } From c608d1795056dd96af97a05356c7a3080945a234 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Sun, 17 Mar 2024 20:26:39 -0700 Subject: [PATCH 09/33] updated fixtures with a very basic native query --- crates/configuration/src/native_queries.rs | 7 +- .../chinook/native_queries/hello.yaml | 4 + fixtures/connector/chinook/schema.json | 742 ++++++++++++++++++ fixtures/connector/chinook/schema.yaml | 15 - .../ddn/subgraphs/chinook/commands/Hello.hml | 23 + .../chinook/dataconnectors/mongodb.hml | 6 +- 6 files changed, 780 insertions(+), 17 deletions(-) create mode 100644 fixtures/connector/chinook/native_queries/hello.yaml create mode 100644 
fixtures/connector/chinook/schema.json delete mode 100644 fixtures/connector/chinook/schema.yaml create mode 100644 fixtures/ddn/subgraphs/chinook/commands/Hello.hml diff --git a/crates/configuration/src/native_queries.rs b/crates/configuration/src/native_queries.rs index 633c9ead..3a92d94a 100644 --- a/crates/configuration/src/native_queries.rs +++ b/crates/configuration/src/native_queries.rs @@ -16,6 +16,7 @@ pub struct NativeQuery { pub result_type: Type, /// Arguments for per-query customization + #[serde(default)] pub arguments: Vec, /// Command to run expressed as a BSON document @@ -31,7 +32,11 @@ pub struct NativeQuery { #[serde(default, skip_serializing_if = "Option::is_none")] pub description: Option, - /// Set to `readWrite` if this native query might modify data in the database. + /// Set to `readWrite` if this native query might modify data in the database. When refreshing + /// a dataconnector native queries will appear in the corresponding `DataConnectorLink` + /// definition as `functions` if they are read-only, or as `procedures` if they are read-rite. + /// Functions are intended to map to GraphQL Query fields, while procedures map to Mutation + /// fields. #[serde(default)] pub mode: Mode, } diff --git a/fixtures/connector/chinook/native_queries/hello.yaml b/fixtures/connector/chinook/native_queries/hello.yaml new file mode 100644 index 00000000..2f440194 --- /dev/null +++ b/fixtures/connector/chinook/native_queries/hello.yaml @@ -0,0 +1,4 @@ +name: hello +result_type: !scalar string +command: + hello: 1 diff --git a/fixtures/connector/chinook/schema.json b/fixtures/connector/chinook/schema.json new file mode 100644 index 00000000..4c7ee983 --- /dev/null +++ b/fixtures/connector/chinook/schema.json @@ -0,0 +1,742 @@ +{ + "collections": [ + { + "name": "Invoice", + "type": "Invoice", + "description": null + }, + { + "name": "Track", + "type": "Track", + "description": null + }, + { + "name": "MediaType", + "type": "MediaType", + "description": null + }, + { + "name": "InvoiceLine", + "type": "InvoiceLine", + "description": null + }, + { + "name": "Employee", + "type": "Employee", + "description": null + }, + { + "name": "PlaylistTrack", + "type": "PlaylistTrack", + "description": null + }, + { + "name": "Album", + "type": "Album", + "description": null + }, + { + "name": "Genre", + "type": "Genre", + "description": null + }, + { + "name": "Artist", + "type": "Artist", + "description": null + }, + { + "name": "Playlist", + "type": "Playlist", + "description": null + }, + { + "name": "Customer", + "type": "Customer", + "description": null + } + ], + "objectTypes": [ + { + "name": "Invoice", + "fields": [ + { + "name": "_id", + "type": { + "nullable": { + "scalar": "objectId" + } + }, + "description": null + }, + { + "name": "BillingAddress", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "BillingCity", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "BillingCountry", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "BillingPostalCode", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "BillingState", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "CustomerId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "InvoiceDate", + "type": { + "scalar": "string" + }, + "description": null + }, + { + 
"name": "InvoiceId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "Total", + "type": { + "scalar": "double" + }, + "description": null + } + ], + "description": "Object type for collection Invoice" + }, + { + "name": "Track", + "fields": [ + { + "name": "_id", + "type": { + "nullable": { + "scalar": "objectId" + } + }, + "description": null + }, + { + "name": "AlbumId", + "type": { + "nullable": { + "scalar": "int" + } + }, + "description": null + }, + { + "name": "Bytes", + "type": { + "nullable": { + "scalar": "int" + } + }, + "description": null + }, + { + "name": "Composer", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "GenreId", + "type": { + "nullable": { + "scalar": "int" + } + }, + "description": null + }, + { + "name": "MediaTypeId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "Milliseconds", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "Name", + "type": { + "scalar": "string" + }, + "description": null + }, + { + "name": "TrackId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "UnitPrice", + "type": { + "scalar": "double" + }, + "description": null + } + ], + "description": "Object type for collection Track" + }, + { + "name": "MediaType", + "fields": [ + { + "name": "_id", + "type": { + "nullable": { + "scalar": "objectId" + } + }, + "description": null + }, + { + "name": "MediaTypeId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "Name", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + } + ], + "description": "Object type for collection MediaType" + }, + { + "name": "InvoiceLine", + "fields": [ + { + "name": "_id", + "type": { + "nullable": { + "scalar": "objectId" + } + }, + "description": null + }, + { + "name": "InvoiceId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "InvoiceLineId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "Quantity", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "TrackId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "UnitPrice", + "type": { + "scalar": "double" + }, + "description": null + } + ], + "description": "Object type for collection InvoiceLine" + }, + { + "name": "Employee", + "fields": [ + { + "name": "_id", + "type": { + "nullable": { + "scalar": "objectId" + } + }, + "description": null + }, + { + "name": "Address", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "BirthDate", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "City", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "Country", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "Email", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "EmployeeId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "Fax", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "FirstName", + "type": { + "scalar": "string" + }, + "description": null + }, + { + "name": "HireDate", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "LastName", + "type": { + "scalar": "string" + 
}, + "description": null + }, + { + "name": "Phone", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "PostalCode", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "ReportsTo", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "State", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "Title", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + } + ], + "description": "Object type for collection Employee" + }, + { + "name": "PlaylistTrack", + "fields": [ + { + "name": "_id", + "type": { + "nullable": { + "scalar": "objectId" + } + }, + "description": null + }, + { + "name": "PlaylistId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "TrackId", + "type": { + "scalar": "int" + }, + "description": null + } + ], + "description": "Object type for collection PlaylistTrack" + }, + { + "name": "Album", + "fields": [ + { + "name": "_id", + "type": { + "nullable": { + "scalar": "objectId" + } + }, + "description": null + }, + { + "name": "AlbumId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "ArtistId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "Title", + "type": { + "scalar": "string" + }, + "description": null + } + ], + "description": "Object type for collection Album" + }, + { + "name": "Genre", + "fields": [ + { + "name": "_id", + "type": { + "nullable": { + "scalar": "objectId" + } + }, + "description": null + }, + { + "name": "GenreId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "Name", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + } + ], + "description": "Object type for collection Genre" + }, + { + "name": "Artist", + "fields": [ + { + "name": "_id", + "type": { + "nullable": { + "scalar": "objectId" + } + }, + "description": null + }, + { + "name": "ArtistId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "Name", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + } + ], + "description": "Object type for collection Artist" + }, + { + "name": "Playlist", + "fields": [ + { + "name": "_id", + "type": { + "nullable": { + "scalar": "objectId" + } + }, + "description": null + }, + { + "name": "Name", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "PlaylistId", + "type": { + "scalar": "int" + }, + "description": null + } + ], + "description": "Object type for collection Playlist" + }, + { + "name": "Customer", + "fields": [ + { + "name": "_id", + "type": { + "nullable": { + "scalar": "objectId" + } + }, + "description": null + }, + { + "name": "Address", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "City", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "Company", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "Country", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "CustomerId", + "type": { + "scalar": "int" + }, + "description": null + }, + { + "name": "Email", + "type": { + "scalar": "string" + }, + "description": null + }, + { + "name": "Fax", + "type": { + "nullable": { + "scalar": 
"string" + } + }, + "description": null + }, + { + "name": "FirstName", + "type": { + "scalar": "string" + }, + "description": null + }, + { + "name": "LastName", + "type": { + "scalar": "string" + }, + "description": null + }, + { + "name": "Phone", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "PostalCode", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "State", + "type": { + "nullable": { + "scalar": "string" + } + }, + "description": null + }, + { + "name": "SupportRepId", + "type": { + "nullable": { + "scalar": "int" + } + }, + "description": null + } + ], + "description": "Object type for collection Customer" + } + ] +} \ No newline at end of file diff --git a/fixtures/connector/chinook/schema.yaml b/fixtures/connector/chinook/schema.yaml deleted file mode 100644 index bbb4a52c..00000000 --- a/fixtures/connector/chinook/schema.yaml +++ /dev/null @@ -1,15 +0,0 @@ -collections: - - name: Album - type: Album - -objectTypes: - - name: Album - fields: - - name: _id - type: !scalar objectId - - name: AlbumId - type: !scalar int - - name: ArtistId - type: !scalar int - - name: Title - type: !scalar string diff --git a/fixtures/ddn/subgraphs/chinook/commands/Hello.hml b/fixtures/ddn/subgraphs/chinook/commands/Hello.hml new file mode 100644 index 00000000..c194a5ba --- /dev/null +++ b/fixtures/ddn/subgraphs/chinook/commands/Hello.hml @@ -0,0 +1,23 @@ +kind: Command +version: v1 +definition: + name: hello + description: Example of a read-only native query + outputType: String + arguments: [] + source: + dataConnectorName: mongodb + dataConnectorCommand: + function: hello + graphql: + rootFieldName: hello + rootFieldKind: Query + +--- +kind: CommandPermissions +version: v1 +definition: + commandName: hello + permissions: + - role: admin + allowExecution: true diff --git a/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml b/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml index 4167b898..7127da8b 100644 --- a/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml +++ b/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml @@ -994,7 +994,11 @@ definition: unique_columns: - _id foreign_keys: {} - functions: [] + functions: + - name: hello + result_type: { type: named, name: String } + arguments: {} + command: { hello: 1 } procedures: [] capabilities: version: ^0.1.0 From e9f042f95c2f437dabe15dbcecc94209723dc526 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Sun, 17 Mar 2024 21:35:25 -0700 Subject: [PATCH 10/33] add more context to configuration read errors --- Cargo.lock | 1 + crates/configuration/Cargo.toml | 1 + crates/configuration/src/configuration.rs | 4 +- crates/configuration/src/directory.rs | 54 ++++++++++--------- .../mongodb-connector/src/mongo_connector.rs | 4 +- 5 files changed, 35 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 875e0dda..6401d568 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -410,6 +410,7 @@ dependencies = [ name = "configuration" version = "0.1.0" dependencies = [ + "anyhow", "futures", "itertools 0.12.1", "mongodb", diff --git a/crates/configuration/Cargo.toml b/crates/configuration/Cargo.toml index bea1d0e8..8db65e2e 100644 --- a/crates/configuration/Cargo.toml +++ b/crates/configuration/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +anyhow = "1" futures = "^0.3" itertools = "^0.12" mongodb = "2.8" diff --git a/crates/configuration/src/configuration.rs 
b/crates/configuration/src/configuration.rs index 3a3549f9..1081ef26 100644 --- a/crates/configuration/src/configuration.rs +++ b/crates/configuration/src/configuration.rs @@ -1,4 +1,4 @@ -use std::{io, path::Path}; +use std::path::Path; use schemars::JsonSchema; use serde::Deserialize; @@ -27,7 +27,7 @@ impl Configuration { pub async fn parse_configuration( configuration_dir: impl AsRef + Send, - ) -> io::Result { + ) -> anyhow::Result { read_directory(configuration_dir).await } } diff --git a/crates/configuration/src/directory.rs b/crates/configuration/src/directory.rs index f80d4b23..2ec5618e 100644 --- a/crates/configuration/src/directory.rs +++ b/crates/configuration/src/directory.rs @@ -1,10 +1,8 @@ +use anyhow::{anyhow, Context as _}; use futures::stream::TryStreamExt as _; use itertools::Itertools as _; use serde::{Deserialize, Serialize}; -use std::{ - io, - path::{Path, PathBuf}, -}; +use std::path::{Path, PathBuf}; use tokio::fs; use tokio_stream::wrappers::ReadDirStream; @@ -29,7 +27,7 @@ const YAML: FileFormat = FileFormat::Yaml; /// Read configuration from a directory pub async fn read_directory( configuration_dir: impl AsRef + Send, -) -> io::Result { +) -> anyhow::Result { let dir = configuration_dir.as_ref(); let schema = parse_json_or_yaml(dir, SCHEMA_FILENAME).await?; @@ -47,7 +45,7 @@ pub async fn read_directory( /// Parse all files in a directory with one of the allowed configuration extensions according to /// the given type argument. For example if `T` is `NativeQuery` this function assumes that all /// json and yaml files in the given directory should be parsed as native query configurations. -async fn read_subdir_configs(subdir: &Path) -> io::Result>> +async fn read_subdir_configs(subdir: &Path) -> anyhow::Result>> where for<'a> T: Deserialize<'a>, { @@ -57,6 +55,7 @@ where let dir_stream = ReadDirStream::new(fs::read_dir(subdir).await?); let configs = dir_stream + .map_err(|err| err.into()) .try_filter_map(|dir_entry| async move { // Permits regular files and symlinks, does not filter out symlinks to directories. let is_file = !(dir_entry.file_type().await?.is_dir()); @@ -86,7 +85,7 @@ where /// Given a base name, like "connection", looks for files of the form "connection.json", /// "connection.yaml", etc; reads the file; and parses it according to its extension. -async fn parse_json_or_yaml(configuration_dir: &Path, basename: &str) -> io::Result +async fn parse_json_or_yaml(configuration_dir: &Path, basename: &str) -> anyhow::Result where for<'a> T: Deserialize<'a>, { @@ -96,7 +95,10 @@ where /// Given a base name, like "connection", looks for files of the form "connection.json", /// "connection.yaml", etc, and returns the found path with its file format. -async fn find_file(configuration_dir: &Path, basename: &str) -> io::Result<(PathBuf, FileFormat)> { +async fn find_file( + configuration_dir: &Path, + basename: &str, +) -> anyhow::Result<(PathBuf, FileFormat)> { for (extension, format) in CONFIGURATION_EXTENSIONS { let path = configuration_dir.join(format!("{basename}.{extension}")); if fs::try_exists(&path).await? 
{ @@ -104,30 +106,28 @@ async fn find_file(configuration_dir: &Path, basename: &str) -> io::Result<(Path } } - Err(io::Error::new( - io::ErrorKind::NotFound, - format!( - "could not find file, {:?}", - configuration_dir.join(format!( - "{basename}.{{{}}}", - CONFIGURATION_EXTENSIONS - .into_iter() - .map(|(ext, _)| ext) - .join(",") - )) - ), + Err(anyhow!( + "could not find file, {:?}", + configuration_dir.join(format!( + "{basename}.{{{}}}", + CONFIGURATION_EXTENSIONS + .into_iter() + .map(|(ext, _)| ext) + .join(",") + )) )) } -async fn parse_config_file(path: impl AsRef, format: FileFormat) -> io::Result +async fn parse_config_file(path: impl AsRef, format: FileFormat) -> anyhow::Result where for<'a> T: Deserialize<'a>, { let bytes = fs::read(path.as_ref()).await?; let value = match format { - FileFormat::Json => serde_json::from_slice(&bytes)?, + FileFormat::Json => serde_json::from_slice(&bytes) + .with_context(|| format!("error parsing {:?}", path.as_ref()))?, FileFormat::Yaml => serde_yaml::from_slice(&bytes) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?, + .with_context(|| format!("error parsing {:?}", path.as_ref()))?, }; Ok(value) } @@ -136,7 +136,7 @@ where pub async fn write_directory( configuration_dir: impl AsRef, configuration: &Configuration, -) -> io::Result<()> { +) -> anyhow::Result<()> { write_file(configuration_dir, SCHEMA_FILENAME, &configuration.schema).await } @@ -149,11 +149,13 @@ async fn write_file( configuration_dir: impl AsRef, basename: &str, value: &T, -) -> io::Result<()> +) -> anyhow::Result<()> where T: Serialize, { let path = default_file_path(configuration_dir, basename); let bytes = serde_json::to_vec_pretty(value)?; - fs::write(path, bytes).await + fs::write(path.clone(), bytes) + .await + .with_context(|| format!("error writing {:?}", path)) } diff --git a/crates/mongodb-connector/src/mongo_connector.rs b/crates/mongodb-connector/src/mongo_connector.rs index e7e18b2d..f23c4338 100644 --- a/crates/mongodb-connector/src/mongo_connector.rs +++ b/crates/mongodb-connector/src/mongo_connector.rs @@ -39,7 +39,9 @@ impl Connector for MongoConnector { async fn parse_configuration( configuration_dir: impl AsRef + Send, ) -> Result { - let configuration = Configuration::parse_configuration(configuration_dir).await?; + let configuration = Configuration::parse_configuration(configuration_dir) + .await + .map_err(|err| ParseError::Other(err.into()))?; Ok(configuration) } From f5d4f623e79eddecc9ad3d10f94f3e24ad254e8f Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Sun, 17 Mar 2024 21:39:58 -0700 Subject: [PATCH 11/33] add objectTypes field to native queries --- crates/configuration/src/native_queries.rs | 12 +++++- crates/mongodb-connector/src/schema.rs | 37 ++++++++++++------- .../chinook/native_queries/hello.yaml | 10 ++++- 3 files changed, 42 insertions(+), 17 deletions(-) diff --git a/crates/configuration/src/native_queries.rs b/crates/configuration/src/native_queries.rs index 3a92d94a..19f54f69 100644 --- a/crates/configuration/src/native_queries.rs +++ b/crates/configuration/src/native_queries.rs @@ -2,7 +2,7 @@ use mongodb::{bson, options::SelectionCriteria}; use schemars::JsonSchema; use serde::Deserialize; -use crate::schema::{ObjectField, Type}; +use crate::schema::{ObjectField, Type, ObjectType}; /// An arbitrary database command using MongoDB's runCommand API. 
/// See https://www.mongodb.com/docs/manual/reference/method/db.runCommand/ @@ -12,7 +12,15 @@ pub struct NativeQuery { /// Name that will be used to identify the query in your data graph pub name: String, - /// Type of data returned by the query. + /// You may define object types here to reference in `result_type`. Any types defined here will + /// be merged with the definitions in `schema.json`. This allows you to maintain hand-written + /// types for native queries without having to edit a generated `schema.json` file. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub object_types: Vec, + + /// Type of data returned by the query. You may reference object types defined in the + /// `object_types` list in this definition, or you may reference object types from + /// `schema.json`. pub result_type: Type, /// Arguments for per-query customization diff --git a/crates/mongodb-connector/src/schema.rs b/crates/mongodb-connector/src/schema.rs index 5cb4a7a5..c06131a2 100644 --- a/crates/mongodb-connector/src/schema.rs +++ b/crates/mongodb-connector/src/schema.rs @@ -12,9 +12,19 @@ pub async fn get_schema( config: &Configuration, ) -> Result { let schema = &config.schema; - let object_types = map_object_types(&schema.object_types); let collections = schema.collections.iter().map(map_collection).collect(); + let object_types_from_schema = map_object_types(&schema.object_types); + let object_types_from_native_queries = map_object_types( + config + .native_queries + .iter() + .flat_map(|native_query| &native_query.object_types), + ); + let object_types = object_types_from_schema + .chain(object_types_from_native_queries) + .collect(); + let functions = config .native_queries .iter() @@ -38,19 +48,18 @@ pub async fn get_schema( }) } -fn map_object_types(object_types: &[schema::ObjectType]) -> BTreeMap { - object_types - .iter() - .map(|t| { - ( - t.name.clone(), - models::ObjectType { - fields: map_field_infos(&t.fields), - description: t.description.clone(), - }, - ) - }) - .collect() +fn map_object_types<'a>( + object_types: impl IntoIterator + 'a, +) -> impl Iterator + 'a { + object_types.into_iter().map(|t| { + ( + t.name.clone(), + models::ObjectType { + fields: map_field_infos(&t.fields), + description: t.description.clone(), + }, + ) + }) } fn map_field_infos(fields: &[schema::ObjectField]) -> BTreeMap { diff --git a/fixtures/connector/chinook/native_queries/hello.yaml b/fixtures/connector/chinook/native_queries/hello.yaml index 2f440194..c3523342 100644 --- a/fixtures/connector/chinook/native_queries/hello.yaml +++ b/fixtures/connector/chinook/native_queries/hello.yaml @@ -1,4 +1,12 @@ name: hello -result_type: !scalar string +objectTypes: + - name: helloResult + fields: + - name: ok + type: !scalar int + - name: readOnly + type: !scalar bool + # There are more fields but you get the idea +resultType: !object helloResult command: hello: 1 From 5385363b4424e90e17e28f924167e621ae02f144 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Sun, 17 Mar 2024 21:53:39 -0700 Subject: [PATCH 12/33] update fixtures with object types --- .../chinook/native_queries/hello.yaml | 4 +-- .../ddn/subgraphs/chinook/commands/Hello.hml | 34 +++++++++++++++++-- .../chinook/dataconnectors/mongodb.hml | 8 ++++- 3 files changed, 41 insertions(+), 5 deletions(-) diff --git a/fixtures/connector/chinook/native_queries/hello.yaml b/fixtures/connector/chinook/native_queries/hello.yaml index c3523342..36b14855 100644 --- a/fixtures/connector/chinook/native_queries/hello.yaml +++ 
b/fixtures/connector/chinook/native_queries/hello.yaml @@ -1,12 +1,12 @@ name: hello objectTypes: - - name: helloResult + - name: HelloResult fields: - name: ok type: !scalar int - name: readOnly type: !scalar bool # There are more fields but you get the idea -resultType: !object helloResult +resultType: !object HelloResult command: hello: 1 diff --git a/fixtures/ddn/subgraphs/chinook/commands/Hello.hml b/fixtures/ddn/subgraphs/chinook/commands/Hello.hml index c194a5ba..cfdebd65 100644 --- a/fixtures/ddn/subgraphs/chinook/commands/Hello.hml +++ b/fixtures/ddn/subgraphs/chinook/commands/Hello.hml @@ -3,16 +3,21 @@ version: v1 definition: name: hello description: Example of a read-only native query - outputType: String + outputType: HelloResult arguments: [] source: dataConnectorName: mongodb dataConnectorCommand: function: hello + typeMapping: + HelloResult: + fieldMapping: + ok: { column: ok } + readOnly: { column: readOnly } graphql: rootFieldName: hello rootFieldKind: Query - + --- kind: CommandPermissions version: v1 @@ -21,3 +26,28 @@ definition: permissions: - role: admin allowExecution: true + +--- +kind: ObjectType +version: v1 +definition: + name: HelloResult + graphql: + typeName: HelloResult + fields: + - name: ok + type: Int! + - name: readOnly + type: Boolean! + +--- +kind: TypePermissions +version: v1 +definition: + typeName: HelloResult + permissions: + - role: admin + output: + allowedFields: + - ok + - readOnly diff --git a/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml b/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml index 7127da8b..4eb5585b 100644 --- a/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml +++ b/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml @@ -905,6 +905,12 @@ definition: underlying_type: type: named name: ObjectId + HelloResult: + fields: + ok: + type: { type: named, name: Int } + readOnly: + type: { type: named, name: Boolean } collections: - name: Album arguments: {} @@ -996,7 +1002,7 @@ definition: foreign_keys: {} functions: - name: hello - result_type: { type: named, name: String } + result_type: { type: named, name: HelloResult } arguments: {} command: { hello: 1 } procedures: [] From e6b32b12024665318c136eac5c3733ccb3fac83a Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Mon, 18 Mar 2024 11:43:07 -0700 Subject: [PATCH 13/33] fix a typo --- crates/configuration/src/native_queries.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/configuration/src/native_queries.rs b/crates/configuration/src/native_queries.rs index 94688cb4..6153a4bf 100644 --- a/crates/configuration/src/native_queries.rs +++ b/crates/configuration/src/native_queries.rs @@ -42,7 +42,7 @@ pub struct NativeQuery { /// Set to `readWrite` if this native query might modify data in the database. When refreshing /// a dataconnector native queries will appear in the corresponding `DataConnectorLink` - /// definition as `functions` if they are read-only, or as `procedures` if they are read-rite. + /// definition as `functions` if they are read-only, or as `procedures` if they are read-write. /// Functions are intended to map to GraphQL Query fields, while procedures map to Mutation /// fields. 
#[serde(default)] From 0d098d3392129256f42d760f4a5df312fb7c75e8 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Mon, 18 Mar 2024 12:49:33 -0700 Subject: [PATCH 14/33] check for duplicate names when parsing configuration --- crates/cli/src/lib.rs | 2 +- crates/configuration/src/configuration.rs | 92 ++++++++++++++++++++++- crates/configuration/src/directory.rs | 5 +- crates/mongodb-connector/src/schema.rs | 32 +++----- 4 files changed, 99 insertions(+), 32 deletions(-) diff --git a/crates/cli/src/lib.rs b/crates/cli/src/lib.rs index b37c4ee2..88eedde5 100644 --- a/crates/cli/src/lib.rs +++ b/crates/cli/src/lib.rs @@ -32,7 +32,7 @@ pub async fn run(command: Command, context: &Context) -> anyhow::Result<()> { /// Update the configuration in the current directory by introspecting the database. async fn update(context: &Context) -> anyhow::Result<()> { let schema = introspection::get_metadata_from_validation_schema(&context.mongo_config).await?; - let configuration = Configuration::from_schema(schema); + let configuration = Configuration::from_schema(schema)?; configuration::write_directory(&context.path, &configuration).await?; diff --git a/crates/configuration/src/configuration.rs b/crates/configuration/src/configuration.rs index 1081ef26..8a5bd1a6 100644 --- a/crates/configuration/src/configuration.rs +++ b/crates/configuration/src/configuration.rs @@ -1,9 +1,11 @@ use std::path::Path; +use anyhow::ensure; +use itertools::Itertools; use schemars::JsonSchema; use serde::Deserialize; -use crate::{native_queries::NativeQuery, read_directory, Schema}; +use crate::{native_queries::NativeQuery, read_directory, schema::ObjectType, Schema}; #[derive(Clone, Debug, Default, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] @@ -18,11 +20,45 @@ pub struct Configuration { } impl Configuration { - pub fn from_schema(schema: Schema) -> Self { - Self { + pub fn validate(schema: Schema, native_queries: Vec) -> anyhow::Result { + let config = Configuration { schema, - ..Default::default() + native_queries, + }; + + { + let duplicate_type_names: Vec<&str> = config + .object_types() + .map(|t| t.name.as_ref()) + .duplicates() + .collect(); + ensure!( + duplicate_type_names.is_empty(), + "configuration contains multiple definitions for these object type names: {}", + duplicate_type_names.join(", ") + ); + } + + { + let duplicate_collection_names: Vec<&str> = config + .schema + .collections + .iter() + .map(|c| c.name.as_ref()) + .duplicates() + .collect(); + ensure!( + duplicate_collection_names.is_empty(), + "configuration contains multiple definitions for these collection names: {}", + duplicate_collection_names.join(", ") + ); } + + Ok(config) + } + + pub fn from_schema(schema: Schema) -> anyhow::Result { + Self::validate(schema, Default::default()) } pub async fn parse_configuration( @@ -30,4 +66,52 @@ impl Configuration { ) -> anyhow::Result { read_directory(configuration_dir).await } + + /// Returns object types collected from schema and native queries + pub fn object_types(&self) -> impl Iterator { + let object_types_from_schema = self.schema.object_types.iter(); + let object_types_from_native_queries = self + .native_queries + .iter() + .flat_map(|native_query| &native_query.object_types); + object_types_from_schema.chain(object_types_from_native_queries) + } +} + +#[cfg(test)] +mod tests { + use mongodb::bson::doc; + + use super::*; + use crate::{schema::Type, Schema}; + + #[test] + fn fails_with_duplicate_object_types() { + let schema = Schema { + collections: Default::default(), + 
object_types: vec![ObjectType { + name: "Album".to_owned(), + fields: Default::default(), + description: Default::default(), + }], + }; + let native_queries = vec![NativeQuery { + name: "hello".to_owned(), + object_types: vec![ObjectType { + name: "Album".to_owned(), + fields: Default::default(), + description: Default::default(), + }], + result_type: Type::Object("Album".to_owned()), + command: doc! { "command": 1 }, + arguments: Default::default(), + selection_criteria: Default::default(), + description: Default::default(), + mode: Default::default(), + }]; + let result = Configuration::validate(schema, native_queries); + let error_msg = result.unwrap_err().to_string(); + assert!(error_msg.contains("multiple definitions")); + assert!(error_msg.contains("Album")); + } } diff --git a/crates/configuration/src/directory.rs b/crates/configuration/src/directory.rs index 2ec5618e..fd616e71 100644 --- a/crates/configuration/src/directory.rs +++ b/crates/configuration/src/directory.rs @@ -36,10 +36,7 @@ pub async fn read_directory( .await? .unwrap_or_default(); - Ok(Configuration { - schema, - native_queries, - }) + Configuration::validate(schema, native_queries) } /// Parse all files in a directory with one of the allowed configuration extensions according to diff --git a/crates/mongodb-connector/src/schema.rs b/crates/mongodb-connector/src/schema.rs index c06131a2..d5f265e8 100644 --- a/crates/mongodb-connector/src/schema.rs +++ b/crates/mongodb-connector/src/schema.rs @@ -13,17 +13,7 @@ pub async fn get_schema( ) -> Result { let schema = &config.schema; let collections = schema.collections.iter().map(map_collection).collect(); - - let object_types_from_schema = map_object_types(&schema.object_types); - let object_types_from_native_queries = map_object_types( - config - .native_queries - .iter() - .flat_map(|native_query| &native_query.object_types), - ); - let object_types = object_types_from_schema - .chain(object_types_from_native_queries) - .collect(); + let object_types = config.object_types().map(map_object_type).collect(); let functions = config .native_queries @@ -48,18 +38,14 @@ pub async fn get_schema( }) } -fn map_object_types<'a>( - object_types: impl IntoIterator + 'a, -) -> impl Iterator + 'a { - object_types.into_iter().map(|t| { - ( - t.name.clone(), - models::ObjectType { - fields: map_field_infos(&t.fields), - description: t.description.clone(), - }, - ) - }) +fn map_object_type(object_type: &schema::ObjectType) -> (String, models::ObjectType) { + ( + object_type.name.clone(), + models::ObjectType { + fields: map_field_infos(&object_type.fields), + description: object_type.description.clone(), + }, + ) } fn map_field_infos(fields: &[schema::ObjectField]) -> BTreeMap { From 4100fd77cad7b01b66542366310db34fe8ff5bc9 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Mon, 18 Mar 2024 17:49:16 -0700 Subject: [PATCH 15/33] implementation for read-write native queries --- Cargo.lock | 1 + crates/mongodb-connector/Cargo.toml | 1 + crates/mongodb-connector/src/main.rs | 1 + .../mongodb-connector/src/mongo_connector.rs | 10 +- crates/mongodb-connector/src/mutation.rs | 110 ++++++++++++++++++ 5 files changed, 117 insertions(+), 6 deletions(-) create mode 100644 crates/mongodb-connector/src/mutation.rs diff --git a/Cargo.lock b/Cargo.lock index 6401d568..6b300ed8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1557,6 +1557,7 @@ dependencies = [ "dc-api-test-helpers", "dc-api-types", "enum-iterator", + "futures", "http", "indexmap 2.2.5", "itertools 0.10.5", diff --git 
a/crates/mongodb-connector/Cargo.toml b/crates/mongodb-connector/Cargo.toml index 0632b67c..36a21468 100644 --- a/crates/mongodb-connector/Cargo.toml +++ b/crates/mongodb-connector/Cargo.toml @@ -10,6 +10,7 @@ configuration = { path = "../configuration" } dc-api = { path = "../dc-api" } dc-api-types = { path = "../dc-api-types" } enum-iterator = "1.4.1" +futures = "^0.3" http = "^0.2" indexmap = { version = "2.1.0", features = ["serde"] } itertools = "^0.10" diff --git a/crates/mongodb-connector/src/main.rs b/crates/mongodb-connector/src/main.rs index 26c46d0b..aadcefad 100644 --- a/crates/mongodb-connector/src/main.rs +++ b/crates/mongodb-connector/src/main.rs @@ -2,6 +2,7 @@ mod api_type_conversions; mod capabilities; mod error_mapping; mod mongo_connector; +mod mutation; mod schema; use std::error::Error; diff --git a/crates/mongodb-connector/src/mongo_connector.rs b/crates/mongodb-connector/src/mongo_connector.rs index f23c4338..77f16cc5 100644 --- a/crates/mongodb-connector/src/mongo_connector.rs +++ b/crates/mongodb-connector/src/mongo_connector.rs @@ -19,7 +19,6 @@ use ndc_sdk::{ }, }; -use crate::capabilities::mongo_capabilities_response; use crate::{ api_type_conversions::{ v2_to_v3_explain_response, v2_to_v3_query_response, v3_to_v2_query_request, QueryContext, @@ -27,6 +26,7 @@ use crate::{ capabilities::scalar_types, error_mapping::{mongo_agent_error_to_explain_error, mongo_agent_error_to_query_error}, }; +use crate::{capabilities::mongo_capabilities_response, mutation::handle_mutation_request}; #[derive(Clone, Default)] pub struct MongoConnector; @@ -115,12 +115,10 @@ impl Connector for MongoConnector { async fn mutation( _configuration: &Self::Configuration, - _state: &Self::State, - _request: MutationRequest, + state: &Self::State, + request: MutationRequest, ) -> Result, MutationError> { - Err(MutationError::UnsupportedOperation( - "The MongoDB agent does not yet support mutations".to_owned(), - )) + handle_mutation_request(state, request).await } async fn query( diff --git a/crates/mongodb-connector/src/mutation.rs b/crates/mongodb-connector/src/mutation.rs new file mode 100644 index 00000000..737842fd --- /dev/null +++ b/crates/mongodb-connector/src/mutation.rs @@ -0,0 +1,110 @@ +use std::collections::BTreeMap; + +use configuration::native_queries::NativeQuery; +use futures::future::try_join_all; +use itertools::Itertools; +use mongodb::Database; +use mongodb_agent_common::interface_types::MongoConfig; +use ndc_sdk::{ + connector::MutationError, + json_response::JsonResponse, + models::{ + MutationOperation, MutationOperationResults, MutationRequest, MutationResponse, NestedField, + }, +}; +use serde_json::Value; + +/// A procedure combined with inputs +#[derive(Clone, Debug)] +#[allow(dead_code)] +struct Job<'a> { + // For the time being all procedures are native queries. 
+ native_query: &'a NativeQuery, + arguments: BTreeMap, + fields: Option, +} + +impl<'a> Job<'a> { + pub fn new( + native_query: &'a NativeQuery, + arguments: BTreeMap, + fields: Option, + ) -> Self { + Job { + native_query, + arguments, + fields, + } + } +} + +pub async fn handle_mutation_request( + config: &MongoConfig, + mutation_request: MutationRequest, +) -> Result, MutationError> { + tracing::debug!(?config, mutation_request = %serde_json::to_string(&mutation_request).unwrap(), "executing mutation"); + let database = config.client.database(&config.database); + let jobs = look_up_procedures(config, mutation_request)?; + let operation_results = try_join_all( + jobs.into_iter() + .map(|job| execute_job(database.clone(), job)), + ) + .await?; + Ok(JsonResponse::Value(MutationResponse { operation_results })) +} + +/// Looks up procedures according to the names given in the mutation request, and pairs them with +/// arguments and requested fields. Returns an error if any procedures cannot be found. +fn look_up_procedures( + config: &MongoConfig, + mutation_request: MutationRequest, +) -> Result>, MutationError> { + let (jobs, not_found): (Vec, Vec) = mutation_request + .operations + .into_iter() + .map(|operation| match operation { + MutationOperation::Procedure { + name, + arguments, + fields, + } => { + let native_query = config + .native_queries + .iter() + .find(|native_query| native_query.name == name); + native_query + .ok_or(name) + .map(|nq| Job::new(nq, arguments, fields)) + } + }) + .partition_result(); + + if !not_found.is_empty() { + return Err(MutationError::UnprocessableContent(format!( + "request includes unknown procedures: {}", + not_found.join(", ") + ))); + } + + Ok(jobs) +} + +async fn execute_job( + database: Database, + job: Job<'_>, +) -> Result { + let result = database + .run_command(job.native_query.command.clone(), None) + .await + .map_err(|err| match *err.kind { + mongodb::error::ErrorKind::InvalidArgument { message, .. } => { + MutationError::UnprocessableContent(message) + } + err => MutationError::Other(Box::new(err)), + })?; + let json_result = + serde_json::to_value(result).map_err(|err| MutationError::Other(Box::new(err)))?; + Ok(MutationOperationResults::Procedure { + result: json_result, + }) +} From 6af42b60d6a60ca1e29387e73be6cd7afc350764 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Tue, 19 Mar 2024 11:59:21 -0700 Subject: [PATCH 16/33] remove fields from Job if we're not going to use it --- crates/mongodb-connector/src/mutation.rs | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/crates/mongodb-connector/src/mutation.rs b/crates/mongodb-connector/src/mutation.rs index 737842fd..2388d952 100644 --- a/crates/mongodb-connector/src/mutation.rs +++ b/crates/mongodb-connector/src/mutation.rs @@ -8,9 +8,7 @@ use mongodb_agent_common::interface_types::MongoConfig; use ndc_sdk::{ connector::MutationError, json_response::JsonResponse, - models::{ - MutationOperation, MutationOperationResults, MutationRequest, MutationResponse, NestedField, - }, + models::{MutationOperation, MutationOperationResults, MutationRequest, MutationResponse}, }; use serde_json::Value; @@ -21,19 +19,13 @@ struct Job<'a> { // For the time being all procedures are native queries. 
native_query: &'a NativeQuery, arguments: BTreeMap, - fields: Option, } impl<'a> Job<'a> { - pub fn new( - native_query: &'a NativeQuery, - arguments: BTreeMap, - fields: Option, - ) -> Self { + pub fn new(native_query: &'a NativeQuery, arguments: BTreeMap) -> Self { Job { native_query, arguments, - fields, } } } @@ -64,17 +56,13 @@ fn look_up_procedures( .into_iter() .map(|operation| match operation { MutationOperation::Procedure { - name, - arguments, - fields, + name, arguments, .. } => { let native_query = config .native_queries .iter() .find(|native_query| native_query.name == name); - native_query - .ok_or(name) - .map(|nq| Job::new(nq, arguments, fields)) + native_query.ok_or(name).map(|nq| Job::new(nq, arguments)) } }) .partition_result(); From a092d72233f67dea1f883064034dd10a6b6f7932 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Tue, 19 Mar 2024 12:41:53 -0700 Subject: [PATCH 17/33] add example mutation, insertArtist, in fixtures --- .../chinook/native_queries/hello.yaml | 1 + .../chinook/native_queries/insert_artist.yaml | 16 ++++++ .../chinook/commands/InsertArtist.hml | 54 +++++++++++++++++++ .../chinook/dataconnectors/mongodb.hml | 14 ++++- 4 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 fixtures/connector/chinook/native_queries/insert_artist.yaml create mode 100644 fixtures/ddn/subgraphs/chinook/commands/InsertArtist.hml diff --git a/fixtures/connector/chinook/native_queries/hello.yaml b/fixtures/connector/chinook/native_queries/hello.yaml index 36b14855..e7b7a575 100644 --- a/fixtures/connector/chinook/native_queries/hello.yaml +++ b/fixtures/connector/chinook/native_queries/hello.yaml @@ -1,4 +1,5 @@ name: hello +description: Example of a read-only native query objectTypes: - name: HelloResult fields: diff --git a/fixtures/connector/chinook/native_queries/insert_artist.yaml b/fixtures/connector/chinook/native_queries/insert_artist.yaml new file mode 100644 index 00000000..3dd16c70 --- /dev/null +++ b/fixtures/connector/chinook/native_queries/insert_artist.yaml @@ -0,0 +1,16 @@ +name: insertArtist +description: Example of a database update using a native query +objectTypes: + - name: InsertArtist + fields: + - name: ok + type: !scalar int + - name: n + type: !scalar int +resultType: !object InsertArtist +# TODO: implement arguments instead of hard-coding inputs +command: + insert: "Artist" + documents: + - ArtistId: 1001 + - Name: Regina Spektor diff --git a/fixtures/ddn/subgraphs/chinook/commands/InsertArtist.hml b/fixtures/ddn/subgraphs/chinook/commands/InsertArtist.hml new file mode 100644 index 00000000..7b1d3fff --- /dev/null +++ b/fixtures/ddn/subgraphs/chinook/commands/InsertArtist.hml @@ -0,0 +1,54 @@ +kind: Command +version: v1 +definition: + name: insertArtist + description: Example of a database update using a native query + outputType: InsertArtist + arguments: [] + source: + dataConnectorName: mongodb + dataConnectorCommand: + procedure: insertArtist + typeMapping: + InsertArtist: + fieldMapping: + ok: { column: ok } + n: { column: n } + graphql: + rootFieldName: insertArtist + rootFieldKind: Mutation + +--- +kind: CommandPermissions +version: v1 +definition: + commandName: insertArtist + permissions: + - role: admin + allowExecution: true + +--- +kind: ObjectType +version: v1 +definition: + name: InsertArtist + graphql: + typeName: InsertArtist + fields: + - name: ok + type: Int! + - name: n + type: Int! 
+ +--- +kind: TypePermissions +version: v1 +definition: + typeName: InsertArtist + permissions: + - role: admin + output: + allowedFields: + - ok + - n + diff --git a/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml b/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml index 4eb5585b..d94ec308 100644 --- a/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml +++ b/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml @@ -911,6 +911,12 @@ definition: type: { type: named, name: Int } readOnly: type: { type: named, name: Boolean } + InsertArtist: + fields: + ok: + type: { type: named, name: Int } + n: + type: { type: named, name: Int } collections: - name: Album arguments: {} @@ -1002,10 +1008,16 @@ definition: foreign_keys: {} functions: - name: hello + description: Example of a read-only native query result_type: { type: named, name: HelloResult } arguments: {} command: { hello: 1 } - procedures: [] + procedures: + - name: insertArtist + description: Example of a database update using a native query + result_type: { type: named, name: InsertArtist } + arguments: {} + command: { insert: Artist, documents: [{ ArtistId: 1001, Name: Regina Spektor }] } capabilities: version: ^0.1.0 capabilities: From 3c7e9dfbc356cefb04a4e7d39701872283caf368 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Wed, 20 Mar 2024 09:56:32 -0700 Subject: [PATCH 18/33] fixture was inserting two documents --- fixtures/connector/chinook/native_queries/insert_artist.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fixtures/connector/chinook/native_queries/insert_artist.yaml b/fixtures/connector/chinook/native_queries/insert_artist.yaml index 3dd16c70..d6803340 100644 --- a/fixtures/connector/chinook/native_queries/insert_artist.yaml +++ b/fixtures/connector/chinook/native_queries/insert_artist.yaml @@ -13,4 +13,4 @@ command: insert: "Artist" documents: - ArtistId: 1001 - - Name: Regina Spektor + Name: Regina Spektor From fae1840a2b85872385c67de5e9f867b0c788f203 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 21 Mar 2024 12:39:43 -0700 Subject: [PATCH 19/33] wip: --- .../src/command/command.rs | 282 ++++++++++++++++++ .../mongodb-agent-common/src/command/error.rs | 24 ++ .../mongodb-agent-common/src/command/mod.rs | 5 + crates/mongodb-support/src/bson_type.rs | 36 +++ 4 files changed, 347 insertions(+) create mode 100644 crates/mongodb-agent-common/src/command/command.rs create mode 100644 crates/mongodb-agent-common/src/command/error.rs create mode 100644 crates/mongodb-agent-common/src/command/mod.rs diff --git a/crates/mongodb-agent-common/src/command/command.rs b/crates/mongodb-agent-common/src/command/command.rs new file mode 100644 index 00000000..1bfe6eb0 --- /dev/null +++ b/crates/mongodb-agent-common/src/command/command.rs @@ -0,0 +1,282 @@ +use std::collections::BTreeMap; + +use configuration::schema::{ObjectField, Type}; +use itertools::Itertools; +use mongodb::bson::{self, Bson}; +use mongodb_support::BsonScalarType; +use serde_json::Value; + +use super::CommandError; + +// type JsonObject = serde_json::Map; + +/// Parse native query commands, and interpolate variables. Input is serde_json::Value because our +/// configuration format is JSON. Output is BSON because that is the format that MongoDB commands +/// use. 
+pub fn interpolate( + command: &bson::Document, + parameters: &[ObjectField], + arguments: &BTreeMap, +) -> Result { + // let arguments_bson: BTreeMap = arguments + // .iter() + // .map(|(key, value)| -> Result<(String, Bson), CommandError> { + // Ok((key.to_owned(), value.clone().try_into()?)) + // }) + // .try_collect()?; + interpolate_helper(&command.into(), parameters, arguments) +} + +fn interpolate_helper( + command: &Bson, + parameters: &[ObjectField], + arguments: &BTreeMap, +) -> Result { + // let result = match command { + // exp @ Value::Null => exp.clone(), + // exp @ Value::Bool(_) => exp.clone(), + // exp @ Value::Number(_) => exp.clone(), + // Value::String(string) => interpolate_string(string, parameters, arguments)?, + // Value::Array(_) => todo!(), + // Value::Object(_) => todo!(), + // }; + + let result = match command { + Bson::Array(values) => values + .iter() + .map(|value| interpolate_helper(value, parameters, arguments)) + .try_collect()?, + Bson::Document(doc) => interpolate_document(doc.clone(), parameters, arguments)?.into(), + Bson::String(string) => interpolate_string(string, parameters, arguments)?, + Bson::RegularExpression(_) => todo!(), + Bson::JavaScriptCode(_) => todo!(), + Bson::JavaScriptCodeWithScope(_) => todo!(), + value => value.clone(), + }; + + Ok(result) +} + +fn interpolate_document( + document: bson::Document, + parameters: &[ObjectField], + arguments: &BTreeMap, +) -> Result { + document + .into_iter() + .map(|(key, value)| { + let interpolated_value = interpolate_helper(&value, parameters, arguments)?; + let interpolated_key = interpolate_string(&key, parameters, arguments)?; + match interpolated_key { + Bson::String(string_key) => Ok((string_key, interpolated_value)), + _ => Err(CommandError::NonStringKey(interpolated_key)), + } + }) + .try_collect() +} + +/// Substitute placeholders within a string in the input template. This may produce an output that +/// is not a string if the entire content of the string is a placeholder. For example, +/// +/// { "key": "{{recordId}}" } +/// +/// might expand to, +/// +/// { "key": 42 } +/// +/// if the type of the variable `recordId` is `int`. +fn interpolate_string( + string: &str, + parameters: &[ObjectField], + arguments: &BTreeMap, +) -> Result { + let parts = parse_native_query(string); + if parts.len() == 1 { + let mut parts = parts; + match parts.remove(0) { + NativeQueryPart::Text(string) => Ok(Bson::String(string)), + NativeQueryPart::Parameter(param) => resolve_argument(¶m, parameters, arguments), + } + } else { + todo!() + } +} + +/// Looks up an argument value for a given parameter, and produces a BSON value that matches the +/// declared parameter type. 
+fn resolve_argument( + param_name: &str, + parameters: &[ObjectField], + arguments: &BTreeMap, +) -> Result { + let parameter = parameters + .iter() + .find(|arg| arg.name == param_name) + .ok_or_else(|| CommandError::UnknownParameter(param_name.to_owned()))?; + let argument_json = arguments + .get(param_name) + .ok_or_else(|| CommandError::MissingArgument(param_name.to_owned()))?; + let argument: Bson = argument_json.clone().try_into()?; + match parameter.r#type { + Type::Scalar(t) => resolve_scalar_argument(t, argument), + Type::Object(_) => todo!(), + Type::ArrayOf(_) => todo!(), + Type::Nullable(_) => todo!(), + } +} + +fn resolve_scalar_argument( + parameter_type: BsonScalarType, + argument: Bson, +) -> Result { + let argument_type: BsonScalarType = (&argument).try_into()?; + if argument_type == parameter_type { + Ok(argument) + } else { + Err(CommandError::TypeMismatch(argument_type, parameter_type)) + } +} + +/// A part of a Native Query text, either raw text or a parameter. +#[derive(Debug, Clone, PartialEq, Eq)] +enum NativeQueryPart { + /// A raw text part + Text(String), + /// A parameter + Parameter(String), +} + +/// Parse a string or key in a native query into parts where variables have the syntax +/// `{{}}`. +fn parse_native_query(string: &str) -> Vec { + let vec: Vec> = string + .split("{{") + .filter(|part| !part.is_empty()) + .map(|part| match part.split_once("}}") { + None => vec![NativeQueryPart::Text(part.to_string())], + Some((var, text)) => { + if text.is_empty() { + vec![NativeQueryPart::Parameter(var.trim().to_owned())] + } else { + vec![ + NativeQueryPart::Parameter(var.trim().to_owned()), + NativeQueryPart::Text(text.to_string()), + ] + } + } + }) + .collect(); + vec.concat() +} + +#[cfg(test)] +mod tests { + use configuration::native_queries::NativeQuery; + use pretty_assertions::assert_eq; + use serde_json::json; + + use super::*; + + // TODO: extjson + // TODO: nullable + // TODO: optional + + #[test] + fn interpolates_non_string_type() -> anyhow::Result<()> { + let native_query_input = json!({ + "name": "insertArtist", + "resultType": { "object": "InsertArtist" }, + "arguments": [ + { "name": "id", "type": { "scalar": "int" } }, + { "name": "name", "type": { "scalar": "string" } }, + ], + "command": { + "insert": "Artist", + "documents": [{ + "ArtistId": "{{ id }}", + "Name": "{{name }}", + }], + }, + }); + let arguments = [ + ("id".to_owned(), json!(1001)), + ("name".to_owned(), json!("Regina Spektor")), + ] + .into_iter() + .collect(); + + let native_query: NativeQuery = serde_json::from_value(native_query_input)?; + let interpolated_command = interpolate( + &native_query.command.into(), + &native_query.arguments, + &arguments, + )?; + + assert_eq!( + interpolated_command, + bson::doc! 
{ + "insert": "Artist", + "documents": [{ + "ArtistId": 1001, + "Name": "Regina Spektor", + }], + } + .into() + ); + Ok(()) + } + + #[test] + fn interpolates_array_argument() -> anyhow::Result<()> { + let native_query_input = json!({ + "name": "insertArtist", + "resultType": { "object": "InsertArtist" }, + "objectTypes": [{ + "name": "ArtistInput", + "fields": [ + { "name": "ArtistId", "type": { "scalar": "int" } }, + { "name": "Name", "type": { "scalar": "string" } }, + ], + }], + "arguments": [ + { "name": "documents", "type": { "arrayOf": { "object": "ArtistInput" } } }, + ], + "command": { + "insert": "Artist", + "documents": "{{ documents }}", + }, + }); + let arguments = [ + ("id".to_owned(), json!(1001)), + ("name".to_owned(), json!("Regina Spektor")), + ] + .into_iter() + .collect(); + + let native_query: NativeQuery = serde_json::from_value(native_query_input)?; + let interpolated_command = interpolate( + &native_query.command.into(), + &native_query.arguments, + &arguments, + )?; + + assert_eq!( + interpolated_command, + bson::doc! { + "insert": "Artist", + "documents": [ + { + "ArtistId": "{{ id }}", + "Name": "{{name }}", + }, + { + "ArtistId": "{{ id }}", + "Name": "{{name }}", + } + ], + } + .into() + ); + Ok(()) + } +} diff --git a/crates/mongodb-agent-common/src/command/error.rs b/crates/mongodb-agent-common/src/command/error.rs new file mode 100644 index 00000000..b66c033d --- /dev/null +++ b/crates/mongodb-agent-common/src/command/error.rs @@ -0,0 +1,24 @@ +use mongodb::bson::{self, Bson}; +use mongodb_support::BsonScalarType; +use thiserror::Error; + +#[derive(Clone, Debug, Error)] +pub enum CommandError { + #[error("error converting parsing argument as extjson: {0}")] + ExtJsonConversionError(#[from] bson::extjson::de::Error), + + #[error("invalid argument type: {0}")] + InvalidArgumentType(#[from] mongodb_support::error::Error), + + #[error("a required argument was not provided, \"{0}\"")] + MissingArgument(String), + + #[error("object keys must be strings, but got: \"{0}\"")] + NonStringKey(Bson), + + #[error("argument type, \"{0}\", does not match parameter type, \"{1}\"")] + TypeMismatch(BsonScalarType, BsonScalarType), + + #[error("an argument was provided for an undefined paremeter, \"{0}\"")] + UnknownParameter(String), +} diff --git a/crates/mongodb-agent-common/src/command/mod.rs b/crates/mongodb-agent-common/src/command/mod.rs new file mode 100644 index 00000000..96022527 --- /dev/null +++ b/crates/mongodb-agent-common/src/command/mod.rs @@ -0,0 +1,5 @@ +mod command; +mod error; + +pub use self::command::Command; +pub use self::error::CommandError; diff --git a/crates/mongodb-support/src/bson_type.rs b/crates/mongodb-support/src/bson_type.rs index 5f948553..d63061f3 100644 --- a/crates/mongodb-support/src/bson_type.rs +++ b/crates/mongodb-support/src/bson_type.rs @@ -168,6 +168,42 @@ impl BsonScalarType { } } +impl std::fmt::Display for BsonScalarType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.bson_name()) + } +} + +impl TryFrom<&Bson> for BsonScalarType { + type Error = Error; + + fn try_from(value: &Bson) -> Result { + match value { + Bson::Double(_) => Ok(S::Double), + Bson::String(_) => Ok(S::String), + Bson::Array(_) => Err(Error::ExpectedScalarType(BsonType::Array)), + Bson::Document(_) => Err(Error::ExpectedScalarType(BsonType::Object)), + Bson::Boolean(_) => Ok(S::Bool), + Bson::Null => Ok(S::Null), + Bson::RegularExpression(_) => Ok(S::Regex), + Bson::JavaScriptCode(_) => Ok(S::Javascript), + 
Bson::JavaScriptCodeWithScope(_) => Ok(S::JavascriptWithScope), + Bson::Int32(_) => Ok(S::Int), + Bson::Int64(_) => Ok(S::Long), + Bson::Timestamp(_) => Ok(S::Timestamp), + Bson::Binary(_) => Ok(S::BinData), + Bson::ObjectId(_) => Ok(S::ObjectId), + Bson::DateTime(_) => Ok(S::Date), + Bson::Symbol(_) => Ok(S::Symbol), + Bson::Decimal128(_) => Ok(S::Decimal), + Bson::Undefined => Ok(S::Undefined), + Bson::MaxKey => Ok(S::MaxKey), + Bson::MinKey => Ok(S::MinKey), + Bson::DbPointer(_) => Ok(S::DbPointer), + } + } +} + impl TryFrom for BsonScalarType { type Error = Error; From 795c59d58b2ac20df1838b99d91b041d71c4cf23 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 21 Mar 2024 22:23:08 -0700 Subject: [PATCH 20/33] add json_to_bson with implementations for scalar and object types --- Cargo.lock | 17 +- crates/configuration/src/schema/database.rs | 10 + crates/mongodb-agent-common/Cargo.toml | 1 + .../src/mongodb/json_to_bson.rs | 403 ++++++++++++++++++ .../mongodb-agent-common/src/mongodb/mod.rs | 1 + .../src/query/make_selector.rs | 26 +- crates/mongodb-support/Cargo.toml | 1 + crates/mongodb-support/src/bson_type.rs | 6 +- 8 files changed, 440 insertions(+), 25 deletions(-) create mode 100644 crates/mongodb-agent-common/src/mongodb/json_to_bson.rs diff --git a/Cargo.lock b/Cargo.lock index 3c784f24..065fd07e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -632,7 +632,7 @@ dependencies = [ "regex", "serde", "serde_json", - "serde_with 3.4.0", + "serde_with 3.7.0", ] [[package]] @@ -929,7 +929,7 @@ dependencies = [ "serde", "serde-enum-str", "serde_json", - "serde_with 3.4.0", + "serde_with 3.7.0", ] [[package]] @@ -1522,6 +1522,7 @@ dependencies = [ "schemars", "serde", "serde_json", + "serde_with 3.7.0", "thiserror", "time", "tokio", @@ -1584,6 +1585,7 @@ dependencies = [ "dc-api-types", "enum-iterator", "indexmap 1.9.3", + "mongodb", "schemars", "serde", "serde_json", @@ -2677,9 +2679,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.4.0" +version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64cd236ccc1b7a29e7e2739f27c0b2dd199804abc4290e32f59f3b68d6405c23" +checksum = "ee80b0e361bbf88fd2f6e242ccd19cfda072cb0faa6ae694ecee08199938569a" dependencies = [ "base64 0.21.5", "chrono", @@ -2687,8 +2689,9 @@ dependencies = [ "indexmap 1.9.3", "indexmap 2.2.5", "serde", + "serde_derive", "serde_json", - "serde_with_macros 3.4.0", + "serde_with_macros 3.7.0", "time", ] @@ -2718,9 +2721,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.4.0" +version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93634eb5f75a2323b16de4748022ac4297f9e76b6dced2be287a099f41b5e788" +checksum = "6561dc161a9224638a31d876ccdfefbc1df91d3f3a8342eddb35f055d48c7655" dependencies = [ "darling 0.20.3", "proc-macro2", diff --git a/crates/configuration/src/schema/database.rs b/crates/configuration/src/schema/database.rs index c82942e5..a9be8d0c 100644 --- a/crates/configuration/src/schema/database.rs +++ b/crates/configuration/src/schema/database.rs @@ -45,3 +45,13 @@ pub struct ObjectField { #[serde(default)] pub description: Option, } + +impl ObjectField { + pub fn new(name: &str, r#type: Type) -> Self { + ObjectField { + name: name.to_owned(), + r#type, + description: Default::default(), + } + } +} diff --git a/crates/mongodb-agent-common/Cargo.toml b/crates/mongodb-agent-common/Cargo.toml index a5e42698..69b98ba8 100644 --- a/crates/mongodb-agent-common/Cargo.toml +++ 
b/crates/mongodb-agent-common/Cargo.toml @@ -25,6 +25,7 @@ regex = "1" schemars = { version = "^0.8.12", features = ["smol_str"] } serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0", features = ["preserve_order"] } +serde_with = { version = "^3.7", features = ["base64", "hex"] } thiserror = "1" time = { version = "0.3.29", features = ["formatting", "parsing", "serde"] } tracing = "0.1" diff --git a/crates/mongodb-agent-common/src/mongodb/json_to_bson.rs b/crates/mongodb-agent-common/src/mongodb/json_to_bson.rs new file mode 100644 index 00000000..7c3c36b1 --- /dev/null +++ b/crates/mongodb-agent-common/src/mongodb/json_to_bson.rs @@ -0,0 +1,403 @@ +use std::{collections::BTreeMap, str::FromStr}; + +use configuration::schema::{ObjectType, Type}; +use itertools::Itertools as _; +use mongodb::bson::{self, Bson, Decimal128}; +use mongodb_support::BsonScalarType; +use serde::de::DeserializeOwned; +use serde_json::Value; +use thiserror::Error; +use time::{format_description::well_known::Iso8601, OffsetDateTime}; + +#[derive(Debug, Error)] +pub enum JsonToBsonError { + #[error("error converting \"{1}\" to type, \"{0:?}\"")] + ConversionError(Type, Value), + + #[error("error converting \"{1}\" to type, \"{0:?}\": {2}")] + ConversionErrorWithContext(Type, Value, #[source] anyhow::Error), + + #[error("cannot use value, \"{0:?}\", in position of type, \"{1:?}\"")] + IncompatibleType(Type, Value), + + #[error("input with BSON type {expected_type:?} should be encoded in GraphQL as {expected_backing_type}, but got: {value}")] + IncompatibleBackingType { + expected_type: Type, + expected_backing_type: &'static str, + value: Value, + }, + + #[error("input object of type \"{0:?}\" is missing a field, \"{1}\"")] + MissingObjectField(Type, String), + + #[error("inputs of type {0} are not implemented")] + NotImplemented(BsonScalarType), + + #[error("error deserializing input: {0}")] + SerdeError(#[from] serde_json::Error), + + #[error("unknown object type, \"{0}\"")] + UnknownObjectType(String), +} + +type Result = std::result::Result; + +/// Converts JSON input to BSON according to an expected BSON type. +/// +/// The BSON library already has a `Deserialize` impl that can convert from JSON. But that +/// implementation cannot take advantage of the type information that we have available. Instead it +/// uses Extended JSON which uses tags in JSON data to distinguish BSON types. +pub fn json_to_bson( + expected_type: Type, + object_types: &BTreeMap, + value: Value, +) -> Result { + match expected_type { + Type::Scalar(t) => json_to_bson_scalar(t, value), + Type::Object(object_type_name) => { + let object_type = object_types + .get(&object_type_name) + .ok_or_else(|| JsonToBsonError::UnknownObjectType(object_type_name))?; + convert_object(object_type, object_types, value) + } + Type::ArrayOf(_) => todo!(), + Type::Nullable(_) => todo!(), + } +} + +/// Works like json_to_bson, but only converts BSON scalar types. 
+pub fn json_to_bson_scalar(expected_type: BsonScalarType, value: Value) -> Result { + let result = match expected_type { + // BsonScalarType::Double => Bson::Double(from_number(expected_type, value)?), + BsonScalarType::Double => Bson::Double(deserialize(expected_type, value)?), + BsonScalarType::Int => Bson::Int32(deserialize(expected_type, value)?), + BsonScalarType::Long => Bson::Int64(deserialize(expected_type, value)?), + BsonScalarType::Decimal => Bson::Decimal128( + Decimal128::from_str(&from_string(expected_type, value.clone())?).map_err(|err| { + JsonToBsonError::ConversionErrorWithContext( + Type::Scalar(expected_type), + value, + err.into(), + ) + })?, + ), + BsonScalarType::String => Bson::String(from_string(expected_type, value)?), + BsonScalarType::Date => convert_date(&from_string(expected_type, value)?)?, + BsonScalarType::Timestamp => deserialize::(expected_type, value)?.into(), + BsonScalarType::BinData => deserialize::(expected_type, value)?.into(), + BsonScalarType::ObjectId => Bson::ObjectId(deserialize(expected_type, value)?), + // BsonScalarType::ObjectId => Bson::ObjectId( + // ObjectId::from_str(&from_string(expected_type, value)?).map_err(|err| { + // JsonToBsonError::ConversionErrorWithContext( + // Type::Scalar(expected_type), + // value, + // err.into(), + // ) + // })?, + // ), + BsonScalarType::Bool => match value { + Value::Bool(b) => Bson::Boolean(b), + _ => incompatible_scalar_type(BsonScalarType::Bool, value)?, + }, + BsonScalarType::Null => match value { + Value::Null => Bson::Null, + _ => incompatible_scalar_type(BsonScalarType::Null, value)?, + }, + BsonScalarType::Undefined => match value { + Value::Null => Bson::Undefined, + _ => incompatible_scalar_type(BsonScalarType::Undefined, value)?, + }, + BsonScalarType::Regex => deserialize::(expected_type, value)?.into(), + BsonScalarType::Javascript => Bson::JavaScriptCode(deserialize(expected_type, value)?), + BsonScalarType::JavascriptWithScope => { + deserialize::(expected_type, value)?.into() + } + BsonScalarType::MinKey => Bson::MinKey, + BsonScalarType::MaxKey => Bson::MaxKey, + BsonScalarType::Symbol => Bson::Symbol(deserialize(expected_type, value)?), + // dbPointer is deprecated + BsonScalarType::DbPointer => Err(JsonToBsonError::NotImplemented(expected_type))?, + }; + Ok(result) +} + +/// Types defined just to get deserialization logic for BSON "scalar" types that are represented in +/// JSON as composite structures. The types here are designed to match the representations of BSON +/// types in extjson. 
+mod de { + use mongodb::bson::{self, Bson}; + use serde::Deserialize; + use serde_with::{base64::Base64, hex::Hex, serde_as}; + + #[serde_as] + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + pub struct BinData { + #[serde_as(as = "Base64")] + base64: Vec, + #[serde_as(as = "Hex")] + sub_type: [u8; 1], + } + + impl From for Bson { + fn from(value: BinData) -> Self { + Bson::Binary(bson::Binary { + bytes: value.base64, + subtype: value.sub_type[0].into(), + }) + } + } + + #[derive(Deserialize)] + pub struct JavaScripCodetWithScope { + #[serde(rename = "$code")] + code: String, + #[serde(rename = "$scope")] + scope: bson::Document, + } + + impl From for Bson { + fn from(value: JavaScripCodetWithScope) -> Self { + Bson::JavaScriptCodeWithScope(bson::JavaScriptCodeWithScope { + code: value.code, + scope: value.scope, + }) + } + } + + #[derive(Deserialize)] + pub struct Regex { + pattern: String, + options: String, + } + + impl From for Bson { + fn from(value: Regex) -> Self { + Bson::RegularExpression(bson::Regex { + pattern: value.pattern, + options: value.options, + }) + } + } + + #[derive(Deserialize)] + pub struct Timestamp { + t: u32, + i: u32, + } + + impl From for Bson { + fn from(value: Timestamp) -> Self { + Bson::Timestamp(bson::Timestamp { + time: value.t, + increment: value.i, + }) + } + } +} + +fn convert_object( + object_type: &ObjectType, + object_types: &BTreeMap, + value: Value, +) -> Result { + let input_fields: BTreeMap = serde_json::from_value(value)?; + let bson_doc: bson::Document = object_type + .fields + .iter() + .map(|field| { + let input_field_value = input_fields.get(&field.name).ok_or_else(|| { + JsonToBsonError::MissingObjectField( + Type::Object(object_type.name.clone()), + field.name.clone(), + ) + })?; + Ok(( + field.name.clone(), + json_to_bson( + field.r#type.clone(), + object_types, + input_field_value.clone(), + )?, + )) + }) + .try_collect::<_, _, JsonToBsonError>()?; + Ok(bson_doc.into()) +} + +fn convert_date(value: &str) -> Result { + let date = OffsetDateTime::parse(value, &Iso8601::DEFAULT).map_err(|err| { + JsonToBsonError::ConversionErrorWithContext( + Type::Scalar(BsonScalarType::Date), + Value::String(value.to_owned()), + err.into(), + ) + })?; + Ok(Bson::DateTime(bson::DateTime::from_system_time( + date.into(), + ))) +} + +// fn convert_timestamp(value: Value) -> Result { +// match value +// } + +fn deserialize(expected_type: BsonScalarType, value: Value) -> Result +where + T: DeserializeOwned, +{ + serde_json::from_value::(value.clone()).map_err(|err| { + JsonToBsonError::ConversionErrorWithContext(Type::Scalar(expected_type), value, err.into()) + }) +} + +// fn from_number(expected_type: BsonScalarType, value: Value) -> Result +// where +// T: NumCast, +// { +// let mk_err = || JsonToBsonError::ConversionError(Type::Scalar(expected_type), value); +// match value { +// Value::Number(n) => { +// if let Some(n) = n.as_u64() { +// ::from(n).ok_or_else(mk_err) +// } else if let Some(n) = n.as_i64() { +// ::from(n).ok_or_else(mk_err) +// } else if let Some(n) = n.as_f64() { +// ::from(n).ok_or_else(mk_err) +// } else { +// Err(mk_err()) +// } +// } +// _ => Err(JsonToBsonError::IncompatibleBackingType { +// expected_type: Type::Scalar(expected_type), +// expected_backing_type: "Int or Float", +// value, +// }), +// } +// } + +fn from_string(expected_type: BsonScalarType, value: Value) -> Result { + match value { + Value::String(s) => Ok(s), + _ => Err(JsonToBsonError::IncompatibleBackingType { + expected_type: 
Type::Scalar(expected_type), + expected_backing_type: "String", + value, + }), + } +} + +fn incompatible_scalar_type(expected_type: BsonScalarType, value: Value) -> Result { + Err(JsonToBsonError::IncompatibleType( + Type::Scalar(expected_type), + value, + )) +} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use configuration::schema::{ObjectField, ObjectType, Type}; + use mongodb::bson::{self, datetime::DateTimeBuilder, Bson}; + use mongodb_support::BsonScalarType; + use pretty_assertions::assert_eq; + use serde_json::json; + + use super::json_to_bson; + + #[test] + fn deserializes_specialized_scalar_types() -> anyhow::Result<()> { + let object_type = ObjectType { + name: "scalar_test".to_owned(), + fields: vec![ + ObjectField::new("double", Type::Scalar(BsonScalarType::Double)), + ObjectField::new("int", Type::Scalar(BsonScalarType::Int)), + ObjectField::new("long", Type::Scalar(BsonScalarType::Long)), + ObjectField::new("decimal", Type::Scalar(BsonScalarType::Decimal)), + ObjectField::new("string", Type::Scalar(BsonScalarType::String)), + ObjectField::new("date", Type::Scalar(BsonScalarType::Date)), + ObjectField::new("timestamp", Type::Scalar(BsonScalarType::Timestamp)), + ObjectField::new("binData", Type::Scalar(BsonScalarType::BinData)), + ObjectField::new("objectId", Type::Scalar(BsonScalarType::ObjectId)), + ObjectField::new("bool", Type::Scalar(BsonScalarType::Bool)), + ObjectField::new("null", Type::Scalar(BsonScalarType::Null)), + ObjectField::new("undefined", Type::Scalar(BsonScalarType::Undefined)), + ObjectField::new("regex", Type::Scalar(BsonScalarType::Regex)), + ObjectField::new("javascript", Type::Scalar(BsonScalarType::Javascript)), + ObjectField::new( + "javascriptWithScope", + Type::Scalar(BsonScalarType::JavascriptWithScope), + ), + ObjectField::new("minKey", Type::Scalar(BsonScalarType::MinKey)), + ObjectField::new("maxKey", Type::Scalar(BsonScalarType::MaxKey)), + ObjectField::new("symbol", Type::Scalar(BsonScalarType::Symbol)), + ], + description: Default::default(), + }; + + let input = json!({ + "double": 3.14159, + "int": 3, + "long": 3, + "decimal": "3.14159", + "string": "hello", + "date": "2024-03-22T00:59:01Z", + "timestamp": { "t": 1565545664, "i": 1 }, + "binData": { + "base64": "EEEBEIEIERA=", + "subType": "00" + }, + "objectId": "e7c8f79873814cbae1f8d84c", + "bool": true, + "null": null, + "undefined": null, + "regex": { "pattern": "^fo+$", "options": "i" }, + "javascript": "console.log('hello, world!')", + "javascriptWithScope": { + "$code": "console.log('hello, ', name)", + "$scope": { "name": "you!" }, + }, + "minKey": {}, + "maxKey": {}, + "symbol": "a_symbol", + }); + + let expected = bson::doc! 
{ + "double": Bson::Double(3.14159), + "int": Bson::Int32(3), + "long": Bson::Int64(3), + "decimal": Bson::Decimal128(bson::Decimal128::from_str("3.14159")?), + "string": Bson::String("hello".to_owned()), + "date": Bson::DateTime(DateTimeBuilder::default().year(2024).month(3).day(22).hour(0).minute(59).second(1).build()?), + "timestamp": Bson::Timestamp(bson::Timestamp { time: 1565545664, increment: 1 }), + "binData": Bson::Binary(bson::Binary { + bytes: vec![0x10, 0x41, 0x01, 0x10, 0x81, 0x08, 0x11, 0x10], + subtype: bson::spec::BinarySubtype::Generic, + }), + "objectId": Bson::ObjectId(FromStr::from_str("e7c8f79873814cbae1f8d84c")?), + "bool": Bson::Boolean(true), + "null": Bson::Null, + "undefined": Bson::Undefined, + "regex": Bson::RegularExpression(bson::Regex { pattern: "^fo+$".to_owned(), options: "i".to_owned() }), + "javascript": Bson::JavaScriptCode("console.log('hello, world!')".to_owned()), + "javascriptWithScope": Bson::JavaScriptCodeWithScope(bson::JavaScriptCodeWithScope { + code: "console.log('hello, ', name)".to_owned(), + scope: bson::doc! { "name": "you!" }, + }), + "minKey": Bson::MinKey, + "maxKey": Bson::MaxKey, + "symbol": Bson::Symbol("a_symbol".to_owned()), + }; + + let actual = json_to_bson( + Type::Object(object_type.name.clone()), + &[(object_type.name.clone(), object_type)] + .into_iter() + .collect(), + input, + )?; + assert_eq!(actual, expected.into()); + Ok(()) + } +} diff --git a/crates/mongodb-agent-common/src/mongodb/mod.rs b/crates/mongodb-agent-common/src/mongodb/mod.rs index c0261d68..37e892a6 100644 --- a/crates/mongodb-agent-common/src/mongodb/mod.rs +++ b/crates/mongodb-agent-common/src/mongodb/mod.rs @@ -1,5 +1,6 @@ mod accumulator; mod collection; +pub mod json_to_bson; mod pipeline; mod projection; pub mod sanitize; diff --git a/crates/mongodb-agent-common/src/query/make_selector.rs b/crates/mongodb-agent-common/src/query/make_selector.rs index 8c60abb8..80c28e0d 100644 --- a/crates/mongodb-agent-common/src/query/make_selector.rs +++ b/crates/mongodb-agent-common/src/query/make_selector.rs @@ -5,12 +5,12 @@ use dc_api_types::{ ArrayComparisonValue, BinaryArrayComparisonOperator, ComparisonValue, ExistsInTable, Expression, UnaryComparisonOperator, }; -use mongodb::bson::{self, doc, Bson, Document}; -use time::{format_description::well_known::Iso8601, OffsetDateTime}; +use mongodb::bson::{self, doc, Document}; +use mongodb_support::BsonScalarType; use crate::{ comparison_function::ComparisonFunction, interface_types::MongoAgentError, - query::column_ref::column_ref, + mongodb::json_to_bson::json_to_bson_scalar, query::column_ref::column_ref, }; use BinaryArrayComparisonOperator as ArrOp; @@ -21,21 +21,13 @@ fn bson_from_scalar_value( value: &serde_json::Value, value_type: &str, ) -> Result { - match value_type { - "date" | "Date" => { - let parsed_date = value - .as_str() - .and_then(|s| OffsetDateTime::parse(s, &Iso8601::DEFAULT).ok()) - .ok_or_else(|| { - MongoAgentError::BadQuery(anyhow!( - "unrecognized date value: {value} - date values should be strings in ISO 8601 format including a time and a time zone specifier" - )) - })?; - Ok(Bson::DateTime(bson::DateTime::from_system_time( - parsed_date.into(), - ))) + // TODO: fail on unrecognized types + let bson_type = BsonScalarType::from_bson_name(value_type).ok(); + match bson_type { + Some(t) => { + json_to_bson_scalar(t, value.clone()).map_err(|e| MongoAgentError::BadQuery(anyhow!(e))) } - _ => bson::to_bson(value).map_err(|e| MongoAgentError::BadQuery(anyhow!(e))), + None => 
bson::to_bson(value).map_err(|e| MongoAgentError::BadQuery(anyhow!(e))), } } diff --git a/crates/mongodb-support/Cargo.toml b/crates/mongodb-support/Cargo.toml index dbd6cb2e..1a1d43a6 100644 --- a/crates/mongodb-support/Cargo.toml +++ b/crates/mongodb-support/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" dc-api-types = { path = "../dc-api-types" } enum-iterator = "1.4.1" indexmap = { version = "1", features = ["serde"] } # must match the version that ndc-client uses +mongodb = "2.8" schemars = "^0.8.12" serde = { version = "1", features = ["derive"] } serde_json = "1" diff --git a/crates/mongodb-support/src/bson_type.rs b/crates/mongodb-support/src/bson_type.rs index d63061f3..b7fb52ac 100644 --- a/crates/mongodb-support/src/bson_type.rs +++ b/crates/mongodb-support/src/bson_type.rs @@ -1,5 +1,6 @@ use dc_api_types::GraphQlType; use enum_iterator::{all, Sequence}; +use mongodb::bson::Bson; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -163,7 +164,10 @@ impl BsonScalarType { if name == "number" { return Ok(S::Double); } - let scalar_type = all::().find(|s| s.bson_name() == name); + // case-insensitive comparison because we are inconsistent about initial-letter + // capitalization between v2 and v3 + let scalar_type = + all::().find(|s| s.bson_name().eq_ignore_ascii_case(name)); scalar_type.ok_or_else(|| Error::UnknownScalarType(name.to_owned())) } } From 2c556af8a5b22a4087c1499414c71d8a36d62759 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 21 Mar 2024 22:36:21 -0700 Subject: [PATCH 21/33] implement array conversion in json_to_bson --- .../src/mongodb/json_to_bson.rs | 52 +++++++++++++++---- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/crates/mongodb-agent-common/src/mongodb/json_to_bson.rs b/crates/mongodb-agent-common/src/mongodb/json_to_bson.rs index 7c3c36b1..5013d8ee 100644 --- a/crates/mongodb-agent-common/src/mongodb/json_to_bson.rs +++ b/crates/mongodb-agent-common/src/mongodb/json_to_bson.rs @@ -48,19 +48,19 @@ type Result = std::result::Result; /// implementation cannot take advantage of the type information that we have available. Instead it /// uses Extended JSON which uses tags in JSON data to distinguish BSON types. 
pub fn json_to_bson( - expected_type: Type, + expected_type: &Type, object_types: &BTreeMap, value: Value, ) -> Result { match expected_type { - Type::Scalar(t) => json_to_bson_scalar(t, value), + Type::Scalar(t) => json_to_bson_scalar(*t, value), Type::Object(object_type_name) => { let object_type = object_types - .get(&object_type_name) - .ok_or_else(|| JsonToBsonError::UnknownObjectType(object_type_name))?; + .get(object_type_name) + .ok_or_else(|| JsonToBsonError::UnknownObjectType(object_type_name.to_owned()))?; convert_object(object_type, object_types, value) } - Type::ArrayOf(_) => todo!(), + Type::ArrayOf(element_type) => convert_array(&*element_type, object_types, value), Type::Nullable(_) => todo!(), } } @@ -196,6 +196,19 @@ mod de { } } +fn convert_array( + element_type: &Type, + object_types: &BTreeMap, + value: Value, +) -> Result { + let input_elements: Vec = serde_json::from_value(value)?; + let bson_array = input_elements + .into_iter() + .map(|v| json_to_bson(element_type, object_types, v)) + .try_collect()?; + Ok(Bson::Array(bson_array)) +} + fn convert_object( object_type: &ObjectType, object_types: &BTreeMap, @@ -214,11 +227,7 @@ fn convert_object( })?; Ok(( field.name.clone(), - json_to_bson( - field.r#type.clone(), - object_types, - input_field_value.clone(), - )?, + json_to_bson(&field.r#type, object_types, input_field_value.clone())?, )) }) .try_collect::<_, _, JsonToBsonError>()?; @@ -391,7 +400,7 @@ mod tests { }; let actual = json_to_bson( - Type::Object(object_type.name.clone()), + &Type::Object(object_type.name.clone()), &[(object_type.name.clone(), object_type)] .into_iter() .collect(), @@ -400,4 +409,25 @@ mod tests { assert_eq!(actual, expected.into()); Ok(()) } + + #[test] + fn deserializes_arrays() -> anyhow::Result<()> { + let input = json!([ + "e7c8f79873814cbae1f8d84c", + "76a3317b46f1eea7fae4f643", + "fae1840a2b85872385c67de5", + ]); + let expected = Bson::Array(vec![ + Bson::ObjectId(FromStr::from_str("e7c8f79873814cbae1f8d84c")?), + Bson::ObjectId(FromStr::from_str("76a3317b46f1eea7fae4f643")?), + Bson::ObjectId(FromStr::from_str("fae1840a2b85872385c67de5")?), + ]); + let actual = json_to_bson( + &Type::ArrayOf(Box::new(Type::Scalar(BsonScalarType::ObjectId))), + &Default::default(), + input, + )?; + assert_eq!(actual, expected); + Ok(()) + } } From e3fcc1bbc5686121687746adaa97b3dee934d998 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 21 Mar 2024 22:42:28 -0700 Subject: [PATCH 22/33] convert nullable types --- .../src/mongodb/json_to_bson.rs | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/crates/mongodb-agent-common/src/mongodb/json_to_bson.rs b/crates/mongodb-agent-common/src/mongodb/json_to_bson.rs index 5013d8ee..329ca9e6 100644 --- a/crates/mongodb-agent-common/src/mongodb/json_to_bson.rs +++ b/crates/mongodb-agent-common/src/mongodb/json_to_bson.rs @@ -61,7 +61,7 @@ pub fn json_to_bson( convert_object(object_type, object_types, value) } Type::ArrayOf(element_type) => convert_array(&*element_type, object_types, value), - Type::Nullable(_) => todo!(), + Type::Nullable(t) => convert_nullable(&*t, object_types, value), } } @@ -234,6 +234,17 @@ fn convert_object( Ok(bson_doc.into()) } +fn convert_nullable( + underlying_type: &Type, + object_types: &BTreeMap, + value: Value, +) -> Result { + match value { + Value::Null => Ok(Bson::Null), + non_null_value => json_to_bson(underlying_type, object_types, non_null_value), + } +} + fn convert_date(value: &str) -> Result { let date = 
OffsetDateTime::parse(value, &Iso8601::DEFAULT).map_err(|err| { JsonToBsonError::ConversionErrorWithContext( @@ -430,4 +441,23 @@ mod tests { assert_eq!(actual, expected); Ok(()) } + + #[test] + fn deserializes_nullable_values() -> anyhow::Result<()> { + let input = json!(["e7c8f79873814cbae1f8d84c", null, "fae1840a2b85872385c67de5",]); + let expected = Bson::Array(vec![ + Bson::ObjectId(FromStr::from_str("e7c8f79873814cbae1f8d84c")?), + Bson::Null, + Bson::ObjectId(FromStr::from_str("fae1840a2b85872385c67de5")?), + ]); + let actual = json_to_bson( + &Type::ArrayOf(Box::new(Type::Nullable(Box::new(Type::Scalar( + BsonScalarType::ObjectId, + ))))), + &Default::default(), + input, + )?; + assert_eq!(actual, expected); + Ok(()) + } } From a452ca40349e8fd488aa520640e521fe03a2175f Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Thu, 21 Mar 2024 22:46:05 -0700 Subject: [PATCH 23/33] cleanup --- .../src/mongodb/json_to_bson.rs | 41 +------------------ 1 file changed, 1 insertion(+), 40 deletions(-) diff --git a/crates/mongodb-agent-common/src/mongodb/json_to_bson.rs b/crates/mongodb-agent-common/src/mongodb/json_to_bson.rs index 329ca9e6..70612208 100644 --- a/crates/mongodb-agent-common/src/mongodb/json_to_bson.rs +++ b/crates/mongodb-agent-common/src/mongodb/json_to_bson.rs @@ -68,7 +68,6 @@ pub fn json_to_bson( /// Works like json_to_bson, but only converts BSON scalar types. pub fn json_to_bson_scalar(expected_type: BsonScalarType, value: Value) -> Result { let result = match expected_type { - // BsonScalarType::Double => Bson::Double(from_number(expected_type, value)?), BsonScalarType::Double => Bson::Double(deserialize(expected_type, value)?), BsonScalarType::Int => Bson::Int32(deserialize(expected_type, value)?), BsonScalarType::Long => Bson::Int64(deserialize(expected_type, value)?), @@ -81,20 +80,11 @@ pub fn json_to_bson_scalar(expected_type: BsonScalarType, value: Value) -> Resul ) })?, ), - BsonScalarType::String => Bson::String(from_string(expected_type, value)?), + BsonScalarType::String => Bson::String(deserialize(expected_type, value)?), BsonScalarType::Date => convert_date(&from_string(expected_type, value)?)?, BsonScalarType::Timestamp => deserialize::(expected_type, value)?.into(), BsonScalarType::BinData => deserialize::(expected_type, value)?.into(), BsonScalarType::ObjectId => Bson::ObjectId(deserialize(expected_type, value)?), - // BsonScalarType::ObjectId => Bson::ObjectId( - // ObjectId::from_str(&from_string(expected_type, value)?).map_err(|err| { - // JsonToBsonError::ConversionErrorWithContext( - // Type::Scalar(expected_type), - // value, - // err.into(), - // ) - // })?, - // ), BsonScalarType::Bool => match value { Value::Bool(b) => Bson::Boolean(b), _ => incompatible_scalar_type(BsonScalarType::Bool, value)?, @@ -258,10 +248,6 @@ fn convert_date(value: &str) -> Result { ))) } -// fn convert_timestamp(value: Value) -> Result { -// match value -// } - fn deserialize(expected_type: BsonScalarType, value: Value) -> Result where T: DeserializeOwned, @@ -271,31 +257,6 @@ where }) } -// fn from_number(expected_type: BsonScalarType, value: Value) -> Result -// where -// T: NumCast, -// { -// let mk_err = || JsonToBsonError::ConversionError(Type::Scalar(expected_type), value); -// match value { -// Value::Number(n) => { -// if let Some(n) = n.as_u64() { -// ::from(n).ok_or_else(mk_err) -// } else if let Some(n) = n.as_i64() { -// ::from(n).ok_or_else(mk_err) -// } else if let Some(n) = n.as_f64() { -// ::from(n).ok_or_else(mk_err) -// } else { -// Err(mk_err()) -// } -// 
} -// _ => Err(JsonToBsonError::IncompatibleBackingType { -// expected_type: Type::Scalar(expected_type), -// expected_backing_type: "Int or Float", -// value, -// }), -// } -// } - fn from_string(expected_type: BsonScalarType, value: Value) -> Result { match value { Value::String(s) => Ok(s), From 6c75265ab937645c536bcaac9477702fb8d6dbb3 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Mon, 25 Mar 2024 15:41:24 -0700 Subject: [PATCH 24/33] wip --- crates/configuration/src/native_queries.rs | 37 ++- .../src/command/command.rs | 233 ++++++++++-------- .../mongodb-agent-common/src/command/error.rs | 9 +- .../mongodb-agent-common/src/command/mod.rs | 2 +- .../src/interface_types/mongo_agent_error.rs | 11 + crates/mongodb-agent-common/src/lib.rs | 1 + .../mongodb-agent-common/src/mongodb/mod.rs | 1 - .../arguments}/json_to_bson.rs | 4 +- .../src/query/arguments/mod.rs | 59 +++++ .../src/query/make_selector.rs | 2 +- crates/mongodb-agent-common/src/query/mod.rs | 1 + 11 files changed, 241 insertions(+), 119 deletions(-) rename crates/mongodb-agent-common/src/{mongodb => query/arguments}/json_to_bson.rs (98%) create mode 100644 crates/mongodb-agent-common/src/query/arguments/mod.rs diff --git a/crates/configuration/src/native_queries.rs b/crates/configuration/src/native_queries.rs index d7946a3f..f7c56b5c 100644 --- a/crates/configuration/src/native_queries.rs +++ b/crates/configuration/src/native_queries.rs @@ -22,13 +22,46 @@ pub struct NativeQuery { /// `schema.json`. pub result_type: Type, - /// Arguments for per-query customization + /// Arguments to be supplied for each query invocation. These will be substituted into the + /// given `command`. + /// + /// Argument values are standard JSON mapped from GraphQL input types, not Extended JSON. + /// Values will be converted to BSON according to the types specified here. #[serde(default)] pub arguments: BTreeMap, - /// Command to run expressed as a BSON document + /// Command to run via MongoDB's `runCommand` API. For details on how to write commands see + /// https://www.mongodb.com/docs/manual/reference/method/db.runCommand/ + /// + /// The command is read as Extended JSON. It may be in canonical or relaxed format, or + /// a mixture of both. + /// See https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/ + /// + /// Keys and values in the command may contain placeholders of the form `{{variableName}}` + /// which will be substituted when the native query is executed according to the given + /// arguments. + /// + /// Placeholders must be inside quotes so that the command can be stored in JSON format. If the + /// command includes a string whose only content is a placeholder, when the variable is + /// substituted the string will be replaced by the type of the variable. For example in this + /// command, + /// + /// { + /// "insert": "posts", + /// "documents": "{{ documents }}" + /// } + /// + /// If the type of the `documents` argument is an array then after variable substitution the + /// command will expand to: + /// + /// { + /// "insert": "posts", + /// "documents": [/* array of documents */] + /// } + /// #[schemars(with = "Object")] pub command: bson::Document, + // TODO: test extjson deserialization /// Determines which servers in a cluster to read from by specifying read preference, or /// a predicate to apply to candidate servers. 
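For orientation, here is a minimal sketch of the substitution behaviour documented above, expressed at the BSON level with arguments that have already been resolved from JSON. It is illustrative only and not part of the patch: the argument names and values are hypothetical, and it calls the `interpolated_command` function introduced below in this patch (module path as of this patch; a later commit in the series moves it from `command` to `procedure`).

```rust
use std::collections::BTreeMap;

use mongodb::bson::{bson, doc, Bson};
// Path as of this patch; later commits rename the `command` module to `procedure`.
use mongodb_agent_common::command::interpolated_command;

fn placeholder_expansion_sketch() {
    // Command template as it might appear in a native query definition.
    let template = doc! {
        "insert": "posts",
        // Whole-string placeholder: replaced by the argument's typed value.
        "documents": "{{ documents }}",
        // Embedded placeholder: spliced in textually, so the argument must be a string.
        "comment": "inserted by {{user}}",
    };

    // Arguments after they have been converted from JSON to BSON.
    let arguments: BTreeMap<String, Bson> = BTreeMap::from([
        ("documents".to_string(), bson!([{ "title": "Hello" }])),
        ("user".to_string(), Bson::String("admin".to_string())),
    ]);

    let interpolated = interpolated_command(&template, &arguments).unwrap();
    assert_eq!(
        interpolated,
        Bson::Document(doc! {
            "insert": "posts",
            "documents": [{ "title": "Hello" }],
            "comment": "inserted by admin",
        })
    );
}
```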
diff --git a/crates/mongodb-agent-common/src/command/command.rs b/crates/mongodb-agent-common/src/command/command.rs index 1bfe6eb0..79650fbd 100644 --- a/crates/mongodb-agent-common/src/command/command.rs +++ b/crates/mongodb-agent-common/src/command/command.rs @@ -1,72 +1,47 @@ use std::collections::BTreeMap; -use configuration::schema::{ObjectField, Type}; -use itertools::Itertools; +use itertools::Itertools as _; use mongodb::bson::{self, Bson}; -use mongodb_support::BsonScalarType; -use serde_json::Value; use super::CommandError; -// type JsonObject = serde_json::Map; +type Result = std::result::Result; -/// Parse native query commands, and interpolate variables. Input is serde_json::Value because our -/// configuration format is JSON. Output is BSON because that is the format that MongoDB commands -/// use. -pub fn interpolate( +/// Parse native query commands, and interpolate arguments. +pub fn interpolated_command( command: &bson::Document, - parameters: &[ObjectField], - arguments: &BTreeMap, -) -> Result { - // let arguments_bson: BTreeMap = arguments - // .iter() - // .map(|(key, value)| -> Result<(String, Bson), CommandError> { - // Ok((key.to_owned(), value.clone().try_into()?)) - // }) - // .try_collect()?; - interpolate_helper(&command.into(), parameters, arguments) + arguments: &BTreeMap, +) -> Result { + interpolate_helper(&command.into(), arguments) } -fn interpolate_helper( - command: &Bson, - parameters: &[ObjectField], - arguments: &BTreeMap, -) -> Result { - // let result = match command { - // exp @ Value::Null => exp.clone(), - // exp @ Value::Bool(_) => exp.clone(), - // exp @ Value::Number(_) => exp.clone(), - // Value::String(string) => interpolate_string(string, parameters, arguments)?, - // Value::Array(_) => todo!(), - // Value::Object(_) => todo!(), - // }; - - let result = match command { - Bson::Array(values) => values - .iter() - .map(|value| interpolate_helper(value, parameters, arguments)) - .try_collect()?, - Bson::Document(doc) => interpolate_document(doc.clone(), parameters, arguments)?.into(), - Bson::String(string) => interpolate_string(string, parameters, arguments)?, - Bson::RegularExpression(_) => todo!(), - Bson::JavaScriptCode(_) => todo!(), - Bson::JavaScriptCodeWithScope(_) => todo!(), +fn interpolate_helper(command_node: &Bson, arguments: &BTreeMap) -> Result { + let result = match command_node { + Bson::Array(values) => interpolate_array(values.to_vec(), arguments)?.into(), + Bson::Document(doc) => interpolate_document(doc.clone(), arguments)?.into(), + Bson::String(string) => interpolate_string(string, arguments)?, + // TODO: Support interpolation within other scalar types value => value.clone(), }; - Ok(result) } +fn interpolate_array(values: Vec, arguments: &BTreeMap) -> Result> { + values + .iter() + .map(|value| interpolate_helper(value, arguments)) + .try_collect() +} + fn interpolate_document( document: bson::Document, - parameters: &[ObjectField], - arguments: &BTreeMap, -) -> Result { + arguments: &BTreeMap, +) -> Result { document .into_iter() .map(|(key, value)| { - let interpolated_value = interpolate_helper(&value, parameters, arguments)?; - let interpolated_key = interpolate_string(&key, parameters, arguments)?; + let interpolated_value = interpolate_helper(&value, arguments)?; + let interpolated_key = interpolate_string(&key, arguments)?; match interpolated_key { Bson::String(string_key) => Ok((string_key, interpolated_value)), _ => Err(CommandError::NonStringKey(interpolated_key)), @@ -85,56 +60,37 @@ fn 
interpolate_document( /// { "key": 42 } /// /// if the type of the variable `recordId` is `int`. -fn interpolate_string( - string: &str, - parameters: &[ObjectField], - arguments: &BTreeMap, -) -> Result { +fn interpolate_string(string: &str, arguments: &BTreeMap) -> Result { let parts = parse_native_query(string); if parts.len() == 1 { let mut parts = parts; match parts.remove(0) { NativeQueryPart::Text(string) => Ok(Bson::String(string)), - NativeQueryPart::Parameter(param) => resolve_argument(¶m, parameters, arguments), + NativeQueryPart::Parameter(param) => resolve_argument(¶m, arguments), } } else { - todo!() - } -} - -/// Looks up an argument value for a given parameter, and produces a BSON value that matches the -/// declared parameter type. -fn resolve_argument( - param_name: &str, - parameters: &[ObjectField], - arguments: &BTreeMap, -) -> Result { - let parameter = parameters - .iter() - .find(|arg| arg.name == param_name) - .ok_or_else(|| CommandError::UnknownParameter(param_name.to_owned()))?; - let argument_json = arguments - .get(param_name) - .ok_or_else(|| CommandError::MissingArgument(param_name.to_owned()))?; - let argument: Bson = argument_json.clone().try_into()?; - match parameter.r#type { - Type::Scalar(t) => resolve_scalar_argument(t, argument), - Type::Object(_) => todo!(), - Type::ArrayOf(_) => todo!(), - Type::Nullable(_) => todo!(), + let interpolated_parts: Vec = parts + .into_iter() + .map(|part| match part { + NativeQueryPart::Text(string) => Ok(string), + NativeQueryPart::Parameter(param) => { + let argument_value = resolve_argument(¶m, arguments)?; + match argument_value { + Bson::String(string) => Ok(string), + _ => Err(CommandError::NonStringInStringContext(param)), + } + } + }) + .try_collect()?; + Ok(Bson::String(interpolated_parts.join(""))) } } -fn resolve_scalar_argument( - parameter_type: BsonScalarType, - argument: Bson, -) -> Result { - let argument_type: BsonScalarType = (&argument).try_into()?; - if argument_type == parameter_type { - Ok(argument) - } else { - Err(CommandError::TypeMismatch(argument_type, parameter_type)) - } +fn resolve_argument(argument_name: &str, arguments: &BTreeMap) -> Result { + let argument = arguments + .get(argument_name) + .ok_or_else(|| CommandError::MissingArgument(argument_name.to_owned()))?; + Ok(argument.clone()) } /// A part of a Native Query text, either raw text or a parameter. @@ -175,11 +131,13 @@ mod tests { use pretty_assertions::assert_eq; use serde_json::json; + use crate::query::arguments::resolve_arguments; + use super::*; - // TODO: extjson - // TODO: nullable - // TODO: optional + // TODO: key + // TODO: value with multiple placeholders + // TODO: key with multiple placeholders #[test] fn interpolates_non_string_type() -> anyhow::Result<()> { @@ -198,7 +156,7 @@ mod tests { }], }, }); - let arguments = [ + let input_arguments = [ ("id".to_owned(), json!(1001)), ("name".to_owned(), json!("Regina Spektor")), ] @@ -206,14 +164,15 @@ mod tests { .collect(); let native_query: NativeQuery = serde_json::from_value(native_query_input)?; - let interpolated_command = interpolate( - &native_query.command.into(), + let arguments = resolve_arguments( + &native_query.object_types, &native_query.arguments, - &arguments, + input_arguments, )?; + let command = interpolated_command(&native_query.command, &arguments)?; assert_eq!( - interpolated_command, + command, bson::doc! 
{ "insert": "Artist", "documents": [{ @@ -246,32 +205,88 @@ mod tests { "documents": "{{ documents }}", }, }); - let arguments = [ - ("id".to_owned(), json!(1001)), - ("name".to_owned(), json!("Regina Spektor")), - ] + let input_arguments = [( + "documents".to_owned(), + json!([ + { "ArtistId": 1001, "Name": "Regina Spektor" } , + { "ArtistId": 1002, "Name": "Ok Go" } , + ]), + )] + .into_iter() + .collect(); + + let native_query: NativeQuery = serde_json::from_value(native_query_input)?; + let arguments = resolve_arguments( + &native_query.object_types, + &native_query.arguments, + input_arguments, + )?; + let command = interpolated_command(&native_query.command, &arguments)?; + + assert_eq!( + command, + bson::doc! { + "insert": "Artist", + "documents": [ + { + "ArtistId": 1001, + "Name": "Regina Spektor", + }, + { + "ArtistId": 1002, + "Name": "Ok Go", + } + ], + } + .into() + ); + Ok(()) + } + + #[test] + fn interpolates_arguments_within_string() -> anyhow::Result<()> { + let native_query_input = json!({ + "name": "insert", + "resultType": { "object": "Insert" }, + "arguments": [ + { "name": "prefix", "type": { "scalar": "string" } }, + { "name": "basename", "type": { "scalar": "string" } }, + ], + "command": { + "insert": "{{prefix}}-{{basename}}", + "empty": "", + }, + }); + let input_arguments = [( + "documents".to_owned(), + json!([ + { "prefix": "current" } , + { "basename": "some-coll" } , + ]), + )] .into_iter() .collect(); let native_query: NativeQuery = serde_json::from_value(native_query_input)?; - let interpolated_command = interpolate( - &native_query.command.into(), + let arguments = resolve_arguments( + &native_query.object_types, &native_query.arguments, - &arguments, + input_arguments, )?; + let command = interpolated_command(&native_query.command, &arguments)?; assert_eq!( - interpolated_command, + command, bson::doc! 
{ "insert": "Artist", "documents": [ { - "ArtistId": "{{ id }}", - "Name": "{{name }}", + "ArtistId": 1001, + "Name": "Regina Spektor", }, { - "ArtistId": "{{ id }}", - "Name": "{{name }}", + "ArtistId": 1002, + "Name": "Ok Go", } ], } diff --git a/crates/mongodb-agent-common/src/command/error.rs b/crates/mongodb-agent-common/src/command/error.rs index b66c033d..b21712f5 100644 --- a/crates/mongodb-agent-common/src/command/error.rs +++ b/crates/mongodb-agent-common/src/command/error.rs @@ -1,5 +1,5 @@ +use configuration::schema::Type; use mongodb::bson::{self, Bson}; -use mongodb_support::BsonScalarType; use thiserror::Error; #[derive(Clone, Debug, Error)] @@ -13,11 +13,14 @@ pub enum CommandError { #[error("a required argument was not provided, \"{0}\"")] MissingArgument(String), + #[error("found a non-string argument, {0}, in a string context - if you want to use a non-string argument it must be the only thing in the string with no white space around the curly braces")] + NonStringInStringContext(String), + #[error("object keys must be strings, but got: \"{0}\"")] NonStringKey(Bson), - #[error("argument type, \"{0}\", does not match parameter type, \"{1}\"")] - TypeMismatch(BsonScalarType, BsonScalarType), + #[error("argument type, \"{0:?}\", does not match parameter type, \"{1:?}\"")] + TypeMismatch(Type, Type), #[error("an argument was provided for an undefined paremeter, \"{0}\"")] UnknownParameter(String), diff --git a/crates/mongodb-agent-common/src/command/mod.rs b/crates/mongodb-agent-common/src/command/mod.rs index 96022527..9fae7217 100644 --- a/crates/mongodb-agent-common/src/command/mod.rs +++ b/crates/mongodb-agent-common/src/command/mod.rs @@ -1,5 +1,5 @@ mod command; mod error; -pub use self::command::Command; +pub use self::command::interpolated_command; pub use self::error::CommandError; diff --git a/crates/mongodb-agent-common/src/interface_types/mongo_agent_error.rs b/crates/mongodb-agent-common/src/interface_types/mongo_agent_error.rs index ad5ea4fa..0e27716c 100644 --- a/crates/mongodb-agent-common/src/interface_types/mongo_agent_error.rs +++ b/crates/mongodb-agent-common/src/interface_types/mongo_agent_error.rs @@ -12,7 +12,9 @@ use thiserror::Error; pub enum MongoAgentError { BadCollectionSchema(String, bson::Bson, bson::de::Error), BadQuery(anyhow::Error), + CommandError(#[from] crate::command::CommandError), InvalidVariableName(String), + JsonToBson(#[from] JsonToBsonError), MongoDB(#[from] mongodb::error::Error), MongoDBDeserialization(#[from] mongodb::bson::de::Error), MongoDBSerialization(#[from] mongodb::bson::ser::Error), @@ -20,6 +22,7 @@ pub enum MongoAgentError { NotImplemented(&'static str), Serialization(serde_json::Error), UnknownAggregationFunction(String), + UnknownVariables(Vec), UnspecifiedRelation(String), VariableNotDefined(String), AdHoc(#[from] anyhow::Error), @@ -28,6 +31,8 @@ pub enum MongoAgentError { use MongoAgentError::*; +use crate::query::arguments::JsonToBsonError; + impl MongoAgentError { pub fn status_and_error_response(&self) -> (StatusCode, ErrorResponse) { match self { @@ -57,10 +62,12 @@ impl MongoAgentError { }, ), BadQuery(err) => (StatusCode::BAD_REQUEST, ErrorResponse::new(&err)), + CommandError(err) => (StatusCode::BAD_REQUEST, ErrorResponse::new(&err)), InvalidVariableName(name) => ( StatusCode::BAD_REQUEST, ErrorResponse::new(&format!("Column identifier includes characters that are not permitted in a MongoDB variable name: {name}")) ), + JsonToBson(err) => (StatusCode::BAD_REQUEST, ErrorResponse::new(&err)), MongoDB(err) => 
(StatusCode::BAD_REQUEST, ErrorResponse::new(&err)), MongoDBDeserialization(err) => (StatusCode::BAD_REQUEST, ErrorResponse::new(&err)), MongoDBSerialization(err) => { @@ -73,6 +80,10 @@ impl MongoAgentError { StatusCode::BAD_REQUEST, ErrorResponse::new(&format!("Unknown aggregation function, {function}")), ), + UnknownVariables(variable_names) => ( + StatusCode::BAD_REQUEST, + ErrorResponse::new(&format!("Query included unrecognized variables: {}", variable_names.join(", "))) + ), UnspecifiedRelation(relation) => ( StatusCode::BAD_REQUEST, ErrorResponse::new(&format!("Query referenced a relationship, \"{relation}\", but did not include relation metadata in `table_relationships`")) diff --git a/crates/mongodb-agent-common/src/lib.rs b/crates/mongodb-agent-common/src/lib.rs index ab1585eb..5471a71c 100644 --- a/crates/mongodb-agent-common/src/lib.rs +++ b/crates/mongodb-agent-common/src/lib.rs @@ -1,4 +1,5 @@ pub mod aggregation_function; +pub mod command; pub mod comparison_function; pub mod explain; pub mod health; diff --git a/crates/mongodb-agent-common/src/mongodb/mod.rs b/crates/mongodb-agent-common/src/mongodb/mod.rs index 37e892a6..c0261d68 100644 --- a/crates/mongodb-agent-common/src/mongodb/mod.rs +++ b/crates/mongodb-agent-common/src/mongodb/mod.rs @@ -1,6 +1,5 @@ mod accumulator; mod collection; -pub mod json_to_bson; mod pipeline; mod projection; pub mod sanitize; diff --git a/crates/mongodb-agent-common/src/mongodb/json_to_bson.rs b/crates/mongodb-agent-common/src/query/arguments/json_to_bson.rs similarity index 98% rename from crates/mongodb-agent-common/src/mongodb/json_to_bson.rs rename to crates/mongodb-agent-common/src/query/arguments/json_to_bson.rs index 70612208..755c336d 100644 --- a/crates/mongodb-agent-common/src/mongodb/json_to_bson.rs +++ b/crates/mongodb-agent-common/src/query/arguments/json_to_bson.rs @@ -60,8 +60,8 @@ pub fn json_to_bson( .ok_or_else(|| JsonToBsonError::UnknownObjectType(object_type_name.to_owned()))?; convert_object(object_type, object_types, value) } - Type::ArrayOf(element_type) => convert_array(&*element_type, object_types, value), - Type::Nullable(t) => convert_nullable(&*t, object_types, value), + Type::ArrayOf(element_type) => convert_array(element_type, object_types, value), + Type::Nullable(t) => convert_nullable(t, object_types, value), } } diff --git a/crates/mongodb-agent-common/src/query/arguments/mod.rs b/crates/mongodb-agent-common/src/query/arguments/mod.rs new file mode 100644 index 00000000..16674719 --- /dev/null +++ b/crates/mongodb-agent-common/src/query/arguments/mod.rs @@ -0,0 +1,59 @@ +mod json_to_bson; + +use std::collections::BTreeMap; + +use configuration::schema::{ObjectField, ObjectType}; +use itertools::Itertools as _; +use mongodb::bson::Bson; +use serde_json::Value; + +use crate::interface_types::MongoAgentError; + +use self::json_to_bson::json_to_bson; + +pub use self::json_to_bson::{JsonToBsonError, json_to_bson_scalar}; + +/// Translate arguments to queries or native queries to BSON according to declared parameter types. +/// +/// Checks that all arguments have been provided, and that no arguments have been given that do not +/// map to declared paremeters (no excess arguments). 
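// Illustrative sketch, not part of this patch: the per-value conversion that
// `resolve_arguments` relies on for scalar parameter types, via the
// `json_to_bson_scalar` helper re-exported above. A parameter declared as `int`
// and supplied as the JSON number 1001 becomes a BSON Int32, and a `double`
// becomes a BSON Double. The values here are hypothetical.
fn scalar_conversion_sketch() -> Result<(), JsonToBsonError> {
    use mongodb::bson::Bson;
    use mongodb_support::BsonScalarType;
    use serde_json::json;

    assert_eq!(
        json_to_bson_scalar(BsonScalarType::Int, json!(1001))?,
        Bson::Int32(1001)
    );
    assert_eq!(
        json_to_bson_scalar(BsonScalarType::Double, json!(2.5))?,
        Bson::Double(2.5)
    );
    Ok(())
}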
+pub fn resolve_arguments( + object_types: &BTreeMap, + parameters: &BTreeMap, + arguments: BTreeMap, +) -> Result, MongoAgentError> { + validate_no_excess_arguments(parameters, &arguments)?; + parameters + .iter() + .map(|(key, parameter)| { + let argument = arguments + .get(key) + .ok_or_else(|| MongoAgentError::VariableNotDefined(key.to_owned()))?; + Ok(( + key.clone(), + json_to_bson(¶meter.r#type, object_types, argument.clone())?, + )) + }) + .try_collect() +} + +pub fn validate_no_excess_arguments( + parameters: &BTreeMap, + arguments: &BTreeMap, +) -> Result<(), MongoAgentError> { + let excess: Vec = arguments + .iter() + .filter_map(|(name, _)| { + let parameter = parameters.get(name); + match parameter { + Some(_) => None, + None => Some(name.clone()), + } + }) + .collect(); + if !excess.is_empty() { + Err(MongoAgentError::UnknownVariables(excess)) + } else { + Ok(()) + } +} diff --git a/crates/mongodb-agent-common/src/query/make_selector.rs b/crates/mongodb-agent-common/src/query/make_selector.rs index 80c28e0d..170eba54 100644 --- a/crates/mongodb-agent-common/src/query/make_selector.rs +++ b/crates/mongodb-agent-common/src/query/make_selector.rs @@ -10,7 +10,7 @@ use mongodb_support::BsonScalarType; use crate::{ comparison_function::ComparisonFunction, interface_types::MongoAgentError, - mongodb::json_to_bson::json_to_bson_scalar, query::column_ref::column_ref, + query::arguments::json_to_bson_scalar, query::column_ref::column_ref, }; use BinaryArrayComparisonOperator as ArrOp; diff --git a/crates/mongodb-agent-common/src/query/mod.rs b/crates/mongodb-agent-common/src/query/mod.rs index 53a4bc92..b079d9d8 100644 --- a/crates/mongodb-agent-common/src/query/mod.rs +++ b/crates/mongodb-agent-common/src/query/mod.rs @@ -1,3 +1,4 @@ +pub mod arguments; mod column_ref; mod constants; mod execute_native_query_request; From 3dbb723bf04dfa0658745992b48e14c29bb7a763 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Mon, 25 Mar 2024 15:56:57 -0700 Subject: [PATCH 25/33] updates for switch to map types --- crates/configuration/src/schema/database.rs | 15 ++++++++----- .../src/query/arguments/json_to_bson.rs | 22 +++++++++---------- crates/mongodb-connector/src/mutation.rs | 2 +- 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/crates/configuration/src/schema/database.rs b/crates/configuration/src/schema/database.rs index 9c92a885..25ac5a94 100644 --- a/crates/configuration/src/schema/database.rs +++ b/crates/configuration/src/schema/database.rs @@ -62,11 +62,14 @@ pub struct ObjectField { } impl ObjectField { - pub fn new(name: &str, r#type: Type) -> Self { - ObjectField { - name: name.to_owned(), - r#type, - description: Default::default(), - } + #[cfg(test)] + pub fn new(name: impl ToOwned, r#type: Type) -> (String, Self) { + ( + name.to_owned(), + ObjectField { + r#type, + description: Default::default(), + }, + ) } } diff --git a/crates/mongodb-agent-common/src/query/arguments/json_to_bson.rs b/crates/mongodb-agent-common/src/query/arguments/json_to_bson.rs index 755c336d..627fb09b 100644 --- a/crates/mongodb-agent-common/src/query/arguments/json_to_bson.rs +++ b/crates/mongodb-agent-common/src/query/arguments/json_to_bson.rs @@ -58,7 +58,7 @@ pub fn json_to_bson( let object_type = object_types .get(object_type_name) .ok_or_else(|| JsonToBsonError::UnknownObjectType(object_type_name.to_owned()))?; - convert_object(object_type, object_types, value) + convert_object(object_type_name, object_type, object_types, value) } Type::ArrayOf(element_type) => 
convert_array(element_type, object_types, value), Type::Nullable(t) => convert_nullable(t, object_types, value), @@ -200,24 +200,24 @@ fn convert_array( } fn convert_object( + object_type_name: &str, object_type: &ObjectType, object_types: &BTreeMap, value: Value, ) -> Result { let input_fields: BTreeMap = serde_json::from_value(value)?; let bson_doc: bson::Document = object_type - .fields - .iter() + .named_fields() .map(|field| { - let input_field_value = input_fields.get(&field.name).ok_or_else(|| { + let input_field_value = input_fields.get(field.name).ok_or_else(|| { JsonToBsonError::MissingObjectField( - Type::Object(object_type.name.clone()), - field.name.clone(), + Type::Object(object_type_name.to_owned()), + field.name.to_owned(), ) })?; Ok(( - field.name.clone(), - json_to_bson(&field.r#type, object_types, input_field_value.clone())?, + field.name.to_owned(), + json_to_bson(&field.value.r#type, object_types, input_field_value.clone())?, )) }) .try_collect::<_, _, JsonToBsonError>()?; @@ -277,7 +277,7 @@ fn incompatible_scalar_type(expected_type: BsonScalarType, value: Value) -> R #[cfg(test)] mod tests { - use std::str::FromStr; + use std::{collections::BTreeMap, str::FromStr}; use configuration::schema::{ObjectField, ObjectType, Type}; use mongodb::bson::{self, datetime::DateTimeBuilder, Bson}; @@ -291,7 +291,7 @@ mod tests { fn deserializes_specialized_scalar_types() -> anyhow::Result<()> { let object_type = ObjectType { name: "scalar_test".to_owned(), - fields: vec![ + fields: BTreeMap::from([ ObjectField::new("double", Type::Scalar(BsonScalarType::Double)), ObjectField::new("int", Type::Scalar(BsonScalarType::Int)), ObjectField::new("long", Type::Scalar(BsonScalarType::Long)), @@ -313,7 +313,7 @@ mod tests { ObjectField::new("minKey", Type::Scalar(BsonScalarType::MinKey)), ObjectField::new("maxKey", Type::Scalar(BsonScalarType::MaxKey)), ObjectField::new("symbol", Type::Scalar(BsonScalarType::Symbol)), - ], + ]), description: Default::default(), }; diff --git a/crates/mongodb-connector/src/mutation.rs b/crates/mongodb-connector/src/mutation.rs index 64ad0579..8c803c26 100644 --- a/crates/mongodb-connector/src/mutation.rs +++ b/crates/mongodb-connector/src/mutation.rs @@ -65,7 +65,7 @@ fn look_up_procedures( .get(&name); native_query .ok_or(name) - .map(|(_, nq)| Job::new(nq, arguments)) + .map(|native_query| Job::new(native_query, arguments)) } }) .partition_result(); From f21e75d2d583a220c024c72f214a89e64f5b99d0 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Mon, 25 Mar 2024 16:30:32 -0700 Subject: [PATCH 26/33] rename --- crates/mongodb-agent-common/src/command/mod.rs | 5 ----- .../src/interface_types/mongo_agent_error.rs | 2 +- crates/mongodb-agent-common/src/lib.rs | 2 +- .../src/{command => procedure}/error.rs | 2 +- .../command.rs => procedure/interpolated_command.rs} | 10 +++++----- crates/mongodb-agent-common/src/procedure/mod.rs | 5 +++++ 6 files changed, 13 insertions(+), 13 deletions(-) delete mode 100644 crates/mongodb-agent-common/src/command/mod.rs rename crates/mongodb-agent-common/src/{command => procedure}/error.rs (97%) rename crates/mongodb-agent-common/src/{command/command.rs => procedure/interpolated_command.rs} (96%) create mode 100644 crates/mongodb-agent-common/src/procedure/mod.rs diff --git a/crates/mongodb-agent-common/src/command/mod.rs b/crates/mongodb-agent-common/src/command/mod.rs deleted file mode 100644 index 9fae7217..00000000 --- a/crates/mongodb-agent-common/src/command/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod command; -mod error; - 
-pub use self::command::interpolated_command; -pub use self::error::CommandError; diff --git a/crates/mongodb-agent-common/src/interface_types/mongo_agent_error.rs b/crates/mongodb-agent-common/src/interface_types/mongo_agent_error.rs index 0e27716c..e5e1ead4 100644 --- a/crates/mongodb-agent-common/src/interface_types/mongo_agent_error.rs +++ b/crates/mongodb-agent-common/src/interface_types/mongo_agent_error.rs @@ -12,7 +12,7 @@ use thiserror::Error; pub enum MongoAgentError { BadCollectionSchema(String, bson::Bson, bson::de::Error), BadQuery(anyhow::Error), - CommandError(#[from] crate::command::CommandError), + CommandError(#[from] crate::procedure::ProcedureError), InvalidVariableName(String), JsonToBson(#[from] JsonToBsonError), MongoDB(#[from] mongodb::error::Error), diff --git a/crates/mongodb-agent-common/src/lib.rs b/crates/mongodb-agent-common/src/lib.rs index 5471a71c..664c2795 100644 --- a/crates/mongodb-agent-common/src/lib.rs +++ b/crates/mongodb-agent-common/src/lib.rs @@ -1,11 +1,11 @@ pub mod aggregation_function; -pub mod command; pub mod comparison_function; pub mod explain; pub mod health; pub mod interface_types; pub mod mongodb; pub mod mongodb_connection; +pub mod procedure; pub mod query; pub mod scalar_types_capabilities; pub mod schema; diff --git a/crates/mongodb-agent-common/src/command/error.rs b/crates/mongodb-agent-common/src/procedure/error.rs similarity index 97% rename from crates/mongodb-agent-common/src/command/error.rs rename to crates/mongodb-agent-common/src/procedure/error.rs index b21712f5..879fe6bd 100644 --- a/crates/mongodb-agent-common/src/command/error.rs +++ b/crates/mongodb-agent-common/src/procedure/error.rs @@ -3,7 +3,7 @@ use mongodb::bson::{self, Bson}; use thiserror::Error; #[derive(Clone, Debug, Error)] -pub enum CommandError { +pub enum ProcedureError { #[error("error converting parsing argument as extjson: {0}")] ExtJsonConversionError(#[from] bson::extjson::de::Error), diff --git a/crates/mongodb-agent-common/src/command/command.rs b/crates/mongodb-agent-common/src/procedure/interpolated_command.rs similarity index 96% rename from crates/mongodb-agent-common/src/command/command.rs rename to crates/mongodb-agent-common/src/procedure/interpolated_command.rs index 79650fbd..fd2d9736 100644 --- a/crates/mongodb-agent-common/src/command/command.rs +++ b/crates/mongodb-agent-common/src/procedure/interpolated_command.rs @@ -3,9 +3,9 @@ use std::collections::BTreeMap; use itertools::Itertools as _; use mongodb::bson::{self, Bson}; -use super::CommandError; +use super::ProcedureError; -type Result = std::result::Result; +type Result = std::result::Result; /// Parse native query commands, and interpolate arguments. 
pub fn interpolated_command( @@ -44,7 +44,7 @@ fn interpolate_document( let interpolated_key = interpolate_string(&key, arguments)?; match interpolated_key { Bson::String(string_key) => Ok((string_key, interpolated_value)), - _ => Err(CommandError::NonStringKey(interpolated_key)), + _ => Err(ProcedureError::NonStringKey(interpolated_key)), } }) .try_collect() @@ -77,7 +77,7 @@ fn interpolate_string(string: &str, arguments: &BTreeMap) -> Resul let argument_value = resolve_argument(¶m, arguments)?; match argument_value { Bson::String(string) => Ok(string), - _ => Err(CommandError::NonStringInStringContext(param)), + _ => Err(ProcedureError::NonStringInStringContext(param)), } } }) @@ -89,7 +89,7 @@ fn interpolate_string(string: &str, arguments: &BTreeMap) -> Resul fn resolve_argument(argument_name: &str, arguments: &BTreeMap) -> Result { let argument = arguments .get(argument_name) - .ok_or_else(|| CommandError::MissingArgument(argument_name.to_owned()))?; + .ok_or_else(|| ProcedureError::MissingArgument(argument_name.to_owned()))?; Ok(argument.clone()) } diff --git a/crates/mongodb-agent-common/src/procedure/mod.rs b/crates/mongodb-agent-common/src/procedure/mod.rs new file mode 100644 index 00000000..0d242583 --- /dev/null +++ b/crates/mongodb-agent-common/src/procedure/mod.rs @@ -0,0 +1,5 @@ +mod interpolated_command; +mod error; + +pub use self::interpolated_command::interpolated_command; +pub use self::error::ProcedureError; From 5a382bd80d1f83f7a686b8481c07d2efeb523c21 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Tue, 26 Mar 2024 13:16:20 -0700 Subject: [PATCH 27/33] add Procedure type that resolves arguments, and executes command --- Cargo.lock | 7 ++ crates/mongodb-agent-common/Cargo.toml | 1 + .../src/procedure/error.rs | 21 ++---- .../src/procedure/interpolated_command.rs | 9 ++- .../mongodb-agent-common/src/procedure/mod.rs | 57 ++++++++++++++- .../src/query/arguments/mod.rs | 73 ++++++++++++++----- .../chinook/native_queries/insert_artist.yaml | 3 +- 7 files changed, 134 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index de05b76b..3c9d0c58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1191,6 +1191,12 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "indent" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9f1a0777d972970f204fdf8ef319f1f4f8459131636d7e3c96c5d59570d0fa6" + [[package]] name = "indexmap" version = "1.9.3" @@ -1511,6 +1517,7 @@ dependencies = [ "futures", "futures-util", "http", + "indent", "indexmap 1.9.3", "itertools 0.10.5", "mockall", diff --git a/crates/mongodb-agent-common/Cargo.toml b/crates/mongodb-agent-common/Cargo.toml index 69b98ba8..0fd37bcf 100644 --- a/crates/mongodb-agent-common/Cargo.toml +++ b/crates/mongodb-agent-common/Cargo.toml @@ -17,6 +17,7 @@ futures = "0.3.28" futures-util = "0.3.28" http = "^0.2" indexmap = { version = "1", features = ["serde"] } # must match the version that ndc-client uses +indent = "^0.1" itertools = "^0.10" mongodb = "2.8" mongodb-support = { path = "../mongodb-support" } diff --git a/crates/mongodb-agent-common/src/procedure/error.rs b/crates/mongodb-agent-common/src/procedure/error.rs index 879fe6bd..45a5ba56 100644 --- a/crates/mongodb-agent-common/src/procedure/error.rs +++ b/crates/mongodb-agent-common/src/procedure/error.rs @@ -1,14 +1,12 @@ -use configuration::schema::Type; -use mongodb::bson::{self, Bson}; +use mongodb::bson::Bson; use thiserror::Error; -#[derive(Clone, Debug, Error)] -pub enum ProcedureError { - 
#[error("error converting parsing argument as extjson: {0}")] - ExtJsonConversionError(#[from] bson::extjson::de::Error), +use crate::query::arguments::ArgumentError; - #[error("invalid argument type: {0}")] - InvalidArgumentType(#[from] mongodb_support::error::Error), +#[derive(Debug, Error)] +pub enum ProcedureError { + #[error("error executing mongodb command: {0}")] + ExecutionError(#[from] mongodb::error::Error), #[error("a required argument was not provided, \"{0}\"")] MissingArgument(String), @@ -19,9 +17,6 @@ pub enum ProcedureError { #[error("object keys must be strings, but got: \"{0}\"")] NonStringKey(Bson), - #[error("argument type, \"{0:?}\", does not match parameter type, \"{1:?}\"")] - TypeMismatch(Type, Type), - - #[error("an argument was provided for an undefined paremeter, \"{0}\"")] - UnknownParameter(String), + #[error("could not resolve arguments: {0}")] + UnresolvableArguments(#[from] ArgumentError), } diff --git a/crates/mongodb-agent-common/src/procedure/interpolated_command.rs b/crates/mongodb-agent-common/src/procedure/interpolated_command.rs index fd2d9736..86acb3cf 100644 --- a/crates/mongodb-agent-common/src/procedure/interpolated_command.rs +++ b/crates/mongodb-agent-common/src/procedure/interpolated_command.rs @@ -11,8 +11,12 @@ type Result = std::result::Result; pub fn interpolated_command( command: &bson::Document, arguments: &BTreeMap, -) -> Result { - interpolate_helper(&command.into(), arguments) +) -> Result { + let bson = interpolate_helper(&command.into(), arguments)?; + match bson { + Bson::Document(doc) => Ok(doc), + _ => unreachable!("interpolated_command is guaranteed to produce a document"), + } } fn interpolate_helper(command_node: &Bson, arguments: &BTreeMap) -> Result { @@ -136,7 +140,6 @@ mod tests { use super::*; // TODO: key - // TODO: value with multiple placeholders // TODO: key with multiple placeholders #[test] diff --git a/crates/mongodb-agent-common/src/procedure/mod.rs b/crates/mongodb-agent-common/src/procedure/mod.rs index 0d242583..ee124f41 100644 --- a/crates/mongodb-agent-common/src/procedure/mod.rs +++ b/crates/mongodb-agent-common/src/procedure/mod.rs @@ -1,5 +1,58 @@ -mod interpolated_command; mod error; +mod interpolated_command; + +use std::borrow::Cow; +use std::collections::BTreeMap; + +use configuration::native_queries::NativeQuery; +use configuration::schema::{ObjectField, ObjectType}; +use mongodb::options::SelectionCriteria; +use mongodb::{bson, Database}; + +use crate::query::arguments::resolve_arguments; -pub use self::interpolated_command::interpolated_command; pub use self::error::ProcedureError; +pub use self::interpolated_command::interpolated_command; + +/// Encapsulates running arbitrary mongodb commands with interpolated arguments +pub struct Procedure<'a> { + command: Cow<'a, bson::Document>, + object_types: Cow<'a, BTreeMap>, + parameters: Cow<'a, BTreeMap>, + selection_criteria: Option>, +} + +impl<'a> Procedure<'a> { + /// Note: the `object_types` argument here is not the object types from the native query - it + /// should be the set of *all* object types collected from schema and native query definitions. 
+ pub fn from_native_query( + native_query: &'a NativeQuery, + object_types: &'a BTreeMap, + ) -> Self { + Procedure { + command: Cow::Borrowed(&native_query.command), + object_types: Cow::Borrowed(object_types), + parameters: Cow::Borrowed(&native_query.arguments), + selection_criteria: native_query.selection_criteria.as_ref().map(Cow::Borrowed), + } + } + + pub async fn execute( + self, + arguments: BTreeMap, + database: Database, + ) -> Result { + let command = self.interpolated_command(arguments)?; + let selection_criteria = self.selection_criteria.map(Cow::into_owned); + let result = database.run_command(command, selection_criteria).await?; + Ok(result) + } + + pub fn interpolated_command( + &self, + arguments: BTreeMap, + ) -> Result { + let bson_arguments = resolve_arguments(&self.object_types, &self.parameters, arguments)?; + interpolated_command(&self.command, &bson_arguments) + } +} diff --git a/crates/mongodb-agent-common/src/query/arguments/mod.rs b/crates/mongodb-agent-common/src/query/arguments/mod.rs index 16674719..ab84a740 100644 --- a/crates/mongodb-agent-common/src/query/arguments/mod.rs +++ b/crates/mongodb-agent-common/src/query/arguments/mod.rs @@ -2,16 +2,28 @@ mod json_to_bson; use std::collections::BTreeMap; -use configuration::schema::{ObjectField, ObjectType}; +use configuration::schema::{ObjectField, ObjectType, Type}; +use indent::indent_all_by; use itertools::Itertools as _; use mongodb::bson::Bson; use serde_json::Value; - -use crate::interface_types::MongoAgentError; +use thiserror::Error; use self::json_to_bson::json_to_bson; -pub use self::json_to_bson::{JsonToBsonError, json_to_bson_scalar}; +pub use self::json_to_bson::{json_to_bson_scalar, JsonToBsonError}; + +#[derive(Debug, Error)] +pub enum ArgumentError { + #[error("unknown variables or arguments: {}", .0.join(", "))] + Excess(Vec), + + #[error("some variables or arguments are invalid:\n{}", format_errors(.0))] + Invalid(BTreeMap), + + #[error("missing variables or arguments: {}", .0.join(", "))] + Missing(Vec), +} /// Translate arguments to queries or native queries to BSON according to declared parameter types. 
/// @@ -20,27 +32,44 @@ pub use self::json_to_bson::{JsonToBsonError, json_to_bson_scalar}; pub fn resolve_arguments( object_types: &BTreeMap, parameters: &BTreeMap, - arguments: BTreeMap, -) -> Result, MongoAgentError> { + mut arguments: BTreeMap, +) -> Result, ArgumentError> { validate_no_excess_arguments(parameters, &arguments)?; - parameters + + let (arguments, missing): (Vec<(String, Value, &Type)>, Vec) = parameters .iter() - .map(|(key, parameter)| { - let argument = arguments - .get(key) - .ok_or_else(|| MongoAgentError::VariableNotDefined(key.to_owned()))?; - Ok(( - key.clone(), - json_to_bson(¶meter.r#type, object_types, argument.clone())?, - )) + .map(|(name, parameter)| { + if let Some((name, argument)) = arguments.remove_entry(name) { + Ok((name, argument, ¶meter.r#type)) + } else { + Err(name.clone()) + } + }) + .partition_result(); + if !missing.is_empty() { + return Err(ArgumentError::Missing(missing)); + } + + let (resolved, errors): (BTreeMap, BTreeMap) = arguments + .into_iter() + .map(|(name, argument, parameter_type)| { + match json_to_bson(parameter_type, object_types, argument) { + Ok(bson) => Ok((name, bson)), + Err(err) => Err((name, err)), + } }) - .try_collect() + .partition_result(); + if !errors.is_empty() { + return Err(ArgumentError::Invalid(errors)); + } + + Ok(resolved) } pub fn validate_no_excess_arguments( parameters: &BTreeMap, arguments: &BTreeMap, -) -> Result<(), MongoAgentError> { +) -> Result<(), ArgumentError> { let excess: Vec = arguments .iter() .filter_map(|(name, _)| { @@ -52,8 +81,16 @@ pub fn validate_no_excess_arguments( }) .collect(); if !excess.is_empty() { - Err(MongoAgentError::UnknownVariables(excess)) + Err(ArgumentError::Excess(excess)) } else { Ok(()) } } + +fn format_errors(errors: &BTreeMap) -> String { + errors + .iter() + .map(|(name, error)| format!(" {name}:\n{}", indent_all_by(4, error.to_string()))) + .collect::>() + .join("\n") +} diff --git a/fixtures/connector/chinook/native_queries/insert_artist.yaml b/fixtures/connector/chinook/native_queries/insert_artist.yaml index cb04258b..46194eaf 100644 --- a/fixtures/connector/chinook/native_queries/insert_artist.yaml +++ b/fixtures/connector/chinook/native_queries/insert_artist.yaml @@ -1,5 +1,7 @@ name: insertArtist description: Example of a database update using a native query +mode: readWrite +resultType: !object InsertArtist objectTypes: InsertArtist: fields: @@ -7,7 +9,6 @@ objectTypes: type: !scalar int n: type: !scalar int -resultType: !object InsertArtist # TODO: implement arguments instead of hard-coding inputs command: insert: "Artist" From d6bf2ddfc7e041ab51abaaa49d8cdf656565d2db Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Tue, 26 Mar 2024 13:40:57 -0700 Subject: [PATCH 28/33] connect arguments to procedure in mutation handler --- .../src/interface_types/mongo_config.rs | 3 +- .../mongodb-agent-common/src/procedure/mod.rs | 40 ++++++++---- crates/mongodb-agent-common/src/state.rs | 4 ++ crates/mongodb-connector/src/mutation.rs | 64 ++++++------------- 4 files changed, 52 insertions(+), 59 deletions(-) diff --git a/crates/mongodb-agent-common/src/interface_types/mongo_config.rs b/crates/mongodb-agent-common/src/interface_types/mongo_config.rs index b7323285..0801dd0c 100644 --- a/crates/mongodb-agent-common/src/interface_types/mongo_config.rs +++ b/crates/mongodb-agent-common/src/interface_types/mongo_config.rs @@ -1,6 +1,6 @@ use std::collections::BTreeMap; -use configuration::native_queries::NativeQuery; +use configuration::{native_queries::NativeQuery, 
schema::ObjectType}; use mongodb::Client; #[derive(Clone, Debug)] @@ -11,4 +11,5 @@ pub struct MongoConfig { pub database: String, pub native_queries: BTreeMap, + pub object_types: BTreeMap, } diff --git a/crates/mongodb-agent-common/src/procedure/mod.rs b/crates/mongodb-agent-common/src/procedure/mod.rs index ee124f41..cf193236 100644 --- a/crates/mongodb-agent-common/src/procedure/mod.rs +++ b/crates/mongodb-agent-common/src/procedure/mod.rs @@ -15,7 +15,9 @@ pub use self::error::ProcedureError; pub use self::interpolated_command::interpolated_command; /// Encapsulates running arbitrary mongodb commands with interpolated arguments +#[derive(Clone, Debug)] pub struct Procedure<'a> { + arguments: BTreeMap, command: Cow<'a, bson::Document>, object_types: Cow<'a, BTreeMap>, parameters: Cow<'a, BTreeMap>, @@ -28,8 +30,10 @@ impl<'a> Procedure<'a> { pub fn from_native_query( native_query: &'a NativeQuery, object_types: &'a BTreeMap, + arguments: BTreeMap, ) -> Self { Procedure { + arguments, command: Cow::Borrowed(&native_query.command), object_types: Cow::Borrowed(object_types), parameters: Cow::Borrowed(&native_query.arguments), @@ -37,22 +41,34 @@ impl<'a> Procedure<'a> { } } - pub async fn execute( - self, - arguments: BTreeMap, - database: Database, - ) -> Result { - let command = self.interpolated_command(arguments)?; + pub async fn execute(self, database: Database) -> Result { let selection_criteria = self.selection_criteria.map(Cow::into_owned); + let command = interpolate( + &self.object_types, + &self.parameters, + self.arguments, + &self.command, + )?; let result = database.run_command(command, selection_criteria).await?; Ok(result) } - pub fn interpolated_command( - &self, - arguments: BTreeMap, - ) -> Result { - let bson_arguments = resolve_arguments(&self.object_types, &self.parameters, arguments)?; - interpolated_command(&self.command, &bson_arguments) + pub fn interpolated_command(self) -> Result { + interpolate( + &self.object_types, + &self.parameters, + self.arguments, + &self.command, + ) } } + +fn interpolate( + object_types: &BTreeMap, + parameters: &BTreeMap, + arguments: BTreeMap, + command: &bson::Document, +) -> Result { + let bson_arguments = resolve_arguments(object_types, parameters, arguments)?; + interpolated_command(command, &bson_arguments) +} diff --git a/crates/mongodb-agent-common/src/state.rs b/crates/mongodb-agent-common/src/state.rs index d51ec3c2..7bc2df3a 100644 --- a/crates/mongodb-agent-common/src/state.rs +++ b/crates/mongodb-agent-common/src/state.rs @@ -31,5 +31,9 @@ pub async fn try_init_state_from_uri( client, database: database_name, native_queries: configuration.native_queries.clone(), + object_types: configuration + .object_types() + .map(|(name, object_type)| (name.clone(), object_type.clone())) + .collect(), }) } diff --git a/crates/mongodb-connector/src/mutation.rs b/crates/mongodb-connector/src/mutation.rs index 8c803c26..c751073c 100644 --- a/crates/mongodb-connector/src/mutation.rs +++ b/crates/mongodb-connector/src/mutation.rs @@ -1,34 +1,12 @@ -use std::collections::BTreeMap; - -use configuration::native_queries::NativeQuery; use futures::future::try_join_all; use itertools::Itertools; use mongodb::Database; -use mongodb_agent_common::interface_types::MongoConfig; +use mongodb_agent_common::{interface_types::MongoConfig, procedure::Procedure}; use ndc_sdk::{ connector::MutationError, json_response::JsonResponse, models::{MutationOperation, MutationOperationResults, MutationRequest, MutationResponse}, }; -use serde_json::Value; - -/// A 
procedure combined with inputs -#[derive(Clone, Debug)] -#[allow(dead_code)] -struct Job<'a> { - // For the time being all procedures are native queries. - native_query: &'a NativeQuery, - arguments: BTreeMap, -} - -impl<'a> Job<'a> { - pub fn new(native_query: &'a NativeQuery, arguments: BTreeMap) -> Self { - Job { - native_query, - arguments, - } - } -} pub async fn handle_mutation_request( config: &MongoConfig, @@ -39,7 +17,7 @@ pub async fn handle_mutation_request( let jobs = look_up_procedures(config, mutation_request)?; let operation_results = try_join_all( jobs.into_iter() - .map(|job| execute_job(database.clone(), job)), + .map(|procedure| execute_procedure(database.clone(), procedure)), ) .await?; Ok(JsonResponse::Value(MutationResponse { operation_results })) @@ -50,22 +28,18 @@ pub async fn handle_mutation_request( fn look_up_procedures( config: &MongoConfig, mutation_request: MutationRequest, -) -> Result>, MutationError> { - let (jobs, not_found): (Vec, Vec) = mutation_request +) -> Result>, MutationError> { + let (procedures, not_found): (Vec, Vec) = mutation_request .operations .into_iter() .map(|operation| match operation { MutationOperation::Procedure { - name, - arguments, - .. + name, arguments, .. } => { - let native_query = config - .native_queries - .get(&name); - native_query - .ok_or(name) - .map(|native_query| Job::new(native_query, arguments)) + let native_query = config.native_queries.get(&name); + native_query.ok_or(name).map(|native_query| { + Procedure::from_native_query(native_query, &config.object_types, arguments) + }) } }) .partition_result(); @@ -77,22 +51,20 @@ fn look_up_procedures( ))); } - Ok(jobs) + Ok(procedures) } -async fn execute_job( +async fn execute_procedure( database: Database, - job: Job<'_>, + procedure: Procedure<'_>, ) -> Result { - let result = database - .run_command(job.native_query.command.clone(), None) + let result = procedure + .execute(database.clone()) .await - .map_err(|err| match *err.kind { - mongodb::error::ErrorKind::InvalidArgument { message, .. 
} => { - MutationError::UnprocessableContent(message) - } - err => MutationError::Other(Box::new(err)), - })?; + .map_err(|err| MutationError::InvalidRequest(err.to_string()))?; + + // TODO: instead of outputting extended JSON, map to JSON using a reverse of `json_to_bson` + // according to the native query result type let json_result = serde_json::to_value(result).map_err(|err| MutationError::Other(Box::new(err)))?; Ok(MutationOperationResults::Procedure { From 104fbeb909e2212f3dd99e913edbb0730edbb502 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Tue, 26 Mar 2024 15:48:34 -0700 Subject: [PATCH 29/33] update native query fixture to use arguments --- .../chinook/native_queries/insert_artist.yaml | 10 +++++++--- .../ddn/subgraphs/chinook/commands/InsertArtist.hml | 6 +++++- .../ddn/subgraphs/chinook/dataconnectors/mongodb.hml | 6 ++++-- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/fixtures/connector/chinook/native_queries/insert_artist.yaml b/fixtures/connector/chinook/native_queries/insert_artist.yaml index 46194eaf..db40b45e 100644 --- a/fixtures/connector/chinook/native_queries/insert_artist.yaml +++ b/fixtures/connector/chinook/native_queries/insert_artist.yaml @@ -2,6 +2,11 @@ name: insertArtist description: Example of a database update using a native query mode: readWrite resultType: !object InsertArtist +arguments: + id: + type: !scalar int + name: + type: !scalar string objectTypes: InsertArtist: fields: @@ -9,9 +14,8 @@ objectTypes: type: !scalar int n: type: !scalar int -# TODO: implement arguments instead of hard-coding inputs command: insert: "Artist" documents: - - ArtistId: 1001 - Name: Regina Spektor + - ArtistId: "{{ id }}" + Name: "{{ name }}" diff --git a/fixtures/ddn/subgraphs/chinook/commands/InsertArtist.hml b/fixtures/ddn/subgraphs/chinook/commands/InsertArtist.hml index 7b1d3fff..663e9199 100644 --- a/fixtures/ddn/subgraphs/chinook/commands/InsertArtist.hml +++ b/fixtures/ddn/subgraphs/chinook/commands/InsertArtist.hml @@ -4,7 +4,11 @@ definition: name: insertArtist description: Example of a database update using a native query outputType: InsertArtist - arguments: [] + arguments: + - name: id + type: Int! + - name: name + type: String! 
source: dataConnectorName: mongodb dataConnectorCommand: diff --git a/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml b/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml index d94ec308..37db817e 100644 --- a/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml +++ b/fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml @@ -1016,8 +1016,10 @@ definition: - name: insertArtist description: Example of a database update using a native query result_type: { type: named, name: InsertArtist } - arguments: {} - command: { insert: Artist, documents: [{ ArtistId: 1001, Name: Regina Spektor }] } + arguments: + id: { type: { type: named, name: Int } } + name: { type: { type: named, name: String } } + command: { insert: Artist, documents: [{ ArtistId: "{{ id }}", Name: "{{ name }}" }] } capabilities: version: ^0.1.0 capabilities: From fa6bedde01739c451a9c9de958686873f3a49249 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Tue, 26 Mar 2024 16:28:41 -0700 Subject: [PATCH 30/33] convert native query fixtures to json --- .../chinook/native_queries/hello.json | 26 +++++++++++ .../chinook/native_queries/hello.yaml | 13 ------ .../chinook/native_queries/insert_artist.json | 45 +++++++++++++++++++ .../chinook/native_queries/insert_artist.yaml | 21 --------- 4 files changed, 71 insertions(+), 34 deletions(-) create mode 100644 fixtures/connector/chinook/native_queries/hello.json delete mode 100644 fixtures/connector/chinook/native_queries/hello.yaml create mode 100644 fixtures/connector/chinook/native_queries/insert_artist.json delete mode 100644 fixtures/connector/chinook/native_queries/insert_artist.yaml diff --git a/fixtures/connector/chinook/native_queries/hello.json b/fixtures/connector/chinook/native_queries/hello.json new file mode 100644 index 00000000..c88d253f --- /dev/null +++ b/fixtures/connector/chinook/native_queries/hello.json @@ -0,0 +1,26 @@ +{ + "name": "hello", + "description": "Example of a read-only native query", + "objectTypes": { + "HelloResult": { + "fields": { + "ok": { + "type": { + "scalar": "int" + } + }, + "readOnly": { + "type": { + "scalar": "bool" + } + } + } + } + }, + "resultType": { + "object": "HelloResult" + }, + "command": { + "hello": 1 + } +} diff --git a/fixtures/connector/chinook/native_queries/hello.yaml b/fixtures/connector/chinook/native_queries/hello.yaml deleted file mode 100644 index 4a027c8f..00000000 --- a/fixtures/connector/chinook/native_queries/hello.yaml +++ /dev/null @@ -1,13 +0,0 @@ -name: hello -description: Example of a read-only native query -objectTypes: - HelloResult: - fields: - ok: - type: !scalar int - readOnly: - type: !scalar bool - # There are more fields but you get the idea -resultType: !object HelloResult -command: - hello: 1 diff --git a/fixtures/connector/chinook/native_queries/insert_artist.json b/fixtures/connector/chinook/native_queries/insert_artist.json new file mode 100644 index 00000000..7ff29310 --- /dev/null +++ b/fixtures/connector/chinook/native_queries/insert_artist.json @@ -0,0 +1,45 @@ +{ + "name": "insertArtist", + "description": "Example of a database update using a native query", + "mode": "readWrite", + "resultType": { + "object": "InsertArtist" + }, + "arguments": { + "id": { + "type": { + "scalar": "int" + } + }, + "name": { + "type": { + "scalar": "string" + } + } + }, + "objectTypes": { + "InsertArtist": { + "fields": { + "ok": { + "type": { + "scalar": "int" + } + }, + "n": { + "type": { + "scalar": "int" + } + } + } + } + }, + "command": { + "insert": "Artist", + "documents": [ + { + 
"ArtistId": "{{ id }}", + "Name": "{{ name }}" + } + ] + } +} diff --git a/fixtures/connector/chinook/native_queries/insert_artist.yaml b/fixtures/connector/chinook/native_queries/insert_artist.yaml deleted file mode 100644 index db40b45e..00000000 --- a/fixtures/connector/chinook/native_queries/insert_artist.yaml +++ /dev/null @@ -1,21 +0,0 @@ -name: insertArtist -description: Example of a database update using a native query -mode: readWrite -resultType: !object InsertArtist -arguments: - id: - type: !scalar int - name: - type: !scalar string -objectTypes: - InsertArtist: - fields: - ok: - type: !scalar int - n: - type: !scalar int -command: - insert: "Artist" - documents: - - ArtistId: "{{ id }}" - Name: "{{ name }}" From e6d12b0b6d96afd6227c589dbeb41bf55ecaef8e Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Tue, 26 Mar 2024 16:38:45 -0700 Subject: [PATCH 31/33] remove error variants that I didn't end up using --- .../src/interface_types/mongo_agent_error.rs | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/crates/mongodb-agent-common/src/interface_types/mongo_agent_error.rs b/crates/mongodb-agent-common/src/interface_types/mongo_agent_error.rs index e5e1ead4..ad5ea4fa 100644 --- a/crates/mongodb-agent-common/src/interface_types/mongo_agent_error.rs +++ b/crates/mongodb-agent-common/src/interface_types/mongo_agent_error.rs @@ -12,9 +12,7 @@ use thiserror::Error; pub enum MongoAgentError { BadCollectionSchema(String, bson::Bson, bson::de::Error), BadQuery(anyhow::Error), - CommandError(#[from] crate::procedure::ProcedureError), InvalidVariableName(String), - JsonToBson(#[from] JsonToBsonError), MongoDB(#[from] mongodb::error::Error), MongoDBDeserialization(#[from] mongodb::bson::de::Error), MongoDBSerialization(#[from] mongodb::bson::ser::Error), @@ -22,7 +20,6 @@ pub enum MongoAgentError { NotImplemented(&'static str), Serialization(serde_json::Error), UnknownAggregationFunction(String), - UnknownVariables(Vec), UnspecifiedRelation(String), VariableNotDefined(String), AdHoc(#[from] anyhow::Error), @@ -31,8 +28,6 @@ pub enum MongoAgentError { use MongoAgentError::*; -use crate::query::arguments::JsonToBsonError; - impl MongoAgentError { pub fn status_and_error_response(&self) -> (StatusCode, ErrorResponse) { match self { @@ -62,12 +57,10 @@ impl MongoAgentError { }, ), BadQuery(err) => (StatusCode::BAD_REQUEST, ErrorResponse::new(&err)), - CommandError(err) => (StatusCode::BAD_REQUEST, ErrorResponse::new(&err)), InvalidVariableName(name) => ( StatusCode::BAD_REQUEST, ErrorResponse::new(&format!("Column identifier includes characters that are not permitted in a MongoDB variable name: {name}")) ), - JsonToBson(err) => (StatusCode::BAD_REQUEST, ErrorResponse::new(&err)), MongoDB(err) => (StatusCode::BAD_REQUEST, ErrorResponse::new(&err)), MongoDBDeserialization(err) => (StatusCode::BAD_REQUEST, ErrorResponse::new(&err)), MongoDBSerialization(err) => { @@ -80,10 +73,6 @@ impl MongoAgentError { StatusCode::BAD_REQUEST, ErrorResponse::new(&format!("Unknown aggregation function, {function}")), ), - UnknownVariables(variable_names) => ( - StatusCode::BAD_REQUEST, - ErrorResponse::new(&format!("Query included unrecognized variables: {}", variable_names.join(", "))) - ), UnspecifiedRelation(relation) => ( StatusCode::BAD_REQUEST, ErrorResponse::new(&format!("Query referenced a relationship, \"{relation}\", but did not include relation metadata in `table_relationships`")) From 33fa05000ba18d29d5cebba15867bd3187df6979 Mon Sep 17 00:00:00 2001 From: Jesse Hallett Date: Tue, 26 Mar 
2024 21:13:55 -0700 Subject: [PATCH 32/33] finish the incomplete tests --- crates/configuration/src/native_queries.rs | 20 +++--- crates/configuration/src/schema/database.rs | 5 +- .../src/procedure/interpolated_command.rs | 70 ++++++++----------- .../src/query/arguments/json_to_bson.rs | 6 +- .../mongodb-agent-common/src/query/foreach.rs | 8 +-- 5 files changed, 52 insertions(+), 57 deletions(-) diff --git a/crates/configuration/src/native_queries.rs b/crates/configuration/src/native_queries.rs index f7c56b5c..705a3c86 100644 --- a/crates/configuration/src/native_queries.rs +++ b/crates/configuration/src/native_queries.rs @@ -46,18 +46,22 @@ pub struct NativeQuery { /// substituted the string will be replaced by the type of the variable. For example in this /// command, /// - /// { - /// "insert": "posts", - /// "documents": "{{ documents }}" - /// } + /// ```json + /// json!({ + /// "insert": "posts", + /// "documents": "{{ documents }}" + /// }) + /// ``` /// /// If the type of the `documents` argument is an array then after variable substitution the /// command will expand to: /// - /// { - /// "insert": "posts", - /// "documents": [/* array of documents */] - /// } + /// ```json + /// json!({ + /// "insert": "posts", + /// "documents": [/* array of documents */] + /// }) + /// ``` /// #[schemars(with = "Object")] pub command: bson::Document, diff --git a/crates/configuration/src/schema/database.rs b/crates/configuration/src/schema/database.rs index 25ac5a94..61a9d901 100644 --- a/crates/configuration/src/schema/database.rs +++ b/crates/configuration/src/schema/database.rs @@ -62,10 +62,9 @@ pub struct ObjectField { } impl ObjectField { - #[cfg(test)] - pub fn new(name: impl ToOwned, r#type: Type) -> (String, Self) { + pub fn new(name: impl ToString, r#type: Type) -> (String, Self) { ( - name.to_owned(), + name.to_string(), ObjectField { r#type, description: Default::default(), diff --git a/crates/mongodb-agent-common/src/procedure/interpolated_command.rs b/crates/mongodb-agent-common/src/procedure/interpolated_command.rs index 86acb3cf..cadf3eae 100644 --- a/crates/mongodb-agent-common/src/procedure/interpolated_command.rs +++ b/crates/mongodb-agent-common/src/procedure/interpolated_command.rs @@ -57,11 +57,15 @@ fn interpolate_document( /// Substitute placeholders within a string in the input template. This may produce an output that /// is not a string if the entire content of the string is a placeholder. For example, /// -/// { "key": "{{recordId}}" } +/// ```json +/// { "key": "{{recordId}}" } +/// ``` /// /// might expand to, /// -/// { "key": 42 } +/// ```json +/// { "key": 42 } +/// ``` /// /// if the type of the variable `recordId` is `int`. 
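// Illustrative sketch, not part of this patch: how the rule above applies to
// whole-string versus embedded placeholders. The argument name `recordId` is
// hypothetical, and its value is assumed to have already been resolved to BSON.
fn placeholder_forms_sketch() {
    let arguments: BTreeMap<String, Bson> =
        BTreeMap::from([("recordId".to_string(), Bson::Int32(42))]);

    // A placeholder that is the entire string is replaced by the typed value.
    let whole = interpolated_command(&bson::doc! { "key": "{{ recordId }}" }, &arguments);
    assert_eq!(whole.unwrap(), bson::doc! { "key": 42 });

    // A placeholder embedded in a larger string must resolve to a string; a
    // non-string argument fails with ProcedureError::NonStringInStringContext.
    let embedded =
        interpolated_command(&bson::doc! { "key": "record-{{recordId}}" }, &arguments);
    assert!(embedded.is_err());
}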
diff --git a/crates/mongodb-agent-common/src/procedure/interpolated_command.rs b/crates/mongodb-agent-common/src/procedure/interpolated_command.rs
index 86acb3cf..cadf3eae 100644
--- a/crates/mongodb-agent-common/src/procedure/interpolated_command.rs
+++ b/crates/mongodb-agent-common/src/procedure/interpolated_command.rs
@@ -57,11 +57,15 @@ fn interpolate_document(
 /// Substitute placeholders within a string in the input template. This may produce an output that
 /// is not a string if the entire content of the string is a placeholder. For example,
 ///
-/// { "key": "{{recordId}}" }
+/// ```json
+/// { "key": "{{recordId}}" }
+/// ```
 ///
 /// might expand to,
 ///
-/// { "key": 42 }
+/// ```json
+/// { "key": 42 }
+/// ```
 ///
 /// if the type of the variable `recordId` is `int`.
 fn interpolate_string(string: &str, arguments: &BTreeMap) -> Result {
@@ -145,12 +149,11 @@ mod tests {
     #[test]
     fn interpolates_non_string_type() -> anyhow::Result<()> {
         let native_query_input = json!({
-            "name": "insertArtist",
             "resultType": { "object": "InsertArtist" },
-            "arguments": [
-                { "name": "id", "type": { "scalar": "int" } },
-                { "name": "name", "type": { "scalar": "string" } },
-            ],
+            "arguments": {
+                "id": { "type": { "scalar": "int" } },
+                "name": { "type": { "scalar": "string" } },
+            },
             "command": {
                 "insert": "Artist",
                 "documents": [{
@@ -193,16 +196,17 @@ mod tests {
         let native_query_input = json!({
             "name": "insertArtist",
             "resultType": { "object": "InsertArtist" },
-            "objectTypes": [{
-                "name": "ArtistInput",
-                "fields": [
-                    { "name": "ArtistId", "type": { "scalar": "int" } },
-                    { "name": "Name", "type": { "scalar": "string" } },
-                ],
-            }],
+            "objectTypes": {
+                "ArtistInput": {
+                    "fields": {
+                        "ArtistId": { "type": { "scalar": "int" } },
+                        "Name": { "type": { "scalar": "string" } },
+                    },
+                }
+            },
-            "arguments": [
-                { "name": "documents", "type": { "arrayOf": { "object": "ArtistInput" } } },
-            ],
+            "arguments": {
+                "documents": { "type": { "arrayOf": { "object": "ArtistInput" } } },
+            },
             "command": {
                 "insert": "Artist",
                 "documents": "{{ documents }}",
@@ -251,22 +255,19 @@ mod tests {
         let native_query_input = json!({
             "name": "insert",
             "resultType": { "object": "Insert" },
-            "arguments": [
-                { "name": "prefix", "type": { "scalar": "string" } },
-                { "name": "basename", "type": { "scalar": "string" } },
-            ],
+            "arguments": {
+                "prefix": { "type": { "scalar": "string" } },
+                "basename": { "type": { "scalar": "string" } },
+            },
             "command": {
                 "insert": "{{prefix}}-{{basename}}",
                 "empty": "",
             },
         });
-        let input_arguments = [(
-            "documents".to_owned(),
-            json!([
-                { "prefix": "current" } ,
-                { "basename": "some-coll" } ,
-            ]),
-        )]
+        let input_arguments = [
+            ("prefix".to_owned(), json!("current")),
+            ("basename".to_owned(), json!("some-coll")),
+        ]
         .into_iter()
         .collect();

@@ -281,17 +282,8 @@ mod tests {
         assert_eq!(
             command,
             bson::doc! {
-                "insert": "Artist",
-                "documents": [
-                    {
-                        "ArtistId": 1001,
-                        "Name": "Regina Spektor",
-                    },
-                    {
-                        "ArtistId": 1002,
-                        "Name": "Ok Go",
-                    }
-                ],
+                "insert": "current-some-coll",
+                "empty": "",
             }
             .into()
        );
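The corrected fixtures above pass test arguments as a map from argument name to JSON value, and the last test now asserts that a template such as "{{prefix}}-{{basename}}" expands to the plain string "current-some-coll". A small self-contained sketch of both points, assuming (as the json! literals suggest) that the argument values are serde_json::Value:

```rust
use std::collections::BTreeMap;

use serde_json::{json, Value};

fn main() {
    // Arguments keyed by name, the same shape the fixed test builds with
    // `[("prefix".to_owned(), json!("current")), ...].into_iter().collect()`.
    let input_arguments: BTreeMap<String, Value> = [
        ("prefix".to_owned(), json!("current")),
        ("basename".to_owned(), json!("some-coll")),
    ]
    .into_iter()
    .collect();

    // Because the placeholders are embedded in a longer string, substitution
    // can only produce another string -- here "current-some-coll", which is
    // exactly what the updated assertion expects.
    let expected = format!(
        "{}-{}",
        input_arguments["prefix"].as_str().unwrap(),
        input_arguments["basename"].as_str().unwrap()
    );
    assert_eq!(expected, "current-some-coll");
}
```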
diff --git a/crates/mongodb-agent-common/src/query/arguments/json_to_bson.rs b/crates/mongodb-agent-common/src/query/arguments/json_to_bson.rs
index 627fb09b..2ce5ce31 100644
--- a/crates/mongodb-agent-common/src/query/arguments/json_to_bson.rs
+++ b/crates/mongodb-agent-common/src/query/arguments/json_to_bson.rs
@@ -289,8 +289,8 @@ mod tests {
     #[test]
     fn deserializes_specialized_scalar_types() -> anyhow::Result<()> {
+        let object_type_name = "scalar_test".to_owned();
         let object_type = ObjectType {
-            name: "scalar_test".to_owned(),
             fields: BTreeMap::from([
                 ObjectField::new("double", Type::Scalar(BsonScalarType::Double)),
                 ObjectField::new("int", Type::Scalar(BsonScalarType::Int)),
@@ -372,8 +372,8 @@ mod tests {
         };

         let actual = json_to_bson(
-            &Type::Object(object_type.name.clone()),
-            &[(object_type.name.clone(), object_type)]
+            &Type::Object(object_type_name.clone()),
+            &[(object_type_name.clone(), object_type)]
                 .into_iter()
                 .collect(),
             input,
diff --git a/crates/mongodb-agent-common/src/query/foreach.rs b/crates/mongodb-agent-common/src/query/foreach.rs
index a70e0ffc..7febe1c0 100644
--- a/crates/mongodb-agent-common/src/query/foreach.rs
+++ b/crates/mongodb-agent-common/src/query/foreach.rs
@@ -166,8 +166,8 @@ mod tests {
             "target": {"name": ["tracks"], "type": "table"},
             "relationships": [],
             "foreach": [
-                { "artistId": {"value": 1, "value_type": "number"} },
-                { "artistId": {"value": 2, "value_type": "number"} }
+                { "artistId": {"value": 1, "value_type": "int"} },
+                { "artistId": {"value": 2, "value_type": "int"} }
             ]
         }))?;
@@ -279,8 +279,8 @@ mod tests {
             "target": {"name": ["tracks"], "type": "table"},
             "relationships": [],
             "foreach": [
-                { "artistId": {"value": 1, "value_type": "number"} },
-                { "artistId": {"value": 2, "value_type": "number"} }
+                { "artistId": {"value": 1, "value_type": "int"} },
+                { "artistId": {"value": 2, "value_type": "int"} }
             ]
         }))?;
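The final patch below adds #[allow(clippy::approx_constant)] to the json_to_bson test and drops a few .into() conversions from assertions (presumably to satisfy a conversion-related lint, though the commit message does not say which). For context, a minimal, hypothetical example of the kind of literal that clippy::approx_constant flags; the value shown is illustrative and not taken from the test:

```rust
// Without the allow attribute, clippy warns that 3.14159 looks like an
// imprecise approximation of std::f64::consts::PI.
#[allow(clippy::approx_constant)]
fn main() {
    let double = 3.14159;
    println!("{double}");
}
```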
"scalar_test".to_owned(); let object_type = ObjectType {