这是indexloc提供的服务,不要输入任何密码
Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 68 additions & 53 deletions server/src-lib/Hasura/GraphQL/Execute/LiveQuery.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@ module Hasura.GraphQL.Execute.LiveQuery
, initLiveQueriesState
, dumpLiveQueriesState

, LiveQueryOpG(..)

, LiveQueryOp
, LiveQueryOpPartial
, getLiveQueryOpPartial
, LiveQueryId
, addLiveQuery
, removeLiveQuery

, SubsPlan
, subsOpFromPlan
, subsOpFromPGAST

) where

import Data.Has
Expand All @@ -30,7 +35,6 @@ import qualified Control.Concurrent.STM as STM
import qualified Data.Aeson as J
import qualified Data.HashMap.Strict as Map
import qualified Data.HashSet as Set
import qualified Data.Text as T
import qualified Database.PG.Query as Q
import qualified Database.PG.Query.Connection as Q
import qualified Language.GraphQL.Draft.Syntax as G
Expand All @@ -48,7 +52,6 @@ import Hasura.GraphQL.Execute.LiveQuery.Types
import Hasura.Prelude
import Hasura.RQL.DML.Select (asSingleRowJsonResp)
import Hasura.RQL.Types

import Hasura.SQL.Types
import Hasura.SQL.Value

Expand Down Expand Up @@ -95,9 +98,12 @@ initLiveQueriesState (LQOpts mxOpts fallbackOpts) pgExecCtx = do
<*> LQF.initLiveQueriesState fallbackOpts
return $ LiveQueriesState mxMap fallbackMap pgExecCtx

data LiveQueryOp
= LQMultiplexed !LQM.MxOp
| LQFallback !LQF.FallbackOp
data LiveQueryOpG f m
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you tell me what the G stands for in these types? “Grammar”? “General”?

= LQFallback !f
| LQMultiplexed !m
deriving (Show, Eq)

type LiveQueryOp = LiveQueryOpG LQF.FallbackOp LQM.MxOp

data LiveQueryId
= LQIMultiplexed !LQM.LiveQueryId
Expand Down Expand Up @@ -161,33 +167,56 @@ type TextEncodedVariables
-- referring correctly to the values from '_subs' temporary table
-- The variables are at _subs.result_vars.variables and
-- session variables at _subs.result_vars.user
toMultiplexedQueryVar
:: (MonadState GV.AnnPGVarVals m)
=> GR.UnresolvedVal -> m S.SQLExp
toMultiplexedQueryVar = \case
GR.UVPG annPGVal ->
let GR.AnnPGVal varM isNullable colTy colVal = annPGVal
in case (varM, isNullable) of
-- we don't check for nullability as
-- this is only used for reusable plans
-- the check has to be made before this
(Just var, _) -> do
modify $ Map.insert var (colTy, colVal)
return $ fromResVars (PgTypeSimple colTy)
[ "variables"
, G.unName $ G.unVariable var
]
_ -> return $ toTxtValue colTy colVal
GR.UVSessVar ty sessVar ->
return $ fromResVars ty [ "user", T.toLower sessVar]
GR.UVSQL sqlExp -> return sqlExp
type FallbackOpPartial = (GR.QueryRootFldUnresolved, Set.HashSet G.Variable)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor, mostly unimportant comment, but: I wonder if there’s a good way to encode into the type system that this HashSet must be non-empty.

type MultiplexedOpPartial = (GV.VarPGTypes, Q.Query, TextEncodedVariables)

type LiveQueryOpPartial = LiveQueryOpG FallbackOpPartial MultiplexedOpPartial

-- | Creates a partial live query operation, used in both
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What exactly is a “partial” live query operation—which parts of a “full” live query does it include and which parts does it lack?

-- analysis and execution of a live query
getLiveQueryOpPartial
:: ( MonadError QErr m
, MonadIO m
)

-- | to validate arguments
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the syntax Haddock expects for documenting arguments is the -- ^ some description form (which goes under the type rather than above), and in fact I believe doing it this way actually causes Haddock to barf. I personally think it’s a bit silly, but oh well.

