diff --git a/server/src-lib/Hasura/Events/Lib.hs b/server/src-lib/Hasura/Events/Lib.hs
index f7de232dc533f..db6fa2e636037 100644
--- a/server/src-lib/Hasura/Events/Lib.hs
+++ b/server/src-lib/Hasura/Events/Lib.hs
@@ -20,17 +20,16 @@ import Control.Monad.STM (STM, atomically, retry)
 import Data.Aeson
 import Data.Aeson.Casing
 import Data.Aeson.TH
-import Data.Either (isLeft)
 import Data.Has
 import Data.Int (Int64)
 import Data.IORef (IORef, readIORef)
+import Data.Time.Clock (UTCTime, addUTCTime, getCurrentTime)
 import Hasura.Events.HTTP
 import Hasura.Prelude
 import Hasura.RQL.Types
 import Hasura.SQL.Types
 
 import qualified Control.Concurrent.STM.TQueue as TQ
-import qualified Control.Retry as R
 import qualified Data.ByteString.Lazy as B
 import qualified Data.HashMap.Strict as M
 import qualified Data.TByteString as TBS
@@ -152,49 +151,53 @@ processEvent
   => Q.PGPool -> Event -> m ()
 processEvent pool e = do
   (logger:: HLogger) <- asks getter
-  retryPolicy <- getRetryPolicy e
-  res <- R.retrying retryPolicy shouldRetry $ tryWebhook pool e
-  liftIO $ either (errorFn logger) (void.return) res
-  unlockRes <- liftIO $ runExceptT $ runUnlockQ pool e
-  liftIO $ either (logQErr logger) (void.return ) unlockRes
+  res <- tryWebhook pool e
+  finalRes <- either errorFn successFn res
+  liftIO $ either (logQErr logger) return finalRes
   where
-    shouldRetry :: (Monad m ) => R.RetryStatus -> Either HTTPErr a -> m Bool
-    shouldRetry _ eitherResp = return $ isLeft eitherResp
-
-    errorFn :: HLogger -> HTTPErr -> IO ()
-    errorFn logger err = do
-      logger $ L.toEngineLog err
-      errorRes <- runExceptT $ runErrorQ pool e
-      case errorRes of
-        Left err' -> logQErr logger err'
-        Right _ -> return ()
+    -- Webhook failed: log the HTTP error, then let 'checkError' decide
+    -- between scheduling a retry and marking the event as errored.
+    errorFn
+      :: ( MonadReader r m
+         , MonadIO m
+         , Has HLogger r
+         , Has CacheRef r
+         )
+      => HTTPErr -> m (Either QErr ())
+    errorFn err = do
+      (logger:: HLogger) <- asks getter
+      liftIO $ logger $ L.toEngineLog err
+      checkError
+
+    -- Webhook succeeded: only the lock on the event needs to be released.
+    successFn :: MonadIO m => B.ByteString -> m (Either QErr ())
+    successFn _ = liftIO $ runExceptT $ runUnlockQ pool e
 
     logQErr :: HLogger -> QErr -> IO ()
     logQErr logger err = logger $ L.toEngineLog $ EventInternalErr err
 
