这是indexloc提供的服务,不要输入任何密码
Skip to content

translate mutation response according to requested fields #59

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ This changelog documents the changes between release versions.
- In the CLI update command, if the database URI is not provided the error message now mentions the correct environment variable to use (`MONGODB_DATABASE_URI`) ([#50](https://github.com/hasura/ndc-mongodb/pull/50))
- Update to latest NDC SDK ([#51](https://github.com/hasura/ndc-mongodb/pull/51))
- Update `rustls` dependency to fix https://github.com/hasura/ndc-mongodb/security/dependabot/1 ([#51](https://github.com/hasura/ndc-mongodb/pull/51))
- Serialize query and mutation response fields with known types using simple JSON instead of Extended JSON ([#53](https://github.com/hasura/ndc-mongodb/pull/53)) ([#59](https://github.com/hasura/ndc-mongodb/pull/59))

## [0.0.4] - 2024-04-12
- Queries that attempt to compare a column to a column in the query root table, or a related table, will now fail instead of giving the incorrect result ([#22](https://github.com/hasura/ndc-mongodb/pull/22))
Expand Down
2 changes: 1 addition & 1 deletion crates/integration-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl From<&str> for GraphQLRequest {
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
pub struct GraphQLResponse {
data: Value,
errors: Option<Vec<Value>>,
Expand Down
19 changes: 16 additions & 3 deletions crates/integration-tests/src/tests/native_procedure.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::query;
use crate::{query, GraphQLResponse};
use insta::assert_yaml_snapshot;
use serde_json::json;

Expand All @@ -9,13 +9,13 @@ async fn updates_with_native_procedure() -> anyhow::Result<()> {
let mutation = r#"
mutation InsertArtist($id: Int!, $name: String!) {
insertArtist(id: $id, name: $name) {
n
number_of_docs_inserted: n
ok
}
}
"#;

query(mutation)
let res1 = query(mutation)
.variables(json!({ "id": id_1, "name": "Regina Spektor" }))
.run()
.await?;
Expand All @@ -24,6 +24,19 @@ async fn updates_with_native_procedure() -> anyhow::Result<()> {
.run()
.await?;

assert_eq!(
res1,
GraphQLResponse {
data: json!({
"insertArtist": {
"number_of_docs_inserted": 1,
"ok": 1.0,
}
}),
errors: None,
}
);

assert_yaml_snapshot!(
query(
r#"
Expand Down
5 changes: 3 additions & 2 deletions crates/mongodb-connector/src/mongo_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl Connector for MongoConnector {
_request: MutationRequest,
) -> Result<JsonResponse<ExplainResponse>, ExplainError> {
Err(ExplainError::UnsupportedOperation(
"The MongoDB agent does not yet support mutations".to_owned(),
"Explain for mutations is not implemented yet".to_owned(),
))
}

Expand All @@ -132,7 +132,8 @@ impl Connector for MongoConnector {
state: &Self::State,
request: MutationRequest,
) -> Result<JsonResponse<MutationResponse>, MutationError> {
handle_mutation_request(configuration, state, request).await
let query_context = get_query_context(configuration);
handle_mutation_request(configuration, query_context, state, request).await
}

#[instrument(err, skip_all)]
Expand Down
145 changes: 121 additions & 24 deletions crates/mongodb-connector/src/mutation.rs
Original file line number Diff line number Diff line change
@@ -1,51 +1,71 @@
use std::collections::BTreeMap;

use configuration::{schema::ObjectType, Configuration};
use configuration::Configuration;
use futures::future::try_join_all;
use itertools::Itertools;
use mongodb::Database;
use mongodb::{
bson::{self, Bson},
Database,
};
use mongodb_agent_common::{
procedure::Procedure, query::serialization::bson_to_json, state::ConnectorState,
};
use ndc_sdk::{
connector::MutationError,
json_response::JsonResponse,
models::{MutationOperation, MutationOperationResults, MutationRequest, MutationResponse},
models::{
Field, MutationOperation, MutationOperationResults, MutationRequest, MutationResponse,
NestedArray, NestedField, NestedObject, Relationship,
},
};

use crate::{
api_type_conversions::QueryContext,
query_response::{extend_configured_object_types, prune_type_to_field_selection},
};

pub async fn handle_mutation_request(
config: &Configuration,
query_context: QueryContext<'_>,
state: &ConnectorState,
mutation_request: MutationRequest,
) -> Result<JsonResponse<MutationResponse>, MutationError> {
tracing::debug!(?config, mutation_request = %serde_json::to_string(&mutation_request).unwrap(), "executing mutation");
let database = state.database();
let jobs = look_up_procedures(config, mutation_request)?;
let operation_results = try_join_all(
jobs.into_iter()
.map(|procedure| execute_procedure(&config.object_types, database.clone(), procedure)),
)
let jobs = look_up_procedures(config, &mutation_request)?;
let operation_results = try_join_all(jobs.into_iter().map(|(procedure, requested_fields)| {
execute_procedure(
&query_context,
database.clone(),
&mutation_request.collection_relationships,
procedure,
requested_fields,
)
}))
.await?;
Ok(JsonResponse::Value(MutationResponse { operation_results }))
}

/// Looks up procedures according to the names given in the mutation request, and pairs them with
/// arguments and requested fields. Returns an error if any procedures cannot be found.
fn look_up_procedures(
config: &Configuration,
mutation_request: MutationRequest,
) -> Result<Vec<Procedure<'_>>, MutationError> {
let (procedures, not_found): (Vec<Procedure>, Vec<String>) = mutation_request
fn look_up_procedures<'a, 'b>(
config: &'a Configuration,
mutation_request: &'b MutationRequest,
) -> Result<Vec<(Procedure<'a>, Option<&'b NestedField>)>, MutationError> {
let (procedures, not_found): (Vec<_>, Vec<String>) = mutation_request
.operations
.into_iter()
.iter()
.map(|operation| match operation {
MutationOperation::Procedure {
name, arguments, ..
name,
arguments,
fields,
} => {
let native_procedure = config.native_procedures.get(&name);
native_procedure.ok_or(name).map(|native_procedure| {
Procedure::from_native_procedure(native_procedure, arguments)
})
let native_procedure = config.native_procedures.get(name);
let procedure = native_procedure.ok_or(name).map(|native_procedure| {
Procedure::from_native_procedure(native_procedure, arguments.clone())
})?;
Ok((procedure, fields.as_ref()))
}
})
.partition_result();
Expand All @@ -61,17 +81,94 @@ fn look_up_procedures(
}

async fn execute_procedure(
object_types: &BTreeMap<String, ObjectType>,
query_context: &QueryContext<'_>,
database: Database,
relationships: &BTreeMap<String, Relationship>,
procedure: Procedure<'_>,
requested_fields: Option<&NestedField>,
) -> Result<MutationOperationResults, MutationError> {
let (result, result_type) = procedure
.execute(object_types, database.clone())
.execute(&query_context.object_types, database.clone())
.await
.map_err(|err| MutationError::InvalidRequest(err.to_string()))?;
let json_result = bson_to_json(&result_type, object_types, result.into())
.map_err(|err| MutationError::Other(Box::new(err)))?;
.map_err(|err| MutationError::UnprocessableContent(err.to_string()))?;

let rewritten_result = rewrite_response(requested_fields, result.into())?;

let (requested_result_type, temp_object_types) = prune_type_to_field_selection(
query_context,
relationships,
&[],
&result_type,
requested_fields,
)
.map_err(|err| MutationError::Other(Box::new(err)))?;
let object_types = extend_configured_object_types(query_context, temp_object_types);

let json_result = bson_to_json(&requested_result_type, &object_types, rewritten_result)
.map_err(|err| MutationError::UnprocessableContent(err.to_string()))?;

Ok(MutationOperationResults::Procedure {
result: json_result,
})
}

/// Reshapes a BSON response value so that it follows the field selection of the request,
/// renaming fields to the aliases the request asked for.
///
/// - With no selection (`None`) the value is returned untouched.
/// - An object selection requires a BSON document, which is rewritten by `rewrite_doc`.
/// - An array selection requires a BSON array, whose elements are rewritten by `rewrite_array`.
///
/// # Errors
///
/// Returns `MutationError::UnprocessableContent` when the value's shape does not match the
/// selection, and propagates any error from rewriting nested values.
fn rewrite_response(
    requested_fields: Option<&NestedField>,
    value: Bson,
) -> Result<Bson, MutationError> {
    match requested_fields {
        // Nothing was selected explicitly, so there is nothing to rename.
        None => Ok(value),

        Some(NestedField::Object(object_selection)) => match value {
            Bson::Document(document) => rewrite_doc(object_selection, document).map(Bson::from),
            _ => Err(MutationError::UnprocessableContent(
                "expected an object".to_owned(),
            )),
        },

        Some(NestedField::Array(array_selection)) => match value {
            Bson::Array(elements) => rewrite_array(array_selection, elements).map(Bson::from),
            _ => Err(MutationError::UnprocessableContent(
                "expected an array".to_owned(),
            )),
        },
    }
}

/// Builds the response document for an object selection, keyed by the names used in the request.
///
/// Each entry of `fields.fields` pairs a response key (`name`, the alias) with the field it
/// selects. For a column selection the value stored under `column` in `doc` is looked up,
/// rewritten recursively according to any nested selection, and emitted under `name`.
///
/// # Errors
///
/// - `MutationError::UnprocessableContent` if a selected column is absent from `doc`.
/// - `MutationError::UnsupportedOperation` if the selection contains a relationship field.
/// - Any error produced while rewriting a nested value.
fn rewrite_doc(
    fields: &NestedObject,
    doc: bson::Document,
) -> Result<bson::Document, MutationError> {
    fields
        .fields
        .iter()
        .map(|(name, field)| {
            let field_value = match field {
                Field::Column { column, fields } => {
                    // Copy the value instead of removing it from `doc`: the same column may be
                    // selected more than once under different aliases (e.g. `a: n, b: n`), and
                    // removing it on first use would make every later alias fail as "missing".
                    let orig_value = doc.get(column).cloned().ok_or_else(|| {
                        MutationError::UnprocessableContent(format!(
                            "missing expected field from response: {name}"
                        ))
                    })?;
                    rewrite_response(fields.as_ref(), orig_value)
                }
                Field::Relationship { .. } => Err(MutationError::UnsupportedOperation(
                    "The MongoDB connector does not support relationship references in mutations"
                        .to_owned(),
                )),
            }?;

            Ok((name.clone(), field_value))
        })
        .try_collect()
}

/// Rewrites every element of a BSON array according to the element selection in `fields`.
///
/// Elements are processed in order; the first element that fails to rewrite aborts the whole
/// operation and its error is returned.
fn rewrite_array(fields: &NestedArray, values: Vec<Bson>) -> Result<Vec<Bson>, MutationError> {
    let element_selection = &fields.fields;
    let mut rewritten = Vec::with_capacity(values.len());
    for element in values {
        rewritten.push(rewrite_response(Some(element_selection), element)?);
    }
    Ok(rewritten)
}
Loading