这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions server/src-exec/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,10 @@ main = do
void $ C.forkIO $ checkForUpdates loggerCtx httpManager

maxEvThrds <- getFromEnv defaultMaxEventThreads "HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE"
evPollSec <- getFromEnv defaultPollingIntervalSec "HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL"
evFetchMilliSec <- getFromEnv defaultFetchIntervalMilliSec "HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL"
logEnvHeaders <- getFromEnv False "LOG_HEADERS_FROM_ENV"

eventEngineCtx <- atomically $ initEventEngineCtx maxEvThrds evPollSec
eventEngineCtx <- atomically $ initEventEngineCtx maxEvThrds evFetchMilliSec
httpSession <- WrqS.newSessionControl Nothing TLS.tlsManagerSettings

void $ C.forkIO $ processEventQueue hloggerCtx logEnvHeaders httpSession pool cacheRef eventEngineCtx
Expand Down
30 changes: 15 additions & 15 deletions server/src-lib/Hasura/Events/Lib.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Hasura.Events.Lib
, processEventQueue
, unlockAllEvents
, defaultMaxEventThreads
, defaultPollingIntervalSec
, defaultFetchIntervalMilliSec
, Event(..)
) where

Expand Down Expand Up @@ -129,45 +129,45 @@ data Invocation

