这是indexloc提供的服务,不要输入任何密码
Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 84 additions & 60 deletions server/src-lib/Hasura/Events/Lib.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@ import Control.Monad.STM (STM, atomically, retry)
import Data.Aeson
import Data.Aeson.Casing
import Data.Aeson.TH
import Data.Either (isLeft)
import Data.Has
import Data.Int (Int64)
import Data.IORef (IORef, readIORef)
import Data.Time.Clock
import Hasura.Events.HTTP
import Hasura.Prelude
import Hasura.RQL.Types
import Hasura.SQL.Types

import qualified Control.Concurrent.STM.TQueue as TQ
import qualified Control.Retry as R
import qualified Data.ByteString.Lazy as B
import qualified Data.HashMap.Strict as M
import qualified Data.TByteString as TBS
Expand Down Expand Up @@ -152,49 +151,57 @@ processEvent
=> Q.PGPool -> Event -> m ()
processEvent pool e = do
(logger:: HLogger) <- asks getter
retryPolicy <- getRetryPolicy e
res <- R.retrying retryPolicy shouldRetry $ tryWebhook pool e
liftIO $ either (errorFn logger) (void.return) res
unlockRes <- liftIO $ runExceptT $ runUnlockQ pool e
liftIO $ either (logQErr logger) (void.return ) unlockRes
res <- tryWebhook pool e
finally <- either errorFn successFn res
liftIO $ either (logQErr logger) (void.return) finally
where
-- | Retry predicate: request another attempt exactly when the webhook result
-- is a 'Left'. The retry status argument is ignored.
shouldRetry :: (Monad m ) => R.RetryStatus -> Either HTTPErr a -> m Bool
shouldRetry _status = return . isLeft

-- | Log the webhook failure, then run 'runErrorQ' for the event (which marks
-- it as errored). If that query itself fails, the query error is logged
-- through 'logQErr'; otherwise nothing more happens.
errorFn :: HLogger -> HTTPErr -> IO ()
errorFn logger err = do
  logger $ L.toEngineLog err
  runExceptT (runErrorQ pool e) >>= either (logQErr logger) return
-- | Failure handler for a webhook call: log the HTTP error through the
-- 'HLogger' taken from the reader environment, then delegate to 'checkError',
-- whose result is returned unchanged.
errorFn
  :: ( MonadReader r m
     , MonadIO m
     , Has WS.Session r
     , Has HLogger r
     , Has CacheRef r
     , Has EventEngineCtx r
     )
  => HTTPErr -> m (Either QErr ())
errorFn httpErr = do
  (logAction :: HLogger) <- asks getter
  liftIO . logAction $ L.toEngineLog httpErr
  checkError

-- | Success handler for a webhook call. The response body is ignored; the
-- only work done is running 'runUnlockQ' for the event, and any query error
-- is returned as a 'Left'.
successFn
  :: ( MonadReader r m
     , MonadIO m
     , Has WS.Session r
     , Has HLogger r
     , Has CacheRef r
     , Has EventEngineCtx r
     )
  => B.ByteString -> m (Either QErr ())
successFn _responseBody =
  liftIO . runExceptT $ runUnlockQ pool e

-- | Wrap a query error in 'EventInternalErr', convert it with
-- 'L.toEngineLog', and pass it to the given logger.
logQErr :: HLogger -> QErr -> IO ()
logQErr logger = logger . L.toEngineLog . EventInternalErr

-- | Build the retry policy for one event from the retry configuration of its
-- event trigger, as found in the schema cache.
--
-- When no trigger info is found for the event, @RetryConf 0 10@ is used.
-- The resulting policy combines a constant delay with a retry limit equal to
-- the configured retry count minus the tries already recorded on the event,
-- clamped so it never goes below zero.
getRetryPolicy
  :: ( MonadReader r m
     , MonadIO m
     , Has WS.Session r
     , Has HLogger r
     , Has CacheRef r
     , Has EventEngineCtx r
     )
  => Event -> m (R.RetryPolicyM m)
getRetryPolicy e = do
  (ref :: CacheRef) <- asks getter
  (cache, _) <- liftIO $ readIORef ref
  let retryConf =
        maybe (RetryConf 0 10) etiRetryConf (getEventTriggerInfoFromEvent cache e)
      triesSoFar = fromIntegral (eTries e) :: Int
      retriesLeft = max 0 (fromIntegral (rcNumRetries retryConf) - triesSoFar)
      -- The configured interval is in seconds; it is scaled by 1,000,000
      -- before being handed to 'R.constantDelay'.
      delayAmount = fromIntegral (rcIntervalSec retryConf) * 1000000
  return $ R.constantDelay delayAmount <> R.limitRetries retriesLeft
