这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
- server: shrink libpq connection request/response buffers back to 1MB if they grow beyond 2MB, fixing leak-like behavior on active servers (#5087)
- server: unlock locked scheduled events on graceful shutdown (#4928)
- server: disable prepared statements for mutations as we end up with single-use objects which result in excessive memory consumption for mutation heavy workloads (#5255)
- server: include scheduled event metadata (`created_at`, `scheduled_time`, `id`, etc.) along with the configured payload in the request body sent to the webhook.
**WARNING:** This is a breaking change for beta versions, as the configured payload is now nested inside a key called `payload`.
- console: allow configuring statement timeout on console RawSQL page (close #4998) (#5045)
- console: support tracking partitioned tables (close #5071) (#5258)
- console: add button to cancel one-off scheduled events and cron-trigger events (close #5161) (#5236)
Expand Down
64 changes: 48 additions & 16 deletions server/src-lib/Hasura/Eventing/ScheduledTrigger.hs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,18 @@ data ScheduledEventType =
-- so all the configuration is fetched along the scheduled events.
deriving (Eq, Show)

-- | Request body delivered to the webhook of a scheduled event.
--
-- The value is built in 'processScheduledEvent' from the fields of the
-- event being processed, serialised with 'J.toJSON', and handed to
-- 'tryWebhook' as the request payload. The same JSON value is also passed on
-- to 'mkInvocation' so that the recorded invocation carries the body that
-- was sent.
data ScheduledEventWebhookPayload
= ScheduledEventWebhookPayload
{ sewpId :: !Text
-- ^ identifier of the scheduled event (populated from @sefId@)
, sewpName :: !(Maybe TriggerName)
-- ^ trigger name, when present (populated from @sefName@); left out of the
-- JSON entirely when 'Nothing'
, sewpScheduledTime :: !UTCTime
-- ^ time the event was scheduled for (populated from @sefScheduledTime@)
, sewpPayload :: !J.Value
-- ^ the configured payload of the event (populated from @sefPayload@);
-- serialised under the @payload@ key
, sewpComment :: !(Maybe Text)
-- ^ optional comment (populated from @sefComment@); left out of the JSON
-- entirely when 'Nothing'
, sewpCreatedAt :: !UTCTime
-- ^ serialised as @created_at@.
-- NOTE(review): 'processScheduledEvent' fills this with @currentTime@, not
-- with a creation timestamp read from the event -- confirm this is intended.
} deriving (Show, Eq)

-- Only a 'J.ToJSON' instance is derived; this type is never parsed here.
-- 'J.aesonDrop' strips the 4-character @sewp@ prefix from each field name and
-- 'J.snakeCase' converts the remainder, giving keys such as @id@,
-- @scheduled_time@, @payload@ and @created_at@. 'J.omitNothingFields' drops
-- the 'Maybe' fields from the output when they are 'Nothing'.
$(J.deriveToJSON (J.aesonDrop 4 J.snakeCase) {J.omitNothingFields = True} ''ScheduledEventWebhookPayload)

-- | runCronEventsGenerator makes sure that all the cron triggers
-- have an adequate buffer of cron events.
runCronEventsGenerator ::
Expand Down Expand Up @@ -469,33 +481,42 @@ processScheduledEvent
httpTimeout = HTTP.responseTimeoutMicro (timeoutSeconds * 1000000)
headers = addDefaultHeaders $ map encodeHeader sefHeaders
extraLogCtx = ExtraLogContext (Just currentTime) sefId
res <- runExceptT $ tryWebhook headers httpTimeout sefPayload (T.unpack sefWebhook)
webhookReqPayload =
ScheduledEventWebhookPayload sefId sefName sefScheduledTime sefPayload sefComment currentTime
webhookReqBodyJson = J.toJSON webhookReqPayload
res <- runExceptT $ tryWebhook headers httpTimeout webhookReqBodyJson (T.unpack sefWebhook)
logHTTPForST res extraLogCtx
let decodedHeaders = map (decodeHeader logEnv sefHeaders) headers
either
(processError pgpool se decodedHeaders type')
(processSuccess pgpool se decodedHeaders type')
(processError pgpool se decodedHeaders type' webhookReqBodyJson)
(processSuccess pgpool se decodedHeaders type' webhookReqBodyJson)
res

processError
:: (MonadIO m, MonadError QErr m)
=> Q.PGPool -> ScheduledEventFull -> [HeaderConf] -> ScheduledEventType -> HTTPErr a -> m ()
processError pgpool se decodedHeaders type' err = do
=> Q.PGPool
-> ScheduledEventFull
-> [HeaderConf]
-> ScheduledEventType
-> J.Value
-> HTTPErr a
-> m ()
processError pgpool se decodedHeaders type' reqJson err = do
let invocation = case err of
HClient excp -> do
let errMsg = TBS.fromLBS $ J.encode $ show excp
mkInvocation se 1000 decodedHeaders errMsg []
mkInvocation se 1000 decodedHeaders errMsg [] reqJson
HParse _ detail -> do
let errMsg = TBS.fromLBS $ J.encode detail
mkInvocation se 1001 decodedHeaders errMsg []
mkInvocation se 1001 decodedHeaders errMsg [] reqJson
HStatus errResp -> do
let respPayload = hrsBody errResp
respHeaders = hrsHeaders errResp
respStatus = hrsStatus errResp
mkInvocation se respStatus decodedHeaders respPayload respHeaders
mkInvocation se respStatus decodedHeaders respPayload respHeaders reqJson
HOther detail -> do
let errMsg = (TBS.fromLBS $ J.encode detail)
mkInvocation se 500 decodedHeaders errMsg []
mkInvocation se 500 decodedHeaders errMsg [] reqJson
liftExceptTIO $
Q.runTx pgpool (Q.RepeatableRead, Just Q.ReadWrite) $ do
insertInvocation invocation type'
Expand Down Expand Up @@ -552,12 +573,18 @@ and it can transition to other states in the following ways:

processSuccess
:: (MonadIO m, MonadError QErr m)
=> Q.PGPool -> ScheduledEventFull -> [HeaderConf] -> ScheduledEventType -> HTTPResp a -> m ()
processSuccess pgpool se decodedHeaders type' resp = do
=> Q.PGPool
-> ScheduledEventFull
-> [HeaderConf]
-> ScheduledEventType
-> J.Value
-> HTTPResp a
-> m ()
processSuccess pgpool se decodedHeaders type' reqBodyJson resp = do
let respBody = hrsBody resp
respHeaders = hrsHeaders resp
respStatus = hrsStatus resp
invocation = mkInvocation se respStatus decodedHeaders respBody respHeaders
invocation = mkInvocation se respStatus decodedHeaders respBody respHeaders reqBodyJson
liftExceptTIO $
Q.runTx pgpool (Q.RepeatableRead, Just Q.ReadWrite) $ do
insertInvocation invocation type'
Expand Down Expand Up @@ -588,17 +615,22 @@ setRetry se time type' =
|] (time, sefId se) True

mkInvocation
:: ScheduledEventFull -> Int -> [HeaderConf] -> TBS.TByteString -> [HeaderConf]
:: ScheduledEventFull
-> Int
-> [HeaderConf]
-> TBS.TByteString
-> [HeaderConf]
-> J.Value
-> (Invocation 'ScheduledType)
mkInvocation se status reqHeaders respBody respHeaders
mkInvocation ScheduledEventFull {sefId} status reqHeaders respBody respHeaders reqBodyJson
= let resp = if isClientError status
then mkClientErr respBody
else mkResp status respBody respHeaders
in
Invocation
(sefId se)
sefId
status
(mkWebhookReq (J.toJSON se) reqHeaders invocationVersionST)
(mkWebhookReq reqBodyJson reqHeaders invocationVersionST)
resp

insertInvocation :: (Invocation 'ScheduledType) -> ScheduledEventType -> Q.TxE QErr ()
Expand Down
5 changes: 4 additions & 1 deletion server/tests-py/test_scheduled_triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ def test_check_fired_webhook_event(self,hge_ctx,scheduled_triggers_evts_webhook)
event = scheduled_triggers_evts_webhook.get_event(65)
validate_event_webhook(event['path'],'/test')
validate_event_headers(event['headers'],{"header-key":"header-value"})
assert event['body'] == self.webhook_payload
assert event['body']['payload'] == self.webhook_payload
payload_keys = dict.keys(event['body'])
for k in ["scheduled_time","created_at","id"]: # additional keys
assert k in payload_keys
assert scheduled_triggers_evts_webhook.is_queue_empty()

def test_check_events_statuses(self,hge_ctx):
Expand Down