-- | State bundle handed to the event workers: a queue of 'Event's, a
-- 'TVar' counter, the configured maximum number of event threads, and the
-- interval used between fetches.
--
-- NOTE(review): this span is a rendered diff, so the record body appears
-- twice. The first group of fields (ending in '_eeCtxPollingIntervalSec')
-- appears to be the removed version and the second group (ending in
-- '_eeCtxFetchIntervalMilliSec') the added one — only one of them belongs in
-- the real source file.
data EventEngineCtx
= EventEngineCtx
-- Fields as they appear before the change (interval expressed in seconds).
{ _eeCtxEventQueue :: TQ.TQueue Event
, _eeCtxEventThreads :: TVar Int
, _eeCtxMaxEventThreads :: Int
, _eeCtxPollingIntervalSec :: Int
-- Fields as they appear after the change (interval expressed in
-- milliseconds; the last field is the only one that was renamed).
{ _eeCtxEventQueue :: TQ.TQueue Event
, _eeCtxEventThreads :: TVar Int
, _eeCtxMaxEventThreads :: Int
, _eeCtxFetchIntervalMilliSec :: Int
}

-- | Default value (100) for the maximum number of event threads.
--
-- In @Main.hs@ this is passed to @getFromEnv@ together with the variable name
-- @HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE@; presumably it is used when that
-- variable is not set — confirm against @getFromEnv@.
defaultMaxEventThreads :: Int
defaultMaxEventThreads = 100

-- | Default interval between event fetches.
--
-- NOTE(review): this span is a rendered diff showing two definitions. The
-- first ('defaultPollingIntervalSec', value 1) appears to be the removed one
-- and the second ('defaultFetchIntervalMilliSec', value 1000) the added one.
-- The worker loop multiplies the former by @1000 * 1000@ and the latter by
-- @1000@ before calling 'threadDelay', so both defaults amount to the same
-- delay; only the unit of the configured number changes (seconds to
-- milliseconds).
defaultPollingIntervalSec :: Int
defaultPollingIntervalSec = 1
defaultFetchIntervalMilliSec :: Int
defaultFetchIntervalMilliSec = 1000

-- | The header name @Retry-After@ as a 'CI.CI' 'T.Text' value (presumably a
-- case-insensitive text key — confirm which module @CI@ abbreviates). Where it
-- is looked up is not visible in this excerpt.
retryAfterHeader :: CI.CI T.Text
retryAfterHeader = "Retry-After"

-- | Build a fresh 'EventEngineCtx' inside 'STM'.
--
-- Takes the maximum number of event threads and the fetch interval, creates a
-- new empty 'TQ.TQueue' and a 'TVar' initialised to 0, and packs all four
-- values into the context in constructor order.
--
-- NOTE(review): this span is a rendered diff. The equation head and the
-- @return@ line each appear twice; the @pollI@ variants appear to be the
-- removed lines and the @fetchI@ variants the added ones. The two versions
-- differ only in the name of the second argument.
initEventEngineCtx :: Int -> Int -> STM EventEngineCtx
initEventEngineCtx maxT pollI = do
initEventEngineCtx maxT fetchI = do
-- New, empty queue for fetched events.
q <- TQ.newTQueue
-- Counter starts at zero.
c <- newTVar 0
return $ EventEngineCtx q c maxT pollI
return $ EventEngineCtx q c maxT fetchI

-- | Start the event-trigger workers and block until one of them finishes.
--
-- Prints @event_trigger: starting workers@, launches a fetch worker and a
-- consume worker with 'async', then waits on them with 'waitAny' and discards
-- the result. Both workers get their own logger built with @mkHLogger logctx@
-- and share the same pool and 'EventEngineCtx'.
--
-- NOTE(review): this span is a rendered diff. The @mapM async@ line and the
-- first @where@ binding each appear twice: @pollThread = pollEvents …@ appears
-- to be the removed version and @fetchThread = pushEvents …@ the added one.
-- NOTE(review): 'waitAny' presumably comes from the async package, where it
-- returns as soon as any one worker completes and does not cancel the
-- others — confirm that leaving the remaining worker running is intended.
processEventQueue :: L.LoggerCtx -> LogEnvHeaders -> WS.Session -> Q.PGPool -> CacheRef -> EventEngineCtx -> IO ()
processEventQueue logctx logenv httpSess pool cacheRef eectx = do
putStrLn "event_trigger: starting workers"
threads <- mapM async [pollThread , consumeThread]
threads <- mapM async [fetchThread , consumeThread]
void $ waitAny threads
where
pollThread = pollEvents (mkHLogger logctx) pool eectx
fetchThread = pushEvents (mkHLogger logctx) pool eectx
consumeThread = consumeEvents (mkHLogger logctx) logenv httpSess pool cacheRef eectx

-- | Worker loop that repeatedly fetches events and pushes them onto the
-- engine's queue.
--
-- Each iteration runs @fetchEvents@ in a 'Q.RepeatableRead' / 'Q.ReadWrite'
-- transaction on the pool. A 'Left' result is logged as an
-- 'EventInternalErr'; a 'Right' result has every event written to the queue
-- inside a single 'atomically' block. The loop then sleeps for the configured
-- interval and starts over; it never returns on its own ('forever').
--
-- NOTE(review): this span is a rendered diff. The function name, equation
-- head, @let@ binding and 'threadDelay' line each appear twice; the
-- @pollEvents@ / @pollI@ lines appear to be the removed version and the
-- @pushEvents@ / @fetchI@ lines the added one.
pollEvents
pushEvents
:: HLogger -> Q.PGPool -> EventEngineCtx -> IO ()
pollEvents logger pool eectx = forever $ do
-- Only the queue and the interval are needed here; the two middle fields of
-- the context are ignored.
let EventEngineCtx q _ _ pollI = eectx
pushEvents logger pool eectx = forever $ do
let EventEngineCtx q _ _ fetchI = eectx
eventsOrError <- runExceptT $ Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) fetchEvents
case eventsOrError of
-- A failed fetch is logged and the loop carries on after the delay.
Left err -> logger $ L.toEngineLog $ EventInternalErr err
Right events -> atomically $ mapM_ (TQ.writeTQueue q) events
-- 'threadDelay' takes microseconds: the removed line scales seconds by
-- 1000 * 1000, the added line scales milliseconds by 1000.
threadDelay (pollI * 1000 * 1000)
threadDelay (fetchI * 1000)

consumeEvents
:: HLogger -> LogEnvHeaders -> WS.Session -> Q.PGPool -> CacheRef -> EventEngineCtx -> IO ()
Expand Down