From 81022ada7bd45b60dc3061f60b948ab685c93a1a Mon Sep 17 00:00:00 2001
From: Karthikeyan Chinnakonda
Date: Wed, 1 Apr 2020 19:21:12 +0530
Subject: [PATCH 1/4] unlock the locked events during graceful shutdown

* Some events can still be delivered multiple times due to an ungraceful
  shutdown

---
 server/src-lib/Hasura/App.hs        | 66 +++++++++++++++++++-------
 server/src-lib/Hasura/Events/Lib.hs | 73 ++++++++++++++++++++++++-----
 2 files changed, 109 insertions(+), 30 deletions(-)

diff --git a/server/src-lib/Hasura/App.hs b/server/src-lib/Hasura/App.hs
index 2b863047f6323..f23ee0fb0abb8 100644
--- a/server/src-lib/Hasura/App.hs
+++ b/server/src-lib/Hasura/App.hs
@@ -6,6 +6,7 @@ module Hasura.App where
 import Control.Monad.Base
 import Control.Monad.Stateless
 import Control.Monad.STM (atomically)
+import Control.Concurrent.STM.TVar (readTVarIO)
 import Control.Monad.Trans.Control (MonadBaseControl (..))
 import Data.Aeson ((.=))
 import Data.Time.Clock (UTCTime, getCurrentTime)
@@ -28,6 +29,7 @@ import qualified Network.HTTP.Client.TLS as HTTP
 import qualified Network.Wai.Handler.Warp as Warp
 import qualified System.Posix.Signals as Signals
 import qualified Text.Mustache.Compile as M
+import qualified Data.Set as Set

 import Hasura.Db
 import Hasura.EncJSON
@@ -204,7 +206,7 @@ runHGEServer
   -- ^ start time
   -> m ()
 runHGEServer ServeOptions{..} InitCtx{..} initTime = do
-  -- Comment this to enable expensive assertions from "GHC.AssertNF". These will log lines to 
+  -- Comment this to enable expensive assertions from "GHC.AssertNF". These will log lines to
   -- STDOUT containing "not in normal form". In the future we could try to integrate this into
   -- our tests. For now this is a development tool.
   --
@@ -241,16 +243,10 @@ runHGEServer ServeOptions{..} InitCtx{..} initTime = do
   liftIO $ logInconsObjs logger inconsObjs

   -- start background threads for schema sync
-  (_schemaSyncListenerThread, _schemaSyncProcessorThread) <- 
+  (_schemaSyncListenerThread, _schemaSyncProcessorThread) <-
     startSchemaSyncThreads sqlGenCtx _icPgPool logger _icHttpManager
                            cacheRef _icInstanceId cacheInitTime

-  let warpSettings = Warp.setPort soPort
-                     . Warp.setHost soHost
-                     . Warp.setGracefulShutdownTimeout (Just 30) -- 30s graceful shutdown
-                     . Warp.setInstallShutdownHandler (shutdownHandler logger shutdownApp)
-                     $ Warp.defaultSettings
-
   maxEvThrds <- liftIO $ getFromEnv defaultMaxEventThreads "HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE"
   fetchI <- fmap milliseconds $ liftIO $
     getFromEnv defaultFetchIntervalMilliSec "HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL"
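The warpSettings block deleted in the hunk above is reintroduced later in this patch, with the event engine context and the PG pool threaded into shutdownHandler. For reference, here is a minimal standalone sketch of the Warp graceful-shutdown machinery it relies on. It assumes only the warp, wai, http-types, and unix packages; the port and the trivial app are illustrative, not HGE code:

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Control.Monad (void)
import Network.HTTP.Types (status200)
import Network.Wai (responseLBS)
import qualified Network.Wai.Handler.Warp as Warp
import qualified System.Posix.Signals as Signals

main :: IO ()
main = Warp.runSettings settings app
  where
    settings =
      Warp.setPort 8080                           -- illustrative port
      . Warp.setGracefulShutdownTimeout (Just 30) -- wait up to 30s for in-flight requests
      . Warp.setInstallShutdownHandler installShutdown
      $ Warp.defaultSettings

    -- Warp hands us its 'closeSocket' action; running it on SIGTERM makes
    -- Warp stop accepting new connections and drain the in-flight ones.
    installShutdown closeSocket =
      void $ Signals.installHandler
        Signals.sigTERM (Signals.CatchOnce closeSocket) Nothing

    app _req respond = respond $ responseLBS status200 [] "ok"
```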
@@ -269,27 +265,60 @@ runHGEServer ServeOptions{..} InitCtx{..} initTime = do
     asyncActionsProcessor (_scrCache cacheRef) _icPgPool _icHttpManager

   -- start a background thread to check for updates
-  _updateThread <- C.forkImmortal "checkForUpdates" logger $ liftIO $ 
+  _updateThread <- C.forkImmortal "checkForUpdates" logger $ liftIO $
     checkForUpdates loggerCtx _icHttpManager

   -- start a background thread for telemetry
   when soEnableTelemetry $ do
     unLogger logger $ mkGenericStrLog LevelInfo "telemetry" telemetryNotice
-    void $ C.forkImmortal "runTelemetry" logger $ liftIO $ 
+    void $ C.forkImmortal "runTelemetry" logger $ liftIO $
       runTelemetry logger _icHttpManager (getSCFromRef cacheRef) _icDbUid _icInstanceId

   finishTime <- liftIO Clock.getCurrentTime
   let apiInitTime = realToFrac $ Clock.diffUTCTime finishTime initTime
   unLogger logger $
     mkGenericLog LevelInfo "server" $ StartupTimeInfo "starting API server" apiInitTime

+  let warpSettings = Warp.setPort soPort
+                     . Warp.setHost soHost
+                     . Warp.setGracefulShutdownTimeout (Just 30) -- 30s graceful shutdown
+                     . Warp.setInstallShutdownHandler (shutdownHandler logger shutdownApp eventEngineCtx _icPgPool)
+                     $ Warp.defaultSettings
   liftIO $ Warp.runSettings warpSettings app

   where
+    -- | prepareEvents is a function to unlock all the events that are
+    -- locked and unprocessed. Locked and unprocessed events can occur in 2 ways:
+    -- 1)
+    --    hasura's shutdown was not graceful, in which case all the fetched
+    --    events will remain in a locked and unprocessed state (TODO: clean shutdown).
+    -- 2)
+    --    there is another hasura instance which is processing events, and
+    --    it will lock events in order to process them.
+    -- So, unlocking all the locked events might re-deliver an event (due to #2).
     prepareEvents pool (Logger logger) = do
       liftIO $ logger $ mkGenericStrLog LevelInfo "event_triggers" "preparing data"
-      res <- runTx pool unlockAllEvents
+      res <- liftIO $ runTx pool (Q.Serializable, Nothing) unlockAllEvents
       either printErrJExit return res

+    -- | shutdownEvents is triggered when a graceful shutdown has been initiated: it
+    -- reads the locked events from the event engine context and then unlocks all of
+    -- them. An event may still be processed more than once: an event that has already
+    -- been processed but not yet marked as delivered in the db will be unlocked by
+    -- shutdownEvents and will be processed again the next time events are fetched.
+    shutdownEvents :: Q.PGPool -> Logger Hasura -> EventEngineCtx -> IO ()
+    shutdownEvents pool (Logger logger) EventEngineCtx {..} = do
+      liftIO $ logger $ mkGenericStrLog LevelInfo "event_triggers"
+        "unlocking events that are locked by the HGE"
+      lockedEvents <- readTVarIO _eeCtxLockedEvents
+      liftIO $
+        when (not $ Set.null lockedEvents) $ do
+          res <- runTx pool (Q.ReadCommitted, Nothing) (unlockEvents $ toList lockedEvents)
+          case res of
+            Left err -> logger $ mkGenericStrLog LevelWarn "event_triggers"
+              ("Error in unlocking the events " ++ show err)
+            Right count -> logger $ mkGenericStrLog LevelInfo "event_triggers"
+              (show count ++ " events were updated")
+
     getFromEnv :: (Read a) => a -> String -> IO a
     getFromEnv defaults env = do
       mEnv <- lookupEnv env
@@ -299,21 +328,24 @@ runHGEServer ServeOptions{..} InitCtx{..} initTime = do
         eRes = maybe (Left $ "Wrong expected type for environment variable: " <> env) Right mRes
       either printErrExit return eRes

-    runTx pool tx =
-      liftIO $ runExceptT $ Q.runTx pool (Q.Serializable, Nothing) tx
+    runTx :: Q.PGPool -> Q.TxMode -> Q.TxE QErr a -> IO (Either QErr a)
+    runTx pool txLevel tx =
+      liftIO $ runExceptT $ Q.runTx pool txLevel tx

     -- | Catches the SIGTERM signal and initiates a graceful shutdown. Graceful shutdown for regular HTTP
     -- requests is already implemented in Warp, and is triggered by invoking the 'closeSocket' callback.
-    -- We only catch the SIGTERM signal once, that is, if the user hits CTRL-C once again, we terminate
-    -- the process immediately.
-    shutdownHandler :: Logger Hasura -> IO () -> IO () -> IO ()
-    shutdownHandler (Logger logger) shutdownApp closeSocket =
+    -- We only catch the SIGTERM signal once; that is, if another SIGTERM signal is caught, the process
+    -- is terminated immediately.
+    -- If the user hits CTRL-C (SIGINT), the process is also terminated immediately.
+    shutdownHandler :: Logger Hasura -> IO () -> EventEngineCtx -> Q.PGPool -> IO () -> IO ()
+    shutdownHandler (Logger logger) shutdownApp eeCtx pool closeSocket =
       void $ Signals.installHandler
         Signals.sigTERM
         (Signals.CatchOnce shutdownSequence)
         Nothing
       where
        shutdownSequence = do
+         shutdownEvents pool (Logger logger) eeCtx
          closeSocket
          shutdownApp
          logShutdown
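Note that the handler above is installed with Signals.CatchOnce: it runs for the first SIGTERM only, after which the default disposition is restored, so a second SIGTERM terminates the process immediately. A minimal sketch of that behaviour, with an illustrative cleanup step standing in for shutdownEvents:

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import qualified System.Posix.Signals as Signals

main :: IO ()
main = do
  done <- newEmptyMVar
  -- CatchOnce runs the handler for the first SIGTERM only; afterwards the
  -- default disposition is restored, so a second SIGTERM is fatal.
  _ <- Signals.installHandler Signals.sigTERM
         (Signals.CatchOnce $ putMVar done ()) Nothing
  takeMVar done                 -- blocks until the first SIGTERM arrives
  putStrLn "cleanup: unlock in-flight events, close socket, stop the app"
  threadDelay 1000000           -- a second SIGTERM during this window kills us
```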
diff --git a/server/src-lib/Hasura/Events/Lib.hs b/server/src-lib/Hasura/Events/Lib.hs
index 67ae03985680d..9eb221292a38d 100644
--- a/server/src-lib/Hasura/Events/Lib.hs
+++ b/server/src-lib/Hasura/Events/Lib.hs
@@ -7,6 +7,8 @@ module Hasura.Events.Lib
   , defaultMaxEventThreads
   , defaultFetchIntervalMilliSec
   , Event(..)
+  , unlockEvents
+  , EventEngineCtx(..)
   ) where

 import Control.Concurrent.Extended (sleep)
@@ -30,6 +32,10 @@ import Hasura.RQL.Types
 import Hasura.Server.Version (HasVersion)
 import Hasura.SQL.Types

+-- remove these when array encoding is merged
+import qualified Database.PG.Query.PTI as PTI
+import qualified PostgreSQL.Binary.Encoding as PE
+
 import qualified Data.ByteString as BS
 import qualified Data.CaseInsensitive as CI
 import qualified Data.HashMap.Strict as M
@@ -43,6 +49,7 @@ import qualified Database.PG.Query as Q
 import qualified Hasura.Logging as L
 import qualified Network.HTTP.Client as HTTP
 import qualified Network.HTTP.Types as HTTP
+import qualified Data.Set as Set

 type Version = T.Text
@@ -74,7 +81,7 @@ $(deriveJSON (aesonDrop 2 snakeCase){omitNothingFields=True} ''DeliveryInfo)

 -- | Change data for a particular row
 --
--- https://docs.hasura.io/1.0/graphql/manual/event-triggers/payload.html 
+-- https://docs.hasura.io/1.0/graphql/manual/event-triggers/payload.html
 data Event
   = Event
   { eId :: EventId
@@ -155,6 +162,7 @@ data EventEngineCtx
   = EventEngineCtx
   { _eeCtxEventThreadsCapacity :: TVar Int
   , _eeCtxFetchInterval        :: DiffTime
+  , _eeCtxLockedEvents         :: TVar (Set.Set EventId)
   }

 defaultMaxEventThreads :: Int
@@ -169,6 +177,7 @@ retryAfterHeader = "Retry-After"
 initEventEngineCtx :: Int -> DiffTime -> STM EventEngineCtx
 initEventEngineCtx maxT _eeCtxFetchInterval = do
   _eeCtxEventThreadsCapacity <- newTVar maxT
+  _eeCtxLockedEvents <- newTVar Set.empty
   return $ EventEngineCtx{..}

@@ -176,13 +185,13 @@ initEventEngineCtx maxT _eeCtxFetchInterval = do
 -- | Service events from our in-DB queue.
 --
 -- There are a few competing concerns and constraints here; we want to...
 --   - fetch events in batches for lower DB pressure
 --   - don't fetch more than N at a time (since that can mean: space leak, less
---     effective scale out, possible double sends for events we've checked out 
+--     effective scale out, possible double sends for events we've checked out
 --     on exit (TODO clean shutdown procedure))
 --   - try not to cause webhook workers to stall waiting on DB fetch
 --   - limit webhook HTTP concurrency per HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE
 processEventQueue
   :: (HasVersion) => L.Logger L.Hasura -> LogEnvHeaders -> HTTP.Manager -> Q.PGPool
-  -> IO SchemaCache -> EventEngineCtx 
+  -> IO SchemaCache -> EventEngineCtx
   -> IO void
 processEventQueue logger logenv httpMgr pool getSchemaCache EventEngineCtx{..} = do
   events0 <- popEventsBatch
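The new _eeCtxLockedEvents field is the state that shutdownEvents drains on exit: event ids are added when a batch is fetched and removed when a worker finishes. A reduced sketch of that contract; markLocked and withLockedEvent are illustrative names, not HGE functions:

```haskell
import Control.Concurrent.STM (TVar, atomically, modifyTVar')
import Control.Exception (finally)
import qualified Data.Set as Set

type EventId = String  -- stand-in for HGE's EventId

-- record a freshly fetched batch as locked
markLocked :: TVar (Set.Set EventId) -> [EventId] -> IO ()
markLocked tv ids = atomically $ modifyTVar' tv (Set.union (Set.fromList ids))

-- run a worker and always drop the id again, even if the worker throws, so
-- the set only ever holds events that are genuinely in flight
withLockedEvent :: TVar (Set.Set EventId) -> EventId -> IO a -> IO a
withLockedEvent tv eid action =
  action `finally` atomically (modifyTVar' tv (Set.delete eid))
```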
@@ -192,12 +201,25 @@ processEventQueue logger logenv httpMgr pool getSchemaCache EventEngineCtx{..} =
     popEventsBatch = do
       let run = runExceptT . Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite)
       run (fetchEvents fetchBatchSize) >>= \case
-        Left err -> do 
+        Left err -> do
           L.unLogger logger $ EventInternalErr err
           return []
-        Right events ->
+        Right events -> do
+          saveLockedEvents events
           return events

+    -- After the events are fetched from the DB, we store the locked events
+    -- in a hash set (order doesn't matter and lookups are faster) in the
+    -- event engine context.
+    saveLockedEvents :: [Event] -> IO ()
+    saveLockedEvents evts =
+      liftIO $ atomically $ do
+        lockedEvents <- readTVar _eeCtxLockedEvents
+        let evtsIds = map eId evts
+            newLockedEvents = Set.union lockedEvents (Set.fromList evtsIds)
+        writeTVar _eeCtxLockedEvents newLockedEvents
+
     -- work on this batch of events while prefetching the next. Recurse after we've forked workers
     -- for each in the batch, minding the requested pool size.
     go :: [Event] -> Int -> Bool -> IO void
@@ -210,17 +232,22 @@ processEventQueue logger logenv httpMgr pool getSchemaCache EventEngineCtx{..} =
           -- worth the effort for something more fine-tuned
           eventsNext <- withAsync popEventsBatch $ \eventsNextA -> do
             -- process approximately in order, minding HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE:
-            forM_ events $ \event -> 
+            forM_ events $ \event ->
               mask_ $ do
                 atomically $ do
                   -- block until < HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE threads:
                   capacity <- readTVar _eeCtxEventThreadsCapacity
                   check $ capacity > 0
-                  writeTVar _eeCtxEventThreadsCapacity $! (capacity - 1) 
+                  writeTVar _eeCtxEventThreadsCapacity $! (capacity - 1)
                 -- since there is some capacity in our worker threads, we can launch another:
-                let restoreCapacity = liftIO $ atomically $
-                      modifyTVar' _eeCtxEventThreadsCapacity (+ 1)
-                t <- async $ flip runReaderT (logger, httpMgr) $
-                       processEvent event `finally` restoreCapacity
+                let restoreCapacity evt = liftIO $ atomically $ do
+                      modifyTVar' _eeCtxEventThreadsCapacity (+ 1)
+                      -- after the event has been processed, remove it from
+                      -- the locked events cache
+                      modifyTVar' _eeCtxLockedEvents (Set.delete (eId evt))
+                t <- async $ flip runReaderT (logger, httpMgr) $
+                       processEvent event `finally` restoreCapacity event
                 link t

           -- return when next batch ready; some 'processEvent' threads may be running.
@@ -228,7 +255,7 @@ processEventQueue logger logenv httpMgr pool getSchemaCache EventEngineCtx{..} =
           let lenEvents = length events
           if | lenEvents == fetchBatchSize -> do
-                 -- If we've seen N fetches in a row from the DB come back full (i.e. only limited 
+                 -- If we've seen N fetches in a row from the DB come back full (i.e. only limited
                  -- by our LIMIT clause), then we say we're clearly falling behind:
                  let clearlyBehind = fullFetchCount >= 3
                  unless alreadyWarned $
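The capacity TVar in go behaves as an STM semaphore: check forces the transaction to retry until a slot is free, and finally hands the slot back even if the worker throws. The same pattern in isolation; runBounded is an illustrative name, not HGE code:

```haskell
import Control.Concurrent.Async (async, link)
import Control.Concurrent.STM (TVar, atomically, check, modifyTVar', readTVar, writeTVar)
import Control.Exception (finally, mask_)
import Control.Monad (forM_)

runBounded :: TVar Int -> [IO ()] -> IO ()
runBounded capacity jobs =
  forM_ jobs $ \job -> mask_ $ do
    atomically $ do
      n <- readTVar capacity
      check (n > 0)                 -- retries until a slot frees up
      writeTVar capacity $! n - 1
    let restore = atomically $ modifyTVar' capacity (+ 1)
    worker <- async (job `finally` restore)
    link worker                     -- re-raise worker exceptions in the caller
```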
@@ -541,7 +568,27 @@ unlockAllEvents =
           UPDATE hdb_catalog.event_log
           SET locked = 'f'
           WHERE locked = 't'
-          |] () False
+          |] () True

 toInt64 :: (Integral a) => a -> Int64
 toInt64 = fromIntegral
+
+-- EventIdArray is only used for PG array encoding
+newtype EventIdArray = EventIdArray { unEventIdArray :: [EventId] } deriving (Show, Eq)
+
+instance Q.ToPrepArg EventIdArray where
+  toPrepVal (EventIdArray l) = Q.toPrepValHelper PTI.unknown encoder l
+    where
+      -- 25 is the OID value of TEXT; see
+      -- https://jdbc.postgresql.org/development/privateapi/constant-values.html
+      encoder = PE.array 25 . PE.dimensionArray foldl' (PE.encodingArray . PE.text_strict)
+
+unlockEvents :: [EventId] -> Q.TxE QErr Int
+unlockEvents eventIds =
+  (runIdentity . Q.getRow) <$> Q.withQE defaultTxErrorHandler
+  [Q.sql|
+    WITH "cte" AS
+      (UPDATE hdb_catalog.event_log
+       SET locked = 'f'
+       WHERE id = ANY($1::text[]) RETURNING *)
+    SELECT count(*) FROM "cte"
+  |] (Identity $ EventIdArray eventIds) True

From a1057b7de3d6c4e82a0bc8063fc4fd4498b273f6 Mon Sep 17 00:00:00 2001
From: Karthikeyan Chinnakonda
Date: Wed, 1 Apr 2020 21:51:03 +0530
Subject: [PATCH 2/4] modify the prepareEvents documentation

---
 server/src-lib/Hasura/App.hs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/server/src-lib/Hasura/App.hs b/server/src-lib/Hasura/App.hs
index f23ee0fb0abb8..43e6765b7601b 100644
--- a/server/src-lib/Hasura/App.hs
+++ b/server/src-lib/Hasura/App.hs
@@ -287,7 +287,8 @@ runHGEServer ServeOptions{..} InitCtx{..} initTime = do
   where
     -- | prepareEvents is a function to unlock all the events that are
-    -- locked and unprocessed. Locked and unprocessed events can occur in 2 ways:
+    -- locked and unprocessed; it is called while Hasura starts up.
+    -- Locked and unprocessed events can occur in 2 ways:
     -- 1)
     --    hasura's shutdown was not graceful, in which case all the fetched
     --    events will remain in a locked and unprocessed state (TODO: clean shutdown).

From 5e53e06ad2984a236ecde34dac98f61306fae707 Mon Sep 17 00:00:00 2001
From: Karthikeyan Chinnakonda
Date: Wed, 1 Apr 2020 22:52:06 +0530
Subject: [PATCH 3/4] modify the prepareEvents doc

---
 server/src-lib/Hasura/App.hs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/server/src-lib/Hasura/App.hs b/server/src-lib/Hasura/App.hs
index 43e6765b7601b..2a1c7b7fcdfd2 100644
--- a/server/src-lib/Hasura/App.hs
+++ b/server/src-lib/Hasura/App.hs
@@ -289,11 +289,11 @@ runHGEServer ServeOptions{..} InitCtx{..} initTime = do
     -- | prepareEvents is a function to unlock all the events that are
     -- locked and unprocessed; it is called while Hasura starts up.
     -- Locked and unprocessed events can occur in 2 ways:
-    -- 1)
-    --    hasura's shutdown was not graceful, in which case all the fetched
+    -- 1.
+    --    Hasura's shutdown was not graceful, in which case all the fetched
     --    events will remain in a locked and unprocessed state (TODO: clean shutdown).
-    -- 2)
+    -- 2.
     --    there is another hasura instance which is processing events, and
     --    it will lock events in order to process them.
     -- So, unlocking all the locked events might re-deliver an event (due to #2).

From 98d9aced6d1627fbdfeb5a70d22f66e713311c16 Mon Sep 17 00:00:00 2001
From: Karthikeyan Chinnakonda
Date: Thu, 9 Apr 2020 11:05:18 +0530
Subject: [PATCH 4/4] update changelog

---
 CHANGELOG.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 833f79bddb62c..5ebbc3b0a1190 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,7 +2,6 @@

 ## Next release

-
 ### console: persist columns state in data browser

 The order and collapsed state of columns is now persisted across page navigation
@@ -21,6 +20,8 @@ The order and collapsed state of columns is now persisted across page navigation
 - docs: add One-Click Render deployment guide (close #3683) (#4209)
 - server: reserved keywords in column references break parser (fix #3597) #3927
 - server: fix postgres specific error message that exposed database type on invalid query parameters (#4294)
+- server: manage in-flight events when the HGE instance is gracefully shut down (close #3548)
+
 ## `v1.2.0-beta.3`
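As a footnote on the unlockEvents query in PATCH 1: wrapping the UPDATE in a CTE lets a single statement both unlock the given ids and report how many rows it touched. A hedged sketch of the same SQL shape using postgresql-simple rather than HGE's internal pg-client bindings; the table name comes from the patch, everything else is illustrative:

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.Int (Int64)
import Database.PostgreSQL.Simple
import Database.PostgreSQL.Simple.Types (PGArray (..))

unlockEventIds :: Connection -> [String] -> IO Int64
unlockEventIds conn ids = do
  -- the CTE turns the UPDATE into something a count can be selected from;
  -- the ::text[] cast also keeps an empty id list well-typed
  [Only n] <- query conn
    "WITH cte AS (UPDATE hdb_catalog.event_log SET locked = 'f' \
    \WHERE id = ANY(?::text[]) RETURNING *) SELECT count(*) FROM cte"
    (Only (PGArray ids))
  pure n
```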