=> PGExecCtx

-- | variable definitions as seen in the subscription, needed in
-- checking whether the subscription can be multiplexed or not
-> [G.VariableDefinition]

-- | The partially processed live query field
-> GR.QueryRootFldUnresolved

-> m LiveQueryOpPartial
getLiveQueryOpPartial pgExecCtx varDefs astUnresolved = do
-- collect the variables (with their types) used inside the subscription
(_, varTypes) <- flip runStateT mempty $ GR.traverseQueryRootFldAST
collectNonNullableVars astUnresolved

let nonConfirmingVariables = getNonConfirmingVariables varTypes

-- Can the subscription be multiplexed?
-- Only if all variables are non null and can be prepared
if null nonConfirmingVariables
then do
let (mxQuery, annVarVals) = LQM.resolveToMxQuery astUnresolved
-- We need to ensure that the values provided for variables
-- are correct according to Postgres. Without this check
-- an invalid value for a variable for one instance of the
-- subscription will take down the entire multiplexed query
txtEncodedVars <- validateAnnVarValsOnPg pgExecCtx annVarVals
return $ LQMultiplexed (varTypes, mxQuery, txtEncodedVars)
else
return $ LQFallback (astUnresolved, nonConfirmingVariables)
where
fromResVars ty jPath =
flip S.SETyAnn (S.mkTypeAnn ty) $ S.SEOpApp (S.SQLOp "#>>")
[ S.SEQIden $ S.QIden (S.QualIden $ Iden "_subs")
(Iden "result_vars")
, S.SEArray $ map S.SELit jPath
]
-- get the variables which don't conform to the
-- 'non-null scalar' rule
getNonConfirmingVariables usedVariables =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be “conforming” rather than “confirming”?

let queryVariables = Set.fromList $ map G._vdVariable varDefs
confirmingVariables = Map.keysSet usedVariables
in queryVariables `Set.difference` confirmingVariables

-- | Creates a live query operation and if possible, a reusable plan
--
Expand Down Expand Up @@ -216,32 +245,18 @@ subsOpFromPGAST
subsOpFromPGAST pgExecCtx reqUnparsed varDefs (fldAls, astUnresolved) = do
userInfo <- asks getter

-- collect the variables (with their types) used inside the subscription
(_, varTypes) <- flip runStateT mempty $ GR.traverseQueryRootFldAST
collectNonNullableVars astUnresolved
liveQueryOpPartial <- getLiveQueryOpPartial pgExecCtx varDefs astUnresolved

-- Can the subscription be multiplexed?
-- Only if all variables are non null and can be prepared
if Set.fromList (Map.keys varTypes) == allVars
then mkMultiplexedOp userInfo varTypes
else mkFallbackOp userInfo
where
allVars = Set.fromList $ map G._vdVariable varDefs
case liveQueryOpPartial of
LQFallback _ -> mkFallbackOp userInfo
LQMultiplexed (varTypes, mxQuery, txtEncodedVars) ->
mkMultiplexedOp userInfo varTypes mxQuery txtEncodedVars

where
-- multiplexed subscription
mkMultiplexedOp userInfo varTypes = do
(astResolved, annVarVals) <-
flip runStateT mempty $ GR.traverseQueryRootFldAST
toMultiplexedQueryVar astUnresolved
mkMultiplexedOp userInfo varTypes mxQuery txtEncodedVars = do
let mxOpCtx = LQM.mkMxOpCtx (userRole userInfo)
(GH._grQuery reqUnparsed) fldAls $
GR.toPGQuery astResolved

-- We need to ensure that the values provided for variables
-- are correct according to Postgres. Without this check
-- an invalid value for a variable for one instance of the
-- subscription will take down the entire multiplexed query
txtEncodedVars <- validateAnnVarValsOnPg pgExecCtx annVarVals
(GH._grQuery reqUnparsed) fldAls mxQuery
let mxOp = (mxOpCtx, userVars userInfo, txtEncodedVars)
return (LQMultiplexed mxOp, Just $ SubsPlan mxOpCtx varTypes)

