这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/graphql-engine.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ library
if flag(profile)
ghc-prof-options: -rtsopts -fprof-auto -fno-prof-count-entries
if flag(developer)
cpp-options: -DInternalAPIs -DLocalConsole
cpp-options: -DDeveloperAPIs -DLocalConsole
if flag(local-console)
cpp-options: -DLocalConsole

Expand Down
6 changes: 3 additions & 3 deletions server/src-lib/Hasura/GraphQL/Execute/LiveQuery.hs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ data LiveQueriesState
}

dumpLiveQueriesState
:: LiveQueriesState -> IO J.Value
dumpLiveQueriesState (LiveQueriesState mx fallback _) = do
mxJ <- LQM.dumpLiveQueriesState mx
:: Bool -> LiveQueriesState -> IO J.Value
dumpLiveQueriesState extended (LiveQueriesState mx fallback _) = do
mxJ <- LQM.dumpLiveQueriesState extended mx
fallbackJ <- LQF.dumpLiveQueriesState fallback
return $ J.object
[ "fallback" J..= fallbackJ
Expand Down
47 changes: 29 additions & 18 deletions server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Multiplexed.hs
Original file line number Diff line number Diff line change
Expand Up @@ -148,23 +148,26 @@ initLiveQueriesState lqOptions =
lqOptions
<$> STMMap.new

dumpLiveQueriesState :: LiveQueriesState -> IO J.Value
dumpLiveQueriesState (LiveQueriesState opts lqMap) = do
lqMapJ <- dumpLiveQueryMap lqMap
dumpLiveQueriesState :: Bool -> LiveQueriesState -> IO J.Value
dumpLiveQueriesState extended (LiveQueriesState opts lqMap) = do
lqMapJ <- dumpLiveQueryMap extended lqMap
return $ J.object
[ "options" J..= opts
, "live_queries_map" J..= lqMapJ
]

dumpLiveQueryMap :: LiveQueryMap -> IO J.Value
dumpLiveQueryMap lqMap =
dumpLiveQueryMap :: Bool -> LiveQueryMap -> IO J.Value
dumpLiveQueryMap extended lqMap =
fmap J.toJSON $ do
entries <- STM.atomically $ ListT.toList $ STMMap.listT lqMap
forM entries $ \(lq, (lqHandler, threadRef)) -> do
ThreadState threadId metrics <-
STM.atomically $ STM.readTMVar threadRef
metricsJ <- dumpReftechMetrics metrics
candidatesJ <- dumpCandidates $ _mhCandidates lqHandler
candidatesJ <-
if extended
then fmap Just $ dumpCandidates $ _mhCandidates lqHandler
else return Nothing
return $ J.object
[ "key" J..= lq
, "thread_id" J..= show (A.asyncThreadId threadId)
Expand Down Expand Up @@ -194,16 +197,17 @@ dumpLiveQueryMap lqMap =
, "min" J..= Metrics.min stats
, "max" J..= Metrics.max stats
]
dumpCandidates candidateMap = STM.atomically $ do
candidates <- toListTMap candidateMap
dumpCandidates candidateMap = do
candidates <- STM.atomically $ toListTMap candidateMap
forM candidates $ \((usrVars, varVals), candidate) -> do
candidateJ <- dumpCandidate candidate
return $ J.object
[ "session_vars" J..= usrVars
, "variable_values" J..= varVals
, "candidate" J..= candidateJ
]
dumpCandidate (CandidateState respId _ respTV curOps newOps) = do
dumpCandidate (CandidateState respId _ respTV curOps newOps) =
STM.atomically $ do
prevResHash <- STM.readTVar respTV
curOpIds <- toListTMap curOps
newOpIds <- toListTMap newOps
Expand Down Expand Up @@ -434,12 +438,16 @@ data CandidateSnapshot

pushCandidateResult :: GQResp -> Maybe RespHash -> CandidateSnapshot -> IO ()
pushCandidateResult resp respHashM candidateSnapshot = do
pushResultToSinks newSinks
-- write to the current websockets if needed
prevRespHashM <- STM.readTVarIO respRef
when (isExecError resp || respHashM /= prevRespHashM) $ do
pushResultToSinks curSinks
STM.atomically $ STM.writeTVar respRef respHashM
-- write to the current websockets if needed
sinks <-
if (isExecError resp || respHashM /= prevRespHashM)
then do
STM.atomically $ STM.writeTVar respRef respHashM
return (newSinks <> curSinks)
else
return newSinks
pushResultToSinks sinks
where
CandidateSnapshot _ respRef curSinks newSinks = candidateSnapshot
pushResultToSinks =
Expand Down Expand Up @@ -488,11 +496,14 @@ pollQuery
pollQuery metrics batchSize pgExecCtx handler = do

procInit <- Clock.getCurrentTime

-- get a snapshot of all the candidates
candidateSnapshotMap <- STM.atomically $ do
candidates <- toListTMap candidateMap
candidateSnapshots <- mapM getCandidateSnapshot candidates
return $ Map.fromList candidateSnapshots
-- this need not be done in a transaction
candidates <- STM.atomically $ toListTMap candidateMap
candidateSnapshotMap <-
fmap Map.fromList $
mapM (STM.atomically . getCandidateSnapshot) candidates

let queryVarsBatches = chunks (unBatchSize batchSize) $
getQueryVars candidateSnapshotMap

Expand Down
24 changes: 16 additions & 8 deletions server/src-lib/Hasura/Server/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ isMetadataEnabled sc = S.member METADATA $ scEnabledAPIs sc
-- | 'True' when 'GRAPHQL' is a member of the server context's set of
-- enabled APIs ('scEnabledAPIs').
isGraphQLEnabled :: ServerCtx -> Bool
isGraphQLEnabled = S.member GRAPHQL . scEnabledAPIs

-- | 'True' when 'DEVELOPER' is a member of the server context's set of
-- enabled APIs ('scEnabledAPIs').
isDeveloperAPIEnabled :: ServerCtx -> Bool
isDeveloperAPIEnabled = S.member DEVELOPER . scEnabledAPIs

-- {-# SCC parseBody #-}
parseBody :: (FromJSON a) => Handler a
parseBody = do
Expand Down Expand Up @@ -418,14 +421,19 @@ httpApp corsCfg serverCtx enableConsole enableTelemetry = do
query <- parseBody
v1Alpha1GQHandler query

#ifdef InternalAPIs
get "internal/plan_cache" $ do
respJ <- liftIO $ E.dumpPlanCache $ scPlanCache serverCtx
json respJ
get "internal/subscriptions" $ do
respJ <- liftIO $ EL.dumpLiveQueriesState $ scLQState serverCtx
json respJ
#endif
when (isDeveloperAPIEnabled serverCtx) $ do
get "dev/plan_cache" $ mkSpockAction encodeQErr serverCtx $ do
onlyAdmin
respJ <- liftIO $ E.dumpPlanCache $ scPlanCache serverCtx
return $ encJFromJValue respJ
get "dev/subscriptions" $ mkSpockAction encodeQErr serverCtx $ do
onlyAdmin
respJ <- liftIO $ EL.dumpLiveQueriesState False $ scLQState serverCtx
return $ encJFromJValue respJ
get "dev/subscriptions/extended" $ mkSpockAction encodeQErr serverCtx $ do
onlyAdmin
respJ <- liftIO $ EL.dumpLiveQueriesState True $ scLQState serverCtx
return $ encJFromJValue respJ

forM_ [GET,POST] $ \m -> hookAny m $ \_ -> do
let qErr = err404 NotFound "resource does not exist"
Expand Down
16 changes: 12 additions & 4 deletions server/src-lib/Hasura/Server/Init.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{-# LANGUAGE CPP #-}
module Hasura.Server.Init where

import qualified Database.PG.Query as Q
Expand Down Expand Up @@ -102,6 +103,7 @@ data HGECommandG a
-- | API groups that can be enabled individually on the server. Values are
-- parsed from a comma separated list by 'readAPIs'.
data API
  = METADATA
  -- ^ the metadata API
  | GRAPHQL
  -- ^ the GraphQL API
  | DEVELOPER
  -- ^ the developer API
  deriving (Show, Eq, Read, Generic)

instance Hashable API
Expand Down Expand Up @@ -263,13 +265,18 @@ mkServeOptions rso = do
enableTelemetry <- fromMaybe True <$>
withEnv (rsoEnableTelemetry rso) (fst enableTelemetryEnv)
strfyNum <- withEnvBool (rsoStringifyNum rso) $ fst stringifyNumEnv
enabledAPIs <- Set.fromList . fromMaybe [METADATA,GRAPHQL] <$>
enabledAPIs <- Set.fromList . fromMaybe defaultAPIs <$>
withEnv (rsoEnabledAPIs rso) (fst enabledAPIsEnv)
lqOpts <- mkLQOpts
return $ ServeOptions port host connParams txIso adminScrt authHook jwtSecret
unAuthRole corsCfg enableConsole
enableTelemetry strfyNum enabledAPIs lqOpts
where
-- APIs used when no explicit list is supplied. When the DeveloperAPIs CPP
-- macro is defined, DEVELOPER is included in addition to METADATA and GRAPHQL.
#ifdef DeveloperAPIs
defaultAPIs = [METADATA,GRAPHQL,DEVELOPER]
#else
defaultAPIs = [METADATA,GRAPHQL]
#endif
mkConnParams (RawConnParams s c i p) = do
stripes <- fromMaybe 1 <$> withEnv s (fst pgStripesEnv)
conns <- fromMaybe 50 <$> withEnv c (fst pgConnsEnv)
Expand Down Expand Up @@ -686,6 +693,7 @@ readAPIs = mapM readAPI . T.splitOn "," . T.pack
where
  -- Parse a single entry of the comma separated list. Surrounding whitespace
  -- is stripped and the entry is upper-cased before matching, so the match is
  -- case-insensitive.
  readAPI si = case T.toUpper $ T.strip si of
    "METADATA"  -> Right METADATA
    "GRAPHQL"   -> Right GRAPHQL
    "DEVELOPER" -> Right DEVELOPER
    -- The message lists every accepted API type, including developer.
    _           -> Left "Only expecting list of comma separated API types metadata / graphql / developer"

parseWebHook :: Parser RawAuthHook
Expand Down Expand Up @@ -796,14 +804,14 @@ parseMxBatchSize =
mxRefetchDelayEnv :: (String, String)
mxRefetchDelayEnv =
( "HASURA_GRAPHQL_LIVE_QUERIES_MULTIPLEXED_REFETCH_INTERVAL"
, "results will only be sent once in this interval (in milliseconds) for \
, "results will only be sent once in this interval (in milliseconds) for \\
\live queries which can be multiplexed. Default: 1000 (1sec)"
)

mxBatchSizeEnv :: (String, String)
mxBatchSizeEnv =
( "HASURA_GRAPHQL_LIVE_QUERIES_MULTIPLEXED_BATCH_SIZE"
, "multiplexed live queries are split into batches of the specified \
, "multiplexed live queries are split into batches of the specified \\
\size. Default 100. "
)

Expand All @@ -819,7 +827,7 @@ parseFallbackRefetchInt =
fallbackRefetchDelayEnv :: (String, String)
fallbackRefetchDelayEnv =
( "HASURA_GRAPHQL_LIVE_QUERIES_FALLBACK_REFETCH_INTERVAL"
, "results will only be sent once in this interval (in milliseconds) for \
, "results will only be sent once in this interval (in milliseconds) for \\
\live queries which cannot be multiplexed. Default: 1000 (1sec)"
)

Expand Down