8 changes: 5 additions & 3 deletions tuplex/core/include/physical/PhysicalPlan.h
@@ -41,8 +41,6 @@ namespace tuplex {

IBackend* backend() const { return _context.backend(); }

std::shared_ptr<HistoryServerConnector> _hs;

// ---- OLD CODE -----
// experimental: AWS backend
LogicalPlan *_lp;
@@ -72,7 +70,11 @@ namespace tuplex {
void executeWithParts(const tuplex::PhysicalPlan::File2FilePipeline &pip);
double aggregateSamplingTime() const;
public:

/*!
* gets the number of stages in a physical plan
* @returns number of stages in the physical plan
*/
size_t getNumStages() const {return _num_stages;}
PhysicalPlan(LogicalPlan* optimizedPlan, LogicalPlan* originalPlan, const Context& context);

~PhysicalPlan();
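A note on how the new accessor is used: `LocalBackend::execute()` (further down in this diff) treats a stage with no predecessors as the first stage of a job, and a stage whose predecessor count equals `getNumStages() - 1` as the last. A minimal standalone sketch of that check, with stub types standing in for tuplex's `PhysicalPlan`/`PhysicalStage`:

#include <cstddef>
#include <iostream>
#include <vector>

// Stub stand-ins for tuplex's PhysicalPlan/PhysicalStage (illustrative only).
struct Plan {
    std::size_t _num_stages;
    std::size_t getNumStages() const { return _num_stages; }
};

struct Stage {
    std::vector<const Stage*> _predecessors;
    std::vector<const Stage*> predecessors() const { return _predecessors; }
};

int main() {
    Stage s0, s1{{&s0}}, s2{{&s0, &s1}};
    Plan plan{3};

    // First stage of a job: no predecessors -> register a new job.
    std::cout << std::boolalpha << s0.predecessors().empty() << "\n";            // true
    // Last stage: predecessor count == total stages - 1 -> job can be marked FINISHED.
    std::cout << (s2.predecessors().size() == plan.getNumStages() - 1) << "\n";  // true
}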
11 changes: 11 additions & 0 deletions tuplex/core/include/physical/PhysicalStage.h
@@ -17,6 +17,8 @@
#include "ResultSet.h"
#define EOF (-1)
#include <nlohmann/json.hpp>
#include <HistoryServerConnector.h>
#include <logical/LogicalOperator.h>

namespace tuplex {

@@ -26,6 +28,8 @@ namespace tuplex {
class LogicalPlan;
class Context;
class ResultSet;
class LogicalOperator;
class HistoryServerConnector;

// various sinks/sources/...
enum class EndPointMode {
@@ -44,16 +48,23 @@ namespace tuplex {
std::vector<PhysicalStage*> _predecessors;
int64_t _number;
std::unordered_map<std::tuple<int64_t, ExceptionCode>, size_t> _ecounts; //! exception counts for this stage.
std::vector<LogicalOperator*> _operators;
protected:
IBackend* _backend;
std::shared_ptr<HistoryServerConnector> _historyServer;
public:
void setHistoryServer(std::shared_ptr<HistoryServerConnector> hsc) { _historyServer = hsc; }
PhysicalStage() = delete;
PhysicalStage(PhysicalPlan *plan, IBackend* backend, int64_t number, std::vector<PhysicalStage*> predecessors=std::vector<PhysicalStage*>()) : _plan(plan), _backend(backend), _number(number), _predecessors(predecessors) {
// allow plan/backend to be nullptrs for dummy stage in lambda executor.
}

virtual ~PhysicalStage();

std::vector<LogicalOperator*> operators() const {return _operators;}

void setOperators(std::vector<LogicalOperator*> operators) {_operators = operators;}

std::vector<PhysicalStage*> predecessors() const { return _predecessors; }

/*!
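A hedged sketch of how this new plumbing fits together, with stub types in place of the real declarations: the planner stores the stage's logical operators, and a backend later attaches the history-server connection and reads the operators back for job reporting:

#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Stub stand-ins (illustrative only, not tuplex's real declarations).
struct LogicalOperator { std::string name; };
struct HistoryServerConnector {};

class PhysicalStage {
    std::vector<LogicalOperator*> _operators;
    std::shared_ptr<HistoryServerConnector> _historyServer;
public:
    void setOperators(std::vector<LogicalOperator*> ops) { _operators = std::move(ops); }
    std::vector<LogicalOperator*> operators() const { return _operators; }
    void setHistoryServer(std::shared_ptr<HistoryServerConnector> hsc) { _historyServer = std::move(hsc); }
};

int main() {
    LogicalOperator mapOp{"map"}, filterOp{"filter"};
    PhysicalStage stage;
    stage.setOperators({&mapOp, &filterOp});                            // done by the planner
    stage.setHistoryServer(std::make_shared<HistoryServerConnector>()); // done by the backend
    for (auto* op : stage.operators())                                  // read back for reporting
        std::cout << op->name << "\n";
}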
48 changes: 45 additions & 3 deletions tuplex/core/src/HistoryServerConnector.cc
@@ -99,7 +99,6 @@ namespace tuplex {
return hsc;
}


std::shared_ptr<HistoryServerConnector> HistoryServerConnector::registerNewJob(const tuplex::HistoryServerConnection &conn,
const std::string &contextName,
const PhysicalPlan* plan,
@@ -147,6 +146,32 @@ namespace tuplex {
json obj;
obj["job"] = job;

// add operators...
std::vector<json> ops;
assert(plan);
plan->foreachStage([&](const PhysicalStage* stage) {
for(auto op: stage->operators()) {
json val;
val["name"] = op->name();
val["id"] = "op" + std::to_string(op->getID());
val["columns"] = std::vector<std::string>();
val["stageid"] = stage->getID();
if(hasUDF(op)) {
UDFOperator *udfop = (UDFOperator*)op;
assert(udfop);

val["udf"] = udfop->getUDF().getCode();
} else if (op->type() == LogicalOperatorType::AGGREGATE) {
AggregateOperator *udfop = (AggregateOperator*)op;
val["combiner_udf"] = udfop->combinerUDF().getCode();
val["aggregator_udf"] = udfop->aggregatorUDF().getCode();
}
ops.push_back(val);
}
});

obj["operators"] = ops;

// post
RESTInterface ri;
auto response = ri.postJSON(base_uri(conn.host, conn.port) + "/api/job", obj.dump());
@@ -183,7 +208,6 @@ namespace tuplex {
track_url, options.WEBUI_EXCEPTION_DISPLAY_LIMIT(), plan, maxExceptions));
}


HistoryServerConnector::HistoryServerConnector(const tuplex::HistoryServerConnection &conn,
const std::string &jobID,
const std::string &contextName,
@@ -222,6 +246,25 @@ namespace tuplex {
// _reservoirs.emplace_back(reservoir);
// }
// });
assert(plan);

// go through each stage
plan->foreachStage([this](const PhysicalStage* stage) {
assert(stage);

// is trafo stage?
const TransformStage* tstage = nullptr;
if((tstage = dynamic_cast<const TransformStage*>(stage))) {
auto operators = tstage->operators();
if(operators.empty())
return;
auto reservoir = std::make_shared<TransformStageExceptionReservoir>(tstage, operators, _exceptionDisplayLimit);

for(const auto& op : operators)
_reservoirLookup[op->getID()] = reservoir;
_reservoirs.emplace_back(reservoir);
}
});
}
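The loop above shares a single per-stage reservoir across all of the stage's operators through an ID-keyed lookup map. A self-contained sketch of that pattern with stub types (tuplex's real TransformStageExceptionReservoir tracks sampled exceptions and is more involved):

#include <cstdint>
#include <iostream>
#include <memory>
#include <unordered_map>
#include <vector>

// Stub reservoir and operator (illustrative only).
struct Reservoir {};
struct Op {
    std::int64_t id;
    std::int64_t getID() const { return id; }
};

int main() {
    std::vector<Op> operators{{100}, {101}, {102}};

    // One shared reservoir per stage, reachable through every operator ID.
    auto reservoir = std::make_shared<Reservoir>();
    std::unordered_map<std::int64_t, std::shared_ptr<Reservoir>> reservoirLookup;
    std::vector<std::shared_ptr<Reservoir>> reservoirs;

    for (const auto& op : operators)
        reservoirLookup[op.getID()] = reservoir;
    reservoirs.emplace_back(reservoir);

    std::cout << reservoirLookup.count(101) << "\n"; // 1
}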

void HistoryServerConnector::sendStatus(tuplex::JobStatus status, unsigned num_open_tasks, unsigned num_finished_tasks) {
@@ -612,5 +655,4 @@ namespace tuplex {
return _reservoirLookup[opID]->getOperatorIndex(opID);
}


}
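For reference, a hedged, self-contained sketch of the per-operator payload that `registerNewJob` now assembles and POSTs to `/api/job` (the field values here are invented; it uses nlohmann/json, which the file already includes):

#include <nlohmann/json.hpp>
#include <iostream>
#include <string>
#include <vector>

using json = nlohmann::json;

int main() {
    std::vector<json> ops;

    json val;
    val["name"]    = "map";                      // operator name
    val["id"]      = "op" + std::to_string(100); // prefixed operator ID
    val["columns"] = std::vector<std::string>();
    val["stageid"] = 0;
    val["udf"]     = "lambda x: x * 2";          // UDF source, for UDF operators
    ops.push_back(val);

    json obj;
    obj["operators"] = ops;
    std::cout << obj.dump(2) << std::endl;       // becomes the request body
}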
41 changes: 25 additions & 16 deletions tuplex/core/src/ee/local/LocalBackend.cc
@@ -106,29 +106,30 @@ namespace tuplex {
assert(stage);

// reset history server
_historyServer.reset();
// _historyServer.reset();

if(!stage)
return;

// history server connection should be established
bool useWebUI = _options.USE_WEBUI();
// register new job
if(useWebUI) {
// checks whether we should use the WebUI and whether we are starting a
// new job (i.e., there are no stages that come before the stage we are
// currently executing).
if(useWebUI && stage->predecessors().empty()) {
_historyServer.reset();
_historyServer = HistoryServerConnector::registerNewJob(_historyConn,
"local backend", stage->plan(), _options);
if(_historyServer) {
logger().info("track job under " + _historyServer->trackURL());
_historyServer->sendStatus(JobStatus::SCHEDULED);
_historyServer->sendStatus(JobStatus::STARTED);
}

stage->setHistoryServer(_historyServer);
// attach to driver as well
_driver->setHistoryServer(_historyServer.get());
}

if(_historyServer)
_historyServer->sendStatus(JobStatus::STARTED);

// check what type of stage it is
auto tstage = dynamic_cast<TransformStage*>(stage);
if(tstage)
@@ -140,12 +141,13 @@
} else
throw std::runtime_error("unknown stage encountered in local backend!");

// detach from driver
_driver->setHistoryServer(nullptr);

// send final message to history server to signal job ended
if(_historyServer) {
// checks whether the history server has been set and whether all
// stages have been iterated through (i.e., we are on the last stage),
// which means the job is finished.
if(_historyServer && stage->predecessors().size() == stage->plan()->getNumStages() - 1) {
_historyServer->sendStatus(JobStatus::FINISHED);
_driver->setHistoryServer(nullptr);
}
}

Expand Down Expand Up @@ -1098,10 +1100,17 @@ namespace tuplex {

// send final result count (exceptions + co)
if(_historyServer) {
size_t numOutputRows = 0;
if (tstage->outputMode() == EndPointMode::HASHTABLE) {
for (const auto& task : completedTasks) {
numOutputRows += task->getNumOutputRows();
}
} else {
auto rs = tstage->resultSet();
assert(rs);
numOutputRows = rs->rowCount();
}
auto ecounts = tstage->exceptionCounts();
_historyServer->sendStageResult(tstage->number(), numInputRows, numOutputRows, ecounts);
}

Review comment (Contributor), on `auto rs = tstage->resultSet();`: does this "consume" the resultset?

Reply (@its-colby, Jul 18, 2021): Not exactly sure what you mean by "consume".
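On the code above: the `HASHTABLE` branch exists because a stage that writes into a hashtable has no materialized result set to ask for a row count, so rows are summed from the completed tasks instead. A standalone sketch of that fallback, with stub types in place of tuplex's classes:

#include <cstddef>
#include <iostream>
#include <vector>

// Stubs (illustrative only): a stage either materializes a ResultSet or
// writes into a hashtable; in the latter case rows are counted per task.
enum class EndPointMode { MEMORY, HASHTABLE };
struct ResultSet { std::size_t rowCount() const { return 42; } };
struct Task {
    std::size_t out;
    std::size_t getNumOutputRows() const { return out; }
};

int main() {
    EndPointMode outputMode = EndPointMode::HASHTABLE;
    std::vector<Task> completedTasks{{10}, {32}};
    ResultSet rs;

    std::size_t numOutputRows = 0;
    if (outputMode == EndPointMode::HASHTABLE) {
        for (const auto& task : completedTasks)
            numOutputRows += task.getNumOutputRows();  // no ResultSet to ask
    } else {
        numOutputRows = rs.rowCount();                 // materialized rows
    }
    std::cout << numOutputRows << "\n"; // 42 either way
}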

Expand Down Expand Up @@ -1861,8 +1870,8 @@ namespace tuplex {
else hashmap_free(task_sink.hm); // remove hashmap (keys and buckets already handled)

// delete task
delete tasks[i];
tasks[i] = nullptr;
// delete tasks[i];
// tasks[i] = nullptr;

Review comment (Contributor): why is this commented? bug?

Reply (@its-colby): This is commented out because it was leading to a bug. The delete was added during open-sourcing; it was not on my old branch in the old repo. The bug was the following: executeTransformStage calls createFinalHashmap, and createFinalHashmap ends up deleting the tasks. Later in executeTransformStage, if the WebUI is enabled, we have to iterate through the tasks to get the number of output rows, and this caused a nullptr dereference. So I commented out the `delete tasks[i]` and the `tasks[i] = nullptr` assignment. If the memory is not being managed elsewhere, the tasks can be deleted after the history server is done using them, and deleted in an else branch when the WebUI is not enabled.
}
return sink;
}
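Building on the review thread above, a hypothetical sketch (not code from this PR) of the suggested ordering: let the history server read the per-task counts first, and only free the tasks afterwards:

#include <iostream>
#include <vector>

// Stub task (illustrative only).
struct Task { int rows = 0; };

int main() {
    bool webUIEnabled = true;                 // stands in for _options.USE_WEBUI()
    std::vector<Task*> tasks{new Task{10}, new Task{32}};

    int total = 0;
    if (webUIEnabled)
        for (auto* t : tasks)                 // history server reads counts first
            total += t->rows;

    for (auto*& t : tasks) {                  // tasks freed only after that
        delete t;
        t = nullptr;
    }
    std::cout << total << "\n";               // 42
}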
7 changes: 7 additions & 0 deletions tuplex/core/src/physical/PhysicalPlan.cc
@@ -263,6 +263,7 @@ namespace tuplex {

// add operators
for(auto op : ops) {
switch(op->type()) {
case LogicalOperatorType::FILEINPUT:
case LogicalOperatorType::PARALLELIZE: {
@@ -367,6 +368,12 @@

// generate code for stage and init vars
auto stage = builder.build(this, backend());
// converting deque of ops to vector of ops
std::vector<LogicalOperator*> operators;
for (auto op : ops) {
operators.push_back(op);
}
stage->setOperators(operators);
stage->setDataAggregationMode(hashGroupedDataType);
// fill in physical plan data
// b.c. the stages were constructed top-down, need to reverse the stages
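As a side note on the conversion loop above, the deque-to-vector copy could also use the iterator-range constructor; a tiny equivalent sketch:

#include <deque>
#include <vector>

struct LogicalOperator {};

int main() {
    std::deque<LogicalOperator*> ops;  // as built during stage construction
    // Range-construct instead of push_back in a loop.
    std::vector<LogicalOperator*> operators(ops.begin(), ops.end());
    (void)operators;
}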
20 changes: 13 additions & 7 deletions tuplex/core/src/physical/TransformStage.cc
@@ -747,12 +747,20 @@ namespace tuplex {
JobMetrics dummy_metrics;
JobMetrics& metrics = PhysicalStage::plan() ? PhysicalStage::plan()->getContext().metrics() : dummy_metrics;

std::string unoptimizedIR;
std::string optimizedIR = "Not currently optimized.";
if (_historyServer) {
unoptimizedIR = code();
}

logger.info("retrieved metrics object");

// step 1: run optimizer if desired
if(optimizer) {
optimizer->optimizeModule(*mod.get());

if (_historyServer) {
optimizedIR = code();
}
double llvm_optimization_time = timer.time();
metrics.setLLVMOptimizationTime(llvm_optimization_time);
logger.info("Optimization via LLVM passes took " + std::to_string(llvm_optimization_time) + " ms");
@@ -855,12 +863,10 @@
ss<<"Compiled code paths for stage "<<number()<<" in "<<std::fixed<<std::setprecision(2)<<compilation_time_via_llvm_this_number<<" ms";

logger.info(ss.str());
// @TODO: missing, send code to history server if desired...
// if(_historyServer) {
// auto unoptimizedCode = code + (resolveCode ? tstage->resolveCode() : "");
// std::string optimizedCode = "no optimization here yet";
// _historyServer->sendStagePlan("Stage" + std::to_string(tstage->number()), unoptimizedCode, optimizedCode, "");
// }

if(_historyServer) {
_historyServer->sendStagePlan("Stage" + std::to_string(number()), unoptimizedIR, optimizedIR, "");
}
return _syms;
}

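The IR snapshots above are taken only when a history server is attached, once before the LLVM passes run and once after, so that `sendStagePlan` can ship both versions. A self-contained sketch of that capture pattern (stub module and optimizer; tuplex's real `code()` returns the module's IR):

#include <iostream>
#include <string>

// Stubs (illustrative only): a module whose IR changes when optimized.
struct Module { std::string ir = "unoptimized IR"; };
struct Optimizer { void optimizeModule(Module& m) { m.ir = "optimized IR"; } };

int main() {
    Module mod;
    Optimizer optimizer;
    bool historyServerAttached = true;   // stands in for _historyServer

    std::string unoptimizedIR;
    std::string optimizedIR = "Not currently optimized.";
    if (historyServerAttached)
        unoptimizedIR = mod.ir;          // snapshot before optimization
    optimizer.optimizeModule(mod);
    if (historyServerAttached)
        optimizedIR = mod.ir;            // snapshot after optimization

    std::cout << unoptimizedIR << " -> " << optimizedIR << "\n";
}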
10 changes: 10 additions & 0 deletions tuplex/historyserver/README.md
@@ -0,0 +1,10 @@
## Tuplex history server

This directory contains the history server for Tuplex, written as a [Flask](https://flask.palletsprojects.com/en/2.0.x/) web application.
All job data is stored in a MongoDB database.

### Development setup
To run a local development version, invoke the `run-dev.py` script.

---
(c) 2017-2021 Tuplex contributors
1 change: 0 additions & 1 deletion tuplex/historyserver/bin/thserver
@@ -107,7 +107,6 @@ then
fi

# important to start with worker class eventlet! Else, this is insanely slow...

env MONGO_URI=$MONGO_URI gunicorn --daemon --worker-class eventlet --log-file $GUNICORN_LOGFILE -b $HOST:$PORT thserver:app
# env MONGO_URI=$MONGO_URI gunicorn --worker-class eventlet --log-file $GUNICORN_LOGFILE -b $HOST:$PORT thserver:app

10 changes: 6 additions & 4 deletions tuplex/historyserver/requirements.txt
@@ -1,11 +1,13 @@
prompt_toolkit==2.0.7
pyyaml>=5.4
jedi==0.13.2
jedi
astor==0.7.1
pandas>=0.23.4
cloudpickle==0.6.1
flask==1.0.2
flask_socketio==3.1.2
cloudpickle
flask>=2.0.1
flask_socketio==4.3.1
python-socketio==4.6.0
python-engineio==3.13.2
flask_pymongo==2.2.0
iso8601==0.1.12
dill==0.2.8.2