From b65cab812d31d3c27a5da786b0fb63ecad83dbf6 Mon Sep 17 00:00:00 2001 From: Alexis King Date: Fri, 20 Sep 2019 02:09:29 -0500 Subject: [PATCH 1/3] Support explaining subscriptions --- server/src-lib/Hasura/GraphQL/Execute.hs | 1 + .../Hasura/GraphQL/Execute/LiveQuery.hs | 3 + .../Hasura/GraphQL/Execute/LiveQuery/Plan.hs | 82 +++++++++++++++++-- .../Hasura/GraphQL/Execute/LiveQuery/Poll.hs | 48 +---------- .../Hasura/GraphQL/Execute/LiveQuery/State.hs | 9 +- server/src-lib/Hasura/GraphQL/Explain.hs | 16 ++-- 6 files changed, 95 insertions(+), 64 deletions(-) diff --git a/server/src-lib/Hasura/GraphQL/Execute.hs b/server/src-lib/Hasura/GraphQL/Execute.hs index 09a1a2d699e2f..6c3106010186a 100644 --- a/server/src-lib/Hasura/GraphQL/Execute.hs +++ b/server/src-lib/Hasura/GraphQL/Execute.hs @@ -8,6 +8,7 @@ module Hasura.GraphQL.Execute , ExecPlanResolved , getResolvedExecPlan , execRemoteGQ + , getSubsOp , EP.PlanCache , EP.initPlanCache diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery.hs index 438fb8ef44481..5146df1b25a9d 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery.hs @@ -93,6 +93,9 @@ module Hasura.GraphQL.Execute.LiveQuery , reuseLiveQueryPlan , buildLiveQueryPlan + , LiveQueryPlanExplanation + , explainLiveQueryPlan + , LiveQueriesState , initLiveQueriesState , dumpLiveQueriesState diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Plan.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Plan.hs index 2db3af30a83bb..af733141ab5ed 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Plan.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Plan.hs @@ -3,15 +3,22 @@ module Hasura.GraphQL.Execute.LiveQuery.Plan ( MultiplexedQuery , mkMultiplexedQuery - , unMultiplexedQuery , toMultiplexedQueryVar + , CohortId + , newCohortId + , CohortVariables(..) + , executeMultiplexedQuery + , LiveQueryPlan(..) 
, ParameterizedLiveQueryPlan(..) , ReusableLiveQueryPlan , ValidatedQueryVariables , buildLiveQueryPlan , reuseLiveQueryPlan + + , LiveQueryPlanExplanation + , explainLiveQueryPlan ) where import Hasura.Prelude @@ -21,11 +28,17 @@ import qualified Data.Aeson.Extended as J import qualified Data.Aeson.TH as J import qualified Data.HashMap.Strict as Map import qualified Data.Text as T +import qualified Data.UUID.V4 as UUID import qualified Database.PG.Query as Q import qualified Language.GraphQL.Draft.Syntax as G +-- remove these when array encoding is merged +import qualified Database.PG.Query.PTI as PTI +import qualified PostgreSQL.Binary.Encoding as PE + import Control.Lens import Data.Has +import Data.UUID (UUID) import qualified Hasura.GraphQL.Resolve as GR import qualified Hasura.GraphQL.Transport.HTTP.Protocol as GH @@ -33,6 +46,7 @@ import qualified Hasura.GraphQL.Validate as GV import qualified Hasura.SQL.DML as S import Hasura.Db +import Hasura.EncJSON import Hasura.RQL.Types import Hasura.SQL.Error import Hasura.SQL.Types @@ -89,6 +103,49 @@ toMultiplexedQueryVar = \case , S.SEArray $ map S.SELit jPath ] +newtype CohortId = CohortId { unCohortId :: UUID } + deriving (Show, Eq, Hashable, J.ToJSON, Q.FromCol) + +newCohortId :: (MonadIO m) => m CohortId +newCohortId = CohortId <$> liftIO UUID.nextRandom + +data CohortVariables + = CohortVariables + { _cvSessionVariables :: !UserVars + , _cvQueryVariables :: !ValidatedQueryVariables + } deriving (Show, Eq, Generic) +instance Hashable CohortVariables + +instance J.ToJSON CohortVariables where + toJSON (CohortVariables sessionVars queryVars) = + J.object ["user" J..= sessionVars, "variables" J..= queryVars] + +-- These types exist only to use the Postgres array encoding. 
+newtype CohortIdArray = CohortIdArray { unCohortIdArray :: [CohortId] } + deriving (Show, Eq) +instance Q.ToPrepArg CohortIdArray where + toPrepVal (CohortIdArray l) = Q.toPrepValHelper PTI.unknown encoder $ map unCohortId l + where + encoder = PE.array 2950 . PE.dimensionArray foldl' (PE.encodingArray . PE.uuid) +newtype CohortVariablesArray = CohortVariablesArray { unCohortVariablesArray :: [CohortVariables] } + deriving (Show, Eq) +instance Q.ToPrepArg CohortVariablesArray where + toPrepVal (CohortVariablesArray l) = + Q.toPrepValHelper PTI.unknown encoder (map J.toJSON l) + where + encoder = PE.array 114 . PE.dimensionArray foldl' (PE.encodingArray . PE.json_ast) + +executeMultiplexedQuery + :: (MonadTx m) => MultiplexedQuery -> [(CohortId, CohortVariables)] -> m [(CohortId, EncJSON)] +executeMultiplexedQuery (MultiplexedQuery query) = executeQuery query + +-- | Internal; used by both 'executeMultiplexedQuery' and 'explainLiveQueryPlan'. +executeQuery :: (MonadTx m, Q.FromRow a) => Q.Query -> [(CohortId, CohortVariables)] -> m [a] +executeQuery query cohorts = + let (cohortIds, cohortVars) = unzip cohorts + preparedArgs = (CohortIdArray cohortIds, CohortVariablesArray cohortVars) + in liftTx $ Q.listQE defaultTxErrorHandler query preparedArgs True + -- ------------------------------------------------------------------------------------------------- -- Variable validation @@ -135,8 +192,7 @@ validateQueryVariables pgExecCtx annVarVals = do data LiveQueryPlan = LiveQueryPlan { _lqpParameterizedPlan :: !ParameterizedLiveQueryPlan - , _lqpSessionVariables :: !UserVars - , _lqpQueryVariables :: !ValidatedQueryVariables + , _lqpVariables :: !CohortVariables } data ParameterizedLiveQueryPlan @@ -180,7 +236,7 @@ buildLiveQueryPlan pgExecCtx fieldAlias astUnresolved varTypes = do -- an invalid value for a variable for one instance of the -- subscription will take down the entire multiplexed query validatedVars <- validateQueryVariables pgExecCtx annVarVals - let plan = 
LiveQueryPlan parameterizedPlan (userVars userInfo) validatedVars + let plan = LiveQueryPlan parameterizedPlan (CohortVariables (userVars userInfo) validatedVars) reusablePlan = ReusableLiveQueryPlan parameterizedPlan <$> varTypes pure (plan, reusablePlan) @@ -195,4 +251,20 @@ reuseLiveQueryPlan pgExecCtx sessionVars queryVars reusablePlan = do let ReusableLiveQueryPlan parameterizedPlan varTypes = reusablePlan annVarVals <- GV.validateVariablesForReuse varTypes queryVars validatedVars <- validateQueryVariables pgExecCtx annVarVals - pure $ LiveQueryPlan parameterizedPlan sessionVars validatedVars + pure $ LiveQueryPlan parameterizedPlan (CohortVariables sessionVars validatedVars) + +data LiveQueryPlanExplanation + = LiveQueryPlanExplanation + { _lqpeSql :: !Text + , _lqpePlan :: ![Text] + } deriving (Show) +$(J.deriveToJSON (J.aesonDrop 5 J.snakeCase) ''LiveQueryPlanExplanation) + +explainLiveQueryPlan :: (MonadTx m, MonadIO m) => LiveQueryPlan -> m LiveQueryPlanExplanation +explainLiveQueryPlan plan = do + let parameterizedPlan = _lqpParameterizedPlan plan + queryText = Q.getQueryText . 
unMultiplexedQuery $ _plqpQuery parameterizedPlan + explainQuery = Q.fromText $ "EXPLAIN (FORMAT TEXT) " <> queryText + cohortId <- newCohortId + explanationLines <- map runIdentity <$> executeQuery explainQuery [(cohortId, _lqpVariables plan)] + pure $ LiveQueryPlanExplanation queryText explanationLines diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs index c5fb65ab7628a..84d41286b98c6 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs @@ -39,16 +39,11 @@ import qualified Data.HashMap.Strict as Map import qualified Data.Time.Clock as Clock import qualified Data.UUID as UUID import qualified Data.UUID.V4 as UUID -import qualified Database.PG.Query as Q import qualified Language.GraphQL.Draft.Syntax as G import qualified ListT import qualified StmContainers.Map as STMMap import qualified System.Metrics.Distribution as Metrics --- remove these when array encoding is merged -import qualified Database.PG.Query.PTI as PTI -import qualified PostgreSQL.Binary.Encoding as PE - import Data.List.Split (chunksOf) import qualified Hasura.GraphQL.Execute.LiveQuery.TMap as TMap @@ -103,23 +98,6 @@ data Cohort -- result changed, then merge them in the map of existing subscribers } -newtype CohortId = CohortId { unCohortId :: UUID.UUID } - deriving (Show, Eq, Hashable, Q.FromCol) - -newCohortId :: IO CohortId -newCohortId = CohortId <$> UUID.nextRandom - -data CohortVariables - = CohortVariables - { _cvSessionVariables :: !UserVars - , _cvQueryVariables :: !ValidatedQueryVariables - } deriving (Show, Eq, Generic) -instance Hashable CohortVariables - -instance J.ToJSON CohortVariables where - toJSON (CohortVariables sessionVars queryVars) = - J.object ["user" J..= sessionVars, "variables" J..= queryVars] - -- | A hash used to determine if the result changed without having to keep the entire result in -- memory. 
Using a cryptographic hash ensures that a hash collision is almost impossible: with 256 -- bits, even if a subscription changes once per second for an entire year, the probability of a @@ -157,7 +135,7 @@ dumpCohortMap cohortMap = do curOpIds <- TMap.toList curOps newOpIds <- TMap.toList newOps return $ J.object - [ "resp_id" J..= unCohortId respId + [ "resp_id" J..= respId , "current_ops" J..= map fst curOpIds , "new_ops" J..= map fst newOpIds , "previous_result_hash" J..= prevResHash @@ -292,23 +270,6 @@ dumpPollerMap extended lqMap = , "max" J..= Metrics.max stats ] -newtype CohortIdArray = CohortIdArray { unCohortIdArray :: [CohortId] } - deriving (Show, Eq) - -instance Q.ToPrepArg CohortIdArray where - toPrepVal (CohortIdArray l) = Q.toPrepValHelper PTI.unknown encoder $ map unCohortId l - where - encoder = PE.array 2950 . PE.dimensionArray foldl' (PE.encodingArray . PE.uuid) - -newtype CohortVariablesArray = CohortVariablesArray { unCohortVariablesArray :: [CohortVariables] } - deriving (Show, Eq) - -instance Q.ToPrepArg CohortVariablesArray where - toPrepVal (CohortVariablesArray l) = - Q.toPrepValHelper PTI.unknown encoder (map J.toJSON l) - where - encoder = PE.array 114 . PE.dimensionArray foldl' (PE.encodingArray . PE.json_ast) - -- | Where the magic happens: the top-level action run periodically by each active 'Poller'. pollQuery :: RefetchMetrics @@ -332,8 +293,7 @@ pollQuery metrics batchSize pgExecCtx pgQuery handler = do realToFrac $ Clock.diffUTCTime snapshotFinish procInit flip A.mapConcurrently_ queryVarsBatches $ \queryVars -> do queryInit <- Clock.getCurrentTime - mxRes <- runExceptT . runLazyTx' pgExecCtx . liftTx $ Q.listQE defaultTxErrorHandler - (unMultiplexedQuery pgQuery) (mkMxQueryPrepArgs queryVars) True + mxRes <- runExceptT . 
runLazyTx' pgExecCtx $ executeMultiplexedQuery pgQuery queryVars queryFinish <- Clock.getCurrentTime Metrics.add (_rmQuery metrics) $ realToFrac $ Clock.diffUTCTime queryFinish queryInit @@ -365,10 +325,6 @@ pollQuery metrics batchSize pgExecCtx pgQuery handler = do getQueryVars cohortSnapshotMap = Map.toList $ fmap _csVariables cohortSnapshotMap - mkMxQueryPrepArgs l = - let (respIdL, respVarL) = unzip l - in (CohortIdArray respIdL, CohortVariablesArray respVarL) - getCohortOperations cohortSnapshotMap = \case Left e -> -- TODO: this is internal error diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs index 383a28e8c36f4..6e37c13ae3d09 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs @@ -68,7 +68,7 @@ addLiveQuery lqState plan onResultAction = do handlerM <- STMMap.lookup handlerId lqMap case handlerM of Just handler -> do - cohortM <- TMap.lookup cohortId $ _pCohorts handler + cohortM <- TMap.lookup cohortKey $ _pCohorts handler case cohortM of Just cohort -> addToCohort sinkId cohort Nothing -> addToPoller sinkId responseId handler @@ -88,14 +88,13 @@ addLiveQuery lqState plan onResultAction = do threadDelay $ unRefetchInterval refetchInterval STM.atomically $ STM.putTMVar (_pIOState handler) (PollerIOState threadRef metrics) - pure $ LiveQueryId handlerId cohortId sinkId + pure $ LiveQueryId handlerId cohortKey sinkId where LiveQueriesState lqOpts pgExecCtx lqMap = lqState LiveQueriesOptions batchSize refetchInterval = lqOpts - LiveQueryPlan (ParameterizedLiveQueryPlan role alias query) sessionVars queryVars = plan + LiveQueryPlan (ParameterizedLiveQueryPlan role alias query) cohortKey = plan handlerId = PollerKey role query - cohortId = CohortVariables sessionVars queryVars addToCohort sinkId handlerC = TMap.insert (Subscriber alias onResultAction) sinkId $ _cNewSubscribers handlerC @@ -103,7 +102,7 
@@ addLiveQuery lqState plan onResultAction = do addToPoller sinkId responseId handler = do newCohort <- Cohort responseId <$> STM.newTVar Nothing <*> TMap.new <*> TMap.new addToCohort sinkId newCohort - TMap.insert newCohort cohortId $ _pCohorts handler + TMap.insert newCohort cohortKey $ _pCohorts handler newPoller = Poller <$> TMap.new <*> STM.newEmptyTMVar diff --git a/server/src-lib/Hasura/GraphQL/Explain.hs b/server/src-lib/Hasura/GraphQL/Explain.hs index 19423d3a6a554..4bc0382e7ef59 100644 --- a/server/src-lib/Hasura/GraphQL/Explain.hs +++ b/server/src-lib/Hasura/GraphQL/Explain.hs @@ -20,6 +20,7 @@ import Hasura.SQL.Types import Hasura.SQL.Value import qualified Hasura.GraphQL.Execute as E +import qualified Hasura.GraphQL.Execute.LiveQuery as E import qualified Hasura.GraphQL.Resolve as RS import qualified Hasura.GraphQL.Transport.HTTP.Protocol as GH import qualified Hasura.GraphQL.Validate as GV @@ -120,15 +121,14 @@ explainGQLQuery pgExecCtx sc sqlGenCtx enableAL (GQLExplain query userVarsRaw) = E.GExPRemote _ _ -> throw400 InvalidParams "only hasura queries can be explained" case rootSelSet of - GV.RQuery selSet -> do - let tx = mapM (explainField userInfo gCtx sqlGenCtx) (toList selSet) - plans <- liftIO (runExceptT $ runLazyTx pgExecCtx tx) >>= liftEither - return $ encJFromJValue plans + GV.RQuery selSet -> + runInTx $ encJFromJValue <$> traverse (explainField userInfo gCtx sqlGenCtx) (toList selSet) GV.RMutation _ -> throw400 InvalidParams "only queries can be explained" - GV.RSubscription _ -> - throw400 InvalidParams "only queries can be explained" - + GV.RSubscription rootField -> do + (plan, _) <- E.getSubsOp pgExecCtx gCtx sqlGenCtx userInfo rootField + runInTx $ encJFromJValue <$> E.explainLiveQueryPlan plan where - usrVars = mkUserVars $ maybe [] Map.toList userVarsRaw + usrVars = mkUserVars $ maybe [] Map.toList userVarsRaw userInfo = mkUserInfo (fromMaybe adminRole $ roleFromVars usrVars) usrVars + runInTx = liftEither <=< liftIO . 
runExceptT . runLazyTx pgExecCtx From 7351783650412c3d17626cf1b5cfade4d7dd2261 Mon Sep 17 00:00:00 2001 From: Alexis King Date: Fri, 20 Sep 2019 03:25:14 -0500 Subject: [PATCH 2/3] wip: add failing test for subscription parameterization --- .../subscriptions/multiplexing/query.yaml | 14 ++++ .../subscriptions/multiplexing/setup.yaml | 8 +++ .../subscriptions/multiplexing/teardown.yaml | 5 ++ server/tests-py/test_subscriptions.py | 71 ++++++++++++------- 4 files changed, 73 insertions(+), 25 deletions(-) create mode 100644 server/tests-py/queries/subscriptions/multiplexing/query.yaml create mode 100644 server/tests-py/queries/subscriptions/multiplexing/setup.yaml create mode 100644 server/tests-py/queries/subscriptions/multiplexing/teardown.yaml diff --git a/server/tests-py/queries/subscriptions/multiplexing/query.yaml b/server/tests-py/queries/subscriptions/multiplexing/query.yaml new file mode 100644 index 0000000000000..1356b1b861d21 --- /dev/null +++ b/server/tests-py/queries/subscriptions/multiplexing/query.yaml @@ -0,0 +1,14 @@ +query: | + subscription ($condition: test_bool_exp) { + test(where: $condition) { + id + } + } +variables_representative: + condition: {id: {_eq: 1}} +variables_same: +- condition: {id: {_eq: 1}} +- condition: {id: {_eq: 2}} +variables_different: +- condition: {id: {_gt: 1}} +- condition: null diff --git a/server/tests-py/queries/subscriptions/multiplexing/setup.yaml b/server/tests-py/queries/subscriptions/multiplexing/setup.yaml new file mode 100644 index 0000000000000..f667d71b62b84 --- /dev/null +++ b/server/tests-py/queries/subscriptions/multiplexing/setup.yaml @@ -0,0 +1,8 @@ +type: bulk +args: +- type: run_sql + args: + sql: CREATE TABLE test(id serial PRIMARY KEY); +- type: track_table + args: + name: test diff --git a/server/tests-py/queries/subscriptions/multiplexing/teardown.yaml b/server/tests-py/queries/subscriptions/multiplexing/teardown.yaml new file mode 100644 index 0000000000000..4492d0f42dd88 --- /dev/null +++ 
b/server/tests-py/queries/subscriptions/multiplexing/teardown.yaml @@ -0,0 +1,5 @@ +type: bulk +args: +- type: run_sql + args: + sql: DROP TABLE test; diff --git a/server/tests-py/test_subscriptions.py b/server/tests-py/test_subscriptions.py index 186beaad6d03c..46a7c4787c0ab 100644 --- a/server/tests-py/test_subscriptions.py +++ b/server/tests-py/test_subscriptions.py @@ -5,6 +5,8 @@ import queue import yaml +from super_classes import GraphQLEngineTest + ''' Refer: https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md#gql_connection_init ''' @@ -27,6 +29,11 @@ def init_ws_conn(hge_ctx, ws_client, payload = None): ev = ws_client.get_ws_event(3) assert ev['type'] == 'connection_ack', ev +class DefaultTestSubscriptions(GraphQLEngineTest): + @pytest.fixture(scope='class', autouse=True) + def ws_conn_init(self, transact, hge_ctx, ws_client): + init_ws_conn(hge_ctx, ws_client) + class TestSubscriptionCtrl(object): def test_init_without_payload(self, hge_ctx, ws_client): @@ -59,20 +66,10 @@ def test_connection_terminate(self, hge_ctx, ws_client): with pytest.raises(queue.Empty): ev = ws_client.get_ws_event(3) -class TestSubscriptionBasic(object): - - @pytest.fixture(scope='class') - def transact(self, request, hge_ctx): - self.dir = 'queries/subscriptions/basic' - st_code, resp = hge_ctx.v1q_f(self.dir + '/setup.yaml') - assert st_code == 200, resp - yield - st_code, resp = hge_ctx.v1q_f(self.dir + '/teardown.yaml') - assert st_code == 200, resp - - @pytest.fixture(autouse=True) - def ws_conn_init(self, transact, hge_ctx, ws_client): - init_ws_conn(hge_ctx, ws_client) +class TestSubscriptionBasic(DefaultTestSubscriptions): + @classmethod + def dir(cls): + return 'queries/subscriptions/basic' ''' Refer: https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md#gql_connection_error @@ -171,16 +168,10 @@ def test_complete(self, hge_ctx, ws_client): assert ev['type'] == 'complete' and ev['id'] == '2', ev -class 
TestSubscriptionLiveQueries(object): - - @pytest.fixture(scope='class', autouse=True) - def transact(self, request, hge_ctx, ws_client): - st_code, resp = hge_ctx.v1q_f(self.dir() + '/setup.yaml') - assert st_code == 200, resp - init_ws_conn(hge_ctx, ws_client) - yield - st_code, resp = hge_ctx.v1q_f(self.dir() + '/teardown.yaml') - assert st_code == 200, resp +class TestSubscriptionLiveQueries(DefaultTestSubscriptions): + @classmethod + def dir(cls): + return 'queries/subscriptions/live_queries' def test_live_queries(self, hge_ctx, ws_client): ''' @@ -257,6 +248,36 @@ def test_live_queries(self, hge_ctx, ws_client): with pytest.raises(queue.Empty): ev = ws_client.get_ws_event(3) +class TestSubscriptionMultiplexing(GraphQLEngineTest): @classmethod def dir(cls): - return 'queries/subscriptions/live_queries' + return 'queries/subscriptions/multiplexing' + + def test_query_parameterization(self, hge_ctx): + with open(self.dir() + '/query.yaml') as c: + config = yaml.safe_load(c) + + query = config['query'] + representative_sql = self.get_parameterized_sql(hge_ctx, query, config['variables_representative']) + + for vars in config['variables_same']: + same_sql = self.get_parameterized_sql(hge_ctx, query, vars) + assert same_sql == representative_sql, (representative_sql, same_sql) + + for vars in config['variables_different']: + different_sql = self.get_parameterized_sql(hge_ctx, query, vars) + assert different_sql != representative_sql, (representative_sql, different_sql) + + def get_parameterized_sql(self, hge_ctx, query, variables): + admin_secret = hge_ctx.hge_key + headers = {} + if admin_secret is not None: + headers['X-Hasura-Admin-Secret'] = admin_secret + + request = { 'query': { 'query': query, 'variables': variables }, 'user': {} } + status_code, response = hge_ctx.anyq('/v1/graphql/explain', request, headers) + assert status_code == 200, (request, status_code, response) + + sql = response['sql'] + assert isinstance(sql, str), response + return sql From 
889bc69b90c8c370178042259b2c6db0cafa2b03 Mon Sep 17 00:00:00 2001 From: Alexis King Date: Wed, 25 Sep 2019 23:06:03 -0500 Subject: [PATCH 3/3] Parameterize over all SQL literals when multiplexing subscriptions --- server/graphql-engine.cabal | 2 + .../Hasura/GraphQL/Execute/LiveQuery/Plan.hs | 113 +++++++++++------- .../Hasura/GraphQL/Execute/LiveQuery/Poll.hs | 7 +- server/src-lib/Hasura/Prelude.hs | 1 + 4 files changed, 79 insertions(+), 44 deletions(-) diff --git a/server/graphql-engine.cabal b/server/graphql-engine.cabal index cca9401b1e152..f4cdd5c8a957a 100644 --- a/server/graphql-engine.cabal +++ b/server/graphql-engine.cabal @@ -319,6 +319,7 @@ library QuasiQuotes RankNTypes ScopedTypeVariables + StandaloneDeriving TemplateHaskell TupleSections TypeApplications @@ -363,6 +364,7 @@ executable graphql-engine QuasiQuotes RankNTypes ScopedTypeVariables + StandaloneDeriving TemplateHaskell TupleSections TypeApplications diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Plan.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Plan.hs index af733141ab5ed..b3e9a264b4f80 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Plan.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Plan.hs @@ -1,13 +1,15 @@ +{-# LANGUAGE UndecidableInstances #-} + -- | Construction of multiplexed live query plans; see "Hasura.GraphQL.Execute.LiveQuery" for -- details. module Hasura.GraphQL.Execute.LiveQuery.Plan ( MultiplexedQuery , mkMultiplexedQuery - , toMultiplexedQueryVar + , resolveMultiplexedValue , CohortId , newCohortId - , CohortVariables(..) + , CohortVariables , executeMultiplexedQuery , LiveQueryPlan(..) 
@@ -79,21 +81,23 @@ mkMultiplexedQuery baseQuery = ) _fld_resp ON ('true') |] --- | converts the partial unresolved value containing --- variables, session variables to an SQL expression --- referring correctly to the values from '_subs' temporary table --- The variables are at _subs.result_vars.variables and --- session variables at _subs.result_vars.user -toMultiplexedQueryVar :: (MonadState GV.ReusableVariableValues m) => GR.UnresolvedVal -> m S.SQLExp -toMultiplexedQueryVar = \case - GR.UVPG annPGVal -> +-- | Resolves an 'GR.UnresolvedVal' by converting 'GR.UVPG' values to SQL expressions that refer to +-- the @result_vars@ input object, collecting variable values along the way. +resolveMultiplexedValue + :: (MonadState (GV.ReusableVariableValues, Seq (WithScalarType PGScalarValue)) m) + => GR.UnresolvedVal -> m S.SQLExp +resolveMultiplexedValue = \case + GR.UVPG annPGVal -> do let GR.AnnPGVal varM _ colVal = annPGVal - in case varM of - Just var -> do - modify $ Map.insert var colVal - pure $ fromResVars (PGTypeScalar $ pstType colVal) - ["variables", G.unName $ G.unVariable var] - Nothing -> return $ toTxtValue colVal + varJsonPath <- case varM of + Just varName -> do + modifying _1 $ Map.insert varName colVal + pure ["variables", G.unName $ G.unVariable varName] + Nothing -> do + syntheticVarIndex <- gets (length . 
snd) + modifying _2 (|> colVal) + pure ["synthetic", T.pack $ show syntheticVarIndex] + pure $ fromResVars (PGTypeScalar $ pstType colVal) varJsonPath GR.UVSessVar ty sessVar -> pure $ fromResVars ty ["user", T.toLower sessVar] GR.UVSQL sqlExp -> pure sqlExp where @@ -111,14 +115,34 @@ newCohortId = CohortId <$> liftIO UUID.nextRandom data CohortVariables = CohortVariables - { _cvSessionVariables :: !UserVars - , _cvQueryVariables :: !ValidatedQueryVariables + { _cvSessionVariables :: !UserVars + , _cvQueryVariables :: !ValidatedQueryVariables + , _cvSyntheticVariables :: !ValidatedSyntheticVariables + -- ^ To allow more queries to be multiplexed together, we introduce “synthetic” variables for + -- /all/ SQL literals in a query, even if they don’t correspond to any GraphQL variable. For + -- example, the query + -- + -- > subscription latest_tracks($condition: tracks_bool_exp!) { + -- > tracks(where: $condition) { + -- > id + -- > title + -- > } + -- > } + -- + -- might be executed with similar values for @$condition@, such as @{"album_id": {"_eq": "1"}}@ + -- and @{"album_id": {"_eq": "2"}}@. + -- + -- Normally, we wouldn’t bother parameterizing over the @1@ and @2@ literals in the resulting + -- query because we can’t cache that query plan (since different @$condition@ values could lead to + -- different SQL). However, for live queries, we can still take advantage of the similarity + -- between the two queries by multiplexing them together, so we replace them with references to + -- synthetic variables. } deriving (Show, Eq, Generic) instance Hashable CohortVariables instance J.ToJSON CohortVariables where - toJSON (CohortVariables sessionVars queryVars) = - J.object ["user" J..= sessionVars, "variables" J..= queryVars] + toJSON (CohortVariables sessionVars queryVars syntheticVars) = + J.object ["user" J..= sessionVars, "variables" J..= queryVars, "synthetic" J..= syntheticVars] -- NB: these keys must match the JSON paths that 'resolveMultiplexedValue' reads from _subs.result_vars ("user", "variables", "synthetic")
newtype CohortIdArray = CohortIdArray { unCohortIdArray :: [CohortId] } @@ -157,21 +181,27 @@ executeQuery query cohorts = -- > SELECT 'v1'::t1, 'v2'::t2, ..., 'vn'::tn -- -- so if any variable values are invalid, the error will be caught early. -newtype ValidatedQueryVariables = ValidatedQueryVariables (Map.HashMap G.Variable TxtEncodedPGVal) - deriving (Show, Eq, Hashable, J.ToJSON) +newtype ValidatedVariables f = ValidatedVariables (f TxtEncodedPGVal) +deriving instance (Show (f TxtEncodedPGVal)) => Show (ValidatedVariables f) +deriving instance (Eq (f TxtEncodedPGVal)) => Eq (ValidatedVariables f) +deriving instance (Hashable (f TxtEncodedPGVal)) => Hashable (ValidatedVariables f) +deriving instance (J.ToJSON (f TxtEncodedPGVal)) => J.ToJSON (ValidatedVariables f) + +type ValidatedQueryVariables = ValidatedVariables (Map.HashMap G.Variable) +type ValidatedSyntheticVariables = ValidatedVariables [] -- | Checks if the provided arguments are valid values for their corresponding types. -- Generates SQL of the format "select 'v1'::t1, 'v2'::t2 ..." -validateQueryVariables - :: (MonadError QErr m, MonadIO m) +validateVariables + :: (Traversable f, MonadError QErr m, MonadIO m) => PGExecCtx - -> GV.ReusableVariableValues - -> m ValidatedQueryVariables -validateQueryVariables pgExecCtx annVarVals = do - let valSel = mkValidationSel $ Map.elems annVarVals + -> f (WithScalarType PGScalarValue) + -> m (ValidatedVariables f) +validateVariables pgExecCtx variableValues = do + let valSel = mkValidationSel $ toList variableValues Q.Discard () <- runTx' $ liftTx $ Q.rawQE dataExnErrHandler (Q.fromBuilder $ toSQL valSel) [] False - pure . ValidatedQueryVariables $ fmap (txtEncodedPGVal . pstValue) annVarVals + pure . ValidatedVariables $ fmap (txtEncodedPGVal . pstValue) variableValues where mkExtrs = map (flip S.Extractor Nothing . 
toTxtValue) mkValidationSel vars = @@ -205,8 +235,9 @@ $(J.deriveToJSON (J.aesonDrop 4 J.snakeCase) ''ParameterizedLiveQueryPlan) data ReusableLiveQueryPlan = ReusableLiveQueryPlan - { _rlqpParameterizedPlan :: !ParameterizedLiveQueryPlan - , _rlqpQueryVariableTypes :: !GV.ReusableVariableTypes + { _rlqpParameterizedPlan :: !ParameterizedLiveQueryPlan + , _rlqpSyntheticVariableValues :: !ValidatedSyntheticVariables + , _rlqpQueryVariableTypes :: !GV.ReusableVariableTypes } deriving (Show) $(J.deriveToJSON (J.aesonDrop 4 J.snakeCase) ''ReusableLiveQueryPlan) @@ -226,8 +257,8 @@ buildLiveQueryPlan buildLiveQueryPlan pgExecCtx fieldAlias astUnresolved varTypes = do userInfo <- asks getter - (astResolved, annVarVals) <- flip runStateT mempty $ - GR.traverseQueryRootFldAST toMultiplexedQueryVar astUnresolved + (astResolved, (queryVariableValues, syntheticVariableValues)) <- flip runStateT mempty $ + GR.traverseQueryRootFldAST resolveMultiplexedValue astUnresolved let pgQuery = mkMultiplexedQuery $ GR.toPGQuery astResolved parameterizedPlan = ParameterizedLiveQueryPlan (userRole userInfo) fieldAlias pgQuery @@ -235,9 +266,11 @@ buildLiveQueryPlan pgExecCtx fieldAlias astUnresolved varTypes = do -- are correct according to Postgres. 
Without this check -- an invalid value for a variable for one instance of the -- subscription will take down the entire multiplexed query - validatedVars <- validateQueryVariables pgExecCtx annVarVals - let plan = LiveQueryPlan parameterizedPlan (CohortVariables (userVars userInfo) validatedVars) - reusablePlan = ReusableLiveQueryPlan parameterizedPlan <$> varTypes + validatedQueryVars <- validateVariables pgExecCtx queryVariableValues + validatedSyntheticVars <- validateVariables pgExecCtx (toList syntheticVariableValues) + let cohortVariables = CohortVariables (userVars userInfo) validatedQueryVars validatedSyntheticVars + plan = LiveQueryPlan parameterizedPlan cohortVariables + reusablePlan = ReusableLiveQueryPlan parameterizedPlan validatedSyntheticVars <$> varTypes pure (plan, reusablePlan) reuseLiveQueryPlan @@ -248,10 +281,10 @@ reuseLiveQueryPlan -> ReusableLiveQueryPlan -> m LiveQueryPlan reuseLiveQueryPlan pgExecCtx sessionVars queryVars reusablePlan = do - let ReusableLiveQueryPlan parameterizedPlan varTypes = reusablePlan - annVarVals <- GV.validateVariablesForReuse varTypes queryVars - validatedVars <- validateQueryVariables pgExecCtx annVarVals - pure $ LiveQueryPlan parameterizedPlan (CohortVariables sessionVars validatedVars) + let ReusableLiveQueryPlan parameterizedPlan syntheticVars queryVarTypes = reusablePlan + annVarVals <- GV.validateVariablesForReuse queryVarTypes queryVars + validatedVars <- validateVariables pgExecCtx annVarVals + pure $ LiveQueryPlan parameterizedPlan (CohortVariables sessionVars validatedVars syntheticVars) data LiveQueryPlanExplanation = LiveQueryPlanExplanation diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs index 84d41286b98c6..e80cfcf57fb78 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs @@ -16,7 +16,7 @@ module Hasura.GraphQL.Execute.LiveQuery.Poll ( , 
Cohort(..) , CohortId , newCohortId - , CohortVariables(..) + , CohortVariables , CohortKey , CohortMap @@ -121,11 +121,10 @@ type CohortMap = TMap.TMap CohortKey Cohort dumpCohortMap :: CohortMap -> IO J.Value dumpCohortMap cohortMap = do cohorts <- STM.atomically $ TMap.toList cohortMap - fmap J.toJSON . forM cohorts $ \(CohortVariables usrVars varVals, cohort) -> do + fmap J.toJSON . forM cohorts $ \(variableValues, cohort) -> do cohortJ <- dumpCohort cohort return $ J.object - [ "session_vars" J..= usrVars - , "variable_values" J..= varVals + [ "variables" J..= variableValues , "cohort" J..= cohortJ ] where diff --git a/server/src-lib/Hasura/Prelude.hs b/server/src-lib/Hasura/Prelude.hs index b1314148d4633..1223f4500f7fd 100644 --- a/server/src-lib/Hasura/Prelude.hs +++ b/server/src-lib/Hasura/Prelude.hs @@ -36,6 +36,7 @@ import Data.Maybe as M (catMaybes, fromMaybe, isJust, mapMaybe, maybeToList) import Data.Ord as M (comparing) import Data.Semigroup as M (Semigroup (..)) +import Data.Sequence as M (Seq) import Data.String as M (IsString) import Data.Text as M (Text) import Data.Traversable as M (for)