-getRetryPolicy
-  :: ( MonadReader r m
-     , MonadIO m
-     , Has WS.Session r
-     , Has HLogger r
-     , Has CacheRef r
-     , Has EventEngineCtx r
-     )
-  => Event -> m (R.RetryPolicyM m)
-getRetryPolicy e = do
-  cacheRef::CacheRef <- asks getter
-  (cache, _) <- liftIO $ readIORef cacheRef
-  let eti = getEventTriggerInfoFromEvent cache e
-      retryConfM = etiRetryConf <$> eti
-      retryConf = fromMaybe (RetryConf 0 10) retryConfM
-
-  let remainingRetries = max 0 $ fromIntegral (rcNumRetries retryConf) - getTries
-      delay = fromIntegral (rcIntervalSec retryConf) * 1000000
-      policy = R.constantDelay delay <> R.limitRetries remainingRetries
-  return policy
-  where
-    getTries :: Int
-    getTries = fromIntegral $ eTries e
+    -- Failed delivery: 'eTries' counts the attempts made before this one, so
+    -- this is attempt number tries + 1 and rcNumRetries + 1 attempts are
+    -- allowed in total. When the budget is spent the event is marked as
+    -- errored; otherwise next_retry_at is set rcIntervalSec seconds ahead.
+    -- Both branches also unlock the event.
+    checkError
+      :: ( MonadReader r m
+         , MonadIO m
+         , Has CacheRef r
+         )
+      => m (Either QErr ())
+    checkError = do
+      cacheRef::CacheRef <- asks getter
+      (cache, _) <- liftIO $ readIORef cacheRef
+      let eti = getEventTriggerInfoFromEvent cache e
+          retryConfM = etiRetryConf <$> eti
+          -- default used when the trigger is missing from the schema cache
+          retryConf = fromMaybe (RetryConf 0 10) retryConfM
+          tries = eTries e
+      if tries >= rcNumRetries retryConf
+        then liftIO $ runExceptT $ runErrorAndUnlockQ pool e
+        else liftIO $ runExceptT $ runRetryAfterAndUnlockQ pool e retryConf
 
 tryWebhook
   :: ( MonadReader r m
@@ -204,16 +207,16 @@ tryWebhook
      , Has CacheRef r
      , Has EventEngineCtx r
      )
-  => Q.PGPool -> Event -> R.RetryStatus -> m (Either HTTPErr B.ByteString)
-tryWebhook pool e _ = do
+  => Q.PGPool -> Event -> m (Either HTTPErr B.ByteString)
+tryWebhook pool e = do
   logger:: HLogger <- asks getter
   cacheRef::CacheRef <- asks getter
   (cache, _) <- liftIO $ readIORef cacheRef
-  let eti = getEventTriggerInfoFromEvent cache e
-  case eti of
+  let meti = getEventTriggerInfoFromEvent cache e
+  case meti of
     Nothing -> return $ Left $ HOther "table or event-trigger not found"
-    Just et -> do
-      let webhook = etiWebhook et
+    Just eti -> do
+      let webhook = etiWebhook eti
           createdAt = eCreatedAt e
           eventId = eId e
       eeCtx <- asks getter
@@ -259,7 +262,7 @@ fetchEvents =
                     FROM hdb_catalog.event_log l
                     JOIN hdb_catalog.event_triggers e
                     ON (l.trigger_id = e.id)
-                    WHERE l.delivered ='f' and l.error = 'f' and l.locked = 'f'
+                    WHERE l.delivered ='f' and l.error = 'f' and l.locked = 'f' and (l.next_retry_at is NULL or l.next_retry_at <= now())
                     LIMIT 100 )
       RETURNING id, schema_name, table_name, trigger_id, trigger_name, payload::json, tries, created_at
       |] () True
@@ -293,14 +296,6 @@ markError e =
            WHERE id = $1
            |] (Identity $ eId e) True
 
--- lockEvent :: Event -> Q.TxE QErr ()
--- lockEvent e =
---   Q.unitQE defaultTxErrorHandler [Q.sql|
---            UPDATE hdb_catalog.event_log
---            SET locked = 't'
---            WHERE id = $1
---            |] (Identity $ eId e) True
-
 unlockEvent :: Event -> Q.TxE QErr ()
 unlockEvent e =
   Q.unitQE defaultTxErrorHandler [Q.sql|
@@ -316,19 +311,56 @@ unlockAllEvents =
            SET locked = 'f'
            |] () False
 
