fix order of results for query requests with more than 10 variable sets #37

Merged: 9 commits, Apr 14, 2024

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@
This changelog documents the changes between release versions.

## [Unreleased]
- Fix incorrect order of results for query requests with more than 10 variable sets (#37)

## [0.0.4] - 2024-04-12
- Queries that attempt to compare a column to a column in the query root table, or a related table, will now fail instead of giving the incorrect result ([#22](https://github.com/hasura/ndc-mongodb/pull/22))
37 changes: 30 additions & 7 deletions arion-compose/project-connector.nix
@@ -1,12 +1,15 @@
# Run v3 MongoDB connector and engine with supporting databases. To start this
# project run:
# Run 2 MongoDB connectors and engine with supporting database. Running two
# connectors is useful for testing remote joins.
#
# To start this project run:
#
# arion -f arion-compose/project-connector.nix up -d
#

{ pkgs, ... }:
let
connector = "7130";
connector-port = "7130";
connector-chinook-port = "7131";
engine-port = "7100";
mongodb-port = "27017";
in
@@ -18,8 +21,21 @@ in
inherit pkgs;
configuration-dir = ../fixtures/connector/sample_mflix;
database-uri = "mongodb://mongodb/sample_mflix";
port = connector;
hostPort = connector;
port = connector-port;
hostPort = connector-port;
otlp-endpoint = "http://jaeger:4317";
service.depends_on = {
jaeger.condition = "service_healthy";
mongodb.condition = "service_healthy";
};
};

connector-chinook = import ./service-connector.nix {
inherit pkgs;
configuration-dir = ../fixtures/connector/chinook;
database-uri = "mongodb://mongodb/chinook";
port = connector-chinook-port;
hostPort = connector-chinook-port;
otlp-endpoint = "http://jaeger:4317";
service.depends_on = {
jaeger.condition = "service_healthy";
@@ -41,8 +57,15 @@ in
inherit pkgs;
port = engine-port;
hostPort = engine-port;
connectors = [
{ name = "sample_mflix"; url = "http://connector:${connector}"; subgraph = ../fixtures/ddn/subgraphs/sample_mflix; }
auth-webhook = { url = "http://auth-hook:3050/validate-request"; };
connectors = {
chinook = "http://connector-chinook:${connector-chinook-port}";
sample_mflix = "http://connector:${connector-port}";
};
ddn-dirs = [
../fixtures/ddn/chinook
../fixtures/ddn/sample_mflix
../fixtures/ddn/remote-relationships_chinook-sample_mflix
];
otlp-endpoint = "http://jaeger:4317";
service.depends_on = {
3 changes: 2 additions & 1 deletion arion-compose/project-e2e-testing.nix
@@ -37,7 +37,8 @@ in
engine = import ./service-engine.nix {
inherit pkgs;
port = engine-port;
connectors = [{ name = "chinook"; url = "http://connector:${connector-port}"; subgraph = ../fixtures/ddn/subgraphs/chinook; }];
connectors.chinook = "http://connector:${connector-port}";
ddn-dirs = [ ../fixtures/ddn/chinook ];
service.depends_on = {
auth-hook.condition = "service_started";
};
39 changes: 28 additions & 11 deletions arion-compose/service-engine.nix
@@ -1,17 +1,34 @@
{ pkgs
, port ? "7100"
, hostPort ? null
, connectors ? [{ name = "sample_mflix"; url = "http://connector:7130"; subgraph = ../fixtures/ddn/subgraphs/sample_mflix; }]

# Each key in the `connectors` map should match
# a `DataConnectorLink.definition.name` value in one of the given `ddn-dirs`
# to correctly match up configuration to connector instances.
, connectors ? { sample_mflix = "http://connector:7130"; }
, ddn-dirs ? [ ../fixtures/ddn/subgraphs/sample_mflix ]
, auth-webhook ? { url = "http://auth-hook:3050/validate-request"; }
, otlp-endpoint ? "http://jaeger:4317"
, service ? { } # additional options to customize this service configuration
}:

let
# Compile JSON metadata from HML fixture
metadata = pkgs.stdenv.mkDerivation {
name = "hasura-metadata.json";
src = (builtins.head connectors).subgraph;
# Compile JSON metadata from HML fixtures
#
# Converts yaml documents from each ddn-dir into json objects, and combines
# objects into one big array. Produces a file in the Nix store of the form
# /nix/store/<hash>-hasura-metadata.json
metadata = pkgs.runCommand "hasura-metadata.json" { } ''
${pkgs.jq}/bin/jq -s 'flatten(1)' \
${builtins.concatStringsSep " " (builtins.map compile-ddn ddn-dirs)} \
> $out
'';

# Translate each yaml document from hml files into a json object, and combine
# all objects into an array
compile-ddn = ddn-dir: pkgs.stdenv.mkDerivation {
name = "ddn-${builtins.baseNameOf ddn-dir}.json";
src = ddn-dir;
nativeBuildInputs = with pkgs; [ findutils jq yq-go ];

# The yq command converts the input sequence of yaml docs to a sequence of
@@ -20,7 +37,7 @@ let
# The jq command combines those json docs into an array (due to the -s
# switch), and modifies the json to update the data connector url.
buildPhase = ''
combined=$(mktemp -t subgraph-XXXXXX.hml)
combined=$(mktemp -t ddn-${builtins.baseNameOf ddn-dir}-XXXXXX.hml)
for obj in $(find . -name '*hml'); do
echo "---" >> "$combined"
cat "$obj" >> "$combined"
@@ -29,21 +46,21 @@
| yq -o=json \
${connector-url-substituters} \
| jq -s 'map(select(type != "null"))' \
> metadata.json
> ddn.json
'';

installPhase = ''
cp metadata.json "$out"
cp ddn.json "$out"
'';
};

# Pipe commands to replace data connector urls in fixture configuration with
# urls of dockerized connector instances
connector-url-substituters = builtins.toString (builtins.map
({ name, url, ... }:
connector-url-substituters = builtins.toString (builtins.attrValues (builtins.mapAttrs
(name: url:
'' | jq 'if .kind == "DataConnectorLink" and .definition.name == "${name}" then .definition.url = { singleUrl: { value: "${url}" } } else . end' ''
)
connectors);
connectors));

auth-config = pkgs.writeText "auth_config.json" (builtins.toJSON {
version = "v1";
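To make the key-matching contract concrete: the jq substituters above rewrite a metadata object's URL only when the object is a `DataConnectorLink` whose `definition.name` matches a key of the `connectors` map. The following is a hypothetical `serde_json` sketch of that transformation; the helper name and example values are invented for illustration and are not code from this repository:

```rust
use serde_json::{json, Value};

// Mirrors the jq filter: rewrite the URL only when the object is
// a DataConnectorLink whose definition.name matches a connectors key.
fn substitute_url(mut obj: Value, name: &str, url: &str) -> Value {
    if obj["kind"] == "DataConnectorLink" && obj["definition"]["name"] == name {
        obj["definition"]["url"] = json!({ "singleUrl": { "value": url } });
    }
    obj
}

fn main() {
    let link = json!({
        "kind": "DataConnectorLink",
        "definition": {
            "name": "chinook",
            "url": { "singleUrl": { "value": "http://localhost:7130" } }
        }
    });
    let rewritten = substitute_url(link, "chinook", "http://connector-chinook:7131");
    assert_eq!(
        rewritten["definition"]["url"]["singleUrl"]["value"],
        json!("http://connector-chinook:7131")
    );
}
```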
@@ -11,7 +11,11 @@ pub async fn execute_query_request(
query_request: QueryRequest,
) -> Result<QueryResponse, MongoAgentError> {
let (pipeline, response_shape) = pipeline_for_query_request(&query_request)?;
tracing::debug!(pipeline = %serde_json::to_string(&pipeline).unwrap(), "aggregate pipeline");
tracing::debug!(
?query_request,
pipeline = %serde_json::to_string(&pipeline).unwrap(),
"executing query"
);

let document_cursor = collection.aggregate(pipeline, None).await?;

182 changes: 179 additions & 3 deletions crates/mongodb-agent-common/src/query/foreach.rs
@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;

use dc_api_types::comparison_column::ColumnSelector;
use dc_api_types::{
@@ -56,7 +56,7 @@ pub fn pipeline_for_foreach(
foreach: Vec<ForeachVariant>,
query_request: &QueryRequest,
) -> Result<(Pipeline, ResponseShape), MongoAgentError> {
let pipelines_with_response_shapes: BTreeMap<String, (Pipeline, ResponseShape)> = foreach
let pipelines_with_response_shapes: Vec<(String, (Pipeline, ResponseShape))> = foreach
Comment from the PR author:

This PR contains a lot of fixture and test changes. This line is the only change required for the actual fix.

.into_iter()
.enumerate()
.map(|(index, foreach_variant)| {
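The comment above identifies the root cause: facet names embed a numeric index (`__FACET___0`, `__FACET___1`, ...), and a `BTreeMap` iterates its `String` keys in lexicographic order, so `__FACET___10` sorts before `__FACET___2`. With more than 10 variable sets, the per-variable-set results therefore came back shuffled. A minimal standalone sketch of the ordering difference (illustrative only, not the connector's code):

```rust
use std::collections::BTreeMap;

// Same naming scheme as the connector's `facet_name` helper.
fn facet_name(index: usize) -> String {
    format!("__FACET___{index}")
}

fn main() {
    // A BTreeMap yields its String keys in lexicographic order, so the
    // eleventh and twelfth facets sort between "__FACET___1" and "__FACET___2".
    let by_name: BTreeMap<String, usize> = (0..12).map(|i| (facet_name(i), i)).collect();
    let btree_order: Vec<usize> = by_name.into_values().collect();
    assert_eq!(btree_order, vec![0, 1, 10, 11, 2, 3, 4, 5, 6, 7, 8, 9]);

    // A Vec preserves insertion order, keeping results aligned with the
    // order of the incoming variable sets.
    let pipelines: Vec<(String, usize)> = (0..12).map(|i| (facet_name(i), i)).collect();
    let vec_order: Vec<usize> = pipelines.into_iter().map(|(_, i)| i).collect();
    assert_eq!(vec_order, (0..12).collect::<Vec<usize>>());
}
```

Switching the collection type to `Vec<(String, (Pipeline, ResponseShape))>` keeps the pipelines in insertion order, which matches the order of the incoming variable sets.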
@@ -133,7 +133,9 @@ fn facet_name(index: usize) -> String {

#[cfg(test)]
mod tests {
use dc_api_types::{QueryRequest, QueryResponse};
use dc_api_types::{
BinaryComparisonOperator, ComparisonColumn, Field, Query, QueryRequest, QueryResponse,
};
use mongodb::{
bson::{doc, from_document},
options::AggregateOptions,
@@ -400,4 +402,178 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn executes_foreach_with_variables() -> Result<(), anyhow::Error> {
let query_request = QueryRequest {
foreach: None,
variables: Some(
(1..=12)
.map(|artist_id| [("artistId".to_owned(), json!(artist_id))].into())
.collect(),
),
target: dc_api_types::Target::TTable {
name: vec!["tracks".to_owned()],
},
relationships: Default::default(),
query: Box::new(Query {
r#where: Some(dc_api_types::Expression::ApplyBinaryComparison {
column: ComparisonColumn::new(
"int".to_owned(),
dc_api_types::ColumnSelector::Column("artistId".to_owned()),
),
operator: BinaryComparisonOperator::Equal,
value: dc_api_types::ComparisonValue::Variable {
name: "artistId".to_owned(),
},
}),
fields: Some(Some(
[
(
"albumId".to_owned(),
Field::Column {
column: "albumId".to_owned(),
column_type: "int".to_owned(),
},
),
(
"title".to_owned(),
Field::Column {
column: "title".to_owned(),
column_type: "string".to_owned(),
},
),
]
.into(),
)),
aggregates: None,
aggregates_limit: None,
limit: None,
offset: None,
order_by: None,
}),
};

fn facet(artist_id: i32) -> serde_json::Value {
json!([
{ "$match": { "artistId": {"$eq": artist_id } } },
{ "$replaceWith": {
"albumId": { "$ifNull": ["$albumId", null] },
"title": { "$ifNull": ["$title", null] }
} },
])
}

let expected_pipeline = json!([
{
"$facet": {
"__FACET___0": facet(1),
"__FACET___1": facet(2),
"__FACET___2": facet(3),
"__FACET___3": facet(4),
"__FACET___4": facet(5),
"__FACET___5": facet(6),
"__FACET___6": facet(7),
"__FACET___7": facet(8),
"__FACET___8": facet(9),
"__FACET___9": facet(10),
"__FACET___10": facet(11),
"__FACET___11": facet(12),
},
},
{
"$replaceWith": {
"rows": [
{ "query": { "rows": "$__FACET___0" } },
{ "query": { "rows": "$__FACET___1" } },
{ "query": { "rows": "$__FACET___2" } },
{ "query": { "rows": "$__FACET___3" } },
{ "query": { "rows": "$__FACET___4" } },
{ "query": { "rows": "$__FACET___5" } },
{ "query": { "rows": "$__FACET___6" } },
{ "query": { "rows": "$__FACET___7" } },
{ "query": { "rows": "$__FACET___8" } },
{ "query": { "rows": "$__FACET___9" } },
{ "query": { "rows": "$__FACET___10" } },
{ "query": { "rows": "$__FACET___11" } },
]
},
}
]);

let expected_response: QueryResponse = from_value(json! ({
"rows": [
{
"query": {
"rows": [
{ "albumId": 1, "title": "For Those About To Rock We Salute You" },
{ "albumId": 4, "title": "Let There Be Rock" }
]
}
},
{ "query": { "rows": [] } },
{
"query": {
"rows": [
{ "albumId": 2, "title": "Balls to the Wall" },
{ "albumId": 3, "title": "Restless and Wild" }
]
}
},
{ "query": { "rows": [] } },
{ "query": { "rows": [] } },
{ "query": { "rows": [] } },
{ "query": { "rows": [] } },
{ "query": { "rows": [] } },
{ "query": { "rows": [] } },
{ "query": { "rows": [] } },
{ "query": { "rows": [] } },
]
}))?;

let mut collection = MockCollectionTrait::new();
collection
.expect_aggregate()
.returning(move |pipeline, _: Option<AggregateOptions>| {
assert_eq!(expected_pipeline, to_value(pipeline).unwrap());
Ok(mock_stream(vec![Ok(from_document(doc! {
"rows": [
{
"query": {
"rows": [
{ "albumId": 1, "title": "For Those About To Rock We Salute You" },
{ "albumId": 4, "title": "Let There Be Rock" }
]
}
},
{
"query": {
"rows": []
}
},
{
"query": {
"rows": [
{ "albumId": 2, "title": "Balls to the Wall" },
{ "albumId": 3, "title": "Restless and Wild" }
]
}
},
{ "query": { "rows": [] } },
{ "query": { "rows": [] } },
{ "query": { "rows": [] } },
{ "query": { "rows": [] } },
{ "query": { "rows": [] } },
{ "query": { "rows": [] } },
{ "query": { "rows": [] } },
{ "query": { "rows": [] } },
],
})?)]))
});

let result = execute_query_request(&collection, query_request).await?;
assert_eq!(expected_response, result);

Ok(())
}
}
2 changes: 0 additions & 2 deletions crates/mongodb-agent-common/src/query/mod.rs
@@ -28,8 +28,6 @@ pub async fn handle_query_request(
config: &MongoConfig,
query_request: QueryRequest,
) -> Result<QueryResponse, MongoAgentError> {
tracing::debug!(?config, query_request = %serde_json::to_string(&query_request).unwrap(), "executing query");

let database = config.client.database(&config.database);
let collection = database.collection::<Document>(&collection_name(&query_request.target));

5 changes: 2 additions & 3 deletions crates/mongodb-connector/src/mongo_connector.rs
@@ -149,6 +149,7 @@ impl Connector for MongoConnector {
state: &Self::State,
request: QueryRequest,
) -> Result<JsonResponse<QueryResponse>, QueryError> {
tracing::debug!(query_request = %serde_json::to_string(&request).unwrap(), "received query request");
let v2_request = v3_to_v2_query_request(
&QueryContext {
functions: vec![],
Expand All @@ -160,8 +161,6 @@ impl Connector for MongoConnector {
let response = handle_query_request(state, v2_request)
.await
.map_err(mongo_agent_error_to_query_error)?;
let r = v2_to_v3_query_response(response);
tracing::warn!(v3_response = %serde_json::to_string(&r).unwrap());
Comment from the PR author:

This fixes some debugging output I accidentally committed in a previous PR. I'd like to set up the sdk to make the logging level for this crate configurable so that I don't have to change `debug!`s to `warn!`s to see these traces.

Ok(r.into())
Ok(v2_to_v3_query_response(response).into())
}
}
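Following up on the author's note about configurable log levels: one common approach is to drive per-crate levels from `RUST_LOG` via the `tracing-subscriber` crate's `EnvFilter`. This is a hedged sketch assuming that crate's `env-filter` feature, not code from this PR; the crate name in the example filter string is an assumption:

```rust
use tracing_subscriber::EnvFilter;

fn init_tracing() {
    // e.g. RUST_LOG="info,mongodb_connector=debug" enables debug! events
    // from one crate without promoting them to warn! in the source.
    tracing_subscriber::fmt()
        .with_env_filter(
            // Fall back to "info" when RUST_LOG is unset.
            EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")),
        )
        .init();
}

fn main() {
    init_tracing();
    tracing::debug!("visible when this crate's level allows debug");
}
```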