diff --git a/tuplex/core/include/physical/PhysicalPlan.h b/tuplex/core/include/physical/PhysicalPlan.h
index 542c7cfb4..828016268 100644
--- a/tuplex/core/include/physical/PhysicalPlan.h
+++ b/tuplex/core/include/physical/PhysicalPlan.h
@@ -41,8 +41,6 @@ namespace tuplex {
         IBackend* backend() const { return _context.backend(); }
 
-        std::shared_ptr<HistoryServerConnector> _hs;
-
         // ---- OLD CODE -----
         // experimental: AWS backend
         LogicalPlan *_lp;
@@ -72,7 +70,11 @@ namespace tuplex {
         void executeWithParts(const tuplex::PhysicalPlan::File2FilePipeline &pip);
         double aggregateSamplingTime() const;
     public:
-
+        /*!
+         * gets the number of stages in a physical plan
+         * @return number of stages in the physical plan
+         */
+        size_t getNumStages() const { return _num_stages; }
         PhysicalPlan(LogicalPlan* optimizedPlan, LogicalPlan* originalPlan, const Context& context);
         ~PhysicalPlan();
diff --git a/tuplex/core/include/physical/PhysicalStage.h b/tuplex/core/include/physical/PhysicalStage.h
index 83260de0f..9864794c3 100644
--- a/tuplex/core/include/physical/PhysicalStage.h
+++ b/tuplex/core/include/physical/PhysicalStage.h
@@ -17,6 +17,8 @@
 #include "ResultSet.h"
 #define EOF (-1)
 #include
+#include
+#include
 
 namespace tuplex {
 
@@ -26,6 +28,8 @@ namespace tuplex {
     class LogicalPlan;
     class Context;
     class ResultSet;
+    class LogicalOperator;
+    class HistoryServerConnector;
 
     // various sinks/sources/...
     enum class EndPointMode {
@@ -44,9 +48,12 @@ namespace tuplex {
         std::vector<PhysicalStage*> _predecessors;
         int64_t _number;
         std::unordered_map<std::tuple<int64_t, ExceptionCode>, size_t> _ecounts; //! exception counts for this stage.
+        std::vector<LogicalOperator*> _operators;
     protected:
         IBackend* _backend;
+        std::shared_ptr<HistoryServerConnector> _historyServer;
     public:
+        void setHistoryServer(std::shared_ptr<HistoryServerConnector> hsc) { _historyServer = hsc; }
         PhysicalStage() = delete;
         PhysicalStage(PhysicalPlan *plan, IBackend* backend, int64_t number, std::vector<PhysicalStage*> predecessors=std::vector<PhysicalStage*>()) : _plan(plan), _backend(backend), _number(number), _predecessors(predecessors) {
             // allow plan/backend to be nullptrs for dummy stage in lambda executor.
@@ -54,6 +61,10 @@ namespace tuplex {
 
         virtual ~PhysicalStage();
 
+        std::vector<LogicalOperator*> operators() const { return _operators; }
+
+        void setOperators(std::vector<LogicalOperator*> operators) { _operators = operators; }
+
         std::vector<PhysicalStage*> predecessors() const { return _predecessors; }
 
         /*!
diff --git a/tuplex/core/src/HistoryServerConnector.cc b/tuplex/core/src/HistoryServerConnector.cc
index a485cc2a8..e67dd2bfe 100644
--- a/tuplex/core/src/HistoryServerConnector.cc
+++ b/tuplex/core/src/HistoryServerConnector.cc
@@ -99,7 +99,6 @@ namespace tuplex {
         return hsc;
     }
 
-
    std::shared_ptr<HistoryServerConnector> HistoryServerConnector::registerNewJob(const tuplex::HistoryServerConnection &conn,
                                                                                   const std::string &contextName,
                                                                                   const PhysicalPlan* plan,
@@ -147,6 +146,32 @@ namespace tuplex {
         json obj;
         obj["job"] = job;
 
+        // add operators...
+        std::vector<json> ops;
+        assert(plan);
+        plan->foreachStage([&](const PhysicalStage* stage) {
+            for(auto op : stage->operators()) {
+                json val;
+                val["name"] = op->name();
+                val["id"] = "op" + std::to_string(op->getID());
+                val["columns"] = std::vector<std::string>();
+                val["stageid"] = stage->getID();
+                if(hasUDF(op)) {
+                    UDFOperator *udfop = (UDFOperator*)op;
+                    assert(udfop);
+
+                    val["udf"] = udfop->getUDF().getCode();
+                } else if(op->type() == LogicalOperatorType::AGGREGATE) {
+                    AggregateOperator *aggop = (AggregateOperator*)op;
+                    val["combiner_udf"] = aggop->combinerUDF().getCode();
+                    val["aggregator_udf"] = aggop->aggregatorUDF().getCode();
+                }
+                ops.push_back(val);
+            }
+        });
+
+        obj["operators"] = ops;
+
         // post
         RESTInterface ri;
         auto response = ri.postJSON(base_uri(conn.host, conn.port) + "/api/job", obj.dump());
@@ -183,7 +208,6 @@ namespace tuplex {
                                                              track_url, options.WEBUI_EXCEPTION_DISPLAY_LIMIT(), plan, maxExceptions));
     }
 
-
     HistoryServerConnector::HistoryServerConnector(const tuplex::HistoryServerConnection &conn,
                                                    const std::string &jobID,
                                                    const std::string &contextName,
@@ -222,6 +246,25 @@ namespace tuplex {
         //        _reservoirs.emplace_back(reservoir);
         //    }
         // });
+        assert(plan);
+
+        // go through each stage
+        plan->foreachStage([this](const PhysicalStage* stage) {
+            assert(stage);
+
+            // is trafo stage?
+            const TransformStage* tstage = nullptr;
+            if((tstage = dynamic_cast<const TransformStage*>(stage))) {
+                auto operators = tstage->operators();
+                if(operators.empty())
+                    return;
+                auto reservoir = std::make_shared(tstage, operators, _exceptionDisplayLimit);
+
+                for(const auto& op : operators)
+                    _reservoirLookup[op->getID()] = reservoir;
+                _reservoirs.emplace_back(reservoir);
+            }
+        });
     }
 
     void HistoryServerConnector::sendStatus(tuplex::JobStatus status, unsigned num_open_tasks, unsigned num_finished_tasks) {
@@ -612,5 +655,4 @@ namespace tuplex {
         return _reservoirLookup[opID]->getOperatorIndex(opID);
     }
-
 }
\ No newline at end of file
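For reference, a sketch of the JSON body that `registerNewJob()` now POSTs to `/api/job`. The operator field names (`name`, `id`, `columns`, `stageid`, `udf`, `combiner_udf`, `aggregator_udf`) are taken from the C++ above; the contents of the `job` subobject and all concrete values are assumptions for illustration only.

```python
import json

# Hypothetical payload mirroring the obj["job"] / obj["operators"] structure
# built in registerNewJob(); operator ids and UDF bodies are made up.
payload = {
    "job": {"context": "local backend", "status": "created"},  # fields assumed
    "operators": [
        {"name": "parallelize", "id": "op1000", "columns": [], "stageid": 0},
        {"name": "map", "id": "op1001", "columns": [], "stageid": 0,
         "udf": "lambda x: x * x"},                        # hasUDF() branch
        {"name": "aggregate", "id": "op1002", "columns": [], "stageid": 0,
         "combiner_udf": "lambda a, b: a + b",             # AGGREGATE branch
         "aggregator_udf": "lambda agg, row: agg + row"},
    ],
}
print(json.dumps(payload, indent=2))  # what ri.postJSON(..., obj.dump()) sends
```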
diff --git a/tuplex/core/src/ee/local/LocalBackend.cc b/tuplex/core/src/ee/local/LocalBackend.cc
index ce38ad721..735588acf 100644
--- a/tuplex/core/src/ee/local/LocalBackend.cc
+++ b/tuplex/core/src/ee/local/LocalBackend.cc
@@ -106,7 +106,7 @@ namespace tuplex {
         assert(stage);
 
         // reset history server
-        _historyServer.reset();
+//        _historyServer.reset();
 
         if(!stage)
             return;
@@ -114,21 +114,22 @@ namespace tuplex {
         // history server connection should be established
         bool useWebUI = _options.USE_WEBUI();
         // register new job
-        if(useWebUI) {
+        // Register a new job only when the WebUI is enabled and this is the
+        // first stage of the job, i.e. no stages precede the one being executed.
+        if(useWebUI && stage->predecessors().empty()) {
+            _historyServer.reset();
             _historyServer = HistoryServerConnector::registerNewJob(_historyConn,
                                                                     "local backend", stage->plan(), _options);
             if(_historyServer) {
                 logger().info("track job under " + _historyServer->trackURL());
-                _historyServer->sendStatus(JobStatus::SCHEDULED);
+                _historyServer->sendStatus(JobStatus::STARTED);
             }
-
+            stage->setHistoryServer(_historyServer);
             // attach to driver as well
             _driver->setHistoryServer(_historyServer.get());
         }
 
-        if(_historyServer)
-            _historyServer->sendStatus(JobStatus::STARTED);
-
         // check what type of stage it is
         auto tstage = dynamic_cast<TransformStage*>(stage);
         if(tstage)
@@ -140,12 +141,13 @@ namespace tuplex {
         } else throw std::runtime_error("unknown stage encountered in local backend!");
 
-        // detach from driver
-        _driver->setHistoryServer(nullptr);
-
         // send final message to history server to signal job ended
-        if(_historyServer) {
+        // Finalize only when the history server is set and this is the last
+        // stage of the plan, i.e. its predecessor count equals the total
+        // number of stages minus one.
+        if(_historyServer && stage->predecessors().size() == stage->plan()->getNumStages() - 1) {
             _historyServer->sendStatus(JobStatus::FINISHED);
+            _driver->setHistoryServer(nullptr);
         }
     }
@@ -1098,10 +1100,17 @@ namespace tuplex {
 
         // send final result count (exceptions + co)
         if(_historyServer) {
-            auto rs = tstage->resultSet();
-            assert(rs);
+            size_t numOutputRows = 0;
+            if(tstage->outputMode() == EndPointMode::HASHTABLE) {
+                // hash-table output never materializes a result set, so sum
+                // the per-task output rows instead
+                for(const auto& task : completedTasks) {
+                    numOutputRows += task->getNumOutputRows();
+                }
+            } else {
+                auto rs = tstage->resultSet();
+                assert(rs);
+                numOutputRows = rs->rowCount();
+            }
             auto ecounts = tstage->exceptionCounts();
-            auto numOutputRows = rs->rowCount();
             _historyServer->sendStageResult(tstage->number(), numInputRows, numOutputRows, ecounts);
         }
@@ -1861,8 +1870,8 @@ namespace tuplex {
             else
                 hashmap_free(task_sink.hm); // remove hashmap (keys and buckets already handled)
             // delete task
-            delete tasks[i];
-            tasks[i] = nullptr;
+//            delete tasks[i];
+//            tasks[i] = nullptr;
         }
         return sink;
     }
diff --git a/tuplex/core/src/physical/PhysicalPlan.cc b/tuplex/core/src/physical/PhysicalPlan.cc
index 6a3d209e9..6dcfe0448 100644
--- a/tuplex/core/src/physical/PhysicalPlan.cc
+++ b/tuplex/core/src/physical/PhysicalPlan.cc
@@ -367,6 +368,12 @@ namespace tuplex {
 
         // generate code for stage and init vars
         auto stage = builder.build(this, backend());
+        // convert the deque of operators collected above into a vector and
+        // attach it to the stage so the history server can report on it
+        std::vector<LogicalOperator*> operators;
+        for(auto op : ops) {
+            operators.push_back(op);
+        }
+        stage->setOperators(operators);
         stage->setDataAggregationMode(hashGroupedDataType);
         // fill in physical plan data
         // b.c. the stages were constructed top-down, need to reverse the stages
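The two conditions introduced above drive the job lifecycle: a job is registered when the first stage executes (no predecessors) and finalized when the last stage runs, identified by its predecessor count equaling the total stage count minus one. A minimal Python sketch of that logic, assuming stages execute in topological order and that the final stage's predecessor set contains every other stage (true for linear pipelines, not necessarily for branching plans):

```python
class FakeHistoryServer:
    # stand-in for HistoryServerConnector, for illustration only
    def register_new_job(self): print("registered new job")
    def send_status(self, status): print("status:", status)

def on_stage(predecessors, num_stages, hs):
    if len(predecessors) == 0:               # mirrors stage->predecessors().empty()
        hs.register_new_job()
        hs.send_status("STARTED")
    # ... stage executes here ...
    if len(predecessors) == num_stages - 1:  # mirrors getNumStages() - 1
        hs.send_status("FINISHED")

hs = FakeHistoryServer()
for i in range(3):                           # three stages, executed in order
    on_stage(list(range(i)), 3, hs)
```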
diff --git a/tuplex/core/src/physical/TransformStage.cc b/tuplex/core/src/physical/TransformStage.cc
index 95b74c3bf..4dc46d229 100644
--- a/tuplex/core/src/physical/TransformStage.cc
+++ b/tuplex/core/src/physical/TransformStage.cc
@@ -747,12 +747,20 @@ namespace tuplex {
         JobMetrics dummy_metrics;
         JobMetrics& metrics = PhysicalStage::plan() ?
                               PhysicalStage::plan()->getContext().metrics() : dummy_metrics;
+        // capture the unoptimized IR up front when a history server is attached
+        std::string unoptimizedIR;
+        std::string optimizedIR = "Not currently optimized.";
+        if(_historyServer) {
+            unoptimizedIR = code();
+        }
+
         logger.info("retrieved metrics object");
 
         // step 1: run optimizer if desired
         if(optimizer) {
             optimizer->optimizeModule(*mod.get());
-
+            if(_historyServer) {
+                optimizedIR = code();
+            }
             double llvm_optimization_time = timer.time();
             metrics.setLLVMOptimizationTime(llvm_optimization_time);
             logger.info("Optimization via LLVM passes took " + std::to_string(llvm_optimization_time) + " ms");
@@ -855,12 +863,10 @@ namespace tuplex {
         ss<<"Compiled code paths for stage "<<number();
-        // std::string unoptimizedCode = (... ? tstage->resolveCode() : "");
-        // std::string optimizedCode = "no optimization here yet";
-        // _historyServer->sendStagePlan("Stage" + std::to_string(tstage->number()), unoptimizedCode, optimizedCode, "");
-        // }
+
+        if(_historyServer) {
+            _historyServer->sendStagePlan("Stage" + std::to_string(number()), unoptimizedIR, optimizedIR, "");
+        }
 
         return _syms;
     }
diff --git a/tuplex/historyserver/README.md b/tuplex/historyserver/README.md
new file mode 100644
index 000000000..840522dcd
--- /dev/null
+++ b/tuplex/historyserver/README.md
@@ -0,0 +1,10 @@
+## Tuplex history server
+
+This directory contains the history server for Tuplex, written as a [Flask](https://flask.palletsprojects.com/en/2.0.x/) web application.
+All job data is stored in a MongoDB database.
+
+### Development setup
+To run a local development version, simply invoke the `run-dev.py` script.
+
+---
+(c) 2017-2021 Tuplex contributors
\ No newline at end of file
diff --git a/tuplex/historyserver/bin/thserver b/tuplex/historyserver/bin/thserver
index 04a6e20d8..b01474048 100755
--- a/tuplex/historyserver/bin/thserver
+++ b/tuplex/historyserver/bin/thserver
@@ -107,7 +107,6 @@ then
 fi
 
 # important to start with worker class eventlet! Else, this is insanely slow...
-
 env MONGO_URI=$MONGO_URI gunicorn --daemon --worker-class eventlet --log-file $GUNICORN_LOGFILE -b $HOST:$PORT thserver:app
 # env MONGO_URI=$MONGO_URI gunicorn --worker-class eventlet --log-file $GUNICORN_LOGFILE -b $HOST:$PORT thserver:app
diff --git a/tuplex/historyserver/requirements.txt b/tuplex/historyserver/requirements.txt
index 8011d8ee1..167fd59e4 100644
--- a/tuplex/historyserver/requirements.txt
+++ b/tuplex/historyserver/requirements.txt
@@ -1,11 +1,13 @@
 prompt_toolkit==2.0.7
 pyyaml>=5.4
-jedi==0.13.2
+jedi
 astor==0.7.1
 pandas>=0.23.4
-cloudpickle==0.6.1
-flask==1.0.2
-flask_socketio==3.1.2
+cloudpickle
+flask>=2.0.1
+flask_socketio==4.3.1
+python-socketio==4.6.0
+python-engineio==3.13.2
 flask_pymongo==2.2.0
 iso8601==0.1.12
 dill==0.2.8.2
diff --git a/tuplex/historyserver/thserver/database.py b/tuplex/historyserver/thserver/database.py
index 38fbbd744..f8ad54d23 100644
--- a/tuplex/historyserver/thserver/database.py
+++ b/tuplex/historyserver/thserver/database.py
@@ -9,6 +9,8 @@
 #  License: Apache 2.0                                                                                                 #
 #----------------------------------------------------------------------------------------------------------------------#
+# Handles interactions with the MongoDB database (i.e. the ORM layer).
+
 from thserver import app, socketio, mongo
 from thserver.config import *
 from thserver.common import *
@@ -40,6 +42,8 @@ def __init__(self, jobid=None):
             job['created'] = current_utc_timestamp()
             job['stages'] = []
             job['status'] = 'created'
+            job['ncount'] = 0
+            job['ecount'] = 0
 
             # retrieve id
             self._id = mongo.db.jobs.insert_one(job).inserted_id
@@ -101,7 +105,7 @@ def set_stages(self, stages, update=False):
 
         for stage in stages:
             # add empty count stages here
-            self.stages.append({'stageid' : stage['id'], 'ncount' : 0, 'ecount' : 0})
+            self.stages.append({'stageid' : stage['id'], 'ncount' : 0, 'ecount' : 0, 'predecessors': stage['predecessors']})
 
             if 'operators' in stage.keys():
                 operators = stage['operators']
@@ -109,11 +113,11 @@ def set_stages(self, stages, update=False):
                 # add each operator to operators collection
                 # ncount for a stage is same across all operators
                 self.operators += [{'idx' : idx,
-                                'jobid' : self._id,
-                                'stageid' : stage['id'],
-                                'ecount' : 0,
-                                'ncount' : 0,
-                                **op} for idx, op in enumerate(operators)]
+                                    'jobid' : self._id,
+                                    'stageid' : stage['id'],
+                                    'ecount' : 0,
+                                    'ncount' : 0,
+                                    **op} for idx, op in enumerate(operators)]
 
         if update:
             mongo.db.operators.insert(self.operators)
@@ -122,7 +126,6 @@ def set_stages(self, stages, update=False):
         mongo.db.jobs.update_one({'_id': self._id}, {'$set': {'stages': self.stages}})
 
     def set_plan(self, ir):
-        print('hello')
         # insert into mongo for job
         return mongo.db.jobs.update_one({'_id': self._id}, {'$set': {'plan': ir}})
@@ -265,20 +268,20 @@ def update_stage_counts(self, stageid, num_normal_rows, num_exception_rows, exce
                 set_dict = {'ecount': info['count']}
                 total_ecounts += info['count']
             mongo.db.operators.update_one({'jobid': self._id, 'stageid' : stageid, 'idx' : info['idx']},
-                                            {'$set': set_dict})
+                                          {'$set': set_dict})
 
         assert num_exception_rows == total_ecounts, 'numbers are not matching'
 
         # compute normal / exception count for job across all stages
         # aggregate query to figure out total ncount AND ecount for a job
         grouped_stage_counts = list(mongo.db.operators.aggregate([{'$match': {'jobid': self._id}},
-                                                                    {'$group': {'_id': '$stageid',
-                                                                                'ecount': {'$sum': '$ecount'}}},
-                                                                    {'$project': {'stageid': '$_id', '_id': False,
-                                                                                  'ecount': True}}]))
+                                                                   {'$group': {'_id': '$stageid',
+                                                                               'ecount': {'$sum': '$ecount'}}},
+                                                                   {'$project': {'stageid': '$_id', '_id': False,
+                                                                                 'ecount': True}}]))
 
         ecount = sum(group['ecount'] for group in grouped_stage_counts)
 
         # update counts for stage id on job
         mongo.db.jobs.update_one({'_id': self._id, 'stages.stageid' : stageid},
-                                 {'$set': {'stages.$.ecount': ecount, 'stages.$.ncount': ncount}})
\ No newline at end of file
+                                 {'$set': {'stages.$.ecount': ecount, 'stages.$.ncount': ncount}})
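The `/api/task` handler in rest.py below now increments the job-wide `ncount`/`ecount` fields (initialized in `Job.__init__` above) alongside the per-stage entries. A minimal pymongo sketch of that double `$inc`, assuming a local MongoDB and a made-up database name; the real name comes from thserver's configuration, which is not part of this diff:

```python
from pymongo import MongoClient
from bson import ObjectId

db = MongoClient("mongodb://localhost:27017")["tuplex-history"]  # db name assumed

def update_task_counts(jobid, stageid, ncount_delta, ecount_delta):
    # bump the matched stage entry via the positional operator ...
    db.jobs.update_one({"_id": ObjectId(jobid), "stages.stageid": stageid},
                       {"$inc": {"stages.$.ncount": ncount_delta,
                                 "stages.$.ecount": ecount_delta}})
    # ... and the job-wide counters in the same document
    db.jobs.update_one({"_id": ObjectId(jobid)},
                       {"$inc": {"ncount": ncount_delta,
                                 "ecount": ecount_delta}})
    return db.jobs.find_one({"_id": ObjectId(jobid)},
                            {"_id": 0, "ncount": 1, "ecount": 1})
```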
diff --git a/tuplex/historyserver/thserver/rest.py b/tuplex/historyserver/thserver/rest.py
index bcbd2b67a..b7f8680d6 100644
--- a/tuplex/historyserver/thserver/rest.py
+++ b/tuplex/historyserver/thserver/rest.py
@@ -9,6 +9,7 @@
 #  License: Apache 2.0                                                                                                 #
 #----------------------------------------------------------------------------------------------------------------------#
+
 from thserver import app, socketio, mongo
 from thserver.database import *
 from thserver.config import *
@@ -85,7 +86,7 @@ def create_task():
 
     jr = request.get_json()['job']
     print(jr)
-
+    operators = request.get_json()['operators']
     job = Job() # creates new mongodb job
 
     # set_context(self, host, mode, name, user, conf, update=False):
@@ -103,6 +104,10 @@ def create_task():
     # save all data to respective documents
     job.persist()
 
+    # add each operator to the operators collection
+    operators = [{'idx' : idx, 'jobid' : job.jobid, 'ecount' : 0, 'ncount' : 0, **op} for idx, op in enumerate(operators)]
+    mongo.db.operators.insert_many(operators)
+
     # notify all socketio clients
     msg = job.socketio_overview()
     print(msg)
@@ -116,7 +121,6 @@ def update_status():
     """ update status of a job
     Returns:
-
     """
     if not request.json:
         abort(400)
@@ -168,7 +172,6 @@ def update_task():
     """ updates summary info for task, i.e. how many normal case integers occurred, which etc.
     Returns:
-
     """
     if not request.json:
         abort(400)
@@ -183,11 +186,12 @@ def update_task():
 
     # save to mongodb
     mongo.db.jobs.update_one({'_id': ObjectId(jobid), 'stages.stageid': stageid},
-                              {'$inc': {'stages.$.ncount': ncount_delta, 'stages.$.ecount': ecount_delta}})
-
+                             {'$inc': {'stages.$.ncount': ncount_delta, 'stages.$.ecount': ecount_delta}})
+    mongo.db.jobs.update_one({'_id': ObjectId(jobid)},
+                             {'$inc': {'ncount': ncount_delta, 'ecount': ecount_delta}})
     # query full values
     status = mongo.db.jobs.find_one({'_id': ObjectId(jobid)},
-                                     {'_id': 0, 'ncount': 1, 'ecount': 1})
+                                    {'_id': 0, 'ncount': 1, 'ecount': 1})
 
     status['jobid'] = jobid
     print('/api/task:\n{}'.format(status))
@@ -201,7 +205,6 @@ def update_plan():
     """ stores plan info for physical plan page
     Returns:
-
     """
     if not request.json:
         abort(400)
@@ -226,7 +229,6 @@ def update_operator():
     """ updates a single exception type for one op in one job
     Returns:
-
     """
     print('operator progress update request')
     if not request.json:
@@ -254,15 +256,15 @@ def update_operator():
         inc_dict['detailed_ecounts.' + key] = val
 
     # upsert will create inc fields
-    mongo.db.operators.update_one({'jobid': ObjectId(jobid), 'id' : opid},
+    mongo.db.operators.update_one({'jobid': jobid, 'id' : opid},
                                   {'$inc': inc_dict},
                                   upsert=True)
 
     # fetch operator info
-    status = mongo.db.operators.find_one({'jobid': ObjectId(jobid), 'id' : opid},
-                                         {'_id': 0,
-                                          'ncount': 1,
-                                          'ecount': 1})
+    status = mongo.db.operators.find_one({'jobid': jobid, 'id' : opid},
+                                         {'_id': 0,
+                                          'ncount': 1,
+                                          'ecount': 1})
     assert status
 
     # query full and sent socketio update
@@ -273,9 +275,54 @@ def update_operator():
 
     return jsonify({'status': 'ok'})
 
+def get_job(jobid):
+    """Fetch a job from MongoDB by its jobid and merge in its operators, sorted by idx."""
+
+    # check whether id is valid, else return None
+    if not ObjectId.is_valid(jobid):
+        return None
+
+    # merge with operators belonging to this job; operators store the job id
+    # as a string, so stringify the ObjectId before the lookup
+    res = list(mongo.db.jobs.aggregate([
+        {'$match': {'_id' : ObjectId(jobid)}},
+        {'$addFields': {'jobid_str': {'$toString': '$_id'}}},
+        {'$lookup' : {
+            'from' : 'operators',
+            'localField' : 'jobid_str',
+            'foreignField' : 'jobid',
+            'as' : 'operators'
+        }}
+    ]))
+
+    if 0 == len(res):
+        return None
+
+    assert len(res) <= 1
+    job = res[0]
+
+    # change ObjectId fields
+    job['id'] = str(job['_id'])
+    del job['_id']
+
+    def clean_op(op):
+        res = op.copy()
+        res['jobid'] = str(res['jobid'])
+        del res['_id']
+        return res
+
+    # sort operators by their position in the pipeline
+    job['operators'] = sorted((clean_op(op) for op in job['operators']), key=lambda op: op['idx'])
+
+    return job
+
 @app.route('/api/operators', methods=['GET'])
 def display_all_operators():
-
     res = normalize_from_mongo(mongo.db.operators.find({}))
 
     print(res)
@@ -288,23 +335,29 @@ def get_operator_details():
     """ get details for operator
     Returns:
-
     """
     jobid = request.args.get('jobid')
     opid = request.args.get('opid')
 
-    res = mongo.db.operators.find_one({'id': opid, 'jobid': ObjectId(jobid)})
-    res['exceptions'] = sorted(res['exceptions'], key=lambda x: x['code'])
-    # update exceptions nicely
-    for exc in res['exceptions']:
-        exc['count'] = res['detailed_ecounts'][exc['code']]
-    res['opid'] = res['id']
-    res['jobid'] = str(res['jobid'])
-    del res['_id']
-    del res['detailed_ecounts']
-    if not res:
-        return jsonify({'error' : 'no result found for opid={} and jobid={}'.format(opid, jobid)})
-
+    res = mongo.db.operators.find_one({'id': opid, 'jobid': jobid})
+    if not res:
+        return jsonify({'error' : 'no result found for opid={} and jobid={}'.format(opid, jobid)})
+    if 'exceptions' in res:
+        res['exceptions'] = sorted(res['exceptions'], key=lambda x: x['code'])
+        # update exceptions nicely
+        for exc in res['exceptions']:
+            exc['count'] = res['detailed_ecounts'][exc['code']]
+        del res['detailed_ecounts']
+    elif 'detailed_ecounts' in res:
+        # build the exceptions list from detailed_ecounts
+        res['exceptions'] = []
+        for key in sorted(res['detailed_ecounts'].keys()):
+            res['exceptions'].append({'count': res['detailed_ecounts'][key], 'code': key})
+    res['opid'] = res['id']
+    res['jobid'] = str(res['jobid'])
+    res['_id'] = str(res['_id'])
     return jsonify(res)
 
 @app.route('/api/exception', methods=['POST'])
@@ -312,7 +365,6 @@ def update_exception():
     """ updates a single exception type for one op in one job
     Returns:
-
     """
     print('exception (details) update request')
     if not request.json:
@@ -349,8 +401,8 @@ def update_exception():
     assert 'sample' in exception
     assert 'count' in exception
 
-    mongo.db.operators.update_one({'jobid': ObjectId(jobid), 'id': opid}, {'$set' : {'exceptions' : exceptions,
-                                                                                     'previous_operator_columns' : previous_operator_columns}})
+    mongo.db.operators.update_one({'jobid': jobid, 'id': opid}, {'$set' : {'exceptions' : exceptions,
+                                                                           'previous_operator_columns' : previous_operator_columns}})
 
     return jsonify({'status' : 'ok'})
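A standalone illustration of the `$lookup` merge that `get_job()` above performs: since operators store the job id as a string, the job's `ObjectId` is first stringified and then joined against `operators.jobid` (`$toString` requires MongoDB 4.0+). Same assumed local MongoDB and database name as in the earlier sketch:

```python
from pymongo import MongoClient
from bson import ObjectId

db = MongoClient("mongodb://localhost:27017")["tuplex-history"]  # db name assumed

def fetch_job_with_operators(jobid):
    if not ObjectId.is_valid(jobid):
        return None
    res = list(db.jobs.aggregate([
        {"$match": {"_id": ObjectId(jobid)}},
        {"$addFields": {"jobid_str": {"$toString": "$_id"}}},
        {"$lookup": {"from": "operators",
                     "localField": "jobid_str",
                     "foreignField": "jobid",
                     "as": "operators"}},
    ]))
    if not res:
        return None
    job = res[0]
    job["operators"].sort(key=lambda op: op["idx"])  # order by pipeline position
    return job
```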
diff --git a/tuplex/historyserver/thserver/static/css/custom.css b/tuplex/historyserver/thserver/static/css/custom.css
index a21552644..27b228a0a 100644
--- a/tuplex/historyserver/thserver/static/css/custom.css
+++ b/tuplex/historyserver/thserver/static/css/custom.css
@@ -5,6 +5,32 @@
 .rounded {
     border-radius: 7px; }
 
+.aggregate-operator {
+    padding: 8px;
+    background-color: #51e893;
+    width: 100%;
+    border: 1px solid #53c586;
+    color: #1c422d; }
+
+.join-operator {
+    padding: 8px;
+    background-color: #51e893;
+    width: 100%;
+    border: 1px solid #53c586;
+    color: #1c422d; }
+
+.aggregate-details {
+    background-color: #fafafa;
+    border: 1px solid #53c586;
+    padding: 15px;
+    width: 100%; }
+
+.join-details {
+    background-color: #fafafa;
+    border: 1px solid #53c586;
+    padding: 15px;
+    width: 100%; }
+
 .input-operator {
     background-color: #379683;
     color: #fafafa;
diff --git a/tuplex/historyserver/thserver/templates/job.html b/tuplex/historyserver/thserver/templates/job.html
index f6efed0e0..53301d5b9 100644
--- a/tuplex/historyserver/thserver/templates/job.html
+++ b/tuplex/historyserver/thserver/templates/job.html
[The template hunks (@@ -2,50 +2,50 @@, @@ -54,80 +54,121 @@, and the trailing hunk around line 135) lost their HTML markup during extraction. The recoverable changes: the head block (title "Tuplex WebUI", CSS/JS includes) is re-indented; the pipeline header shows the started/finished/not-yet-started status and now reads "{{ ncount }} good rows, {{ ecount }} exceptions"; a new top-level {% for op in operators %} loop renders every operator; each stage header becomes "Stage {{ stage.number }} Counts: ({{ stage.ncount }} normal / {{ stage.ecount }} exceptional)", followed by an optional "Stage {{ stage.number }} Dependencies: {{ stage.dependencies }}" block guarded by {% if stage.dependencies %}; and the per-stage {% for op in stage.operators %} loop with its {% if not loop.last %} dividers is re-indented.]
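For orientation, the rough shape of the context the reworked job.html consumes, reconstructed from the surviving template fragments and `set_stages()` in database.py; the view code that builds this dict is not part of the diff, so every concrete value below is illustrative:

```python
context = {
    "status": "running",     # 'running' | 'finished' | anything else -> "not yet started"
    "ncount": 1000,          # job-wide good rows
    "ecount": 3,             # job-wide exceptions
    "operators": [],         # consumed by the new top-level operator loop
    "stages": [{
        "number": 0,
        "ncount": 1000,
        "ecount": 3,
        "dependencies": [],  # rendered only when non-empty
        "operators": [],
    }],
}
```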