diff --git a/server/src-exec/Main.hs b/server/src-exec/Main.hs index 3ba8054e7ea3e..d0018e4d0efba 100644 --- a/server/src-exec/Main.hs +++ b/server/src-exec/Main.hs @@ -169,10 +169,10 @@ main = do void $ C.forkIO $ checkForUpdates loggerCtx httpManager maxEvThrds <- getFromEnv defaultMaxEventThreads "HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE" - evPollSec <- getFromEnv defaultPollingIntervalSec "HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL" + evFetchMilliSec <- getFromEnv defaultFetchIntervalMilliSec "HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL" logEnvHeaders <- getFromEnv False "LOG_HEADERS_FROM_ENV" - eventEngineCtx <- atomically $ initEventEngineCtx maxEvThrds evPollSec + eventEngineCtx <- atomically $ initEventEngineCtx maxEvThrds evFetchMilliSec httpSession <- WrqS.newSessionControl Nothing TLS.tlsManagerSettings void $ C.forkIO $ processEventQueue hloggerCtx logEnvHeaders httpSession pool cacheRef eventEngineCtx diff --git a/server/src-lib/Hasura/Events/Lib.hs b/server/src-lib/Hasura/Events/Lib.hs index 47cf19b6b915c..6d9d55e479076 100644 --- a/server/src-lib/Hasura/Events/Lib.hs +++ b/server/src-lib/Hasura/Events/Lib.hs @@ -9,7 +9,7 @@ module Hasura.Events.Lib , processEventQueue , unlockAllEvents , defaultMaxEventThreads - , defaultPollingIntervalSec + , defaultFetchIntervalMilliSec , Event(..) ) where @@ -129,45 +129,45 @@ data Invocation data EventEngineCtx = EventEngineCtx - { _eeCtxEventQueue :: TQ.TQueue Event - , _eeCtxEventThreads :: TVar Int - , _eeCtxMaxEventThreads :: Int - , _eeCtxPollingIntervalSec :: Int + { _eeCtxEventQueue :: TQ.TQueue Event + , _eeCtxEventThreads :: TVar Int + , _eeCtxMaxEventThreads :: Int + , _eeCtxFetchIntervalMilliSec :: Int } defaultMaxEventThreads :: Int defaultMaxEventThreads = 100 -defaultPollingIntervalSec :: Int -defaultPollingIntervalSec = 1 +defaultFetchIntervalMilliSec :: Int +defaultFetchIntervalMilliSec = 1000 retryAfterHeader :: CI.CI T.Text retryAfterHeader = "Retry-After" initEventEngineCtx :: Int -> Int -> STM EventEngineCtx -initEventEngineCtx maxT pollI = do +initEventEngineCtx maxT fetchI = do q <- TQ.newTQueue c <- newTVar 0 - return $ EventEngineCtx q c maxT pollI + return $ EventEngineCtx q c maxT fetchI processEventQueue :: L.LoggerCtx -> LogEnvHeaders -> WS.Session -> Q.PGPool -> CacheRef -> EventEngineCtx -> IO () processEventQueue logctx logenv httpSess pool cacheRef eectx = do putStrLn "event_trigger: starting workers" - threads <- mapM async [pollThread , consumeThread] + threads <- mapM async [fetchThread , consumeThread] void $ waitAny threads where - pollThread = pollEvents (mkHLogger logctx) pool eectx + fetchThread = pushEvents (mkHLogger logctx) pool eectx consumeThread = consumeEvents (mkHLogger logctx) logenv httpSess pool cacheRef eectx -pollEvents +pushEvents :: HLogger -> Q.PGPool -> EventEngineCtx -> IO () -pollEvents logger pool eectx = forever $ do - let EventEngineCtx q _ _ pollI = eectx +pushEvents logger pool eectx = forever $ do + let EventEngineCtx q _ _ fetchI = eectx eventsOrError <- runExceptT $ Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) fetchEvents case eventsOrError of Left err -> logger $ L.toEngineLog $ EventInternalErr err Right events -> atomically $ mapM_ (TQ.writeTQueue q) events - threadDelay (pollI * 1000 * 1000) + threadDelay (fetchI * 1000) consumeEvents :: HLogger -> LogEnvHeaders -> WS.Session -> Q.PGPool -> CacheRef -> EventEngineCtx -> IO ()