From d7db0677e566f3ec6f57020ee4a2421d6f8a758a Mon Sep 17 00:00:00 2001
From: Ben Givertz
Date: Fri, 4 Feb 2022 11:54:25 -0500
Subject: [PATCH 01/15] Standardize runtime exceptions to use ExceptionInfo

---
 tuplex/core/include/physical/ResolveTask.h | 10 ++++
 tuplex/core/src/ee/local/LocalBackend.cc   | 13 +++-
 tuplex/core/src/physical/ResolveTask.cc    | 70 ++++++++++------------
 3 files changed, 53 insertions(+), 40 deletions(-)

diff --git a/tuplex/core/include/physical/ResolveTask.h b/tuplex/core/include/physical/ResolveTask.h
index 2044a5699..e3033f0dd 100644
--- a/tuplex/core/include/physical/ResolveTask.h
+++ b/tuplex/core/include/physical/ResolveTask.h
@@ -62,6 +62,7 @@ namespace tuplex {
                     int64_t contextID,
                     const std::vector<Partition*>& partitions,
                     const std::vector<Partition*>& runtimeExceptions,
+                    ExceptionInfo runtimeExceptionInfo,
                     const std::vector<Partition*>& inputExceptions,
                     ExceptionInfo inputExceptionInfo,
                     const std::vector<int64_t>& operatorIDsAffectedByResolvers, //! used to identify which exceptions DO require reprocessing because there might be a resolver in the slow path for them.
@@ -79,6 +80,10 @@ namespace tuplex {
                 _stageID(stageID),
                 _partitions(partitions),
                 _runtimeExceptions(runtimeExceptions),
+                _numRuntimeExceptions(runtimeExceptionInfo.numExceptions),
+                _runtimeExceptionIndex(runtimeExceptionInfo.exceptionIndex),
+                _runtimeExceptionRowOffset(runtimeExceptionInfo.exceptionRowOffset),
+                _runtimeExceptionByteOffset(runtimeExceptionInfo.exceptionByteOffset),
                 _inputExceptions(inputExceptions),
                 _numInputExceptions(inputExceptionInfo.numExceptions),
                 _inputExceptionIndex(inputExceptionInfo.exceptionIndex),
                 _inputExceptionRowOffset(inputExceptionInfo.exceptionRowOffset),
                 _inputExceptionByteOffset(inputExceptionInfo.exceptionByteOffset),
@@ -220,6 +225,11 @@ namespace tuplex {
            size_t _inputExceptionIndex;
            size_t _inputExceptionRowOffset;
            size_t _inputExceptionByteOffset;
+           size_t _numRuntimeExceptions;
+           size_t _runtimeExceptionIndex;
+           size_t _runtimeExceptionRowOffset;
+           size_t _runtimeExceptionByteOffset;
+
            inline Schema commonCaseInputSchema() const { return _deserializerGeneralCaseOutput->getSchema(); }
            Schema _resolverOutputSchema; //! what the resolve functor produces
            Schema _targetOutputSchema; //! which schema the final rows should be in...
diff --git a/tuplex/core/src/ee/local/LocalBackend.cc b/tuplex/core/src/ee/local/LocalBackend.cc
index bed96ec5a..af8e4b8bb 100644
--- a/tuplex/core/src/ee/local/LocalBackend.cc
+++ b/tuplex/core/src/ee/local/LocalBackend.cc
@@ -1349,6 +1349,7 @@ namespace tuplex {
 
         std::vector<IExecutorTask*> tasks_result;
         std::vector<IExecutorTask*> resolveTasks;
+        std::vector<Partition*> exceptionPartitions;
         std::vector<size_t> maxOrder;
 
         auto opsToCheck = tstage->operatorIDsWithResolvers();
@@ -1400,13 +1401,20 @@ namespace tuplex {
 
                 assert(tt->getStageID() == stageID);
 
+                // On first execution, tasks hold their own runtime exceptions so they can iterate over all partitions fully
+                auto runtimeExceptionInfo = ExceptionInfo(tt->getNumExceptions(), 0, 0, 0);
+
+                // save exception partitions to invalidate after all tasks have been completed
+                auto taskExceptionPartitions = tt->getExceptionPartitions();
+                exceptionPartitions.insert(exceptionPartitions.end(), taskExceptionPartitions.begin(), taskExceptionPartitions.end());
+
                 // this task needs to be resolved, b.c. exceptions occurred...
                 // pretty simple, just create a ResolveTask
                 auto exceptionInputSchema = tt->inputSchema(); // this could be specialized!
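                // --- Editorial sketch, not part of the patch: how the new
                // runtimeExceptionInfo locates the first exception row. `info`,
                // `part`, `ptr` and `rowsLeft` are illustrative locals; the
                // Partition calls are the ones ResolveTask uses below.
                //
                //     ExceptionInfo info(tt->getNumExceptions(), 0, 0, 0);
                //     Partition* part = taskExceptionPartitions[info.exceptionIndex];
                //     const uint8_t* ptr = part->lock() + info.exceptionByteOffset;
                //     size_t rowsLeft = part->getNumRows() - info.exceptionRowOffset;
                //     // read rowsLeft rows, then hop to the next partition and
                //     // repeat until info.numExceptions rows have been consumed
                //     part->unlock();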
                auto rtask = new ResolveTask(stageID,
                                             tstage->context().id(),
                                             tt->getOutputPartitions(),
-                                            tt->getExceptionPartitions(),
+                                            taskExceptionPartitions,
+                                            runtimeExceptionInfo,
                                             tt->inputExceptions(),
                                             tt->inputExceptionInfo(),
                                             opsToCheck,
@@ -1502,6 +1510,9 @@ namespace tuplex {
         for (auto& p : tstage->inputExceptions()) {
             p->invalidate();
         }
+        for (auto& p : exceptionPartitions) {
+            p->invalidate();
+        }
 
         // cout<<"*** total number of tasks to return is "<<tasks_result.size()<<endl;
diff --git a/tuplex/core/src/physical/ResolveTask.cc b/tuplex/core/src/physical/ResolveTask.cc
--- a/tuplex/core/src/physical/ResolveTask.cc
+++ b/tuplex/core/src/physical/ResolveTask.cc
-        for(auto partition : _runtimeExceptions) {
-            const uint8_t* ptr = partition->lockRaw();
-            int64_t numRows = *((int64_t *) ptr);
-            ptr += sizeof(int64_t);
-
-            for(int i = 0; i < numRows; ++i) {
-                // old
-                // _currentRowNumber = *((int64_t*)ptr);
-                // ptr += sizeof(int64_t);
-                // int64_t ecCode = *((int64_t*)ptr);
-                // ptr += sizeof(int64_t);
-                // int64_t operatorID = *((int64_t*)ptr);
-                // ptr += sizeof(int64_t);
-                // int64_t eSize = *((int64_t*)ptr);
-                // ptr += sizeof(int64_t);
+        if (_numRuntimeExceptions > 0) {
+            // Initialize runtime exception to starting index
+            auto partition = _runtimeExceptions[_runtimeExceptionIndex];
+            auto rowsLeftInPartition = partition->getNumRows() - _runtimeExceptionRowOffset;
+            const uint8_t *ptr = partition->lock() + _runtimeExceptionByteOffset;
+
+            // Iterate over all runtime exceptions, which may span multiple partitions
+            for (int i = 0; i < _numRuntimeExceptions; ++i) {
+                // Change partition once exhausted
+                if (rowsLeftInPartition == 0) {
+                    partition->unlock();
+                    _runtimeExceptionIndex++;
+                    partition = _runtimeExceptions[_runtimeExceptionIndex];
+                    rowsLeftInPartition = partition->getNumRows();
+                    ptr = partition->lock();
+                }
 
             const uint8_t *ebuf = nullptr;
             int64_t ecCode = -1, operatorID = -1;
@@ -764,24 +765,14 @@
             auto delta = deserializeExceptionFromMemory(ptr, &ecCode, &operatorID, &_currentRowNumber,
                                                         &ebuf, &eSize);
-
-            // call functor over this...
-            // ==> important to use row number here for continuous exception resolution!
-            // args are: "userData", "rowNumber", "exceptionCode", "rowBuf", "bufSize"
-
             processExceptionRow(ecCode, operatorID, ebuf, eSize);
             ptr += delta;
 
-            // old
-            //ptr += eSize;
-
-            // always inc row number
             _rowNumber++;
+            rowsLeftInPartition--;
         }
 
+        // Unlock but wait to invalidate until all resolve tasks have finished
         partition->unlock();
-
-        // exception partition is done or exceptions are transferred to new partition...
-        partition->invalidate();
     }
 
     // now process all of the input exceptions
@@ -885,10 +876,10 @@
         size_t curRuntimePartitionInd = 0; // current index into vector of runtime exception partitions
         int64_t numRuntimeRowsLeftInPartition = 0; // number of rows remaining in partition
         const uint8_t *runPtr = nullptr;
-        if (_runtimeExceptions.size() > 0) {
-            curRuntimePartitionInd = 0;
-            numRuntimeRowsLeftInPartition = _runtimeExceptions[curRuntimePartitionInd]->getNumRows();
-            runPtr = _runtimeExceptions[curRuntimePartitionInd]->lock();
+        if (_numRuntimeExceptions > 0) {
+            curRuntimePartitionInd = _runtimeExceptionIndex;
+            numRuntimeRowsLeftInPartition = _runtimeExceptions[curRuntimePartitionInd]->getNumRows() - _runtimeExceptionRowOffset;
+            runPtr = _runtimeExceptions[curRuntimePartitionInd]->lock() + _runtimeExceptionByteOffset;
         }
 
         // Initialize input exception variables
@@ -906,11 +897,12 @@
         // runtime exceptions do not account for the existence of input exceptions, so we need to add the previous
         // input exceptions to compare the true row number
         size_t inputRowsProcessed = 0;
+        size_t runtimeRowsProcessed = 0;
         const uint8_t *ptr = nullptr;
         while (runPtr && inputPtr) {
             auto runRowInd = *((int64_t *) runPtr); // get current runtime row index
             auto inputRowInd = *((int64_t *) inputPtr); // get current input row index
-            bool isRuntimeException = false;
+            bool isRuntimeException;
             // compare indices with accounting for previous input exceptions
             if (runRowInd + inputRowsProcessed < inputRowInd) {
                 ptr = runPtr;
@@ -920,6 +912,7 @@
                 ptr = inputPtr;
                 numInputRowsLeftInPartition--;
                 inputRowsProcessed++;
+                isRuntimeException = false;
             }
 
             const uint8_t *ebuf = nullptr;
@@ -939,12 +932,11 @@
                 _rowNumber++;
 
                 // Exhausted current runtime exceptions, need to switch partitions
-                if (numRuntimeRowsLeftInPartition == 0) {
+                if (numRuntimeRowsLeftInPartition == 0 || runtimeRowsProcessed == _numRuntimeExceptions) {
                     _runtimeExceptions[curRuntimePartitionInd]->unlock();
-                    _runtimeExceptions[curRuntimePartitionInd]->invalidate();
                     curRuntimePartitionInd++;
                     // Still have more exceptions to go through
-                    if (curRuntimePartitionInd < _runtimeExceptions.size()) {
+                    if (curRuntimePartitionInd < _runtimeExceptions.size() && runtimeRowsProcessed < _numRuntimeExceptions) {
                         numRuntimeRowsLeftInPartition = _runtimeExceptions[curRuntimePartitionInd]->getNumRows();
                         runPtr = _runtimeExceptions[curRuntimePartitionInd]->lock();
                     } else {
@@ -981,13 +973,13 @@
             _rowNumber++;
             numRuntimeRowsLeftInPartition--;
+            runtimeRowsProcessed++;
 
             // Exhausted current runtime exceptions in partitions need to switch partitions or could be done
-            if (numRuntimeRowsLeftInPartition == 0) {
+            if (numRuntimeRowsLeftInPartition == 0 || runtimeRowsProcessed == _numRuntimeExceptions) {
                 _runtimeExceptions[curRuntimePartitionInd]->unlock();
-                _runtimeExceptions[curRuntimePartitionInd]->invalidate();
                 curRuntimePartitionInd++;
                 // More exceptions to process
-                if (curRuntimePartitionInd < _runtimeExceptions.size()) {
+                if (curRuntimePartitionInd < _runtimeExceptions.size() && runtimeRowsProcessed < _numRuntimeExceptions) {
                     numRuntimeRowsLeftInPartition = _runtimeExceptions[curRuntimePartitionInd]->getNumRows();
                     runPtr = _runtimeExceptions[curRuntimePartitionInd]->lock();
                 } else {

From 31f7d113f752f7a39db893d8a00573e0d797a716 Mon Sep 17 00:00:00 2001
From: Ben Givertz
Date: Wed, 9 Feb 2022 16:24:50 -0500
Subject: [PATCH 02/15] Support for fallback and general case partitions

---
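This patch replaces the earlier two-way split (normal partitions plus a flat
list of input exceptions) with three row classes: normal-case partitions
(compiled path), general-case partitions (normal-case violations kept in
serialized row form), and fallback partitions (rows kept as pickled Python
objects). A PartitionGroup records which slice of each class a single task
produced, so downstream stages can restore the original row order. A minimal
sketch of the slicing, assuming Tuplex's Partition class is in scope; the
slice() helper is hypothetical, not part of the patch:

    #include <vector>

    // Return the [start, start + count) slice of a stage-wide partition vector.
    static std::vector<Partition*> slice(const std::vector<Partition*>& all,
                                         size_t start, size_t count) {
        return std::vector<Partition*>(all.begin() + start,
                                       all.begin() + start + count);
    }

    // Usage, mirroring LocalBackend::executeTransformStage below:
    //   auto normal   = slice(inputPartitions,    g.normalPartitionStartInd,   g.numNormalPartitions);
    //   auto general  = slice(generalPartitions,  g.generalPartitionStartInd,  g.numGeneralPartitions);
    //   auto fallback = slice(fallbackPartitions, g.fallbackPartitionStartInd, g.numFallbackPartitions);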
 tuplex/core/include/Context.h                 |  24 +-
 tuplex/core/include/ExceptionInfo.h           |  47 --
 tuplex/core/include/PartitionGroup.h          |  50 ++
 tuplex/core/include/ee/local/LocalBackend.h   |  16 +-
 tuplex/core/include/logical/CacheOperator.h   |  30 +-
 .../include/logical/ParallelizeOperator.h     |  19 +-
 tuplex/core/include/physical/CodeDefs.h       |  10 +-
 tuplex/core/include/physical/ResolveTask.h    |  45 +-
 tuplex/core/include/physical/ResultSet.h      | 168 ++++--
 tuplex/core/include/physical/TransformStage.h |  62 ++-
 tuplex/core/include/physical/TransformTask.h  |  15 +-
 tuplex/core/src/Context.cc                    | 117 +---
 tuplex/core/src/DataSet.cc                    |   7 +-
 tuplex/core/src/Partition.cc                  |   2 +-
 tuplex/core/src/ee/local/LocalBackend.cc      | 374 +++++++------
 tuplex/core/src/logical/CacheOperator.cc      | 111 ++--
 .../core/src/logical/ParallelizeOperator.cc   |  30 +-
 .../src/physical/BlockBasedTaskBuilder.cc     |  33 +-
 .../physical/ExceptionSourceTaskBuilder.cc    | 357 ++++++------
 tuplex/core/src/physical/PhysicalPlan.cc      |  21 +-
 tuplex/core/src/physical/ResolveTask.cc       | 497 +++++++++++------
 tuplex/core/src/physical/ResultSet.cc         | 513 +++++++++++++-----
 tuplex/core/src/physical/StageBuilder.cc      |   2 +-
 tuplex/core/src/physical/TransformStage.cc    |  32 +-
 tuplex/core/src/physical/TransformTask.cc     | 128 +++--
 tuplex/python/include/PythonContext.h         |   2 +
 tuplex/python/src/PythonContext.cc            | 312 ++++++-----
 tuplex/python/src/PythonDataSet.cc            |  34 +-
 tuplex/python/src/PythonWrappers.cc           |   4 +-
 tuplex/test/core/DataSetShow.cc               |   2 +-
 tuplex/test/core/ResultSetTest.cc             |  85 ++-
 tuplex/test/wrappers/WrapperTest.cc           | 191 +++---
 32 files changed, 2041 insertions(+), 1299 deletions(-)
 delete mode 100644 tuplex/core/include/ExceptionInfo.h
 create mode 100644 tuplex/core/include/PartitionGroup.h

diff --git a/tuplex/core/include/Context.h b/tuplex/core/include/Context.h
index b75409533..8baff3bfc 100644
--- a/tuplex/core/include/Context.h
+++ b/tuplex/core/include/Context.h
@@ -15,6 +15,7 @@
 #include
 #include
 #include "Partition.h"
+#include <PartitionGroup.h>
 #include "Row.h"
 #include "HistoryServerClasses.h"
 #include
@@ -37,7 +38,7 @@ namespace tuplex {
     class Executor;
     class Partition;
     class IBackend;
-    class ExceptionInfo;
+    class PartitionGroup;
 
     class Context {
     private:
@@ -59,24 +60,7 @@ namespace tuplex {
 
         // needed because of C++ template issues
         void addPartition(DataSet* ds, Partition *partition);
-        void addParallelizeNode(DataSet *ds, const std::vector<std::tuple<size_t, PyObject*>> &badParallelizeObjects=std::vector<std::tuple<size_t, PyObject*>>(), const std::vector<size_t> &numExceptionsInPartition=std::vector<size_t>()); //! adds a paralellize node to the computation graph
-
-        /*!
-         * serialize python objects as pickled objects into exception partitions. Set the python objects map to
-         * map all normalPartitions to the exceptions that occured within them.
-         * @param pythonObjects normal case schema violations and their initial row numbers
-         * @param numExceptionsInPartition number of exceptions in each normal partition
-         * @param normalPartitions normal partitions created
-         * @param opID parallelize operator ID
-         * @param serializedPythonObjects output vector for partitions
-         * @param pythonObjectsMap output for mapping
-         */
-        void serializePythonObjects(const std::vector<std::tuple<size_t, PyObject*>>& pythonObjects,
-                                    const std::vector<size_t> &numExceptionsInPartition,
-                                    const std::vector<Partition*> &normalPartitions,
-                                    const int64_t opID,
-                                    std::vector<Partition*> &serializedPythonObjects,
-                                    std::unordered_map<std::string, ExceptionInfo> &pythonObjectsMap);
+        void addParallelizeNode(DataSet *ds, const std::vector<Partition*>& fallbackPartitions=std::vector<Partition*>{}, const std::vector<PartitionGroup>& partitionGroups=std::vector<PartitionGroup>{}); //! adds a parallelize node to the computation graph
 
         Partition* requestNewPartition(const Schema& schema, const int dataSetID, size_t minBytesRequired);
         uint8_t* partitionLockRaw(Partition *partition);
@@ -276,7 +260,7 @@ namespace tuplex {
          * @param numExceptionsInPartition number of schema violations that occured in each of the partitions
          * @return reference to newly created dataset.
          */
-        DataSet& fromPartitions(const Schema& schema, const std::vector<Partition*>& partitions, const std::vector<std::string>& columns, const std::vector<std::tuple<size_t, PyObject*>> &badParallelizeObjects, const std::vector<size_t> &numExceptionsInPartition);
+        DataSet& fromPartitions(const Schema& schema, const std::vector<Partition*>& partitions, const std::vector<Partition*>& fallbackPartitions, const std::vector<PartitionGroup>& partitionGroups, const std::vector<std::string>& columns);
     };
 
     // needed for template mechanism to work
 #include
diff --git a/tuplex/core/include/ExceptionInfo.h b/tuplex/core/include/ExceptionInfo.h
deleted file mode 100644
index d6ee35886..000000000
--- a/tuplex/core/include/ExceptionInfo.h
+++ /dev/null
@@ -1,47 +0,0 @@
-//--------------------------------------------------------------------------------------------------------------------//
-//                                                                                                                      //
-//                                      Tuplex: Blazing Fast Python Data Science                                        //
-//                                                                                                                      //
-//                                                                                                                      //
-//  (c) 2017 - 2021, Tuplex team                                                                                        //
-//  Created by Benjamin Givertz first on 1/1/2022                                                                       //
-//  License: Apache 2.0                                                                                                 //
-//--------------------------------------------------------------------------------------------------------------------//
-
-#ifndef TUPLEX_EXCEPTIONINFO_H
-#define TUPLEX_EXCEPTIONINFO_H
-
-namespace tuplex {
-    /*!
-     * Struct to hold information that maps input partitions to input exceptions that occur within them.
-     *
-     * Explanation:
-     * Each input partition is passed the same vector of all input exceptions that occured during data parallelization
-     * or caching. Thus, each input partition must know how many input exceptions occur in its partition, the index
-     * of the input exception partition where its first exception occurs, and the offset into that partition where the
-     * first exception occurs. These values are held in this struct and each input partition is mapped to an ExceptionInfo.
-     */
-    struct ExceptionInfo {
-        size_t numExceptions; //! number of exception rows that occur within a single input partition
-        size_t exceptionIndex; //! index into a vector of input exception partitions that holds the first input exception
-        size_t exceptionRowOffset; //! offset in rows into the first input exception partition where the first exception occurs.
-        size_t exceptionByteOffset; //! offset in bytes into the first input exception partition where the first exception occurs
-
-        ExceptionInfo() :
-            numExceptions(0),
-            exceptionIndex(0),
-            exceptionRowOffset(0),
-            exceptionByteOffset(0) {}
-
-        ExceptionInfo(size_t numExps,
-                      size_t expIndex,
-                      size_t expRowOffset,
-                      size_t expByteOffset) :
-            numExceptions(numExps),
-            exceptionIndex(expIndex),
-            exceptionRowOffset(expRowOffset),
-            exceptionByteOffset(expByteOffset) {}
-    };
-}
-
-#endif //TUPLEX_EXCEPTIONINFO_H
\ No newline at end of file
diff --git a/tuplex/core/include/PartitionGroup.h b/tuplex/core/include/PartitionGroup.h
new file mode 100644
index 000000000..bb83585e5
--- /dev/null
+++ b/tuplex/core/include/PartitionGroup.h
@@ -0,0 +1,50 @@
+//--------------------------------------------------------------------------------------------------------------------//
+//                                                                                                                      //
+//                                      Tuplex: Blazing Fast Python Data Science                                        //
+//                                                                                                                      //
+//                                                                                                                      //
+//  (c) 2017 - 2021, Tuplex team                                                                                        //
+//  Created by Benjamin Givertz first on 1/1/2021                                                                       //
+//  License: Apache 2.0                                                                                                 //
+//--------------------------------------------------------------------------------------------------------------------//
+
+#ifndef TUPLEX_PARTITIONGROUP_H
+#define TUPLEX_PARTITIONGROUP_H
+
+namespace tuplex {
+    struct PartitionGroup {
+    public:
+        /*!
+         * Groups normal, general, and fallback partitions into groups, with all partitions that originated from a single
+         * task being grouped together.
+         * @param numNormalPartitions number of normal partitions in group
+         * @param normalPartitionStartInd starting index in list of all normal partitions
+         * @param numGeneralPartitions number of general partitions in group
+         * @param generalPartitionStartInd starting index in list of all general partitions
+         * @param numFallbackPartitions number of fallback partitions in group
+         * @param fallbackPartitionStartInd starting index in list of all fallback partitions
+         */
+        PartitionGroup(size_t numNormalPartitions, size_t normalPartitionStartInd,
+                       size_t numGeneralPartitions, size_t generalPartitionStartInd,
+                       size_t numFallbackPartitions, size_t fallbackPartitionStartInd):
+            numNormalPartitions(numNormalPartitions), normalPartitionStartInd(normalPartitionStartInd),
+            numGeneralPartitions(numGeneralPartitions), generalPartitionStartInd(generalPartitionStartInd),
+            numFallbackPartitions(numFallbackPartitions), fallbackPartitionStartInd(fallbackPartitionStartInd) {}
+
+        /*!
+         * Initialize empty struct with all values set to zero.
+         */
+        PartitionGroup() :
+            numNormalPartitions(0), numGeneralPartitions(0), numFallbackPartitions(0),
+            normalPartitionStartInd(0), generalPartitionStartInd(0), fallbackPartitionStartInd(0) {}
+
+        size_t numNormalPartitions;
+        size_t normalPartitionStartInd;
+        size_t numGeneralPartitions;
+        size_t generalPartitionStartInd;
+        size_t numFallbackPartitions;
+        size_t fallbackPartitionStartInd;
+    };
+}
+
+#endif //TUPLEX_PARTITIONGROUP_H
diff --git a/tuplex/core/include/ee/local/LocalBackend.h b/tuplex/core/include/ee/local/LocalBackend.h
index 77d375aed..b16dc0b33 100644
--- a/tuplex/core/include/ee/local/LocalBackend.h
+++ b/tuplex/core/include/ee/local/LocalBackend.h
@@ -99,7 +99,7 @@ namespace tuplex {
             return std::accumulate(counts.begin(), counts.end(), 0, [](size_t acc, std::pair<std::tuple<int64_t, ExceptionCode>, size_t> val) { return acc + val.second; });
         }
 
-        inline std::vector<Partition*> getOutputPartitions(IExecutorTask* task) {
+        inline std::vector<Partition*> getNormalPartitions(IExecutorTask* task) {
             if(!task)
                 return std::vector<Partition*>();
 
@@ -113,7 +113,7 @@ namespace tuplex {
             return std::vector<Partition*>();
         }
 
-        inline std::vector<Partition*> getRemainingExceptions(IExecutorTask* task) {
+        inline std::vector<Partition*> getExceptionPartitions(IExecutorTask* task) {
             if(!task)
                 return std::vector<Partition*>();
 
@@ -127,7 +127,7 @@ namespace tuplex {
             return std::vector<Partition*>();
         }
 
-        inline std::vector<Partition*> generalCasePartitions(IExecutorTask* task) {
+        inline std::vector<Partition*> getGeneralPartitions(IExecutorTask* task) {
             if(!task)
                 return std::vector<Partition*>();
 
@@ -155,18 +155,18 @@ namespace tuplex {
             return std::unordered_map<std::tuple<int64_t, ExceptionCode>, size_t>();
         }
 
-        inline std::vector<std::tuple<size_t, PyObject*>> getNonConformingRows(IExecutorTask* task) {
+        inline std::vector<Partition*> getFallbackPartitions(IExecutorTask* task) {
             if(!task)
-                return std::vector<std::tuple<size_t, PyObject*>>();
+                return std::vector<Partition*>();
 
             if(task->type() == TaskType::UDFTRAFOTASK)
-                return std::vector<std::tuple<size_t, PyObject*>>(); // none here, can be only result from ResolveTask.
+                return std::vector<Partition*>(); // none here, can be only result from ResolveTask.
             if(task->type() == TaskType::RESOLVE)
-                return dynamic_cast<ResolveTask*>(task)->getNonConformingRows();
+                return dynamic_cast<ResolveTask*>(task)->getOutputFallbackPartitions();
 
             throw std::runtime_error("unknown task type seen in " + std::string(__FILE_NAME__) + ":" + std::to_string(__LINE__));
-            return std::vector<std::tuple<size_t, PyObject*>>();
+            return std::vector<Partition*>();
         }
 
         std::vector<IExecutorTask*> resolveViaSlowPath(std::vector<IExecutorTask*>& tasks,
diff --git a/tuplex/core/include/logical/CacheOperator.h b/tuplex/core/include/logical/CacheOperator.h
index 563aa8f0b..2ca184a1d 100644
--- a/tuplex/core/include/logical/CacheOperator.h
+++ b/tuplex/core/include/logical/CacheOperator.h
@@ -79,9 +79,10 @@ namespace tuplex {
          * @return
          */
         bool isCached() const { return _cached; }
-        std::vector<Partition*> cachedPartitions() const { return _normalCasePartitions; }
-        std::vector<Partition*> cachedExceptions() const { return _generalCasePartitions; }
-        std::unordered_map<std::string, ExceptionInfo> partitionToExceptionsMap() const { return _partitionToExceptionsMap; }
+        std::vector<Partition*> cachedNormalPartitions() const { return _normalPartitions; }
+        std::vector<Partition*> cachedGeneralPartitions() const { return _generalPartitions; }
+        std::vector<Partition*> cachedFallbackPartitions() const { return _fallbackPartitions; }
+        std::vector<PartitionGroup> partitionGroups() const { return _partitionGroups; }
 
         size_t getTotalCachedRows() const;
@@ -107,28 +108,19 @@ namespace tuplex {
         // or merge them.
         bool _cached;
         bool _storeSpecialized;
-        std::vector<Partition*> _normalCasePartitions; //! holds all data conforming to the normal case schema
-        std::vector<Partition*> _generalCasePartitions; //! holds all data which is considered to be a normal-case violation,
-                                                        //! i.e. which does not adhere to the normal case schema, but did not produce
-                                                        //! an exception while being processed through the pipeline before
-        std::unordered_map<std::string, ExceptionInfo> _partitionToExceptionsMap; //! maps normal case partitions to corresponding general case ones
-        std::vector<std::tuple<size_t, PyObject*>> _py_objects; //! all python objects who do not adhere to the general case schema
+        std::vector<Partition*> _normalPartitions; //! holds all data conforming to the normal case schema
+        std::vector<Partition*> _generalPartitions; //! holds all data which is considered to be a normal-case violation
+        std::vector<Partition*> _fallbackPartitions; //! holds all data which is output as a python object from interpreter processing
+        std::vector<PartitionGroup> _partitionGroups; //! groups together partitions for correct row ordering
 
         std::vector<std::string> _columns;
 
         // internal sample of normal case rows, used for tracing & Co.
         std::vector<Row> _sample;
 
         // number of rows need to be stored for cost estimates
-        size_t _normalCaseRowCount;
-        size_t _generalCaseRowCount;
-
-        // @TODO: there should be 3 things stored
-        // 1.) common case => i.e.
-        // 2.) general case => i.e. what in general can be done (null-values & Co, wide integers, ...)
-        // 3.) python case => i.e. things that don't fit into either case (interpreter objects serialized via pickle)
-
-        // Note: the pickling could be parallelized by simply matching python types & Co...
-        // ==> store python data as tuple of elements!
+        size_t _normalRowCount;
+        size_t _generalRowCount;
+        size_t _fallbackRowCount;
     };
 }
diff --git a/tuplex/core/include/logical/ParallelizeOperator.h b/tuplex/core/include/logical/ParallelizeOperator.h
index 0960baf89..87666950c 100644
--- a/tuplex/core/include/logical/ParallelizeOperator.h
+++ b/tuplex/core/include/logical/ParallelizeOperator.h
@@ -17,10 +17,9 @@ namespace tuplex {
 
     class ParallelizeOperator : public LogicalOperator {
-        std::vector<Partition*> _partitions; // data, conforming to majority type
-        std::vector<Partition*> _pythonObjects; // schema violations stored for interpreter processing as python objects
-        // maps partitions to their corresponding python objects
-        std::unordered_map<std::string, ExceptionInfo> _inputPartitionToPythonObjectsMap;
+        std::vector<Partition*> _normalPartitions; // data, conforming to majority type
+        std::vector<Partition*> _fallbackPartitions; // schema violations stored for interpreter processing as python objects
+        std::vector<PartitionGroup> _partitionGroups; // maps normal partitions to their corresponding fallback partitions
         std::vector<std::string> _columnNames;
 
         std::vector<Row> _sample; // sample, not necessary conforming to one type
@@ -31,7 +30,7 @@ namespace tuplex {
 
         // this a root node
         ParallelizeOperator(const Schema& schema,
-                            const std::vector<Partition*>& partitions,
+                            const std::vector<Partition*>& normalPartitions,
                             const std::vector<std::string>& columns);
 
         std::string name() override { return "parallelize"; }
@@ -47,13 +46,13 @@ namespace tuplex {
          * get the partitions where the parallelized data is stored.
          * @return vector of partitions.
          */
-        std::vector<Partition*> getPartitions();
+        std::vector<Partition*> getNormalPartitions();
 
-        void setPythonObjects(const std::vector<Partition*> &pythonObjects) { _pythonObjects = pythonObjects; }
-        std::vector<Partition*> getPythonObjects() { return _pythonObjects; }
+        void setFallbackPartitions(const std::vector<Partition*> &fallbackPartitions) { _fallbackPartitions = fallbackPartitions; }
+        std::vector<Partition*> getFallbackPartitions() { return _fallbackPartitions; }
 
-        void setInputPartitionToPythonObjectsMap(const std::unordered_map<std::string, ExceptionInfo>& pythonObjectsMap) { _inputPartitionToPythonObjectsMap = pythonObjectsMap; }
-        std::unordered_map<std::string, ExceptionInfo> getInputPartitionToPythonObjectsMap() { return _inputPartitionToPythonObjectsMap; }
+        void setPartitionGroups(const std::vector<PartitionGroup>& partitionGroups) { _partitionGroups = partitionGroups; }
+        std::vector<PartitionGroup> getPartitionGroups() { return _partitionGroups; }
 
         Schema getInputSchema() const override { return getOutputSchema(); }
diff --git a/tuplex/core/include/physical/CodeDefs.h b/tuplex/core/include/physical/CodeDefs.h
index b8c3cd76b..c185b2d54 100644
--- a/tuplex/core/include/physical/CodeDefs.h
+++ b/tuplex/core/include/physical/CodeDefs.h
@@ -48,8 +48,14 @@ namespace tuplex {
         typedef int64_t(*read_block_f)(void*, const uint8_t*, int64_t, int64_t*, int64_t*, int8_t);
 
         // protoype of the function generated by the below builder
-        // parameters are userData, block, blocksize, expPtrs, expPtrSizes, numExceptions, normalrowsout, badrowsout, lastRow
-        typedef int64_t(*read_block_exp_f)(void*, const uint8_t*, int64_t, uint8_t **, int64_t *, int64_t, int64_t*, int64_t*, bool);
+        // parameters are userData, block, blocksize, normalrowsout, badrowsout, lastRow,
+        // totalFilterCounter, totalNormalRowCounter, totalGeneralRowCounter, totalFallbackRowCounter,
+        // generalPartitions, numGeneralPartitions, generalIndexOffset, generalRowOffset, generalByteOffset
+        // fallbackPartitions, numFallbackPartitions, fallbackIndexOffset, fallbackRowOffset, fallbackByteOffset
+        typedef int64_t(*read_block_exp_f)(void*, const uint8_t*, int64_t, int64_t*, int64_t*, bool,
+                                           int64_t*, int64_t*, int64_t*, int64_t*,
+                                           uint8_t **, int64_t, int64_t*, int64_t*, int64_t*,
+                                           uint8_t **, int64_t, int64_t*, int64_t*, int64_t*);
 
         /*!
          * prototype for processing a single row (with callbacks etc.). Returns how many bytes were processed
diff --git a/tuplex/core/include/physical/ResolveTask.h b/tuplex/core/include/physical/ResolveTask.h
index e3033f0dd..6f85c57a3 100644
--- a/tuplex/core/include/physical/ResolveTask.h
+++ b/tuplex/core/include/physical/ResolveTask.h
@@ -61,10 +61,9 @@ namespace tuplex {
         ResolveTask(int64_t stageID,
                     int64_t contextID,
                     const std::vector<Partition*>& partitions,
-                    const std::vector<Partition*>& runtimeExceptions,
-                    ExceptionInfo runtimeExceptionInfo,
-                    const std::vector<Partition*>& inputExceptions,
-                    ExceptionInfo inputExceptionInfo,
+                    const std::vector<Partition*>& exceptionPartitions,
+                    const std::vector<Partition*>& generalPartitions,
+                    const std::vector<Partition*>& fallbackPartitions,
                     const std::vector<int64_t>& operatorIDsAffectedByResolvers, //! used to identify which exceptions DO require reprocessing because there might be a resolver in the slow path for them.
                     Schema exceptionInputSchema, //! schema of the input rows in which both user exceptions and normal-case violations are stored in. This is also the schema in which rows which on the slow path produce again an exception will be stored in.
                     Schema resolverOutputSchema, //! schema of rows that the resolve function outputs if it doesn't rethrow exceptions
@@ -79,16 +78,12 @@ namespace tuplex {
                     PyObject* interpreterFunctor=nullptr) : IExceptionableTask::IExceptionableTask(exceptionInputSchema, contextID),
                 _stageID(stageID),
                 _partitions(partitions),
-                _runtimeExceptions(runtimeExceptions),
-                _numRuntimeExceptions(runtimeExceptionInfo.numExceptions),
-                _runtimeExceptionIndex(runtimeExceptionInfo.exceptionIndex),
-                _runtimeExceptionRowOffset(runtimeExceptionInfo.exceptionRowOffset),
-                _runtimeExceptionByteOffset(runtimeExceptionInfo.exceptionByteOffset),
-                _inputExceptions(inputExceptions),
-                _numInputExceptions(inputExceptionInfo.numExceptions),
-                _inputExceptionIndex(inputExceptionInfo.exceptionIndex),
-                _inputExceptionRowOffset(inputExceptionInfo.exceptionRowOffset),
-                _inputExceptionByteOffset(inputExceptionInfo.exceptionByteOffset),
+                _exceptionPartitions(exceptionPartitions),
+                _generalPartitions(generalPartitions),
+                _fallbackPartitions(fallbackPartitions),
+                _exceptionCounter(0),
+                _generalCounter(0),
+                _fallbackCounter(0),
                 _resolverOutputSchema(resolverOutputSchema),
                 _targetOutputSchema(targetNormalCaseOutputSchema),
                 _mergeRows(mergeRows),
@@ -175,7 +170,7 @@ namespace tuplex {
         std::vector<Partition*> getOutputPartitions() const override { return _partitions; }
 
-        std::vector<std::tuple<size_t, PyObject*>> getNonConformingRows() const { return _py_nonconfirming; }
+        std::vector<Partition*> getOutputFallbackPartitions() const { return _fallbackSink.partitions; }
 
         /// very important to override this because of the special two exceptions fields of ResolveTask
         /// i.e. _generalCasePartitions store what exceptions to resolve, IExceptionableTask::_generalCasePartitions exceptions that occurred
@@ -219,16 +214,13 @@ namespace tuplex {
     private:
         int64_t _stageID; /// to which stage does this task belong to.
         std::vector<Partition*> _partitions;
-        std::vector<Partition*> _runtimeExceptions;
-        std::vector<Partition*> _inputExceptions;
-        size_t _numInputExceptions;
-        size_t _inputExceptionIndex;
-        size_t _inputExceptionRowOffset;
-        size_t _inputExceptionByteOffset;
-        size_t _numRuntimeExceptions;
-        size_t _runtimeExceptionIndex;
-        size_t _runtimeExceptionRowOffset;
-        size_t _runtimeExceptionByteOffset;
+        std::vector<Partition*> _exceptionPartitions;
+        std::vector<Partition*> _generalPartitions;
+        std::vector<Partition*> _fallbackPartitions;
+
+        size_t _exceptionCounter;
+        size_t _generalCounter;
+        size_t _fallbackCounter;
 
         inline Schema commonCaseInputSchema() const { return _deserializerGeneralCaseOutput->getSchema(); }
         Schema _resolverOutputSchema; //! what the resolve functor produces
@@ -268,6 +260,8 @@ namespace tuplex {
         // sink for type violation rows
         MemorySink _generalCaseSink;
 
+        MemorySink _fallbackSink;
+
         // hash table sink
         // -> hash to be a hybrid because sometimes incompatible python objects have to be hashed here.
         HashTableSink _htable;
@@ -281,7 +275,6 @@ namespace tuplex {
         // python output which can't be consolidated, saved as separate list
         void writePythonObject(PyObject* out_row);
-        std::vector<std::tuple<size_t, PyObject*>> _py_nonconfirming;
 
         int64_t _outputRowNumber;
diff --git a/tuplex/core/include/physical/ResultSet.h b/tuplex/core/include/physical/ResultSet.h
index e94b8f1ae..6a97be70c 100644
--- a/tuplex/core/include/physical/ResultSet.h
+++ b/tuplex/core/include/physical/ResultSet.h
@@ -13,7 +13,7 @@
 #include
 #include
-#include <ExceptionInfo.h>
+#include <PartitionGroup.h>
 #include
 #include
 #include
@@ -25,23 +25,61 @@ namespace tuplex {
     class ResultSet {
     private:
-        std::list<Partition*> _partitions;
-        std::vector<Partition*> _exceptions; // unresolved exceptions
-        std::unordered_map<std::string, ExceptionInfo> _partitionToExceptionsMap;
-        // @TODO: use here rows instead? would make it potentially cleaner...
-        std::deque<std::tuple<size_t, PyObject*>> _pyobjects; // python objects remaining whose type
-                                                              // did not confirm to the one of partitions. Maybe use Row here instead?
-        size_t _curRowCounter; //! row counter for the current partition
-        size_t _byteCounter; //! byte offset for the current partition
-        size_t _rowsRetrieved;
-        size_t _totalRowCounter; // used for merging in rows!
-        size_t _maxRows;
-        Schema _schema;
-
-        void removeFirstPartition();
+        std::list<Partition*> _currentNormalPartitions; //! normal partitions in current group
+        std::list<Partition*> _currentGeneralPartitions; //! general partitions in current group
+        std::list<Partition*> _currentFallbackPartitions; //! fallback partitions in current group
+        std::list<Partition*> _remainingNormalPartitions; //! remaining normal partitions in other groups
+        std::list<Partition*> _remainingGeneralPartitions; //! remaining general partitions in other groups
+        std::list<Partition*> _remainingFallbackPartitions; //! remaining fallback partitions in other groups
+        std::list<PartitionGroup> _partitionGroups; //! groups together normal, general, and fallback partitions for merging
+
+        size_t _totalRowCounter; //! total rows emitted across all groups
+        size_t _maxRows; //! max number of rows to emit
+        Schema _schema; //! normal case schema
+
+        size_t _curNormalRowCounter;
+        size_t _curNormalByteCounter;
+        size_t _curGeneralRowCounter;
+        size_t _curGeneralByteCounter;
+        size_t _curFallbackRowCounter;
+        size_t _curFallbackByteCounter;
+        size_t _normalRowCounter;
+        size_t _generalRowCounter;
+        size_t _fallbackRowCounter;
+
+        int64_t currentGeneralRowInd();
+        int64_t currentFallbackRowInd();
+
+        Row getNextNormalRow();
+        bool hasNextNormalRow();
+        Row getNextFallbackRow();
+        bool hasNextFallbackRow();
+        Row getNextGeneralRow();
+        bool hasNextGeneralRow();
+
+        void removeFirstGeneralPartition();
+        void removeFirstFallbackPartition();
+        void removeFirstNormalPartition();
     public:
-        ResultSet() : _curRowCounter(0), _byteCounter(0), _rowsRetrieved(0),
-                      _totalRowCounter(0), _maxRows(0), _schema(Schema::UNKNOWN) {}
+        /*!
+         * Create new result set with normal, general, and fallback rows
+         * @param schema normal case schema
+         * @param normalPartitions normal case rows
+         * @param generalPartitions general case rows
+         * @param fallbackPartitions fallback case rows
+         * @param partitionGroups information to merge row numbers correctly
+         * @param maxRows limit on rows to emit
+         */
+        ResultSet(const Schema& schema,
+                  const std::vector<Partition*>& normalPartitions,
+                  const std::vector<Partition*>& generalPartitions=std::vector<Partition*>{},
+                  const std::vector<Partition*>& fallbackPartitions=std::vector<Partition*>{},
+                  const std::vector<PartitionGroup>& partitionGroups=std::vector<PartitionGroup>{},
+                  int64_t maxRows=std::numeric_limits<int64_t>::max());
+
+        ResultSet() : _curNormalRowCounter(0), _curNormalByteCounter(0), _curGeneralRowCounter(0), _curGeneralByteCounter(0),
+                      _curFallbackRowCounter(0), _curFallbackByteCounter(0), _totalRowCounter(0), _maxRows(0), _schema(Schema::UNKNOWN),
+                      _normalRowCounter(0), _generalRowCounter(0), _fallbackRowCounter(0) {}
 
         ~ResultSet() = default;
 
         // Non copyable
         ResultSet(const ResultSet&) = delete;
         ResultSet& operator = (const ResultSet&) = delete;
 
-        ResultSet(const Schema& _schema,
-                  const std::vector<Partition*>& partitions,
-                  const std::vector<Partition*>& exceptions=std::vector<Partition*>{},
-                  const std::unordered_map<std::string, ExceptionInfo>& partitionToExceptionsMap=std::unordered_map<std::string, ExceptionInfo>(),
-                  const std::vector<std::tuple<size_t, PyObject*>> pyobjects=std::vector<std::tuple<size_t, PyObject*>>{},
-                  int64_t maxRows=std::numeric_limits<int64_t>::max());
-
         /*!
          * check whether result contains one more row
          */
         bool hasNextRow();
 
         /*!
          * retrieves the next row
         * @return
         */
         Row getNextRow();
 
         /*!
          * retrieves up to limit rows
          * @param limit
          * @return
          */
         std::vector<Row> getRows(size_t limit);
 
-        bool hasNextPartition() const;
+        /*!
+         * check whether general partitions remain
+         * @return
+         */
+        bool hasNextGeneralPartition() const;
+
+        /*!
+         * get next general partition but does not invalidate it
+         * @return
+         */
+        Partition* getNextGeneralPartition();
+
+        /*!
+         * check whether fallback partitions remain
+         * @return
+         */
+        bool hasNextFallbackPartition() const;
+
+        /*!
+         * get next fallback partition but does not invalidate it
+         * @return
+         */
+        Partition* getNextFallbackPartition();
+
+        /*!
+         * check whether normal partitions remain
+         * @return
+         */
+        bool hasNextNormalPartition() const;
 
         /*! user needs to invalidate then!
-         *
          * @return
          */
-        Partition* getNextPartition();
+        Partition* getNextNormalPartition();
+
+        /*!
+         * number of rows across all cases of partitions
+         * @return
+         */
 
         size_t rowCount() const;
 
+        /*!
+         * normal case schema
+         * @return
+         */
         Schema schema() const { return _schema; }
 
         /*!
-         * removes and invalidates all partitions!
+         * removes and invalidates all normal partitions!
          */
         void clear();
 
+        /*!
+         * number of rows in fallback partitions
+         * @return
+         */
+        size_t fallbackRowCount() const;
+
         /*!
          * retrieve all good rows in bulk, removes them from this result set.
          * @return
         */
-        std::vector<Partition*> partitions() {
+        std::vector<Partition*> normalPartitions() {
             std::vector<Partition*> p;
-            while(hasNextPartition())
-                p.push_back(getNextPartition());
+            while(hasNextNormalPartition())
+                p.push_back(getNextNormalPartition());
             return p;
         }
 
         /*!
-         * retrieve all unresolved rows (should be only called internally). DOES NOT REMOVE THEM FROM result set.
+         * returns/removes all general partitions
          * @return
          */
-        std::vector<Partition*> exceptions() const { return _exceptions; }
-
-        std::unordered_map<std::string, ExceptionInfo> partitionToExceptionsMap() const { return _partitionToExceptionsMap; }
+        std::vector<Partition*> generalPartitions() {
+            std::vector<Partition*> p;
+            while(hasNextGeneralPartition())
+                p.push_back(getNextGeneralPartition());
+            return p;
+        }
 
         /*!
-         * returns/removes all objects
+         * returns/removes all fallback partitions
          * @return
         */
-        std::deque<std::tuple<size_t, PyObject*>> pyobjects() {
-            return std::move(_pyobjects);
+        std::vector<Partition*> fallbackPartitions() {
+            std::vector<Partition*> p;
+            while(hasNextFallbackPartition())
+                p.push_back(getNextFallbackPartition());
+            return p;
         }
 
-        size_t pyobject_count() const { return _pyobjects.size(); }
-
-        size_t numPartitions() const { return _partitions.size(); }
+        /*!
+         * returns/removes all partition groups
+         * @return
+         */
+        std::vector<PartitionGroup> partitionGroups() {
+            std::vector<PartitionGroup> g;
+            for (const auto& group : _partitionGroups)
+                g.push_back(group);
+            return g;
+        }
     };
 }
 
 #endif //TUPLEX_RESULTSET_H
\ No newline at end of file
diff --git a/tuplex/core/include/physical/TransformStage.h b/tuplex/core/include/physical/TransformStage.h
index 22d7f5fb4..7488d29e9 100644
--- a/tuplex/core/include/physical/TransformStage.h
+++ b/tuplex/core/include/physical/TransformStage.h
@@ -13,7 +13,7 @@
 #include
 #include
-#include <ExceptionInfo.h>
+#include <PartitionGroup.h>
 #include "PhysicalStage.h"
 #include "LLVMOptimizer.h"
 #include
@@ -95,16 +95,41 @@ namespace tuplex {
         }
 
         /*!
-         * set input exceptions, i.e. rows that could come from a parallelize or csv operator.
-         * @param pythonObjects
-         */
-        void setInputExceptions(const std::vector<Partition*>& inputExceptions) { _inputExceptions = inputExceptions; }
-
-        std::vector<Partition*> inputExceptions() { return _inputExceptions; }
+         * set stage's general case partitions
+         * @param generalPartitions
+         */
+        void setGeneralPartitions(const std::vector<Partition*>& generalPartitions) { _generalPartitions = generalPartitions; }
 
-        void setPartitionToExceptionsMap(const std::unordered_map<std::string, ExceptionInfo>& partitionToExceptionsMap) { _partitionToExceptionsMap = partitionToExceptionsMap; }
+        /*!
+         * get stage's general case partitions
+         * @return
+         */
+        std::vector<Partition*> generalPartitions() const { return _generalPartitions; }
+
+        /*!
+         * set stage's fallback partitions as serialized python objects
+         * @param fallbackPartitions
+         */
+        void setFallbackPartitions(const std::vector<Partition*>& fallbackPartitions) { _fallbackPartitions = fallbackPartitions; }
+
+        /*!
+         * get fallback partitions as serialized python objects
+         * @return
+         */
+        std::vector<Partition*> fallbackPartitions() const { return _fallbackPartitions; }
 
-        std::unordered_map<std::string, ExceptionInfo> partitionToExceptionsMap() { return _partitionToExceptionsMap; }
+        /*!
+         * set merge information for each set of normal, fallback, and general partitions
+         * @param partitionGroups
+         */
+        void setPartitionGroups(const std::vector<PartitionGroup>& partitionGroups) {
+            _partitionGroups = partitionGroups;
+        }
+
+        /*!
+         * get partition groups for all sets of partitions
+         */
+        std::vector<PartitionGroup> partitionGroups() const { return _partitionGroups; }
 
         /*!
          * sets maximum number of rows this pipeline will produce
@@ -157,12 +182,12 @@ namespace tuplex {
          */
         std::shared_ptr<ResultSet> resultSet() const override { return _rs;}
 
-        void setMemoryResult(const std::vector<Partition*>& partitions,
-                             const std::vector<Partition*>& generalCase=std::vector<Partition*>{},
-                             const std::unordered_map<std::string, ExceptionInfo>& parttionToExceptionsMap=std::unordered_map<std::string, ExceptionInfo>(),
-                             const std::vector<std::tuple<size_t, PyObject*>>& interpreterRows=std::vector<std::tuple<size_t, PyObject*>>{},
-                             const std::vector<Partition*>& remainingExceptions=std::vector<Partition*>{},
-                             const std::unordered_map<std::tuple<int64_t, ExceptionCode>, size_t>& ecounts=std::unordered_map<std::tuple<int64_t, ExceptionCode>, size_t>()); // creates local result set?
+        void setMemoryResult(const std::vector<Partition*>& normalPartitions=std::vector<Partition*>{},
+                             const std::vector<Partition*>& generalPartitions=std::vector<Partition*>{},
+                             const std::vector<Partition*>& fallbackPartitions=std::vector<Partition*>{},
+                             const std::vector<PartitionGroup>& partitionGroups=std::vector<PartitionGroup>{},
+                             const std::unordered_map<std::tuple<int64_t, ExceptionCode>, size_t>& exceptionCounts=std::unordered_map<std::tuple<int64_t, ExceptionCode>, size_t>()); // creates local result set?
 
         void setFileResult(const std::unordered_map<std::tuple<int64_t, ExceptionCode>, size_t>& ecounts); // creates empty result set with exceptions
 
         void setEmptyResult() {
@@ -173,9 +198,8 @@ namespace tuplex {
             setMemoryResult(
                     std::vector<Partition*>(),
                     std::vector<Partition*>(),
-                    std::unordered_map<std::string, ExceptionInfo>(),
-                    std::vector<std::tuple<size_t, PyObject*>>(),
                     std::vector<Partition*>(),
+                    std::vector<PartitionGroup>(),
                     ecounts);
         }
@@ -443,6 +467,9 @@ namespace tuplex {
         std::vector<Partition*> _inputPartitions; //! memory input partitions for this task.
         size_t _inputLimit; //! limit number of input rows (inf per default)
         size_t _outputLimit; //! output limit, set e.g. by take, to_csv etc. (inf per default)
+        std::vector<Partition*> _generalPartitions; //! general case input partitions
+        std::vector<Partition*> _fallbackPartitions; //! fallback case input partitions
+        std::vector<PartitionGroup> _partitionGroups; //! groups partitions together for correct row indices
 
         std::shared_ptr<ResultSet> _rs; //! result set
@@ -469,11 +496,6 @@ namespace tuplex {
         // Todo: move this to physicalplan!!!
         //void pushDownOutputLimit(); //! enable optimizations for limited pipeline by restricting input read!
 
-        // unresolved exceptions. Important i.e. when no IO interleave is used...
-        std::vector<Partition*> _inputExceptions;
-        std::unordered_map<std::string, ExceptionInfo> _partitionToExceptionsMap;
-
         // for hash output, the key and bucket type
         python::Type _hashOutputKeyType;
         python::Type _hashOutputBucketType;
diff --git a/tuplex/core/include/physical/TransformTask.h b/tuplex/core/include/physical/TransformTask.h
index 2868ba668..3eb8013dd 100644
--- a/tuplex/core/include/physical/TransformTask.h
+++ b/tuplex/core/include/physical/TransformTask.h
@@ -239,11 +239,14 @@ namespace tuplex {
          */
         std::unordered_map<std::tuple<int64_t, ExceptionCode>, size_t> exceptionCounts() const { return _exceptionCounts; }
 
-        ExceptionInfo inputExceptionInfo() { return _inputExceptionInfo; }
-        std::vector<Partition*> inputExceptions() { return _inputExceptions; }
+        std::vector<Partition*> generalPartitions() const { return _generalPartitions; }
+
+        std::vector<Partition*> fallbackPartitions() const { return _fallbackPartitions; }
+
+        void setGeneralPartitions(const std::vector<Partition*>& generalPartitions) { _generalPartitions = generalPartitions; }
+
+        void setFallbackPartitions(const std::vector<Partition*>& fallbackPartitions) { _fallbackPartitions = fallbackPartitions; }
 
-        void setInputExceptionInfo(ExceptionInfo info) { _inputExceptionInfo = info; }
-        void setInputExceptions(const std::vector<Partition*>& inputExceptions) { _inputExceptions = inputExceptions; }
         void setUpdateInputExceptions(bool updateInputExceptions) { _updateInputExceptions = updateInputExceptions; }
 
         double wallTime() const override { return _wallTime; }
@@ -292,8 +295,8 @@ namespace tuplex {
         MemorySink _exceptions;
         Schema _inputSchema;
 
-        ExceptionInfo _inputExceptionInfo;
-        std::vector<Partition*> _inputExceptions;
+        std::vector<Partition*> _generalPartitions;
+        std::vector<Partition*> _fallbackPartitions;
         bool _updateInputExceptions;
 
         // hash table sink
diff --git a/tuplex/core/src/Context.cc b/tuplex/core/src/Context.cc
index 2d46608be..f9c3a655d 100644
--- a/tuplex/core/src/Context.cc
+++ b/tuplex/core/src/Context.cc
@@ -202,7 +202,7 @@ namespace tuplex {
     }
 
-    DataSet& Context::fromPartitions(const Schema& schema, const std::vector<Partition*>& partitions, const std::vector<std::string>& columns, const std::vector<std::tuple<size_t, PyObject*>> &badParallelizeObjects, const std::vector<size_t> &numExceptionsInPartition) {
+    DataSet& Context::fromPartitions(const Schema& schema, const std::vector<Partition*>& partitions, const std::vector<Partition*>& fallbackPartitions, const std::vector<PartitionGroup>& partitionGroups, const std::vector<std::string>& columns) {
         auto dataSetID = getNextDataSetID();
         DataSet *dsptr = createDataSet(schema);
@@ -214,7 +214,7 @@ namespace tuplex {
         // empty?
         if(partitions.empty()) {
             dsptr->setColumns(columns);
-            addParallelizeNode(dsptr, badParallelizeObjects, numExceptionsInPartition);
+            addParallelizeNode(dsptr, fallbackPartitions, partitionGroups);
             return *dsptr;
         } else {
             size_t numRows = 0;
@@ -230,7 +230,8 @@ namespace tuplex {
 
             // set rows
             dsptr->setColumns(columns);
-            addParallelizeNode(dsptr, badParallelizeObjects, numExceptionsInPartition);
+            addParallelizeNode(dsptr, fallbackPartitions, partitionGroups);
+
             // signal check
             if(check_and_forward_signals()) {
@@ -257,6 +258,7 @@ namespace tuplex {
             addParallelizeNode(dsptr);
             return *dsptr;
         } else {
+            std::vector<PartitionGroup> partitionGroups;
             // get row type from first element @TODO: should be inferred from sample, no?
             auto rtype = rows.front().getRowType();
             schema = Schema(Schema::MemoryLayout::ROW, rtype);
@@ -303,6 +305,7 @@ namespace tuplex {
                     numWrittenRowsInPartition++;
                     capacityRemaining -= bytesWritten;
                 } else {
+                    partitionGroups.push_back(PartitionGroup(1, dsptr->getPartitions().size(), 0, 0, 0, 0));
                     // partition is full, request new one.
                     // create new partition...
                     partition->unlock();
                     partition->setNumRows(numWrittenRowsInPartition);
@@ -319,6 +322,7 @@ namespace tuplex {
                     base_ptr = (uint8_t*)partition->lock();
                 }
             }
+            partitionGroups.push_back(PartitionGroup(1, dsptr->getPartitions().size(), 0, 0, 0, 0));
             partition->unlock();
             partition->setNumRows(numWrittenRowsInPartition);
@@ -330,7 +334,7 @@ namespace tuplex {
 
             // set rows
             dsptr->setColumns(columnNames);
-            addParallelizeNode(dsptr);
+            addParallelizeNode(dsptr, std::vector<Partition*>{}, partitionGroups);
 
             // signal check
             if(check_and_forward_signals()) {
@@ -349,94 +353,7 @@ namespace tuplex {
         return op;
     }
 
-    void Context::serializePythonObjects(const std::vector<std::tuple<size_t, PyObject*>>& pythonObjects,
-                                         const std::vector<size_t> &numExceptionsInPartition,
-                                         const std::vector<Partition*> &normalPartitions,
-                                         const int64_t opID,
-                                         std::vector<Partition*> &serializedPythonObjects,
-                                         std::unordered_map<std::string, ExceptionInfo> &pythonObjectsMap) {
-        if (pythonObjects.empty()) {
-            for (const auto &p : normalPartitions) {
-                pythonObjectsMap[uuidToString(p->uuid())] = ExceptionInfo();
-            }
-            return;
-        }
-
-        Schema schema(Schema::MemoryLayout::ROW, python::Type::makeTupleType({python::Type::STRING}));
-        const size_t allocMinSize = 1024 * 64; // 64KB
-
-        Partition* partition = requestNewPartition(schema, -1, allocMinSize);
-        int64_t* rawPtr = (int64_t*)partition->lockWriteRaw();
-        *rawPtr = 0;
-        uint8_t* ptr = (uint8_t*)(rawPtr + 1);
-        size_t numBytesSerialized = 0;
-
-        auto prevExpByteOffset = 0;
-        auto prevExpRowOffset = 0;
-        auto prevExpInd = 0;
-        auto curNormalPartitionInd = 0;
-        auto numNewExps = 0;
-
-        // Serialize each exception to a partition using the following schema:
-        // (1) is the field containing rowNum
-        // (2) is the field containing ecCode
-        // (3) is the field containing opID
-        // (4) is the field containing pickledObjectSize
-        // (5) is the field containing pickledObject
-        for(auto &exception : pythonObjects) {
-            auto rowNum = std::get<0>(exception);
-            auto pyObj = std::get<1>(exception);
-            auto ecCode = ecToI64(ExceptionCode::PYTHON_PARALLELIZE);
-            auto pickledObject = python::pickleObject(python::getMainModule(), pyObj);
-            auto pickledObjectSize = pickledObject.size();
-            size_t requiredBytes = sizeof(int64_t) * 4 + pickledObjectSize;
-
-            if (partition->capacity() < numBytesSerialized + requiredBytes) {
-                partition->unlockWrite();
-                serializedPythonObjects.push_back(partition);
-                partition = requestNewPartition(schema, -1, allocMinSize);
-                rawPtr = (int64_t *) partition->lockWriteRaw();
-                *rawPtr = 0;
-                ptr = (uint8_t * )(rawPtr + 1);
-                numBytesSerialized = 0;
-            }
-
-            // Check if we have reached the number of exceptions in the input partition
-            // Record the current exception index and offset and iterate to next one
-            auto curNormalPartition = normalPartitions[curNormalPartitionInd];
-            auto normalUUID = uuidToString(curNormalPartition->uuid());
-            auto numExps = numExceptionsInPartition[curNormalPartitionInd];
-            if (numNewExps >= numExps) {
-                pythonObjectsMap[normalUUID] = ExceptionInfo(numExps, prevExpInd, prevExpRowOffset, prevExpByteOffset);
-                prevExpRowOffset = *rawPtr;
-                prevExpByteOffset = numBytesSerialized;
-                prevExpInd = serializedPythonObjects.size();
-                numNewExps = 0;
-                curNormalPartitionInd++;
-            }
-
-            *((int64_t*)(ptr)) = rowNum; ptr += sizeof(int64_t);
-            *((int64_t*)(ptr)) = ecCode; ptr += sizeof(int64_t);
-            *((int64_t*)(ptr)) = opID; ptr += sizeof(int64_t);
-            *((int64_t*)(ptr)) = pickledObjectSize; ptr += sizeof(int64_t);
-            memcpy(ptr, pickledObject.c_str(), pickledObjectSize); ptr += pickledObjectSize;
-
-            *rawPtr = *rawPtr + 1;
-            numBytesSerialized += requiredBytes;
-            numNewExps += 1;
-        }
-
-        // Record mapping for normal last partition
-        auto curNormalPartition = normalPartitions[curNormalPartitionInd];
-        auto normalUUID = uuidToString(curNormalPartition->uuid());
-        auto numExceptions = numExceptionsInPartition[curNormalPartitionInd];
-        pythonObjectsMap[normalUUID] = ExceptionInfo(numExceptions, prevExpInd, prevExpRowOffset, prevExpByteOffset);
-
-        partition->unlockWrite();
-        serializedPythonObjects.push_back(partition);
-    }
-
-    void Context::addParallelizeNode(DataSet *ds, const std::vector<std::tuple<size_t, PyObject*>> &badParallelizeObjects, const std::vector<size_t> &numExceptionsInPartition) {
+    void Context::addParallelizeNode(DataSet *ds, const std::vector<Partition*>& fallbackPartitions, const std::vector<PartitionGroup>& partitionGroups) {
         assert(ds);
 
         // @TODO: make empty list as special case work. Also true for empty files.
@@ -446,11 +363,17 @@ namespace tuplex {
         assert(ds->_schema.getRowType() != python::Type::UNKNOWN);
 
         auto op = new ParallelizeOperator(ds->_schema, ds->getPartitions(), ds->columns());
-        std::vector<Partition*> serializedPythonObjects;
-        std::unordered_map<std::string, ExceptionInfo> pythonObjectsMap;
-        serializePythonObjects(badParallelizeObjects, numExceptionsInPartition, ds->getPartitions(), op->getID(), serializedPythonObjects, pythonObjectsMap);
-        op->setPythonObjects(serializedPythonObjects);
-        op->setInputPartitionToPythonObjectsMap(pythonObjectsMap);
+        op->setFallbackPartitions(fallbackPartitions);
+        if (partitionGroups.empty()) {
+            std::vector<PartitionGroup> defaultPartitionGroups;
+            for (int i = 0; i < ds->getPartitions().size(); ++i) {
+                defaultPartitionGroups.push_back(PartitionGroup(1, i, 0, 0, 0, 0));
+            }
+            op->setPartitionGroups(defaultPartitionGroups);
+        } else {
+            op->setPartitionGroups(partitionGroups);
+        }
 
         // add new (root) node
         ds->_operator = addOperator(op);
diff --git a/tuplex/core/src/DataSet.cc b/tuplex/core/src/DataSet.cc
index a53a14094..a33925d7f 100644
--- a/tuplex/core/src/DataSet.cc
+++ b/tuplex/core/src/DataSet.cc
@@ -869,9 +869,10 @@ namespace tuplex {
             // what data source operators are there?
             if(_operator->type() == LogicalOperatorType::FILEINPUT)
                 return static_cast<FileInputOperator*>(_operator)->isEmpty();
-            else if(_operator->type() == LogicalOperatorType::PARALLELIZE)
-                return static_cast<ParallelizeOperator*>(_operator)->getPartitions().empty();
-            else
+            else if(_operator->type() == LogicalOperatorType::PARALLELIZE) {
+                auto pop = static_cast<ParallelizeOperator*>(_operator); assert(pop);
+                return pop->getNormalPartitions().empty() && pop->getFallbackPartitions().empty();
+            } else
                 throw std::runtime_error("unknown data source operator detected");
         } else
             return false;
diff --git a/tuplex/core/src/Partition.cc b/tuplex/core/src/Partition.cc
index a16d1c2eb..c554788cd 100644
--- a/tuplex/core/src/Partition.cc
+++ b/tuplex/core/src/Partition.cc
@@ -55,7 +55,7 @@ namespace tuplex {
     uint8_t* Partition::lockWriteRaw() {
         // must be the thread who allocated this
-        assert(_owner->getThreadID() == std::this_thread::get_id());
+//        assert(_owner->getThreadID() == std::this_thread::get_id());
 
         TRACE_LOCK("partition " + uuidToString(_uuid));
         std::this_thread::yield();
diff --git a/tuplex/core/src/ee/local/LocalBackend.cc b/tuplex/core/src/ee/local/LocalBackend.cc
index af8e4b8bb..e23a4f937 100644
--- a/tuplex/core/src/ee/local/LocalBackend.cc
+++ b/tuplex/core/src/ee/local/LocalBackend.cc
@@ -266,8 +266,8 @@ namespace tuplex {
         Timer timer;
         // BUILD phase
         // TODO: codegen build phase. I.e. a function should be code generated which hashes a partition to a hashmap.
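        // --- Editorial sketch (not part of the patch) of the codegen idea in the
        // TODO above. The function name and signature below are assumptions, and a
        // std::unordered_map stands in for Tuplex's hashmap: a generated build
        // function would fuse per-row key extraction and insertion into a single
        // pass over a partition buffer,
        //
        //     int64_t buildHashTable(std::unordered_map<std::string, Row>& hmap,
        //                            const uint8_t* buf, int64_t numRows) {
        //         const uint8_t* ptr = buf;
        //         for(int64_t i = 0; i < numRows; ++i) {
        //             // deserialize key and row at ptr, insert into hmap,
        //             // then advance ptr by the row's serialized length
        //         }
        //         return 0;
        //     }
        //
        // instead of the interpreted per-partition loop below.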
- while(rsRight->hasNextPartition()) { - Partition* p = rsRight->getNextPartition(); + while(rsRight->hasNextNormalPartition()) { + Partition* p = rsRight->getNextNormalPartition(); // lock partition! auto ptr = p->lockRaw(); @@ -435,7 +435,7 @@ namespace tuplex { auto combinedType = hstage->combinedType(); Schema combinedSchema(Schema::MemoryLayout::ROW, combinedType); std::vector probeTasks; - for(auto partition : rsLeft->partitions()) { + for(auto partition : rsLeft->normalPartitions()) { probeTasks.emplace_back(new HashProbeTask(partition, hmap, probeFunction, hstage->combinedType(), hstage->outputDataSetID(), @@ -648,12 +648,35 @@ namespace tuplex { // --> issue for each memory partition a transform task and put it into local workqueue assert(tstage->inputMode() == EndPointMode::MEMORY); - - // restrict after input limit size_t numInputRows = 0; + auto inputPartitions = tstage->inputPartitions(); - for(int i = 0; i < inputPartitions.size(); ++i) { - auto partition = inputPartitions[i]; + auto generalPartitions = tstage->generalPartitions(); + auto fallbackPartitions = tstage->fallbackPartitions(); + auto partitionGroups = tstage->partitionGroups(); + for (const auto &group : partitionGroups) { + std::vector taskNormalPartitions; + bool invalidateAfterUse = false; + for (int i = group.normalPartitionStartInd; i < group.normalPartitionStartInd + group.numNormalPartitions; ++i) { + auto p = inputPartitions[i]; + numInputRows += p->getNumRows(); + if (!p->isImmortal()) + invalidateAfterUse = true; + taskNormalPartitions.push_back(p); + } + std::vector taskGeneralPartitions; + for (int i = group.generalPartitionStartInd; i < group.generalPartitionStartInd + group.numGeneralPartitions; ++i) { + auto p = generalPartitions[i]; + numInputRows += p->getNumRows(); + taskGeneralPartitions.push_back(p); + } + std::vector taskFallbackPartitions; + for (int i = group.fallbackPartitionStartInd; i < group.fallbackPartitionStartInd + group.numFallbackPartitions; ++i) { + auto p = fallbackPartitions[i]; + numInputRows += p->getNumRows(); + taskFallbackPartitions.push_back(p); + } + auto task = new TransformTask(); if (tstage->updateInputExceptions()) { task->setFunctor(syms->functorWithExp); @@ -661,7 +684,9 @@ namespace tuplex { task->setFunctor(syms->functor); } task->setUpdateInputExceptions(tstage->updateInputExceptions()); - task->setInputMemorySource(partition, !partition->isImmortal()); + task->setInputMemorySources(taskNormalPartitions, invalidateAfterUse); + task->setGeneralPartitions(taskGeneralPartitions); + task->setFallbackPartitions(taskFallbackPartitions); // hash table or memory output? if(tstage->outputMode() == EndPointMode::HASHTABLE) { if (tstage->hashtableKeyByteWidth() == 8) @@ -676,16 +701,10 @@ namespace tuplex { tstage->outputMode() == EndPointMode::MEMORY); task->sinkOutputToMemory(outputSchema, tstage->outputDataSetID(), tstage->context().id()); } - - auto partitionId = uuidToString(partition->uuid()); - auto info = tstage->partitionToExceptionsMap()[partitionId]; - task->setInputExceptionInfo(info); - task->setInputExceptions(tstage->inputExceptions()); - task->sinkExceptionsToMemory(inputSchema); + task->sinkExceptionsToMemory(tstage->inputSchema()); task->setStageID(tstage->getID()); task->setOutputLimit(tstage->outputLimit()); tasks.emplace_back(std::move(task)); - numInputRows += partition->getNumRows(); // input limit exhausted? break! 
if(numInputRows >= tstage->inputLimit()) @@ -750,92 +769,6 @@ namespace tuplex { return pip_object; } - std::vector> inputExceptionsToPythonObjects(const std::vector& partitions, Schema schema) { - using namespace tuplex; - - std::vector> pyObjects; - for (const auto &partition : partitions) { - auto numRows = partition->getNumRows(); - const uint8_t* ptr = partition->lock(); - - python::lockGIL(); - for (int i = 0; i < numRows; ++i) { - int64_t rowNum = *((int64_t*)ptr); - ptr += sizeof(int64_t); - int64_t ecCode = *((int64_t*)ptr); - ptr += 2 * sizeof(int64_t); - int64_t objSize = *((int64_t*)ptr); - ptr += sizeof(int64_t); - - PyObject* pyObj = nullptr; - if (ecCode == ecToI64(ExceptionCode::PYTHON_PARALLELIZE)) { - pyObj = python::deserializePickledObject(python::getMainModule(), (char *) ptr, objSize); - } else { - pyObj = python::rowToPython(Row::fromMemory(schema, ptr, objSize), true); - } - - ptr += objSize; - pyObjects.emplace_back(rowNum, pyObj); - } - python::unlockGIL(); - - partition->unlock(); - partition->invalidate(); - } - - return pyObjects; - } - - void setExceptionInfo(const std::vector &normalOutput, const std::vector &exceptions, std::unordered_map &partitionToExceptionsMap) { - if (exceptions.empty()) { - for (const auto &p : normalOutput) { - partitionToExceptionsMap[uuidToString(p->uuid())] = ExceptionInfo(); - } - return; - } - - auto expRowCount = 0; - auto expInd = 0; - auto expRowOff = 0; - auto expByteOff = 0; - - auto expNumRows = exceptions[0]->getNumRows(); - auto expPtr = exceptions[0]->lockWrite(); - auto rowsProcessed = 0; - for (const auto &p : normalOutput) { - auto pNumRows = p->getNumRows(); - auto curNumExps = 0; - auto curExpOff = expRowOff; - auto curExpInd = expInd; - auto curExpByteOff = expByteOff; - - while (*((int64_t *) expPtr) - rowsProcessed <= pNumRows + curNumExps && expRowCount < expNumRows) { - *((int64_t *) expPtr) -= rowsProcessed; - curNumExps++; - expRowOff++; - auto eSize = ((int64_t *)expPtr)[3] + 4*sizeof(int64_t); - expPtr += eSize; - expByteOff += eSize; - expRowCount++; - - if (expRowOff == expNumRows && expInd < exceptions.size() - 1) { - exceptions[expInd]->unlockWrite(); - expInd++; - expPtr = exceptions[expInd]->lockWrite(); - expNumRows = exceptions[expInd]->getNumRows(); - expRowOff = 0; - expByteOff = 0; - expRowCount = 0; - } - } - - rowsProcessed += curNumExps + pNumRows; - partitionToExceptionsMap[uuidToString(p->uuid())] = ExceptionInfo(curNumExps, curExpInd, curExpOff, curExpByteOff); - } - - exceptions[expInd]->unlockWrite(); - } - void LocalBackend::executeTransformStage(tuplex::TransformStage *tstage) { Timer stageTimer; @@ -855,9 +788,8 @@ namespace tuplex { // special case: skip stage, i.e. empty code and mem2mem if(tstage->code().empty() && !tstage->fileInputMode() && !tstage->fileOutputMode()) { - auto pyObjects = inputExceptionsToPythonObjects(tstage->inputExceptions(), tstage->normalCaseInputSchema()); - tstage->setMemoryResult(tstage->inputPartitions(), std::vector{}, std::unordered_map(), pyObjects); - pyObjects.clear(); + tstage->setMemoryResult(tstage->inputPartitions(), tstage->generalPartitions(), tstage->fallbackPartitions(), + tstage->partitionGroups()); // skip stage Logger::instance().defaultLogger().info("[Transform Stage] skipped stage " + std::to_string(tstage->number()) + " because there is nothing todo here."); return; @@ -985,7 +917,7 @@ namespace tuplex { bool executeSlowPath = true; //TODO: implement pure python resolution here... // exceptions found or slowpath data given? 
         // exceptions found or slowpath data given?
-        if(totalECountsBeforeResolution > 0 || !tstage->inputExceptions().empty()) {
+        if(totalECountsBeforeResolution > 0 || !tstage->generalPartitions().empty() || !tstage->fallbackPartitions().empty()) {
             stringstream ss;
             // log out what exists in a table
             ss<<"Exception details: "<<endl;
-            if(!tstage->inputExceptions().empty()) {
+            if(!tstage->generalPartitions().empty()) {
                 size_t numExceptions = 0;
-                for (auto &p : tstage->inputExceptions())
+                for (auto &p : tstage->generalPartitions())
                     numExceptions += p->getNumRows();
-                lines.push_back(Row("(input)", exceptionCodeToPythonClass(ExceptionCode::NORMALCASEVIOLATION), (int64_t)numExceptions));
+                lines.push_back(Row("(cache)", exceptionCodeToPythonClass(ExceptionCode::NORMALCASEVIOLATION), (int64_t)numExceptions));
+                totalECountsBeforeResolution += numExceptions;
+            }
+
+            if(!tstage->fallbackPartitions().empty()) {
+                size_t numExceptions = 0;
+                for (auto &p : tstage->fallbackPartitions())
+                    numExceptions += p->getNumRows();
+                lines.push_back(Row("(parallelize)", exceptionCodeToPythonClass(ExceptionCode::NORMALCASEVIOLATION), (int64_t)numExceptions));
                 totalECountsBeforeResolution += numExceptions;
             }
@@ -1044,7 +984,7 @@ namespace tuplex {
                 executeSlowPath = true;

             // input exceptions or py objects?
-            if(!tstage->inputExceptions().empty())
+            if(!tstage->generalPartitions().empty() || !tstage->fallbackPartitions().empty())
                 executeSlowPath = true;

             if(executeSlowPath) {
@@ -1123,30 +1063,22 @@ namespace tuplex {
             }
             case EndPointMode::MEMORY: {
                 // memory output, fetch partitions & ecounts
-                vector<Partition*> output;
-                vector<Partition*> generalOutput;
-                unordered_map<string, ExceptionInfo> partitionToExceptionsMap;
-                vector<Partition*> remainingExceptions;
-                vector<tuple<size_t, PyObject*>> nonConformingRows; // rows where the output type does not fit,
-                                                                    // need to manually merged.
-                unordered_map<tuple<int64_t, ExceptionCode>, size_t> ecounts;
-                size_t rowDelta = 0;
+                vector<Partition*> normalPartitions;
+                vector<Partition*> generalPartitions;
+                vector<Partition*> fallbackPartitions;
+                vector<Partition*> exceptionPartitions;
+                vector<PartitionGroup> partitionGroups;
+                unordered_map<tuple<int64_t, ExceptionCode>, size_t> exceptionCounts;
+
                 for (const auto& task : completedTasks) {
-                    auto taskOutput = getOutputPartitions(task);
-                    auto taskRemainingExceptions = getRemainingExceptions(task);
-                    auto taskGeneralOutput = generalCasePartitions(task);
-                    auto taskNonConformingRows = getNonConformingRows(task);
+                    auto taskNormalPartitions = getNormalPartitions(task);
+                    auto taskGeneralPartitions = getGeneralPartitions(task);
+                    auto taskFallbackPartitions = getFallbackPartitions(task);
+                    auto taskExceptionPartitions = getExceptionPartitions(task);
                     auto taskExceptionCounts = getExceptionCounts(task);

                     // update exception counts
-                    ecounts = merge_ecounts(ecounts, taskExceptionCounts);
-
-                    // update nonConforming with delta
-                    for(int i = 0; i < taskNonConformingRows.size(); ++i) {
-                        auto t = taskNonConformingRows[i];
-                        t = std::make_tuple(std::get<0>(t) + rowDelta, std::get<1>(t));
-                        taskNonConformingRows[i] = t;
-                    }
+                    exceptionCounts = merge_ecounts(exceptionCounts, taskExceptionCounts);

                     // debug trace issues
                     using namespace std;
@@ -1156,21 +1088,17 @@ namespace tuplex {
                     if(task->type() == TaskType::RESOLVE)
                         task_name = "resolve";

-                    setExceptionInfo(taskOutput, taskGeneralOutput, partitionToExceptionsMap);
-                    std::copy(taskOutput.begin(), taskOutput.end(), std::back_inserter(output));
-                    std::copy(taskRemainingExceptions.begin(), taskRemainingExceptions.end(), std::back_inserter(remainingExceptions));
-                    std::copy(taskGeneralOutput.begin(), taskGeneralOutput.end(), std::back_inserter(generalOutput));
-                    std::copy(taskNonConformingRows.begin(), taskNonConformingRows.end(),
std::back_inserter(nonConformingRows)); - - // compute the delta used to offset records! - for (const auto &p : taskOutput) - rowDelta += p->getNumRows(); - for (const auto &p : taskGeneralOutput) - rowDelta += p->getNumRows(); - rowDelta += taskNonConformingRows.size(); + partitionGroups.push_back(PartitionGroup( + taskNormalPartitions.size(), normalPartitions.size(), + taskGeneralPartitions.size(), generalPartitions.size(), + taskFallbackPartitions.size(), fallbackPartitions.size())); + std::copy(taskNormalPartitions.begin(), taskNormalPartitions.end(), std::back_inserter(normalPartitions)); + std::copy(taskGeneralPartitions.begin(), taskGeneralPartitions.end(), std::back_inserter(generalPartitions)); + std::copy(taskFallbackPartitions.begin(), taskFallbackPartitions.end(), std::back_inserter(fallbackPartitions)); + std::copy(taskExceptionPartitions.begin(), taskExceptionPartitions.end(), std::back_inserter(exceptionPartitions)); } - tstage->setMemoryResult(output, generalOutput, partitionToExceptionsMap, nonConformingRows, remainingExceptions, ecounts); + tstage->setMemoryResult(normalPartitions, generalPartitions, fallbackPartitions, partitionGroups, exceptionCounts); break; } case EndPointMode::HASHTABLE: { @@ -1262,6 +1190,140 @@ namespace tuplex { Logger::instance().defaultLogger().info(ss.str()); } +// void LocalBackend::setPartitionMergeInfo(const std::vector& normalPartitions, +// const std::vector& generalPartitions, const size_t generalStartInd, +// const std::vector& fallbackPartitions, const size_t fallbackStartInd, +// std::vector& partitionMergeInfo) { +// +// +// +// +// auto generalInd = 0; +// auto generalRowOff = 0; +// auto generalByteOff = 0; +// auto generalRowsInPartition = 0; +// const uint8_t *generalPtr = nullptr; +// if (!generalPartitions.empty()) { +// generalRowsInPartition = generalPartitions[0]->getNumRows(); +// generalPtr = generalPartitions[0]->lock(); +// } +// +// auto fallbackInd = 0; +// auto fallbackRowOff = 0; +// auto fallbackByteOff = 0; +// auto fallbackRowsInPartition = 0; +// const uint8_t *fallbackPtr = nullptr; +// if (!fallbackPartitions.empty()) { +// fallbackRowsInPartition = fallbackPartitions[0]->getNumRows(); +// fallbackPtr = fallbackPartitions[0]->lock(); +// } +// +// auto exceptionInd = 0; +// auto exceptionRowOff = 0; +// auto exceptionByteOff = 0; +// auto exceptionRowsInPartition = 0; +// const uint8_t *exceptionPtr = nullptr; +// if (!exceptionPartitions.empty()) { +// exceptionRowsInPartition = exceptionPartitions[0]->getNumRows(); +// exceptionPtr = exceptionPartitions[0]->lock(); +// } +// +// auto totalRowCounter = 0; +// auto rowDelta = 0; +// for (const auto &p : normalPartitions) { +// auto mergeInfo = MergeInfo(); +// mergeInfo.setRowDelta(rowDelta); +// auto numNormalRows = p->getNumRows(); +// +// auto generalRowCounter = 0; +// auto curGeneralStartInd = generalInd + generalStartInd; +// auto curGeneralRowOff = generalRowOff; +// auto curGeneralByteOff = generalByteOff; +// while (generalPtr && *((int64_t*)generalPtr) <= totalRowCounter + numNormalRows) { +// generalRowCounter++; +// totalRowCounter++; +// +// auto dataSize = ((int64_t*)generalPtr)[3] + 4*sizeof(int64_t); +// generalByteOff += dataSize; +// generalPtr += dataSize; +// generalRowOff++; +// +// if (generalRowOff == generalRowsInPartition) { +// generalPartitions[generalInd]->unlock(); +// generalInd++; +// if (generalInd < generalPartitions.size()) { +// generalPtr = generalPartitions[generalInd]->lock(); +// generalRowsInPartition = 
generalPartitions[generalInd]->getNumRows(); +// generalRowOff = 0; +// generalByteOff = 0; +// } else { +// generalPtr = nullptr; +// } +// } +// } +// mergeInfo.setGeneralInfo(generalRowCounter, curGeneralStartInd, curGeneralRowOff, curGeneralByteOff); +// +// auto fallbackRowCounter = 0; +// auto curFallbackStartInd = fallbackInd + fallbackStartInd; +// auto curFallbackRowOff = fallbackRowOff; +// auto curFallbackByteOff = fallbackByteOff; +// while (fallbackPtr && *((int64_t*)fallbackPtr) <= totalRowCounter + numNormalRows + generalRowCounter) { +// fallbackRowCounter++; +// totalRowCounter++; +// +// auto dataSize = ((int64_t*)fallbackPtr)[1] + 2*sizeof(int64_t); +// fallbackByteOff += dataSize; +// fallbackPtr += dataSize; +// fallbackRowOff++; +// +// if (fallbackRowOff == fallbackRowsInPartition) { +// fallbackPartitions[fallbackInd]->unlock(); +// fallbackInd++; +// if (fallbackInd < fallbackPartitions.size()) { +// fallbackPtr = fallbackPartitions[fallbackInd]->lock(); +// fallbackRowsInPartition = fallbackPartitions[fallbackInd]->getNumRows(); +// fallbackRowOff = 0; +// fallbackByteOff = 0; +// } else { +// fallbackPtr = nullptr; +// } +// } +// } +// mergeInfo.setFallbackInfo(fallbackRowCounter, curFallbackStartInd, curFallbackRowOff, curFallbackByteOff); +// +// auto exceptionRowCounter = 0; +// auto curExceptionStartInd = exceptionInd + exceptionStartInd; +// auto curExceptionRowOff = exceptionRowOff; +// auto curExceptionByteOff = exceptionByteOff; +// while (exceptionPtr && *((int64_t*)exceptionPtr) <= totalRowCounter + numNormalRows + generalRowCounter + fallbackRowCounter) { +// exceptionRowCounter++; +// totalRowCounter++; +// +// auto dataSize = ((int64_t*)exceptionPtr)[3] + 4*sizeof(int64_t); +// exceptionByteOff += dataSize; +// exceptionPtr += dataSize; +// exceptionRowOff++; +// +// if (exceptionRowOff == exceptionRowsInPartition) { +// exceptionPartitions[exceptionInd]->unlock(); +// exceptionInd++; +// if (exceptionInd < exceptionPartitions.size()) { +// exceptionPtr = exceptionPartitions[exceptionInd]->lock(); +// exceptionRowsInPartition = exceptionPartitions[exceptionInd]->getNumRows(); +// exceptionRowOff = 0; +// exceptionByteOff = 0; +// } else { +// exceptionPtr = nullptr; +// } +// } +// } +// mergeInfo.setExceptionInfo(exceptionRowCounter, curExceptionStartInd, curExceptionRowOff, curExceptionByteOff); +// +// rowDelta += numNormalRows + generalRowCounter + fallbackRowCounter + exceptionRowCounter; +// partitionMergeInfo.push_back(mergeInfo); +// } +// } + std::vector LocalBackend::resolveViaSlowPath( std::vector &tasks, bool merge_rows_in_order, @@ -1349,7 +1411,6 @@ namespace tuplex { std::vector tasks_result; std::vector resolveTasks; - std::vector exceptionPartitions; std::vector maxOrder; auto opsToCheck = tstage->operatorIDsWithResolvers(); @@ -1392,7 +1453,7 @@ namespace tuplex { else if(compareOrders(maxOrder, tt->getOrder())) maxOrder = tt->getOrder(); - if (tt->exceptionCounts().size() > 0 || tt->inputExceptionInfo().numExceptions > 0) { + if (tt->exceptionCounts().size() > 0 || !tt->generalPartitions().empty() || !tt->fallbackPartitions().empty()) { // task found with exceptions in it => exception partitions need to be resolved using special functor // hash-table output not yet supported @@ -1401,22 +1462,15 @@ namespace tuplex { assert(tt->getStageID() == stageID); - // On first execution, taks hold their own runtime exceptions so they can iterate over all partitions fully - auto runtimeExceptionInfo = ExceptionInfo(tt->getNumExceptions(), 0, 
0, 0);
-                    // save exception partitions to invalidate after all tasks have been completed
-                    auto taskExceptionPartitions = tt->getExceptionPartitions();
-                    exceptionPartitions.insert(exceptionPartitions.end(), taskExceptionPartitions.begin(), taskExceptionPartitions.end());
-
                     // this task needs to be resolved, b.c. exceptions occurred...
                     // pretty simple, just create a ResolveTask
                     auto exceptionInputSchema = tt->inputSchema(); // this could be specialized!

                     auto rtask = new ResolveTask(stageID,
                                                  tstage->context().id(),
                                                  tt->getOutputPartitions(),
-                                                 taskExceptionPartitions,
-                                                 runtimeExceptionInfo,
-                                                 tt->inputExceptions(),
-                                                 tt->inputExceptionInfo(),
+                                                 tt->getExceptionPartitions(),
+                                                 tt->generalPartitions(),
+                                                 tt->fallbackPartitions(),
                                                  opsToCheck,
                                                  exceptionInputSchema,
                                                  compiledSlowPathOutputSchema,
@@ -1506,14 +1560,6 @@ namespace tuplex {
             // cout<<"*** git "<<endl;
-            for (auto& p : tstage->inputExceptions()) {
-                p->invalidate();
-            }
-            for (auto& p : exceptionPartitions) {
-                p->invalidate();
-            }
-
             // cout<<"*** total number of tasks to return is "<<tasks_result.size()<<endl;

-            while(rs->hasNextPartition()) {
-                Partition* p = rs->getNextPartition();
+            while(rs->hasNextNormalPartition()) {
+                Partition* p = rs->getNextNormalPartition();

                 // lock partition!
                 auto ptr = p->lockRaw();
diff --git a/tuplex/core/src/logical/CacheOperator.cc b/tuplex/core/src/logical/CacheOperator.cc
index f71522f21..4a571599c 100644
--- a/tuplex/core/src/logical/CacheOperator.cc
+++ b/tuplex/core/src/logical/CacheOperator.cc
@@ -22,19 +22,16 @@ namespace tuplex {
         LogicalOperator::copyMembers(other);
         auto cop = (CacheOperator*)other;
         setSchema(other->getOutputSchema());
-        _normalCasePartitions = cop->cachedPartitions();
-        _generalCasePartitions = cop->cachedExceptions();
-        _partitionToExceptionsMap = cop->partitionToExceptionsMap();
-        // copy python objects and incref for each!
-        _py_objects = cop->_py_objects;
-        python::lockGIL();
-        for(auto obj : _py_objects)
-            Py_XINCREF(obj);
-        python::unlockGIL();
+        _normalPartitions = cop->cachedNormalPartitions();
+        _generalPartitions = cop->cachedGeneralPartitions();
+        _fallbackPartitions = cop->cachedFallbackPartitions();
+        _partitionGroups = cop->partitionGroups();
+
         _optimizedSchema = cop->_optimizedSchema;
         _cached = cop->_cached;
-        _normalCaseRowCount = cop->_normalCaseRowCount;
-        _generalCaseRowCount = cop->_generalCaseRowCount;
+        _normalRowCount = cop->_normalRowCount;
+        _generalRowCount = cop->_generalRowCount;
+        _fallbackRowCount = cop->_fallbackRowCount;
         _columns = cop->_columns;
         _sample = cop->_sample;
         _storeSpecialized = cop->_storeSpecialized;
@@ -60,7 +57,7 @@ namespace tuplex {
         // is operator cached? => return combined cost!
         // @NOTE: could make exceptions more expensive than normal rows
         if(isCached()) {
-            return _generalCaseRowCount + _normalCaseRowCount;
+            return _generalRowCount + _fallbackRowCount + _normalRowCount;
         } else {
             // return parent cost
             return parent()->cost();
@@ -73,30 +70,29 @@ namespace tuplex {
         _cached = true;

         // fetch both partitions (consume) from resultset + any unresolved exceptions
-        _normalCasePartitions = rs->partitions();
-        for(auto p : _normalCasePartitions)
+        _normalPartitions = rs->normalPartitions();
+        for(auto p : _normalPartitions)
             p->makeImmortal();

-        // @TODO: there are two sorts of exceptions here...
-        // i.e. separate normal-case violations out from the rest
-        // => these can be stored separately for faster processing!
-        // @TODO: right now, everything just gets cached...
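// --- Editorial sketch ----------------------------------------------------
// The cache now keeps the three row classes used throughout this patch side
// by side: normal rows (specialized schema), general rows (general-case
// schema, NORMALCASEVIOLATION) and fallback rows (pickled Python objects,
// PYTHON_PARALLELIZE). The statistics code below repeats the same tally per
// class; a small helper capturing it (hypothetical name, Partition as used
// throughout this diff):
size_t countRows(const std::vector<Partition*>& parts) {
    size_t rows = 0;
    for (auto p : parts)
        rows += p->getNumRows();
    return rows;
}
// ---------------------------------------------------------------------------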
+ _generalPartitions = rs->generalPartitions(); + for(auto p : _generalPartitions) + p->makeImmortal(); - _generalCasePartitions = rs->exceptions(); - for(auto p : _generalCasePartitions) + _fallbackPartitions = rs->fallbackPartitions(); + for(auto p : _fallbackPartitions) p->makeImmortal(); - _partitionToExceptionsMap = rs->partitionToExceptionsMap(); + _partitionGroups = rs->partitionGroups(); // check whether partitions have different schema than the currently set one // => i.e. they have been specialized. - if(!_normalCasePartitions.empty()) { - _optimizedSchema = _normalCasePartitions.front()->schema(); + if(!_normalPartitions.empty()) { + _optimizedSchema = _normalPartitions.front()->schema(); assert(_optimizedSchema != Schema::UNKNOWN); } // if exceptions are empty, then force output schema to be the optimized one as well! - if(_generalCasePartitions.empty()) + if(_generalPartitions.empty()) setSchema(_optimizedSchema); // because the schema might have changed due to the result, need to update the dataset! @@ -104,36 +100,46 @@ namespace tuplex { getDataSet()->setSchema(getOutputSchema()); // print out some statistics about cached data - size_t cachedPartitionsMemory = 0; - size_t totalCachedPartitionsMemory = 0; - size_t totalCachedRows = 0; - size_t cachedExceptionsMemory = 0; - size_t totalCachedExceptionsMemory = 0; - size_t totalCachedExceptions = 0; - - int pos = 0; - for(auto p : _normalCasePartitions) { - totalCachedRows += p->getNumRows(); - cachedPartitionsMemory += p->bytesWritten(); - totalCachedPartitionsMemory += p->size(); - pos++; + size_t normalBytesWritten = 0; + size_t normalCapacity = 0; + size_t normalRows = 0; + size_t generalBytesWritten = 0; + size_t generalCapacity = 0; + size_t generalRows = 0; + size_t fallbackBytesWritten = 0; + size_t fallbackCapacity = 0; + size_t fallbackRows = 0; + + for(const auto &p : _normalPartitions) { + normalRows += p->getNumRows(); + normalBytesWritten += p->bytesWritten(); + normalCapacity += p->size(); } - for(auto p : _generalCasePartitions) { - totalCachedExceptions += p->getNumRows(); - cachedExceptionsMemory += p->bytesWritten(); - totalCachedExceptionsMemory += p->size(); + for(const auto &p : _generalPartitions) { + generalRows += p->getNumRows(); + generalBytesWritten += p->bytesWritten(); + generalCapacity += p->size(); } + for(const auto &p : _fallbackPartitions) { + fallbackRows += p->getNumRows(); + fallbackBytesWritten += p->bytesWritten(); + fallbackCapacity += p->size(); + } + - _normalCaseRowCount = totalCachedRows; - _generalCaseRowCount = totalCachedExceptions; + _normalRowCount = normalRows; + _generalRowCount = generalRows; + _fallbackRowCount = fallbackRows; stringstream ss; - ss<<"Cached "<getNumRows(); + } + for(const auto &p : _generalPartitions) { totalCachedRows += p->getNumRows(); } - for(auto p : _generalCasePartitions) { + for (const auto &p : _fallbackPartitions) { totalCachedRows += p->getNumRows(); } return totalCachedRows; diff --git a/tuplex/core/src/logical/ParallelizeOperator.cc b/tuplex/core/src/logical/ParallelizeOperator.cc index 770ac2d4f..3ea6916e6 100644 --- a/tuplex/core/src/logical/ParallelizeOperator.cc +++ b/tuplex/core/src/logical/ParallelizeOperator.cc @@ -12,15 +12,15 @@ namespace tuplex { ParallelizeOperator::ParallelizeOperator(const Schema& schema, - const std::vector& partitions, - const std::vector& columns) : _partitions(partitions), - _columnNames(columns) { + const std::vector& normalPartitions, + const std::vector& columns) : _normalPartitions(normalPartitions), + 
_columnNames(columns) { setSchema(schema); // parallelize operator holds data in memory for infinite lifetime. // => make partitions immortal - for(auto& partition : _partitions) + for(auto& partition : _normalPartitions) partition->makeImmortal(); // get sample @@ -31,15 +31,15 @@ namespace tuplex { _sample.clear(); // todo: general python objects from parallelize... - if(!_partitions.empty()) { + if(!_normalPartitions.empty()) { auto maxRows = getDataSet() ? getDataSet()->getContext()->getOptions().CSV_MAX_DETECTION_ROWS() : MAX_TYPE_SAMPLING_ROWS; // @TODO: change this variable/config name // fetch up to maxRows from partitions! - auto schema = _partitions.front()->schema(); + auto schema = _normalPartitions.front()->schema(); Deserializer ds(schema); size_t rowCount = 0; size_t numBytesRead = 0; - for(auto p : _partitions) { + for(auto p : _normalPartitions) { const uint8_t* ptr = p->lockRaw(); auto partitionRowCount = *(int64_t*)ptr; ptr += sizeof(int64_t); @@ -59,8 +59,8 @@ namespace tuplex { } } - std::vector ParallelizeOperator::getPartitions() { - return _partitions; + std::vector ParallelizeOperator::getNormalPartitions() { + return _normalPartitions; } bool ParallelizeOperator::good() const { @@ -69,7 +69,7 @@ namespace tuplex { std::vector ParallelizeOperator::getSample(const size_t num) const { // samples exist? - if(_partitions.empty() || 0 == num) { + if(_normalPartitions.empty() || 0 == num) { return std::vector(); } @@ -109,11 +109,11 @@ namespace tuplex { } LogicalOperator *ParallelizeOperator::clone() { - auto copy = new ParallelizeOperator(getOutputSchema(), _partitions, columns()); + auto copy = new ParallelizeOperator(getOutputSchema(), _normalPartitions, columns()); copy->setDataSet(getDataSet()); copy->copyMembers(this); - copy->setPythonObjects(_pythonObjects); - copy->setInputPartitionToPythonObjectsMap(_inputPartitionToPythonObjectsMap); + copy->setFallbackPartitions(_fallbackPartitions); + copy->setPartitionGroups(_partitionGroups); assert(getID() == copy->getID()); return copy; } @@ -121,7 +121,9 @@ namespace tuplex { int64_t ParallelizeOperator::cost() const { // use #rows stored in partitions int64_t numRows = 0; - for(auto p : _partitions) + for(const auto& p : _normalPartitions) + numRows += p->getNumRows(); + for(const auto& p : _fallbackPartitions) numRows += p->getNumRows(); return numRows; } diff --git a/tuplex/core/src/physical/BlockBasedTaskBuilder.cc b/tuplex/core/src/physical/BlockBasedTaskBuilder.cc index 80e21c0a1..111e97d8f 100644 --- a/tuplex/core/src/physical/BlockBasedTaskBuilder.cc +++ b/tuplex/core/src/physical/BlockBasedTaskBuilder.cc @@ -58,12 +58,23 @@ namespace tuplex { FunctionType* read_block_type = FunctionType::get(env().i64Type(), {env().i8ptrType(), env().i8ptrType(), env().i64Type(), + env().i64Type()->getPointerTo(0), + env().i64Type()->getPointerTo(0), + env().getBooleanType(), + env().i64Type()->getPointerTo(0), + env().i64Type()->getPointerTo(0), + env().i64Type()->getPointerTo(0), + env().i64Type()->getPointerTo(0), env().i8ptrType()->getPointerTo(0), + env().i64Type(), env().i64Type()->getPointerTo(0), + env().i64Type()->getPointerTo(0), + env().i64Type()->getPointerTo(0), + env().i8ptrType()->getPointerTo(0), env().i64Type(), env().i64Type()->getPointerTo(0), env().i64Type()->getPointerTo(0), - env().getBooleanType()}, false); + env().i64Type()->getPointerTo(0)}, false); // create function and set argNames Function* read_block_func = Function::Create(read_block_type, Function::ExternalLinkage, _desiredFuncName, 
env().getModule().get()); @@ -76,12 +87,24 @@ namespace tuplex { vector argNames{"userData", "inPtr", "inSize", - "expPtrs", - "expPtrSizes", - "numExps", "outNormalRowCount", "outBadRowCount", - "ignoreLastRow"}; + "ignoreLastRow", + "totalFilterCounter", + "totalNormalRowCounter", + "totalGeneralRowCounter", + "totalFallbackRowCounter", + "generalPartitions", + "numGeneralPartitions", + "generalIndexOffset", + "generalRowOffset", + "generalByteOffset", + "fallbackPartitions", + "numFallbackPartitions", + "fallbackIndexOffset", + "fallbackRowOffset", + "fallbackByteOffset"}; + for(int i = 0; i < argNames.size(); ++i) { args[i]->setName(argNames[i]); _args[argNames[i]] = args[i]; diff --git a/tuplex/core/src/physical/ExceptionSourceTaskBuilder.cc b/tuplex/core/src/physical/ExceptionSourceTaskBuilder.cc index b3bd3847f..dd37a1c07 100644 --- a/tuplex/core/src/physical/ExceptionSourceTaskBuilder.cc +++ b/tuplex/core/src/physical/ExceptionSourceTaskBuilder.cc @@ -106,89 +106,115 @@ namespace tuplex { assert(read_block_func); + // Initialize context auto& context = env().getContext(); + // Load function arguments auto argUserData = arg("userData"); auto argInPtr = arg("inPtr"); auto argInSize = arg("inSize"); - auto argExpPtrs = arg("expPtrs"); - auto argExpPtrSizes = arg("expPtrSizes"); - auto argNumExps = arg("numExps"); auto argOutNormalRowCount = arg("outNormalRowCount"); auto argOutBadRowCount = arg("outBadRowCount"); auto argIgnoreLastRow = arg("ignoreLastRow"); - + auto totalFilterCounter = arg("totalFilterCounter"); + auto totalNormalRowCounter = arg("totalNormalRowCounter"); + auto totalGeneralRowCounter = arg("totalGeneralRowCounter"); + auto totalFallbackRowCounter = arg("totalFallbackRowCounter"); + auto generalPartitions = arg("generalPartitions"); + auto numGeneralPartitions = arg("numGeneralPartitions"); + auto generalIndexOffset = arg("generalIndexOffset"); + auto generalRowOffset = arg("generalRowOffset"); + auto generalByteOffset = arg("generalByteOffset"); + auto fallbackPartitions = arg("fallbackPartitions"); + auto numFallbackPartitions = arg("numFallbackPartitions"); + auto fallbackIndexOffset = arg("fallbackIndexOffset"); + auto fallbackRowOffset = arg("fallbackRowOffset"); + auto fallbackByteOffset = arg("fallbackByteOffset"); + + // Initialize function body BasicBlock *bbBody = BasicBlock::Create(context, "entry", read_block_func); IRBuilder<> builder(bbBody); - - // there should be a check if argInSize is 0 - // if so -> handle separately, i.e. 
return immediately -#warning "add here argInSize > 0 check" - - - // compute endptr from args - Value *endPtr = builder.CreateGEP(argInPtr, argInSize, "endPtr"); - Value *currentPtrVar = builder.CreateAlloca(env().i8ptrType(), 0, nullptr, "readPtrVar"); - // later use combi of normal & bad rows - Value *outRowCountVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "outRowCountVar"); // counter for output row number (used for exception resolution) + // Define basic blocks for function + auto bbInitializeGeneral = llvm::BasicBlock::Create(context, "initialize_general", builder.GetInsertBlock()->getParent()); + auto bbDeclareFallback = llvm::BasicBlock::Create(context, "declare_fallback", builder.GetInsertBlock()->getParent()); + auto bbInitializeFallback = llvm::BasicBlock::Create(context, "initialize_fallback", builder.GetInsertBlock()->getParent()); + auto bbUpdateGeneralCond = llvm::BasicBlock::Create(context, "update_general_cond", builder.GetInsertBlock()->getParent()); + auto bbUpdateGeneralBody = llvm::BasicBlock::Create(context, "update_general_body", builder.GetInsertBlock()->getParent()); + auto bbNextGeneralPartition = llvm::BasicBlock::Create(context, "next_general_partition", builder.GetInsertBlock()->getParent()); + auto bbUpdateFallbackCond = llvm::BasicBlock::Create(context, "update_fallback_cond", builder.GetInsertBlock()->getParent()); + auto bbUpdateFallbackBody = llvm::BasicBlock::Create(context, "update_fallback_body", builder.GetInsertBlock()->getParent()); + auto bbNextFallbackPartition = llvm::BasicBlock::Create(context, "next_fallback_partition", builder.GetInsertBlock()->getParent()); + auto bbUpdateDone = llvm::BasicBlock::Create(context, "update_done", builder.GetInsertBlock()->getParent()); + auto bbLoopCondition = BasicBlock::Create(context, "loop_cond", read_block_func); + auto bbLoopBody = BasicBlock::Create(context, "loop_body", read_block_func); + auto bbLoopDone = BasicBlock::Create(context, "loop_done", read_block_func); + + // Initialize values for normal partitions + auto endPtr = builder.CreateGEP(argInPtr, argInSize, "endPtr"); + auto currentPtrVar = builder.CreateAlloca(env().i8ptrType(), 0, nullptr, "readPtrVar"); + auto outRowCountVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "outRowCountVar"); // counter for output row number (used for exception resolution) builder.CreateStore(argInPtr, currentPtrVar); - - Value *normalRowCountVar = argOutNormalRowCount; - Value *badRowCountVar = argOutBadRowCount; - builder.CreateStore(builder.CreateAdd(builder.CreateLoad(argOutBadRowCount), - builder.CreateLoad(argOutNormalRowCount)), outRowCountVar); - - // current index into array of exception partitions - auto curExpIndVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "curExpIndVar"); - builder.CreateStore(env().i64Const(0), curExpIndVar); - - // current partition pointer - auto curExpPtrVar = builder.CreateAlloca(env().i8ptrType(), 0, nullptr, "curExpPtrVar"); - builder.CreateStore(builder.CreateLoad(argExpPtrs), curExpPtrVar); - - // number of rows total in current partition - auto curExpNumRowsVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "curExpNumRowsVar"); - builder.CreateStore(builder.CreateLoad(argExpPtrSizes), curExpNumRowsVar); - - // current row number in current partition - auto curExpCurRowVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "curExpCurRowVar"); - builder.CreateStore(env().i64Const(0), curExpCurRowVar); - - // accumulator used to update exception indices when rows are filtered, counts number of 
previously fitlered rows - auto expAccVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "expAccVar"); - builder.CreateStore(env().i64Const(0), expAccVar); - - // used to see if rows are filtered during pipeline execution - auto prevRowNumVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "prevRowNumVar"); - auto prevBadRowNumVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "prevBadRowNumVar"); - - // current number of exceptions prosessed across all partitions - auto expCurRowVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "expCurRowVar"); - builder.CreateStore(env().i64Const(0), expCurRowVar); - + // Update the arguments at the end + auto normalRowCountVar = argOutNormalRowCount; + auto badRowCountVar = argOutBadRowCount; + builder.CreateStore(builder.CreateAdd(builder.CreateLoad(argOutBadRowCount),builder.CreateLoad(argOutNormalRowCount)), outRowCountVar); // get num rows to read & process in loop - Value *numRowsVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "numRowsVar"); - Value *input_ptr = builder.CreatePointerCast(argInPtr, env().i64Type()->getPointerTo(0)); + auto numRowsVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "numRowsVar"); + auto input_ptr = builder.CreatePointerCast(argInPtr, env().i64Type()->getPointerTo(0)); builder.CreateStore(builder.CreateLoad(input_ptr), numRowsVar); // store current input ptr - Value *currentInputPtrVar = builder.CreateAlloca(env().i8ptrType(), 0, nullptr, "ptr"); + auto currentInputPtrVar = builder.CreateAlloca(env().i8ptrType(), 0, nullptr, "ptr"); builder.CreateStore(builder.CreateGEP(argInPtr, env().i32Const(sizeof(int64_t))), currentInputPtrVar); - - // variable for current row number... - Value *rowVar = builder.CreateAlloca(env().i64Type(), 0, nullptr); + auto rowVar = builder.CreateAlloca(env().i64Type(), 0, nullptr); builder.CreateStore(env().i64Const(0), rowVar); - BasicBlock* bbLoopCondition = BasicBlock::Create(context, "loop_cond", read_block_func); - BasicBlock* bbLoopBody = BasicBlock::Create(context, "loop_body", read_block_func); - BasicBlock* bbLoopDone = BasicBlock::Create(context, "loop_done", read_block_func); + // used to see if rows are filtered during pipeline execution + auto prevRowNumVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "prevRowNumVar"); + auto prevBadRowNumVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "prevBadRowNumVar"); - // go from entry block to loop body + // Initialize values for index updating + // uint8_t *curGeneralPtr; + // int64_t curGeneralNumRows = 0; + // if (*generalIndexOffset < numGeneralPartitions) { + // curGeneralPtr = generalPartitions[*generalIndexOffset]; + // curGeneralNumRows = *curGeneralPtr; + // curGeneralPtr += sizeof(int64_t) + *generalByteOffset; + // } + auto curGeneralPtr = builder.CreateAlloca(env().i8ptrType(), 0, nullptr, "curGeneralPtr"); + auto curGeneralNumRows = builder.CreateAlloca(env().i64Type(), 0, nullptr, "curGeneralNumRows"); + builder.CreateStore(env().i64Const(0), curGeneralNumRows); + auto shouldInitializeGeneral = builder.CreateICmpSLT(builder.CreateLoad(generalIndexOffset), numGeneralPartitions); + builder.CreateCondBr(shouldInitializeGeneral, bbInitializeGeneral, bbDeclareFallback); + + builder.SetInsertPoint(bbInitializeGeneral); + builder.CreateStore(builder.CreateLoad(builder.CreateGEP(generalPartitions, builder.CreateLoad(generalIndexOffset))), curGeneralPtr); + builder.CreateStore(builder.CreateLoad(builder.CreatePointerCast(builder.CreateLoad(curGeneralPtr), env().i64ptrType())), 
curGeneralNumRows); + builder.CreateStore(builder.CreateGEP(builder.CreateLoad(curGeneralPtr), builder.CreateAdd(env().i64Const(sizeof(int64_t)), builder.CreateLoad(generalByteOffset))), curGeneralPtr); + builder.CreateBr(bbDeclareFallback); + + // uint8_t *curFallbackPtr; + // int64_t curFallbackNumRows = 0; + // if (*fallbackIndexOffset < numFallbackPartitions) { + // curFallbackPtr = fallbackPartitions[*fallbackIndexOffset]; + // curFallbackNumRows = *curFallbackPtr; + // curFallbackPtr += sizeof(int64_t) + *fallbackByteOffset; + // } + builder.SetInsertPoint(bbDeclareFallback); + auto curFallbackPtr = builder.CreateAlloca(env().i8ptrType(), 0, nullptr, "curFallbackPtr"); + auto curFallbackNumRows = builder.CreateAlloca(env().i64Type(), 0, nullptr, "curFallbackNumRows"); + builder.CreateStore(env().i64Const(0), curFallbackNumRows); + auto shouldInitializeFallback = builder.CreateICmpSLT(builder.CreateLoad(fallbackIndexOffset), numFallbackPartitions); + builder.CreateCondBr(shouldInitializeFallback, bbInitializeFallback, bbLoopBody); + + builder.SetInsertPoint(bbInitializeFallback); + builder.CreateStore(builder.CreateLoad(builder.CreateGEP(fallbackPartitions, builder.CreateLoad(fallbackIndexOffset))), curFallbackPtr); + builder.CreateStore(builder.CreateLoad(builder.CreatePointerCast(builder.CreateLoad(curFallbackPtr), env().i64ptrType())), curFallbackNumRows); + builder.CreateStore(builder.CreateGEP(builder.CreateLoad(curFallbackPtr), builder.CreateAdd(env().i64Const(sizeof(int64_t)), builder.CreateLoad(fallbackByteOffset))), curFallbackPtr); builder.CreateBr(bbLoopBody); - // -------------- // loop condition builder.SetInsertPoint(bbLoopCondition); Value *row = builder.CreateLoad(rowVar, "row"); @@ -198,8 +224,6 @@ namespace tuplex { auto cond = builder.CreateICmpSLT(nextRow, numRows); builder.CreateCondBr(cond, bbLoopBody, bbLoopDone); - - // --------- // loop body builder.SetInsertPoint(bbLoopBody); // decode tuple from input ptr @@ -207,9 +231,8 @@ namespace tuplex { ft.init(_inputRowType); Value* oldInputPtr = builder.CreateLoad(currentInputPtrVar, "ptr"); ft.deserializationCode(builder, oldInputPtr); - Value* newInputPtr = builder.CreateGEP(oldInputPtr, ft.getSize(builder)); // @TODO: maybe use inbounds + Value* newInputPtr = builder.CreateGEP(oldInputPtr, ft.getSize(builder)); builder.CreateStore(newInputPtr, currentInputPtrVar); - builder.CreateStore(builder.CreateLoad(outRowCountVar), prevRowNumVar); builder.CreateStore(builder.CreateLoad(badRowCountVar), prevBadRowNumVar); @@ -218,117 +241,113 @@ namespace tuplex { Value *inputRowSize = ft.getSize(builder); processRow(builder, argUserData, ft, normalRowCountVar, badRowCountVar, outRowCountVar, oldInputPtr, inputRowSize, terminateEarlyOnLimitCode, pipeline() ? 
pipeline()->getFunction() : nullptr); + builder.CreateStore(builder.CreateAdd(env().i64Const(1), builder.CreateLoad(totalNormalRowCounter)), totalNormalRowCounter); + // After row is processed we need to update exceptions if the row was filtered // We check that: outRowCountVar == prevRowCountVar (no new row was emitted) // badRowCountVar == prevBadRowNumVar (it was filtered, not just an exception) - // expCurRowVar < argNumExps (we still have exceptions that need updating) - // if (outRowCountVar == prevRowNumVar && badRowCountVar == prevBadRowNumVar && expCurRowVar < argNumExps) - auto bbExpUpdate = llvm::BasicBlock::Create(context, "exp_update", builder.GetInsertBlock()->getParent()); - auto expCond = builder.CreateICmpEQ(builder.CreateLoad(outRowCountVar), builder.CreateLoad(prevRowNumVar)); - auto badCond = builder.CreateICmpEQ(builder.CreateLoad(badRowCountVar), builder.CreateLoad(prevBadRowNumVar)); - auto remCond = builder.CreateICmpSLT(builder.CreateLoad(expCurRowVar), argNumExps); - builder.CreateCondBr(builder.CreateAnd(remCond, builder.CreateAnd(badCond, expCond)), bbExpUpdate, bbLoopCondition); - - // We have determined a row is filtered so we can iterate through all the input exceptions that occured before this - // row and decrement their row index with the number of previously filtered rows (expAccVar). - // This is a while loop that iterates over all exceptions that occured before this filtered row - // - // while (expCurRowVar < numExps && ((*outNormalRowCount - 1) + expCurRowVar) >= *((int64_t *) curExpPtrVar)) - // - // *outNormalRowCount - 1 changes cardinality of rows into its row index, we add the number of previously processed - // exceptions because the normal rows do not know about the exceptions to obtain the correct index. It's then compared - // against the row index of the exception pointed to currently in our partition - builder.SetInsertPoint(bbExpUpdate); - auto bbIncrement = llvm::BasicBlock::Create(context, "increment", builder.GetInsertBlock()->getParent()); - auto bbIncrementDone = llvm::BasicBlock::Create(context, "increment_done", builder.GetInsertBlock()->getParent()); - auto curExpRowIndPtr = builder.CreatePointerCast(builder.CreateLoad(curExpPtrVar), env().i64ptrType()); - auto incCond = builder.CreateICmpSGE(builder.CreateAdd(builder.CreateLoad(badRowCountVar), builder.CreateAdd(builder.CreateSub(builder.CreateLoad(normalRowCountVar), env().i64Const(1)), builder.CreateLoad(expCurRowVar))), builder.CreateLoad(curExpRowIndPtr)); - auto remCond2 = builder.CreateICmpSLT(builder.CreateLoad(expCurRowVar), argNumExps); - builder.CreateCondBr(builder.CreateAnd(remCond2, incCond), bbIncrement, bbIncrementDone); - - // Body of the while loop we need to - // 1. decrement the current exception row index by the expAccVar (all rows previously filtered) - // 2. Increment our partition pointer to next exception - // 3. 
Change partitions if we've exhausted all exceptions in the current, but still have more remaining in tototal - // - // Increment to the next exception by adding eSize and 4*sizeof(int64_t) to the partition pointer - // *((int64_t *) curExpPtrVar) -= expAccVar; - // curExpPtrVar += 4 * sizeof(int64_t) + ((int64_t *)curExpPtrVar)[3]; - // expCurRowVar += 1; - // curExpCurRowVar += 1; - // - // Finally we check to see if a partition change is required - // if (expCurRowVar < numExps && curExpCurRowVar >= curExpNumRowsVar) - builder.SetInsertPoint(bbIncrement); - // Change row index and go to next exception in partition - auto curExpRowIndPtr2 = builder.CreatePointerCast(builder.CreateLoad(curExpPtrVar), env().i64Type()->getPointerTo(0)); - builder.CreateStore(builder.CreateSub(builder.CreateLoad(curExpRowIndPtr2), builder.CreateLoad(expAccVar)), curExpRowIndPtr2); - auto curOffset = builder.CreateAlloca(env().i64Type(), 0, nullptr, "curOffset"); - builder.CreateStore(builder.CreateLoad(builder.CreateGEP(curExpRowIndPtr2, env().i64Const(3))), curOffset); - builder.CreateStore(builder.CreateAdd(builder.CreateLoad(curOffset), env().i64Const(4 * sizeof(int64_t))), curOffset); - builder.CreateStore(builder.CreateGEP(builder.CreateLoad(curExpPtrVar), builder.CreateLoad(curOffset)), curExpPtrVar); - builder.CreateStore(builder.CreateAdd(builder.CreateLoad(curExpCurRowVar), env().i64Const(1)), curExpCurRowVar); - builder.CreateStore(builder.CreateAdd(builder.CreateLoad(expCurRowVar), env().i64Const(1)), expCurRowVar); - // See if partition change needs to occur - auto bbChange = llvm::BasicBlock::Create(context, "change", builder.GetInsertBlock()->getParent()); - auto changeCond = builder.CreateICmpSGE(builder.CreateLoad(curExpCurRowVar), builder.CreateLoad(curExpNumRowsVar)); - auto leftCond = builder.CreateICmpSLT(builder.CreateLoad(expCurRowVar), argNumExps); - builder.CreateCondBr(builder.CreateAnd(leftCond, changeCond), bbChange, bbExpUpdate); - - // This block changes to the next partition - // curExpCurRowVar = 0; - // curExpIndVar = curExpIndVar + 1; - // curExpPtrVar = expPtrs[curExpIndVar]; - // curExpNumRowsVar = expPtrSizes[curExpIndVar]; - builder.SetInsertPoint(bbChange); - builder.CreateStore(env().i64Const(0), curExpCurRowVar); - builder.CreateStore(builder.CreateAdd(builder.CreateLoad(curExpIndVar), env().i64Const(1)), curExpIndVar); - builder.CreateStore(builder.CreateLoad(builder.CreateGEP(argExpPtrs, builder.CreateLoad(curExpIndVar))), curExpPtrVar); - builder.CreateStore(builder.CreateLoad(builder.CreateGEP(argExpPtrSizes, builder.CreateLoad(curExpIndVar))), curExpNumRowsVar); - builder.CreateBr(bbExpUpdate); - - // Finally increment the expAccVar by 1 becasue a row was filtered - // expAccVar += 1; - builder.SetInsertPoint(bbIncrementDone); - builder.CreateStore(builder.CreateAdd(builder.CreateLoad(expAccVar), env().i64Const(1)), expAccVar); + // if (outRowCountVar == prevRowNumVar && badRowCountVar == prevBadRowNumVar) + auto rowNotEmitted = builder.CreateICmpEQ(builder.CreateLoad(outRowCountVar), builder.CreateLoad(prevRowNumVar)); + auto rowNotException = builder.CreateICmpEQ(builder.CreateLoad(badRowCountVar), builder.CreateLoad(prevBadRowNumVar)); + builder.CreateCondBr(builder.CreateAnd(rowNotEmitted, rowNotException), bbUpdateGeneralCond, bbLoopCondition); + + // Update general cond + // while (*generalRowOffset < curGeneralNumRows && *((int64_t*)curGeneralPtr) < curNormalRowInd + totalGeneralRowCounter) + builder.SetInsertPoint(bbUpdateGeneralCond); + auto 
generalRowsRemainCond = builder.CreateICmpSLT(builder.CreateLoad(generalRowOffset), builder.CreateLoad(curGeneralNumRows)); + auto curGeneralRowInd = builder.CreateLoad(builder.CreatePointerCast(builder.CreateLoad(curGeneralPtr), env().i64ptrType())); + auto generalIndexLTCond = builder.CreateICmpSLT(curGeneralRowInd, builder.CreateAdd(builder.CreateLoad(totalGeneralRowCounter), builder.CreateLoad(totalNormalRowCounter))); + builder.CreateCondBr(builder.CreateAnd(generalRowsRemainCond, generalIndexLTCond), bbUpdateGeneralBody, bbUpdateFallbackCond); + + // Update general body + // generalNewRowInd = *((int64_t*)curGeneralPtr) - totalFilterCounter; + // *((int64_t*)curGeneralPtr) = generalNewRowInd; + // auto generalRowDelta = 4 * sizeof(int64_t) + ((int64_t*)curGeneralPtr)[3]; + // curGeneralPtr += generalRowDelta; + // *generalByteOffset += generalRowDelta; + // *generalRowOffset++; + // *totalGeneralRowCounter++; + builder.SetInsertPoint(bbUpdateGeneralBody); + auto generalNewRowInd = builder.CreateSub(builder.CreateLoad(builder.CreatePointerCast(builder.CreateLoad(curGeneralPtr), env().i64ptrType())), builder.CreateLoad(totalFilterCounter)); + builder.CreateStore(generalNewRowInd, builder.CreatePointerCast(builder.CreateLoad(curGeneralPtr), env().i64ptrType())); + auto generalRowDelta = builder.CreateAdd(builder.CreateLoad(builder.CreateGEP(builder.CreatePointerCast(builder.CreateLoad(curGeneralPtr), env().i64ptrType()), env().i64Const(3))), env().i64Const(4 * sizeof(int64_t))); + builder.CreateStore(builder.CreateGEP(builder.CreateLoad(curGeneralPtr), generalRowDelta), curGeneralPtr); + builder.CreateStore(builder.CreateAdd(generalRowDelta, builder.CreateLoad(generalByteOffset)), generalByteOffset); + builder.CreateStore(builder.CreateAdd(env().i64Const(1), builder.CreateLoad(generalRowOffset)), generalRowOffset); + builder.CreateStore(builder.CreateAdd(env().i64Const(1), builder.CreateLoad(totalGeneralRowCounter)), totalGeneralRowCounter); + + // if (*generalRowOffset == curGeneralNumRows && *generalIndexOffset < numGeneralPartitions - 1) + auto generalNoRowsRemain = builder.CreateICmpEQ(builder.CreateLoad(generalRowOffset), builder.CreateLoad(curGeneralNumRows)); + auto generalHasMorePartitions = builder.CreateICmpSLT(builder.CreateLoad(generalIndexOffset), builder.CreateSub(numGeneralPartitions, env().i64Const(1))); + builder.CreateCondBr(builder.CreateAnd(generalNoRowsRemain, generalHasMorePartitions), bbNextGeneralPartition, bbUpdateGeneralCond); + + // generalIndexOffset += 1; + // *generalRowOffset = 0; + // *generalByteOffset = 0; + // curGeneralPtr = generalPartitions[*generalIndexOffset]; + // curGeneralNumRows = *((int64_t*)curGeneralPtr); + // curGeneralPtr += sizeof(int64_t); + builder.SetInsertPoint(bbNextGeneralPartition); + builder.CreateStore(builder.CreateAdd(builder.CreateLoad(generalIndexOffset), env().i64Const(1)), generalIndexOffset); + builder.CreateStore(env().i64Const(0), generalRowOffset); + builder.CreateStore(env().i64Const(0), generalByteOffset); + builder.CreateStore(builder.CreateLoad(builder.CreateGEP(generalPartitions, builder.CreateLoad(generalIndexOffset))), curGeneralPtr); + builder.CreateStore(builder.CreateLoad(builder.CreatePointerCast(builder.CreateLoad(curGeneralPtr), env().i64ptrType())), curGeneralNumRows); + builder.CreateStore(builder.CreateGEP(builder.CreateLoad(curGeneralPtr), builder.CreateAdd(env().i64Const(sizeof(int64_t)), builder.CreateLoad(generalByteOffset))), curGeneralPtr); + builder.CreateBr(bbUpdateGeneralCond); + + // Update 
fallback cond + // while (*fallbackRowOffset < curFallbackNumRows && *((int64_t*)curFallbackPtr) < curNormalRowInd + totalGeneralRowCounter + totalFallbackRowCounter) + builder.SetInsertPoint(bbUpdateFallbackCond); + auto fallbackRowsRemainCond = builder.CreateICmpSLT(builder.CreateLoad(fallbackRowOffset), builder.CreateLoad(curFallbackNumRows)); + auto curFallbackRowInd = builder.CreateLoad(builder.CreatePointerCast(builder.CreateLoad(curFallbackPtr), env().i64ptrType())); + auto fallbackIndexLTCond = builder.CreateICmpSLT(curFallbackRowInd, builder.CreateAdd(builder.CreateLoad(totalGeneralRowCounter), builder.CreateAdd(builder.CreateLoad(totalFallbackRowCounter), builder.CreateLoad(totalNormalRowCounter)))); + builder.CreateCondBr(builder.CreateAnd(fallbackRowsRemainCond, fallbackIndexLTCond), bbUpdateFallbackBody, bbUpdateDone); + + // Update fallback body + // fallbackNewRowInd = *((int64_t*)curFallbackPtr) - totalFilterCounter; + // *((int64_t*)curFallbackPtr) = fallbackNewRowInd; + // auto fallbackRowDelta = 4 * sizeof(int64_t) + ((int64_t*)curFallbackPtr)[3]; + // curFallbackPtr += fallbackRowDelta; + // *fallbackByteOffset += fallbackRowDelta; + // *fallbackRowOffset++; + // *totalFallbackRowCounter++; + builder.SetInsertPoint(bbUpdateFallbackBody); + auto fallbackNewRowInd = builder.CreateSub(builder.CreateLoad(builder.CreatePointerCast(builder.CreateLoad(curFallbackPtr), env().i64ptrType())), builder.CreateLoad(totalFilterCounter)); + builder.CreateStore(fallbackNewRowInd, builder.CreatePointerCast(builder.CreateLoad(curFallbackPtr), env().i64ptrType())); + auto fallbackRowDelta = builder.CreateAdd(builder.CreateLoad(builder.CreateGEP(builder.CreatePointerCast(builder.CreateLoad(curFallbackPtr), env().i64ptrType()), env().i64Const(3))), env().i64Const(4 * sizeof(int64_t))); + builder.CreateStore(builder.CreateGEP(builder.CreateLoad(curFallbackPtr), fallbackRowDelta), curFallbackPtr); + builder.CreateStore(builder.CreateAdd(fallbackRowDelta, builder.CreateLoad(fallbackByteOffset)), fallbackByteOffset); + builder.CreateStore(builder.CreateAdd(env().i64Const(1), builder.CreateLoad(fallbackRowOffset)), fallbackRowOffset); + builder.CreateStore(builder.CreateAdd(env().i64Const(1), builder.CreateLoad(totalFallbackRowCounter)), totalFallbackRowCounter); + + // if (*fallbackRowOffset == curFallbackNumRows && *fallbackIndexOffset < numFallbackPartitions - 1) + auto fallbackNoRowsRemain = builder.CreateICmpEQ(builder.CreateLoad(fallbackRowOffset), builder.CreateLoad(curFallbackNumRows)); + auto fallbackHasMorePartitions = builder.CreateICmpSLT(builder.CreateLoad(fallbackIndexOffset), builder.CreateSub(numFallbackPartitions, env().i64Const(1))); + builder.CreateCondBr(builder.CreateAnd(fallbackNoRowsRemain, fallbackHasMorePartitions), bbNextFallbackPartition, bbUpdateFallbackCond); + + // fallbackIndexOffset += 1; + // *fallbackRowOffset = 0; + // *fallbackByteOffset = 0; + // curFallbackPtr = fallbackPartitions[*fallbackIndexOffset]; + // curFallbackNumRows = *((int64_t*)curFallbackPtr); + // curFallbackPtr += sizeof(int64_t); + builder.SetInsertPoint(bbNextFallbackPartition); + builder.CreateStore(builder.CreateAdd(builder.CreateLoad(fallbackIndexOffset), env().i64Const(1)), fallbackIndexOffset); + builder.CreateStore(env().i64Const(0), fallbackRowOffset); + builder.CreateStore(env().i64Const(0), fallbackByteOffset); + builder.CreateStore(builder.CreateLoad(builder.CreateGEP(fallbackPartitions, builder.CreateLoad(fallbackIndexOffset))), curFallbackPtr); + 
builder.CreateStore(builder.CreateLoad(builder.CreatePointerCast(builder.CreateLoad(curFallbackPtr), env().i64ptrType())), curFallbackNumRows); + builder.CreateStore(builder.CreateGEP(builder.CreateLoad(curFallbackPtr), builder.CreateAdd(env().i64Const(sizeof(int64_t)), builder.CreateLoad(fallbackByteOffset))), curFallbackPtr); + builder.CreateBr(bbUpdateFallbackCond); + + // Update done + // totalFilterCounter += 1; + builder.SetInsertPoint(bbUpdateDone); + builder.CreateStore(builder.CreateAdd(env().i64Const(1), builder.CreateLoad(totalFilterCounter)), totalFilterCounter); builder.CreateBr(bbLoopCondition); - // --------- - // loop done builder.SetInsertPoint(bbLoopDone); - auto bbRemainingExceptions = llvm::BasicBlock::Create(context, "remaining_exceptions", builder.GetInsertBlock()->getParent()); - auto bbRemainingDone = llvm::BasicBlock::Create(context, "remaining_done", builder.GetInsertBlock()->getParent()); - auto expRemaining = builder.CreateICmpSLT(builder.CreateLoad(expCurRowVar), argNumExps); - builder.CreateCondBr(expRemaining, bbRemainingExceptions, bbRemainingDone); - - // We have processed all of the normal rows. If we have not exhausted all of our exceptions - // we just iterate through the remaining exceptions and decrement their row index by the final - // value of expAccVar counting our filtered rows. - // Same code as above, but just don't need to keep updating expAccVar by 1. - builder.SetInsertPoint(bbRemainingExceptions); - auto curExpRowIndPtr3 = builder.CreatePointerCast(builder.CreateLoad(curExpPtrVar), env().i64Type()->getPointerTo(0)); - builder.CreateStore(builder.CreateSub(builder.CreateLoad(curExpRowIndPtr3), builder.CreateLoad(expAccVar)), curExpRowIndPtr3); - auto curOffset2 = builder.CreateAlloca(env().i64Type(), 0, nullptr, "curOffset2"); - builder.CreateStore(builder.CreateLoad(builder.CreateGEP(curExpRowIndPtr3, env().i64Const(3))), curOffset2); - builder.CreateStore(builder.CreateAdd(builder.CreateLoad(curOffset2), env().i64Const(4 * sizeof(int64_t))), curOffset2); - builder.CreateStore(builder.CreateGEP(builder.CreateLoad(curExpPtrVar), builder.CreateLoad(curOffset2)), curExpPtrVar); - builder.CreateStore(builder.CreateAdd(builder.CreateLoad(curExpCurRowVar), env().i64Const(1)), curExpCurRowVar); - builder.CreateStore(builder.CreateAdd(builder.CreateLoad(expCurRowVar), env().i64Const(1)), expCurRowVar); - - auto bbChange2 = llvm::BasicBlock::Create(context, "change2", builder.GetInsertBlock()->getParent()); - auto changeCond2 = builder.CreateICmpSGE(builder.CreateLoad(curExpCurRowVar), builder.CreateLoad(curExpNumRowsVar)); - auto leftCond2 = builder.CreateICmpSLT(builder.CreateLoad(expCurRowVar), argNumExps); - builder.CreateCondBr(builder.CreateAnd(leftCond2, changeCond2), bbChange2, bbLoopDone); - - builder.SetInsertPoint(bbChange2); - builder.CreateStore(env().i64Const(0), curExpCurRowVar); - builder.CreateStore(builder.CreateAdd(builder.CreateLoad(curExpIndVar), env().i64Const(1)), curExpIndVar); - builder.CreateStore(builder.CreateLoad(builder.CreateGEP(argExpPtrs, builder.CreateLoad(curExpIndVar))), curExpPtrVar); - builder.CreateStore(builder.CreateLoad(builder.CreateGEP(argExpPtrSizes, builder.CreateLoad(curExpIndVar))), curExpNumRowsVar); - builder.CreateBr(bbLoopDone); - - builder.SetInsertPoint(bbRemainingDone); - // if intermediate callback desired, perform! 
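// --- Editorial sketch ----------------------------------------------------
// Plain C++ equivalent of the rebase step the IR above emits: once a normal
// row is filtered out, every later general/fallback row index must shift left
// by the number of rows filtered so far. This batch version (the generated
// code does it incrementally, interleaved with the pipeline) assumes the
// 4x int64 exception-record layout used throughout this patch:
#include <cstdint>
#include <cstddef>
inline void rebaseRowIndices(uint8_t* buf, int64_t numRows, int64_t filteredSoFar) {
    uint8_t* ptr = buf;
    for (int64_t i = 0; i < numRows; ++i) {
        auto header = (int64_t*) ptr;
        header[0] -= filteredSoFar;              // rebase the stored row index
        ptr += 4 * sizeof(int64_t) + header[3];  // header + payload -> next record
    }
}
// ---------------------------------------------------------------------------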
            if(_intermediateType != python::Type::UNKNOWN && !_intermediateCallbackName.empty()) {
                writeIntermediate(builder, argUserData, _intermediateCallbackName);
            }
diff --git a/tuplex/core/src/physical/PhysicalPlan.cc b/tuplex/core/src/physical/PhysicalPlan.cc
index 2399edf6f..8aa49a5fb 100644
--- a/tuplex/core/src/physical/PhysicalPlan.cc
+++ b/tuplex/core/src/physical/PhysicalPlan.cc
@@ -199,9 +199,11 @@ namespace tuplex {
                 auto t = ops.front()->type();
                 assert(t == LogicalOperatorType::PARALLELIZE || t == LogicalOperatorType::CACHE);
                 if (t == LogicalOperatorType::PARALLELIZE)
-                    hasInputExceptions = !((ParallelizeOperator *)ops.front())->getPythonObjects().empty();
-                if (t == LogicalOperatorType::CACHE)
-                    hasInputExceptions = !((CacheOperator *)ops.front())->cachedExceptions().empty();
+                    hasInputExceptions = !((ParallelizeOperator *) ops.front())->getFallbackPartitions().empty();
+                if (t == LogicalOperatorType::CACHE) {
+                    auto cop = (CacheOperator *) ops.front();
+                    hasInputExceptions = !cop->cachedGeneralPartitions().empty() || !cop->cachedFallbackPartitions().empty();
+                }
             }
         }
@@ -401,14 +403,15 @@ namespace tuplex {
         // fill in data to start processing from operators.
         if (inputNode->type() == LogicalOperatorType::PARALLELIZE) {
             auto pop = dynamic_cast<ParallelizeOperator*>(inputNode); assert(inputNode);
-            stage->setInputPartitions(pop->getPartitions());
-            stage->setInputExceptions(pop->getPythonObjects());
-            stage->setPartitionToExceptionsMap(pop->getInputPartitionToPythonObjectsMap());
+            stage->setInputPartitions(pop->getNormalPartitions());
+            stage->setFallbackPartitions(pop->getFallbackPartitions());
+            stage->setPartitionGroups(pop->getPartitionGroups());
         } else if(inputNode->type() == LogicalOperatorType::CACHE) {
             auto cop = dynamic_cast<CacheOperator*>(inputNode); assert(inputNode);
-            stage->setInputPartitions(cop->cachedPartitions());
-            stage->setInputExceptions(cop->cachedExceptions());
-            stage->setPartitionToExceptionsMap(cop->partitionToExceptionsMap());
+            stage->setInputPartitions(cop->cachedNormalPartitions());
+            stage->setGeneralPartitions(cop->cachedGeneralPartitions());
+            stage->setFallbackPartitions(cop->cachedFallbackPartitions());
+            stage->setPartitionGroups(cop->partitionGroups());
         } else if(inputNode->type() == LogicalOperatorType::FILEINPUT) {
             auto csvop = dynamic_cast<FileInputOperator*>(inputNode);
             stage->setInputFiles(csvop->getURIs(), csvop->getURISizes());
diff --git a/tuplex/core/src/physical/ResolveTask.cc b/tuplex/core/src/physical/ResolveTask.cc
index b3c70c876..cb4fb6801 100644
--- a/tuplex/core/src/physical/ResolveTask.cc
+++ b/tuplex/core/src/physical/ResolveTask.cc
@@ -228,8 +228,21 @@ namespace tuplex {
         // needs to be put into separate list of python objects...
         // save index as well to merge back in order.
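// --- Editorial note --------------------------------------------------------
// Worked example for the index math in the hunk below: a row that falls back
// to a Python object is emitted at (_currentRowNumber - _numUnresolved).
//     int64_t mergedIndex = _currentRowNumber - _numUnresolved;   // e.g. 7 - 2 == 5
// With _currentRowNumber == 7 and two earlier rows left unresolved, the record
// is written at merged index 5, since the unresolved rows no longer occupy
// positions in the merged output.
// ---------------------------------------------------------------------------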
- assert(_rowNumber >= _numUnresolved); - _py_nonconfirming.push_back(std::make_tuple(_rowNumber - _numUnresolved, out_row)); + assert(_currentRowNumber >= _numUnresolved); + auto pickledObject = python::pickleObject(python::getMainModule(), out_row); + auto pyObjectSize = pickledObject.size(); + auto bufSize = 4 * sizeof(int64_t) + pyObjectSize; + + uint8_t *buf = new uint8_t[bufSize]; + auto ptr = buf; + *((int64_t*)ptr) = _currentRowNumber - _numUnresolved; ptr += sizeof(int64_t); + *((int64_t*)ptr) = ecToI64(ExceptionCode::PYTHON_PARALLELIZE); ptr += sizeof(int64_t); + *((int64_t*)ptr) = -1; ptr += sizeof(int64_t); + *((int64_t*)ptr) = pyObjectSize; ptr += sizeof(int64_t); + memcpy(ptr, pickledObject.c_str(), pyObjectSize); + rowToMemorySink(owner(), _fallbackSink, Schema(Schema::MemoryLayout::ROW, python::Type::makeTupleType({python::Type::STRING})), + 0, contextID(), buf, bufSize); + delete[] buf; } int64_t ResolveTask::mergeNormalRow(const uint8_t *buf, int64_t bufSize) { @@ -273,7 +286,7 @@ namespace tuplex { // exceptionCode, exceptionOperatorID, rowNumber, size int64_t ecCode = ecToI64(ExceptionCode::NORMALCASEVIOLATION); int64_t ecOpID = 0; // dummy - int64_t rowNumber = _currentRowNumber; + int64_t rowNumber = _currentRowNumber - _numUnresolved; uint8_t* except_buf = serializeExceptionToMemory(ecCode, ecOpID, rowNumber, buf, bufSize, &except_size); // sink row to type violation exceptions with commonCaseOutputSchema @@ -413,7 +426,7 @@ namespace tuplex { // => need a list of for which opIds/codes resolvers are available... ///.... _numUnresolved++; - exceptionCallback(ecCode, operatorID, _rowNumber, ebuf, eSize); + exceptionCallback(ecCode, operatorID, _currentRowNumber, ebuf, eSize); return; } @@ -648,7 +661,11 @@ namespace tuplex { mergeRow(buf, serialized_length, BUF_FORMAT_GENERAL_OUTPUT); delete [] buf; } else { - writePythonObject(rowObj); + if(PyTuple_Check(rowObj) && PyTuple_Size(rowObj) == 1) { + writePythonObject(PyTuple_GetItem(rowObj, 0)); + } else { + writePythonObject(rowObj); + } } // Py_XDECREF(rowObj); } @@ -676,7 +693,7 @@ namespace tuplex { // fallback 3: still exception? save... if(resCode == -1) { _numUnresolved++; - exceptionCallback(ecCode, operatorID, _rowNumber, ebuf, eSize); + exceptionCallback(ecCode, operatorID, _currentRowNumber, ebuf, eSize); } } @@ -711,7 +728,7 @@ namespace tuplex { } // abort if no exceptions! - if(_numRuntimeExceptions == 0 && _numInputExceptions == 0) + if(_exceptionPartitions.empty() && _generalPartitions.empty() && _fallbackPartitions.empty()) return; // special case: no functor & no python pipeline functor given @@ -724,12 +741,12 @@ namespace tuplex { #endif // copy _generalCasePartitions over to base class - IExceptionableTask::setExceptions(_runtimeExceptions); + IExceptionableTask::setExceptions(_generalPartitions); // clear exceptions, because they have been resolved (or put to new exceptions!) // if task produced exceptions, they are stored in the IExceptionableTask class! // => no need to overwrite them, getter for iexceptionabletask has all info! - _runtimeExceptions.clear(); + _generalPartitions.clear(); _wallTime = timer.time(); return; @@ -742,23 +759,10 @@ namespace tuplex { // merge exceptions with normal rows after calling slow code over them... 
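// --- Editorial sketch ----------------------------------------------------
// Read side matching the fallback record written above; this mirrors the
// PYTHON_PARALLELIZE branch removed from LocalBackend earlier in this patch
// (helper name readFallbackObject is hypothetical, GIL handling elided):
PyObject* readFallbackObject(const uint8_t* ptr) {
    auto header = (const int64_t*) ptr;   // [ rowInd | PYTHON_PARALLELIZE | -1 | size ]
    auto objSize = header[3];
    return python::deserializePickledObject(python::getMainModule(),
                                            (char*) (ptr + 4 * sizeof(int64_t)), objSize);
}
// ---------------------------------------------------------------------------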
// basic idea is go over all exception partitions, execute row wise the resolution function // and merge the result back to the partitions - if (_numRuntimeExceptions > 0) { - // Initialize runtime exception to starting index - auto partition = _runtimeExceptions[_runtimeExceptionIndex]; - auto rowsLeftInPartition = partition->getNumRows() - _runtimeExceptionRowOffset; - const uint8_t *ptr = partition->lock() + _runtimeExceptionByteOffset; - - // Iterate over all runtime exceptions, may be accross multiple partitions - for (int i = 0; i < _numRuntimeExceptions; ++i) { - // Change partition once exhausted - if (rowsLeftInPartition == 0) { - partition->unlock(); - _runtimeExceptionIndex++; - partition = _runtimeExceptions[_runtimeExceptionIndex]; - rowsLeftInPartition = partition->getNumRows(); - ptr = partition->lock(); - } - + for (const auto &partition : _generalPartitions) { + const uint8_t *ptr = partition->lock(); + auto numRows = partition->getNumRows(); + for (int i = 0; i < numRows; ++i) { const uint8_t *ebuf = nullptr; int64_t ecCode = -1, operatorID = -1; size_t eSize = 0; @@ -769,42 +773,47 @@ namespace tuplex { ptr += delta; _rowNumber++; - rowsLeftInPartition--; } - // Unlock but wait to invalidate until all resolve tasks have finished partition->unlock(); + partition->invalidate(); } - // now process all of the input exceptions - if (_numInputExceptions > 0) { - // Initialize input exception to starting index - auto partition = _inputExceptions[_inputExceptionIndex]; - auto rowsLeftInPartition = partition->getNumRows() - _inputExceptionRowOffset; - const uint8_t *ptr = partition->lock() + _inputExceptionByteOffset; - - // Iterate over all input exceptions, may be accross multiple partitions - for (int i = 0; i < _numInputExceptions; ++i) { - // Change partition once exhausted - if (rowsLeftInPartition == 0) { - partition->unlock(); - _inputExceptionIndex++; - partition = _inputExceptions[_inputExceptionIndex]; - rowsLeftInPartition = partition->getNumRows(); - ptr = partition->lock(); - } + for (const auto &partition : _fallbackPartitions) { + const uint8_t *ptr = partition->lock(); + auto numRows = partition->getNumRows(); + for (int i = 0; i < numRows; ++i) { + const uint8_t *ebuf = nullptr; + int64_t ecCode = -1, operatorID = -1; + size_t eSize = 0; + auto delta = deserializeExceptionFromMemory(ptr, &ecCode, &operatorID, &_currentRowNumber, &ebuf, + &eSize); + + processExceptionRow(ecCode, operatorID, ebuf, eSize); + ptr += delta; + _rowNumber++; + } + partition->unlock(); + partition->invalidate(); + } + + for (const auto &partition : _exceptionPartitions) { + const uint8_t *ptr = partition->lock(); + auto numRows = partition->getNumRows(); + for (int i = 0; i < numRows; ++i) { const uint8_t *ebuf = nullptr; int64_t ecCode = -1, operatorID = -1; size_t eSize = 0; auto delta = deserializeExceptionFromMemory(ptr, &ecCode, &operatorID, &_currentRowNumber, &ebuf, &eSize); + processExceptionRow(ecCode, operatorID, ebuf, eSize); + ptr += delta; _rowNumber++; - rowsLeftInPartition--; } - // Unlock but wait to invalidate until all resolve tasks have finished partition->unlock(); + partition->invalidate(); } // merging is done, unlock the last partition & copy the others over. @@ -823,8 +832,9 @@ namespace tuplex { // clear exceptions, because they have been resolved (or put to new exceptions!) // if task produced exceptions, they are stored in the IExceptionableTask class! // => no need to overwrite them, getter for iexceptionabletask has all info! 
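// --- Editorial sketch ----------------------------------------------------
// executeInOrder() below merges three sorted streams by original row index.
// Raw fallback indices are absolute, general indices must be shifted by the
// fallback rows already merged, and runtime-exception indices by both prior
// classes; ties prefer fallback, then general. A self-contained version of
// the comparison (names are editorial):
enum class RowClass { Fallback, General, Exception };
inline RowClass nextRowClass(int64_t expRaw, int64_t generalRaw, int64_t fallbackRaw,
                             int64_t generalCounter, int64_t fallbackCounter) {
    int64_t exp = expRaw + fallbackCounter + generalCounter;
    int64_t gen = generalRaw + fallbackCounter;
    if (fallbackRaw <= exp && fallbackRaw <= gen) return RowClass::Fallback;
    if (gen <= exp)                               return RowClass::General;
    return RowClass::Exception;
}
// ---------------------------------------------------------------------------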
- _runtimeExceptions.clear(); - _inputExceptions.clear(); + _exceptionPartitions.clear(); + _generalPartitions.clear(); + _fallbackPartitions.clear(); } else { executeInOrder(); } @@ -872,147 +882,323 @@ namespace tuplex { _rowNumber = 0; } - // Initialize runtime exception variables - size_t curRuntimePartitionInd = 0; // current index into vector of runtime exception partitions - int64_t numRuntimeRowsLeftInPartition = 0; // number of rows remaining in partition - const uint8_t *runPtr = nullptr; - if (_numRuntimeExceptions > 0) { - curRuntimePartitionInd = _runtimeExceptionIndex; - numRuntimeRowsLeftInPartition = _runtimeExceptions[curRuntimePartitionInd]->getNumRows() - _runtimeExceptionRowOffset; - runPtr = _runtimeExceptions[curRuntimePartitionInd]->lock() + _runtimeExceptionByteOffset; + size_t curExceptionInd = 0; + size_t exceptionsRemaining = 0; + const uint8_t *expPtr = nullptr; + size_t exceptionNumRows = 0; + for (int i = 0; i < _exceptionPartitions.size(); ++i) { + auto numRows = _exceptionPartitions[i]->getNumRows(); + exceptionNumRows += numRows; + if (i == 0) { + expPtr = _exceptionPartitions[i]->lock(); + exceptionsRemaining = numRows; + } } - // Initialize input exception variables - size_t curInputPartitionInd = 0; // current index into vector of input exception partitions - int64_t numInputRowsLeftInPartition = 0; // number of rows remaining in partition - const uint8_t *inputPtr = nullptr; - if (_numInputExceptions > 0) { - curInputPartitionInd = _inputExceptionIndex; - numInputRowsLeftInPartition = _inputExceptions[curInputPartitionInd]->getNumRows() - _inputExceptionRowOffset; - inputPtr = _inputExceptions[curInputPartitionInd]->lock() + _inputExceptionByteOffset; + size_t curGeneralInd = 0; + size_t generalRemaining = 0; + const uint8_t *generalPtr = nullptr; + size_t generalNumRows = 0; + for (int i = 0; i < _generalPartitions.size(); ++i) { + auto numRows = _generalPartitions[i]->getNumRows(); + generalNumRows += numRows; + if (i == 0) { + generalPtr = _generalPartitions[i]->lock(); + generalRemaining = numRows; + } + } + + size_t curFallbackInd = 0; + size_t fallbackRemaining = 0; + const uint8_t *fallPtr = nullptr; + size_t fallbackNumRows = 0; + for (int i = 0; i < _fallbackPartitions.size(); ++i) { + auto numRows = _fallbackPartitions[i]->getNumRows(); + fallbackNumRows += numRows; + if (i == 0) { + fallPtr = _fallbackPartitions[i]->lock(); + fallbackRemaining = numRows; + } } // Merge input and runtime exceptions in order. To do so, we can compare the row indices of the // current runtime and input exception and process the one that occurs first. 
The saved row indices of
// runtime exceptions do not account for the existence of input exceptions, so we need to add the previous
// input exceptions to compare the true row number
- size_t inputRowsProcessed = 0;
- size_t runtimeRowsProcessed = 0;
- const uint8_t *ptr = nullptr;
- while (runPtr && inputPtr) {
- auto runRowInd = *((int64_t *) runPtr); // get current runtime row index
- auto inputRowInd = *((int64_t *) inputPtr); // get current input row index
- bool isRuntimeException;
- // compare indices with accounting for previous input exceptions
- if (runRowInd + inputRowsProcessed < inputRowInd) {
- ptr = runPtr;
- numRuntimeRowsLeftInPartition--;
- isRuntimeException = true;
+ while (_exceptionCounter < exceptionNumRows && _generalCounter < generalNumRows && _fallbackCounter < fallbackNumRows) {
+ auto expRowInd = *((int64_t *) expPtr) + _fallbackCounter + _generalCounter;
+ auto generalRowInd = *((int64_t *) generalPtr) + _fallbackCounter;
+ auto fallbackRowInd = *((int64_t *) fallPtr);
+
+ const uint8_t *buf = nullptr;
+ int64_t ecCode = 0, operatorID = -1;
+ size_t eSize = 0;
+ if (fallbackRowInd <= expRowInd && fallbackRowInd <= generalRowInd) {
+ fallbackRemaining--;
+ _fallbackCounter++;
+
+ auto delta = deserializeExceptionFromMemory(fallPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize);
+ fallPtr += delta;
+ } else if (generalRowInd <= expRowInd && generalRowInd <= fallbackRowInd) {
+ generalRemaining--;
+ _generalCounter++;
+
+ auto delta = deserializeExceptionFromMemory(generalPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize);
+ _currentRowNumber += _fallbackCounter;
+ generalPtr += delta;
} else {
- ptr = inputPtr;
- numInputRowsLeftInPartition--;
- inputRowsProcessed++;
- isRuntimeException = false;
+ exceptionsRemaining--;
+ _exceptionCounter++;
+
+ auto delta = deserializeExceptionFromMemory(expPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize);
+ _currentRowNumber += _fallbackCounter + _generalCounter;
+ expPtr += delta;
}
- const uint8_t *ebuf = nullptr;
- int64_t ecCode = -1, operatorID = -1;
+ processExceptionRow(ecCode, operatorID, buf, eSize);
+ _rowNumber++;
+
+ if (exceptionsRemaining == 0) {
+ _exceptionPartitions[curExceptionInd]->unlock();
+ _exceptionPartitions[curExceptionInd]->invalidate();
+ curExceptionInd++;
+ if (curExceptionInd < _exceptionPartitions.size()) {
+ exceptionsRemaining = _exceptionPartitions[curExceptionInd]->getNumRows();
+ expPtr = _exceptionPartitions[curExceptionInd]->lock();
+ }
+ }
+
+ if (generalRemaining == 0) {
+ _generalPartitions[curGeneralInd]->unlock();
+ _generalPartitions[curGeneralInd]->invalidate();
+ curGeneralInd++;
+ if (curGeneralInd < _generalPartitions.size()) {
+ generalRemaining = _generalPartitions[curGeneralInd]->getNumRows();
+ generalPtr = _generalPartitions[curGeneralInd]->lock();
+ }
+ }
+
+ if (fallbackRemaining == 0) {
+ _fallbackPartitions[curFallbackInd]->unlock();
+ _fallbackPartitions[curFallbackInd]->invalidate();
+ curFallbackInd++;
+ if (curFallbackInd < _fallbackPartitions.size()) {
+ fallbackRemaining = _fallbackPartitions[curFallbackInd]->getNumRows();
+ fallPtr = _fallbackPartitions[curFallbackInd]->lock();
+ }
+ }
+ }
+
+ while (_exceptionCounter < exceptionNumRows && _generalCounter < generalNumRows) {
+ auto expRowInd = *((int64_t *) expPtr) + _fallbackCounter + _generalCounter;
+ auto generalRowInd = *((int64_t *) generalPtr) + _fallbackCounter;
+
+ const uint8_t *buf = nullptr;
+ int64_t ecCode = 0, operatorID = -1;
size_t eSize = 0;
- auto delta = 
deserializeExceptionFromMemory(ptr, &ecCode, &operatorID, &_currentRowNumber, &ebuf, - &eSize); + if (generalRowInd <= expRowInd) { + generalRemaining--; + _generalCounter++; - if (isRuntimeException) { - _currentRowNumber += inputRowsProcessed; - runPtr += delta; + auto delta = deserializeExceptionFromMemory(generalPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize); + _currentRowNumber += _fallbackCounter; + generalPtr += delta; } else { - inputPtr += delta; + exceptionsRemaining--; + _exceptionCounter++; + + auto delta = deserializeExceptionFromMemory(expPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize); + _currentRowNumber += _fallbackCounter + _generalCounter; + expPtr += delta; } - processExceptionRow(ecCode, operatorID, ebuf, eSize); + processExceptionRow(ecCode, operatorID, buf, eSize); _rowNumber++; - // Exhausted current runtime exceptions, need to switch partitions - if (numRuntimeRowsLeftInPartition == 0 || runtimeRowsProcessed == _numRuntimeExceptions) { - _runtimeExceptions[curRuntimePartitionInd]->unlock(); - curRuntimePartitionInd++; - // Still have more exceptions to go through - if (curRuntimePartitionInd < _runtimeExceptions.size() && runtimeRowsProcessed < _numRuntimeExceptions) { - numRuntimeRowsLeftInPartition = _runtimeExceptions[curRuntimePartitionInd]->getNumRows(); - runPtr = _runtimeExceptions[curRuntimePartitionInd]->lock(); - } else { - // processed all exceptions - runPtr = nullptr; + if (exceptionsRemaining == 0) { + _exceptionPartitions[curExceptionInd]->unlock(); + _exceptionPartitions[curExceptionInd]->invalidate(); + curExceptionInd++; + if (curExceptionInd < _exceptionPartitions.size()) { + exceptionsRemaining = _exceptionPartitions[curExceptionInd]->getNumRows(); + expPtr = _exceptionPartitions[curExceptionInd]->lock(); } } - // Exhausted current input exceptions, need to switch partitions - if (numInputRowsLeftInPartition == 0 || inputRowsProcessed == _numInputExceptions) { - _inputExceptions[curInputPartitionInd]->unlock(); - curInputPartitionInd++; - // Still have more exceptions to go through - if (curInputPartitionInd < _inputExceptions.size() && inputRowsProcessed < _numInputExceptions) { - numInputRowsLeftInPartition = _inputExceptions[curInputPartitionInd]->getNumRows(); - inputPtr = _inputExceptions[curInputPartitionInd]->lock(); - } else { - // processed all exceptions - inputPtr = nullptr; + if (generalRemaining == 0) { + _generalPartitions[curGeneralInd]->unlock(); + _generalPartitions[curGeneralInd]->invalidate(); + curGeneralInd++; + if (curGeneralInd < _generalPartitions.size()) { + generalRemaining = _generalPartitions[curGeneralInd]->getNumRows(); + generalPtr = _generalPartitions[curGeneralInd]->lock(); + } + } + } + + while (_generalCounter < generalNumRows && _fallbackCounter < fallbackNumRows) { + auto generalRowInd = *((int64_t *) generalPtr) + _fallbackCounter; + auto fallbackRowInd = *((int64_t *) fallPtr); + + const uint8_t *buf = nullptr; + int64_t ecCode = 0, operatorID = -1; + size_t eSize = 0; + if (fallbackRowInd <= generalRowInd) { + fallbackRemaining--; + _fallbackCounter++; + + auto delta = deserializeExceptionFromMemory(fallPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize); + fallPtr += delta; + } else { + generalRemaining--; + _generalCounter++; + + auto delta = deserializeExceptionFromMemory(generalPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize); + _currentRowNumber += _fallbackCounter; + generalPtr += delta; + } + + processExceptionRow(ecCode, operatorID, buf, eSize); + 
_rowNumber++; + + if (generalRemaining == 0) { + _generalPartitions[curGeneralInd]->unlock(); + _generalPartitions[curGeneralInd]->invalidate(); + curGeneralInd++; + if (curGeneralInd < _generalPartitions.size()) { + generalRemaining = _generalPartitions[curGeneralInd]->getNumRows(); + generalPtr = _generalPartitions[curGeneralInd]->lock(); + } + } + + if (fallbackRemaining == 0) { + _fallbackPartitions[curFallbackInd]->unlock(); + _fallbackPartitions[curFallbackInd]->invalidate(); + curFallbackInd++; + if (curFallbackInd < _fallbackPartitions.size()) { + fallbackRemaining = _fallbackPartitions[curFallbackInd]->getNumRows(); + fallPtr = _fallbackPartitions[curFallbackInd]->lock(); + } + } + } + + while (_exceptionCounter < exceptionNumRows && _fallbackCounter < fallbackNumRows) { + auto expRowInd = *((int64_t *) expPtr) + _fallbackCounter + _generalCounter; + auto fallbackRowInd = *((int64_t *) fallPtr); + + const uint8_t *buf = nullptr; + int64_t ecCode = 0, operatorID = -1; + size_t eSize = 0; + if (fallbackRowInd <= expRowInd) { + fallbackRemaining--; + _fallbackCounter++; + + auto delta = deserializeExceptionFromMemory(fallPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize); + fallPtr += delta; + } else { + exceptionsRemaining--; + _exceptionCounter++; + + auto delta = deserializeExceptionFromMemory(expPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize); + _currentRowNumber += _fallbackCounter + _generalCounter; + expPtr += delta; + } + + processExceptionRow(ecCode, operatorID, buf, eSize); + _rowNumber++; + + if (exceptionsRemaining == 0) { + _exceptionPartitions[curExceptionInd]->unlock(); + _exceptionPartitions[curExceptionInd]->invalidate(); + curExceptionInd++; + if (curExceptionInd < _exceptionPartitions.size()) { + exceptionsRemaining = _exceptionPartitions[curExceptionInd]->getNumRows(); + expPtr = _exceptionPartitions[curExceptionInd]->lock(); + } + } + + if (fallbackRemaining == 0) { + _fallbackPartitions[curFallbackInd]->unlock(); + _fallbackPartitions[curFallbackInd]->invalidate(); + curFallbackInd++; + if (curFallbackInd < _fallbackPartitions.size()) { + fallbackRemaining = _fallbackPartitions[curFallbackInd]->getNumRows(); + fallPtr = _fallbackPartitions[curFallbackInd]->lock(); } } } - // Process remaining runtime exceptions if any exist - while (runPtr) { - const uint8_t *ebuf = nullptr; + while (_exceptionCounter < exceptionNumRows) { + const uint8_t *buf = nullptr; int64_t ecCode = -1, operatorID = -1; size_t eSize = 0; - auto delta = deserializeExceptionFromMemory(runPtr, &ecCode, &operatorID, &_currentRowNumber, &ebuf, - &eSize); - _currentRowNumber += inputRowsProcessed; - processExceptionRow(ecCode, operatorID, ebuf, eSize); - runPtr += delta; + auto delta = deserializeExceptionFromMemory(expPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize); + _currentRowNumber += _generalCounter + _fallbackCounter; + expPtr += delta; + + processExceptionRow(ecCode, operatorID, buf, eSize); _rowNumber++; - numRuntimeRowsLeftInPartition--; - runtimeRowsProcessed++; - // Exhausted current runtime exceptions in partitions need to switch partitions or could be done - if (numRuntimeRowsLeftInPartition == 0 || runtimeRowsProcessed == _numRuntimeExceptions) { - _runtimeExceptions[curRuntimePartitionInd]->unlock(); - curRuntimePartitionInd++; - // More exceptions to process - if (curRuntimePartitionInd < _runtimeExceptions.size() && runtimeRowsProcessed < _numRuntimeExceptions) { - numRuntimeRowsLeftInPartition = 
_runtimeExceptions[curRuntimePartitionInd]->getNumRows(); - runPtr = _runtimeExceptions[curRuntimePartitionInd]->lock(); - } else { - // processed all exceptions - runPtr = nullptr; + exceptionsRemaining--; + _exceptionCounter++; + + if (exceptionsRemaining == 0) { + _exceptionPartitions[curExceptionInd]->unlock(); + _exceptionPartitions[curExceptionInd]->invalidate(); + curExceptionInd++; + if (curExceptionInd < _exceptionPartitions.size()) { + exceptionsRemaining = _exceptionPartitions[curExceptionInd]->getNumRows(); + expPtr = _exceptionPartitions[curExceptionInd]->lock(); } } } - // Process remaining input exceptions if any exist - while (inputPtr) { - const uint8_t *ebuf = nullptr; + while (_generalCounter < generalNumRows) { + const uint8_t *buf = nullptr; int64_t ecCode = -1, operatorID = -1; size_t eSize = 0; - auto delta = deserializeExceptionFromMemory(inputPtr, &ecCode, &operatorID, &_currentRowNumber, &ebuf, - &eSize); - processExceptionRow(ecCode, operatorID, ebuf, eSize); - inputPtr += delta; + auto delta = deserializeExceptionFromMemory(generalPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize); + _currentRowNumber += _fallbackCounter; + + generalPtr += delta; + + processExceptionRow(ecCode, operatorID, buf, eSize); _rowNumber++; - numInputRowsLeftInPartition--; - inputRowsProcessed++; - // Exhausted current input exceptions, need to switch partitions - if (numInputRowsLeftInPartition == 0 || inputRowsProcessed == _numInputExceptions) { - _inputExceptions[curInputPartitionInd]->unlock(); - curInputPartitionInd++; - // Still have more exceptions - if (curInputPartitionInd < _inputExceptions.size() && inputRowsProcessed < _numInputExceptions) { - numInputRowsLeftInPartition = _inputExceptions[curInputPartitionInd]->getNumRows(); - inputPtr = _inputExceptions[curInputPartitionInd]->lock(); - } else { - // processed all exceptions - inputPtr = nullptr; + generalRemaining--; + _generalCounter++; + + if (generalRemaining == 0) { + _generalPartitions[curGeneralInd]->unlock(); + _generalPartitions[curGeneralInd]->invalidate(); + curGeneralInd++; + if (curGeneralInd < _generalPartitions.size()) { + generalRemaining = _generalPartitions[curGeneralInd]->getNumRows(); + generalPtr = _generalPartitions[curGeneralInd]->lock(); + } + } + } + + while (_fallbackCounter < fallbackNumRows) { + const uint8_t *buf = nullptr; + int64_t ecCode = -1, operatorID = -1; + size_t eSize = 0; + auto delta = deserializeExceptionFromMemory(fallPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize); + fallPtr += delta; + + processExceptionRow(ecCode, operatorID, buf, eSize); + _rowNumber++; + + fallbackRemaining--; + _fallbackCounter++; + + if (fallbackRemaining == 0) { + _fallbackPartitions[curFallbackInd]->unlock(); + _fallbackPartitions[curFallbackInd]->invalidate(); + curFallbackInd++; + if (curFallbackInd < _fallbackPartitions.size()) { + fallbackRemaining = _fallbackPartitions[curFallbackInd]->getNumRows(); + fallPtr = _fallbackPartitions[curFallbackInd]->lock(); } } } @@ -1046,7 +1232,9 @@ namespace tuplex { // clear exceptions, because they have been resolved (or put to new exceptions!) // if task produced exceptions, they are stored in the IExceptionableTask class! // => no need to overwrite them, getter for iexceptionabletask has all info! 
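The six drain loops above all implement the same selection rule; only the set of still-active streams changes. Stored row indices are stream-relative (runtime exceptions do not count general or fallback rows, general rows do not count fallback rows, fallback indices are absolute), so each candidate is normalized before comparison. Condensed into a single hypothetical helper for exposition, the rule is:

    enum class Stream { Exception, General, Fallback };

    // normalize each stream's next stored row index to a true row number and pick
    // the smallest; ties go to fallback, then general, matching the branch order above
    inline Stream pickNext(int64_t excStored, int64_t genStored, int64_t fbStored,
                           int64_t generalCounter, int64_t fallbackCounter) {
        int64_t exc = excStored + generalCounter + fallbackCounter;
        int64_t gen = genStored + fallbackCounter;
        int64_t fb  = fbStored;
        if (fb <= exc && fb <= gen) return Stream::Fallback;
        if (gen <= exc)             return Stream::General;
        return Stream::Exception;
    }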
- _runtimeExceptions.clear(); + _exceptionPartitions.clear(); + _generalPartitions.clear(); + _fallbackPartitions.clear(); } void ResolveTask::sendStatusToHistoryServer() { @@ -1063,6 +1251,7 @@ namespace tuplex { void ResolveTask::unlockAll() { _mergedRowsSink.unlock(); _generalCaseSink.unlock(); + _fallbackSink.unlock(); // unlock exceptionable task IExceptionableTask::unlockAll(); diff --git a/tuplex/core/src/physical/ResultSet.cc b/tuplex/core/src/physical/ResultSet.cc index 0f7bf7319..153553f25 100644 --- a/tuplex/core/src/physical/ResultSet.cc +++ b/tuplex/core/src/physical/ResultSet.cc @@ -13,97 +13,175 @@ namespace tuplex { ResultSet::ResultSet(const Schema& schema, - const std::vector& partitions, - const std::vector& exceptions, - const std::unordered_map& partitionToExceptionsMap, - const std::vector> pyobjects, + const std::vector& normalPartitions, + const std::vector& generalPartitions, + const std::vector& fallbackPartitions, + const std::vector& partitionGroups, int64_t maxRows) : ResultSet::ResultSet() { - for(Partition *p : partitions) - _partitions.push_back(p); - - _pyobjects = std::deque>(pyobjects.begin(), pyobjects.end()); - _exceptions = exceptions; - _partitionToExceptionsMap = partitionToExceptionsMap; - _curRowCounter = 0; + for (const auto &group : partitionGroups) + _partitionGroups.push_back(group); + + for (const auto &p : normalPartitions) + _remainingNormalPartitions.push_back(p); + for (const auto &p : generalPartitions) + _remainingGeneralPartitions.push_back(p); + for (const auto &p : fallbackPartitions) + _remainingFallbackPartitions.push_back(p); + + _curNormalRowCounter = 0; + _curNormalByteCounter = 0; + _curGeneralRowCounter = 0; + _curGeneralByteCounter = 0; + _curFallbackRowCounter = 0; + _curFallbackByteCounter = 0; + _normalRowCounter = 0; + _generalRowCounter = 0; + _fallbackRowCounter = 0; _totalRowCounter = 0; - _byteCounter = 0; + _schema = schema; _maxRows = maxRows < 0 ? std::numeric_limits::max() : maxRows; - _rowsRetrieved = 0; } - void ResultSet::clear() { - for(auto partition : _partitions) - partition->invalidate(); - _partitions.clear(); - for(auto partition : _exceptions) + void clearPartitions(std::list& partitions) { + for (auto &partition : partitions) { partition->invalidate(); + } + partitions.clear(); + } - _curRowCounter = 0; - _byteCounter = 0; + void ResultSet::clear() { + clearPartitions(_remainingNormalPartitions); + clearPartitions(_currentNormalPartitions); + clearPartitions(_remainingGeneralPartitions); + clearPartitions(_currentGeneralPartitions); + clearPartitions(_remainingFallbackPartitions); + clearPartitions(_currentFallbackPartitions); + _partitionGroups.clear(); + + _curNormalRowCounter = 0; + _curNormalByteCounter = 0; + _curGeneralRowCounter = 0; + _curGeneralByteCounter = 0; + _curFallbackRowCounter = 0; + _curFallbackByteCounter = 0; + _normalRowCounter = 0; + _generalRowCounter = 0; + _fallbackRowCounter = 0; + _totalRowCounter = 0; _maxRows = 0; - _rowsRetrieved = 0; } - bool ResultSet::hasNextRow() { - + bool ResultSet::hasNextNormalPartition() const { // all rows already retrieved? - if(_rowsRetrieved >= _maxRows) + if (_totalRowCounter >= _maxRows) return false; // empty? - if(_partitions.empty() && _pyobjects.empty()) + if (_currentNormalPartitions.empty() && _remainingNormalPartitions.empty()) { return false; - else { - // partitions empty? 
- if(_partitions.empty()) - return true; - else if(_pyobjects.empty()) { - assert(_partitions.size() > 0); - assert(_partitions.front()); - - // still one row left? - return _curRowCounter < _partitions.front()->getNumRows(); - } else { - return true; // there's for sure at least one object left! - } + } else if (!_currentNormalPartitions.empty()) { + return _curNormalRowCounter < _currentNormalPartitions.front()->getNumRows(); + } else { + return _remainingNormalPartitions.front()->getNumRows() > 0; } - } + bool ResultSet::hasNextGeneralPartition() const { + // all rows already retrieved? + if (_totalRowCounter >= _maxRows) + return false; + + // empty? + if (_currentGeneralPartitions.empty() && _remainingGeneralPartitions.empty()) { + return false; + } else if (!_currentGeneralPartitions.empty()) { + return _curGeneralRowCounter < _currentGeneralPartitions.front()->getNumRows(); + } else { + return _remainingGeneralPartitions.front()->getNumRows() > 0; + } + } - bool ResultSet::hasNextPartition() const { + bool ResultSet::hasNextFallbackPartition() const { // all rows already retrieved? - if(_rowsRetrieved >= _maxRows) + if (_totalRowCounter >= _maxRows) return false; // empty? - if(_partitions.empty()) + if (_currentFallbackPartitions.empty() && _remainingFallbackPartitions.empty()) { return false; - else { - assert(_partitions.size() > 0); - assert(_partitions.front()); + } else if (!_currentFallbackPartitions.empty()) { + return _curFallbackRowCounter < _currentFallbackPartitions.front()->getNumRows(); + } else { + return _remainingFallbackPartitions.front()->getNumRows() > 0; + } + } + + Partition* ResultSet::getNextGeneralPartition() { + if (_currentGeneralPartitions.empty() && _remainingGeneralPartitions.empty()) + return nullptr; - // still one row left? 
- return _curRowCounter < _partitions.front()->getNumRows(); + Partition *first = nullptr; + if (!_currentGeneralPartitions.empty()) { + first = _currentGeneralPartitions.front(); + _currentGeneralPartitions.pop_front(); + } else { + first = _remainingGeneralPartitions.front(); + _remainingGeneralPartitions.pop_front(); } + + auto numRows = first->getNumRows(); + _totalRowCounter += numRows; + _generalRowCounter += numRows; + + _curGeneralRowCounter = 0; + _curGeneralByteCounter = 0; + + return first; } - Partition* ResultSet::getNextPartition() { - if(_partitions.empty()) + Partition* ResultSet::getNextFallbackPartition() { + if (_currentFallbackPartitions.empty() && _remainingFallbackPartitions.empty()) return nullptr; - assert(_partitions.size() > 0); + Partition *first = nullptr; + if (!_currentFallbackPartitions.empty()) { + first = _currentFallbackPartitions.front(); + _currentFallbackPartitions.pop_front(); + } else { + first = _remainingFallbackPartitions.front(); + _remainingFallbackPartitions.pop_front(); + } + + auto numRows = first->getNumRows(); + _totalRowCounter += numRows; + _fallbackRowCounter += numRows; - Partition *first = _partitions.front(); - assert(_schema == first->schema()); + _curFallbackRowCounter = 0; + _curFallbackByteCounter = 0; + + return first; + } + + Partition* ResultSet::getNextNormalPartition() { + if (_currentNormalPartitions.empty() && _remainingNormalPartitions.empty()) + return nullptr; + + Partition *first = nullptr; + if (!_currentNormalPartitions.empty()) { + first = _currentNormalPartitions.front(); + _currentNormalPartitions.pop_front(); + } else { + first = _remainingNormalPartitions.front(); + _remainingNormalPartitions.pop_front(); + } auto numRows = first->getNumRows(); - _rowsRetrieved += numRows; + _totalRowCounter += numRows; + _normalRowCounter += numRows; - _partitions.pop_front(); - _curRowCounter = 0; - _byteCounter = 0; + _curNormalRowCounter = 0; + _curNormalByteCounter = 0; return first; } @@ -121,23 +199,25 @@ namespace tuplex { v.reserve(limit); // do a quick check whether there are ANY pyobjects, if not deserialize quickly! - if(_pyobjects.empty()) { - - if(_partitions.empty()) + if(_currentGeneralPartitions.empty() && _remainingGeneralPartitions.empty() && _currentFallbackPartitions.empty() && _remainingFallbackPartitions.empty()) { + if (_currentNormalPartitions.empty() && _remainingNormalPartitions.empty()) return vector{}; + for (const auto &p : _remainingNormalPartitions) + _currentNormalPartitions.push_back(p); + Deserializer ds(_schema); for(int i = 0; i < limit;) { // all exhausted - if(_partitions.empty()) + if(_currentNormalPartitions.empty()) break; // get number of rows in first partition - Partition *first = _partitions.front(); + Partition *first = _currentNormalPartitions.front(); auto num_rows = first->getNumRows(); // how many left to retrieve? 
- auto num_to_retrieve_from_partition = std::min(limit - i, num_rows - _curRowCounter); + auto num_to_retrieve_from_partition = std::min(limit - i, num_rows - _curNormalRowCounter); if(num_to_retrieve_from_partition <= 0) break; @@ -148,11 +228,11 @@ namespace tuplex { // get next element of partition const uint8_t* ptr = first->lock(); for(int j = 0; j < num_to_retrieve_from_partition; ++j) { - auto row = Row::fromMemory(ds, ptr + _byteCounter, first->capacity() - _byteCounter); - _byteCounter += row.serializedLength(); - _curRowCounter++; - _rowsRetrieved++; + auto row = Row::fromMemory(ds, ptr + _curNormalByteCounter, first->capacity() - _curNormalByteCounter); + _curNormalByteCounter += row.serializedLength(); + _curNormalRowCounter++; _totalRowCounter++; + _normalRowCounter++; v.push_back(row); } @@ -163,17 +243,13 @@ namespace tuplex { i += num_to_retrieve_from_partition; // get next Partition ready when current one is exhausted - if(_curRowCounter == first->getNumRows()) - removeFirstPartition(); + if(_curNormalRowCounter == first->getNumRows()) + removeFirstNormalPartition(); } v.shrink_to_fit(); return v; } else { - // fallback solution: - // @TODO: write faster version with proper merging! - - std::vector v; while (hasNextRow() && v.size() < limit) { v.push_back(getNextRow()); } @@ -182,81 +258,252 @@ namespace tuplex { } } - Row ResultSet::getNextRow() { - // merge rows from objects - if(!_pyobjects.empty()) { - auto row_number = std::get<0>(_pyobjects.front()); - auto obj = std::get<1>(_pyobjects.front()); - - // partitions empty? - // => simply return next row. no fancy merging possible - // else merge based on row number. - if(_partitions.empty() || row_number <= _totalRowCounter) { - // merge - python::lockGIL(); - auto row = python::pythonToRow(obj); - python::unlockGIL(); - _pyobjects.pop_front(); - _rowsRetrieved++; - - // update row counter (not for double indices which could occur from flatMap!) - if(_pyobjects.empty()) - _totalRowCounter++; - else { - auto next_row_number = std::get<0>(_pyobjects.front()); - if(next_row_number != row_number) - _totalRowCounter++; - } + bool ResultSet::hasNextNormalRow() { + if (!_currentNormalPartitions.empty() && _curNormalRowCounter < _currentNormalPartitions.front()->getNumRows()) + return true; + for (const auto &p : _remainingNormalPartitions) + if (p->getNumRows() > 0) + return true; + return false; + } + + bool ResultSet::hasNextGeneralRow() { + if (!_currentGeneralPartitions.empty() && _curGeneralRowCounter < _currentGeneralPartitions.front()->getNumRows()) + return true; + for (const auto &p : _remainingGeneralPartitions) + if (p->getNumRows() > 0) + return true; + return false; + } + + bool ResultSet::hasNextFallbackRow() { + if (!_currentFallbackPartitions.empty() && _curFallbackRowCounter < _currentFallbackPartitions.front()->getNumRows()) + return true; + for (const auto &p : _remainingFallbackPartitions) + if (p->getNumRows() > 0) + return true; + return false; + } + + bool ResultSet::hasNextRow() { + // all rows already retrieved? 
+ if(_totalRowCounter >= _maxRows) + return false; + + return hasNextNormalRow() || hasNextGeneralRow() || hasNextFallbackRow(); + } - return row; + Row ResultSet::getNextRow() { + if (_currentNormalPartitions.empty() && _currentFallbackPartitions.empty() && _currentGeneralPartitions.empty()) { + // all partitions are exhausted return empty row as default value + if (_partitionGroups.empty()) + return Row(); + _normalRowCounter = 0; + _generalRowCounter = 0; + _fallbackRowCounter = 0; + auto group = _partitionGroups.front(); + _partitionGroups.pop_front(); + for (int i = group.normalPartitionStartInd; i < group.normalPartitionStartInd + group.numNormalPartitions; ++i) { + _currentNormalPartitions.push_back(_remainingNormalPartitions.front()); + _remainingNormalPartitions.pop_front(); + } + for (int i = group.generalPartitionStartInd; i < group.generalPartitionStartInd + group.numGeneralPartitions; ++i) { + _currentGeneralPartitions.push_back(_remainingGeneralPartitions.front()); + _remainingGeneralPartitions.pop_front(); + } + for (int i = group.fallbackPartitionStartInd; i < group.fallbackPartitionStartInd + group.numFallbackPartitions; ++i) { + _currentFallbackPartitions.push_back(_remainingFallbackPartitions.front()); + _remainingFallbackPartitions.pop_front(); + } + return getNextRow(); + } else if (_currentNormalPartitions.empty() && _currentFallbackPartitions.empty()) { + // only general rows remain, return next general row + return getNextGeneralRow(); + } else if (_currentNormalPartitions.empty() && _currentGeneralPartitions.empty()) { + // only fallback rows remain, return next fallback row + return getNextFallbackRow(); + } else if (_currentFallbackPartitions.empty() && _currentGeneralPartitions.empty()) { + // only normal rows remain, return next normal row + return getNextNormalRow(); + } else if (_currentFallbackPartitions.empty()) { + // only normal and general rows remain, compare row index + // emit normal rows until reached current general ind + if (_normalRowCounter + _generalRowCounter < currentGeneralRowInd()) { + return getNextNormalRow(); + } else { + return getNextGeneralRow(); + } + } else if (_currentGeneralPartitions.empty()) { + // only normal and fallback rows remain, compare row index + // emit normal rows until reached current fallback ind + if (_normalRowCounter + _generalRowCounter + _fallbackRowCounter < currentFallbackRowInd()) { + return getNextNormalRow(); + } else { + return getNextFallbackRow(); + } + } else { + // all three cases remain, three way row comparison + auto generalRowInd = currentGeneralRowInd(); + auto fallbackRowInd = currentFallbackRowInd(); + if (_normalRowCounter + _generalRowCounter < generalRowInd && _normalRowCounter + _generalRowCounter + _fallbackRowCounter < fallbackRowInd) { + return getNextNormalRow(); + } else if (generalRowInd <= fallbackRowInd) { + return getNextGeneralRow(); + } else { + return getNextFallbackRow(); } } + } - // check whether entry is available, else return empty row - if(_partitions.empty()) - return Row(); + int64_t ResultSet::currentFallbackRowInd() { + assert(!_currentFallbackPartitions.empty()); + auto p = _currentFallbackPartitions.front(); + auto ptr = p->lock() + _curFallbackByteCounter; + auto rowInd = *((int64_t*) ptr); + p->unlock(); + return rowInd; + } - assert(_partitions.size() > 0); - Partition *first = _partitions.front(); + int64_t ResultSet::currentGeneralRowInd() { + assert(!_currentGeneralPartitions.empty()); + auto p = _currentGeneralPartitions.front(); + auto ptr = p->lock() + 
_curGeneralByteCounter; + auto rowInd = *((int64_t*) ptr); + p->unlock(); + return rowInd; + } - // make sure partition schema matches stored schema - assert(_schema == first->schema()); + Row ResultSet::getNextNormalRow() { + assert (!_currentNormalPartitions.empty()); + auto p = _currentNormalPartitions.front(); + assert(_schema == p->schema()); - Row row; + auto ptr = p->lock() + _curNormalByteCounter; + auto capacity = p->capacity() - _curNormalByteCounter; + auto row = Row::fromMemory(_schema, ptr, capacity); + p->unlock(); - // thread safe version (slow) - // get next element of partition - const uint8_t* ptr = first->lock(); + _curNormalByteCounter += row.serializedLength(); + _curNormalRowCounter++; + _totalRowCounter++; + _normalRowCounter++; - row = Row::fromMemory(_schema, ptr + _byteCounter, first->capacity() - _byteCounter); + if (_curNormalRowCounter == p->getNumRows()) { + removeFirstNormalPartition(); + } - // thread safe version (slow) - // deserialize - first->unlock(); + return row; + } + + Row ResultSet::getNextGeneralRow() { + assert (!_currentGeneralPartitions.empty()); + auto p = _currentGeneralPartitions.front(); + assert(_schema == p->schema()); + + auto prevRowInd = currentGeneralRowInd(); + _curGeneralByteCounter += 4 * sizeof(int64_t); + auto ptr = p->lock() + _curGeneralByteCounter; + auto capacity = p->capacity() - _curGeneralByteCounter; + auto row = Row::fromMemory(_schema, ptr, capacity); + p->unlock(); + + _curGeneralByteCounter += row.serializedLength(); + _curGeneralRowCounter++; + + if (_curGeneralRowCounter == p->getNumRows()) { + removeFirstGeneralPartition(); + } - _byteCounter += row.serializedLength(); - _curRowCounter++; - _rowsRetrieved++; _totalRowCounter++; + if (_currentGeneralPartitions.empty() || currentGeneralRowInd() > prevRowInd) { + _generalRowCounter++; + } + + return row; + } + + Row ResultSet::getNextFallbackRow() { + assert (!_currentFallbackPartitions.empty()); + + auto prevRowInd = currentFallbackRowInd(); + auto p = _currentFallbackPartitions.front(); + auto ptr = p->lock() + _curFallbackByteCounter; + auto pyObjectSize = ((int64_t *) ptr)[3]; ptr += 4 * sizeof(int64_t); + + python::lockGIL(); + auto row = python::pythonToRow(python::deserializePickledObject(python::getMainModule(), (char *) ptr, pyObjectSize)); + python::unlockGIL(); + + p->unlock(); + + _curFallbackByteCounter += pyObjectSize + 4*sizeof(int64_t); + _curFallbackRowCounter++; + + if (_curFallbackRowCounter == p->getNumRows()) { + removeFirstFallbackPartition(); + } - // get next Partition ready when current one is exhausted - if(_curRowCounter == first->getNumRows()) - removeFirstPartition(); + _totalRowCounter++; + if (_currentFallbackPartitions.empty() || currentFallbackRowInd() > prevRowInd) { + _fallbackRowCounter++; + } return row; } size_t ResultSet::rowCount() const { size_t count = 0; - for(const auto& partition : _partitions) { + for (const auto& partition : _currentNormalPartitions) count += partition->getNumRows(); - } - return count + _pyobjects.size(); + for (const auto& partition : _remainingNormalPartitions) + count += partition->getNumRows(); + for (const auto& partition : _currentGeneralPartitions) + count += partition->getNumRows(); + for (const auto& partition : _remainingGeneralPartitions) + count += partition->getNumRows(); + for (const auto& partition : _currentFallbackPartitions) + count += partition->getNumRows(); + for (const auto& partition : _remainingFallbackPartitions) + count += partition->getNumRows(); + return count; } - void 
ResultSet::removeFirstPartition() { - assert(_partitions.size() > 0); - Partition *first = _partitions.front(); + void ResultSet::removeFirstGeneralPartition() { + assert(!_currentGeneralPartitions.empty()); + Partition *first = _currentGeneralPartitions.front(); + assert(first); + + // invalidate partition +#ifndef NDEBUG + Logger::instance().defaultLogger().info("ResultSet invalidates partition " + hexAddr(first) + " uuid " + uuidToString(first->uuid())); +#endif + first->invalidate(); + + _currentGeneralPartitions.pop_front(); + _curGeneralRowCounter = 0; + _curGeneralByteCounter = 0; + } + + void ResultSet::removeFirstFallbackPartition() { + assert(!_currentFallbackPartitions.empty()); + Partition *first = _currentFallbackPartitions.front(); + assert(first); + + // invalidate partition +#ifndef NDEBUG + Logger::instance().defaultLogger().info("ResultSet invalidates partition " + hexAddr(first) + " uuid " + uuidToString(first->uuid())); +#endif + first->invalidate(); + + // remove partition (is now processed) + _currentFallbackPartitions.pop_front(); + _curFallbackRowCounter = 0; + _curFallbackByteCounter = 0; + } + + void ResultSet::removeFirstNormalPartition() { + assert(!_currentNormalPartitions.empty()); + Partition *first = _currentNormalPartitions.front(); assert(first); // invalidate partition @@ -266,8 +513,18 @@ namespace tuplex { first->invalidate(); // remove partition (is now processed) - _partitions.pop_front(); - _curRowCounter = 0; - _byteCounter = 0; + + _currentNormalPartitions.pop_front(); + _curNormalRowCounter = 0; + _curNormalByteCounter = 0; + } + + size_t ResultSet::fallbackRowCount() const { + size_t count = 0; + for (const auto &p : _currentFallbackPartitions) + count += p->getNumRows(); + for (const auto&p : _remainingFallbackPartitions) + count += p->getNumRows(); + return count; } } \ No newline at end of file diff --git a/tuplex/core/src/physical/StageBuilder.cc b/tuplex/core/src/physical/StageBuilder.cc index 72f01e2b8..8844e27b0 100644 --- a/tuplex/core/src/physical/StageBuilder.cc +++ b/tuplex/core/src/physical/StageBuilder.cc @@ -1051,7 +1051,7 @@ namespace tuplex { bool requireSlowPath = _nullValueOptimization; // per default, slow path is always required when null-value opt is enabled. // special case: input source is cached and no exceptions happened => no resolve path necessary if there are no resolvers! 
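The PartitionGroup values constructed in PythonContext.cc below and unpacked in ResultSet::getNextRow above tie one run of normal-case partitions to the general and fallback partitions produced alongside it. Its definition is not part of this excerpt; judging from the six-argument constructor calls and the member names used in getNextRow, a plausible shape is (field order inferred from usage, so treat this as an assumption):

    struct PartitionGroup {
        size_t numNormalPartitions,   normalPartitionStartInd;
        size_t numGeneralPartitions,  generalPartitionStartInd;
        size_t numFallbackPartitions, fallbackPartitionStartInd;

        PartitionGroup(size_t numNormal, size_t normalStart,
                       size_t numGeneral, size_t generalStart,
                       size_t numFallback, size_t fallbackStart)
            : numNormalPartitions(numNormal), normalPartitionStartInd(normalStart),
              numGeneralPartitions(numGeneral), generalPartitionStartInd(generalStart),
              numFallbackPartitions(numFallback), fallbackPartitionStartInd(fallbackStart) {}
    };

Under this reading, PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size()) groups the single partition about to be pushed with the fallback partitions just appended for it.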
- if(_inputNode->type() == LogicalOperatorType::CACHE && dynamic_cast(_inputNode)->cachedExceptions().empty()) + if(_inputNode->type() == LogicalOperatorType::CACHE && dynamic_cast(_inputNode)->cachedGeneralPartitions().empty() && dynamic_cast(_inputNode)->cachedFallbackPartitions().empty()) requireSlowPath = false; if (numResolveOperators > 0 || requireSlowPath) { diff --git a/tuplex/core/src/physical/TransformStage.cc b/tuplex/core/src/physical/TransformStage.cc index b61f9cbe2..f753e6e41 100644 --- a/tuplex/core/src/physical/TransformStage.cc +++ b/tuplex/core/src/physical/TransformStage.cc @@ -118,29 +118,28 @@ namespace tuplex { _rs = emptyResultSet(); } - void TransformStage::setMemoryResult(const std::vector &partitions, - const std::vector& generalCase, - const std::unordered_map& partitionToExceptionsMap, - const std::vector>& interpreterRows, - const std::vector& remainingExceptions, - const std::unordered_map, size_t> &ecounts) { - setExceptionCounts(ecounts); - - if (partitions.empty() && interpreterRows.empty() && generalCase.empty()) + void TransformStage::setMemoryResult(const std::vector& normalPartitions, + const std::vector& generalPartitions, + const std::vector& fallbackPartitions, + const std::vector& partitionGroups, + const std::unordered_map, size_t>& exceptionCounts) { + setExceptionCounts(exceptionCounts); + + if (normalPartitions.empty() && generalPartitions.empty() && fallbackPartitions.empty()) _rs = emptyResultSet(); else { std::vector limitedPartitions; auto schema = Schema::UNKNOWN; - if(!partitions.empty()) { - schema = partitions.front()->schema(); - for (auto partition : partitions) { + if(!normalPartitions.empty()) { + schema = normalPartitions.front()->schema(); + for (auto partition : normalPartitions) { assert(schema == partition->schema()); } // check output limit, adjust partitions if necessary size_t numOutputRows = 0; - for (auto partition : partitions) { + for (auto partition : normalPartitions) { numOutputRows += partition->getNumRows(); if (numOutputRows >= outputLimit()) { // clip last partition & leave loop @@ -157,10 +156,7 @@ namespace tuplex { } } - // put ALL partitions to result set - _rs = std::make_shared(schema, limitedPartitions, - generalCase, partitionToExceptionsMap, interpreterRows, - outputLimit()); + _rs = std::make_shared(schema, limitedPartitions, generalPartitions, fallbackPartitions, partitionGroups, outputLimit()); } } @@ -654,7 +650,7 @@ namespace tuplex { } case EndPointMode::MEMORY: case EndPointMode::FILE: { - auto p = stage->resultSet()->partitions(); + auto p = stage->resultSet()->normalPartitions(); std::copy(std::begin(p), std::end(p), std::back_inserter(partitions)); break; } diff --git a/tuplex/core/src/physical/TransformTask.cc b/tuplex/core/src/physical/TransformTask.cc index c560c4af4..1d24d6fdc 100644 --- a/tuplex/core/src/physical/TransformTask.cc +++ b/tuplex/core/src/physical/TransformTask.cc @@ -543,43 +543,65 @@ namespace tuplex { auto functor = reinterpret_cast(_functor); - auto numInputExceptions = _inputExceptionInfo.numExceptions; - auto inputExceptionIndex = _inputExceptionInfo.exceptionIndex; - auto inputExceptionRowOffset = _inputExceptionInfo.exceptionRowOffset; - auto inputExceptionByteOffset = _inputExceptionInfo.exceptionByteOffset; - - // First, prepare the input exception partitions to pass into the code-gen - // This is done to simplify the LLVM code. 
We will end up passing it an - // array of expPtrs which point to the first exception in their partition - // and expPtrSizes which tell how many exceptions are in that partition. - auto arrSize = _inputExceptions.size() - inputExceptionIndex; - auto expPtrs = new uint8_t*[arrSize]; - auto expPtrSizes = new int64_t[arrSize]; - int expInd = 0; - // Iterate through all exception partitions beginning at the one specified by the starting index - for (int i = inputExceptionIndex; i < _inputExceptions.size(); ++i) { - auto numRows = _inputExceptions[i]->getNumRows(); - auto ptr = _inputExceptions[i]->lock(); - // If its the first partition, we need to account for the offset - if (i == inputExceptionIndex) { - numRows -= inputExceptionRowOffset; - ptr += inputExceptionByteOffset; - } - expPtrSizes[expInd] = numRows; - expPtrs[expInd] = (uint8_t *) ptr; - expInd++; - } + int64_t totalNormalRowCounter = 0; + int64_t totalGeneralRowCounter = 0; + int64_t totalFallbackRowCounter = 0; + int64_t totalFilterCounter = 0; + + uint8_t **generalPartitions = new uint8_t*[_generalPartitions.size()]; + for (int i = 0; i < _generalPartitions.size(); ++i) + generalPartitions[i] = _generalPartitions[i]->lockWriteRaw(); + int64_t numGeneralPartitions = _generalPartitions.size(); + int64_t generalIndexOffset = 0; + int64_t generalRowOffset = 0; + int64_t generalByteOffset = 0; + + uint8_t **fallbackPartitions = new uint8_t*[_fallbackPartitions.size()]; + for (int i = 0; i < _fallbackPartitions.size(); ++i) + fallbackPartitions[i] = _fallbackPartitions[i]->lockWriteRaw(); + int64_t numFallbackPartitions = _fallbackPartitions.size(); + int64_t fallbackIndexOffset = 0; + int64_t fallbackRowOffset = 0; + int64_t fallbackByteOffset = 0; // go over all input partitions. - for(auto inputPartition : _inputPartitions) { + for(auto &inputPartition : _inputPartitions) { // lock ptr, extract number of rows ==> store them // lock raw & call functor! 
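The functor invocation just below grows from 9 to 20 arguments: the general and fallback partition arrays go in, and per-stream row counters plus resume offsets come back out so the row-index fixup after the loop can continue where the generated code stopped. The generated function's typedef is not shown in this patch; a sketch of the signature implied by the call site (argument names, and some of the exact integer types, are guesses):

    using CodegenFunctor = int64_t(*)(
        void *task,                          // `this`, handed back to the row/exception callbacks
        const uint8_t *inPtr, int64_t inSize,
        int64_t *numNormalRows, int64_t *numBadRows, bool ignoreLastRow,
        int64_t *filterCounter,              // rows dropped by filters, used below to re-index rows
        int64_t *normalRowCounter, int64_t *generalRowCounter, int64_t *fallbackRowCounter,
        uint8_t **generalPartitions, int64_t numGeneralPartitions,
        int64_t *generalIndexOffset, int64_t *generalRowOffset, int64_t *generalByteOffset,
        uint8_t **fallbackPartitions, int64_t numFallbackPartitions,
        int64_t *fallbackIndexOffset, int64_t *fallbackRowOffset, int64_t *fallbackByteOffset);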
int64_t inSize = inputPartition->size();
const uint8_t *inPtr = inputPartition->lockRaw();
_numInputRowsRead += static_cast<size_t>(*((int64_t*)inPtr));

// call functor
- auto bytesParsed = functor(this, inPtr, inSize, expPtrs, expPtrSizes, numInputExceptions, &num_normal_rows, &num_bad_rows, false);
+ auto bytesParsed = functor(this, inPtr, inSize, &num_normal_rows, &num_bad_rows, false,
+ &totalFilterCounter, &totalNormalRowCounter, &totalGeneralRowCounter, &totalFallbackRowCounter,
+ generalPartitions, numGeneralPartitions, &generalIndexOffset, &generalRowOffset, &generalByteOffset,
+ fallbackPartitions, numFallbackPartitions, &fallbackIndexOffset, &fallbackRowOffset, &fallbackByteOffset);

// save number of normal rows to output rows written if not writeTofile
if(hasMemorySink())
@@ -595,13 +617,55 @@
inputPartition->invalidate();
}

- delete[] expPtrs;
- delete[] expPtrSizes;
+ if (generalIndexOffset < numGeneralPartitions) {
+ auto curGeneralPtr = generalPartitions[generalIndexOffset];
+ auto numRowsInPartition = *((int64_t*)curGeneralPtr);
+ curGeneralPtr += sizeof(int64_t) + generalByteOffset;
+ while (generalRowOffset < numRowsInPartition) {
+ *((int64_t*)curGeneralPtr) -= totalFilterCounter;
+ curGeneralPtr += 4 * sizeof(int64_t) + ((int64_t*)curGeneralPtr)[3];
+ generalRowOffset += 1;
+
+ if (generalRowOffset == numRowsInPartition && generalIndexOffset < numGeneralPartitions - 1) {
+ generalIndexOffset += 1;
+ curGeneralPtr = generalPartitions[generalIndexOffset];
+ numRowsInPartition = *((int64_t*)curGeneralPtr);
+ curGeneralPtr += sizeof(int64_t);
+ generalByteOffset = 0;
+ generalRowOffset = 0;
+ }
+ }
+ }

- for (int i = inputExceptionIndex; i < _inputExceptions.size(); ++i) {
- _inputExceptions[i]->unlock();
+ if (fallbackIndexOffset < numFallbackPartitions) {
+ auto curFallbackPtr = fallbackPartitions[fallbackIndexOffset];
+ auto numRowsInPartition = *((int64_t*)curFallbackPtr);
+ curFallbackPtr += sizeof(int64_t) + fallbackByteOffset;
+ while (fallbackRowOffset < numRowsInPartition) {
+ *((int64_t*)curFallbackPtr) -= totalFilterCounter;
+ curFallbackPtr += 4 * sizeof(int64_t) + ((int64_t*)curFallbackPtr)[3];
+ fallbackRowOffset += 1;
+
+ if (fallbackRowOffset == numRowsInPartition && fallbackIndexOffset < numFallbackPartitions - 1) {
+ fallbackIndexOffset += 1;
+ curFallbackPtr = fallbackPartitions[fallbackIndexOffset];
+ numRowsInPartition = *((int64_t*)curFallbackPtr);
+ curFallbackPtr += sizeof(int64_t);
+ fallbackByteOffset = 0;
+ fallbackRowOffset = 0;
+ }
+ }
}

+ for (auto & _generalPartition : _generalPartitions)
+ _generalPartition->unlockWrite();
+
+ for (auto & _fallbackPartition : _fallbackPartitions)
+ _fallbackPartition->unlockWrite();
+
+ delete[] fallbackPartitions;
+ delete[] generalPartitions;
+
#ifndef 
NDEBUG owner()->info("Trafo task memory source exhausted (" + pluralize(_inputPartitions.size(), "partition") + ", " + pluralize(num_normal_rows, "normal row") + ", " + pluralize(num_bad_rows, "exceptional row") + ")"); diff --git a/tuplex/python/include/PythonContext.h b/tuplex/python/include/PythonContext.h index 57746bf7b..bd8b2e008 100644 --- a/tuplex/python/include/PythonContext.h +++ b/tuplex/python/include/PythonContext.h @@ -117,6 +117,8 @@ namespace tuplex { pds.wrap(&_context->makeError(message)); return pds; } + + std::vector serializeFallbackRows(const std::vector>& fallbackRows); public: /*! diff --git a/tuplex/python/src/PythonContext.cc b/tuplex/python/src/PythonContext.cc index 310bb1a6d..68b6468f7 100644 --- a/tuplex/python/src/PythonContext.cc +++ b/tuplex/python/src/PythonContext.cc @@ -39,35 +39,35 @@ namespace tuplex { Schema schema(Schema::MemoryLayout::ROW, python::Type::makeTupleType({python::Type::F64})); - std::vector> badParallelizeObjects; - std::vector numExceptionsInPartition; - // check if empty? if(0 == numElements) - return _context->fromPartitions(schema, std::vector(), columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, std::vector(), std::vector(), std::vector(), columns); // create new partition on driver auto driver = _context->getDriver(); + std::vector> fallbackRows; + std::vector fallbackPartitions; + std::vector partitionMergeInfo; + std::vector partitions; Partition* partition = driver->allocWritablePartition(allocMinSize, schema, -1, _context->id()); int64_t* rawPtr = (int64_t*)partition->lockWriteRaw(); *rawPtr = 0; double* ptr = (double*)(rawPtr + 1); size_t numBytesSerialized = 0; - size_t prevNumExceptions = 0; - size_t prevNumRows = 0; + auto rowDelta = 0; for(unsigned i = 0; i < numElements; ++i) { auto obj = PyList_GET_ITEM(listObj, i); Py_XINCREF(obj); // check capacity and realloc if necessary get a new partition if(partition->capacity() < numBytesSerialized + sizeof(double)) { - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); - prevNumExceptions = badParallelizeObjects.size(); - prevNumRows += numNewExceptions + *rawPtr; + rowDelta += *rawPtr + fallbackRows.size(); + auto serializedRows = serializeFallbackRows(fallbackRows); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); @@ -89,15 +89,15 @@ namespace tuplex { val = (double)PyLong_AsLongLong(obj); if(PyErr_Occurred()) { // too large integer? 
PyErr_Clear(); - assert(i >= prevNumRows); - badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); + assert(i >= rowDelta); + fallbackRows.emplace_back(std::make_tuple(i - rowDelta, obj)); continue; } } } else { - assert(i >= prevNumRows); - badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); + assert(i >= rowDelta); + fallbackRows.emplace_back(std::make_tuple(i - rowDelta, obj)); continue; } } @@ -108,15 +108,16 @@ namespace tuplex { numBytesSerialized += sizeof(double); } - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); + auto serializedRows = serializeFallbackRows(fallbackRows); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); // create dataset from partitions. - return _context->fromPartitions(schema, partitions, columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, partitions, fallbackPartitions, partitionMergeInfo, columns); } DataSet& PythonContext::fastI64Parallelize(PyObject* listObj, const std::vector& columns, bool upcast) { @@ -127,16 +128,17 @@ namespace tuplex { Schema schema(Schema::MemoryLayout::ROW, python::Type::makeTupleType({python::Type::I64})); - std::vector> badParallelizeObjects; - std::vector numExceptionsInPartition; - // check if empty? if(0 == numElements) - return _context->fromPartitions(schema, std::vector(), columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, std::vector(), std::vector(), std::vector(), columns); // create new partition on driver auto driver = _context->getDriver(); + std::vector> fallbackRows; + std::vector fallbackPartitions; + std::vector partitionMergeInfo; + std::vector partitions; Partition* partition = driver->allocWritablePartition(std::max(sizeof(int64_t), allocMinSize), schema, -1, _context->id()); int64_t* rawPtr = (int64_t*)partition->lockWriteRaw(); @@ -144,18 +146,18 @@ namespace tuplex { int64_t* ptr = rawPtr + 1; size_t numBytesSerialized = 0; size_t prevNumExceptions = 0; - size_t prevNumRows = 0; + auto rowDelta = 0; for(unsigned i = 0; i < numElements; ++i) { auto obj = PyList_GET_ITEM(listObj, i); Py_XINCREF(obj); // check capacity and realloc if necessary get a new partition if(partition->capacity() < numBytesSerialized + sizeof(int64_t)) { - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); - prevNumExceptions = badParallelizeObjects.size(); - prevNumRows += numNewExceptions + *rawPtr; + rowDelta += *rawPtr + fallbackRows.size(); + auto serializedRows = serializeFallbackRows(fallbackRows); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); @@ -171,8 +173,8 @@ namespace tuplex { val = PyLong_AsLongLong(obj); if(PyErr_Occurred()) { // too large integer? 
PyErr_Clear(); - assert(i >= prevNumRows); - badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); + assert(i >= rowDelta); + fallbackRows.emplace_back(std::make_tuple(i - rowDelta, obj)); continue; } } else { @@ -180,8 +182,8 @@ namespace tuplex { if(upcast && (obj == Py_True || obj == Py_False)) val = obj == Py_True; else { - assert(i >= prevNumRows); - badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); + assert(i >= rowDelta); + fallbackRows.emplace_back(std::make_tuple(i - rowDelta, obj)); continue; } } @@ -191,15 +193,16 @@ namespace tuplex { *rawPtr = *rawPtr + 1; numBytesSerialized += sizeof(int64_t); } - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); + auto serializedRows = serializeFallbackRows(fallbackRows); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); // create dataset from partitions. - return _context->fromPartitions(schema, partitions, columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, partitions, fallbackPartitions, partitionMergeInfo, columns); } DataSet& PythonContext::fastMixedSimpleTypeTupleTransfer(PyObject *listObj, const python::Type &majType, @@ -215,12 +218,9 @@ namespace tuplex { // now create partitions super fast Schema schema(Schema::MemoryLayout::ROW, majType); - std::vector> badParallelizeObjects; - std::vector numExceptionsInPartition; - // check if empty? 
if(0 == numElements) - return _context->fromPartitions(schema, std::vector(), columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, std::vector(), std::vector(), std::vector(), columns); // encode type of tuple quickly into string @@ -232,6 +232,10 @@ namespace tuplex { // create new partition on driver auto driver = _context->getDriver(); + std::vector> fallbackRows; + std::vector fallbackPartitions; + std::vector partitionMergeInfo; + std::vector partitions; Partition* partition = driver->allocWritablePartition(allocMinSize, schema, -1, _context->id()); int64_t* rawPtr = (int64_t*)partition->lockWriteRaw(); @@ -239,7 +243,7 @@ namespace tuplex { uint8_t* ptr = (uint8_t*)(rawPtr + 1); size_t numBytesSerialized = 0; size_t prevNumExceptions = 0; - size_t prevNumRows = 0; + auto rowDelta = 0; for(unsigned i = 0; i < numElements; ++i) { auto obj = PyList_GET_ITEM(listObj, i); Py_XINCREF(obj); @@ -266,19 +270,19 @@ namespace tuplex { } } if (nonConforming) { - assert(i >= prevNumRows); - badParallelizeObjects.emplace_back(i - prevNumRows, obj); + assert(i >= rowDelta); + fallbackRows.emplace_back(i - rowDelta, obj); continue; } } // get new partition if capacity exhausted if(partition->capacity() < numBytesSerialized + requiredBytes) { - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); - prevNumExceptions = badParallelizeObjects.size(); - prevNumRows += numNewExceptions + *rawPtr; + rowDelta += *rawPtr + fallbackRows.size(); + auto serializedRows = serializeFallbackRows(fallbackRows); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); @@ -358,11 +362,11 @@ namespace tuplex { // special part when bad row encountered bad_element: ptr = rowStartPtr; - assert(i >= prevNumRows); - badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); + assert(i >= rowDelta); + fallbackRows.emplace_back(i - rowDelta, obj); } else { - assert(i >= prevNumRows); - badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); + assert(i >= rowDelta); + fallbackRows.emplace_back(i - rowDelta, obj); } // serialization code here is a little bit more complicated @@ -371,9 +375,10 @@ namespace tuplex { // (2) is the field containing total varlength // (3) is the actual string content (incl. '\0' delimiter) } - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); + auto serializedRows = serializeFallbackRows(fallbackRows); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); @@ -381,7 +386,7 @@ namespace tuplex { delete [] typeStr; // create dataset from partitions. 
- return _context->fromPartitions(schema, partitions, columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, partitions, fallbackPartitions, partitionMergeInfo, columns); } DataSet& PythonContext::fastBoolParallelize(PyObject *listObj, const std::vector& columns) { @@ -392,17 +397,18 @@ namespace tuplex { Schema schema(Schema::MemoryLayout::ROW, python::Type::makeTupleType({python::Type::BOOLEAN})); - std::vector> badParallelizeObjects; - std::vector numExceptionsInPartition; - // check if empty? if(0 == numElements) - return _context->fromPartitions(schema, std::vector(), columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, std::vector(), std::vector(), std::vector(), columns); // create new partition on driver auto driver = _context->getDriver(); + std::vector> fallbackRows; + std::vector fallbackPartitions; + std::vector partitionMergeInfo; + std::vector partitions; Partition* partition = driver->allocWritablePartition(std::max(sizeof(int64_t), allocMinSize), schema, -1, _context->id()); int64_t* rawPtr = (int64_t*)partition->lockWriteRaw(); @@ -410,18 +416,18 @@ namespace tuplex { int64_t* ptr = rawPtr + 1; size_t numBytesSerialized = 0; size_t prevNumExceptions = 0; - size_t prevNumRows = 0; + auto rowDelta = 0; for(unsigned i = 0; i < numElements; ++i) { auto obj = PyList_GET_ITEM(listObj, i); Py_XINCREF(obj); // check capacity and realloc if necessary get a new partition if(partition->capacity() < numBytesSerialized + sizeof(int64_t)) { - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); - prevNumExceptions = badParallelizeObjects.size(); - prevNumRows += numNewExceptions + *rawPtr; + rowDelta += *rawPtr + fallbackRows.size(); + auto serializedRows = serializeFallbackRows(fallbackRows); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); @@ -438,20 +444,20 @@ namespace tuplex { *rawPtr = *rawPtr + 1; numBytesSerialized += sizeof(int64_t); } else { - assert(i >= prevNumRows); - badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); + assert(i >= rowDelta); + fallbackRows.emplace_back(i - rowDelta, obj); } } - - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); + auto serializedRows = serializeFallbackRows(fallbackRows); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); // create dataset from partitions. 
- return _context->fromPartitions(schema, partitions, columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, partitions, fallbackPartitions, partitionMergeInfo, columns); } DataSet& PythonContext::fastStrParallelize(PyObject* listObj, const std::vector& columns) { @@ -462,17 +468,18 @@ namespace tuplex { Schema schema(Schema::MemoryLayout::ROW, python::Type::makeTupleType({python::Type::STRING})); - std::vector> badParallelizeObjects; - std::vector numExceptionsInPartition; - // check if empty? if(0 == numElements) - return _context->fromPartitions(schema, std::vector(), columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, std::vector(), std::vector(), std::vector(), columns); // create new partition on driver auto driver = _context->getDriver(); + std::vector> fallbackRows; + std::vector fallbackPartitions; + std::vector partitionMergeInfo; + std::vector partitions; Partition* partition = driver->allocWritablePartition(allocMinSize, schema, -1, _context->id()); int64_t* rawPtr = (int64_t*)partition->lockWriteRaw(); @@ -480,7 +487,7 @@ namespace tuplex { uint8_t* ptr = (uint8_t*)(rawPtr + 1); size_t numBytesSerialized = 0; size_t prevNumExceptions = 0; - size_t prevNumRows = 0; + auto rowDelta = 0; for(unsigned i = 0; i < numElements; ++i) { auto obj = PyList_GET_ITEM(listObj, i); Py_XINCREF(obj); @@ -500,11 +507,11 @@ namespace tuplex { // check capacity and realloc if necessary get a new partition if(partition->capacity() < numBytesSerialized + requiredBytes) { - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); - prevNumExceptions = badParallelizeObjects.size(); - prevNumRows += numNewExceptions + *rawPtr; + rowDelta += *rawPtr + fallbackRows.size(); + auto serializedRows = serializeFallbackRows(fallbackRows); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); @@ -530,19 +537,20 @@ namespace tuplex { *rawPtr = *rawPtr + 1; numBytesSerialized += requiredBytes; } else { - assert(i >= prevNumRows); - badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); + assert(i >= rowDelta); + fallbackRows.emplace_back(i - rowDelta, obj); } } - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); + auto serializedRows = serializeFallbackRows(fallbackRows); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); // create dataset from partitions. 
- return _context->fromPartitions(schema, partitions, columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, partitions, fallbackPartitions, partitionMergeInfo, columns); } // Returns true if t1 can be considered a subtype of t2, specifically in the context of Option types @@ -578,12 +586,9 @@ namespace tuplex { auto numElements = PyList_Size(listObj); logger.debug("transferring " + std::to_string(numElements) + " elements. "); - std::vector> badParallelizeObjects; - std::vector numExceptionsInPartition; - // check if empty? if(0 == numElements) - return _context->fromPartitions(schema, std::vector(), columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, std::vector(), std::vector(), std::vector(), columns); auto firstRow = PyList_GET_ITEM(listObj, 0); Py_XINCREF(firstRow); @@ -592,6 +597,10 @@ namespace tuplex { // create new partition on driver auto driver = _context->getDriver(); + std::vector> fallbackRows; + std::vector fallbackPartitions; + std::vector partitionMergeInfo; + std::vector partitions; Partition* partition = driver->allocWritablePartition(allocMinSize, schema, -1, _context->id()); int64_t* rawPtr = (int64_t*)partition->lockWriteRaw(); @@ -599,7 +608,7 @@ namespace tuplex { uint8_t* ptr = (uint8_t*)(rawPtr + 1); size_t numBytesSerialized = 0; size_t prevNumExceptions = 0; - size_t prevNumRows = 0; + auto rowDelta = 0; for (unsigned i = 0; i < numElements; ++i) { // because this a slow transfer loop, check explicitly for signals and free anything if there's something... @@ -611,10 +620,10 @@ namespace tuplex { logger.warn("slow transfer to backend interrupted."); // free items (decref) - for(auto t : badParallelizeObjects) { + for(auto t : fallbackRows) { Py_XDECREF(std::get<1>(t)); } - badParallelizeObjects.clear(); + fallbackRows.clear(); return _context->makeError("interrupted transfer"); } @@ -632,11 +641,11 @@ namespace tuplex { auto requiredBytes = row.serializedLength(); if(partition->capacity() < numBytesSerialized + requiredBytes) { - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); - prevNumExceptions = badParallelizeObjects.size(); - prevNumRows += numNewExceptions + *rawPtr; + rowDelta += *rawPtr + fallbackRows.size(); + auto serializedRows = serializeFallbackRows(fallbackRows); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); @@ -653,17 +662,18 @@ namespace tuplex { *rawPtr = *rawPtr + 1; numBytesSerialized += requiredBytes; } else - badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, item)); + fallbackRows.emplace_back(std::make_tuple(i - rowDelta, item)); } - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); + auto serializedRows = serializeFallbackRows(fallbackRows); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); 
partition->unlockWrite(); partitions.push_back(partition); // serialize in main memory - return _context->fromPartitions(schema, partitions, columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, partitions, fallbackPartitions, partitionMergeInfo, columns); } DataSet& PythonContext::strDictParallelize(PyObject *listObj, const python::Type &rowType, @@ -679,16 +689,17 @@ namespace tuplex { assert(rowType.parameters().size() == columns.size()); // also very important!!! Schema schema(Schema::MemoryLayout::ROW, rowType); - std::vector> badParallelizeObjects; - std::vector numExceptionsInPartition; - // check if empty? if(0 == numElements) - return _context->fromPartitions(schema, std::vector(), columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, std::vector(), std::vector(), std::vector(), columns); // create new partition on driver auto driver = _context->getDriver(); + std::vector> fallbackRows; + std::vector fallbackPartitions; + std::vector partitionMergeInfo; + std::vector partitions; Partition* partition = driver->allocWritablePartition(allocMinSize, schema, -1, _context->id()); int64_t* rawPtr = (int64_t*)partition->lockWriteRaw(); @@ -696,7 +707,7 @@ namespace tuplex { uint8_t* ptr = (uint8_t*)(rawPtr + 1); size_t numBytesSerialized = 0; size_t prevNumExceptions = 0; - size_t prevNumRows = 0; + auto rowDelta = 0; for(unsigned i = 0; i < numElements; ++i) { auto obj = PyList_GET_ITEM(listObj, i); Py_XINCREF(obj); @@ -724,11 +735,11 @@ namespace tuplex { size_t requiredBytes = row.serializedLength(); // check capacity and realloc if necessary get a new partition if (partition->capacity() < numBytesSerialized + allocMinSize) { - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); - prevNumExceptions = badParallelizeObjects.size(); - prevNumRows += numNewExceptions + *rawPtr; + rowDelta += *rawPtr + fallbackRows.size(); + auto serializedRows = serializeFallbackRows(fallbackRows); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); @@ -744,24 +755,25 @@ namespace tuplex { *rawPtr = *rawPtr + 1; numBytesSerialized += requiredBytes; } catch (const std::exception& e) { - assert(i >= prevNumRows); - badParallelizeObjects.emplace_back(i - prevNumRows, obj); + assert(i >= rowDelta); + fallbackRows.emplace_back(std::make_tuple(i - rowDelta, obj)); } } else { - assert(i >= prevNumRows); - badParallelizeObjects.emplace_back(i - prevNumRows, obj); + assert(i >= rowDelta); + fallbackRows.emplace_back(i - rowDelta, obj); } } - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); + auto serializedRows = serializeFallbackRows(fallbackRows); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); // create dataset from partitions. 
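            // [editor's note, not part of the patch] Decoding the group pushed at every flush against the
            // PartitionGroup(numNormal, normalStart, numGeneral, generalStart, numFallback, fallbackStart)
            // constructor: PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())
            // records one normal partition at the index it is about to occupy, no general-case partitions,
            // and the just-serialized fallback partitions starting where they are about to be appended.
            // The group is pushed *before* partitions and fallbackPartitions grow, which keeps these start
            // indices consistent with the final vectors handed to fromPartitions().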
-        return _context->fromPartitions(schema, partitions, columns, badParallelizeObjects, numExceptionsInPartition);
+        return _context->fromPartitions(schema, partitions, fallbackPartitions, partitionMergeInfo, columns);
     }

     PythonDataSet PythonContext::parallelize(boost::python::list L,
@@ -1289,7 +1301,55 @@
         return co;
     }

-    // // running with another python version might lead to severe issues
+    std::vector<Partition*> PythonContext::serializeFallbackRows(const std::vector<std::tuple<size_t, PyObject*>>& fallbackRows) {
+        std::vector<Partition*> fallbackPartitions;
+        if (fallbackRows.empty()) {
+            return fallbackPartitions;
+        }
+
+        auto driver = _context->getDriver();
+        Schema schema(Schema::MemoryLayout::ROW, python::Type::makeTupleType({python::Type::STRING}));
+        auto partition = driver->allocWritablePartition(allocMinSize, schema, -1, _context->id());
+        int64_t* rawPtr = (int64_t*)partition->lockWriteRaw();
+        *rawPtr = 0;
+        uint8_t* ptr = (uint8_t*)(rawPtr + 1);
+        size_t numBytesSerialized = 0;
+
+        for (const auto& row: fallbackRows) {
+            auto rowNum = std::get<0>(row);
+            auto pythonObject = std::get<1>(row);
+            auto ecCode = ecToI64(ExceptionCode::PYTHON_PARALLELIZE);
+            auto pickledObject = python::pickleObject(python::getMainModule(), pythonObject);
+            auto pickledObjectSize = pickledObject.size();
+            size_t requiredBytes = sizeof(int64_t) * 4 + pickledObjectSize;
+
+            if (partition->capacity() < numBytesSerialized + requiredBytes) {
+                partition->unlockWrite();
+                fallbackPartitions.push_back(partition);
+                partition = driver->allocWritablePartition(allocMinSize, schema, -1, _context->id());
+                rawPtr = (int64_t*)partition->lockWriteRaw();
+                *rawPtr = 0;
+                ptr = (uint8_t*)(rawPtr + 1);
+                numBytesSerialized = 0;
+            }
+
+            *((int64_t*)(ptr)) = rowNum; ptr += sizeof(int64_t);
+            *((int64_t*)(ptr)) = ecCode; ptr += sizeof(int64_t);
+            *((int64_t*)(ptr)) = -1; ptr += sizeof(int64_t);
+            *((int64_t*)(ptr)) = pickledObjectSize; ptr += sizeof(int64_t);
+            memcpy(ptr, pickledObject.c_str(), pickledObjectSize); ptr += pickledObjectSize;
+
+            *rawPtr = *rawPtr + 1;
+            numBytesSerialized += requiredBytes;
+        }
+
+        partition->unlockWrite();
+        fallbackPartitions.push_back(partition);
+
+        return fallbackPartitions;
+    }
+
+    // // running with another python version might lead to severe issues
     // // hence, perform check at context startup!
     // bool checkPythonVersion() {
     //     using namespace std;
diff --git a/tuplex/python/src/PythonDataSet.cc b/tuplex/python/src/PythonDataSet.cc
index ffe14fdd6..7a04d9aed 100644
--- a/tuplex/python/src/PythonDataSet.cc
+++ b/tuplex/python/src/PythonDataSet.cc
@@ -908,8 +908,8 @@ namespace tuplex {
                 // retrieve full partitions for speed
                 Partition *partition = nullptr;
                 size_t pos = 0;
-                while (rs->hasNextPartition() && pos < maxRowCount) {
-                    partition = rs->getNextPartition();
+                while (rs->hasNextNormalPartition() && pos < maxRowCount) {
+                    partition = rs->getNextNormalPartition();
                     auto schema = partition->schema();
                     // single value?
--> reset rowtype by one level auto type = schema.getRowType(); @@ -963,8 +963,8 @@ namespace tuplex { Partition *partition = nullptr; size_t pos = 0; - while (rs->hasNextPartition() && pos < maxRowCount) { - partition = rs->getNextPartition(); + while (rs->hasNextNormalPartition() && pos < maxRowCount) { + partition = rs->getNextNormalPartition(); // add memory towards list object auto ptr = partition->lockRaw(); @@ -1001,8 +1001,8 @@ namespace tuplex { Partition *partition = nullptr; size_t pos = 0; - while (rs->hasNextPartition() && pos < maxRowCount) { - partition = rs->getNextPartition(); + while (rs->hasNextNormalPartition() && pos < maxRowCount) { + partition = rs->getNextNormalPartition(); // add memory towards list object auto ptr = partition->lockRaw(); @@ -1041,8 +1041,8 @@ namespace tuplex { Partition *partition = nullptr; size_t pos = 0; - while (rs->hasNextPartition() && pos < maxRowCount) { - partition = rs->getNextPartition(); + while (rs->hasNextNormalPartition() && pos < maxRowCount) { + partition = rs->getNextNormalPartition(); // add memory towards list object auto ptr = partition->lockRaw(); @@ -1090,8 +1090,8 @@ namespace tuplex { Partition *partition = nullptr; size_t pos = 0; - while (rs->hasNextPartition() && pos < maxRowCount) { - partition = rs->getNextPartition(); + while (rs->hasNextNormalPartition() && pos < maxRowCount) { + partition = rs->getNextNormalPartition(); // add memory towards list object auto ptr = partition->lockRaw(); @@ -1146,8 +1146,8 @@ namespace tuplex { Partition *partition = nullptr; size_t pos = 0; - while (rs->hasNextPartition() && pos < maxRowCount) { - partition = rs->getNextPartition(); + while (rs->hasNextNormalPartition() && pos < maxRowCount) { + partition = rs->getNextNormalPartition(); // add memory towards list object auto ptr = partition->lockRaw(); @@ -1190,8 +1190,8 @@ namespace tuplex { Partition *partition = nullptr; size_t pos = 0; - while (rs->hasNextPartition() && pos < maxRowCount) { - partition = rs->getNextPartition(); + while (rs->hasNextNormalPartition() && pos < maxRowCount) { + partition = rs->getNextNormalPartition(); // add memory towards list object auto ptr = partition->lockRaw(); @@ -1250,8 +1250,8 @@ namespace tuplex { Partition* partition = nullptr; size_t pos = 0; - while(rs->hasNextPartition() && pos < maxRowCount) { - partition = rs->getNextPartition(); + while(rs->hasNextNormalPartition() && pos < maxRowCount) { + partition = rs->getNextNormalPartition(); // add memory towards list object auto ptr = partition->lockRaw(); @@ -1347,7 +1347,7 @@ namespace tuplex { // b.c. merging of arbitrary python objects is not implemented yet, whenever they're present, use general // version // @TODO: this could be optimized! 
- if(rs->pyobject_count() != 0) + if(rs->fallbackRowCount() != 0) return anyToCPythonWithPyObjects(rs, maxRowCount); auto type = rs->schema().getRowType(); diff --git a/tuplex/python/src/PythonWrappers.cc b/tuplex/python/src/PythonWrappers.cc index 1824c6220..091177cce 100644 --- a/tuplex/python/src/PythonWrappers.cc +++ b/tuplex/python/src/PythonWrappers.cc @@ -173,8 +173,8 @@ namespace tuplex { Partition* partition = nullptr; size_t pos = 0; - while(rs->hasNextPartition()) { - partition = rs->getNextPartition(); + while(rs->hasNextNormalPartition()) { + partition = rs->getNextNormalPartition(); // add memory towards list object auto ptr = partition->lockRaw(); diff --git a/tuplex/test/core/DataSetShow.cc b/tuplex/test/core/DataSetShow.cc index cf50705b8..4ec70c4e6 100644 --- a/tuplex/test/core/DataSetShow.cc +++ b/tuplex/test/core/DataSetShow.cc @@ -14,7 +14,7 @@ #include #include "TestUtils.h" -class DataSetTest : public TuplexTest {}; +class DataSetTest : public PyTest {}; TEST_F(DataSetTest, DataSetShow) { using namespace tuplex; diff --git a/tuplex/test/core/ResultSetTest.cc b/tuplex/test/core/ResultSetTest.cc index 4acd38921..4aedd0649 100644 --- a/tuplex/test/core/ResultSetTest.cc +++ b/tuplex/test/core/ResultSetTest.cc @@ -51,6 +51,57 @@ class ResultSetTest : public PyTest { return pw.getOutputPartitions(); } + std::vector pyObjectsToPartitions(const std::vector>& pyObjects) { + using namespace tuplex; + + std::vector partitions; + if (pyObjects.empty()) { + return partitions; + } + + Schema schema(Schema::MemoryLayout::ROW, python::Type::makeTupleType({python::Type::STRING})); + Partition* partition = allocPartition(schema.getRowType(), -1); + auto rawPtr = (int64_t*)partition->lockWriteRaw(); + *rawPtr = 0; + auto ptr = (uint8_t*)(rawPtr + 1); + size_t numBytesSerialized = 0; + + python::lockGIL(); + for (auto &row: pyObjects) { + auto rowNum = std::get<0>(row); + auto pyObj = std::get<1>(row); + auto ecCode = -1; + auto opID = -1; + auto pickledObject = python::pickleObject(python::getMainModule(), pyObj); + auto pickledObjectSize = pickledObject.size(); + size_t requiredBytes = sizeof(int64_t) * 4 + pickledObjectSize; + + if (partition->capacity() < numBytesSerialized + requiredBytes) { + partition->unlockWrite(); + partitions.push_back(partition); + partition = allocPartition(schema.getRowType(), -1); + rawPtr = (int64_t *) partition->lockWriteRaw(); + *rawPtr = 0; + ptr = (uint8_t*)(rawPtr + 1); + numBytesSerialized = 0; + } + + *((int64_t*)ptr) = rowNum; ptr += sizeof(int64_t); + *((int64_t*)ptr) = ecCode; ptr += sizeof(int64_t); + *((int64_t*)ptr) = opID; ptr += sizeof(int64_t); + *((int64_t*)ptr) = pickledObjectSize; ptr += sizeof(int64_t); + memcpy(ptr, pickledObject.c_str(), pickledObjectSize); ptr += pickledObjectSize; + + *rawPtr += 1; + numBytesSerialized += requiredBytes; + } + python::unlockGIL(); + + partition->unlockWrite(); + partitions.push_back(partition); + + return partitions; + } }; TEST_F(ResultSetTest, NoPyObjects) { @@ -68,10 +119,13 @@ TEST_F(ResultSetTest, NoPyObjects) { sample_rows.push_back(Row(rand() % 256, rand() % 256 * 0.1 - 1.0, strs[rand() % strs.size()])); } auto partitions = rowsToPartitions(sample_rows); - for(auto p : partitions) - p->makeImmortal(); + std::vector partitionGroups; + for(int i = 0; i < partitions.size(); ++i) { + partitions[i]->makeImmortal(); + partitionGroups.push_back(PartitionGroup(1, i, 0, 0, 0, 0)); + } - auto rsA = make_shared(Schema(Schema::MemoryLayout::ROW, sample_rows.front().getRowType()), partitions); + auto rsA 
= make_shared(Schema(Schema::MemoryLayout::ROW, sample_rows.front().getRowType()), partitions, std::vector{}, std::vector{}, partitionGroups); EXPECT_EQ(rsA->rowCount(), sample_rows.size()); // check correct order returned @@ -79,13 +133,14 @@ TEST_F(ResultSetTest, NoPyObjects) { while(rsA->hasNextRow()) { EXPECT_EQ(rsA->getNextRow().toPythonString(), sample_rows[pos++].toPythonString()); } + EXPECT_EQ(pos, sample_rows.size()); // now limit result set to 17 rows, check this works as well! int Nlimit = 17; auto rsB = make_shared(Schema(Schema::MemoryLayout::ROW, sample_rows.front().getRowType()), partitions, std::vector{}, - std::unordered_map(), - vector>{}, + std::vector{}, + partitionGroups, Nlimit); pos = 0; while(rsB->hasNextRow()) { @@ -137,13 +192,15 @@ TEST_F(ResultSetTest, WithPyObjects) { vector refC = {Row(10), Row(20), Row(30), Row(35), Row(37)}; vector refD = {Row(-1), Row(0), Row(1)}; + auto partitionGroups = std::vector{PartitionGroup(1,0,0,0,1,0)}; + // TEST A: // ----------------- auto rsA = make_shared(Schema(Schema::MemoryLayout::ROW, rows.front().getRowType()), partitions, std::vector{}, - std::unordered_map(), - objsA); + pyObjectsToPartitions(objsA), + partitionGroups); EXPECT_EQ(rsA->rowCount(), objsA.size() + rows.size()); pos = 0; while(rsA->hasNextRow()) { @@ -156,8 +213,8 @@ TEST_F(ResultSetTest, WithPyObjects) { auto rsB = make_shared(Schema(Schema::MemoryLayout::ROW, rows.front().getRowType()), partitions, std::vector{}, - std::unordered_map(), - objsB); + pyObjectsToPartitions(objsB), + partitionGroups); EXPECT_EQ(rsB->rowCount(), objsB.size() + rows.size()); pos = 0; while(rsB->hasNextRow()) { @@ -171,8 +228,8 @@ TEST_F(ResultSetTest, WithPyObjects) { auto rsC = make_shared(Schema(Schema::MemoryLayout::ROW, rows.front().getRowType()), partitions, std::vector{}, - std::unordered_map(), - objsC); + pyObjectsToPartitions(objsC), + partitionGroups); EXPECT_EQ(rsC->rowCount(), objsC.size() + rows.size()); pos = 0; while(rsC->hasNextRow()) { @@ -180,6 +237,8 @@ TEST_F(ResultSetTest, WithPyObjects) { EXPECT_EQ(rsC->getNextRow().toPythonString(), refC[pos++].toPythonString()); } + partitionGroups = std::vector{PartitionGroup(0, 0, 0, 0, 1, 0)}; + // TEST D: // ------- // only pyobjects. 
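        // [editor's note, not part of the patch] Read against the
        // PartitionGroup(numNormal, normalStart, numGeneral, generalStart, numFallback, fallbackStart)
        // constructor: {1, 0, 0, 0, 1, 0} above pairs the single normal partition at index 0 with the
        // single fallback partition at index 0, and the {0, 0, 0, 0, 1, 0} group just assigned describes
        // the fallback-only result set used in TEST D below.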
@@ -188,8 +247,8 @@ TEST_F(ResultSetTest, WithPyObjects) { auto rsD = make_shared(Schema(Schema::MemoryLayout::ROW, rows.front().getRowType()), std::vector{}, std::vector{}, - std::unordered_map(), - objsD); + pyObjectsToPartitions(objsD), + partitionGroups); EXPECT_EQ(rsD->rowCount(), objsD.size()); pos = 0; while(rsD->hasNextRow()) { diff --git a/tuplex/test/wrappers/WrapperTest.cc b/tuplex/test/wrappers/WrapperTest.cc index c8a819c16..08f894067 100644 --- a/tuplex/test/wrappers/WrapperTest.cc +++ b/tuplex/test/wrappers/WrapperTest.cc @@ -17,60 +17,23 @@ #include #include #include +#include "../core/TestUtils.h" #include // need for these tests a running python interpreter, so spin it up -class WrapperTest : public ::testing::Test { -protected: - std::string testName; - std::string scratchDir; - +class WrapperTest : public TuplexTest { void SetUp() override { - testName = std::string(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()) + std::string(::testing::UnitTest::GetInstance()->current_test_info()->name()); - scratchDir = "/tmp/" + testName; + TuplexTest::SetUp(); python::initInterpreter(); - - // hold GIL assert(python::holdsGIL()); } void TearDown() override { - - // important to get GIL for this + TuplexTest::TearDown(); python::closeInterpreter(); } - - inline void remove_temp_files() { - tuplex::Timer timer; - boost::filesystem::remove_all(scratchDir.c_str()); - std::cout<<"removed temp files in "<(listObj)); + auto res = c.parallelize(list).map("lambda x: 1 // x if x == 0 else x", "").resolve(ecToI64(ExceptionCode::ZERODIVISIONERROR), "lambda x: -1", "").collect(); + auto resObj = res.ptr(); + + ASSERT_EQ(PyList_Size(resObj), PyList_Size(expectedResult)); + for (int i = 0; i < PyList_Size(expectedResult); ++i) { + EXPECT_EQ(python::pythonToRow(PyList_GetItem(resObj, i)).toPythonString(), python::pythonToRow( + PyList_GetItem(expectedResult, i)).toPythonString()); + } + } +} + TEST_F(WrapperTest, StringTuple) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject *listObj = PyList_New(4); PyObject *tupleObj1 = PyTuple_New(2); @@ -133,7 +132,7 @@ TEST_F(WrapperTest, StringTuple) { TEST_F(WrapperTest, MixedSimpleTupleTuple) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject *listObj = PyList_New(4); PyObject *tupleObj1 = PyTuple_New(2); @@ -174,7 +173,7 @@ TEST_F(WrapperTest, MixedSimpleTupleTuple) { TEST_F(WrapperTest, StringParallelize) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(3); PyList_SET_ITEM(listObj, 0, python::PyString_FromString("Hello")); @@ -200,7 +199,7 @@ TEST_F(WrapperTest, StringParallelize) { TEST_F(WrapperTest, DictionaryParallelize) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * dictObj1 = PyDict_New(); PyDict_SetItem(dictObj1, python::PyString_FromString("a"), PyFloat_FromDouble(0.0)); @@ -251,7 +250,7 @@ TEST_F(WrapperTest, SimpleCSVParse) { PyDict_SetItemString(pyopt, "tuplex.webui.enable", Py_False); // RAII, destruct python context! - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); // weird block syntax due to RAII problems. 
{ @@ -282,7 +281,7 @@ TEST_F(WrapperTest, SimpleCSVParse) { TEST_F(WrapperTest, GetOptions) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); // weird RAII problems of boost python { @@ -298,8 +297,8 @@ TEST_F(WrapperTest, GetOptions) { TEST_F(WrapperTest, TwoContexts) { using namespace tuplex; - PythonContext c("", "", testOptions()); - PythonContext c2("", "", testOptions()); + PythonContext c("", "", microTestOptions().asJSON()); + PythonContext c2("", "", microTestOptions().asJSON()); { auto opt1 = c.options(); @@ -323,7 +322,7 @@ TEST_F(WrapperTest, Show) { PyDict_SetItemString(pyopt, "tuplex.webui.enable", Py_False); // RAII, destruct python context! - PythonContext c("python", "", testOptions()); + PythonContext c("python", "", microTestOptions().asJSON()); // weird block syntax due to RAII problems. { @@ -348,7 +347,7 @@ TEST_F(WrapperTest, GoogleTrace) { PyDict_SetItemString(pyopt, "tuplex.webui.enable", Py_False); // RAII, destruct python context! - PythonContext c("python", "", testOptions()); + PythonContext c("python", "", testOptions().asJSON()); /// Based on Google trace data, this mini pipeline serves as CSV parsing test ground. /// c.csv(file_path) \ /// .filter(lambda x: x[3] == 0) \ @@ -495,7 +494,7 @@ TEST_F(WrapperTest, extractPriceExample) { auto cols = boost::python::list(boost::python::handle<>(colObj)); // RAII, destruct python context! - PythonContext c("python", "", testOptions()); + PythonContext c("python", "", microTestOptions().asJSON()); { // all calls go here... @@ -595,7 +594,7 @@ TEST_F(WrapperTest, DictListParallelize) { using namespace tuplex; // RAII, destruct python context! - PythonContext c("python", "", testOptions()); + PythonContext c("python", "", microTestOptions().asJSON()); // weird block syntax due to RAII problems. { @@ -632,9 +631,9 @@ TEST_F(WrapperTest, UpcastParallelizeI) { using namespace tuplex; // RAII, destruct python context! - auto opts = testOptions(); - opts = opts.substr(0, opts.length() - 1) + ", \"tuplex.autoUpcast\":\"True\"}"; - PythonContext c("python", "", opts); + auto opts = microTestOptions(); + opts.set("tuplex.autoUpcast", "true"); + PythonContext c("python", "", opts.asJSON()); // weird block syntax due to RAII problems. { @@ -664,9 +663,9 @@ TEST_F(WrapperTest, UpcastParallelizeII) { using namespace tuplex; // RAII, destruct python context! - auto opts = testOptions(); - opts = opts.substr(0, opts.length() - 1) + ", \"tuplex.autoUpcast\":\"True\"}"; - PythonContext c("python", "", opts); + auto opts = microTestOptions(); + opts.set("tuplex.autoUpcast", "true"); + PythonContext c("python", "", opts.asJSON()); // weird block syntax due to RAII problems. { @@ -700,9 +699,9 @@ TEST_F(WrapperTest, FilterAll) { using namespace tuplex; // RAII, destruct python context! - auto opts = testOptions(); - opts = opts.substr(0, opts.length() - 1) + ",\"tuplex.autoUpcast\":\"True\"}"; - PythonContext c("python", "", opts); + auto opts = microTestOptions(); + opts.set("tuplex.autoUpcast", "true"); + PythonContext c("python", "", opts.asJSON()); // weird block syntax due to RAII problems. { @@ -728,7 +727,7 @@ TEST_F(WrapperTest, ColumnNames) { using namespace tuplex; // RAII, destruct python context! - PythonContext c("python", "", testOptions()); + PythonContext c("python", "", microTestOptions().asJSON()); // weird block syntax due to RAII problems. 
{ @@ -790,9 +789,9 @@ TEST_F(WrapperTest, IntegerTuple) { PyDict_SetItemString(pyopt, "tuplex.autoUpcast", Py_True); // RAII, destruct python context! - auto opts = testOptions(); - opts = opts.substr(0, opts.length() - 1) + ",\"tuplex.autoUpcast\":\"True\"}"; - PythonContext c("python", "", opts); + auto opts = microTestOptions(); + opts.set("tuplex.autoUpcast", "true"); + PythonContext c("python", "", opts.asJSON()); // weird block syntax due to RAII problems. { @@ -847,8 +846,9 @@ TEST_F(WrapperTest, IfWithNull) { // RAII, destruct python context! auto opts = testOptions(); - opts = opts.substr(0, opts.length() - 1) + ",\"tuplex.useLLVMOptimizer\" : \"False\", \"tuplex.executorCount\":0}"; - PythonContext c("python", "", opts); + opts.set("tuplex.useLLVMOptimizer", "false"); + opts.set("tuplex.executorCount", "0"); + PythonContext c("python", "", opts.asJSON()); // execute mini part of pipeline and output csv to file // pipeline is // df = ctx.csv(perf_path) @@ -922,8 +922,9 @@ TEST_F(WrapperTest, FlightData) { // RAII, destruct python context! auto opts = testOptions(); - opts = opts.substr(0, opts.length() - 1) + ",\"tuplex.useLLVMOptimizer\" : \"False\", \"tuplex.executorCount\":0}"; - PythonContext c("python", "", opts); + opts.set("tuplex.useLLVMOptimizer", "false"); + opts.set("tuplex.executorCount", "0"); + PythonContext c("python", "", opts.asJSON()); // execute mini part of pipeline and output csv to file // pipeline is // df = ctx.csv(perf_path) @@ -1131,7 +1132,7 @@ TEST_F(WrapperTest, Airport) { // RAII, destruct python context! PythonContext c("python", "", - testOptions()); + testOptions().asJSON()); // execute mini part of pipeline and output csv to file // pipeline is @@ -1175,7 +1176,7 @@ TEST_F(WrapperTest, Airport) { TEST_F(WrapperTest, OptionParallelizeI) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(5); PyList_SET_ITEM(listObj, 0, PyLong_FromLong(112)); @@ -1207,7 +1208,7 @@ TEST_F(WrapperTest, OptionParallelizeI) { TEST_F(WrapperTest, OptionParallelizeII) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(5); @@ -1252,7 +1253,7 @@ TEST_F(WrapperTest, OptionParallelizeII) { TEST_F(WrapperTest, NoneParallelize) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(2); PyList_SET_ITEM(listObj, 0, Py_None); @@ -1278,7 +1279,7 @@ TEST_F(WrapperTest, NoneParallelize) { TEST_F(WrapperTest, EmptyMapI) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(4); PyList_SET_ITEM(listObj, 0, PyLong_FromLong(1)); @@ -1308,7 +1309,7 @@ TEST_F(WrapperTest, EmptyMapI) { TEST_F(WrapperTest, EmptyMapII) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(4); PyList_SET_ITEM(listObj, 0, PyLong_FromLong(1)); @@ -1342,7 +1343,7 @@ TEST_F(WrapperTest, EmptyMapII) { TEST_F(WrapperTest, EmptyMapIII) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(4); PyList_SET_ITEM(listObj, 0, PyLong_FromLong(1)); @@ -1376,7 +1377,7 @@ TEST_F(WrapperTest, EmptyMapIII) { 
TEST_F(WrapperTest, EmptyOptionMapI) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(4); PyList_SET_ITEM(listObj, 0, PyLong_FromLong(1)); @@ -1408,7 +1409,7 @@ TEST_F(WrapperTest, EmptyOptionMapI) { TEST_F(WrapperTest, EmptyOptionMapII) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(4); PyList_SET_ITEM(listObj, 0, PyLong_FromLong(1)); @@ -1440,7 +1441,7 @@ TEST_F(WrapperTest, EmptyOptionMapII) { TEST_F(WrapperTest, OptionTupleParallelizeI) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(3); @@ -1491,7 +1492,7 @@ TEST_F(WrapperTest, OptionTupleParallelizeI) { TEST_F(WrapperTest, OptionTupleParallelizeII) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(3); @@ -1542,7 +1543,7 @@ TEST_F(WrapperTest, OptionTupleParallelizeII) { TEST_F(WrapperTest, OptionTupleParallelizeIII) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(3); @@ -1592,7 +1593,7 @@ TEST_F(WrapperTest, OptionTupleParallelizeIII) { TEST_F(WrapperTest, parallelizeOptionTypeI) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = python::runAndGet( "test_input = [(1.0, '2', 3, '4', 5, 6, True, 8, 9, None), (None, '2', 3, None, 5, 6, True, 8, 9, None)" @@ -1621,7 +1622,7 @@ TEST_F(WrapperTest, parallelizeOptionTypeI) { TEST_F(WrapperTest, parallelizeNestedSlice) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = python::runAndGet( "test_input = [((), (\"hello\",), 123, \"oh no\", (1, 2)), ((), (\"goodbye\",), 123, \"yes\", (-10, 2)),\n" @@ -1655,7 +1656,7 @@ TEST_F(WrapperTest, TPCHQ6) { " 'l_discount', 'l_tax', 'l_returnflag', 'l_linestatus',\n" " 'l_shipdate', 'l_commitdate', 'l_receiptdate',\n" " 'l_shipinstruct', 'l_shipmode', 'l_comment']", "listitem_columns"); - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", testOptions().asJSON()); { @@ -1678,7 +1679,7 @@ TEST_F(WrapperTest, TupleParallelizeI) { PyObject* listObj = python::runAndGet("L = [('hello', 'world', 'hi', 1, 2, 3), ('foo', 'bar', 'baz', 4, 5, 6), ('blank', '', 'not', 7, 8, 9)]", "L"); - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); { auto list = boost::python::list(boost::python::handle<>(listObj)); c.parallelize(list).map("lambda x: ({x[0]: x[3], x[1]: x[4], x[2]: x[5]},)", "").show(); @@ -1690,7 +1691,7 @@ TEST_F(WrapperTest, TupleParallelizeII) { PyObject* listObj = python::runAndGet("L = [({}, {}, {}), ({}, {}, {}), ({}, {}, {})]", "L"); - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); { auto list = boost::python::list(boost::python::handle<>(listObj)); c.parallelize(list).map("lambda x, y, z: [x, y, z]", "").show(); @@ -1707,7 +1708,7 @@ TEST_F(WrapperTest, DictParallelizeRefTest) { PyObject* strings = python::runAndGet("strings = [('hello', 'world', 'hi'), ('foo', 'bar', 'baz'), ('blank', '', 'not')]\n", "strings"); 
PyObject* floats = python::runAndGet("floats = [(1.2, 3.4, -100.2), (5.6, 7.8, -1.234), (9.0, 0.1, 2.3)]\n", "floats"); ASSERT_TRUE(floats->ob_refcnt > 0); - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); { @@ -1750,7 +1751,7 @@ TEST_F(WrapperTest, DictParallelizeRefTest) { TEST_F(WrapperTest, BuiltinModule) { using namespace tuplex; using namespace std; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); { PyObject* L = PyList_New(3); @@ -1782,7 +1783,7 @@ TEST_F(WrapperTest, SwapIII) { " return a, b\n" "\n"; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); { PyObject* L = PyList_New(2); auto tuple1 = PyTuple_New(2); @@ -2191,7 +2192,7 @@ TEST_F(WrapperTest, BitwiseAnd) { PyObject* listObj = python::runAndGet("L = [(False, False), (False, True), (True, False), (True, True)]", "L"); - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); { auto list = boost::python::list(boost::python::handle<>(listObj)); auto res_list = c.parallelize(list).map("lambda a, b: a & b", "").collect(); @@ -2207,7 +2208,7 @@ TEST_F(WrapperTest, MetricsTest) { PyObject* listObj = python::runAndGet("L = [(False, False), (False, True), (True, False), (True, True)]", "L"); - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); { auto list = boost::python::list(boost::python::handle<>(listObj)); auto res_list = c.parallelize(list).map("lambda a, b: a & b", "").collect(); @@ -2397,9 +2398,9 @@ TEST_F(WrapperTest, MixedTypesIsWithNone) { using namespace tuplex; using namespace std; - auto opts = testOptions(); - opts = opts.substr(0, opts.length() - 1) + ",\"tuplex.optimizer.mergeExceptionsInOrder\":\"True\"}"; - PythonContext c("python", "", opts); + auto opts = microTestOptions(); + opts.set("tuplex.optimizer.mergeExceptionsInOrder", "true"); + PythonContext c("python", "", opts.asJSON()); PyObject *listObj = PyList_New(8); PyList_SetItem(listObj, 0, Py_None); From c0b5d109f9c7afe170275e3d6678b44cf59af632 Mon Sep 17 00:00:00 2001 From: Ben Givertz Date: Wed, 9 Feb 2022 16:38:50 -0500 Subject: [PATCH 03/15] Give test more memory --- tuplex/test/wrappers/WrapperTest.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tuplex/test/wrappers/WrapperTest.cc b/tuplex/test/wrappers/WrapperTest.cc index 08f894067..88dddacc5 100644 --- a/tuplex/test/wrappers/WrapperTest.cc +++ b/tuplex/test/wrappers/WrapperTest.cc @@ -494,7 +494,7 @@ TEST_F(WrapperTest, extractPriceExample) { auto cols = boost::python::list(boost::python::handle<>(colObj)); // RAII, destruct python context! - PythonContext c("python", "", microTestOptions().asJSON()); + PythonContext c("python", "", testOptions().asJSON()); { // all calls go here... 
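A note between patches: the fallback rows produced above share one binary record layout, written both by PythonContext::serializeFallbackRows and by the pyObjectsToPartitions helper in ResultSetTest.cc: a leading int64 row count in the partition header, then, per row, four int64 fields (row number, exception code, operator ID, pickled size) followed by the pickled object bytes. The sketch below is an editor's illustration of walking that layout from a raw buffer; FallbackRecord and readFallbackRecords are hypothetical names, not part of the patch set.

#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical reader (editor's sketch) for the fallback-row layout used in this PR:
// [ int64 numRows | (rowNumber, exceptionCode, operatorID, size, <size> pickled bytes)* ]
struct FallbackRecord {
    int64_t rowNumber;     // row index, relative to its PartitionGroup
    int64_t exceptionCode; // e.g. ecToI64(ExceptionCode::PYTHON_PARALLELIZE), or -1 in the tests
    int64_t operatorID;    // -1 when no operator is associated with the row
    std::string pickled;   // pickled Python object bytes
};

static std::vector<FallbackRecord> readFallbackRecords(const uint8_t* raw) {
    std::vector<FallbackRecord> records;
    int64_t numRows = 0;
    std::memcpy(&numRows, raw, sizeof(int64_t));              // partition header: row count
    const uint8_t* ptr = raw + sizeof(int64_t);
    for (int64_t i = 0; i < numRows; ++i) {
        FallbackRecord r;
        std::memcpy(&r.rowNumber, ptr, sizeof(int64_t));      ptr += sizeof(int64_t);
        std::memcpy(&r.exceptionCode, ptr, sizeof(int64_t));  ptr += sizeof(int64_t);
        std::memcpy(&r.operatorID, ptr, sizeof(int64_t));     ptr += sizeof(int64_t);
        int64_t size = 0;
        std::memcpy(&size, ptr, sizeof(int64_t));             ptr += sizeof(int64_t);
        r.pickled.assign(reinterpret_cast<const char*>(ptr), static_cast<size_t>(size));
        ptr += size;
        records.push_back(std::move(r));
    }
    return records;
}

Reading through memcpy mirrors the *((int64_t*)ptr) stores in the patch while staying alignment-safe; a real reader would also follow the lock/unlock discipline the patch uses around partition buffers.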
From ff374caeb9b65c6dbb70b5524d3c434c32ad32a4 Mon Sep 17 00:00:00 2001 From: Ben Givertz Date: Wed, 27 Apr 2022 10:07:42 -0400 Subject: [PATCH 04/15] Remove reference to boost --- tuplex/test/wrappers/WrapperTest.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tuplex/test/wrappers/WrapperTest.cc b/tuplex/test/wrappers/WrapperTest.cc index 3244a6cb0..4b7eb2148 100644 --- a/tuplex/test/wrappers/WrapperTest.cc +++ b/tuplex/test/wrappers/WrapperTest.cc @@ -75,7 +75,7 @@ TEST_F(WrapperTest, BasicMergeInOrder) { } { - auto list = boost::python::list(boost::python::handle<>(listObj)); + auto list = py::reinterpret_borrow(listObj); auto res = c.parallelize(list).map("lambda x: 1 // x if x == 0 else x", "").resolve(ecToI64(ExceptionCode::ZERODIVISIONERROR), "lambda x: -1", "").collect(); auto resObj = res.ptr(); From c70bd2764e9618635b9aea19bf31b2c60d099e01 Mon Sep 17 00:00:00 2001 From: Ben Givertz Date: Wed, 27 Apr 2022 10:20:36 -0400 Subject: [PATCH 05/15] Remove commented out code --- tuplex/core/src/physical/TransformTask.cc | 25 ----------------------- 1 file changed, 25 deletions(-) diff --git a/tuplex/core/src/physical/TransformTask.cc b/tuplex/core/src/physical/TransformTask.cc index 1d24d6fdc..1e70cb2d0 100644 --- a/tuplex/core/src/physical/TransformTask.cc +++ b/tuplex/core/src/physical/TransformTask.cc @@ -571,31 +571,6 @@ namespace tuplex { int64_t inSize = inputPartition->size(); const uint8_t *inPtr = inputPartition->lockRaw(); _numInputRowsRead += static_cast(*((int64_t*)inPtr)); -// -// int64_t totalNormalRowCounter = 0; -// int64_t totalGeneralRowCounter = 0; -// int64_t totalFallbackRowCounter = 0; -// -// int64_t g1[] = {2, -// 1, -1, -1, 8, -1, -// 2, -1, -1, 8, -1}; -// int64_t g2[] = {1, -// 3, -1, -1, 8, -1}; -// int64_t g3[] = {2, -// 5, -1, -1, 8, -1, -// 6, -1, -1, 8, -1}; -// uint8_t *generalPartitions[] = {(uint8_t*)g1, (uint8_t*)g2, (uint8_t*)g3}; -// int64_t numGeneralPartitions = 3; -// int64_t generalIndexOffset = 0; -// int64_t generalRowOffset = 0; -// int64_t generalByteOffset = 0; -// -// int64_t f1[] = {1, 2, 3}; -// uint8_t *fallbackPartitions[] = {}; -// int64_t numFallbackPartitions = 0; -// int64_t fallbackIndexOffset = 0; -// int64_t fallbackRowOffset = 0; -// int64_t fallbackByteOffset = 0; // call functor auto bytesParsed = functor(this, inPtr, inSize, &num_normal_rows, &num_bad_rows, false, From b8bb1e499daaa4af3cc28e4baad9ee76aa50acdc Mon Sep 17 00:00:00 2001 From: Ben Givertz Date: Wed, 27 Apr 2022 14:20:26 -0400 Subject: [PATCH 06/15] Symbol test fix --- tuplex/test/core/SymbolsTest.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tuplex/test/core/SymbolsTest.cc b/tuplex/test/core/SymbolsTest.cc index fff2b8f40..0301d0099 100644 --- a/tuplex/test/core/SymbolsTest.cc +++ b/tuplex/test/core/SymbolsTest.cc @@ -23,5 +23,7 @@ TEST_F(SymbolProcessTest, MissingSymbol) { auto path = "../resources/zillow_data.csv"; Context c(microTestOptions()); - auto v = c.csv(path).mapColumn("title", UDF("lambda x: split(x)[0]")).collectAsVector(); + EXPECT_THROW({ + auto v = c.csv(path).mapColumn("title", UDF("lambda x: split(x)[0]")).collectAsVector() + }, std::runtime_error); } \ No newline at end of file From c931a209ec5169236780ccbb8147c881fe8a3070 Mon Sep 17 00:00:00 2001 From: Ben Givertz Date: Wed, 27 Apr 2022 14:21:16 -0400 Subject: [PATCH 07/15] Symbol test fix --- tuplex/test/core/SymbolsTest.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tuplex/test/core/SymbolsTest.cc 
b/tuplex/test/core/SymbolsTest.cc index 0301d0099..2056ceb91 100644 --- a/tuplex/test/core/SymbolsTest.cc +++ b/tuplex/test/core/SymbolsTest.cc @@ -24,6 +24,6 @@ TEST_F(SymbolProcessTest, MissingSymbol) { Context c(microTestOptions()); EXPECT_THROW({ - auto v = c.csv(path).mapColumn("title", UDF("lambda x: split(x)[0]")).collectAsVector() + auto v = c.csv(path).mapColumn("title", UDF("lambda x: split(x)[0]")).collectAsVector(); }, std::runtime_error); } \ No newline at end of file From b1d21bdbd3445714a33511e103d6e30e41c62b6c Mon Sep 17 00:00:00 2001 From: Ben Givertz Date: Wed, 11 May 2022 14:04:40 -0400 Subject: [PATCH 08/15] PR Fixes --- tuplex/core/include/Context.h | 12 +- tuplex/core/include/PartitionGroup.h | 33 ++--- tuplex/core/include/ee/local/LocalBackend.h | 10 +- tuplex/core/include/logical/CacheOperator.h | 2 +- tuplex/core/src/ee/local/LocalBackend.cc | 140 +------------------- tuplex/core/src/physical/ResultSet.cc | 6 +- 6 files changed, 39 insertions(+), 164 deletions(-) diff --git a/tuplex/core/include/Context.h b/tuplex/core/include/Context.h index 8baff3bfc..856449f8a 100644 --- a/tuplex/core/include/Context.h +++ b/tuplex/core/include/Context.h @@ -60,6 +60,13 @@ namespace tuplex { // needed because of C++ template issues void addPartition(DataSet* ds, Partition *partition); + + /*! + * Add parallelize logical operator to dataset + * @param ds dataset + * @param fallbackPartitions fallback partitions from python parallelize + * @param partitionGroups partition mapping information + */ void addParallelizeNode(DataSet *ds, const std::vector& fallbackPartitions=std::vector{}, const std::vector& partitionGroups=std::vector{}); //! adds a paralellize node to the computation graph Partition* requestNewPartition(const Schema& schema, const int dataSetID, size_t minBytesRequired); @@ -254,10 +261,9 @@ namespace tuplex { * @param partitions partitions to assign to dataset. These should NOT be reused later. * Also, partitions need to hold data in supplied schema. If empty vector is given, * empty dataset will be created. - * + * @param fallbackPartitions fallback partitions to assign to dataset + * @param partitionGroups mapping of partitions to fallback partitions * @param columns optional column names - * @param badParallelizeObjects schema violations found during parallelization of partitions - * @param numExceptionsInPartition number of schema violations that occured in each of the partitions * @return reference to newly created dataset. */ DataSet& fromPartitions(const Schema& schema, const std::vector& partitions, const std::vector& fallbackPartitions, const std::vector& partitionGroups, const std::vector& columns); diff --git a/tuplex/core/include/PartitionGroup.h b/tuplex/core/include/PartitionGroup.h index bb83585e5..f74d2d5d3 100644 --- a/tuplex/core/include/PartitionGroup.h +++ b/tuplex/core/include/PartitionGroup.h @@ -11,39 +11,42 @@ #ifndef TUPLEX_PARTITIONGROUP_H #define TUPLEX_PARTITIONGROUP_H +/*! + * Maintains a grouping of normal, fallback, and general partitions that arise from the + * same executor in order to be reprocessed after a cache occurs. + */ namespace tuplex { struct PartitionGroup { - public: /*! * Groups normal, general, and fallback partitions into groups, with all partitions that originated from a single * task being grouped together. 
-     * @param numNormalPartitons number of normal partitions in group
+     * @param numNormalPartitions number of normal partitions in group
-     * @param normalPartitionStartInd starting index in list of all normal partitions
+     * @param normalPartitionStartIndex starting index in list of all normal partitions
      * @param numGeneralPartitions number of general partitions in group
-     * @param generalPartitionStartInd starting index in list of all general partitions
+     * @param generalPartitionStartIndex starting index in list of all general partitions
      * @param numFallbackPartitions number of fallback partitions in group
-     * @param fallbackPartitionStartInd starting index in list of all fallback partitions
+     * @param fallbackPartitionStartIndex starting index in list of all fallback partitions
      */
-    PartitionGroup(size_t numNormalPartitons, size_t normalPartitionStartInd,
-                   size_t numGeneralPartitions, size_t generalPartitionStartInd,
-                   size_t numFallbackPartitions, size_t fallbackPartitionStartInd):
-            numNormalPartitions(numNormalPartitons), normalPartitionStartInd(normalPartitionStartInd),
-            numGeneralPartitions(numGeneralPartitions), generalPartitionStartInd(generalPartitionStartInd),
-            numFallbackPartitions(numFallbackPartitions), fallbackPartitionStartInd(fallbackPartitionStartInd) {}
+    PartitionGroup(size_t numNormalPartitions, size_t normalPartitionStartIndex,
+                   size_t numGeneralPartitions, size_t generalPartitionStartIndex,
+                   size_t numFallbackPartitions, size_t fallbackPartitionStartIndex):
+            numNormalPartitions(numNormalPartitions), normalPartitionStartIndex(normalPartitionStartIndex),
+            numGeneralPartitions(numGeneralPartitions), generalPartitionStartIndex(generalPartitionStartIndex),
+            numFallbackPartitions(numFallbackPartitions), fallbackPartitionStartIndex(fallbackPartitionStartIndex) {}

     /*!
      * Initialize empty struct with all values set to zero.
*/ PartitionGroup() : - numNormalPartitions(0), numGeneralPartitions(0), numFallbackPartitions(0), - normalPartitionStartInd(0), generalPartitionStartInd(0), fallbackPartitionStartInd(0) {} + numNormalPartitions(0), numGeneralPartitions(0), numFallbackPartitions(0), + normalPartitionStartIndex(0), generalPartitionStartIndex(0), fallbackPartitionStartIndex(0) {} size_t numNormalPartitions; - size_t normalPartitionStartInd; + size_t normalPartitionStartIndex; size_t numGeneralPartitions; - size_t generalPartitionStartInd; + size_t generalPartitionStartIndex; size_t numFallbackPartitions; - size_t fallbackPartitionStartInd; + size_t fallbackPartitionStartIndex; }; } diff --git a/tuplex/core/include/ee/local/LocalBackend.h b/tuplex/core/include/ee/local/LocalBackend.h index b16dc0b33..63ea60c4b 100644 --- a/tuplex/core/include/ee/local/LocalBackend.h +++ b/tuplex/core/include/ee/local/LocalBackend.h @@ -99,7 +99,7 @@ namespace tuplex { return std::accumulate(counts.begin(), counts.end(), 0, [](size_t acc, std::pair, size_t> val) { return acc + val.second; }); } - inline std::vector getNormalPartitions(IExecutorTask* task) { + inline std::vector getNormalPartitions(IExecutorTask* task) const { if(!task) return std::vector(); @@ -113,7 +113,7 @@ namespace tuplex { return std::vector(); } - inline std::vector getExceptionPartitions(IExecutorTask* task) { + inline std::vector getExceptionPartitions(IExecutorTask* task) const { if(!task) return std::vector(); @@ -127,7 +127,7 @@ namespace tuplex { return std::vector(); } - inline std::vector getGeneralPartitions(IExecutorTask* task) { + inline std::vector getGeneralPartitions(IExecutorTask* task) const { if(!task) return std::vector(); @@ -141,7 +141,7 @@ namespace tuplex { return std::vector(); } - inline std::unordered_map, size_t> getExceptionCounts(IExecutorTask* task) { + inline std::unordered_map, size_t> getExceptionCounts(IExecutorTask* task) const { if(!task) return std::unordered_map, size_t>(); @@ -155,7 +155,7 @@ namespace tuplex { return std::unordered_map, size_t>(); } - inline std::vector getFallbackPartitions(IExecutorTask* task) { + inline std::vector getFallbackPartitions(IExecutorTask* task) const { if(!task) return std::vector(); diff --git a/tuplex/core/include/logical/CacheOperator.h b/tuplex/core/include/logical/CacheOperator.h index 2ca184a1d..1e373b79a 100644 --- a/tuplex/core/include/logical/CacheOperator.h +++ b/tuplex/core/include/logical/CacheOperator.h @@ -25,7 +25,7 @@ namespace tuplex { CacheOperator(LogicalOperator* parent, bool storeSpecialized, const Schema::MemoryLayout& memoryLayout=Schema::MemoryLayout::ROW) : LogicalOperator(parent), _storeSpecialized(storeSpecialized), _memoryLayout(memoryLayout), _cached(false), - _columns(parent->columns()) { + _columns(parent->columns()), _normalRowCount(0), _fallbackRowCount(0), _generalRowCount(0) { setSchema(this->parent()->getOutputSchema()); // inherit schema from parent _optimizedSchema = getOutputSchema(); if(memoryLayout != Schema::MemoryLayout::ROW) diff --git a/tuplex/core/src/ee/local/LocalBackend.cc b/tuplex/core/src/ee/local/LocalBackend.cc index e23a4f937..56e5de9a0 100644 --- a/tuplex/core/src/ee/local/LocalBackend.cc +++ b/tuplex/core/src/ee/local/LocalBackend.cc @@ -657,7 +657,7 @@ namespace tuplex { for (const auto &group : partitionGroups) { std::vector taskNormalPartitions; bool invalidateAfterUse = false; - for (int i = group.normalPartitionStartInd; i < group.normalPartitionStartInd + group.numNormalPartitions; ++i) { + for (int i = 
                auto p = inputPartitions[i];
                numInputRows += p->getNumRows();
                if (!p->isImmortal())
                    invalidateAfterUse = true;
                taskNormalPartitions.push_back(p);
            }
            std::vector<Partition*> taskGeneralPartitions;
-            for (int i = group.generalPartitionStartInd; i < group.generalPartitionStartInd + group.numGeneralPartitions; ++i) {
+            for (int i = group.generalPartitionStartIndex; i < group.generalPartitionStartIndex + group.numGeneralPartitions; ++i) {
                auto p = generalPartitions[i];
                numInputRows += p->getNumRows();
                taskGeneralPartitions.push_back(p);
            }
            std::vector<Partition*> taskFallbackPartitions;
-            for (int i = group.fallbackPartitionStartInd; i < group.fallbackPartitionStartInd + group.numFallbackPartitions; ++i) {
+            for (int i = group.fallbackPartitionStartIndex; i < group.fallbackPartitionStartIndex + group.numFallbackPartitions; ++i) {
                auto p = fallbackPartitions[i];
                numInputRows += p->getNumRows();
                taskFallbackPartitions.push_back(p);
@@ -1190,140 +1190,6 @@ namespace tuplex {
        Logger::instance().defaultLogger().info(ss.str());
    }

-//    void LocalBackend::setPartitionMergeInfo(const std::vector<Partition*>& normalPartitions,
-//                                             const std::vector<Partition*>& generalPartitions, const size_t generalStartInd,
-//                                             const std::vector<Partition*>& fallbackPartitions, const size_t fallbackStartInd,
-//                                             std::vector<MergeInfo>& partitionMergeInfo) {
-//
-//
-//
-//
-//        auto generalInd = 0;
-//        auto generalRowOff = 0;
-//        auto generalByteOff = 0;
-//        auto generalRowsInPartition = 0;
-//        const uint8_t *generalPtr = nullptr;
-//        if (!generalPartitions.empty()) {
-//            generalRowsInPartition = generalPartitions[0]->getNumRows();
-//            generalPtr = generalPartitions[0]->lock();
-//        }
-//
-//        auto fallbackInd = 0;
-//        auto fallbackRowOff = 0;
-//        auto fallbackByteOff = 0;
-//        auto fallbackRowsInPartition = 0;
-//        const uint8_t *fallbackPtr = nullptr;
-//        if (!fallbackPartitions.empty()) {
-//            fallbackRowsInPartition = fallbackPartitions[0]->getNumRows();
-//            fallbackPtr = fallbackPartitions[0]->lock();
-//        }
-//
-//        auto exceptionInd = 0;
-//        auto exceptionRowOff = 0;
-//        auto exceptionByteOff = 0;
-//        auto exceptionRowsInPartition = 0;
-//        const uint8_t *exceptionPtr = nullptr;
-//        if (!exceptionPartitions.empty()) {
-//            exceptionRowsInPartition = exceptionPartitions[0]->getNumRows();
-//            exceptionPtr = exceptionPartitions[0]->lock();
-//        }
-//
-//        auto totalRowCounter = 0;
-//        auto rowDelta = 0;
-//        for (const auto &p : normalPartitions) {
-//            auto mergeInfo = MergeInfo();
-//            mergeInfo.setRowDelta(rowDelta);
-//            auto numNormalRows = p->getNumRows();
-//
-//            auto generalRowCounter = 0;
-//            auto curGeneralStartInd = generalInd + generalStartInd;
-//            auto curGeneralRowOff = generalRowOff;
-//            auto curGeneralByteOff = generalByteOff;
-//            while (generalPtr && *((int64_t*)generalPtr) <= totalRowCounter + numNormalRows) {
-//                generalRowCounter++;
-//                totalRowCounter++;
-//
-//                auto dataSize = ((int64_t*)generalPtr)[3] + 4*sizeof(int64_t);
-//                generalByteOff += dataSize;
-//                generalPtr += dataSize;
-//                generalRowOff++;
-//
-//                if (generalRowOff == generalRowsInPartition) {
-//                    generalPartitions[generalInd]->unlock();
-//                    generalInd++;
-//                    if (generalInd < generalPartitions.size()) {
-//                        generalPtr = generalPartitions[generalInd]->lock();
-//                        generalRowsInPartition = generalPartitions[generalInd]->getNumRows();
-//                        generalRowOff = 0;
-//                        generalByteOff = 0;
-//                    } else {
-//                        generalPtr = nullptr;
-//                    }
-//                }
-//            }
-//            mergeInfo.setGeneralInfo(generalRowCounter, curGeneralStartInd, curGeneralRowOff, curGeneralByteOff);
-//
-//            auto fallbackRowCounter = 0;
-//            auto curFallbackStartInd = fallbackInd + fallbackStartInd;
-//            auto curFallbackRowOff = fallbackRowOff;
-//            auto curFallbackByteOff = fallbackByteOff;
-//            while (fallbackPtr && *((int64_t*)fallbackPtr) <= totalRowCounter + numNormalRows + generalRowCounter) {
-//                fallbackRowCounter++;
-//                totalRowCounter++;
-//
-//                auto dataSize = ((int64_t*)fallbackPtr)[1] + 2*sizeof(int64_t);
-//                fallbackByteOff += dataSize;
-//                fallbackPtr += dataSize;
-//                fallbackRowOff++;
-//
-//                if (fallbackRowOff == fallbackRowsInPartition) {
-//                    fallbackPartitions[fallbackInd]->unlock();
-//                    fallbackInd++;
-//                    if (fallbackInd < fallbackPartitions.size()) {
-//                        fallbackPtr = fallbackPartitions[fallbackInd]->lock();
-//                        fallbackRowsInPartition = fallbackPartitions[fallbackInd]->getNumRows();
-//                        fallbackRowOff = 0;
-//                        fallbackByteOff = 0;
-//                    } else {
-//                        fallbackPtr = nullptr;
-//                    }
-//                }
-//            }
-//            mergeInfo.setFallbackInfo(fallbackRowCounter, curFallbackStartInd, curFallbackRowOff, curFallbackByteOff);
-//
-//            auto exceptionRowCounter = 0;
-//            auto curExceptionStartInd = exceptionInd + exceptionStartInd;
-//            auto curExceptionRowOff = exceptionRowOff;
-//            auto curExceptionByteOff = exceptionByteOff;
-//            while (exceptionPtr && *((int64_t*)exceptionPtr) <= totalRowCounter + numNormalRows + generalRowCounter + fallbackRowCounter) {
-//                exceptionRowCounter++;
-//                totalRowCounter++;
-//
-//                auto dataSize = ((int64_t*)exceptionPtr)[3] + 4*sizeof(int64_t);
-//                exceptionByteOff += dataSize;
-//                exceptionPtr += dataSize;
-//                exceptionRowOff++;
-//
-//                if (exceptionRowOff == exceptionRowsInPartition) {
-//                    exceptionPartitions[exceptionInd]->unlock();
-//                    exceptionInd++;
-//                    if (exceptionInd < exceptionPartitions.size()) {
-//                        exceptionPtr = exceptionPartitions[exceptionInd]->lock();
-//                        exceptionRowsInPartition = exceptionPartitions[exceptionInd]->getNumRows();
-//                        exceptionRowOff = 0;
-//                        exceptionByteOff = 0;
-//                    } else {
-//                        exceptionPtr = nullptr;
-//                    }
-//                }
-//            }
-//            mergeInfo.setExceptionInfo(exceptionRowCounter, curExceptionStartInd, curExceptionRowOff, curExceptionByteOff);
-//
-//            rowDelta += numNormalRows + generalRowCounter + fallbackRowCounter + exceptionRowCounter;
-//            partitionMergeInfo.push_back(mergeInfo);
-//        }
-//    }
-
    std::vector<IExecutorTask*> LocalBackend::resolveViaSlowPath(
            std::vector<IExecutorTask*> &tasks,
            bool merge_rows_in_order,
diff --git a/tuplex/core/src/physical/ResultSet.cc b/tuplex/core/src/physical/ResultSet.cc
index 153553f25..5086a1e58 100644
--- a/tuplex/core/src/physical/ResultSet.cc
+++ b/tuplex/core/src/physical/ResultSet.cc
@@ -303,15 +303,15 @@ namespace tuplex {
        _fallbackRowCounter = 0;
        auto group = _partitionGroups.front();
        _partitionGroups.pop_front();
-        for (int i = group.normalPartitionStartInd; i < group.normalPartitionStartInd + group.numNormalPartitions; ++i) {
+        for (int i = group.normalPartitionStartIndex; i < group.normalPartitionStartIndex + group.numNormalPartitions; ++i) {
            _currentNormalPartitions.push_back(_remainingNormalPartitions.front());
            _remainingNormalPartitions.pop_front();
        }
-        for (int i = group.generalPartitionStartInd; i < group.generalPartitionStartInd + group.numGeneralPartitions; ++i) {
+        for (int i = group.generalPartitionStartIndex; i < group.generalPartitionStartIndex + group.numGeneralPartitions; ++i) {
            _currentGeneralPartitions.push_back(_remainingGeneralPartitions.front());
            _remainingGeneralPartitions.pop_front();
        }
-        for (int i = group.fallbackPartitionStartInd; i < group.fallbackPartitionStartInd + group.numFallbackPartitions; ++i) {
+        for (int i = group.fallbackPartitionStartIndex; i < group.fallbackPartitionStartIndex + group.numFallbackPartitions; ++i) {
            _currentFallbackPartitions.push_back(_remainingFallbackPartitions.front());
            _remainingFallbackPartitions.pop_front();
        }

From 5e2874322bf8c55e3f07fa5acb00fa5bee5262c5 Mon Sep 17 00:00:00 2001
From: Ben Givertz
Date: Wed, 11 May 2022 14:16:32 -0400
Subject: [PATCH 09/15] PR Fixes

---
 tuplex/core/src/Partition.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tuplex/core/src/Partition.cc b/tuplex/core/src/Partition.cc
index c554788cd..a16d1c2eb 100644
--- a/tuplex/core/src/Partition.cc
+++ b/tuplex/core/src/Partition.cc
@@ -55,7 +55,7 @@ namespace tuplex {
    uint8_t* Partition::lockWriteRaw() {
        // must be the thread who allocated this
-//        assert(_owner->getThreadID() == std::this_thread::get_id());
+        assert(_owner->getThreadID() == std::this_thread::get_id());

        TRACE_LOCK("partition " + uuidToString(_uuid));
        std::this_thread::yield();

From 7a009723dfd81003b160e6861071d48fcac4c8e2 Mon Sep 17 00:00:00 2001
From: Ben Givertz
Date: Wed, 11 May 2022 14:36:29 -0400
Subject: [PATCH 10/15] PR Fixes

---
 tuplex/core/include/physical/ResolveTask.h | 8 ++++++--
 tuplex/core/include/physical/ResultSet.h   | 6 +++---
 tuplex/core/src/Context.cc                 | 2 ++
 tuplex/core/src/logical/CacheOperator.cc   | 6 +++---
 tuplex/core/src/physical/ResolveTask.cc    | 6 +++---
 5 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/tuplex/core/include/physical/ResolveTask.h b/tuplex/core/include/physical/ResolveTask.h
index 6f85c57a3..1cd098219 100644
--- a/tuplex/core/include/physical/ResolveTask.h
+++ b/tuplex/core/include/physical/ResolveTask.h
@@ -260,6 +260,7 @@ namespace tuplex {

        // sink for type violation rows
        MemorySink _generalCaseSink;
+        // sink for fallback rows that violate normal and general case
        MemorySink _fallbackSink;

        // hash table sink
@@ -273,8 +274,11 @@ namespace tuplex {
        // hybrid inputs (i.e. when having a long stage the hash-tables of a join)
        std::vector<PyObject*> _py_intermediates;

-        // python output which can't be consolidated, saved as separate list
-        void writePythonObject(PyObject* out_row);
+        /*!
+         * Serialize a Python row and write it to the fallback row sink
+         * @param out_row the Python row object to write
+         */
+        void writePythonObjectToFallbackSink(PyObject* out_row);

        int64_t _outputRowNumber;
diff --git a/tuplex/core/include/physical/ResultSet.h b/tuplex/core/include/physical/ResultSet.h
index 6a97be70c..29273fefd 100644
--- a/tuplex/core/include/physical/ResultSet.h
+++ b/tuplex/core/include/physical/ResultSet.h
@@ -176,7 +176,7 @@ namespace tuplex {
        }

        /*!
-         * returns/removes all general partitions
+         * returns all general partitions, removes them from result set.
         * @return
         */
        std::vector<Partition*> generalPartitions() {
@@ -187,7 +187,7 @@ namespace tuplex {
        }

        /*!
-         * returns/removes all fallback partitions
+         * returns all fallback partitions, removes them from result set.
         * @return
         */
        std::vector<Partition*> fallbackPartitions() {
@@ -198,7 +198,7 @@ namespace tuplex {
        }

        /*!
-         * returns/removes all partition groups
+         * returns all partition groups, removes them from result set.
         * @return
         */
        std::vector<PartitionGroup> partitionGroups() {
diff --git a/tuplex/core/src/Context.cc b/tuplex/core/src/Context.cc
index 2934ee0e4..75d705b58 100644
--- a/tuplex/core/src/Context.cc
+++ b/tuplex/core/src/Context.cc
@@ -367,6 +367,8 @@ namespace tuplex {
        if (partitionGroups.empty()) {
            std::vector<PartitionGroup> defaultPartitionGroups;
            for (int i = 0; i < ds->getPartitions().size(); ++i) {
+                // Create one partition group per normal partition, so the normal count is always 1
+                // (each normal partition is assigned its own task).
                defaultPartitionGroups.push_back(PartitionGroup(1, i, 0, 0, 0, 0));
            }
            op->setPartitionGroups(defaultPartitionGroups);
diff --git a/tuplex/core/src/logical/CacheOperator.cc b/tuplex/core/src/logical/CacheOperator.cc
index 4a571599c..0f3b91e79 100644
--- a/tuplex/core/src/logical/CacheOperator.cc
+++ b/tuplex/core/src/logical/CacheOperator.cc
@@ -71,15 +71,15 @@ namespace tuplex {

        // fetch both partitions (consume) from resultset + any unresolved exceptions
        _normalPartitions = rs->normalPartitions();
-        for(auto p : _normalPartitions)
+        for(auto &p : _normalPartitions)
            p->makeImmortal();

        _generalPartitions = rs->generalPartitions();
-        for(auto p : _generalPartitions)
+        for(auto &p : _generalPartitions)
            p->makeImmortal();

        _fallbackPartitions = rs->fallbackPartitions();
-        for(auto p : _fallbackPartitions)
+        for(auto &p : _fallbackPartitions)
            p->makeImmortal();

        _partitionGroups = rs->partitionGroups();
diff --git a/tuplex/core/src/physical/ResolveTask.cc b/tuplex/core/src/physical/ResolveTask.cc
index cb4fb6801..926007b02 100644
--- a/tuplex/core/src/physical/ResolveTask.cc
+++ b/tuplex/core/src/physical/ResolveTask.cc
@@ -218,7 +218,7 @@ namespace tuplex {
        return tuple;
    }

-    void ResolveTask::writePythonObject(PyObject *out_row) {
+    void ResolveTask::writePythonObjectToFallbackSink(PyObject *out_row) {
        assert(out_row);

        // similar to merge row, need to write other rows first!
@@ -662,9 +662,9 @@ namespace tuplex {
            delete [] buf;
        } else {
            if(PyTuple_Check(rowObj) && PyTuple_Size(rowObj) == 1) {
-                writePythonObject(PyTuple_GetItem(rowObj, 0));
+                writePythonObjectToFallbackSink(PyTuple_GetItem(rowObj, 0));
            } else {
-                writePythonObject(rowObj);
+                writePythonObjectToFallbackSink(rowObj);
            }
        }
        // Py_XDECREF(rowObj);

From 49082888cf35cd8491f796ef9e5accef455d589f Mon Sep 17 00:00:00 2001
From: Ben Givertz
Date: Fri, 13 May 2022 10:38:40 -0400
Subject: [PATCH 11/15] PR Fixes

---
 tuplex/core/src/physical/ResolveTask.cc   |  1 +
 tuplex/core/src/physical/TransformTask.cc | 23 ++++++--------
 tuplex/python/include/PythonContext.h     |  9 +++++-
 tuplex/python/src/PythonContext.cc        | 38 ++++++++++++-----------
 4 files changed, 39 insertions(+), 32 deletions(-)

diff --git a/tuplex/core/src/physical/ResolveTask.cc b/tuplex/core/src/physical/ResolveTask.cc
index 926007b02..c500ea55d 100644
--- a/tuplex/core/src/physical/ResolveTask.cc
+++ b/tuplex/core/src/physical/ResolveTask.cc
@@ -661,6 +661,7 @@ namespace tuplex {
            mergeRow(buf, serialized_length, BUF_FORMAT_GENERAL_OUTPUT);
            delete [] buf;
        } else {
+            // Unwrap single-element tuples before writing them to the fallback sink
            if(PyTuple_Check(rowObj) && PyTuple_Size(rowObj) == 1) {
                writePythonObjectToFallbackSink(PyTuple_GetItem(rowObj, 0));
            } else {
diff --git a/tuplex/core/src/physical/TransformTask.cc b/tuplex/core/src/physical/TransformTask.cc
index 1e70cb2d0..20eccc9e5 100644
--- a/tuplex/core/src/physical/TransformTask.cc
+++ b/tuplex/core/src/physical/TransformTask.cc
@@ -548,17 +548,17 @@ namespace tuplex {
        int64_t totalFallbackRowCounter = 0;
        int64_t totalFilterCounter = 0;

-        uint8_t **generalPartitions = new uint8_t*[_generalPartitions.size()];
-        for (int i = 0; i < _generalPartitions.size(); ++i)
-            generalPartitions[i] = _generalPartitions[i]->lockWriteRaw();
+        std::vector<uint8_t*> generalPartitions;
+        for (auto &p : _generalPartitions)
+            generalPartitions.push_back(p->lockWriteRaw());
        int64_t numGeneralPartitions = _generalPartitions.size();
        int64_t generalIndexOffset = 0;
        int64_t generalRowOffset = 0;
        int64_t generalByteOffset = 0;

-        uint8_t **fallbackPartitions = new uint8_t*[_fallbackPartitions.size()];
-        for (int i = 0; i < _fallbackPartitions.size(); ++i)
-            fallbackPartitions[i] = _fallbackPartitions[i]->lockWriteRaw();
+        std::vector<uint8_t*> fallbackPartitions;
+        for (auto &p : _fallbackPartitions)
+            fallbackPartitions.push_back(p->lockWriteRaw());
        int64_t numFallbackPartitions = _fallbackPartitions.size();
        int64_t fallbackIndexOffset = 0;
        int64_t fallbackRowOffset = 0;
@@ -632,14 +632,11 @@ namespace tuplex {
            }
        }

-        for (auto & _generalPartition : _generalPartitions)
-            _generalPartition->unlockWrite();
+        for (auto &p : _generalPartitions)
+            p->unlockWrite();

-        for (auto & _fallbackPartition : _fallbackPartitions)
-            _fallbackPartition->unlockWrite();
-
-        delete[] fallbackPartitions;
-        delete[] generalPartitions;
+        for (auto &p : _fallbackPartitions)
+            p->unlockWrite();

 #ifndef NDEBUG
        owner()->info("Trafo task memory source exhausted (" + pluralize(_inputPartitions.size(), "partition") + ", "
diff --git a/tuplex/python/include/PythonContext.h b/tuplex/python/include/PythonContext.h
index b3888f342..47eddcb18 100644
--- a/tuplex/python/include/PythonContext.h
+++ b/tuplex/python/include/PythonContext.h
@@ -118,7 +118,14 @@ namespace tuplex {
            return pds;
        }

-        std::vector<Partition*> serializeFallbackRows(const std::vector<std::tuple<size_t, PyObject*>>& fallbackRows);
+        /*!
+         * Serialize a vector of PyObjects into Tuplex-format fallback rows, each of the form:
+         * row index, exception code, operator id, pickled object size, pickled object payload
+         * @param fallbackRows tuples of row index and fallback rows
+         * @param executor executor on which the fallback partitions are allocated
+         * @return
+         */
+        std::vector<Partition*> serializeFallbackRows(const std::vector<std::tuple<size_t, PyObject*>>& fallbackRows, Executor* executor);

    public:
        /*!
diff --git a/tuplex/python/src/PythonContext.cc b/tuplex/python/src/PythonContext.cc
index 6425cdad2..4e9a7bec9 100644
--- a/tuplex/python/src/PythonContext.cc
+++ b/tuplex/python/src/PythonContext.cc
@@ -46,8 +46,11 @@ namespace tuplex {
        // create new partition on driver
        auto driver = _context->getDriver();

+        // contains tuple of row index to python object for non-conforming rows
        std::vector<std::tuple<size_t, PyObject*>> fallbackRows;
+        // contains fallback rows after they have been serialized to Tuplex format
        std::vector<Partition*> fallbackPartitions;
+        // maps fallback partitions to the normal partitions they originated from
        std::vector<PartitionGroup> partitionMergeInfo;
        std::vector<Partition*> partitions;
@@ -64,7 +67,7 @@ namespace tuplex {
                // check capacity and realloc if necessary get a new partition
                if(partition->capacity() < numBytesSerialized + sizeof(double)) {
                    rowDelta += *rawPtr + fallbackRows.size();
-                    auto serializedRows = serializeFallbackRows(fallbackRows);
+                    auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver());
                    fallbackRows.clear();
                    partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size()));
                    std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions));
@@ -108,7 +111,7 @@ namespace tuplex {
                numBytesSerialized += sizeof(double);
            }
-            auto serializedRows = serializeFallbackRows(fallbackRows);
+            auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver());
            fallbackRows.clear();
            partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size()));
            std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions));
@@ -154,7 +157,7 @@ namespace tuplex {
                // check capacity and realloc if necessary get a new partition
                if(partition->capacity() < numBytesSerialized + sizeof(int64_t)) {
                    rowDelta += *rawPtr + fallbackRows.size();
-                    auto serializedRows = serializeFallbackRows(fallbackRows);
+                    auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver());
                    fallbackRows.clear();
                    partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size()));
                    std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions));
@@ -193,7 +196,7 @@ namespace tuplex {
                *rawPtr = *rawPtr + 1;
                numBytesSerialized += sizeof(int64_t);
            }
-            auto serializedRows = serializeFallbackRows(fallbackRows);
+            auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver());
            fallbackRows.clear();
            partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size()));
            std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions));
@@ -279,7 +282,7 @@ namespace tuplex {
                // get new partition if capacity exhausted
                if(partition->capacity() < numBytesSerialized + requiredBytes) {
                    rowDelta += *rawPtr + fallbackRows.size();
-                    auto serializedRows = serializeFallbackRows(fallbackRows);
+                    auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver());
                    fallbackRows.clear();
                    partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size()));
                    std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions));
@@ -375,7 +378,7 @@ namespace tuplex {
                // (2) is the field containing total varlength
                // (3) is the actual string content (incl. '\0' delimiter)
            }
-            auto serializedRows = serializeFallbackRows(fallbackRows);
+            auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver());
            fallbackRows.clear();
            partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size()));
            std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions));
@@ -424,7 +427,7 @@ namespace tuplex {
                // check capacity and realloc if necessary get a new partition
                if(partition->capacity() < numBytesSerialized + sizeof(int64_t)) {
                    rowDelta += *rawPtr + fallbackRows.size();
-                    auto serializedRows = serializeFallbackRows(fallbackRows);
+                    auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver());
                    fallbackRows.clear();
                    partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size()));
                    std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions));
@@ -448,7 +451,7 @@ namespace tuplex {
                    fallbackRows.emplace_back(i - rowDelta, obj);
                }
            }
-            auto serializedRows = serializeFallbackRows(fallbackRows);
+            auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver());
            fallbackRows.clear();
            partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size()));
            std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions));
@@ -508,7 +511,7 @@ namespace tuplex {
                // check capacity and realloc if necessary get a new partition
                if(partition->capacity() < numBytesSerialized + requiredBytes) {
                    rowDelta += *rawPtr + fallbackRows.size();
-                    auto serializedRows = serializeFallbackRows(fallbackRows);
+                    auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver());
                    fallbackRows.clear();
                    partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size()));
                    std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions));
@@ -541,7 +544,7 @@ namespace tuplex {
                    fallbackRows.emplace_back(i - rowDelta, obj);
                }
            }
-            auto serializedRows = serializeFallbackRows(fallbackRows);
+            auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver());
            fallbackRows.clear();
            partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size()));
            std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions));
@@ -642,7 +645,7 @@ namespace tuplex {
            if(partition->capacity() < numBytesSerialized + requiredBytes) {
                rowDelta += *rawPtr + fallbackRows.size();
-                auto serializedRows = serializeFallbackRows(fallbackRows);
+                auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver());
                fallbackRows.clear();
                partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size()));
                std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions));
@@ -664,7 +667,7 @@ namespace tuplex {
            } else
                fallbackRows.emplace_back(std::make_tuple(i - rowDelta, item));
        }
-        auto serializedRows = serializeFallbackRows(fallbackRows);
+        auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver());
        fallbackRows.clear();
        partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size()));
        std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions));
@@ -736,7 +739,7 @@ namespace tuplex {
                // check capacity and realloc if necessary get a new partition
                if (partition->capacity() < numBytesSerialized + allocMinSize) {
                    rowDelta += *rawPtr + fallbackRows.size();
-                    auto serializedRows = serializeFallbackRows(fallbackRows);
+                    auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver());
                    fallbackRows.clear();
                    partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size()));
                    std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions));
@@ -764,7 +767,7 @@ namespace tuplex {
                    fallbackRows.emplace_back(i - rowDelta, obj);
                }
            }
-            auto serializedRows = serializeFallbackRows(fallbackRows);
+            auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver());
            fallbackRows.clear();
            partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size()));
            std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions));
@@ -1302,15 +1305,14 @@ namespace tuplex {
        return co;
    }

-    std::vector<Partition*> PythonContext::serializeFallbackRows(const std::vector<std::tuple<size_t, PyObject*>>& fallbackRows) {
+    std::vector<Partition*> PythonContext::serializeFallbackRows(const std::vector<std::tuple<size_t, PyObject*>>& fallbackRows, Executor* executor) {
        std::vector<Partition*> fallbackPartitions;
        if (fallbackRows.empty()) {
            return fallbackPartitions;
        }

-        auto driver = _context->getDriver();
        Schema schema(Schema::MemoryLayout::ROW, python::Type::makeTupleType({python::Type::STRING}));
-        auto partition = driver->allocWritablePartition(allocMinSize, schema, -1, _context->id());
+        auto partition = executor->allocWritablePartition(allocMinSize, schema, -1, _context->id());
        int64_t* rawPtr = (int64_t*)partition->lockWriteRaw();
        *rawPtr = 0;
        uint8_t* ptr = (uint8_t*)(rawPtr + 1);
@@ -1327,7 +1329,7 @@ namespace tuplex {
            if (partition->capacity() < numBytesSerialized + requiredBytes) {
                partition->unlockWrite();
                fallbackPartitions.push_back(partition);
-                partition = driver->allocWritablePartition(allocMinSize, schema, -1, _context->id());
+                partition = executor->allocWritablePartition(allocMinSize, schema, -1, _context->id());
                rawPtr = (int64_t *) partition->lockWriteRaw();
                *rawPtr = 0;
                ptr = (uint8_t * )(rawPtr + 1);

From ad91b45ad641172fea185fcbdd38ed0e49ff114f Mon Sep 17 00:00:00 2001
From: Ben Givertz
Date: Fri, 13 May 2022 11:05:47 -0400
Subject: [PATCH 12/15] PR Fixes

---
 tuplex/core/src/physical/TransformTask.cc | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/tuplex/core/src/physical/TransformTask.cc b/tuplex/core/src/physical/TransformTask.cc
index 20eccc9e5..29d06061b 100644
--- a/tuplex/core/src/physical/TransformTask.cc
+++ b/tuplex/core/src/physical/TransformTask.cc
@@ -548,17 +548,17 @@ namespace tuplex {
        int64_t totalFallbackRowCounter = 0;
        int64_t totalFilterCounter = 0;

-        std::vector<uint8_t*> generalPartitions;
-        for (auto &p : _generalPartitions)
-            generalPartitions.push_back(p->lockWriteRaw());
+        uint8_t **generalPartitions = new uint8_t*[_generalPartitions.size()];
+        for (int i = 0; i < _generalPartitions.size(); ++i)
+            generalPartitions[i] = _generalPartitions[i]->lockWriteRaw();
        int64_t numGeneralPartitions = _generalPartitions.size();
        int64_t generalIndexOffset = 0;
        int64_t generalRowOffset = 0;
        int64_t generalByteOffset = 0;

-        std::vector<uint8_t*> fallbackPartitions;
-        for (auto &p : _fallbackPartitions)
-            fallbackPartitions.push_back(p->lockWriteRaw());
+        uint8_t **fallbackPartitions = new uint8_t*[_fallbackPartitions.size()];
+        for (int i = 0; i < _fallbackPartitions.size(); ++i)
+            fallbackPartitions[i] = _fallbackPartitions[i]->lockWriteRaw();
        int64_t numFallbackPartitions = _fallbackPartitions.size();
        int64_t fallbackIndexOffset = 0;
        int64_t fallbackRowOffset = 0;
@@ -638,6 +638,9 @@ namespace tuplex {
        for (auto &p : _fallbackPartitions)
            p->unlockWrite();

+        delete[] fallbackPartitions;
+        delete[] generalPartitions;
+
 #ifndef NDEBUG
        owner()->info("Trafo task memory source exhausted (" + pluralize(_inputPartitions.size(), "partition") + ", "
                      + pluralize(num_normal_rows, "normal row") + ", " + pluralize(num_bad_rows, "exceptional row") + ")");

From 859bdaf326b1fd2818b106057c4db206d65cb701 Mon Sep 17 00:00:00 2001
From: Leonhard Spiegelberg
Date: Thu, 19 May 2022 21:23:55 -0400
Subject: [PATCH 13/15] mini fixes

---
 tuplex/core/src/physical/TransformTask.cc | 9 +++------
 tuplex/python/include/PythonContext.h     | 4 ++--
 2 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/tuplex/core/src/physical/TransformTask.cc b/tuplex/core/src/physical/TransformTask.cc
index 29d06061b..4490f404c 100644
--- a/tuplex/core/src/physical/TransformTask.cc
+++ b/tuplex/core/src/physical/TransformTask.cc
@@ -548,7 +548,7 @@ namespace tuplex {
        int64_t totalFallbackRowCounter = 0;
        int64_t totalFilterCounter = 0;

-        uint8_t **generalPartitions = new uint8_t*[_generalPartitions.size()];
+        std::vector<uint8_t*> generalPartitions(_generalPartitions.size(), nullptr);
        for (int i = 0; i < _generalPartitions.size(); ++i)
            generalPartitions[i] = _generalPartitions[i]->lockWriteRaw();
        int64_t numGeneralPartitions = _generalPartitions.size();
@@ -556,7 +556,7 @@ namespace tuplex {
        int64_t generalRowOffset = 0;
        int64_t generalByteOffset = 0;

-        uint8_t **fallbackPartitions = new uint8_t*[_fallbackPartitions.size()];
+        std::vector<uint8_t*> fallbackPartitions(_fallbackPartitions.size(), nullptr);
        for (int i = 0; i < _fallbackPartitions.size(); ++i)
            fallbackPartitions[i] = _fallbackPartitions[i]->lockWriteRaw();
        int64_t numFallbackPartitions = _fallbackPartitions.size();
@@ -638,9 +638,6 @@ namespace tuplex {
        for (auto &p : _fallbackPartitions)
            p->unlockWrite();

-        delete[] fallbackPartitions;
-        delete[] generalPartitions;
-
 #ifndef NDEBUG
        owner()->info("Trafo task memory source exhausted (" + pluralize(_inputPartitions.size(), "partition") + ", "
                      + pluralize(num_normal_rows, "normal row") + ", " + pluralize(num_bad_rows, "exceptional row") + ")");
@@ -1002,4 +999,4 @@ namespace tuplex {
        _htableFormat = fmt;
        _outputDataSetID = outputDataSetID;
    }
-}
\ No newline at end of file
+}
diff --git a/tuplex/python/include/PythonContext.h b/tuplex/python/include/PythonContext.h
index fd51ac1b0..de2cf5bc5 100644
--- a/tuplex/python/include/PythonContext.h
+++ b/tuplex/python/include/PythonContext.h
@@ -123,7 +123,7 @@ namespace tuplex {
         * row index, exception code, operator id, pickled object size, pickled object payload
         * @param fallbackRows tuples of row index and fallback rows
         * @param executor executor on which the fallback partitions are allocated
-         * @return
+         * @return vector of partitions holding pickled fallback rows
         */
        std::vector<Partition*> serializeFallbackRows(const std::vector<std::tuple<size_t, PyObject*>>& fallbackRows, Executor* executor);

    public:
@@ -234,4 +234,4 @@ namespace tuplex {

}

-#endif //TUPLEX_PYTHONCONTEXT_H
\ No newline at end of file
+#endif //TUPLEX_PYTHONCONTEXT_H

From 47217727f2790a0728aa22413ad791ed411c90a7 Mon Sep 17 00:00:00 2001
From: Leonhard Spiegelberg
Date: Thu, 19 May 2022 22:50:59 -0400
Subject: [PATCH 14/15] fix

---
 tuplex/core/src/physical/TransformTask.cc | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tuplex/core/src/physical/TransformTask.cc b/tuplex/core/src/physical/TransformTask.cc
index 4490f404c..ffcb6022a 100644
--- a/tuplex/core/src/physical/TransformTask.cc
+++ b/tuplex/core/src/physical/TransformTask.cc
@@ -575,8 +575,8 @@ namespace tuplex {
            // call functor
            auto bytesParsed = functor(this, inPtr, inSize, &num_normal_rows, &num_bad_rows, false,
                                       &totalFilterCounter, &totalNormalRowCounter, &totalGeneralRowCounter, &totalFallbackRowCounter,
-                                       generalPartitions, numGeneralPartitions, &generalIndexOffset, &generalRowOffset, &generalByteOffset,
-                                       fallbackPartitions, numFallbackPartitions, &fallbackIndexOffset, &fallbackRowOffset, &fallbackByteOffset);
+                                       &generalPartitions[0], numGeneralPartitions, &generalIndexOffset, &generalRowOffset, &generalByteOffset,
+                                       &fallbackPartitions[0], numFallbackPartitions, &fallbackIndexOffset, &fallbackRowOffset, &fallbackByteOffset);

            // save number of normal rows to output rows written if not writeToFile
            if(hasMemorySink())

From 476e433a829f8a20309dd7815b5612607923f8d8 Mon Sep 17 00:00:00 2001
From: Your Name
Date: Thu, 19 May 2022 23:01:00 -0400
Subject: [PATCH 15/15] compile fix

---
 tuplex/test/wrappers/WrapperTest.cc | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tuplex/test/wrappers/WrapperTest.cc b/tuplex/test/wrappers/WrapperTest.cc
index 879935f9e..52d3fc137 100644
--- a/tuplex/test/wrappers/WrapperTest.cc
+++ b/tuplex/test/wrappers/WrapperTest.cc
@@ -688,8 +688,8 @@ TEST_F(WrapperTest, OptionListTest) {
    // RAII, destruct python context!
    auto opts = testOptions();
-    opts = opts.substr(0, opts.length() - 1) + ", \"tuplex.autoUpcast\":\"True\"}";
-    PythonContext c("python", "", opts);
+    opts.set("tuplex.autoUpcast", "true");
+    PythonContext c("python", "", opts.asJSON());

    // weird block syntax due to RAII problems.
    {
@@ -2678,4 +2678,4 @@ TEST_F(WrapperTest, PartitionRelease) {
//    Py_XDECREF(pResult);
//    Py_Finalize();
//    return return_value;
-//}
\ No newline at end of file
+//}
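
A note on the fallback-row layout referenced above: per the serializeFallbackRows doc comment, each serialized fallback row consists of row index, exception code, operator id, pickled object size, and the pickled object payload. Below is a minimal C++ sketch of that framing — it is not part of the patch series; appendFallbackRow and the plain std::vector<uint8_t> buffer are hypothetical stand-ins for Tuplex's partition writes, and the int64_t header fields are assumed to sit back to back in native byte order:

#include <cstdint>
#include <vector>

// Append one serialized fallback row to a byte buffer:
// rowIndex | ecCode | operatorID | pickledSize | pickled bytes.
void appendFallbackRow(std::vector<uint8_t>& out,
                       int64_t rowIndex, int64_t ecCode, int64_t operatorID,
                       const uint8_t* pickled, int64_t pickledSize) {
    auto writeI64 = [&out](int64_t v) {
        const auto* p = reinterpret_cast<const uint8_t*>(&v);
        out.insert(out.end(), p, p + sizeof(int64_t)); // raw native-endian copy
    };
    writeI64(rowIndex);    // row index relative to the partition group
    writeI64(ecCode);      // exception code
    writeI64(operatorID);  // operator that produced the row
    writeI64(pickledSize); // length of the pickled object in bytes
    out.insert(out.end(), pickled, pickled + pickledSize); // pickled payload
}

This four-field header matches the stride the removed setPartitionMergeInfo sketch used for general and exception records (((int64_t*)ptr)[3] + 4*sizeof(int64_t)); its fallback records still assumed a two-field header (((int64_t*)ptr)[1] + 2*sizeof(int64_t)), which the serializeFallbackRows format introduced in patch 11 appears to supersede.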