Expand Down
84 changes: 71 additions & 13 deletions server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Multiplexed.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,20 @@ module Hasura.GraphQL.Execute.LiveQuery.Multiplexed
, initLiveQueriesState
, dumpLiveQueriesState

, resolveToMxQuery
, MxOpCtx
, mkMxOpCtx
, MxOp

, LiveQueryId
, addLiveQuery
, removeLiveQuery

, RespId
, newRespId
, RespVars
, getRespVars
, mkMxQueryArgs
) where

import Data.List (unfoldr)
Expand All @@ -27,6 +34,7 @@ import qualified Control.Concurrent.Async as A
import qualified Control.Concurrent.STM as STM
import qualified Data.Aeson.Extended as J
import qualified Data.HashMap.Strict as Map
import qualified Data.Text as T
import qualified Data.Time.Clock as Clock
import qualified Data.UUID as UUID
import qualified Data.UUID.V4 as UUID
Expand All @@ -37,11 +45,16 @@ import qualified System.Metrics.Distribution as Metrics

import Control.Concurrent (threadDelay)

import qualified Hasura.GraphQL.Resolve as GR
import qualified Hasura.GraphQL.Validate as GV
import qualified Hasura.SQL.DML as S

import Hasura.EncJSON
import Hasura.GraphQL.Execute.LiveQuery.Types
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.Prelude
import Hasura.RQL.Types
import Hasura.SQL.Types
import Hasura.SQL.Value

