# Historyserver #10
Changes to the local backend:

```diff
@@ -106,29 +106,30 @@ namespace tuplex {
         assert(stage);
 
         // reset history server
-        _historyServer.reset();
+        // _historyServer.reset();
 
         if(!stage)
             return;
 
         // history server connection should be established
         bool useWebUI = _options.USE_WEBUI();
-        // register new job
-        if(useWebUI) {
+        // checks if we should use the WebUI and if we are starting a new
+        // job (hence there are no stages that come before the current stage
+        // we are executing).
+        if(useWebUI && stage->predecessors().empty()) {
+            _historyServer.reset();
             _historyServer = HistoryServerConnector::registerNewJob(_historyConn,
                                                                     "local backend", stage->plan(), _options);
             if(_historyServer) {
                 logger().info("track job under " + _historyServer->trackURL());
-                _historyServer->sendStatus(JobStatus::SCHEDULED);
+                _historyServer->sendStatus(JobStatus::STARTED);
             }
 
             stage->setHistoryServer(_historyServer);
             // attach to driver as well
             _driver->setHistoryServer(_historyServer.get());
         }
 
-        if(_historyServer)
-            _historyServer->sendStatus(JobStatus::STARTED);
-
         // check what type of stage it is
         auto tstage = dynamic_cast<TransformStage*>(stage);
         if(tstage)
```
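To make the new guard concrete: a job is registered with the history server only for a stage that has no predecessors, i.e. the first stage of a job. The snippet below is a minimal standalone sketch of that check; `Stage`, `predecessors()` and `useWebUI` are simplified stand-ins for the Tuplex types and options, not the actual API.

```cpp
#include <iostream>
#include <vector>

// Simplified stand-in for a physical stage, only to illustrate the
// "register a job only for the first stage" guard from the diff above.
struct Stage {
    std::vector<const Stage*> preds;
    const std::vector<const Stage*>& predecessors() const { return preds; }
};

int main() {
    Stage first;                       // no predecessors -> starts a new job
    Stage second;
    second.preds.push_back(&first);    // depends on the first stage

    bool useWebUI = true;              // stands in for _options.USE_WEBUI()
    for (const Stage* stage : {&first, &second}) {
        if (useWebUI && stage->predecessors().empty())
            std::cout << "register new job with history server, send STARTED\n";
        else
            std::cout << "keep the existing job registration\n";
    }
    return 0;
}
```

The intent in the diff is that later stages of the same job reuse the connector created for the first stage rather than registering a duplicate job.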
```diff
@@ -140,12 +141,13 @@ namespace tuplex {
         } else
             throw std::runtime_error("unknown stage encountered in local backend!");
 
-        // detach from driver
-        _driver->setHistoryServer(nullptr);
-
         // send final message to history server to signal job ended
-        if(_historyServer) {
+        // checks whether the historyserver has been set as well as
+        // if all stages have been iterated through (we are currently on the
+        // last stage) because this means the job is finished.
+        if(_historyServer && stage->predecessors().size() == stage->plan()->getNumStages() - 1) {
             _historyServer->sendStatus(JobStatus::FINISHED);
+            _driver->setHistoryServer(nullptr);
         }
     }
```
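The end-of-job check reads as "the current stage has every other stage in the plan as a predecessor", which for a linear pipeline singles out the last stage. Below is a small sketch of that arithmetic, assuming a linear three-stage plan; `isLastStage`, `numPredecessors` and `numStages` are hypothetical names standing in for `stage->predecessors().size()` and `stage->plan()->getNumStages()`.

```cpp
#include <cstddef>
#include <iostream>

// Hypothetical helper mirroring the condition in the diff above:
// numPredecessors stands in for stage->predecessors().size(),
// numStages for stage->plan()->getNumStages().
bool isLastStage(std::size_t numPredecessors, std::size_t numStages) {
    return numPredecessors == numStages - 1;
}

int main() {
    const std::size_t numStages = 3;   // assume a linear three-stage plan
    for (std::size_t preds = 0; preds < numStages; ++preds) {
        std::cout << "stage with " << preds << " predecessor(s): "
                  << (isLastStage(preds, numStages) ? "send FINISHED" : "job still running")
                  << "\n";
    }
    return 0;
}
```

Only the stage with two predecessors satisfies `2 == 3 - 1`, so `FINISHED` is sent exactly once, at the end of the job.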
```diff
@@ -1098,10 +1100,17 @@ namespace tuplex {
 
         // send final result count (exceptions + co)
         if(_historyServer) {
-            auto rs = tstage->resultSet();
-            assert(rs);
+            size_t numOutputRows = 0;
+            if (tstage->outputMode() == EndPointMode::HASHTABLE) {
+                for (const auto& task : completedTasks) {
+                    numOutputRows += task->getNumOutputRows();
+                }
+            } else {
+                auto rs = tstage->resultSet();
+                assert(rs);
+                numOutputRows = rs->rowCount();
+            }
             auto ecounts = tstage->exceptionCounts();
-            auto numOutputRows = rs->rowCount();
             _historyServer->sendStageResult(tstage->number(), numInputRows, numOutputRows, ecounts);
         }
```

Review comment on the added `auto rs = tstage->resultSet();`:

Contributor: does this "consume" the resultset?

Author: Not exactly sure what you mean by "consume".
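The branch above exists because a stage with hash-table output has no materialized result set to count, so the output-row total is accumulated from the completed tasks instead. A minimal sketch of that accumulation, with `Task` as a simplified stand-in for the Tuplex task type:

```cpp
#include <cstddef>
#include <iostream>
#include <memory>
#include <vector>

// Simplified stand-in for a completed transform task; getNumOutputRows()
// mirrors the accessor used in the diff above.
struct Task {
    std::size_t numRows;
    std::size_t getNumOutputRows() const { return numRows; }
};

int main() {
    // pretend three tasks finished and wrote their rows into a hash-table sink
    std::vector<std::unique_ptr<Task>> completedTasks;
    for (std::size_t n : {10, 25, 7})
        completedTasks.push_back(std::make_unique<Task>(Task{n}));

    // with hash-table output there is no materialized result set,
    // so the total is accumulated over the completed tasks instead
    std::size_t numOutputRows = 0;
    for (const auto& task : completedTasks)
        numOutputRows += task->getNumOutputRows();

    std::cout << "numOutputRows = " << numOutputRows << "\n";   // prints 42
    return 0;
}
```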
```diff
@@ -1861,8 +1870,8 @@ namespace tuplex {
             else hashmap_free(task_sink.hm); // remove hashmap (keys and buckets already handled)
 
             // delete task
-            delete tasks[i];
-            tasks[i] = nullptr;
+            // delete tasks[i];
+            // tasks[i] = nullptr;
         }
         return sink;
     }
```

Review comment on the commented-out `delete tasks[i];`:

Contributor: why is this commented out? A bug?

Author: It is commented out because it was leading to a bug. The delete was added during open-sourcing; it was not on my old branch in the old repo. The bug was the following: executeTransformStage calls createFinalHashmap, and createFinalHashmap ends up deleting the tasks. Later in executeTransformStage, if the WebUI is enabled, we iterate through the tasks to get the number of output rows, and this caused a nullptr dereference. So I commented out the `delete tasks[i]` and the `tasks[i] = nullptr`. If the memory is not being managed elsewhere, the tasks can be deleted after the history server is done using them, and deleted in an else branch when the WebUI is not enabled.
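The fix the author outlines, releasing the tasks only after their row counts have been read, could be arranged roughly as in the sketch below. This is an illustrative sketch under the assumption that no other component owns the task memory; `Task` and `useWebUI` are simplified stand-ins rather than the actual Tuplex types.

```cpp
#include <cstddef>
#include <iostream>
#include <vector>

// Illustrative sketch (not the actual Tuplex code): keep the tasks alive
// until their row counts have been read, then free them.
struct Task {
    std::size_t numRows;
    std::size_t getNumOutputRows() const { return numRows; }
};

int main() {
    bool useWebUI = true;                        // stands in for _options.USE_WEBUI()
    std::vector<Task*> tasks{new Task{10}, new Task{32}};

    if (useWebUI) {
        // the history server report still reads the tasks here,
        // so they must not have been deleted yet
        std::size_t numOutputRows = 0;
        for (const Task* t : tasks)
            numOutputRows += t->getNumOutputRows();
        std::cout << "reporting " << numOutputRows << " output rows\n";
    }

    // only now is it safe to release the tasks (assuming no other owner)
    for (Task*& t : tasks) {
        delete t;
        t = nullptr;
    }
    return 0;
}
```

Deleting unconditionally after the counting step covers both the WebUI and non-WebUI paths; the else-branch variant the author mentions would simply move the deletion earlier when the WebUI is disabled.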
New file: a README for the history server directory.

```diff
@@ -0,0 +1,10 @@
+## Tuplex history server
+
+This directory contains the history server for Tuplex, written as a [Flask](https://flask.palletsprojects.com/en/2.0.x/) web application.
+All job data is stored in a MongoDB database.
+
+### Development setup
+In order to run a local development version, simply invoke the `run-dev.py` script.
+
+---
+(c) 2017-2021 Tuplex contributors
```