From 241216c8361f38f46487166caeb133b599323588 Mon Sep 17 00:00:00 2001 From: Karthikeyan Chinnakonda Date: Fri, 10 Jul 2020 16:25:05 +0530 Subject: [PATCH 1/2] server: call the webhook asynchronosly in event triggers --- .../src-lib/Hasura/Eventing/EventTrigger.hs | 7 ++-- .../event_triggers/async_execution/setup.yaml | 26 ++++++++++++++ .../async_execution/teardown.yaml | 9 +++++ server/tests-py/test_events.py | 34 +++++++++++++++++++ 4 files changed, 74 insertions(+), 2 deletions(-) create mode 100644 server/tests-py/queries/event_triggers/async_execution/setup.yaml create mode 100644 server/tests-py/queries/event_triggers/async_execution/teardown.yaml diff --git a/server/src-lib/Hasura/Eventing/EventTrigger.hs b/server/src-lib/Hasura/Eventing/EventTrigger.hs index 1a018ebe5b60c..01bb07ae9a6fe 100644 --- a/server/src-lib/Hasura/Eventing/EventTrigger.hs +++ b/server/src-lib/Hasura/Eventing/EventTrigger.hs @@ -41,7 +41,7 @@ module Hasura.Eventing.EventTrigger ) where -import Control.Concurrent.Async (wait, withAsync) +import Control.Concurrent.Async (async, link, wait, withAsync) import Control.Concurrent.Extended (sleep) import Control.Concurrent.STM.TVar import Control.Monad.Catch (MonadMask, bracket_) @@ -189,10 +189,13 @@ processEventQueue logger logenv httpMgr pool getSchemaCache eeCtx@EventEngineCtx eventsNext <- withAsync popEventsBatch $ \eventsNextA -> do -- process approximately in order, minding HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE: forM_ events $ \event -> do - runReaderT (withEventEngineCtx eeCtx $ (processEvent event)) (logger, httpMgr) + t <- async $ runReaderT (withEventEngineCtx eeCtx $ (processEvent event)) (logger, httpMgr) -- removing an event from the _eeCtxLockedEvents after the event has -- been processed removeEventFromLockedEvents (eId event) leEvents + link t + + -- return when next batch ready; some 'processEvent' threads may be running. wait eventsNextA let lenEvents = length events diff --git a/server/tests-py/queries/event_triggers/async_execution/setup.yaml b/server/tests-py/queries/event_triggers/async_execution/setup.yaml new file mode 100644 index 0000000000000..7f8a1439c89b9 --- /dev/null +++ b/server/tests-py/queries/event_triggers/async_execution/setup.yaml @@ -0,0 +1,26 @@ +type: bulk +args: +- type: run_sql + args: + sql: | + create table hge_tests.test_t1( + c1 int, + c2 text + ); +- type: track_table + args: + schema: hge_tests + name: test_t1 +- type: create_event_trigger + args: + name: t1_timeout_long + table: + schema: hge_tests + name: test_t1 + insert: + columns: "*" + update: + columns: "*" + delete: + columns: "*" + webhook: http://127.0.0.1:5592/timeout_long diff --git a/server/tests-py/queries/event_triggers/async_execution/teardown.yaml b/server/tests-py/queries/event_triggers/async_execution/teardown.yaml new file mode 100644 index 0000000000000..e953892f65486 --- /dev/null +++ b/server/tests-py/queries/event_triggers/async_execution/teardown.yaml @@ -0,0 +1,9 @@ +type: bulk +args: +- type: delete_event_trigger + args: + name: t1_timeout_long +- type: run_sql + args: + sql: | + drop table hge_tests.test_t1; diff --git a/server/tests-py/test_events.py b/server/tests-py/test_events.py index f0e36887203c6..ca93fc864a325 100644 --- a/server/tests-py/test_events.py +++ b/server/tests-py/test_events.py @@ -588,3 +588,37 @@ def test_basic(self, hge_ctx, evts_webhook): assert st_code == 200, resp st_code, resp = hge_ctx.v1q_f('queries/event_triggers/manual_events/disabled.yaml') assert st_code == 400, resp + +@usefixtures('per_method_tests_db_state') +class TestEventsAsynchronousExecution(object): + + @classmethod + def dir(cls): + return 'queries/event_triggers/async_execution' + + def test_async_execution(self,hge_ctx,evts_webhook): + """ + A test to check if the events generated by the graphql-engine are + processed asynchronously. This test measures the time taken to process + all the events and that time should definitely be lesser than the time + taken if the events were to be executed sequentially. + + This test inserts 5 rows and the webhook(/timeout_long) takes + ~5 seconds to process one request. So, if the graphql-engine + were to process the events sequentially it will take 5 * 5 = 25 seconds. + Theorotically, all the events should have been processed in ~5 seconds, + adding a 5 seconds buffer to the comparision, so that this test + doesn't flake in the CI. + """ + table = {"schema": "hge_tests", "name": "test_t1"} + + payload = range(1,6) + rows = list(map(lambda x: {"c1": x, "c2": "hello"}, payload)) + st_code, resp = insert_many(hge_ctx, table, rows) + start_time = time.perf_counter() + assert st_code == 200, resp + for i in range(1,6): + _ = evts_webhook.get_event(7) # webhook takes 5 seconds to process a request + end_time = time.perf_counter() + time_elapsed = end_time - start_time + assert time_elapsed < 10 From 3fb6776f6f66aaf33d27dd986079357b7566eb24 Mon Sep 17 00:00:00 2001 From: Karthikeyan Chinnakonda Date: Fri, 10 Jul 2020 17:50:12 +0530 Subject: [PATCH 2/2] add changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 745d09444f432..78c1da1014d27 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ (Add entries here in the order of: server, console, cli, docs, others) +- server: process events generated by the event triggers asynchronously (close #5189) (#5352) - console: display line number that error originated from in GraphQL editor (close #4849) (#4942) ## `v1.3.0-beta.4`