From f2cba4a728c0ba7b92b77d82b10acc2121652ec6 Mon Sep 17 00:00:00 2001
From: Vamshi Surabhi
Date: Thu, 25 Apr 2019 17:27:44 +0530
Subject: [PATCH 1/3] split stm transactions when snapshotting to make it faster

---
 .../GraphQL/Execute/LiveQuery/Multiplexed.hs | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)

diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Multiplexed.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Multiplexed.hs
index 47d56320f158e..8c74daccc1b3b 100644
--- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Multiplexed.hs
+++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Multiplexed.hs
@@ -194,8 +194,8 @@ dumpLiveQueryMap lqMap =
           , "min" J..= Metrics.min stats
           , "max" J..= Metrics.max stats
           ]
-    dumpCandidates candidateMap = STM.atomically $ do
-      candidates <- toListTMap candidateMap
+    dumpCandidates candidateMap = do
+      candidates <- STM.atomically $ toListTMap candidateMap
       forM candidates $ \((usrVars, varVals), candidate) -> do
         candidateJ <- dumpCandidate candidate
         return $ J.object
@@ -203,7 +203,8 @@ dumpLiveQueryMap lqMap =
           , "variable_values" J..= varVals
           , "candidate" J..= candidateJ
           ]
-    dumpCandidate (CandidateState respId _ respTV curOps newOps) = do
+    dumpCandidate (CandidateState respId _ respTV curOps newOps) =
+      STM.atomically $ do
       prevResHash <- STM.readTVar respTV
       curOpIds <- toListTMap curOps
       newOpIds <- toListTMap newOps
@@ -488,11 +489,14 @@ pollQuery
 pollQuery metrics batchSize pgExecCtx handler = do
   procInit <- Clock.getCurrentTime
+
   -- get a snapshot of all the candidates
-  candidateSnapshotMap <- STM.atomically $ do
-    candidates <- toListTMap candidateMap
-    candidateSnapshots <- mapM getCandidateSnapshot candidates
-    return $ Map.fromList candidateSnapshots
+  -- this need not be done in a transaction
+  candidates <- STM.atomically $ toListTMap candidateMap
+  candidateSnapshotMap <-
+    fmap Map.fromList $
+    mapM (STM.atomically . getCandidateSnapshot) candidates
+
   let queryVarsBatches = chunks (unBatchSize batchSize) $
         getQueryVars candidateSnapshotMap
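
Note on PATCH 1/3: the change above reads the candidate map in one short STM transaction and then snapshots each candidate in its own transaction, instead of touching every TVar inside a single large atomic block. Below is a minimal, self-contained sketch of that pattern; the names and types (CandidateMap, snapshotAllAtOnce, snapshotPerCandidate) are illustrative only, not the engine's own code.

import Control.Concurrent.STM (TVar, atomically, readTVar)
import qualified Data.Map.Strict as Map

-- One TVar per candidate; the map itself also lives in a TVar.
type CandidateMap k v = Map.Map k (TVar v)

-- Before: a single transaction over every candidate's TVar, so a concurrent
-- write to any of them can force the whole snapshot to retry.
snapshotAllAtOnce :: TVar (CandidateMap k v) -> IO (Map.Map k v)
snapshotAllAtOnce mapVar = atomically $ do
  candidates <- readTVar mapVar
  traverse readTVar candidates

-- After: list the candidates in one small transaction, then snapshot each
-- entry in its own transaction, keeping every atomic block short.
snapshotPerCandidate :: TVar (CandidateMap k v) -> IO (Map.Map k v)
snapshotPerCandidate mapVar = do
  candidates <- atomically $ readTVar mapVar
  traverse (atomically . readTVar) candidates

The per-candidate version no longer observes all candidates at a single instant; the patch explicitly accepts that trade-off ("this need not be done in a transaction").
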
From 812e2fbca562eb395862bc5125c1a60d041e8c52 Mon Sep 17 00:00:00 2001
From: Vamshi Surabhi
Date: Thu, 25 Apr 2019 17:40:35 +0530
Subject: [PATCH 2/3] mx subs: push to both old and new sinks at the same time

---
 .../GraphQL/Execute/LiveQuery/Multiplexed.hs | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Multiplexed.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Multiplexed.hs
index 8c74daccc1b3b..e10448260cbf7 100644
--- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Multiplexed.hs
+++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Multiplexed.hs
@@ -435,12 +435,16 @@ data CandidateSnapshot
 
 pushCandidateResult :: GQResp -> Maybe RespHash -> CandidateSnapshot -> IO ()
 pushCandidateResult resp respHashM candidateSnapshot = do
-  pushResultToSinks newSinks
-  -- write to the current websockets if needed
   prevRespHashM <- STM.readTVarIO respRef
-  when (isExecError resp || respHashM /= prevRespHashM) $ do
-    pushResultToSinks curSinks
-    STM.atomically $ STM.writeTVar respRef respHashM
+  -- write to the current websockets if needed
+  sinks <-
+    if (isExecError resp || respHashM /= prevRespHashM)
+    then do
+      STM.atomically $ STM.writeTVar respRef respHashM
+      return (newSinks <> curSinks)
+    else
+      return newSinks
+  pushResultToSinks sinks
   where
     CandidateSnapshot _ respRef curSinks newSinks = candidateSnapshot
     pushResultToSinks =
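
Note on PATCH 2/3: instead of pushing to the new sinks first and then, conditionally, to the current ones, the handler now decides the complete recipient set up front and pushes once, so old and new subscribers see the same result at the same time. A simplified, self-contained sketch of that flow follows; the Snapshot record and the Int hash are hypothetical stand-ins for the real CandidateSnapshot and RespHash, and error handling (isExecError) is omitted.

import Control.Concurrent.STM (TVar, atomically, readTVarIO, writeTVar)

data Snapshot a = Snapshot
  { prevHashRef :: TVar (Maybe Int)  -- hash of the response pushed last time
  , curSinks    :: [a -> IO ()]      -- subscribers that already saw a result
  , newSinks    :: [a -> IO ()]      -- subscribers added since the last poll
  }

pushResult :: Maybe Int -> a -> Snapshot a -> IO ()
pushResult respHashM resp snapshot = do
  prevHashM <- readTVarIO (prevHashRef snapshot)
  -- new sinks always get the result; current sinks only when it changed
  sinks <-
    if respHashM /= prevHashM
    then do
      atomically $ writeTVar (prevHashRef snapshot) respHashM
      return (newSinks snapshot <> curSinks snapshot)
    else
      return (newSinks snapshot)
  mapM_ ($ resp) sinks
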
From 9d2077e1d53161816af5273f412c3cef326faf99 Mon Sep 17 00:00:00 2001
From: Vamshi Surabhi
Date: Fri, 26 Apr 2019 17:13:06 +0530
Subject: [PATCH 3/3] expose dev APIs through allowed APIs flag

---
 server/graphql-engine.cabal                   |  2 +-
 .../Hasura/GraphQL/Execute/LiveQuery.hs       |  6 ++---
 .../GraphQL/Execute/LiveQuery/Multiplexed.hs  | 15 +++++++-----
 server/src-lib/Hasura/Server/App.hs           | 24 ++++++++++++-------
 server/src-lib/Hasura/Server/Init.hs          | 16 +++++++++----
 5 files changed, 41 insertions(+), 22 deletions(-)

diff --git a/server/graphql-engine.cabal b/server/graphql-engine.cabal
index 0ac31c68f9823..4e36f912c704c 100644
--- a/server/graphql-engine.cabal
+++ b/server/graphql-engine.cabal
@@ -290,7 +290,7 @@ library
   if flag(profile)
     ghc-prof-options: -rtsopts -fprof-auto -fno-prof-count-entries
   if flag(developer)
-    cpp-options: -DInternalAPIs -DLocalConsole
+    cpp-options: -DDeveloperAPIs -DLocalConsole
   if flag(local-console)
     cpp-options: -DLocalConsole
diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery.hs
index ceea89b884667..83be0a11aac3a 100644
--- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery.hs
+++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery.hs
@@ -69,9 +69,9 @@ data LiveQueriesState
   }
 
 dumpLiveQueriesState
-  :: LiveQueriesState -> IO J.Value
-dumpLiveQueriesState (LiveQueriesState mx fallback _) = do
-  mxJ <- LQM.dumpLiveQueriesState mx
+  :: Bool -> LiveQueriesState -> IO J.Value
+dumpLiveQueriesState extended (LiveQueriesState mx fallback _) = do
+  mxJ <- LQM.dumpLiveQueriesState extended mx
   fallbackJ <- LQF.dumpLiveQueriesState fallback
   return $ J.object
     [ "fallback" J..= fallbackJ
diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Multiplexed.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Multiplexed.hs
index e10448260cbf7..fcc0c821acdce 100644
--- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Multiplexed.hs
+++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Multiplexed.hs
@@ -148,23 +148,26 @@ initLiveQueriesState lqOptions =
   lqOptions <$> STMMap.new
 
-dumpLiveQueriesState :: LiveQueriesState -> IO J.Value
-dumpLiveQueriesState (LiveQueriesState opts lqMap) = do
-  lqMapJ <- dumpLiveQueryMap lqMap
+dumpLiveQueriesState :: Bool -> LiveQueriesState -> IO J.Value
+dumpLiveQueriesState extended (LiveQueriesState opts lqMap) = do
+  lqMapJ <- dumpLiveQueryMap extended lqMap
   return $ J.object
     [ "options" J..= opts
     , "live_queries_map" J..= lqMapJ
     ]
 
-dumpLiveQueryMap :: LiveQueryMap -> IO J.Value
-dumpLiveQueryMap lqMap =
+dumpLiveQueryMap :: Bool -> LiveQueryMap -> IO J.Value
+dumpLiveQueryMap extended lqMap =
   fmap J.toJSON $ do
     entries <- STM.atomically $ ListT.toList $ STMMap.listT lqMap
     forM entries $ \(lq, (lqHandler, threadRef)) -> do
       ThreadState threadId metrics <- STM.atomically $ STM.readTMVar threadRef
       metricsJ <- dumpReftechMetrics metrics
-      candidatesJ <- dumpCandidates $ _mhCandidates lqHandler
+      candidatesJ <-
+        if extended
+        then fmap Just $ dumpCandidates $ _mhCandidates lqHandler
+        else return Nothing
       return $ J.object
         [ "key" J..= lq
         , "thread_id" J..= show (A.asyncThreadId threadId)
diff --git a/server/src-lib/Hasura/Server/App.hs b/server/src-lib/Hasura/Server/App.hs
index ac7ec40cbb8a3..374f9f363632b 100644
--- a/server/src-lib/Hasura/Server/App.hs
+++ b/server/src-lib/Hasura/Server/App.hs
@@ -151,6 +151,9 @@ isMetadataEnabled sc = S.member METADATA $ scEnabledAPIs sc
 isGraphQLEnabled :: ServerCtx -> Bool
 isGraphQLEnabled sc = S.member GRAPHQL $ scEnabledAPIs sc
 
+isDeveloperAPIEnabled :: ServerCtx -> Bool
+isDeveloperAPIEnabled sc = S.member DEVELOPER $ scEnabledAPIs sc
+
 -- {-# SCC parseBody #-}
 parseBody :: (FromJSON a) => Handler a
 parseBody = do
@@ -410,14 +413,19 @@ httpApp corsCfg serverCtx enableConsole enableTelemetry = do
       query <- parseBody
       v1Alpha1GQHandler query
 
-#ifdef InternalAPIs
-    get "internal/plan_cache" $ do
-      respJ <- liftIO $ E.dumpPlanCache $ scPlanCache serverCtx
-      json respJ
-    get "internal/subscriptions" $ do
-      respJ <- liftIO $ EL.dumpLiveQueriesState $ scLQState serverCtx
-      json respJ
-#endif
+    when (isDeveloperAPIEnabled serverCtx) $ do
+      get "dev/plan_cache" $ mkSpockAction encodeQErr serverCtx $ do
+        onlyAdmin
+        respJ <- liftIO $ E.dumpPlanCache $ scPlanCache serverCtx
+        return $ encJFromJValue respJ
+      get "dev/subscriptions" $ mkSpockAction encodeQErr serverCtx $ do
+        onlyAdmin
+        respJ <- liftIO $ EL.dumpLiveQueriesState False $ scLQState serverCtx
+        return $ encJFromJValue respJ
+      get "dev/subscriptions/extended" $ mkSpockAction encodeQErr serverCtx $ do
+        onlyAdmin
+        respJ <- liftIO $ EL.dumpLiveQueriesState True $ scLQState serverCtx
+        return $ encJFromJValue respJ
 
     forM_ [GET,POST] $ \m -> hookAny m $ \_ -> do
      let qErr = err404 NotFound "resource does not exist"
diff --git a/server/src-lib/Hasura/Server/Init.hs b/server/src-lib/Hasura/Server/Init.hs
index 5c3e3f8920a6b..2d38188a8bce6 100644
--- a/server/src-lib/Hasura/Server/Init.hs
+++ b/server/src-lib/Hasura/Server/Init.hs
@@ -1,3 +1,4 @@
+{-# LANGUAGE CPP #-}
 module Hasura.Server.Init where
 
 import qualified Database.PG.Query as Q
@@ -102,6 +103,7 @@ data HGECommandG a
 data API
   = METADATA
   | GRAPHQL
+  | DEVELOPER
   deriving (Show, Eq, Read, Generic)
 
 instance Hashable API
@@ -263,13 +265,18 @@ mkServeOptions rso = do
   enableTelemetry <- fromMaybe True <$>
                      withEnv (rsoEnableTelemetry rso) (fst enableTelemetryEnv)
   strfyNum <- withEnvBool (rsoStringifyNum rso) $ fst stringifyNumEnv
-  enabledAPIs <- Set.fromList . fromMaybe [METADATA,GRAPHQL] <$>
+  enabledAPIs <- Set.fromList . fromMaybe defaultAPIs <$>
                  withEnv (rsoEnabledAPIs rso) (fst enabledAPIsEnv)
   lqOpts <- mkLQOpts
   return $ ServeOptions port host connParams txIso adminScrt authHook jwtSecret
                         unAuthRole corsCfg enableConsole enableTelemetry
                         strfyNum enabledAPIs lqOpts
   where
+#ifdef DeveloperAPIs
+    defaultAPIs = [METADATA,GRAPHQL,DEVELOPER]
+#else
+    defaultAPIs = [METADATA,GRAPHQL]
+#endif
     mkConnParams (RawConnParams s c i p) = do
       stripes <- fromMaybe 1 <$> withEnv s (fst pgStripesEnv)
       conns <- fromMaybe 50 <$> withEnv c (fst pgConnsEnv)
@@ -686,6 +693,7 @@ readAPIs = mapM readAPI . T.splitOn "," . T.pack
   where readAPI si = case T.toUpper $ T.strip si of
           "METADATA"  -> Right METADATA
           "GRAPHQL"   -> Right GRAPHQL
+          "DEVELOPER" -> Right DEVELOPER
           _           -> Left "Only expecting list of comma separated API types metadata / graphql"
 
 parseWebHook :: Parser RawAuthHook
@@ -796,14 +804,14 @@ parseMxBatchSize =
 
 mxRefetchDelayEnv :: (String, String)
 mxRefetchDelayEnv =
   ( "HASURA_GRAPHQL_LIVE_QUERIES_MULTIPLEXED_REFETCH_INTERVAL"
   , "results will only be sent once in this interval (in milliseconds) for \
     \live queries which can be multiplexed. Default: 1000 (1sec)"
   )
 
 mxBatchSizeEnv :: (String, String)
 mxBatchSizeEnv =
   ( "HASURA_GRAPHQL_LIVE_QUERIES_MULTIPLEXED_BATCH_SIZE"
   , "multiplexed live queries are split into batches of the specified \
     \size. Default 100. "
   )
@@ -819,7 +827,7 @@ parseFallbackRefetchInt =
 
 fallbackRefetchDelayEnv :: (String, String)
 fallbackRefetchDelayEnv =
   ( "HASURA_GRAPHQL_LIVE_QUERIES_FALLBACK_REFETCH_INTERVAL"
   , "results will only be sent once in this interval (in milliseconds) for \
     \live queries which cannot be multiplexed. Default: 1000 (1sec)"
   )
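
Note on PATCH 3/3: the internal endpoints are no longer compiled in behind a CPP guard; they are always built, registered under /dev/plan_cache, /dev/subscriptions and /dev/subscriptions/extended, restricted to the admin role (onlyAdmin), and served only when the DEVELOPER API is in the configured API set. The cabal developer flag now only changes the compile-time default of that set by defining DeveloperAPIs; at runtime DEVELOPER can still be added through whatever flag or environment variable enabledAPIsEnv describes. A self-contained sketch of that default-selection pattern (illustrative module, not the server's own code):

{-# LANGUAGE CPP #-}
module DefaultAPIs where

data API = METADATA | GRAPHQL | DEVELOPER
  deriving (Show, Eq)

-- Compile-time default: DEVELOPER is included only when the build defines
-- DeveloperAPIs (the cabal developer flag); a runtime setting can still
-- override the list entirely.
defaultAPIs :: [API]
#ifdef DeveloperAPIs
defaultAPIs = [METADATA, GRAPHQL, DEVELOPER]
#else
defaultAPIs = [METADATA, GRAPHQL]
#endif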