+-- | Schedule the next delivery attempt: 'fetchEvents' skips the event until
+-- @next_retry_at@ has passed.
+setNextRetry :: Event -> UTCTime -> Q.TxE QErr ()
+setNextRetry e time =
+  Q.unitQE defaultTxErrorHandler [Q.sql|
+           UPDATE hdb_catalog.event_log
+           SET next_retry_at = $1
+           WHERE id = $2
+           |] (time, eId e) True
+
+-- | Clear any scheduled retry so the event no longer carries a stale
+-- @next_retry_at@.
+clearNextRetry :: Event -> Q.TxE QErr ()
+clearNextRetry e =
+  Q.unitQE defaultTxErrorHandler [Q.sql|
+           UPDATE hdb_catalog.event_log
+           SET next_retry_at = NULL
+           WHERE id = $1
+           |] (Identity $ eId e) True
+
 runFailureQ :: Q.PGPool -> Invocation -> ExceptT QErr IO ()
 runFailureQ pool invo = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ insertInvocation invo
 
 runSuccessQ :: Q.PGPool -> Event -> Invocation -> ExceptT QErr IO ()
 runSuccessQ pool e invo = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ do
   insertInvocation invo
+  clearNextRetry e
   markDelivered e
 
-runErrorQ :: Q.PGPool -> Event -> ExceptT QErr IO ()
-runErrorQ pool e = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ markError e
-
--- runLockQ :: Q.PGPool -> Event -> ExceptT QErr IO ()
--- runLockQ pool e = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ lockEvent e
+-- | Retries exhausted: apply 'markError', drop any scheduled retry and
+-- release the lock, all in one transaction.
+runErrorAndUnlockQ :: Q.PGPool -> Event -> ExceptT QErr IO ()
+runErrorAndUnlockQ pool e = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ do
+  markError e
+  clearNextRetry e
+  unlockEvent e
+
+-- | Schedule a retry 'rcIntervalSec' seconds from now and release the lock,
+-- in one transaction.
+--
+-- NOTE(review): the timestamp comes from this server's clock, while
+-- 'fetchEvents' compares it with the database's @now()@; clock skew between
+-- the two shifts the effective retry delay.
+runRetryAfterAndUnlockQ :: Q.PGPool -> Event -> RetryConf -> ExceptT QErr IO ()
+runRetryAfterAndUnlockQ pool e rconf = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ do
+  currentTime <- liftIO getCurrentTime
+  let diff = fromIntegral $ rcIntervalSec rconf
+      retryTime = addUTCTime diff currentTime
+  setNextRetry e retryTime
+  unlockEvent e
 
 runUnlockQ :: Q.PGPool -> Event -> ExceptT QErr IO ()
 runUnlockQ pool e = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ unlockEvent e
diff --git a/server/src-lib/Hasura/RQL/DDL/Subscribe.hs b/server/src-lib/Hasura/RQL/DDL/Subscribe.hs
index 066806100d81e..4f16740ccb041 100644
--- a/server/src-lib/Hasura/RQL/DDL/Subscribe.hs
+++ b/server/src-lib/Hasura/RQL/DDL/Subscribe.hs
@@ -237,4 +237,4 @@ instance HDBQuery DeliverEventQuery where
   type Phase1Res DeliverEventQuery = ()
   phaseOne _ = adminOnly
   phaseTwo q _ = deliverEvent q
-  schemaCachePolicy = SCPReload
+  schemaCachePolicy = SCPNoChange
diff --git a/server/src-rsr/initialise.sql b/server/src-rsr/initialise.sql
index 955a67624566e..52f66fd2e332d 100644
--- a/server/src-rsr/initialise.sql
+++ b/server/src-rsr/initialise.sql
@@ -210,7 +210,8 @@ CREATE TABLE hdb_catalog.event_log
   error BOOLEAN NOT NULL DEFAULT FALSE,
   tries INTEGER NOT NULL DEFAULT 0,
   created_at TIMESTAMP DEFAULT NOW(),
-  locked BOOLEAN NOT NULL DEFAULT FALSE
+  locked BOOLEAN NOT NULL DEFAULT FALSE,
+  next_retry_at TIMESTAMP
 );
 
 CREATE TABLE hdb_catalog.event_invocation_logs