-- | Decide what happens to the event after a failed webhook call.
--
-- The trigger's retry configuration is looked up in the schema cache
-- (defaulting to @RetryConf 0 10@ when the trigger is not found). If the
-- event's recorded tries have reached the configured retry count, the event
-- is marked as errored and unlocked; otherwise a next retry time is scheduled
-- and the event is unlocked. Any query error is returned as a 'Left'.
checkError
  :: ( MonadReader r m
     , MonadIO m
     , Has WS.Session r
     , Has HLogger r
     , Has CacheRef r
     , Has EventEngineCtx r
     )
  => m (Either QErr ())
checkError = do
  (ref :: CacheRef) <- asks getter
  (cache, _) <- liftIO $ readIORef ref
  let retryConf =
        maybe (RetryConf 0 10) etiRetryConf (getEventTriggerInfoFromEvent cache e)
      -- current_try = tries + 1 , allowed_total_tries = rcNumRetries retryConf + 1
      retriesExhausted = eTries e >= rcNumRetries retryConf
      followUp
        | retriesExhausted = runErrorAndUnlockQ pool e
        | otherwise = runRetryAfterAndUnlockQ pool e retryConf
  liftIO $ runExceptT followUp

tryWebhook
:: ( MonadReader r m
Expand All @@ -204,16 +211,16 @@ tryWebhook
, Has CacheRef r
, Has EventEngineCtx r
)
=> Q.PGPool -> Event -> R.RetryStatus -> m (Either HTTPErr B.ByteString)
tryWebhook pool e _ = do
=> Q.PGPool -> Event -> m (Either HTTPErr B.ByteString)
tryWebhook pool e = do
logger:: HLogger <- asks getter
cacheRef::CacheRef <- asks getter
(cache, _) <- liftIO $ readIORef cacheRef
let eti = getEventTriggerInfoFromEvent cache e
case eti of
let meti = getEventTriggerInfoFromEvent cache e
case meti of
Nothing -> return $ Left $ HOther "table or event-trigger not found"
Just et -> do
let webhook = etiWebhook et
Just eti -> do
let webhook = etiWebhook eti
createdAt = eCreatedAt e
eventId = eId e
eeCtx <- asks getter
Expand Down Expand Up @@ -259,7 +266,7 @@ fetchEvents =
FROM hdb_catalog.event_log l
JOIN hdb_catalog.event_triggers e
ON (l.trigger_id = e.id)
WHERE l.delivered ='f' and l.error = 'f' and l.locked = 'f'
WHERE l.delivered ='f' and l.error = 'f' and l.locked = 'f' and (l.next_retry_at is NULL or l.next_retry_at <= now())
LIMIT 100 )
RETURNING id, schema_name, table_name, trigger_id, trigger_name, payload::json, tries, created_at
|] () True
Expand Down Expand Up @@ -293,14 +300,6 @@ markError e =
WHERE id = $1
|] (Identity $ eId e) True

-- lockEvent :: Event -> Q.TxE QErr ()
-- lockEvent e =
-- Q.unitQE defaultTxErrorHandler [Q.sql|
-- UPDATE hdb_catalog.event_log
-- SET locked = 't'
-- WHERE id = $1
-- |] (Identity $ eId e) True

unlockEvent :: Event -> Q.TxE QErr ()
unlockEvent e =
Q.unitQE defaultTxErrorHandler [Q.sql|
Expand All @@ -316,19 +315,44 @@ unlockAllEvents =
SET locked = 'f'
|] () False

-- | Set @next_retry_at@ to the given time on the @hdb_catalog.event_log@ row
-- whose @id@ equals the event's id ('eId').
--
-- NOTE(review): the trailing 'True' passed to 'Q.unitQE' is presumably the
-- "use a prepared statement" flag of the query helper — confirm.
setNextRetry :: Event -> UTCTime -> Q.TxE QErr ()
setNextRetry e time =
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET next_retry_at = $1
WHERE id = $2
|] (time, eId e) True

