diff --git a/Cargo.lock b/Cargo.lock
index 6cb0c498..875e0dda 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -410,13 +410,16 @@ dependencies = [
 name = "configuration"
 version = "0.1.0"
 dependencies = [
+ "futures",
 "itertools 0.12.1",
+ "mongodb",
 "mongodb-support",
 "schemars",
 "serde",
 "serde_json",
 "serde_yaml",
 "tokio",
+ "tokio-stream",
 ]
 
 [[package]]
diff --git a/crates/cli/src/introspection.rs b/crates/cli/src/introspection.rs
index 7a8a60d4..279679fb 100644
--- a/crates/cli/src/introspection.rs
+++ b/crates/cli/src/introspection.rs
@@ -1,6 +1,6 @@
 use configuration::{
-    metadata::{Collection, ObjectField, ObjectType, Type},
-    Metadata,
+    schema::{Collection, ObjectField, ObjectType, Type},
+    Schema,
 };
 use futures_util::{StreamExt, TryStreamExt};
 use indexmap::IndexMap;
@@ -12,7 +12,7 @@ use mongodb_agent_common::interface_types::{MongoAgentError, MongoConfig};
 
 pub async fn get_metadata_from_validation_schema(
     config: &MongoConfig,
-) -> Result<Metadata, MongoAgentError> {
+) -> Result<Schema, MongoAgentError> {
     let db = config.client.database(&config.database);
     let collections_cursor = db.list_collections(None, None).await?;
 
@@ -51,7 +51,7 @@ pub async fn get_metadata_from_validation_schema(
         .try_collect::<(Vec<Vec<ObjectType>>, Vec<Collection>)>()
         .await?;
 
-    Ok(Metadata {
+    Ok(Schema {
         collections,
         object_types: object_types.concat(),
     })
@@ -121,11 +121,11 @@ fn make_object_field(
 }
 
 fn maybe_nullable(
-    t: configuration::metadata::Type,
+    t: configuration::schema::Type,
     is_nullable: bool,
-) -> configuration::metadata::Type {
+) -> configuration::schema::Type {
     if is_nullable {
-        configuration::metadata::Type::Nullable(Box::new(t))
+        configuration::schema::Type::Nullable(Box::new(t))
     } else {
         t
     }
diff --git a/crates/cli/src/lib.rs b/crates/cli/src/lib.rs
index 40cc2697..b37c4ee2 100644
--- a/crates/cli/src/lib.rs
+++ b/crates/cli/src/lib.rs
@@ -31,8 +31,8 @@ pub async fn run(command: Command, context: &Context) -> anyhow::Result<()> {
 /// Update the configuration in the current directory by introspecting the database.
 async fn update(context: &Context) -> anyhow::Result<()> {
-    let metadata = introspection::get_metadata_from_validation_schema(&context.mongo_config).await?;
-    let configuration = Configuration { metadata };
+    let schema = introspection::get_metadata_from_validation_schema(&context.mongo_config).await?;
+    let configuration = Configuration::from_schema(schema);
     configuration::write_directory(&context.path, &configuration).await?;
diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs
index 8d3d40ba..ea4de0cb 100644
--- a/crates/cli/src/main.rs
+++ b/crates/cli/src/main.rs
@@ -45,7 +45,7 @@ pub async fn main() -> anyhow::Result<()> {
         Some(path) => path,
         None => env::current_dir()?,
     };
-    let mongo_config = try_init_state_from_uri(&args.connection_uri)
+    let mongo_config = try_init_state_from_uri(&args.connection_uri, &Default::default())
         .await
         .map_err(|e| anyhow!("Error initializing MongoDB state {}", e))?;
     let context = Context { path, mongo_config };
diff --git a/crates/configuration/Cargo.toml b/crates/configuration/Cargo.toml
index 6824690f..bea1d0e8 100644
--- a/crates/configuration/Cargo.toml
+++ b/crates/configuration/Cargo.toml
@@ -4,10 +4,13 @@ version = "0.1.0"
 edition = "2021"
 
 [dependencies]
+futures = "^0.3"
 itertools = "^0.12"
+mongodb = "2.8"
 mongodb-support = { path = "../mongodb-support" }
 schemars = "^0.8.12"
 serde = { version = "1", features = ["derive"] }
 serde_json = { version = "1" }
 serde_yaml = "^0.9"
 tokio = "1"
+tokio-stream = { version = "^0.1", features = ["fs"] }
diff --git a/crates/configuration/src/configuration.rs b/crates/configuration/src/configuration.rs
index 625ec509..3a3549f9 100644
--- a/crates/configuration/src/configuration.rs
+++ b/crates/configuration/src/configuration.rs
@@ -1,17 +1,30 @@
 use std::{io, path::Path};
 
 use schemars::JsonSchema;
-use serde::{Deserialize, Serialize};
+use serde::Deserialize;
 
-use crate::{read_directory, Metadata};
+use crate::{native_queries::NativeQuery, read_directory, Schema};
 
-#[derive(Clone, Debug, Default, Serialize, Deserialize, JsonSchema)]
+#[derive(Clone, Debug, Default, Deserialize, JsonSchema)]
 #[serde(rename_all = "camelCase")]
 pub struct Configuration {
-    pub metadata: Metadata,
+    /// Descriptions of collections and types used in the database
+    pub schema: Schema,
+
+    /// Native queries allow arbitrary MongoDB aggregation pipelines where types of results are
+    /// specified via user configuration.
+    #[serde(default, skip_serializing_if = "Vec::is_empty")]
+    pub native_queries: Vec<NativeQuery>,
 }
 
 impl Configuration {
+    pub fn from_schema(schema: Schema) -> Self {
+        Self {
+            schema,
+            ..Default::default()
+        }
+    }
+
     pub async fn parse_configuration(
         configuration_dir: impl AsRef<Path> + Send,
     ) -> io::Result<Self> {
diff --git a/crates/configuration/src/directory.rs b/crates/configuration/src/directory.rs
index 2a7739eb..f80d4b23 100644
--- a/crates/configuration/src/directory.rs
+++ b/crates/configuration/src/directory.rs
@@ -1,14 +1,18 @@
+use futures::stream::TryStreamExt as _;
 use itertools::Itertools as _;
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
 use std::{
     io,
     path::{Path, PathBuf},
 };
 use tokio::fs;
+use tokio_stream::wrappers::ReadDirStream;
 
-use crate::Configuration;
+use crate::{native_queries::NativeQuery, Configuration};
+
+pub const SCHEMA_FILENAME: &str = "schema";
+pub const NATIVE_QUERIES_DIRNAME: &str = "native_queries";
 
-pub const CONFIGURATION_FILENAME: &str = "schema";
 pub const CONFIGURATION_EXTENSIONS: [(&str, FileFormat); 3] =
     [("json", JSON), ("yaml", YAML), ("yml", YAML)];
 pub const DEFAULT_EXTENSION: &str = "json";
@@ -26,29 +30,75 @@ const YAML: FileFormat = FileFormat::Yaml;
 pub async fn read_directory(
     configuration_dir: impl AsRef<Path> + Send,
 ) -> io::Result<Configuration> {
-    parse_file(configuration_dir, CONFIGURATION_FILENAME).await
+    let dir = configuration_dir.as_ref();
+
+    let schema = parse_json_or_yaml(dir, SCHEMA_FILENAME).await?;
+
+    let native_queries: Vec<NativeQuery> = read_subdir_configs(&dir.join(NATIVE_QUERIES_DIRNAME))
+        .await?
+        .unwrap_or_default();
+
+    Ok(Configuration {
+        schema,
+        native_queries,
+    })
+}
+
+/// Parse all files in a directory with one of the allowed configuration extensions according to
+/// the given type argument. For example if `T` is `NativeQuery` this function assumes that all
+/// json and yaml files in the given directory should be parsed as native query configurations.
+async fn read_subdir_configs<T>(subdir: &Path) -> io::Result<Option<Vec<T>>>
+where
+    for<'a> T: Deserialize<'a>,
+{
+    if !(fs::try_exists(subdir).await?) {
+        return Ok(None);
+    }
+
+    let dir_stream = ReadDirStream::new(fs::read_dir(subdir).await?);
+    let configs = dir_stream
+        .try_filter_map(|dir_entry| async move {
+            // Permits regular files and symlinks, does not filter out symlinks to directories.
+            let is_file = !(dir_entry.file_type().await?.is_dir());
+            if !is_file {
+                return Ok(None);
+            }
+
+            let path = dir_entry.path();
+            let extension = path.extension().and_then(|ext| ext.to_str());
+
+            let format_option = extension
+                .and_then(|ext| {
+                    CONFIGURATION_EXTENSIONS
+                        .iter()
+                        .find(|(expected_ext, _)| ext == *expected_ext)
+                })
+                .map(|(_, format)| *format);
+
+            Ok(format_option.map(|format| (path, format)))
+        })
+        .and_then(|(path, format)| async move { parse_config_file::<T>(path, format).await })
+        .try_collect::<Vec<T>>()
+        .await?;
+
+    Ok(Some(configs))
 }
 
 /// Given a base name, like "connection", looks for files of the form "connection.json",
 /// "connection.yaml", etc; reads the file; and parses it according to its extension.
-async fn parse_file<T>(configuration_dir: impl AsRef<Path>, basename: &str) -> io::Result<T>
+async fn parse_json_or_yaml<T>(configuration_dir: &Path, basename: &str) -> io::Result<T>
 where
     for<'a> T: Deserialize<'a>,
 {
     let (path, format) = find_file(configuration_dir, basename).await?;
-    read_file(path, format).await
+    parse_config_file(path, format).await
 }
 
 /// Given a base name, like "connection", looks for files of the form "connection.json",
 /// "connection.yaml", etc, and returns the found path with its file format.
-async fn find_file(
-    configuration_dir: impl AsRef<Path>,
-    basename: &str,
-) -> io::Result<(PathBuf, FileFormat)> {
-    let dir = configuration_dir.as_ref();
-
+async fn find_file(configuration_dir: &Path, basename: &str) -> io::Result<(PathBuf, FileFormat)> {
     for (extension, format) in CONFIGURATION_EXTENSIONS {
-        let path = dir.join(format!("{basename}.{extension}"));
+        let path = configuration_dir.join(format!("{basename}.{extension}"));
         if fs::try_exists(&path).await? {
             return Ok((path, format));
         }
@@ -58,7 +108,7 @@ async fn find_file(
         io::ErrorKind::NotFound,
         format!(
             "could not find file, {:?}",
-            dir.join(format!(
+            configuration_dir.join(format!(
                 "{basename}.{{{}}}",
                 CONFIGURATION_EXTENSIONS
                     .into_iter()
@@ -69,7 +119,7 @@
     ))
 }
 
-async fn read_file<T>(path: impl AsRef<Path>, format: FileFormat) -> io::Result<T>
+async fn parse_config_file<T>(path: impl AsRef<Path>, format: FileFormat) -> io::Result<T>
 where
     for<'a> T: Deserialize<'a>,
 {
@@ -82,11 +132,12 @@ where
     Ok(value)
 }
 
+/// Currently only writes `schema.json`
 pub async fn write_directory(
     configuration_dir: impl AsRef<Path>,
     configuration: &Configuration,
 ) -> io::Result<()> {
-    write_file(configuration_dir, CONFIGURATION_FILENAME, configuration).await
+    write_file(configuration_dir, SCHEMA_FILENAME, &configuration.schema).await
 }
 
 fn default_file_path(configuration_dir: impl AsRef<Path>, basename: &str) -> PathBuf {
@@ -94,12 +145,15 @@ fn default_file_path(configuration_dir: impl AsRef<Path>, basename: &str) -> Pat
     dir.join(format!("{basename}.{DEFAULT_EXTENSION}"))
 }
 
-async fn write_file(
+async fn write_file<T>(
     configuration_dir: impl AsRef<Path>,
     basename: &str,
-    configuration: &Configuration,
-) -> io::Result<()> {
+    value: &T,
+) -> io::Result<()>
+where
+    T: Serialize,
+{
     let path = default_file_path(configuration_dir, basename);
-    let bytes = serde_json::to_vec_pretty(configuration)?;
+    let bytes = serde_json::to_vec_pretty(value)?;
     fs::write(path, bytes).await
 }
diff --git a/crates/configuration/src/lib.rs b/crates/configuration/src/lib.rs
index b4a239ce..8414acc1 100644
--- a/crates/configuration/src/lib.rs
+++ b/crates/configuration/src/lib.rs
@@ -1,8 +1,9 @@
 mod configuration;
-pub mod metadata;
+pub mod schema;
+pub mod native_queries;
 mod directory;
 
 pub use crate::configuration::Configuration;
-pub use crate::metadata::Metadata;
+pub use crate::schema::Schema;
 pub use crate::directory::read_directory;
 pub use crate::directory::write_directory;
diff --git a/crates/configuration/src/native_queries.rs b/crates/configuration/src/native_queries.rs
index 954d5abc..633c9ead 100644
--- a/crates/configuration/src/native_queries.rs
+++ b/crates/configuration/src/native_queries.rs
@@ -2,7 +2,7 @@ use mongodb::{bson, options::SelectionCriteria};
 use schemars::JsonSchema;
 use serde::Deserialize;
 
-use crate::metadata::ObjectField;
+use crate::schema::{ObjectField, Type};
 
 /// An arbitrary database command using MongoDB's runCommand API.
 /// See https://www.mongodb.com/docs/manual/reference/method/db.runCommand/
@@ -12,9 +12,8 @@ pub struct NativeQuery {
     /// Name that will be used to identify the query in your data graph
     pub name: String,
 
-    /// The name of an object type that specifies the type of data returned from the query. This
-    /// must correspond to a configuration definition in `metadata.objectTypes`.
-    pub result_type: String,
+    /// Type of data returned by the query.
+    pub result_type: Type,
 
     /// Arguments for per-query customization
     pub arguments: Vec<ObjectField>,
@@ -31,6 +30,18 @@ pub struct NativeQuery {
 
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub description: Option<String>,
+
+    /// Set to `readWrite` if this native query might modify data in the database.
+    #[serde(default)]
+    pub mode: Mode,
+}
+
+#[derive(Clone, Copy, Default, Debug, PartialEq, Eq, Deserialize, JsonSchema)]
+#[serde(rename_all = "camelCase")]
+pub enum Mode {
+    #[default]
+    ReadOnly,
+    ReadWrite,
 }
 
 type Object = serde_json::Map<String, serde_json::Value>;
diff --git a/crates/configuration/src/metadata/database.rs b/crates/configuration/src/schema/database.rs
similarity index 100%
rename from crates/configuration/src/metadata/database.rs
rename to crates/configuration/src/schema/database.rs
diff --git a/crates/configuration/src/metadata/mod.rs b/crates/configuration/src/schema/mod.rs
similarity index 94%
rename from crates/configuration/src/metadata/mod.rs
rename to crates/configuration/src/schema/mod.rs
index 6326d2e9..23cb3e5a 100644
--- a/crates/configuration/src/metadata/mod.rs
+++ b/crates/configuration/src/schema/mod.rs
@@ -7,7 +7,7 @@ pub use self::database::{Collection, ObjectField, ObjectType, Type};
 
 #[derive(Clone, Debug, Default, Serialize, Deserialize, JsonSchema)]
 #[serde(rename_all = "camelCase")]
-pub struct Metadata {
+pub struct Schema {
     #[serde(default)]
     pub collections: Vec<Collection>,
     #[serde(default)]
diff --git a/crates/mongodb-agent-common/src/interface_types/mongo_config.rs b/crates/mongodb-agent-common/src/interface_types/mongo_config.rs
index 2e6815ed..e5cdae13 100644
--- a/crates/mongodb-agent-common/src/interface_types/mongo_config.rs
+++ b/crates/mongodb-agent-common/src/interface_types/mongo_config.rs
@@ -1,3 +1,4 @@
+use configuration::native_queries::NativeQuery;
 use mongodb::Client;
 
 #[derive(Clone, Debug)]
@@ -6,4 +7,6 @@ pub struct MongoConfig {
 
     /// Name of the database to connect to
     pub database: String,
+
+    pub native_queries: Vec<NativeQuery>,
 }
diff --git a/crates/mongodb-agent-common/src/state.rs b/crates/mongodb-agent-common/src/state.rs
index 4ace391b..d51ec3c2 100644
--- a/crates/mongodb-agent-common/src/state.rs
+++ b/crates/mongodb-agent-common/src/state.rs
@@ -1,18 +1,25 @@
 use std::{env, error::Error};
 
 use anyhow::anyhow;
+use configuration::Configuration;
+
 use crate::{interface_types::MongoConfig, mongodb_connection::get_mongodb_client};
 
 pub const DATABASE_URI_ENV_VAR: &str = "MONGODB_DATABASE_URI";
 
 /// Reads database connection URI from environment variable
-pub async fn try_init_state() -> Result<MongoConfig, Box<dyn Error + Send + Sync>> {
+pub async fn try_init_state(
+    configuration: &Configuration,
+) -> Result<MongoConfig, Box<dyn Error + Send + Sync>> {
     // Splitting this out of the `Connector` impl makes error translation easier
     let database_uri = env::var(DATABASE_URI_ENV_VAR)?;
-    try_init_state_from_uri(&database_uri).await
+    try_init_state_from_uri(&database_uri, configuration).await
 }
 
-pub async fn try_init_state_from_uri(database_uri: &str) -> Result<MongoConfig, Box<dyn Error + Send + Sync>> {
+pub async fn try_init_state_from_uri(
+    database_uri: &str,
+    configuration: &Configuration,
+) -> Result<MongoConfig, Box<dyn Error + Send + Sync>> {
     let client = get_mongodb_client(database_uri).await?;
     let database_name = match client.default_database() {
         Some(database) => Ok(database.name().to_owned()),
@@ -23,5 +30,6 @@ pub async fn try_init_state_from_uri(database_uri: &str) -> Result Result {
-    let state = mongodb_agent_common::state::try_init_state().await?;
+    let state = mongodb_agent_common::state::try_init_state(configuration).await?;
     Ok(state)
 }
diff --git a/crates/mongodb-connector/src/schema.rs b/crates/mongodb-connector/src/schema.rs
index 965c3f6d..5cb4a7a5 100644
--- a/crates/mongodb-connector/src/schema.rs
+++ b/crates/mongodb-connector/src/schema.rs
@@ -1,6 +1,9 @@
 use std::collections::BTreeMap;
 
-use configuration::{metadata, Configuration};
+use configuration::{
+    native_queries::{self, NativeQuery},
+    schema, Configuration,
+};
 use ndc_sdk::{connector, models};
 
 use crate::capabilities;
@@ -8,19 +11,34 @@ use crate::capabilities;
 pub async fn get_schema(
     config: &Configuration,
 ) -> Result<models::SchemaResponse, connector::SchemaError> {
-    let metadata = &config.metadata;
-    let object_types = map_object_types(&metadata.object_types);
-    let collections = metadata.collections.iter().map(map_collection).collect();
+    let schema = &config.schema;
+    let object_types = map_object_types(&schema.object_types);
+    let collections = schema.collections.iter().map(map_collection).collect();
+
+    let functions = config
+        .native_queries
+        .iter()
+        .filter(|q| q.mode == native_queries::Mode::ReadOnly)
+        .map(native_query_to_function)
+        .collect();
+
+    let procedures = config
+        .native_queries
+        .iter()
+        .filter(|q| q.mode == native_queries::Mode::ReadWrite)
+        .map(native_query_to_procedure)
+        .collect();
+
     Ok(models::SchemaResponse {
         collections,
         object_types,
         scalar_types: capabilities::scalar_types(),
-        functions: Default::default(),
-        procedures: Default::default(),
+        functions,
+        procedures,
     })
 }
 
-fn map_object_types(object_types: &[metadata::ObjectType]) -> BTreeMap<String, models::ObjectType> {
+fn map_object_types(object_types: &[schema::ObjectType]) -> BTreeMap<String, models::ObjectType> {
     object_types
         .iter()
         .map(|t| {
@@ -35,7 +53,7 @@ fn map_object_types(object_types: &[metadata::ObjectType]) -> BTreeMap
-fn map_field_infos(fields: &[metadata::ObjectField]) -> BTreeMap<String, models::ObjectField> {
+fn map_field_infos(fields: &[schema::ObjectField]) -> BTreeMap<String, models::ObjectField> {
     fields
         .iter()
         .map(|f| {
@@ -50,22 +68,22 @@ fn map_field_infos(fields: &[metadata::ObjectField]) -> BTreeMap
-fn map_type(t: &metadata::Type) -> models::Type {
+fn map_type(t: &schema::Type) -> models::Type {
     match t {
-        metadata::Type::Scalar(t) => models::Type::Named {
+        schema::Type::Scalar(t) => models::Type::Named {
             name: t.graphql_name(),
         },
-        metadata::Type::Object(t) => models::Type::Named { name: t.clone() },
-        metadata::Type::ArrayOf(t) => models::Type::Array {
+        schema::Type::Object(t) => models::Type::Named { name: t.clone() },
+        schema::Type::ArrayOf(t) => models::Type::Array {
             element_type: Box::new(map_type(t)),
         },
-        metadata::Type::Nullable(t) => models::Type::Nullable {
+        schema::Type::Nullable(t) => models::Type::Nullable {
             underlying_type: Box::new(map_type(t)),
         },
     }
 }
 
-fn map_collection(collection: &metadata::Collection) -> models::CollectionInfo {
+fn map_collection(collection: &schema::Collection) -> models::CollectionInfo {
     models::CollectionInfo {
         name: collection.name.clone(),
         collection_type: collection.r#type.clone(),
@@ -75,3 +93,49 @@ fn map_collection(collection: &metadata::Collection) -> models::CollectionInfo {
         uniqueness_constraints: Default::default(),
     }
 }
+
+/// For read-only native queries
+fn native_query_to_function(query: &NativeQuery) -> models::FunctionInfo {
+    let arguments = query
+        .arguments
+        .iter()
+        .map(|field| {
+            (
+                field.name.clone(),
+                models::ArgumentInfo {
+                    argument_type: map_type(&field.r#type),
+                    description: field.description.clone(),
+                },
+            )
+        })
+        .collect();
+    models::FunctionInfo {
+        name: query.name.clone(),
+        description: query.description.clone(),
+        arguments,
+        result_type: map_type(&query.result_type),
+    }
+}
+
+/// For read-write native queries
+fn native_query_to_procedure(query: &NativeQuery) -> models::ProcedureInfo {
+    let arguments = query
+        .arguments
+        .iter()
+        .map(|field| {
+            (
+                field.name.clone(),
+                models::ArgumentInfo {
+                    argument_type: map_type(&field.r#type),
+                    description: field.description.clone(),
+                },
+            )
+        })
+        .collect();
+    models::ProcedureInfo {
+        name: query.name.clone(),
+        description: query.description.clone(),
+        arguments,
+        result_type: map_type(&query.result_type),
+    }
+}
diff --git a/fixtures/connector/chinook/configuration.yaml b/fixtures/connector/chinook/configuration.yaml
deleted file mode 100644
index e6aec4d6..00000000
--- a/fixtures/connector/chinook/configuration.yaml
+++ /dev/null
@@ -1,16 +0,0 @@
-metadata:
-  collections:
-    - name: Album
-      type: Album
-
-  objectTypes:
-    - name: Album
-      fields:
-        - name: _id
-          type: !scalar objectId
-        - name: AlbumId
-          type: !scalar int
-        - name: ArtistId
-          type: !scalar int
-        - name: Title
-          type: !scalar string
diff --git a/fixtures/connector/chinook/schema.yaml b/fixtures/connector/chinook/schema.yaml
new file mode 100644
index 00000000..bbb4a52c
--- /dev/null
+++ b/fixtures/connector/chinook/schema.yaml
@@ -0,0 +1,15 @@
+collections:
+  - name: Album
+    type: Album
+
+objectTypes:
+  - name: Album
+    fields:
+      - name: _id
+        type: !scalar objectId
+      - name: AlbumId
+        type: !scalar int
+      - name: ArtistId
+        type: !scalar int
+      - name: Title
+        type: !scalar string
diff --git a/rust-toolchain.toml b/rust-toolchain.toml
index 5d97fd5b..2492ee58 100644
--- a/rust-toolchain.toml
+++ b/rust-toolchain.toml
@@ -1,4 +1,4 @@
 [toolchain]
 channel = "1.76.0"
 profile = "default" # see https://rust-lang.github.io/rustup/concepts/profiles.html
-components = ["rust-src"] # see https://rust-lang.github.io/rustup/concepts/components.html
+components = ["rust-analyzer", "rust-src"] # see https://rust-lang.github.io/rustup/concepts/components.html
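
Reviewer note: below is a sketch of what a file under the new `native_queries/` configuration subdirectory could look like, based only on the hunks above. The path `fixtures/connector/chinook/native_queries/album_count.yaml` is hypothetical; any `.json`, `.yaml`, or `.yml` file in that directory is picked up by `read_subdir_configs`. Field spellings follow the `camelCase` serde renames on `NativeQuery` and `Mode`, and the `!scalar` type tags follow the existing `schema.yaml` fixture. The `command` field is not visible in the diff and is only an assumption about how the runCommand payload is configured; treat the whole example as illustrative, not authoritative.

# Hypothetical file: fixtures/connector/chinook/native_queries/album_count.yaml
name: albumCount
description: Counts albums, optionally filtered by artist
mode: readOnly              # defaults to readOnly; readWrite queries are exposed as procedures
resultType: !scalar int     # camelCase rename of result_type; same type syntax as schema.yaml
arguments:
  - name: artistId
    type: !scalar int
# Assumption: the runCommand document is supplied via a `command` field,
# which is not shown in the hunks above.
command:
  count: Album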