-- remove these when array encoding is merged
Expand Down Expand Up @@ -255,9 +268,9 @@ data CandidateState
-- and the validated, text encoded query variables
data MxOpCtx
= MxOpCtx
{ _mocGroup :: !LQGroup
, _mocAlias :: !G.Alias
, _mocQuery :: !Q.Query
{ _mocGroup :: !LQGroup
, _mocAlias :: !G.Alias
, _mocQuery :: !Q.Query
}

instance J.ToJSON MxOpCtx where
Expand All @@ -274,14 +287,14 @@ mkMxOpCtx
-> G.Alias -> Q.Query
-> MxOpCtx
mkMxOpCtx role queryTxt als query =
MxOpCtx lqGroup als $ mkMxQuery query
MxOpCtx lqGroup als query
where
lqGroup = LQGroup role queryTxt

mkMxQuery :: Q.Query -> Q.Query
mkMxQuery baseQuery =
mkMxQuery :: GR.QueryRootFldResolved -> Q.Query
mkMxQuery astResolved =
Q.fromText $ mconcat $ map Q.getQueryText $
[mxQueryPfx, baseQuery, mxQuerySfx]
[mxQueryPfx, GR.toPGQuery astResolved, mxQuerySfx]
where
mxQueryPfx :: Q.Query
mxQueryPfx =
Expand Down Expand Up @@ -487,6 +500,55 @@ chunks :: Word32 -> [a] -> [[a]]
chunks n =
takeWhile (not.null) . unfoldr (Just . splitAt (fromIntegral n))

-- | Prepared-statement arguments for a multiplexed query: a pair made of a
-- 'RespIdList' and a 'RespVarsList'. Wrapping the pair in a newtype lets the
-- 'Q.ToPrepArgs' instance be derived, so a value of this type can be handed
-- to the query runner as its arguments (see 'mkMxQueryArgs' for construction).
newtype MxQueryArgs
= MxQueryArgs { _unMxQueryArgs :: (RespIdList, RespVarsList)}
deriving (Q.ToPrepArgs)

-- | Build the arguments of a multiplexed query from a list of
-- @(response id, response variables)@ pairs. The pairs are split into two
-- parallel lists (order preserved) and wrapped as 'RespIdList' and
-- 'RespVarsList' respectively.
mkMxQueryArgs :: [(RespId, RespVars)] -> MxQueryArgs
mkMxQueryArgs respPairs =
  let (ids, vars) = unzip respPairs
  in MxQueryArgs (RespIdList ids, RespVarsList vars)

-- | Resolve a single unresolved value into the SQL expression used inside a
-- multiplexed query.
--
-- * A Postgres value bound to a GraphQL variable is recorded in the state
--   (variable -> (column type, value)) and replaced by a typed lookup of
--   @_subs.result_vars #>> '{variables,<name>}'@.
-- * A Postgres value without a variable is inlined via 'toTxtValue'.
-- * A session variable becomes a typed lookup of
--   @_subs.result_vars #>> '{user,<lower-cased name>}'@.
-- * An already built SQL expression is returned untouched.
--
-- Nullability of the variable is deliberately not inspected here: this is
-- only used for reusable plans and that check has to happen beforehand.
toMultiplexedQueryVar
  :: (MonadState GV.AnnPGVarVals m)
  => GR.UnresolvedVal -> m S.SQLExp
toMultiplexedQueryVar unresolved = case unresolved of
  GR.UVPG (GR.AnnPGVal mVar _ colTy colVal) ->
    case mVar of
      Nothing  -> pure $ toTxtValue colTy colVal
      Just var -> do
        -- remember the value so that it can be validated / text-encoded later
        modify $ Map.insert var (colTy, colVal)
        pure $ resultVarAt (PgTypeSimple colTy)
          ["variables", G.unName (G.unVariable var)]
  GR.UVSessVar ty sessVar ->
    pure $ resultVarAt ty ["user", T.toLower sessVar]
  GR.UVSQL sqlExp ->
    pure sqlExp
  where
    -- (_subs.result_vars #>> ARRAY[<path>]) :: <type>
    resultVarAt ty jsonPath =
      S.SETyAnn extracted (S.mkTypeAnn ty)
      where
        extracted =
          S.SEOpApp (S.SQLOp "#>>")
            [ subsResultVars
            , S.SEArray (map S.SELit jsonPath)
            ]
        subsResultVars =
          S.SEQIden $
            S.QIden (S.QualIden (Iden "_subs")) (Iden "result_vars")

-- | Resolve an unresolved query root field into a multiplexed query.
--
-- Every unresolved value in the AST is rewritten with
-- 'toMultiplexedQueryVar'; the variables collected while doing so are
-- returned alongside the query that 'mkMxQuery' builds from the resolved AST.
resolveToMxQuery
  :: GR.QueryRootFldUnresolved
  -> (Q.Query, GV.AnnPGVarVals)
resolveToMxQuery astUnresolved =
  let resolveVars =
        GR.traverseQueryRootFldAST toMultiplexedQueryVar astUnresolved
      (astResolved, annVarVals) = runState resolveVars mempty
  in (mkMxQuery astResolved, annVarVals)

pollQuery
:: RefetchMetrics
-> BatchSize
Expand All @@ -505,7 +567,7 @@ pollQuery metrics batchSize pgExecCtx handler = do
mapM (STM.atomically . getCandidateSnapshot) candidates

let queryVarsBatches = chunks (unBatchSize batchSize) $
getQueryVars candidateSnapshotMap
getQueryVars candidateSnapshotMap

snapshotFinish <- Clock.getCurrentTime
Metrics.add (_rmSnapshot metrics) $
Expand All @@ -514,7 +576,7 @@ pollQuery metrics batchSize pgExecCtx handler = do
queryInit <- Clock.getCurrentTime
mxRes <- runExceptT $ runLazyTx' pgExecCtx $
liftTx $ Q.listQE defaultTxErrorHandler
pgQuery (mkMxQueryPrepArgs queryVars) True
pgQuery (mkMxQueryArgs queryVars) True
queryFinish <- Clock.getCurrentTime
Metrics.add (_rmQuery metrics) $
realToFrac $ Clock.diffUTCTime queryFinish queryInit
Expand Down Expand Up @@ -547,10 +609,6 @@ pollQuery metrics batchSize pgExecCtx handler = do
getQueryVars candidateSnapshotMap =
Map.toList $ fmap _csRespVars candidateSnapshotMap

mkMxQueryPrepArgs l =
let (respIdL, respVarL) = unzip l
in (RespIdList respIdL, RespVarsList respVarL)

getCandidateOperations candidateSnapshotMap = \case
Left e ->
-- TODO: this is internal error
Expand Down
Loading