-- | Reset @next_retry_at@ to @NULL@ on the @hdb_catalog.event_log@ row whose
-- @id@ equals the event's id ('eId').
--
-- NOTE(review): the trailing 'True' passed to 'Q.unitQE' is presumably the
-- "use a prepared statement" flag of the query helper — confirm.
clearNextRetry :: Event -> Q.TxE QErr ()
clearNextRetry e =
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET next_retry_at = NULL
WHERE id = $1
|] (Identity $ eId e) True

-- | Record an invocation via 'insertInvocation', run through 'Q.runTx' on the
-- given pool in @RepeatableRead@ / @ReadWrite@ mode.
runFailureQ :: Q.PGPool -> Invocation -> ExceptT QErr IO ()
runFailureQ pool =
  Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) . insertInvocation

-- | Bookkeeping for a successful delivery, run as one 'Q.runTx' call in
-- @RepeatableRead@ / @ReadWrite@ mode: insert the invocation record, clear
-- the event's @next_retry_at@, and mark the event as delivered — in that
-- order.
runSuccessQ :: Q.PGPool -> Event -> Invocation -> ExceptT QErr IO ()
runSuccessQ pool e invo = Q.runTx pool txMode deliveredTx
  where
    txMode = (Q.RepeatableRead, Just Q.ReadWrite)
    deliveredTx = do
      insertInvocation invo
      clearNextRetry e
      markDelivered e

-- | Run 'markError' for the event through 'Q.runTx' on the given pool in
-- @RepeatableRead@ / @ReadWrite@ mode.
runErrorQ :: Q.PGPool -> Event -> ExceptT QErr IO ()
runErrorQ pool =
  Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) . markError

-- runLockQ :: Q.PGPool -> Event -> ExceptT QErr IO ()
-- runLockQ pool e = Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ lockEvent e
-- | Give up on an event: within one 'Q.runTx' call (@RepeatableRead@ /
-- @ReadWrite@), mark the event as errored, clear its @next_retry_at@, and
-- unlock it — in that order.
runErrorAndUnlockQ :: Q.PGPool -> Event -> ExceptT QErr IO ()
runErrorAndUnlockQ pool e =
  Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $
    markError e >> clearNextRetry e >> unlockEvent e

-- | Schedule the next delivery attempt for an event and unlock it.
--
-- Within one 'Q.runTx' call (@RepeatableRead@ / @ReadWrite@), the retry time
-- is computed as the current time plus the configured retry interval
-- ('rcIntervalSec'), stored with 'setNextRetry', and then the event is
-- unlocked.
--
-- NOTE(review): the retry time is taken from the application's clock
-- ('getCurrentTime'), while the fetch query compares @next_retry_at@ against
-- the database's @now()@ — confirm both clocks and time zones agree.
runRetryAfterAndUnlockQ :: Q.PGPool -> Event -> RetryConf -> ExceptT QErr IO ()
runRetryAfterAndUnlockQ pool e rconf =
  Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) $ do
    let interval = fromIntegral (rcIntervalSec rconf)
    retryAt <- addUTCTime interval <$> liftIO getCurrentTime
    setNextRetry e retryAt
    unlockEvent e

-- | Run 'unlockEvent' for the event through 'Q.runTx' on the given pool in
-- @RepeatableRead@ / @ReadWrite@ mode.
runUnlockQ :: Q.PGPool -> Event -> ExceptT QErr IO ()
runUnlockQ pool =
  Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) . unlockEvent
2 changes: 1 addition & 1 deletion server/src-lib/Hasura/RQL/DDL/Subscribe.hs
Original file line number Diff line number Diff line change
Expand Up @@ -237,4 +237,4 @@ instance HDBQuery DeliverEventQuery where
type Phase1Res DeliverEventQuery = ()
phaseOne _ = adminOnly
phaseTwo q _ = deliverEvent q
schemaCachePolicy = SCPReload
schemaCachePolicy = SCPNoChange
3 changes: 2 additions & 1 deletion server/src-rsr/initialise.sql
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ CREATE TABLE hdb_catalog.event_log
error BOOLEAN NOT NULL DEFAULT FALSE,
tries INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMP DEFAULT NOW(),
locked BOOLEAN NOT NULL DEFAULT FALSE
locked BOOLEAN NOT NULL DEFAULT FALSE,
next_retry_at TIMESTAMP
);

CREATE TABLE hdb_catalog.event_invocation_logs
Expand Down