diff --git a/tuplex/core/include/Context.h b/tuplex/core/include/Context.h index b75409533..856449f8a 100644 --- a/tuplex/core/include/Context.h +++ b/tuplex/core/include/Context.h @@ -15,6 +15,7 @@ #include #include #include "Partition.h" +#include #include "Row.h" #include "HistoryServerClasses.h" #include @@ -37,7 +38,7 @@ namespace tuplex { class Executor; class Partition; class IBackend; - class ExceptionInfo; + class PartitionGroup; class Context { private: @@ -59,24 +60,14 @@ namespace tuplex { // needed because of C++ template issues void addPartition(DataSet* ds, Partition *partition); - void addParallelizeNode(DataSet *ds, const std::vector> &badParallelizeObjects=std::vector>(), const std::vector &numExceptionsInPartition=std::vector()); //! adds a paralellize node to the computation graph /*! - * serialize python objects as pickled objects into exception partitions. Set the python objects map to - * map all normalPartitions to the exceptions that occured within them. - * @param pythonObjects normal case schema violations and their initial row numbers - * @param numExceptionsInPartition number of exceptions in each normal partition - * @param normalPartitions normal partitions created - * @param opID parallelize operator ID - * @param serializedPythonObjects output vector for partitions - * @param pythonObjectsMap output for mapping + * Add a parallelize logical operator to the dataset + * @param ds dataset + * @param fallbackPartitions fallback partitions from python parallelize + * @param partitionGroups partition mapping information */ - void serializePythonObjects(const std::vector>& pythonObjects, - const std::vector &numExceptionsInPartition, - const std::vector &normalPartitions, - const int64_t opID, - std::vector &serializedPythonObjects, - std::unordered_map &pythonObjectsMap); + void addParallelizeNode(DataSet *ds, const std::vector& fallbackPartitions=std::vector{}, const std::vector& partitionGroups=std::vector{}); //! adds a parallelize node to the computation graph Partition* requestNewPartition(const Schema& schema, const int dataSetID, size_t minBytesRequired); uint8_t* partitionLockRaw(Partition *partition); @@ -270,13 +261,12 @@ namespace tuplex { * @param partitions partitions to assign to dataset. These should NOT be reused later. * Also, partitions need to hold data in supplied schema. If empty vector is given, * empty dataset will be created. - * + * @param fallbackPartitions fallback partitions to assign to dataset + * @param partitionGroups mapping of partitions to fallback partitions * @param columns optional column names - * @param badParallelizeObjects schema violations found during parallelization of partitions - * @param numExceptionsInPartition number of schema violations that occured in each of the partitions * @return reference to newly created dataset.
*/ - DataSet& fromPartitions(const Schema& schema, const std::vector& partitions, const std::vector& columns, const std::vector> &badParallelizeObjects, const std::vector &numExceptionsInPartition); + DataSet& fromPartitions(const Schema& schema, const std::vector& partitions, const std::vector& fallbackPartitions, const std::vector& partitionGroups, const std::vector& columns); }; // needed for template mechanism to work #include diff --git a/tuplex/core/include/ExceptionInfo.h b/tuplex/core/include/ExceptionInfo.h deleted file mode 100644 index d6ee35886..000000000 --- a/tuplex/core/include/ExceptionInfo.h +++ /dev/null @@ -1,47 +0,0 @@ -//--------------------------------------------------------------------------------------------------------------------// -// // -// Tuplex: Blazing Fast Python Data Science // -// // -// // -// (c) 2017 - 2021, Tuplex team // -// Created by Benjamin Givertz first on 1/1/2022 // -// License: Apache 2.0 // -//--------------------------------------------------------------------------------------------------------------------// - -#ifndef TUPLEX_EXCEPTIONINFO_H -#define TUPLEX_EXCEPTIONINFO_H - -namespace tuplex { - /*! - * Struct to hold information that maps input partitions to input exceptions that occur within them. - * - * Explanation: - * Each input partition is passed the same vector of all input exceptions that occured during data parallelization - * or caching. Thus, each input partition must know how many input exceptions occur in its partition, the index - * of the input exception partition where its first exception occurs, and the offset into that partition where the - * first exception occurs. These values are held in this struct and each input partition is mapped to an ExceptionInfo. - */ - struct ExceptionInfo { - size_t numExceptions; //! number of exception rows that occur within a single input partition - size_t exceptionIndex; //! index into a vector of input exception partitions that holds the first input exception - size_t exceptionRowOffset; //! offset in rows into the first input exception partition where the first exception occurs. - size_t exceptionByteOffset; //! offset in bytes into the first input exception partition where the first exception occurs - - ExceptionInfo() : - numExceptions(0), - exceptionIndex(0), - exceptionRowOffset(0), - exceptionByteOffset(0) {} - - ExceptionInfo(size_t numExps, - size_t expIndex, - size_t expRowOffset, - size_t expByteOffset) : - numExceptions(numExps), - exceptionIndex(expIndex), - exceptionRowOffset(expRowOffset), - exceptionByteOffset(expByteOffset) {} - }; -} - -#endif //TUPLEX_EXCEPTIONINFO_H \ No newline at end of file diff --git a/tuplex/core/include/PartitionGroup.h b/tuplex/core/include/PartitionGroup.h new file mode 100644 index 000000000..f74d2d5d3 --- /dev/null +++ b/tuplex/core/include/PartitionGroup.h @@ -0,0 +1,53 @@ +//--------------------------------------------------------------------------------------------------------------------// +// // +// Tuplex: Blazing Fast Python Data Science // +// // +// // +// (c) 2017 - 2021, Tuplex team // +// Created by Benjamin Givertz first on 1/1/2021 // +// License: Apache 2.0 // +//--------------------------------------------------------------------------------------------------------------------// + +#ifndef TUPLEX_PARTITIONGROUP_H +#define TUPLEX_PARTITIONGROUP_H + +/*! + * Maintains a grouping of normal, general, and fallback partitions that arise from the + * same executor, so that they can be reprocessed together after a cache operation.
+ */ +namespace tuplex { + struct PartitionGroup { + /*! + * Groups normal, general, and fallback partitions so that all partitions that originated from a single + * task belong to the same group. + * @param numNormalPartitions number of normal partitions in group + * @param normalPartitionStartIndex starting index in list of all normal partitions + * @param numGeneralPartitions number of general partitions in group + * @param generalPartitionStartIndex starting index in list of all general partitions + * @param numFallbackPartitions number of fallback partitions in group + * @param fallbackPartitionStartIndex starting index in list of all fallback partitions + */ + PartitionGroup(size_t numNormalPartitions, size_t normalPartitionStartIndex, + size_t numGeneralPartitions, size_t generalPartitionStartIndex, + size_t numFallbackPartitions, size_t fallbackPartitionStartIndex): + numNormalPartitions(numNormalPartitions), normalPartitionStartIndex(normalPartitionStartIndex), + numGeneralPartitions(numGeneralPartitions), generalPartitionStartIndex(generalPartitionStartIndex), + numFallbackPartitions(numFallbackPartitions), fallbackPartitionStartIndex(fallbackPartitionStartIndex) {} + + /*! + * Initialize empty struct with all values set to zero. + */ + PartitionGroup() : + numNormalPartitions(0), numGeneralPartitions(0), numFallbackPartitions(0), + normalPartitionStartIndex(0), generalPartitionStartIndex(0), fallbackPartitionStartIndex(0) {} + + size_t numNormalPartitions; + size_t normalPartitionStartIndex; + size_t numGeneralPartitions; + size_t generalPartitionStartIndex; + size_t numFallbackPartitions; + size_t fallbackPartitionStartIndex; + }; +} + +#endif //TUPLEX_PARTITIONGROUP_H diff --git a/tuplex/core/include/ee/local/LocalBackend.h b/tuplex/core/include/ee/local/LocalBackend.h index 77d375aed..63ea60c4b 100644 --- a/tuplex/core/include/ee/local/LocalBackend.h +++ b/tuplex/core/include/ee/local/LocalBackend.h @@ -99,7 +99,7 @@ namespace tuplex { return std::accumulate(counts.begin(), counts.end(), 0, [](size_t acc, std::pair, size_t> val) { return acc + val.second; }); } - inline std::vector getOutputPartitions(IExecutorTask* task) { + inline std::vector getNormalPartitions(IExecutorTask* task) const { if(!task) return std::vector(); @@ -113,7 +113,7 @@ namespace tuplex { return std::vector(); } - inline std::vector getRemainingExceptions(IExecutorTask* task) { + inline std::vector getExceptionPartitions(IExecutorTask* task) const { if(!task) return std::vector(); @@ -127,7 +127,7 @@ namespace tuplex { return std::vector(); } - inline std::vector generalCasePartitions(IExecutorTask* task) { + inline std::vector getGeneralPartitions(IExecutorTask* task) const { if(!task) return std::vector(); @@ -141,7 +141,7 @@ namespace tuplex { return std::vector(); } - inline std::unordered_map, size_t> getExceptionCounts(IExecutorTask* task) { + inline std::unordered_map, size_t> getExceptionCounts(IExecutorTask* task) const { if(!task) return std::unordered_map, size_t>(); @@ -155,18 +155,18 @@ namespace tuplex { return std::unordered_map, size_t>(); } - inline std::vector> getNonConformingRows(IExecutorTask* task) { + inline std::vector getFallbackPartitions(IExecutorTask* task) const { if(!task) - return std::vector>(); + return std::vector(); if(task->type() == TaskType::UDFTRAFOTASK) - return std::vector>(); // none here, can be only result from ResolveTask. + return std::vector(); // none here, fallback rows can only result from a ResolveTask.
if(task->type() == TaskType::RESOLVE) - return dynamic_cast(task)->getNonConformingRows(); + return dynamic_cast(task)->getOutputFallbackPartitions(); throw std::runtime_error("unknown task type seen in " + std::string(__FILE_NAME__) + ":" + std::to_string(__LINE__)); - return std::vector>(); + return std::vector(); } std::vector resolveViaSlowPath(std::vector& tasks, diff --git a/tuplex/core/include/logical/CacheOperator.h b/tuplex/core/include/logical/CacheOperator.h index 563aa8f0b..1e373b79a 100644 --- a/tuplex/core/include/logical/CacheOperator.h +++ b/tuplex/core/include/logical/CacheOperator.h @@ -25,7 +25,7 @@ namespace tuplex { CacheOperator(LogicalOperator* parent, bool storeSpecialized, const Schema::MemoryLayout& memoryLayout=Schema::MemoryLayout::ROW) : LogicalOperator(parent), _storeSpecialized(storeSpecialized), _memoryLayout(memoryLayout), _cached(false), - _columns(parent->columns()) { + _columns(parent->columns()), _normalRowCount(0), _fallbackRowCount(0), _generalRowCount(0) { setSchema(this->parent()->getOutputSchema()); // inherit schema from parent _optimizedSchema = getOutputSchema(); if(memoryLayout != Schema::MemoryLayout::ROW) @@ -79,9 +79,10 @@ namespace tuplex { * @return */ bool isCached() const { return _cached; } - std::vector cachedPartitions() const { return _normalCasePartitions; } - std::vector cachedExceptions() const { return _generalCasePartitions; } - std::unordered_map partitionToExceptionsMap() const { return _partitionToExceptionsMap; } + std::vector cachedNormalPartitions() const { return _normalPartitions; } + std::vector cachedGeneralPartitions() const { return _generalPartitions; } + std::vector cachedFallbackPartitions() const { return _fallbackPartitions; } + std::vector partitionGroups() const { return _partitionGroups; } size_t getTotalCachedRows() const; @@ -107,28 +108,19 @@ namespace tuplex { // or merge them. bool _cached; bool _storeSpecialized; - std::vector _normalCasePartitions; //! holds all data conforming to the normal case schema - std::vector _generalCasePartitions; //! holds all data which is considered to be a normal-case violation, - //! i.e. which does not adhere to the normal case schema, but did not produce - //! an exception while being processed through the pipeline before - std::unordered_map _partitionToExceptionsMap; //! maps normal case partitions to corresponding general case ones - std::vector _py_objects; //! all python objects who do not adhere to the general case schema + std::vector _normalPartitions; //! holds all data conforming to the normal case schema + std::vector _generalPartitions; //! holds all data which is considered to be a normal-case violation, + std::vector _fallbackPartitions; //! holds all data which is output as a python object from interpreter processing + std::vector _partitionGroups; //! groups together partitions for correct row ordering std::vector _columns; // internal sample of normal case rows, used for tracing & Co. std::vector _sample; // number of rows need to be stored for cost estimates - size_t _normalCaseRowCount; - size_t _generalCaseRowCount; - - // @TODO: there should be 3 things stored - // 1.) common case => i.e. - // 2.) general case => i.e. what in general can be done (null-values & Co, wide integers, ...) - // 3.) python case => i.e. things that don't fit into either case (interpreter objects serialized via pickle) - - // Note: the pickling could be parallelized by simply matching python types & Co... - // ==> store python data as tuple of elements! 
+ size_t _normalRowCount; + size_t _generalRowCount; + size_t _fallbackRowCount; }; } diff --git a/tuplex/core/include/logical/ParallelizeOperator.h b/tuplex/core/include/logical/ParallelizeOperator.h index 0960baf89..87666950c 100644 --- a/tuplex/core/include/logical/ParallelizeOperator.h +++ b/tuplex/core/include/logical/ParallelizeOperator.h @@ -17,10 +17,9 @@ namespace tuplex { class ParallelizeOperator : public LogicalOperator { - std::vector _partitions; // data, conforming to majority type - std::vector _pythonObjects; // schema violations stored for interpreter processing as python objects - // maps partitions to their corresponding python objects - std::unordered_map _inputPartitionToPythonObjectsMap; + std::vector _normalPartitions; // data, conforming to majority type + std::vector _fallbackPartitions; // schema violations stored for interpreter processing as python objects + std::vector _partitionGroups; // maps normal partitions to their corresponding fallback partitions std::vector _columnNames; std::vector _sample; // sample, not necessary conforming to one type @@ -31,7 +30,7 @@ namespace tuplex { // this a root node ParallelizeOperator(const Schema& schema, - const std::vector& partitions, + const std::vector& normalPartitions, const std::vector& columns); std::string name() override { return "parallelize"; } @@ -47,13 +46,13 @@ namespace tuplex { * get the partitions where the parallelized data is stored. * @return vector of partitions. */ - std::vector getPartitions(); + std::vector getNormalPartitions(); - void setPythonObjects(const std::vector &pythonObjects) { _pythonObjects = pythonObjects; } - std::vector getPythonObjects() { return _pythonObjects; } + void setFallbackPartitions(const std::vector &fallbackPartitions) { _fallbackPartitions = fallbackPartitions; } + std::vector getFallbackPartitions() { return _fallbackPartitions; } - void setInputPartitionToPythonObjectsMap(const std::unordered_map& pythonObjectsMap) { _inputPartitionToPythonObjectsMap = pythonObjectsMap; } - std::unordered_map getInputPartitionToPythonObjectsMap() { return _inputPartitionToPythonObjectsMap; } + void setPartitionGroups(const std::vector& partitionGroups) { _partitionGroups = partitionGroups; } + std::vector getPartitionGroups() { return _partitionGroups; } Schema getInputSchema() const override { return getOutputSchema(); } diff --git a/tuplex/core/include/physical/CodeDefs.h b/tuplex/core/include/physical/CodeDefs.h index b8c3cd76b..c185b2d54 100644 --- a/tuplex/core/include/physical/CodeDefs.h +++ b/tuplex/core/include/physical/CodeDefs.h @@ -48,8 +48,14 @@ namespace tuplex { typedef int64_t(*read_block_f)(void*, const uint8_t*, int64_t, int64_t*, int64_t*, int8_t); // protoype of the function generated by the below builder - // parameters are userData, block, blocksize, expPtrs, expPtrSizes, numExceptions, normalrowsout, badrowsout, lastRow - typedef int64_t(*read_block_exp_f)(void*, const uint8_t*, int64_t, uint8_t **, int64_t *, int64_t, int64_t*, int64_t*, bool); + // parameters are userData, block, blocksize, normalrowsout, badrowsout, lastRow, + // totalFilterCounter, totalNormalRowCounter, totalGeneralRowCounter, totalFallbackRowCounter, + // generalPartitions, numGeneralPartitions, generalIndexOffset, generalRowOffset, generalByteOffset + // fallbackPartitions, numFallbackPartitions, fallbackIndexOffset, fallbackRowOffset, fallbackByteOffset + typedef int64_t(*read_block_exp_f)(void*, const uint8_t*, int64_t, int64_t*, int64_t*, bool, + int64_t*, int64_t*, int64_t*, 
int64_t*, + uint8_t **, int64_t, int64_t*, int64_t*, int64_t*, + uint8_t **, int64_t, int64_t*, int64_t*, int64_t*); /*! * prototype for processing a single row (with callbacks etc.). Returns how many bytes were processed diff --git a/tuplex/core/include/physical/ResolveTask.h b/tuplex/core/include/physical/ResolveTask.h index 2044a5699..1cd098219 100644 --- a/tuplex/core/include/physical/ResolveTask.h +++ b/tuplex/core/include/physical/ResolveTask.h @@ -61,9 +61,9 @@ namespace tuplex { ResolveTask(int64_t stageID, int64_t contextID, const std::vector& partitions, - const std::vector& runtimeExceptions, - const std::vector& inputExceptions, - ExceptionInfo inputExceptionInfo, + const std::vector& exceptionPartitions, + const std::vector& generalPartitions, + const std::vector& fallbackPartitions, const std::vector& operatorIDsAffectedByResolvers, //! used to identify which exceptions DO require reprocessing because there might be a resolver in the slow path for them. Schema exceptionInputSchema, //! schema of the input rows in which both user exceptions and normal-case violations are stored in. This is also the schema in which rows which on the slow path produce again an exception will be stored in. Schema resolverOutputSchema, //! schema of rows that the resolve function outputs if it doesn't rethrow exceptions @@ -78,12 +78,12 @@ namespace tuplex { PyObject* interpreterFunctor=nullptr) : IExceptionableTask::IExceptionableTask(exceptionInputSchema, contextID), _stageID(stageID), _partitions(partitions), - _runtimeExceptions(runtimeExceptions), - _inputExceptions(inputExceptions), - _numInputExceptions(inputExceptionInfo.numExceptions), - _inputExceptionIndex(inputExceptionInfo.exceptionIndex), - _inputExceptionRowOffset(inputExceptionInfo.exceptionRowOffset), - _inputExceptionByteOffset(inputExceptionInfo.exceptionByteOffset), + _exceptionPartitions(exceptionPartitions), + _generalPartitions(generalPartitions), + _fallbackPartitions(fallbackPartitions), + _exceptionCounter(0), + _generalCounter(0), + _fallbackCounter(0), _resolverOutputSchema(resolverOutputSchema), _targetOutputSchema(targetNormalCaseOutputSchema), _mergeRows(mergeRows), @@ -170,7 +170,7 @@ namespace tuplex { std::vector getOutputPartitions() const override { return _partitions; } - std::vector> getNonConformingRows() const { return _py_nonconfirming; } + std::vector getOutputFallbackPartitions() const { return _fallbackSink.partitions; } /// very important to override this because of the special two exceptions fields of ResolveTask /// i.e. _generalCasePartitions store what exceptions to resolve, IExceptionableTask::_generalCasePartitions exceptions that occurred @@ -214,12 +214,14 @@ namespace tuplex { private: int64_t _stageID; /// to which stage does this task belong to. std::vector _partitions; - std::vector _runtimeExceptions; - std::vector _inputExceptions; - size_t _numInputExceptions; - size_t _inputExceptionIndex; - size_t _inputExceptionRowOffset; - size_t _inputExceptionByteOffset; + std::vector _exceptionPartitions; + std::vector _generalPartitions; + std::vector _fallbackPartitions; + + size_t _exceptionCounter; + size_t _generalCounter; + size_t _fallbackCounter; + inline Schema commonCaseInputSchema() const { return _deserializerGeneralCaseOutput->getSchema(); } Schema _resolverOutputSchema; //! what the resolve functor produces Schema _targetOutputSchema; //! which schema the final rows should be in... 
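The three partition vectors above are handed to each task by slicing stage-global lists with a `PartitionGroup`. Below is a minimal, self-contained sketch of that indexing scheme; the `Partition` stand-in and the `slice` helper are illustrative only, not Tuplex API (the real slicing happens inline in the `LocalBackend.cc` loop further down in this diff).

```cpp
#include <cstddef>
#include <vector>

// Opaque stand-in for tuplex::Partition (the real class lives in Partition.h).
struct Partition {};

// Mirrors the PartitionGroup struct introduced in PartitionGroup.h.
struct PartitionGroup {
    size_t numNormalPartitions = 0,   normalPartitionStartIndex = 0;
    size_t numGeneralPartitions = 0,  generalPartitionStartIndex = 0;
    size_t numFallbackPartitions = 0, fallbackPartitionStartIndex = 0;
};

// Hypothetical helper: carve one task's partitions out of a stage-global vector.
std::vector<Partition*> slice(const std::vector<Partition*>& all, size_t start, size_t count) {
    return std::vector<Partition*>(all.begin() + start, all.begin() + start + count);
}

int main() {
    std::vector<Partition*> normal(4), general(2), fallback(1);
    // Task 0 owns normal[0..1] and general[0..1]; task 1 owns normal[2..3] and fallback[0].
    std::vector<PartitionGroup> groups{{2, 0, 2, 0, 0, 0},
                                       {2, 2, 0, 0, 1, 0}};
    for (const auto& g : groups) {
        auto n = slice(normal,   g.normalPartitionStartIndex,   g.numNormalPartitions);
        auto e = slice(general,  g.generalPartitionStartIndex,  g.numGeneralPartitions);
        auto f = slice(fallback, g.fallbackPartitionStartIndex, g.numFallbackPartitions);
        // ...hand n, e, f to one TransformTask / ResolveTask...
        (void)n; (void)e; (void)f;
    }
    return 0;
}
```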
@@ -258,6 +260,9 @@ namespace tuplex { // sink for type violation rows MemorySink _generalCaseSink; + // sink for fallback rows that violate both the normal and general case schemas + MemorySink _fallbackSink; + // hash table sink // -> hash to be a hybrid because sometimes incompatible python objects have to be hashed here. HashTableSink _htable; @@ -269,9 +274,11 @@ namespace tuplex { // hybrid inputs (i.e. when having a long stage the hash-tables of a join) std::vector _py_intermediates; - // python output which can't be consolidated, saved as separate list - void writePythonObject(PyObject* out_row); - std::vector> _py_nonconfirming; + /*! + * Serialize and write a python row to the fallback row sink + * @param out_row + */ + void writePythonObjectToFallbackSink(PyObject* out_row); int64_t _outputRowNumber; diff --git a/tuplex/core/include/physical/ResultSet.h b/tuplex/core/include/physical/ResultSet.h index e94b8f1ae..29273fefd 100644 --- a/tuplex/core/include/physical/ResultSet.h +++ b/tuplex/core/include/physical/ResultSet.h @@ -13,7 +13,7 @@ #include #include -#include +#include #include #include #include @@ -25,23 +25,61 @@ namespace tuplex { class ResultSet { private: - std::list _partitions; - std::vector _exceptions; // unresolved exceptions - std::unordered_map _partitionToExceptionsMap; - // @TODO: use here rows instead? would make it potentially cleaner... - std::deque> _pyobjects; // python objects remaining whose type - // did not confirm to the one of partitions. Maybe use Row here instead? - size_t _curRowCounter; //! row counter for the current partition - size_t _byteCounter; //! byte offset for the current partition - size_t _rowsRetrieved; - size_t _totalRowCounter; // used for merging in rows! - size_t _maxRows; - Schema _schema; - - void removeFirstPartition(); + std::list _currentNormalPartitions; //! normal partitions in current group + std::list _currentGeneralPartitions; //! general partitions in current group + std::list _currentFallbackPartitions; //! fallback partitions in current group + std::list _remainingNormalPartitions; //! remaining normal partitions in other groups + std::list _remainingGeneralPartitions; //! remaining general partitions in other groups + std::list _remainingFallbackPartitions; //! remaining fallback partitions in other groups + std::list _partitionGroups; //! groups together normal, general, and fallback partitions for merging + + size_t _totalRowCounter; //! total rows emitted across all groups + size_t _maxRows; //! max number of rows to emit + Schema _schema; //! normal case schema + + size_t _curNormalRowCounter; + size_t _curNormalByteCounter; + size_t _curGeneralRowCounter; + size_t _curGeneralByteCounter; + size_t _curFallbackRowCounter; + size_t _curFallbackByteCounter; + size_t _normalRowCounter; + size_t _generalRowCounter; + size_t _fallbackRowCounter; + + int64_t currentGeneralRowInd(); + int64_t currentFallbackRowInd(); + + Row getNextNormalRow(); + bool hasNextNormalRow(); + Row getNextFallbackRow(); + bool hasNextFallbackRow(); + Row getNextGeneralRow(); + bool hasNextGeneralRow(); + + void removeFirstGeneralPartition(); + void removeFirstFallbackPartition(); + void removeFirstNormalPartition(); public: - ResultSet() : _curRowCounter(0), _byteCounter(0), _rowsRetrieved(0), - _totalRowCounter(0), _maxRows(0), _schema(Schema::UNKNOWN) {} + /*!
+ * Create new result set with normal, general, and fallback rows + * @param schema normal case schema + * @param normalPartitions normal case rows + * @param generalPartitions general case rows + * @param fallbackPartitions fallback case rows + * @param partitionGroups information to merge row numbers correctly + * @param maxRows limit on rows to emit + */ + ResultSet(const Schema& schema, + const std::vector& normalPartitions, + const std::vector& generalPartitions=std::vector{}, + const std::vector& fallbackPartitions=std::vector{}, + const std::vector& partitionGroups=std::vector{}, + int64_t maxRows=std::numeric_limits::max()); + + ResultSet() : _curNormalRowCounter(0), _curNormalByteCounter(0), _curGeneralRowCounter(0), _curGeneralByteCounter(0), + _curFallbackRowCounter(0), _curFallbackByteCounter(0), _totalRowCounter(0), _maxRows(0), _schema(Schema::UNKNOWN), + _normalRowCounter(0), _generalRowCounter(0), _fallbackRowCounter(0) {} ~ResultSet() = default; // Non copyable @@ -51,13 +89,6 @@ namespace tuplex { ResultSet(const ResultSet&) = delete; ResultSet& operator = (const ResultSet&) = delete; - ResultSet(const Schema& _schema, - const std::vector& partitions, - const std::vector& exceptions=std::vector{}, - const std::unordered_map& partitionToExceptionsMap=std::unordered_map(), - const std::vector> pyobjects=std::vector>{}, - int64_t maxRows=std::numeric_limits::max()); - /*! * check whether result contains one more row */ @@ -75,52 +106,107 @@ namespace tuplex { */ std::vector getRows(size_t limit); - bool hasNextPartition() const; + /*! + * check whether general partitions remain + * @return + */ + bool hasNextGeneralPartition() const; + + /*! + * get the next general partition without invalidating it + * @return + */ + Partition* getNextGeneralPartition(); + + /*! + * check whether fallback partitions remain + * @return + */ + bool hasNextFallbackPartition() const; + + /*! + * get the next fallback partition without invalidating it + * @return + */ + Partition* getNextFallbackPartition(); + + /*! + * check whether normal partitions remain + * @return + */ + bool hasNextNormalPartition() const; /*! user needs to invalidate then! - * * @return */ - Partition* getNextPartition(); + Partition* getNextNormalPartition(); + + /*! + * total number of rows across normal, general, and fallback partitions + * @return + */ size_t rowCount() const; + /*! + * normal case schema + * @return + */ Schema schema() const { return _schema; } /*! - * removes and invalidates all partitions! + * removes and invalidates all normal partitions! */ void clear(); + /*! + * number of rows in fallback partitions + * @return + */ + size_t fallbackRowCount() const; + /*! * retrieve all good rows in bulk, removes them from this result set. * @return */ - std::vector partitions() { + std::vector normalPartitions() { std::vector p; - while(hasNextPartition()) - p.push_back(getNextPartition()); + while(hasNextNormalPartition()) + p.push_back(getNextNormalPartition()); return p; } /*! - * retrieve all unresolved rows (should be only called internally). DOES NOT REMOVE THEM FROM result set. + * returns all general partitions and removes them from the result set. * @return */ - std::vector exceptions() const { return _exceptions; } - - std::unordered_map partitionToExceptionsMap() const { return _partitionToExceptionsMap; } + std::vector generalPartitions() { + std::vector p; + while(hasNextGeneralPartition()) + p.push_back(getNextGeneralPartition()); + return p; } /*!
- * returns/removes all objects + * returns all fallback partitions and removes them from the result set. * @return */ - std::deque> pyobjects() { - return std::move(_pyobjects); + std::vector fallbackPartitions() { + std::vector p; + while(hasNextFallbackPartition()) + p.push_back(getNextFallbackPartition()); + return p; } - size_t pyobject_count() const { return _pyobjects.size(); } - - size_t numPartitions() const { return _partitions.size(); } + /*! + * returns a copy of all partition groups; they remain in the result set. + * @return + */ + std::vector partitionGroups() { + std::vector g; + for (const auto& group : _partitionGroups) + g.push_back(group); + return g; + } }; } #endif //TUPLEX_RESULTSET_H \ No newline at end of file diff --git a/tuplex/core/include/physical/TransformStage.h b/tuplex/core/include/physical/TransformStage.h index 22d7f5fb4..7488d29e9 100644 --- a/tuplex/core/include/physical/TransformStage.h +++ b/tuplex/core/include/physical/TransformStage.h @@ -13,7 +13,7 @@ #include #include -#include +#include #include "PhysicalStage.h" #include "LLVMOptimizer.h" #include @@ -95,16 +95,41 @@ namespace tuplex { } /*! - * set input exceptions, i.e. rows that could come from a parallelize or csv operator. - * @param pythonObjects + * set stage's general case partitions + * @param generalPartitions */ - void setInputExceptions(const std::vector& inputExceptions) { _inputExceptions = inputExceptions; } + void setGeneralPartitions(const std::vector& generalPartitions) { _generalPartitions = generalPartitions; } - std::vector inputExceptions() { return _inputExceptions; } + /*! + * get stage's general case partitions + * @return + */ + std::vector generalPartitions() const { return _generalPartitions; } + + /*! + * set stage's fallback partitions as serialized python objects + * @param fallbackPartitions + */ + void setFallbackPartitions(const std::vector& fallbackPartitions) { _fallbackPartitions = fallbackPartitions; } + + /*! + * get fallback partitions as serialized python objects + * @return + */ + std::vector fallbackPartitions() const { return _fallbackPartitions; } - void setPartitionToExceptionsMap(const std::unordered_map& partitionToExceptionsMap) { _partitionToExceptionsMap = partitionToExceptionsMap; } + /*! + * set merge information for each set of normal, fallback, and general partitions + * @param partitionGroups + */ + void setPartitionGroups(const std::vector& partitionGroups) { + _partitionGroups = partitionGroups; + } - std::unordered_map partitionToExceptionsMap() { return _partitionToExceptionsMap; } + /*! + * get partition groups for all sets of partitions */ + std::vector partitionGroups() const { return _partitionGroups; } /*! * sets maximum number of rows this pipeline will produce */ @@ -157,12 +182,12 @@ namespace tuplex { */ std::shared_ptr resultSet() const override { return _rs;} - void setMemoryResult(const std::vector& partitions, - const std::vector& generalCase=std::vector{}, - const std::unordered_map& parttionToExceptionsMap=std::unordered_map(), - const std::vector>& interpreterRows=std::vector>{}, - const std::vector& remainingExceptions=std::vector{}, - const std::unordered_map, size_t>& ecounts=std::unordered_map, size_t>()); // creates local result set?
+ void setMemoryResult(const std::vector& normalPartitions=std::vector{}, + const std::vector& generalPartitions=std::vector{}, + const std::vector& fallbackPartitions=std::vector{}, + const std::vector& partitionGroups=std::vector{}, + const std::unordered_map, size_t>& exceptionCounts=std::unordered_map, size_t>()); // creates local result set? + void setFileResult(const std::unordered_map, size_t>& ecounts); // creates empty result set with exceptions void setEmptyResult() { @@ -173,9 +198,8 @@ namespace tuplex { setMemoryResult( std::vector(), std::vector(), - std::unordered_map(), - std::vector>(), std::vector(), + std::vector(), ecounts); } @@ -443,6 +467,9 @@ namespace tuplex { std::vector _inputPartitions; //! memory input partitions for this task. size_t _inputLimit; //! limit number of input rows (inf per default) size_t _outputLimit; //! output limit, set e.g. by take, to_csv etc. (inf per default) + std::vector _generalPartitions; //! general case input partitions + std::vector _fallbackPartitions; //! fallback case input partitions + std::vector _partitionGroups; //! groups partitions together for correct row indices std::shared_ptr _rs; //! result set @@ -469,11 +496,6 @@ namespace tuplex { // Todo: move this to physicalplan!!! //void pushDownOutputLimit(); //! enable optimizations for limited pipeline by restricting input read! - // unresolved exceptions. Important i.e. when no IO interleave is used... - std::vector _inputExceptions; - std::unordered_map _partitionToExceptionsMap; - - // for hash output, the key and bucket type python::Type _hashOutputKeyType; python::Type _hashOutputBucketType; diff --git a/tuplex/core/include/physical/TransformTask.h b/tuplex/core/include/physical/TransformTask.h index 2868ba668..3eb8013dd 100644 --- a/tuplex/core/include/physical/TransformTask.h +++ b/tuplex/core/include/physical/TransformTask.h @@ -239,11 +239,14 @@ namespace tuplex { */ std::unordered_map, size_t> exceptionCounts() const { return _exceptionCounts; } - ExceptionInfo inputExceptionInfo() { return _inputExceptionInfo; } - std::vector inputExceptions() { return _inputExceptions; } + std::vector generalPartitions() const { return _generalPartitions; } + + std::vector fallbackPartitions() const { return _fallbackPartitions; } + + void setGeneralPartitions(const std::vector& generalPartitions) { _generalPartitions = generalPartitions; } + + void setFallbackPartitions(const std::vector& fallbackPartitions) { _fallbackPartitions = fallbackPartitions; } - void setInputExceptionInfo(ExceptionInfo info) { _inputExceptionInfo = info; } - void setInputExceptions(const std::vector& inputExceptions) { _inputExceptions = inputExceptions; } void setUpdateInputExceptions(bool updateInputExceptions) { _updateInputExceptions = updateInputExceptions; } double wallTime() const override { return _wallTime; } @@ -292,8 +295,8 @@ namespace tuplex { MemorySink _exceptions; Schema _inputSchema; - ExceptionInfo _inputExceptionInfo; - std::vector _inputExceptions; + std::vector _generalPartitions; + std::vector _fallbackPartitions; bool _updateInputExceptions; // hash table sink diff --git a/tuplex/core/src/Context.cc b/tuplex/core/src/Context.cc index e9a30e902..75d705b58 100644 --- a/tuplex/core/src/Context.cc +++ b/tuplex/core/src/Context.cc @@ -202,7 +202,7 @@ namespace tuplex { } - DataSet& Context::fromPartitions(const Schema& schema, const std::vector& partitions, const std::vector& columns, const std::vector> &badParallelizeObjects, const std::vector &numExceptionsInPartition) { + DataSet& 
Context::fromPartitions(const Schema& schema, const std::vector& partitions, const std::vector& fallbackPartitions, const std::vector& partitionGroups, const std::vector& columns) { auto dataSetID = getNextDataSetID(); DataSet *dsptr = createDataSet(schema); @@ -214,7 +214,7 @@ namespace tuplex { // empty? if(partitions.empty()) { dsptr->setColumns(columns); - addParallelizeNode(dsptr, badParallelizeObjects, numExceptionsInPartition); + addParallelizeNode(dsptr, fallbackPartitions, partitionGroups); return *dsptr; } else { size_t numRows = 0; @@ -230,7 +230,8 @@ namespace tuplex { // set rows dsptr->setColumns(columns); - addParallelizeNode(dsptr, badParallelizeObjects, numExceptionsInPartition); + addParallelizeNode(dsptr, fallbackPartitions, partitionGroups); + // signal check if(check_and_forward_signals()) { @@ -257,6 +258,7 @@ namespace tuplex { addParallelizeNode(dsptr); return *dsptr; } else { + std::vector partitionGroups; // get row type from first element @TODO: should be inferred from sample, no? auto rtype = rows.front().getRowType(); schema = Schema(Schema::MemoryLayout::ROW, rtype); @@ -303,6 +305,7 @@ namespace tuplex { numWrittenRowsInPartition++; capacityRemaining -= bytesWritten; } else { + partitionGroups.push_back(PartitionGroup(1, dsptr->getPartitions().size(), 0, 0, 0, 0)); // partition is full, request new one. // create new partition... partition->unlock(); @@ -319,6 +322,7 @@ namespace tuplex { base_ptr = (uint8_t*)partition->lock(); } } + partitionGroups.push_back(PartitionGroup(1, dsptr->getPartitions().size(), 0, 0, 0, 0)); partition->unlock(); partition->setNumRows(numWrittenRowsInPartition); @@ -330,7 +334,7 @@ namespace tuplex { // set rows dsptr->setColumns(columnNames); - addParallelizeNode(dsptr); + addParallelizeNode(dsptr, std::vector{}, partitionGroups); // signal check if(check_and_forward_signals()) { @@ -349,94 +353,7 @@ namespace tuplex { return op; } - void Context::serializePythonObjects(const std::vector>& pythonObjects, - const std::vector &numExceptionsInPartition, - const std::vector &normalPartitions, - const int64_t opID, - std::vector &serializedPythonObjects, - std::unordered_map &pythonObjectsMap) { - if (pythonObjects.empty()) { - for (const auto &p : normalPartitions) { - pythonObjectsMap[uuidToString(p->uuid())] = ExceptionInfo(); - } - return; - } - - Schema schema(Schema::MemoryLayout::ROW, python::Type::makeTupleType({python::Type::STRING})); - const size_t allocMinSize = 1024 * 64; // 64KB - - Partition* partition = requestNewPartition(schema, -1, allocMinSize); - int64_t* rawPtr = (int64_t*)partition->lockWriteRaw(); - *rawPtr = 0; - uint8_t* ptr = (uint8_t*)(rawPtr + 1); - size_t numBytesSerialized = 0; - - auto prevExpByteOffset = 0; - auto prevExpRowOffset = 0; - auto prevExpInd = 0; - auto curNormalPartitionInd = 0; - auto numNewExps = 0; - - // Serialize each exception to a partition using the following schema: - // (1) is the field containing rowNum - // (2) is the field containing ecCode - // (3) is the field containing opID - // (4) is the field containing pickledObjectSize - // (5) is the field containing pickledObject - for(auto &exception : pythonObjects) { - auto rowNum = std::get<0>(exception); - auto pyObj = std::get<1>(exception); - auto ecCode = ecToI64(ExceptionCode::PYTHON_PARALLELIZE); - auto pickledObject = python::pickleObject(python::getMainModule(), pyObj); - auto pickledObjectSize = pickledObject.size(); - size_t requiredBytes = sizeof(int64_t) * 4 + pickledObjectSize; - - if (partition->capacity() < 
numBytesSerialized + requiredBytes) { - partition->unlockWrite(); - serializedPythonObjects.push_back(partition); - partition = requestNewPartition(schema, -1, allocMinSize); - rawPtr = (int64_t *) partition->lockWriteRaw(); - *rawPtr = 0; - ptr = (uint8_t * )(rawPtr + 1); - numBytesSerialized = 0; - } - - // Check if we have reached the number of exceptions in the input partition - // Record the current exception index and offset and iterate to next one - auto curNormalPartition = normalPartitions[curNormalPartitionInd]; - auto normalUUID = uuidToString(curNormalPartition->uuid()); - auto numExps = numExceptionsInPartition[curNormalPartitionInd]; - if (numNewExps >= numExps) { - pythonObjectsMap[normalUUID] = ExceptionInfo(numExps, prevExpInd, prevExpRowOffset, prevExpByteOffset); - prevExpRowOffset = *rawPtr; - prevExpByteOffset = numBytesSerialized; - prevExpInd = serializedPythonObjects.size(); - numNewExps = 0; - curNormalPartitionInd++; - } - - *((int64_t*)(ptr)) = rowNum; ptr += sizeof(int64_t); - *((int64_t*)(ptr)) = ecCode; ptr += sizeof(int64_t); - *((int64_t*)(ptr)) = opID; ptr += sizeof(int64_t); - *((int64_t*)(ptr)) = pickledObjectSize; ptr += sizeof(int64_t); - memcpy(ptr, pickledObject.c_str(), pickledObjectSize); ptr += pickledObjectSize; - - *rawPtr = *rawPtr + 1; - numBytesSerialized += requiredBytes; - numNewExps += 1; - } - - // Record mapping for normal last partition - auto curNormalPartition = normalPartitions[curNormalPartitionInd]; - auto normalUUID = uuidToString(curNormalPartition->uuid()); - auto numExceptions = numExceptionsInPartition[curNormalPartitionInd]; - pythonObjectsMap[normalUUID] = ExceptionInfo(numExceptions, prevExpInd, prevExpRowOffset, prevExpByteOffset); - - partition->unlockWrite(); - serializedPythonObjects.push_back(partition); - } - - void Context::addParallelizeNode(DataSet *ds, const std::vector> &badParallelizeObjects, const std::vector &numExceptionsInPartition) { + void Context::addParallelizeNode(DataSet *ds, const std::vector& fallbackPartitions, const std::vector& partitionGroups) { assert(ds); // @TODO: make empty list as special case work. Also true for empty files. @@ -446,11 +363,19 @@ namespace tuplex { assert(ds->_schema.getRowType() != python::Type::UNKNOWN); auto op = new ParallelizeOperator(ds->_schema, ds->getPartitions(), ds->columns()); - std::vector serializedPythonObjects; - std::unordered_map pythonObjectsMap; - serializePythonObjects(badParallelizeObjects, numExceptionsInPartition, ds->getPartitions(), op->getID(), serializedPythonObjects, pythonObjectsMap); - op->setPythonObjects(serializedPythonObjects); - op->setInputPartitionToPythonObjectsMap(pythonObjectsMap); + op->setFallbackPartitions(fallbackPartitions); + if (partitionGroups.empty()) { + std::vector defaultPartitionGroups; + for (int i = 0; i < ds->getPartitions().size(); ++i) { + // New partition group for each normal partition so number is constant at 1 + // This is because each normal partition is assigned its own task + defaultPartitionGroups.push_back(PartitionGroup(1, i, 0, 0, 0, 0)); + } + op->setPartitionGroups(defaultPartitionGroups); + } else { + op->setPartitionGroups(partitionGroups); + } + // add new (root) node ds->_operator = addOperator(op); diff --git a/tuplex/core/src/DataSet.cc b/tuplex/core/src/DataSet.cc index a53a14094..a33925d7f 100644 --- a/tuplex/core/src/DataSet.cc +++ b/tuplex/core/src/DataSet.cc @@ -869,9 +869,10 @@ namespace tuplex { // what data source operators are there? 
if(_operator->type() == LogicalOperatorType::FILEINPUT) return static_cast(_operator)->isEmpty(); - else if(_operator->type() == LogicalOperatorType::PARALLELIZE) - return static_cast(_operator)->getPartitions().empty(); - else + else if(_operator->type() == LogicalOperatorType::PARALLELIZE) { + auto pop = static_cast(_operator); assert(pop); + return pop->getNormalPartitions().empty() && pop->getFallbackPartitions().empty(); + } else throw std::runtime_error("unknown data source operator detected"); } else return false; diff --git a/tuplex/core/src/ee/local/LocalBackend.cc b/tuplex/core/src/ee/local/LocalBackend.cc index bed96ec5a..56e5de9a0 100644 --- a/tuplex/core/src/ee/local/LocalBackend.cc +++ b/tuplex/core/src/ee/local/LocalBackend.cc @@ -266,8 +266,8 @@ namespace tuplex { Timer timer; // BUILD phase // TODO: codegen build phase. I.e. a function should be code generated which hashes a partition to a hashmap. - while(rsRight->hasNextPartition()) { - Partition* p = rsRight->getNextPartition(); + while(rsRight->hasNextNormalPartition()) { + Partition* p = rsRight->getNextNormalPartition(); // lock partition! auto ptr = p->lockRaw(); @@ -435,7 +435,7 @@ namespace tuplex { auto combinedType = hstage->combinedType(); Schema combinedSchema(Schema::MemoryLayout::ROW, combinedType); std::vector probeTasks; - for(auto partition : rsLeft->partitions()) { + for(auto partition : rsLeft->normalPartitions()) { probeTasks.emplace_back(new HashProbeTask(partition, hmap, probeFunction, hstage->combinedType(), hstage->outputDataSetID(), @@ -648,12 +648,35 @@ namespace tuplex { // --> issue for each memory partition a transform task and put it into local workqueue assert(tstage->inputMode() == EndPointMode::MEMORY); - - // restrict after input limit size_t numInputRows = 0; + auto inputPartitions = tstage->inputPartitions(); - for(int i = 0; i < inputPartitions.size(); ++i) { - auto partition = inputPartitions[i]; + auto generalPartitions = tstage->generalPartitions(); + auto fallbackPartitions = tstage->fallbackPartitions(); + auto partitionGroups = tstage->partitionGroups(); + for (const auto &group : partitionGroups) { + std::vector taskNormalPartitions; + bool invalidateAfterUse = false; + for (int i = group.normalPartitionStartIndex; i < group.normalPartitionStartIndex + group.numNormalPartitions; ++i) { + auto p = inputPartitions[i]; + numInputRows += p->getNumRows(); + if (!p->isImmortal()) + invalidateAfterUse = true; + taskNormalPartitions.push_back(p); + } + std::vector taskGeneralPartitions; + for (int i = group.generalPartitionStartIndex; i < group.generalPartitionStartIndex + group.numGeneralPartitions; ++i) { + auto p = generalPartitions[i]; + numInputRows += p->getNumRows(); + taskGeneralPartitions.push_back(p); + } + std::vector taskFallbackPartitions; + for (int i = group.fallbackPartitionStartIndex; i < group.fallbackPartitionStartIndex + group.numFallbackPartitions; ++i) { + auto p = fallbackPartitions[i]; + numInputRows += p->getNumRows(); + taskFallbackPartitions.push_back(p); + } + auto task = new TransformTask(); if (tstage->updateInputExceptions()) { task->setFunctor(syms->functorWithExp); @@ -661,7 +684,9 @@ namespace tuplex { task->setFunctor(syms->functor); } task->setUpdateInputExceptions(tstage->updateInputExceptions()); - task->setInputMemorySource(partition, !partition->isImmortal()); + task->setInputMemorySources(taskNormalPartitions, invalidateAfterUse); + task->setGeneralPartitions(taskGeneralPartitions); + task->setFallbackPartitions(taskFallbackPartitions); // hash 
table or memory output? if(tstage->outputMode() == EndPointMode::HASHTABLE) { if (tstage->hashtableKeyByteWidth() == 8) @@ -676,16 +701,10 @@ namespace tuplex { tstage->outputMode() == EndPointMode::MEMORY); task->sinkOutputToMemory(outputSchema, tstage->outputDataSetID(), tstage->context().id()); } - - auto partitionId = uuidToString(partition->uuid()); - auto info = tstage->partitionToExceptionsMap()[partitionId]; - task->setInputExceptionInfo(info); - task->setInputExceptions(tstage->inputExceptions()); - task->sinkExceptionsToMemory(inputSchema); + task->sinkExceptionsToMemory(tstage->inputSchema()); task->setStageID(tstage->getID()); task->setOutputLimit(tstage->outputLimit()); tasks.emplace_back(std::move(task)); - numInputRows += partition->getNumRows(); // input limit exhausted? break! if(numInputRows >= tstage->inputLimit()) @@ -750,92 +769,6 @@ namespace tuplex { return pip_object; } - std::vector> inputExceptionsToPythonObjects(const std::vector& partitions, Schema schema) { - using namespace tuplex; - - std::vector> pyObjects; - for (const auto &partition : partitions) { - auto numRows = partition->getNumRows(); - const uint8_t* ptr = partition->lock(); - - python::lockGIL(); - for (int i = 0; i < numRows; ++i) { - int64_t rowNum = *((int64_t*)ptr); - ptr += sizeof(int64_t); - int64_t ecCode = *((int64_t*)ptr); - ptr += 2 * sizeof(int64_t); - int64_t objSize = *((int64_t*)ptr); - ptr += sizeof(int64_t); - - PyObject* pyObj = nullptr; - if (ecCode == ecToI64(ExceptionCode::PYTHON_PARALLELIZE)) { - pyObj = python::deserializePickledObject(python::getMainModule(), (char *) ptr, objSize); - } else { - pyObj = python::rowToPython(Row::fromMemory(schema, ptr, objSize), true); - } - - ptr += objSize; - pyObjects.emplace_back(rowNum, pyObj); - } - python::unlockGIL(); - - partition->unlock(); - partition->invalidate(); - } - - return pyObjects; - } - - void setExceptionInfo(const std::vector &normalOutput, const std::vector &exceptions, std::unordered_map &partitionToExceptionsMap) { - if (exceptions.empty()) { - for (const auto &p : normalOutput) { - partitionToExceptionsMap[uuidToString(p->uuid())] = ExceptionInfo(); - } - return; - } - - auto expRowCount = 0; - auto expInd = 0; - auto expRowOff = 0; - auto expByteOff = 0; - - auto expNumRows = exceptions[0]->getNumRows(); - auto expPtr = exceptions[0]->lockWrite(); - auto rowsProcessed = 0; - for (const auto &p : normalOutput) { - auto pNumRows = p->getNumRows(); - auto curNumExps = 0; - auto curExpOff = expRowOff; - auto curExpInd = expInd; - auto curExpByteOff = expByteOff; - - while (*((int64_t *) expPtr) - rowsProcessed <= pNumRows + curNumExps && expRowCount < expNumRows) { - *((int64_t *) expPtr) -= rowsProcessed; - curNumExps++; - expRowOff++; - auto eSize = ((int64_t *)expPtr)[3] + 4*sizeof(int64_t); - expPtr += eSize; - expByteOff += eSize; - expRowCount++; - - if (expRowOff == expNumRows && expInd < exceptions.size() - 1) { - exceptions[expInd]->unlockWrite(); - expInd++; - expPtr = exceptions[expInd]->lockWrite(); - expNumRows = exceptions[expInd]->getNumRows(); - expRowOff = 0; - expByteOff = 0; - expRowCount = 0; - } - } - - rowsProcessed += curNumExps + pNumRows; - partitionToExceptionsMap[uuidToString(p->uuid())] = ExceptionInfo(curNumExps, curExpInd, curExpOff, curExpByteOff); - } - - exceptions[expInd]->unlockWrite(); - } - void LocalBackend::executeTransformStage(tuplex::TransformStage *tstage) { Timer stageTimer; @@ -855,9 +788,8 @@ namespace tuplex { // special case: skip stage, i.e. 
empty code and mem2mem if(tstage->code().empty() && !tstage->fileInputMode() && !tstage->fileOutputMode()) { - auto pyObjects = inputExceptionsToPythonObjects(tstage->inputExceptions(), tstage->normalCaseInputSchema()); - tstage->setMemoryResult(tstage->inputPartitions(), std::vector{}, std::unordered_map(), pyObjects); - pyObjects.clear(); + tstage->setMemoryResult(tstage->inputPartitions(), tstage->generalPartitions(), tstage->fallbackPartitions(), + tstage->partitionGroups()); // skip stage Logger::instance().defaultLogger().info("[Transform Stage] skipped stage " + std::to_string(tstage->number()) + " because there is nothing todo here."); return; @@ -985,7 +917,7 @@ namespace tuplex { bool executeSlowPath = true; //TODO: implement pure python resolution here... // exceptions found or slowpath data given? - if(totalECountsBeforeResolution > 0 || !tstage->inputExceptions().empty()) { + if(totalECountsBeforeResolution > 0 || !tstage->generalPartitions().empty() || !tstage->fallbackPartitions().empty()) { stringstream ss; // log out what exists in a table ss<<"Exception details: "<inputExceptions().empty()) { + if(!tstage->generalPartitions().empty()) { size_t numExceptions = 0; - for (auto &p : tstage->inputExceptions()) + for (auto &p : tstage->generalPartitions()) numExceptions += p->getNumRows(); - lines.push_back(Row("(input)", exceptionCodeToPythonClass(ExceptionCode::NORMALCASEVIOLATION), (int64_t)numExceptions)); + lines.push_back(Row("(cache)", exceptionCodeToPythonClass(ExceptionCode::NORMALCASEVIOLATION), (int64_t)numExceptions)); + totalECountsBeforeResolution += numExceptions; + } + + if(!tstage->fallbackPartitions().empty()) { + size_t numExceptions = 0; + for (auto &p : tstage->fallbackPartitions()) + numExceptions += p->getNumRows(); + lines.push_back(Row("(parallelize)", exceptionCodeToPythonClass(ExceptionCode::NORMALCASEVIOLATION), (int64_t)numExceptions)); totalECountsBeforeResolution += numExceptions; } @@ -1044,7 +984,7 @@ namespace tuplex { executeSlowPath = true; // input exceptions or py objects? - if(!tstage->inputExceptions().empty()) + if(!tstage->generalPartitions().empty() || !tstage->fallbackPartitions().empty()) executeSlowPath = true; if(executeSlowPath) { @@ -1123,30 +1063,22 @@ namespace tuplex { } case EndPointMode::MEMORY: { // memory output, fetch partitions & ecounts - vector output; - vector generalOutput; - unordered_map partitionToExceptionsMap; - vector remainingExceptions; - vector> nonConformingRows; // rows where the output type does not fit, - // need to manually merged. 
- unordered_map, size_t> ecounts; - size_t rowDelta = 0; + vector normalPartitions; + vector generalPartitions; + vector fallbackPartitions; + vector exceptionPartitions; + vector partitionGroups; + unordered_map, size_t> exceptionCounts; + for (const auto& task : completedTasks) { - auto taskOutput = getOutputPartitions(task); - auto taskRemainingExceptions = getRemainingExceptions(task); - auto taskGeneralOutput = generalCasePartitions(task); - auto taskNonConformingRows = getNonConformingRows(task); + auto taskNormalPartitions = getNormalPartitions(task); + auto taskGeneralPartitions = getGeneralPartitions(task); + auto taskFallbackPartitions = getFallbackPartitions(task); + auto taskExceptionPartitions = getExceptionPartitions(task); auto taskExceptionCounts = getExceptionCounts(task); // update exception counts - ecounts = merge_ecounts(ecounts, taskExceptionCounts); - - // update nonConforming with delta - for(int i = 0; i < taskNonConformingRows.size(); ++i) { - auto t = taskNonConformingRows[i]; - t = std::make_tuple(std::get<0>(t) + rowDelta, std::get<1>(t)); - taskNonConformingRows[i] = t; - } + exceptionCounts = merge_ecounts(exceptionCounts, taskExceptionCounts); // debug trace issues using namespace std; @@ -1156,21 +1088,17 @@ namespace tuplex { if(task->type() == TaskType::RESOLVE) task_name = "resolve"; - setExceptionInfo(taskOutput, taskGeneralOutput, partitionToExceptionsMap); - std::copy(taskOutput.begin(), taskOutput.end(), std::back_inserter(output)); - std::copy(taskRemainingExceptions.begin(), taskRemainingExceptions.end(), std::back_inserter(remainingExceptions)); - std::copy(taskGeneralOutput.begin(), taskGeneralOutput.end(), std::back_inserter(generalOutput)); - std::copy(taskNonConformingRows.begin(), taskNonConformingRows.end(), std::back_inserter(nonConformingRows)); - - // compute the delta used to offset records! 
- for (const auto &p : taskOutput) - rowDelta += p->getNumRows(); - for (const auto &p : taskGeneralOutput) - rowDelta += p->getNumRows(); - rowDelta += taskNonConformingRows.size(); + partitionGroups.push_back(PartitionGroup( + taskNormalPartitions.size(), normalPartitions.size(), + taskGeneralPartitions.size(), generalPartitions.size(), + taskFallbackPartitions.size(), fallbackPartitions.size())); + std::copy(taskNormalPartitions.begin(), taskNormalPartitions.end(), std::back_inserter(normalPartitions)); + std::copy(taskGeneralPartitions.begin(), taskGeneralPartitions.end(), std::back_inserter(generalPartitions)); + std::copy(taskFallbackPartitions.begin(), taskFallbackPartitions.end(), std::back_inserter(fallbackPartitions)); + std::copy(taskExceptionPartitions.begin(), taskExceptionPartitions.end(), std::back_inserter(exceptionPartitions)); } - tstage->setMemoryResult(output, generalOutput, partitionToExceptionsMap, nonConformingRows, remainingExceptions, ecounts); + tstage->setMemoryResult(normalPartitions, generalPartitions, fallbackPartitions, partitionGroups, exceptionCounts); break; } case EndPointMode::HASHTABLE: { @@ -1391,7 +1319,7 @@ namespace tuplex { else if(compareOrders(maxOrder, tt->getOrder())) maxOrder = tt->getOrder(); - if (tt->exceptionCounts().size() > 0 || tt->inputExceptionInfo().numExceptions > 0) { + if (tt->exceptionCounts().size() > 0 || !tt->generalPartitions().empty() || !tt->fallbackPartitions().empty()) { // task found with exceptions in it => exception partitions need to be resolved using special functor // hash-table output not yet supported @@ -1407,8 +1335,8 @@ namespace tuplex { tstage->context().id(), tt->getOutputPartitions(), tt->getExceptionPartitions(), - tt->inputExceptions(), - tt->inputExceptionInfo(), + tt->generalPartitions(), + tt->fallbackPartitions(), opsToCheck, exceptionInputSchema, compiledSlowPathOutputSchema, @@ -1498,11 +1426,6 @@ namespace tuplex { // cout<<"*** git "<inputExceptions()) { - p->invalidate(); - } - // cout<<"*** total number of tasks to return is "<hasNextPartition()) { - Partition* p = rs->getNextPartition(); + while(rs->hasNextNormalPartition()) { + Partition* p = rs->getNextNormalPartition(); // lock partition! auto ptr = p->lockRaw(); diff --git a/tuplex/core/src/logical/CacheOperator.cc b/tuplex/core/src/logical/CacheOperator.cc index f71522f21..0f3b91e79 100644 --- a/tuplex/core/src/logical/CacheOperator.cc +++ b/tuplex/core/src/logical/CacheOperator.cc @@ -22,19 +22,16 @@ namespace tuplex { LogicalOperator::copyMembers(other); auto cop = (CacheOperator*)other; setSchema(other->getOutputSchema()); - _normalCasePartitions = cop->cachedPartitions(); - _generalCasePartitions = cop->cachedExceptions(); - _partitionToExceptionsMap = cop->partitionToExceptionsMap(); - // copy python objects and incref for each! 
- _py_objects = cop->_py_objects; - python::lockGIL(); - for(auto obj : _py_objects) - Py_XINCREF(obj); - python::unlockGIL(); + _normalPartitions = cop->cachedNormalPartitions(); + _generalPartitions = cop->cachedGeneralPartitions(); + _fallbackPartitions = cop->cachedFallbackPartitions(); + _partitionGroups = cop->partitionGroups(); + _optimizedSchema = cop->_optimizedSchema; _cached = cop->_cached; - _normalCaseRowCount = cop->_normalCaseRowCount; - _generalCaseRowCount = cop->_generalCaseRowCount; + _normalRowCount = cop->_normalRowCount; + _generalRowCount = cop->_generalRowCount; + _fallbackRowCount = cop->_fallbackRowCount; _columns = cop->_columns; _sample = cop->_sample; _storeSpecialized = cop->_storeSpecialized; @@ -60,7 +57,7 @@ namespace tuplex { // is operator cached? => return combined cost! // @NOTE: could make exceptions more expensive than normal rows if(isCached()) { - return _generalCaseRowCount + _normalCaseRowCount; + return _generalRowCount + _fallbackRowCount + _normalRowCount; } else { // return parent cost return parent()->cost(); @@ -73,30 +70,29 @@ namespace tuplex { _cached = true; // fetch both partitions (consume) from resultset + any unresolved exceptions - _normalCasePartitions = rs->partitions(); - for(auto p : _normalCasePartitions) + _normalPartitions = rs->normalPartitions(); + for(auto &p : _normalPartitions) p->makeImmortal(); - // @TODO: there are two sorts of exceptions here... - // i.e. separate normal-case violations out from the rest - // => these can be stored separately for faster processing! - // @TODO: right now, everything just gets cached... + _generalPartitions = rs->generalPartitions(); + for(auto &p : _generalPartitions) + p->makeImmortal(); - _generalCasePartitions = rs->exceptions(); - for(auto p : _generalCasePartitions) + _fallbackPartitions = rs->fallbackPartitions(); + for(auto &p : _fallbackPartitions) p->makeImmortal(); - _partitionToExceptionsMap = rs->partitionToExceptionsMap(); + _partitionGroups = rs->partitionGroups(); // check whether partitions have different schema than the currently set one // => i.e. they have been specialized. - if(!_normalCasePartitions.empty()) { - _optimizedSchema = _normalCasePartitions.front()->schema(); + if(!_normalPartitions.empty()) { + _optimizedSchema = _normalPartitions.front()->schema(); assert(_optimizedSchema != Schema::UNKNOWN); } // if exceptions are empty, then force output schema to be the optimized one as well! - if(_generalCasePartitions.empty()) + if(_generalPartitions.empty()) setSchema(_optimizedSchema); // because the schema might have changed due to the result, need to update the dataset! 
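The `_partitionGroups` kept by the cache operator are what later lets `ResultSet` re-emit rows in their original order: general and fallback rows carry their original row numbers, and normal rows fill the remaining index gaps. The sketch below is a simplified version of that three-way merge; the real logic lives in `ResultSet::getNextRow` via `currentGeneralRowInd()`/`currentFallbackRowInd()`, and `IndexedRow`/`mergeInOrder` are illustrative names, not Tuplex API.

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Illustrative row stand-in: original row index plus a payload.
struct IndexedRow { int64_t rowInd; std::string value; };

// Simplified three-way merge by original row number. Assumes the three inputs
// together cover the index range 0..total-1 exactly once, with general and
// fallback rows sorted by rowInd (the invariant PartitionGroup preserves).
std::vector<std::string> mergeInOrder(const std::vector<std::string>& normalRows,
                                      const std::vector<IndexedRow>& generalRows,
                                      const std::vector<IndexedRow>& fallbackRows) {
    std::vector<std::string> out;
    size_t n = 0, g = 0, f = 0;
    const int64_t total = int64_t(normalRows.size() + generalRows.size() + fallbackRows.size());
    for (int64_t rowInd = 0; rowInd < total; ++rowInd) {
        if (g < generalRows.size() && generalRows[g].rowInd == rowInd)
            out.push_back(generalRows[g++].value);        // general case row at this index
        else if (f < fallbackRows.size() && fallbackRows[f].rowInd == rowInd)
            out.push_back(fallbackRows[f++].value);       // fallback (interpreter) row
        else
            out.push_back(normalRows[n++]);               // normal rows fill the gaps
    }
    return out;
}

int main() {
    // rows 1 and 3 violated the normal case; row 4 needed the interpreter.
    auto rows = mergeInOrder({"r0", "r2"}, {{1, "g1"}, {3, "g3"}}, {{4, "f4"}});
    for (const auto& r : rows) std::cout << r << "\n";    // prints r0 g1 r2 g3 f4
    return 0;
}
```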
@@ -104,36 +100,46 @@ namespace tuplex { getDataSet()->setSchema(getOutputSchema()); // print out some statistics about cached data - size_t cachedPartitionsMemory = 0; - size_t totalCachedPartitionsMemory = 0; - size_t totalCachedRows = 0; - size_t cachedExceptionsMemory = 0; - size_t totalCachedExceptionsMemory = 0; - size_t totalCachedExceptions = 0; - - int pos = 0; - for(auto p : _normalCasePartitions) { - totalCachedRows += p->getNumRows(); - cachedPartitionsMemory += p->bytesWritten(); - totalCachedPartitionsMemory += p->size(); - pos++; + size_t normalBytesWritten = 0; + size_t normalCapacity = 0; + size_t normalRows = 0; + size_t generalBytesWritten = 0; + size_t generalCapacity = 0; + size_t generalRows = 0; + size_t fallbackBytesWritten = 0; + size_t fallbackCapacity = 0; + size_t fallbackRows = 0; + + for(const auto &p : _normalPartitions) { + normalRows += p->getNumRows(); + normalBytesWritten += p->bytesWritten(); + normalCapacity += p->size(); } - for(auto p : _generalCasePartitions) { - totalCachedExceptions += p->getNumRows(); - cachedExceptionsMemory += p->bytesWritten(); - totalCachedExceptionsMemory += p->size(); + for(const auto &p : _generalPartitions) { + generalRows += p->getNumRows(); + generalBytesWritten += p->bytesWritten(); + generalCapacity += p->size(); } + for(const auto &p : _fallbackPartitions) { + fallbackRows += p->getNumRows(); + fallbackBytesWritten += p->bytesWritten(); + fallbackCapacity += p->size(); + } + - _normalCaseRowCount = totalCachedRows; - _generalCaseRowCount = totalCachedExceptions; + _normalRowCount = normalRows; + _generalRowCount = generalRows; + _fallbackRowCount = fallbackRows; stringstream ss; - ss<<"Cached "<getNumRows(); + } + for(const auto &p : _generalPartitions) { totalCachedRows += p->getNumRows(); } - for(auto p : _generalCasePartitions) { + for (const auto &p : _fallbackPartitions) { totalCachedRows += p->getNumRows(); } return totalCachedRows; diff --git a/tuplex/core/src/logical/ParallelizeOperator.cc b/tuplex/core/src/logical/ParallelizeOperator.cc index 770ac2d4f..3ea6916e6 100644 --- a/tuplex/core/src/logical/ParallelizeOperator.cc +++ b/tuplex/core/src/logical/ParallelizeOperator.cc @@ -12,15 +12,15 @@ namespace tuplex { ParallelizeOperator::ParallelizeOperator(const Schema& schema, - const std::vector& partitions, - const std::vector& columns) : _partitions(partitions), - _columnNames(columns) { + const std::vector& normalPartitions, + const std::vector& columns) : _normalPartitions(normalPartitions), + _columnNames(columns) { setSchema(schema); // parallelize operator holds data in memory for infinite lifetime. // => make partitions immortal - for(auto& partition : _partitions) + for(auto& partition : _normalPartitions) partition->makeImmortal(); // get sample @@ -31,15 +31,15 @@ namespace tuplex { _sample.clear(); // todo: general python objects from parallelize... - if(!_partitions.empty()) { + if(!_normalPartitions.empty()) { auto maxRows = getDataSet() ? getDataSet()->getContext()->getOptions().CSV_MAX_DETECTION_ROWS() : MAX_TYPE_SAMPLING_ROWS; // @TODO: change this variable/config name // fetch up to maxRows from partitions! 
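The sampling loop below walks the partition layout used throughout this patch: a leading int64 row count followed by densely serialized rows. A tiny sketch of reading that header (illustrative helper only; the real code locks the partition and deserializes row by row):

#include <cstdint>

// Layout assumed from the loop below: [int64 numRows][serialized rows ...].
// Rows start immediately after the 8-byte count header.
int64_t partitionRowCount(const uint8_t* raw) {
    return *reinterpret_cast<const int64_t*>(raw);
}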
- auto schema = _partitions.front()->schema();
+ auto schema = _normalPartitions.front()->schema();
 Deserializer ds(schema);
 size_t rowCount = 0;
 size_t numBytesRead = 0;
- for(auto p : _partitions) {
+ for(auto p : _normalPartitions) {
 const uint8_t* ptr = p->lockRaw();
 auto partitionRowCount = *(int64_t*)ptr;
 ptr += sizeof(int64_t);
@@ -59,8 +59,8 @@
 }
 }

- std::vector<Partition*> ParallelizeOperator::getPartitions() {
- return _partitions;
+ std::vector<Partition*> ParallelizeOperator::getNormalPartitions() {
+ return _normalPartitions;
 }

 bool ParallelizeOperator::good() const {
@@ -69,7 +69,7 @@
 std::vector<Row> ParallelizeOperator::getSample(const size_t num) const {
 // samples exist?
- if(_partitions.empty() || 0 == num) {
+ if(_normalPartitions.empty() || 0 == num) {
 return std::vector<Row>();
 }
@@ -109,11 +109,11 @@
 }

 LogicalOperator *ParallelizeOperator::clone() {
- auto copy = new ParallelizeOperator(getOutputSchema(), _partitions, columns());
+ auto copy = new ParallelizeOperator(getOutputSchema(), _normalPartitions, columns());
 copy->setDataSet(getDataSet());
 copy->copyMembers(this);
- copy->setPythonObjects(_pythonObjects);
- copy->setInputPartitionToPythonObjectsMap(_inputPartitionToPythonObjectsMap);
+ copy->setFallbackPartitions(_fallbackPartitions);
+ copy->setPartitionGroups(_partitionGroups);
 assert(getID() == copy->getID());
 return copy;
 }
@@ -121,7 +121,9 @@
 int64_t ParallelizeOperator::cost() const {
 // use #rows stored in partitions
 int64_t numRows = 0;
- for(auto p : _partitions)
+ for(const auto& p : _normalPartitions)
+ numRows += p->getNumRows();
+ for(const auto& p : _fallbackPartitions)
 numRows += p->getNumRows();
 return numRows;
 }
diff --git a/tuplex/core/src/physical/BlockBasedTaskBuilder.cc b/tuplex/core/src/physical/BlockBasedTaskBuilder.cc
index 80e21c0a1..111e97d8f 100644
--- a/tuplex/core/src/physical/BlockBasedTaskBuilder.cc
+++ b/tuplex/core/src/physical/BlockBasedTaskBuilder.cc
@@ -58,12 +58,23 @@ namespace tuplex {
 FunctionType* read_block_type = FunctionType::get(env().i64Type(), {env().i8ptrType(), env().i8ptrType(), env().i64Type(),
+ env().i64Type()->getPointerTo(0),
+ env().i64Type()->getPointerTo(0),
+ env().getBooleanType(),
+ env().i64Type()->getPointerTo(0),
+ env().i64Type()->getPointerTo(0),
+ env().i64Type()->getPointerTo(0),
+ env().i64Type()->getPointerTo(0),
 env().i8ptrType()->getPointerTo(0),
+ env().i64Type(),
 env().i64Type()->getPointerTo(0),
+ env().i64Type()->getPointerTo(0),
+ env().i64Type()->getPointerTo(0),
+ env().i8ptrType()->getPointerTo(0),
 env().i64Type(),
 env().i64Type()->getPointerTo(0),
 env().i64Type()->getPointerTo(0),
- env().getBooleanType()}, false);
+ env().i64Type()->getPointerTo(0)}, false);

 // create function and set argNames
 Function* read_block_func = Function::Create(read_block_type, Function::ExternalLinkage, _desiredFuncName, env().getModule().get());
@@ -76,12 +87,24 @@
 vector<string> argNames{"userData",
 "inPtr",
 "inSize",
- "expPtrs",
- "expPtrSizes",
- "numExps",
 "outNormalRowCount",
 "outBadRowCount",
- "ignoreLastRow"};
+ "ignoreLastRow",
+ "totalFilterCounter",
+ "totalNormalRowCounter",
+ "totalGeneralRowCounter",
+ "totalFallbackRowCounter",
+ "generalPartitions",
+ "numGeneralPartitions",
+ "generalIndexOffset",
+ "generalRowOffset",
+ "generalByteOffset",
+ "fallbackPartitions",
+ "numFallbackPartitions",
+ "fallbackIndexOffset",
+ "fallbackRowOffset",
+ "fallbackByteOffset"};
+ for(int i = 0; i < argNames.size(); ++i) {
args[i]->setName(argNames[i]); _args[argNames[i]] = args[i]; diff --git a/tuplex/core/src/physical/ExceptionSourceTaskBuilder.cc b/tuplex/core/src/physical/ExceptionSourceTaskBuilder.cc index b3bd3847f..dd37a1c07 100644 --- a/tuplex/core/src/physical/ExceptionSourceTaskBuilder.cc +++ b/tuplex/core/src/physical/ExceptionSourceTaskBuilder.cc @@ -106,89 +106,115 @@ namespace tuplex { assert(read_block_func); + // Initialize context auto& context = env().getContext(); + // Load function arguments auto argUserData = arg("userData"); auto argInPtr = arg("inPtr"); auto argInSize = arg("inSize"); - auto argExpPtrs = arg("expPtrs"); - auto argExpPtrSizes = arg("expPtrSizes"); - auto argNumExps = arg("numExps"); auto argOutNormalRowCount = arg("outNormalRowCount"); auto argOutBadRowCount = arg("outBadRowCount"); auto argIgnoreLastRow = arg("ignoreLastRow"); - + auto totalFilterCounter = arg("totalFilterCounter"); + auto totalNormalRowCounter = arg("totalNormalRowCounter"); + auto totalGeneralRowCounter = arg("totalGeneralRowCounter"); + auto totalFallbackRowCounter = arg("totalFallbackRowCounter"); + auto generalPartitions = arg("generalPartitions"); + auto numGeneralPartitions = arg("numGeneralPartitions"); + auto generalIndexOffset = arg("generalIndexOffset"); + auto generalRowOffset = arg("generalRowOffset"); + auto generalByteOffset = arg("generalByteOffset"); + auto fallbackPartitions = arg("fallbackPartitions"); + auto numFallbackPartitions = arg("numFallbackPartitions"); + auto fallbackIndexOffset = arg("fallbackIndexOffset"); + auto fallbackRowOffset = arg("fallbackRowOffset"); + auto fallbackByteOffset = arg("fallbackByteOffset"); + + // Initialize function body BasicBlock *bbBody = BasicBlock::Create(context, "entry", read_block_func); IRBuilder<> builder(bbBody); - - // there should be a check if argInSize is 0 - // if so -> handle separately, i.e. 
return immediately -#warning "add here argInSize > 0 check" - - - // compute endptr from args - Value *endPtr = builder.CreateGEP(argInPtr, argInSize, "endPtr"); - Value *currentPtrVar = builder.CreateAlloca(env().i8ptrType(), 0, nullptr, "readPtrVar"); - // later use combi of normal & bad rows - Value *outRowCountVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "outRowCountVar"); // counter for output row number (used for exception resolution) + // Define basic blocks for function + auto bbInitializeGeneral = llvm::BasicBlock::Create(context, "initialize_general", builder.GetInsertBlock()->getParent()); + auto bbDeclareFallback = llvm::BasicBlock::Create(context, "declare_fallback", builder.GetInsertBlock()->getParent()); + auto bbInitializeFallback = llvm::BasicBlock::Create(context, "initialize_fallback", builder.GetInsertBlock()->getParent()); + auto bbUpdateGeneralCond = llvm::BasicBlock::Create(context, "update_general_cond", builder.GetInsertBlock()->getParent()); + auto bbUpdateGeneralBody = llvm::BasicBlock::Create(context, "update_general_body", builder.GetInsertBlock()->getParent()); + auto bbNextGeneralPartition = llvm::BasicBlock::Create(context, "next_general_partition", builder.GetInsertBlock()->getParent()); + auto bbUpdateFallbackCond = llvm::BasicBlock::Create(context, "update_fallback_cond", builder.GetInsertBlock()->getParent()); + auto bbUpdateFallbackBody = llvm::BasicBlock::Create(context, "update_fallback_body", builder.GetInsertBlock()->getParent()); + auto bbNextFallbackPartition = llvm::BasicBlock::Create(context, "next_fallback_partition", builder.GetInsertBlock()->getParent()); + auto bbUpdateDone = llvm::BasicBlock::Create(context, "update_done", builder.GetInsertBlock()->getParent()); + auto bbLoopCondition = BasicBlock::Create(context, "loop_cond", read_block_func); + auto bbLoopBody = BasicBlock::Create(context, "loop_body", read_block_func); + auto bbLoopDone = BasicBlock::Create(context, "loop_done", read_block_func); + + // Initialize values for normal partitions + auto endPtr = builder.CreateGEP(argInPtr, argInSize, "endPtr"); + auto currentPtrVar = builder.CreateAlloca(env().i8ptrType(), 0, nullptr, "readPtrVar"); + auto outRowCountVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "outRowCountVar"); // counter for output row number (used for exception resolution) builder.CreateStore(argInPtr, currentPtrVar); - - Value *normalRowCountVar = argOutNormalRowCount; - Value *badRowCountVar = argOutBadRowCount; - builder.CreateStore(builder.CreateAdd(builder.CreateLoad(argOutBadRowCount), - builder.CreateLoad(argOutNormalRowCount)), outRowCountVar); - - // current index into array of exception partitions - auto curExpIndVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "curExpIndVar"); - builder.CreateStore(env().i64Const(0), curExpIndVar); - - // current partition pointer - auto curExpPtrVar = builder.CreateAlloca(env().i8ptrType(), 0, nullptr, "curExpPtrVar"); - builder.CreateStore(builder.CreateLoad(argExpPtrs), curExpPtrVar); - - // number of rows total in current partition - auto curExpNumRowsVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "curExpNumRowsVar"); - builder.CreateStore(builder.CreateLoad(argExpPtrSizes), curExpNumRowsVar); - - // current row number in current partition - auto curExpCurRowVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "curExpCurRowVar"); - builder.CreateStore(env().i64Const(0), curExpCurRowVar); - - // accumulator used to update exception indices when rows are filtered, counts number of 
previously fitlered rows - auto expAccVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "expAccVar"); - builder.CreateStore(env().i64Const(0), expAccVar); - - // used to see if rows are filtered during pipeline execution - auto prevRowNumVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "prevRowNumVar"); - auto prevBadRowNumVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "prevBadRowNumVar"); - - // current number of exceptions prosessed across all partitions - auto expCurRowVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "expCurRowVar"); - builder.CreateStore(env().i64Const(0), expCurRowVar); - + // Update the arguments at the end + auto normalRowCountVar = argOutNormalRowCount; + auto badRowCountVar = argOutBadRowCount; + builder.CreateStore(builder.CreateAdd(builder.CreateLoad(argOutBadRowCount),builder.CreateLoad(argOutNormalRowCount)), outRowCountVar); // get num rows to read & process in loop - Value *numRowsVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "numRowsVar"); - Value *input_ptr = builder.CreatePointerCast(argInPtr, env().i64Type()->getPointerTo(0)); + auto numRowsVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "numRowsVar"); + auto input_ptr = builder.CreatePointerCast(argInPtr, env().i64Type()->getPointerTo(0)); builder.CreateStore(builder.CreateLoad(input_ptr), numRowsVar); // store current input ptr - Value *currentInputPtrVar = builder.CreateAlloca(env().i8ptrType(), 0, nullptr, "ptr"); + auto currentInputPtrVar = builder.CreateAlloca(env().i8ptrType(), 0, nullptr, "ptr"); builder.CreateStore(builder.CreateGEP(argInPtr, env().i32Const(sizeof(int64_t))), currentInputPtrVar); - - // variable for current row number... - Value *rowVar = builder.CreateAlloca(env().i64Type(), 0, nullptr); + auto rowVar = builder.CreateAlloca(env().i64Type(), 0, nullptr); builder.CreateStore(env().i64Const(0), rowVar); - BasicBlock* bbLoopCondition = BasicBlock::Create(context, "loop_cond", read_block_func); - BasicBlock* bbLoopBody = BasicBlock::Create(context, "loop_body", read_block_func); - BasicBlock* bbLoopDone = BasicBlock::Create(context, "loop_done", read_block_func); + // used to see if rows are filtered during pipeline execution + auto prevRowNumVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "prevRowNumVar"); + auto prevBadRowNumVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "prevBadRowNumVar"); - // go from entry block to loop body + // Initialize values for index updating + // uint8_t *curGeneralPtr; + // int64_t curGeneralNumRows = 0; + // if (*generalIndexOffset < numGeneralPartitions) { + // curGeneralPtr = generalPartitions[*generalIndexOffset]; + // curGeneralNumRows = *curGeneralPtr; + // curGeneralPtr += sizeof(int64_t) + *generalByteOffset; + // } + auto curGeneralPtr = builder.CreateAlloca(env().i8ptrType(), 0, nullptr, "curGeneralPtr"); + auto curGeneralNumRows = builder.CreateAlloca(env().i64Type(), 0, nullptr, "curGeneralNumRows"); + builder.CreateStore(env().i64Const(0), curGeneralNumRows); + auto shouldInitializeGeneral = builder.CreateICmpSLT(builder.CreateLoad(generalIndexOffset), numGeneralPartitions); + builder.CreateCondBr(shouldInitializeGeneral, bbInitializeGeneral, bbDeclareFallback); + + builder.SetInsertPoint(bbInitializeGeneral); + builder.CreateStore(builder.CreateLoad(builder.CreateGEP(generalPartitions, builder.CreateLoad(generalIndexOffset))), curGeneralPtr); + builder.CreateStore(builder.CreateLoad(builder.CreatePointerCast(builder.CreateLoad(curGeneralPtr), env().i64ptrType())), 
curGeneralNumRows); + builder.CreateStore(builder.CreateGEP(builder.CreateLoad(curGeneralPtr), builder.CreateAdd(env().i64Const(sizeof(int64_t)), builder.CreateLoad(generalByteOffset))), curGeneralPtr); + builder.CreateBr(bbDeclareFallback); + + // uint8_t *curFallbackPtr; + // int64_t curFallbackNumRows = 0; + // if (*fallbackIndexOffset < numFallbackPartitions) { + // curFallbackPtr = fallbackPartitions[*fallbackIndexOffset]; + // curFallbackNumRows = *curFallbackPtr; + // curFallbackPtr += sizeof(int64_t) + *fallbackByteOffset; + // } + builder.SetInsertPoint(bbDeclareFallback); + auto curFallbackPtr = builder.CreateAlloca(env().i8ptrType(), 0, nullptr, "curFallbackPtr"); + auto curFallbackNumRows = builder.CreateAlloca(env().i64Type(), 0, nullptr, "curFallbackNumRows"); + builder.CreateStore(env().i64Const(0), curFallbackNumRows); + auto shouldInitializeFallback = builder.CreateICmpSLT(builder.CreateLoad(fallbackIndexOffset), numFallbackPartitions); + builder.CreateCondBr(shouldInitializeFallback, bbInitializeFallback, bbLoopBody); + + builder.SetInsertPoint(bbInitializeFallback); + builder.CreateStore(builder.CreateLoad(builder.CreateGEP(fallbackPartitions, builder.CreateLoad(fallbackIndexOffset))), curFallbackPtr); + builder.CreateStore(builder.CreateLoad(builder.CreatePointerCast(builder.CreateLoad(curFallbackPtr), env().i64ptrType())), curFallbackNumRows); + builder.CreateStore(builder.CreateGEP(builder.CreateLoad(curFallbackPtr), builder.CreateAdd(env().i64Const(sizeof(int64_t)), builder.CreateLoad(fallbackByteOffset))), curFallbackPtr); builder.CreateBr(bbLoopBody); - // -------------- // loop condition builder.SetInsertPoint(bbLoopCondition); Value *row = builder.CreateLoad(rowVar, "row"); @@ -198,8 +224,6 @@ namespace tuplex { auto cond = builder.CreateICmpSLT(nextRow, numRows); builder.CreateCondBr(cond, bbLoopBody, bbLoopDone); - - // --------- // loop body builder.SetInsertPoint(bbLoopBody); // decode tuple from input ptr @@ -207,9 +231,8 @@ namespace tuplex { ft.init(_inputRowType); Value* oldInputPtr = builder.CreateLoad(currentInputPtrVar, "ptr"); ft.deserializationCode(builder, oldInputPtr); - Value* newInputPtr = builder.CreateGEP(oldInputPtr, ft.getSize(builder)); // @TODO: maybe use inbounds + Value* newInputPtr = builder.CreateGEP(oldInputPtr, ft.getSize(builder)); builder.CreateStore(newInputPtr, currentInputPtrVar); - builder.CreateStore(builder.CreateLoad(outRowCountVar), prevRowNumVar); builder.CreateStore(builder.CreateLoad(badRowCountVar), prevBadRowNumVar); @@ -218,117 +241,113 @@ namespace tuplex { Value *inputRowSize = ft.getSize(builder); processRow(builder, argUserData, ft, normalRowCountVar, badRowCountVar, outRowCountVar, oldInputPtr, inputRowSize, terminateEarlyOnLimitCode, pipeline() ? 
pipeline()->getFunction() : nullptr); + builder.CreateStore(builder.CreateAdd(env().i64Const(1), builder.CreateLoad(totalNormalRowCounter)), totalNormalRowCounter); + // After row is processed we need to update exceptions if the row was filtered // We check that: outRowCountVar == prevRowCountVar (no new row was emitted) // badRowCountVar == prevBadRowNumVar (it was filtered, not just an exception) - // expCurRowVar < argNumExps (we still have exceptions that need updating) - // if (outRowCountVar == prevRowNumVar && badRowCountVar == prevBadRowNumVar && expCurRowVar < argNumExps) - auto bbExpUpdate = llvm::BasicBlock::Create(context, "exp_update", builder.GetInsertBlock()->getParent()); - auto expCond = builder.CreateICmpEQ(builder.CreateLoad(outRowCountVar), builder.CreateLoad(prevRowNumVar)); - auto badCond = builder.CreateICmpEQ(builder.CreateLoad(badRowCountVar), builder.CreateLoad(prevBadRowNumVar)); - auto remCond = builder.CreateICmpSLT(builder.CreateLoad(expCurRowVar), argNumExps); - builder.CreateCondBr(builder.CreateAnd(remCond, builder.CreateAnd(badCond, expCond)), bbExpUpdate, bbLoopCondition); - - // We have determined a row is filtered so we can iterate through all the input exceptions that occured before this - // row and decrement their row index with the number of previously filtered rows (expAccVar). - // This is a while loop that iterates over all exceptions that occured before this filtered row - // - // while (expCurRowVar < numExps && ((*outNormalRowCount - 1) + expCurRowVar) >= *((int64_t *) curExpPtrVar)) - // - // *outNormalRowCount - 1 changes cardinality of rows into its row index, we add the number of previously processed - // exceptions because the normal rows do not know about the exceptions to obtain the correct index. It's then compared - // against the row index of the exception pointed to currently in our partition - builder.SetInsertPoint(bbExpUpdate); - auto bbIncrement = llvm::BasicBlock::Create(context, "increment", builder.GetInsertBlock()->getParent()); - auto bbIncrementDone = llvm::BasicBlock::Create(context, "increment_done", builder.GetInsertBlock()->getParent()); - auto curExpRowIndPtr = builder.CreatePointerCast(builder.CreateLoad(curExpPtrVar), env().i64ptrType()); - auto incCond = builder.CreateICmpSGE(builder.CreateAdd(builder.CreateLoad(badRowCountVar), builder.CreateAdd(builder.CreateSub(builder.CreateLoad(normalRowCountVar), env().i64Const(1)), builder.CreateLoad(expCurRowVar))), builder.CreateLoad(curExpRowIndPtr)); - auto remCond2 = builder.CreateICmpSLT(builder.CreateLoad(expCurRowVar), argNumExps); - builder.CreateCondBr(builder.CreateAnd(remCond2, incCond), bbIncrement, bbIncrementDone); - - // Body of the while loop we need to - // 1. decrement the current exception row index by the expAccVar (all rows previously filtered) - // 2. Increment our partition pointer to next exception - // 3. 
Change partitions if we've exhausted all exceptions in the current, but still have more remaining in tototal - // - // Increment to the next exception by adding eSize and 4*sizeof(int64_t) to the partition pointer - // *((int64_t *) curExpPtrVar) -= expAccVar; - // curExpPtrVar += 4 * sizeof(int64_t) + ((int64_t *)curExpPtrVar)[3]; - // expCurRowVar += 1; - // curExpCurRowVar += 1; - // - // Finally we check to see if a partition change is required - // if (expCurRowVar < numExps && curExpCurRowVar >= curExpNumRowsVar) - builder.SetInsertPoint(bbIncrement); - // Change row index and go to next exception in partition - auto curExpRowIndPtr2 = builder.CreatePointerCast(builder.CreateLoad(curExpPtrVar), env().i64Type()->getPointerTo(0)); - builder.CreateStore(builder.CreateSub(builder.CreateLoad(curExpRowIndPtr2), builder.CreateLoad(expAccVar)), curExpRowIndPtr2); - auto curOffset = builder.CreateAlloca(env().i64Type(), 0, nullptr, "curOffset"); - builder.CreateStore(builder.CreateLoad(builder.CreateGEP(curExpRowIndPtr2, env().i64Const(3))), curOffset); - builder.CreateStore(builder.CreateAdd(builder.CreateLoad(curOffset), env().i64Const(4 * sizeof(int64_t))), curOffset); - builder.CreateStore(builder.CreateGEP(builder.CreateLoad(curExpPtrVar), builder.CreateLoad(curOffset)), curExpPtrVar); - builder.CreateStore(builder.CreateAdd(builder.CreateLoad(curExpCurRowVar), env().i64Const(1)), curExpCurRowVar); - builder.CreateStore(builder.CreateAdd(builder.CreateLoad(expCurRowVar), env().i64Const(1)), expCurRowVar); - // See if partition change needs to occur - auto bbChange = llvm::BasicBlock::Create(context, "change", builder.GetInsertBlock()->getParent()); - auto changeCond = builder.CreateICmpSGE(builder.CreateLoad(curExpCurRowVar), builder.CreateLoad(curExpNumRowsVar)); - auto leftCond = builder.CreateICmpSLT(builder.CreateLoad(expCurRowVar), argNumExps); - builder.CreateCondBr(builder.CreateAnd(leftCond, changeCond), bbChange, bbExpUpdate); - - // This block changes to the next partition - // curExpCurRowVar = 0; - // curExpIndVar = curExpIndVar + 1; - // curExpPtrVar = expPtrs[curExpIndVar]; - // curExpNumRowsVar = expPtrSizes[curExpIndVar]; - builder.SetInsertPoint(bbChange); - builder.CreateStore(env().i64Const(0), curExpCurRowVar); - builder.CreateStore(builder.CreateAdd(builder.CreateLoad(curExpIndVar), env().i64Const(1)), curExpIndVar); - builder.CreateStore(builder.CreateLoad(builder.CreateGEP(argExpPtrs, builder.CreateLoad(curExpIndVar))), curExpPtrVar); - builder.CreateStore(builder.CreateLoad(builder.CreateGEP(argExpPtrSizes, builder.CreateLoad(curExpIndVar))), curExpNumRowsVar); - builder.CreateBr(bbExpUpdate); - - // Finally increment the expAccVar by 1 becasue a row was filtered - // expAccVar += 1; - builder.SetInsertPoint(bbIncrementDone); - builder.CreateStore(builder.CreateAdd(builder.CreateLoad(expAccVar), env().i64Const(1)), expAccVar); + // if (outRowCountVar == prevRowNumVar && badRowCountVar == prevBadRowNumVar) + auto rowNotEmitted = builder.CreateICmpEQ(builder.CreateLoad(outRowCountVar), builder.CreateLoad(prevRowNumVar)); + auto rowNotException = builder.CreateICmpEQ(builder.CreateLoad(badRowCountVar), builder.CreateLoad(prevBadRowNumVar)); + builder.CreateCondBr(builder.CreateAnd(rowNotEmitted, rowNotException), bbUpdateGeneralCond, bbLoopCondition); + + // Update general cond + // while (*generalRowOffset < curGeneralNumRows && *((int64_t*)curGeneralPtr) < curNormalRowInd + totalGeneralRowCounter) + builder.SetInsertPoint(bbUpdateGeneralCond); + auto 
generalRowsRemainCond = builder.CreateICmpSLT(builder.CreateLoad(generalRowOffset), builder.CreateLoad(curGeneralNumRows)); + auto curGeneralRowInd = builder.CreateLoad(builder.CreatePointerCast(builder.CreateLoad(curGeneralPtr), env().i64ptrType())); + auto generalIndexLTCond = builder.CreateICmpSLT(curGeneralRowInd, builder.CreateAdd(builder.CreateLoad(totalGeneralRowCounter), builder.CreateLoad(totalNormalRowCounter))); + builder.CreateCondBr(builder.CreateAnd(generalRowsRemainCond, generalIndexLTCond), bbUpdateGeneralBody, bbUpdateFallbackCond); + + // Update general body + // generalNewRowInd = *((int64_t*)curGeneralPtr) - totalFilterCounter; + // *((int64_t*)curGeneralPtr) = generalNewRowInd; + // auto generalRowDelta = 4 * sizeof(int64_t) + ((int64_t*)curGeneralPtr)[3]; + // curGeneralPtr += generalRowDelta; + // *generalByteOffset += generalRowDelta; + // *generalRowOffset++; + // *totalGeneralRowCounter++; + builder.SetInsertPoint(bbUpdateGeneralBody); + auto generalNewRowInd = builder.CreateSub(builder.CreateLoad(builder.CreatePointerCast(builder.CreateLoad(curGeneralPtr), env().i64ptrType())), builder.CreateLoad(totalFilterCounter)); + builder.CreateStore(generalNewRowInd, builder.CreatePointerCast(builder.CreateLoad(curGeneralPtr), env().i64ptrType())); + auto generalRowDelta = builder.CreateAdd(builder.CreateLoad(builder.CreateGEP(builder.CreatePointerCast(builder.CreateLoad(curGeneralPtr), env().i64ptrType()), env().i64Const(3))), env().i64Const(4 * sizeof(int64_t))); + builder.CreateStore(builder.CreateGEP(builder.CreateLoad(curGeneralPtr), generalRowDelta), curGeneralPtr); + builder.CreateStore(builder.CreateAdd(generalRowDelta, builder.CreateLoad(generalByteOffset)), generalByteOffset); + builder.CreateStore(builder.CreateAdd(env().i64Const(1), builder.CreateLoad(generalRowOffset)), generalRowOffset); + builder.CreateStore(builder.CreateAdd(env().i64Const(1), builder.CreateLoad(totalGeneralRowCounter)), totalGeneralRowCounter); + + // if (*generalRowOffset == curGeneralNumRows && *generalIndexOffset < numGeneralPartitions - 1) + auto generalNoRowsRemain = builder.CreateICmpEQ(builder.CreateLoad(generalRowOffset), builder.CreateLoad(curGeneralNumRows)); + auto generalHasMorePartitions = builder.CreateICmpSLT(builder.CreateLoad(generalIndexOffset), builder.CreateSub(numGeneralPartitions, env().i64Const(1))); + builder.CreateCondBr(builder.CreateAnd(generalNoRowsRemain, generalHasMorePartitions), bbNextGeneralPartition, bbUpdateGeneralCond); + + // generalIndexOffset += 1; + // *generalRowOffset = 0; + // *generalByteOffset = 0; + // curGeneralPtr = generalPartitions[*generalIndexOffset]; + // curGeneralNumRows = *((int64_t*)curGeneralPtr); + // curGeneralPtr += sizeof(int64_t); + builder.SetInsertPoint(bbNextGeneralPartition); + builder.CreateStore(builder.CreateAdd(builder.CreateLoad(generalIndexOffset), env().i64Const(1)), generalIndexOffset); + builder.CreateStore(env().i64Const(0), generalRowOffset); + builder.CreateStore(env().i64Const(0), generalByteOffset); + builder.CreateStore(builder.CreateLoad(builder.CreateGEP(generalPartitions, builder.CreateLoad(generalIndexOffset))), curGeneralPtr); + builder.CreateStore(builder.CreateLoad(builder.CreatePointerCast(builder.CreateLoad(curGeneralPtr), env().i64ptrType())), curGeneralNumRows); + builder.CreateStore(builder.CreateGEP(builder.CreateLoad(curGeneralPtr), builder.CreateAdd(env().i64Const(sizeof(int64_t)), builder.CreateLoad(generalByteOffset))), curGeneralPtr); + builder.CreateBr(bbUpdateGeneralCond); + + // Update 
fallback cond + // while (*fallbackRowOffset < curFallbackNumRows && *((int64_t*)curFallbackPtr) < curNormalRowInd + totalGeneralRowCounter + totalFallbackRowCounter) + builder.SetInsertPoint(bbUpdateFallbackCond); + auto fallbackRowsRemainCond = builder.CreateICmpSLT(builder.CreateLoad(fallbackRowOffset), builder.CreateLoad(curFallbackNumRows)); + auto curFallbackRowInd = builder.CreateLoad(builder.CreatePointerCast(builder.CreateLoad(curFallbackPtr), env().i64ptrType())); + auto fallbackIndexLTCond = builder.CreateICmpSLT(curFallbackRowInd, builder.CreateAdd(builder.CreateLoad(totalGeneralRowCounter), builder.CreateAdd(builder.CreateLoad(totalFallbackRowCounter), builder.CreateLoad(totalNormalRowCounter)))); + builder.CreateCondBr(builder.CreateAnd(fallbackRowsRemainCond, fallbackIndexLTCond), bbUpdateFallbackBody, bbUpdateDone); + + // Update fallback body + // fallbackNewRowInd = *((int64_t*)curFallbackPtr) - totalFilterCounter; + // *((int64_t*)curFallbackPtr) = fallbackNewRowInd; + // auto fallbackRowDelta = 4 * sizeof(int64_t) + ((int64_t*)curFallbackPtr)[3]; + // curFallbackPtr += fallbackRowDelta; + // *fallbackByteOffset += fallbackRowDelta; + // *fallbackRowOffset++; + // *totalFallbackRowCounter++; + builder.SetInsertPoint(bbUpdateFallbackBody); + auto fallbackNewRowInd = builder.CreateSub(builder.CreateLoad(builder.CreatePointerCast(builder.CreateLoad(curFallbackPtr), env().i64ptrType())), builder.CreateLoad(totalFilterCounter)); + builder.CreateStore(fallbackNewRowInd, builder.CreatePointerCast(builder.CreateLoad(curFallbackPtr), env().i64ptrType())); + auto fallbackRowDelta = builder.CreateAdd(builder.CreateLoad(builder.CreateGEP(builder.CreatePointerCast(builder.CreateLoad(curFallbackPtr), env().i64ptrType()), env().i64Const(3))), env().i64Const(4 * sizeof(int64_t))); + builder.CreateStore(builder.CreateGEP(builder.CreateLoad(curFallbackPtr), fallbackRowDelta), curFallbackPtr); + builder.CreateStore(builder.CreateAdd(fallbackRowDelta, builder.CreateLoad(fallbackByteOffset)), fallbackByteOffset); + builder.CreateStore(builder.CreateAdd(env().i64Const(1), builder.CreateLoad(fallbackRowOffset)), fallbackRowOffset); + builder.CreateStore(builder.CreateAdd(env().i64Const(1), builder.CreateLoad(totalFallbackRowCounter)), totalFallbackRowCounter); + + // if (*fallbackRowOffset == curFallbackNumRows && *fallbackIndexOffset < numFallbackPartitions - 1) + auto fallbackNoRowsRemain = builder.CreateICmpEQ(builder.CreateLoad(fallbackRowOffset), builder.CreateLoad(curFallbackNumRows)); + auto fallbackHasMorePartitions = builder.CreateICmpSLT(builder.CreateLoad(fallbackIndexOffset), builder.CreateSub(numFallbackPartitions, env().i64Const(1))); + builder.CreateCondBr(builder.CreateAnd(fallbackNoRowsRemain, fallbackHasMorePartitions), bbNextFallbackPartition, bbUpdateFallbackCond); + + // fallbackIndexOffset += 1; + // *fallbackRowOffset = 0; + // *fallbackByteOffset = 0; + // curFallbackPtr = fallbackPartitions[*fallbackIndexOffset]; + // curFallbackNumRows = *((int64_t*)curFallbackPtr); + // curFallbackPtr += sizeof(int64_t); + builder.SetInsertPoint(bbNextFallbackPartition); + builder.CreateStore(builder.CreateAdd(builder.CreateLoad(fallbackIndexOffset), env().i64Const(1)), fallbackIndexOffset); + builder.CreateStore(env().i64Const(0), fallbackRowOffset); + builder.CreateStore(env().i64Const(0), fallbackByteOffset); + builder.CreateStore(builder.CreateLoad(builder.CreateGEP(fallbackPartitions, builder.CreateLoad(fallbackIndexOffset))), curFallbackPtr); + 
builder.CreateStore(builder.CreateLoad(builder.CreatePointerCast(builder.CreateLoad(curFallbackPtr), env().i64ptrType())), curFallbackNumRows); + builder.CreateStore(builder.CreateGEP(builder.CreateLoad(curFallbackPtr), builder.CreateAdd(env().i64Const(sizeof(int64_t)), builder.CreateLoad(fallbackByteOffset))), curFallbackPtr); + builder.CreateBr(bbUpdateFallbackCond); + + // Update done + // totalFilterCounter += 1; + builder.SetInsertPoint(bbUpdateDone); + builder.CreateStore(builder.CreateAdd(env().i64Const(1), builder.CreateLoad(totalFilterCounter)), totalFilterCounter); builder.CreateBr(bbLoopCondition); - // --------- - // loop done builder.SetInsertPoint(bbLoopDone); - auto bbRemainingExceptions = llvm::BasicBlock::Create(context, "remaining_exceptions", builder.GetInsertBlock()->getParent()); - auto bbRemainingDone = llvm::BasicBlock::Create(context, "remaining_done", builder.GetInsertBlock()->getParent()); - auto expRemaining = builder.CreateICmpSLT(builder.CreateLoad(expCurRowVar), argNumExps); - builder.CreateCondBr(expRemaining, bbRemainingExceptions, bbRemainingDone); - - // We have processed all of the normal rows. If we have not exhausted all of our exceptions - // we just iterate through the remaining exceptions and decrement their row index by the final - // value of expAccVar counting our filtered rows. - // Same code as above, but just don't need to keep updating expAccVar by 1. - builder.SetInsertPoint(bbRemainingExceptions); - auto curExpRowIndPtr3 = builder.CreatePointerCast(builder.CreateLoad(curExpPtrVar), env().i64Type()->getPointerTo(0)); - builder.CreateStore(builder.CreateSub(builder.CreateLoad(curExpRowIndPtr3), builder.CreateLoad(expAccVar)), curExpRowIndPtr3); - auto curOffset2 = builder.CreateAlloca(env().i64Type(), 0, nullptr, "curOffset2"); - builder.CreateStore(builder.CreateLoad(builder.CreateGEP(curExpRowIndPtr3, env().i64Const(3))), curOffset2); - builder.CreateStore(builder.CreateAdd(builder.CreateLoad(curOffset2), env().i64Const(4 * sizeof(int64_t))), curOffset2); - builder.CreateStore(builder.CreateGEP(builder.CreateLoad(curExpPtrVar), builder.CreateLoad(curOffset2)), curExpPtrVar); - builder.CreateStore(builder.CreateAdd(builder.CreateLoad(curExpCurRowVar), env().i64Const(1)), curExpCurRowVar); - builder.CreateStore(builder.CreateAdd(builder.CreateLoad(expCurRowVar), env().i64Const(1)), expCurRowVar); - - auto bbChange2 = llvm::BasicBlock::Create(context, "change2", builder.GetInsertBlock()->getParent()); - auto changeCond2 = builder.CreateICmpSGE(builder.CreateLoad(curExpCurRowVar), builder.CreateLoad(curExpNumRowsVar)); - auto leftCond2 = builder.CreateICmpSLT(builder.CreateLoad(expCurRowVar), argNumExps); - builder.CreateCondBr(builder.CreateAnd(leftCond2, changeCond2), bbChange2, bbLoopDone); - - builder.SetInsertPoint(bbChange2); - builder.CreateStore(env().i64Const(0), curExpCurRowVar); - builder.CreateStore(builder.CreateAdd(builder.CreateLoad(curExpIndVar), env().i64Const(1)), curExpIndVar); - builder.CreateStore(builder.CreateLoad(builder.CreateGEP(argExpPtrs, builder.CreateLoad(curExpIndVar))), curExpPtrVar); - builder.CreateStore(builder.CreateLoad(builder.CreateGEP(argExpPtrSizes, builder.CreateLoad(curExpIndVar))), curExpNumRowsVar); - builder.CreateBr(bbLoopDone); - - builder.SetInsertPoint(bbRemainingDone); - // if intermediate callback desired, perform! 
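Taken together, the update blocks above perform the same pass over each exception stream once per filtered row. A C++ rendering of that logic (a sketch that mirrors the IR value names and assumes the 4 x int64 exception-row header whose fourth field is the payload size; it is not code from the patch):

#include <cstdint>

// Shift the stored row indices of one stream (general or fallback) down by the
// number of rows filtered so far. baseBound is the number of rows already seen
// on faster paths (normal rows for the general stream; normal + general rows
// for the fallback stream); the stream's own counter grows as rows are consumed.
void shiftStream(uint8_t*& curPtr, int64_t& curNumRows,
                 uint8_t** partitions, int64_t numPartitions,
                 int64_t& indexOffset, int64_t& rowOffset, int64_t& byteOffset,
                 int64_t& streamCounter, int64_t baseBound, int64_t filterCounter) {
    while (rowOffset < curNumRows &&
           *reinterpret_cast<int64_t*>(curPtr) < baseBound + streamCounter) {
        *reinterpret_cast<int64_t*>(curPtr) -= filterCounter;  // re-index this row
        int64_t delta = 4 * sizeof(int64_t) + reinterpret_cast<int64_t*>(curPtr)[3];
        curPtr += delta;                                       // skip header + payload
        byteOffset += delta;
        ++rowOffset;
        ++streamCounter;
        if (rowOffset == curNumRows && indexOffset < numPartitions - 1) {
            ++indexOffset;                                     // next partition
            rowOffset = 0;
            byteOffset = 0;
            curPtr = partitions[indexOffset];
            curNumRows = *reinterpret_cast<int64_t*>(curPtr);
            curPtr += sizeof(int64_t);
        }
    }
}

After both streams are shifted, the generated code bumps totalFilterCounter exactly once, matching the update_done block above.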
if(_intermediateType != python::Type::UNKNOWN && !_intermediateCallbackName.empty()) {
 writeIntermediate(builder, argUserData, _intermediateCallbackName);
 }
diff --git a/tuplex/core/src/physical/PhysicalPlan.cc b/tuplex/core/src/physical/PhysicalPlan.cc
index 2399edf6f..8aa49a5fb 100644
--- a/tuplex/core/src/physical/PhysicalPlan.cc
+++ b/tuplex/core/src/physical/PhysicalPlan.cc
@@ -199,9 +199,11 @@ namespace tuplex {
 auto t = ops.front()->type();
 assert(t == LogicalOperatorType::PARALLELIZE || t == LogicalOperatorType::CACHE);
 if (t == LogicalOperatorType::PARALLELIZE)
- hasInputExceptions = !((ParallelizeOperator *)ops.front())->getPythonObjects().empty();
- if (t == LogicalOperatorType::CACHE)
- hasInputExceptions = !((CacheOperator *)ops.front())->cachedExceptions().empty();
+ hasInputExceptions = !((ParallelizeOperator *) ops.front())->getFallbackPartitions().empty();
+ if (t == LogicalOperatorType::CACHE) {
+ auto cop = (CacheOperator *) ops.front();
+ hasInputExceptions = !cop->cachedGeneralPartitions().empty() || !cop->cachedFallbackPartitions().empty();
+ }
 }
 }
@@ -401,14 +403,15 @@
 // fill in data to start processing from operators.
 if (inputNode->type() == LogicalOperatorType::PARALLELIZE) {
 auto pop = dynamic_cast<ParallelizeOperator*>(inputNode); assert(inputNode);
- stage->setInputPartitions(pop->getPartitions());
- stage->setInputExceptions(pop->getPythonObjects());
- stage->setPartitionToExceptionsMap(pop->getInputPartitionToPythonObjectsMap());
+ stage->setInputPartitions(pop->getNormalPartitions());
+ stage->setFallbackPartitions(pop->getFallbackPartitions());
+ stage->setPartitionGroups(pop->getPartitionGroups());
 } else if(inputNode->type() == LogicalOperatorType::CACHE) {
 auto cop = dynamic_cast<CacheOperator*>(inputNode); assert(inputNode);
- stage->setInputPartitions(cop->cachedPartitions());
- stage->setInputExceptions(cop->cachedExceptions());
- stage->setPartitionToExceptionsMap(cop->partitionToExceptionsMap());
+ stage->setInputPartitions(cop->cachedNormalPartitions());
+ stage->setGeneralPartitions(cop->cachedGeneralPartitions());
+ stage->setFallbackPartitions(cop->cachedFallbackPartitions());
+ stage->setPartitionGroups(cop->partitionGroups());
 } else if(inputNode->type() == LogicalOperatorType::FILEINPUT) {
 auto csvop = dynamic_cast<FileInputOperator*>(inputNode);
 stage->setInputFiles(csvop->getURIs(), csvop->getURISizes());
diff --git a/tuplex/core/src/physical/ResolveTask.cc b/tuplex/core/src/physical/ResolveTask.cc
index 23ba34d26..b97fb517c 100644
--- a/tuplex/core/src/physical/ResolveTask.cc
+++ b/tuplex/core/src/physical/ResolveTask.cc
@@ -218,7 +218,7 @@ namespace tuplex {
 return tuple;
 }
- void ResolveTask::writePythonObject(PyObject *out_row) {
+ void ResolveTask::writePythonObjectToFallbackSink(PyObject *out_row) {
 assert(out_row);
 // similar to merge row, need to write other rows first!
@@ -228,8 +228,21 @@
 // needs to be put into separate list of python objects...
 // save index as well to merge back in order.
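The rewrite below stops buffering Python objects in a list; each object is pickled immediately and written to the fallback sink using the shared exception-row layout, a four-field int64 header followed by the payload. A sketch of that packing (hypothetical helper mirroring the header writes shown below):

#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

std::vector<uint8_t> packFallbackRow(int64_t rowNumber, int64_t ecCode,
                                     int64_t operatorID, const std::string& pickled) {
    std::vector<uint8_t> buf(4 * sizeof(int64_t) + pickled.size());
    auto* header = reinterpret_cast<int64_t*>(buf.data());
    header[0] = rowNumber;                  // already corrected by _numUnresolved
    header[1] = ecCode;                     // e.g. ecToI64(ExceptionCode::PYTHON_PARALLELIZE)
    header[2] = operatorID;                 // -1 when no operator is responsible
    header[3] = static_cast<int64_t>(pickled.size());
    std::memcpy(buf.data() + 4 * sizeof(int64_t), pickled.data(), pickled.size());
    return buf;
}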
- assert(_rowNumber >= _numUnresolved); - _py_nonconfirming.push_back(std::make_tuple(_rowNumber - _numUnresolved, out_row)); + assert(_currentRowNumber >= _numUnresolved); + auto pickledObject = python::pickleObject(python::getMainModule(), out_row); + auto pyObjectSize = pickledObject.size(); + auto bufSize = 4 * sizeof(int64_t) + pyObjectSize; + + uint8_t *buf = new uint8_t[bufSize]; + auto ptr = buf; + *((int64_t*)ptr) = _currentRowNumber - _numUnresolved; ptr += sizeof(int64_t); + *((int64_t*)ptr) = ecToI64(ExceptionCode::PYTHON_PARALLELIZE); ptr += sizeof(int64_t); + *((int64_t*)ptr) = -1; ptr += sizeof(int64_t); + *((int64_t*)ptr) = pyObjectSize; ptr += sizeof(int64_t); + memcpy(ptr, pickledObject.c_str(), pyObjectSize); + rowToMemorySink(owner(), _fallbackSink, Schema(Schema::MemoryLayout::ROW, python::Type::makeTupleType({python::Type::STRING})), + 0, contextID(), buf, bufSize); + delete[] buf; } int64_t ResolveTask::mergeNormalRow(const uint8_t *buf, int64_t bufSize) { @@ -273,7 +286,7 @@ namespace tuplex { // exceptionCode, exceptionOperatorID, rowNumber, size int64_t ecCode = ecToI64(ExceptionCode::NORMALCASEVIOLATION); int64_t ecOpID = 0; // dummy - int64_t rowNumber = _currentRowNumber; + int64_t rowNumber = _currentRowNumber - _numUnresolved; uint8_t* except_buf = serializeExceptionToMemory(ecCode, ecOpID, rowNumber, buf, bufSize, &except_size); // sink row to type violation exceptions with commonCaseOutputSchema @@ -413,7 +426,7 @@ namespace tuplex { // => need a list of for which opIds/codes resolvers are available... ///.... _numUnresolved++; - exceptionCallback(ecCode, operatorID, _rowNumber, ebuf, eSize); + exceptionCallback(ecCode, operatorID, _currentRowNumber, ebuf, eSize); return; } @@ -648,7 +661,12 @@ namespace tuplex { mergeRow(buf, serialized_length, BUF_FORMAT_GENERAL_OUTPUT); delete [] buf; } else { - writePythonObject(rowObj); + // Unwrap single element tuples before writing them to the fallback sink + if(PyTuple_Check(rowObj) && PyTuple_Size(rowObj) == 1) { + writePythonObjectToFallbackSink(PyTuple_GetItem(rowObj, 0)); + } else { + writePythonObjectToFallbackSink(rowObj); + } } // Py_XDECREF(rowObj); } @@ -676,7 +694,7 @@ namespace tuplex { // fallback 3: still exception? save... if(resCode == -1) { _numUnresolved++; - exceptionCallback(ecCode, operatorID, _rowNumber, ebuf, eSize); + exceptionCallback(ecCode, operatorID, _currentRowNumber, ebuf, eSize); } } @@ -711,7 +729,7 @@ namespace tuplex { } // abort if no exceptions! - if(_runtimeExceptions.empty() && _numInputExceptions == 0) + if(_exceptionPartitions.empty() && _generalPartitions.empty() && _fallbackPartitions.empty()) return; // special case: no functor & no python pipeline functor given @@ -724,12 +742,12 @@ namespace tuplex { #endif // copy _generalCasePartitions over to base class - IExceptionableTask::setExceptions(_runtimeExceptions); + IExceptionableTask::setExceptions(_generalPartitions); // clear exceptions, because they have been resolved (or put to new exceptions!) // if task produced exceptions, they are stored in the IExceptionableTask class! // => no need to overwrite them, getter for iexceptionabletask has all info! - _runtimeExceptions.clear(); + _generalPartitions.clear(); _wallTime = timer.time(); return; @@ -742,78 +760,61 @@ namespace tuplex { // merge exceptions with normal rows after calling slow code over them... 
// basic idea is go over all exception partitions, execute row wise the resolution function // and merge the result back to the partitions - for(auto partition : _runtimeExceptions) { - const uint8_t *ptr = partition->lockRaw(); - int64_t numRows = *((int64_t *) ptr); - ptr += sizeof(int64_t); - - for(int i = 0; i < numRows; ++i) { - // old - // _currentRowNumber = *((int64_t*)ptr); - // ptr += sizeof(int64_t); - // int64_t ecCode = *((int64_t*)ptr); - // ptr += sizeof(int64_t); - // int64_t operatorID = *((int64_t*)ptr); - // ptr += sizeof(int64_t); - // int64_t eSize = *((int64_t*)ptr); - // ptr += sizeof(int64_t); - + for (const auto &partition : _generalPartitions) { + const uint8_t *ptr = partition->lock(); + auto numRows = partition->getNumRows(); + for (int i = 0; i < numRows; ++i) { const uint8_t *ebuf = nullptr; int64_t ecCode = -1, operatorID = -1; size_t eSize = 0; auto delta = deserializeExceptionFromMemory(ptr, &ecCode, &operatorID, &_currentRowNumber, &ebuf, &eSize); + processExceptionRow(ecCode, operatorID, ebuf, eSize); + + ptr += delta; + _rowNumber++; + } + partition->unlock(); + partition->invalidate(); + } - // call functor over this... - // ==> important to use row number here for continuous exception resolution! - // args are: "userData", "rowNumber", "exceptionCode", "rowBuf", "bufSize" + for (const auto &partition : _fallbackPartitions) { + const uint8_t *ptr = partition->lock(); + auto numRows = partition->getNumRows(); + for (int i = 0; i < numRows; ++i) { + const uint8_t *ebuf = nullptr; + int64_t ecCode = -1, operatorID = -1; + size_t eSize = 0; + auto delta = deserializeExceptionFromMemory(ptr, &ecCode, &operatorID, &_currentRowNumber, &ebuf, + &eSize); processExceptionRow(ecCode, operatorID, ebuf, eSize); ptr += delta; - // old - //ptr += eSize; - - // always inc row number _rowNumber++; } partition->unlock(); - - // exception partition is done or exceptions are transferred to new partition... partition->invalidate(); } - // now process all of the input exceptions - if (_numInputExceptions > 0) { - // Initialize input exception to starting index - auto partition = _inputExceptions[_inputExceptionIndex]; - auto rowsLeftInPartition = partition->getNumRows() - _inputExceptionRowOffset; - const uint8_t *ptr = partition->lock() + _inputExceptionByteOffset; - - // Iterate over all input exceptions, may be accross multiple partitions - for (int i = 0; i < _numInputExceptions; ++i) { - // Change partition once exhausted - if (rowsLeftInPartition == 0) { - partition->unlock(); - _inputExceptionIndex++; - partition = _inputExceptions[_inputExceptionIndex]; - rowsLeftInPartition = partition->getNumRows(); - ptr = partition->lock(); - } - + for (const auto &partition : _exceptionPartitions) { + const uint8_t *ptr = partition->lock(); + auto numRows = partition->getNumRows(); + for (int i = 0; i < numRows; ++i) { const uint8_t *ebuf = nullptr; int64_t ecCode = -1, operatorID = -1; size_t eSize = 0; auto delta = deserializeExceptionFromMemory(ptr, &ecCode, &operatorID, &_currentRowNumber, &ebuf, &eSize); + processExceptionRow(ecCode, operatorID, ebuf, eSize); + ptr += delta; _rowNumber++; - rowsLeftInPartition--; } - // Unlock but wait to invalidate until all resolve tasks have finished partition->unlock(); + partition->invalidate(); } // merging is done, unlock the last partition & copy the others over. @@ -832,8 +833,9 @@ namespace tuplex { // clear exceptions, because they have been resolved (or put to new exceptions!) 
// if task produced exceptions, they are stored in the IExceptionableTask class! // => no need to overwrite them, getter for iexceptionabletask has all info! - _runtimeExceptions.clear(); - _inputExceptions.clear(); + _exceptionPartitions.clear(); + _generalPartitions.clear(); + _fallbackPartitions.clear(); } else { executeInOrder(); } @@ -881,146 +883,323 @@ namespace tuplex { _rowNumber = 0; } - // Initialize runtime exception variables - size_t curRuntimePartitionInd = 0; // current index into vector of runtime exception partitions - int64_t numRuntimeRowsLeftInPartition = 0; // number of rows remaining in partition - const uint8_t *runPtr = nullptr; - if (_runtimeExceptions.size() > 0) { - curRuntimePartitionInd = 0; - numRuntimeRowsLeftInPartition = _runtimeExceptions[curRuntimePartitionInd]->getNumRows(); - runPtr = _runtimeExceptions[curRuntimePartitionInd]->lock(); + size_t curExceptionInd = 0; + size_t exceptionsRemaining = 0; + const uint8_t *expPtr = nullptr; + size_t exceptionNumRows = 0; + for (int i = 0; i < _exceptionPartitions.size(); ++i) { + auto numRows = _exceptionPartitions[i]->getNumRows(); + exceptionNumRows += numRows; + if (i == 0) { + expPtr = _exceptionPartitions[i]->lock(); + exceptionsRemaining = numRows; + } + } + + size_t curGeneralInd = 0; + size_t generalRemaining = 0; + const uint8_t *generalPtr = nullptr; + size_t generalNumRows = 0; + for (int i = 0; i < _generalPartitions.size(); ++i) { + auto numRows = _generalPartitions[i]->getNumRows(); + generalNumRows += numRows; + if (i == 0) { + generalPtr = _generalPartitions[i]->lock(); + generalRemaining = numRows; + } } - // Initialize input exception variables - size_t curInputPartitionInd = 0; // current index into vector of input exception partitions - int64_t numInputRowsLeftInPartition = 0; // number of rows remaining in partition - const uint8_t *inputPtr = nullptr; - if (_numInputExceptions > 0) { - curInputPartitionInd = _inputExceptionIndex; - numInputRowsLeftInPartition = _inputExceptions[curInputPartitionInd]->getNumRows() - _inputExceptionRowOffset; - inputPtr = _inputExceptions[curInputPartitionInd]->lock() + _inputExceptionByteOffset; + size_t curFallbackInd = 0; + size_t fallbackRemaining = 0; + const uint8_t *fallPtr = nullptr; + size_t fallbackNumRows = 0; + for (int i = 0; i < _fallbackPartitions.size(); ++i) { + auto numRows = _fallbackPartitions[i]->getNumRows(); + fallbackNumRows += numRows; + if (i == 0) { + fallPtr = _fallbackPartitions[i]->lock(); + fallbackRemaining = numRows; + } } // Merge input and runtime exceptions in order. To do so, we can compare the row indices of the // current runtime and input exception and process the one that occurs first. 
The saved row indices of each stream are relative, however: compiled-exception rows do not count
 // general or fallback rows, and general rows do not count fallback rows, so the running counters
 // below lift them to true global row numbers before comparing.
- size_t inputRowsProcessed = 0;
- const uint8_t *ptr = nullptr;
- while (runPtr && inputPtr) {
- auto runRowInd = *((int64_t *) runPtr); // get current runtime row index
- auto inputRowInd = *((int64_t *) inputPtr); // get current input row index
- bool isRuntimeException = false;
- // compare indices with accounting for previous input exceptions
- if (runRowInd + inputRowsProcessed < inputRowInd) {
- ptr = runPtr;
- numRuntimeRowsLeftInPartition--;
- isRuntimeException = true;
+ while (_exceptionCounter < exceptionNumRows && _generalCounter < generalNumRows && _fallbackCounter < fallbackNumRows) {
+ auto expRowInd = *((int64_t *) expPtr) + _fallbackCounter + _generalCounter;
+ auto generalRowInd = *((int64_t *) generalPtr) + _fallbackCounter;
+ auto fallbackRowInd = *((int64_t *) fallPtr);
+
+ const uint8_t *buf = nullptr;
+ int64_t ecCode = 0, operatorID = -1;
+ size_t eSize = 0;
+ if (fallbackRowInd <= expRowInd && fallbackRowInd <= generalRowInd) {
+ fallbackRemaining--;
+ _fallbackCounter++;
+
+ auto delta = deserializeExceptionFromMemory(fallPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize);
+ fallPtr += delta;
+ } else if (generalRowInd <= expRowInd && generalRowInd <= fallbackRowInd) {
+ generalRemaining--;
+ _generalCounter++;
+
+ auto delta = deserializeExceptionFromMemory(generalPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize);
+ _currentRowNumber += _fallbackCounter;
+ generalPtr += delta;
 } else {
- ptr = inputPtr;
- numInputRowsLeftInPartition--;
- inputRowsProcessed++;
+ exceptionsRemaining--;
+ _exceptionCounter++;
+
+ auto delta = deserializeExceptionFromMemory(expPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize);
+ _currentRowNumber += _fallbackCounter + _generalCounter;
+ expPtr += delta;
 }
- const uint8_t *ebuf = nullptr;
- int64_t ecCode = -1, operatorID = -1;
+ processExceptionRow(ecCode, operatorID, buf, eSize);
+ _rowNumber++;
+
+ if (exceptionsRemaining == 0) {
+ _exceptionPartitions[curExceptionInd]->unlock();
+ _exceptionPartitions[curExceptionInd]->invalidate();
+ curExceptionInd++;
+ if (curExceptionInd < _exceptionPartitions.size()) {
+ exceptionsRemaining = _exceptionPartitions[curExceptionInd]->getNumRows();
+ expPtr = _exceptionPartitions[curExceptionInd]->lock();
+ }
+ }
+
+ if (generalRemaining == 0) {
+ _generalPartitions[curGeneralInd]->unlock();
+ _generalPartitions[curGeneralInd]->invalidate();
+ curGeneralInd++;
+ if (curGeneralInd < _generalPartitions.size()) {
+ generalRemaining = _generalPartitions[curGeneralInd]->getNumRows();
+ generalPtr = _generalPartitions[curGeneralInd]->lock();
+ }
+ }
+
+ if (fallbackRemaining == 0) {
+ _fallbackPartitions[curFallbackInd]->unlock();
+ _fallbackPartitions[curFallbackInd]->invalidate();
+ curFallbackInd++;
+ if (curFallbackInd < _fallbackPartitions.size()) {
+ fallbackRemaining = _fallbackPartitions[curFallbackInd]->getNumRows();
+ fallPtr = _fallbackPartitions[curFallbackInd]->lock();
+ }
+ }
+ }
+
+ while (_exceptionCounter < exceptionNumRows && _generalCounter < generalNumRows) {
+ auto expRowInd = *((int64_t *) expPtr) + _fallbackCounter + _generalCounter;
+ auto generalRowInd = *((int64_t *) generalPtr) + _fallbackCounter;
+
+ const uint8_t *buf = nullptr;
+ int64_t ecCode = 0, operatorID = -1;
 size_t eSize = 0;
- auto delta = deserializeExceptionFromMemory(ptr, &ecCode, &operatorID,
&_currentRowNumber, &ebuf, - &eSize); + if (generalRowInd <= expRowInd) { + generalRemaining--; + _generalCounter++; - if (isRuntimeException) { - _currentRowNumber += inputRowsProcessed; - runPtr += delta; + auto delta = deserializeExceptionFromMemory(generalPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize); + _currentRowNumber += _fallbackCounter; + generalPtr += delta; } else { - inputPtr += delta; + exceptionsRemaining--; + _exceptionCounter++; + + auto delta = deserializeExceptionFromMemory(expPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize); + _currentRowNumber += _fallbackCounter + _generalCounter; + expPtr += delta; } - processExceptionRow(ecCode, operatorID, ebuf, eSize); + processExceptionRow(ecCode, operatorID, buf, eSize); _rowNumber++; - // Exhausted current runtime exceptions, need to switch partitions - if (numRuntimeRowsLeftInPartition == 0) { - _runtimeExceptions[curRuntimePartitionInd]->unlock(); - _runtimeExceptions[curRuntimePartitionInd]->invalidate(); - curRuntimePartitionInd++; - // Still have more exceptions to go through - if (curRuntimePartitionInd < _runtimeExceptions.size()) { - numRuntimeRowsLeftInPartition = _runtimeExceptions[curRuntimePartitionInd]->getNumRows(); - runPtr = _runtimeExceptions[curRuntimePartitionInd]->lock(); - } else { - // processed all exceptions - runPtr = nullptr; + if (exceptionsRemaining == 0) { + _exceptionPartitions[curExceptionInd]->unlock(); + _exceptionPartitions[curExceptionInd]->invalidate(); + curExceptionInd++; + if (curExceptionInd < _exceptionPartitions.size()) { + exceptionsRemaining = _exceptionPartitions[curExceptionInd]->getNumRows(); + expPtr = _exceptionPartitions[curExceptionInd]->lock(); } } - // Exhausted current input exceptions, need to switch partitions - if (numInputRowsLeftInPartition == 0 || inputRowsProcessed == _numInputExceptions) { - _inputExceptions[curInputPartitionInd]->unlock(); - curInputPartitionInd++; - // Still have more exceptions to go through - if (curInputPartitionInd < _inputExceptions.size() && inputRowsProcessed < _numInputExceptions) { - numInputRowsLeftInPartition = _inputExceptions[curInputPartitionInd]->getNumRows(); - inputPtr = _inputExceptions[curInputPartitionInd]->lock(); - } else { - // processed all exceptions - inputPtr = nullptr; + if (generalRemaining == 0) { + _generalPartitions[curGeneralInd]->unlock(); + _generalPartitions[curGeneralInd]->invalidate(); + curGeneralInd++; + if (curGeneralInd < _generalPartitions.size()) { + generalRemaining = _generalPartitions[curGeneralInd]->getNumRows(); + generalPtr = _generalPartitions[curGeneralInd]->lock(); } } } - // Process remaining runtime exceptions if any exist - while (runPtr) { - const uint8_t *ebuf = nullptr; + while (_generalCounter < generalNumRows && _fallbackCounter < fallbackNumRows) { + auto generalRowInd = *((int64_t *) generalPtr) + _fallbackCounter; + auto fallbackRowInd = *((int64_t *) fallPtr); + + const uint8_t *buf = nullptr; + int64_t ecCode = 0, operatorID = -1; + size_t eSize = 0; + if (fallbackRowInd <= generalRowInd) { + fallbackRemaining--; + _fallbackCounter++; + + auto delta = deserializeExceptionFromMemory(fallPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize); + fallPtr += delta; + } else { + generalRemaining--; + _generalCounter++; + + auto delta = deserializeExceptionFromMemory(generalPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize); + _currentRowNumber += _fallbackCounter; + generalPtr += delta; + } + + processExceptionRow(ecCode, operatorID, buf, eSize); + 
_rowNumber++; + + if (generalRemaining == 0) { + _generalPartitions[curGeneralInd]->unlock(); + _generalPartitions[curGeneralInd]->invalidate(); + curGeneralInd++; + if (curGeneralInd < _generalPartitions.size()) { + generalRemaining = _generalPartitions[curGeneralInd]->getNumRows(); + generalPtr = _generalPartitions[curGeneralInd]->lock(); + } + } + + if (fallbackRemaining == 0) { + _fallbackPartitions[curFallbackInd]->unlock(); + _fallbackPartitions[curFallbackInd]->invalidate(); + curFallbackInd++; + if (curFallbackInd < _fallbackPartitions.size()) { + fallbackRemaining = _fallbackPartitions[curFallbackInd]->getNumRows(); + fallPtr = _fallbackPartitions[curFallbackInd]->lock(); + } + } + } + + while (_exceptionCounter < exceptionNumRows && _fallbackCounter < fallbackNumRows) { + auto expRowInd = *((int64_t *) expPtr) + _fallbackCounter + _generalCounter; + auto fallbackRowInd = *((int64_t *) fallPtr); + + const uint8_t *buf = nullptr; + int64_t ecCode = 0, operatorID = -1; + size_t eSize = 0; + if (fallbackRowInd <= expRowInd) { + fallbackRemaining--; + _fallbackCounter++; + + auto delta = deserializeExceptionFromMemory(fallPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize); + fallPtr += delta; + } else { + exceptionsRemaining--; + _exceptionCounter++; + + auto delta = deserializeExceptionFromMemory(expPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize); + _currentRowNumber += _fallbackCounter + _generalCounter; + expPtr += delta; + } + + processExceptionRow(ecCode, operatorID, buf, eSize); + _rowNumber++; + + if (exceptionsRemaining == 0) { + _exceptionPartitions[curExceptionInd]->unlock(); + _exceptionPartitions[curExceptionInd]->invalidate(); + curExceptionInd++; + if (curExceptionInd < _exceptionPartitions.size()) { + exceptionsRemaining = _exceptionPartitions[curExceptionInd]->getNumRows(); + expPtr = _exceptionPartitions[curExceptionInd]->lock(); + } + } + + if (fallbackRemaining == 0) { + _fallbackPartitions[curFallbackInd]->unlock(); + _fallbackPartitions[curFallbackInd]->invalidate(); + curFallbackInd++; + if (curFallbackInd < _fallbackPartitions.size()) { + fallbackRemaining = _fallbackPartitions[curFallbackInd]->getNumRows(); + fallPtr = _fallbackPartitions[curFallbackInd]->lock(); + } + } + } + + while (_exceptionCounter < exceptionNumRows) { + const uint8_t *buf = nullptr; int64_t ecCode = -1, operatorID = -1; size_t eSize = 0; - auto delta = deserializeExceptionFromMemory(runPtr, &ecCode, &operatorID, &_currentRowNumber, &ebuf, - &eSize); - _currentRowNumber += inputRowsProcessed; - processExceptionRow(ecCode, operatorID, ebuf, eSize); - runPtr += delta; + auto delta = deserializeExceptionFromMemory(expPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize); + _currentRowNumber += _generalCounter + _fallbackCounter; + expPtr += delta; + + processExceptionRow(ecCode, operatorID, buf, eSize); _rowNumber++; - numRuntimeRowsLeftInPartition--; - // Exhausted current runtime exceptions in partitions need to switch partitions or could be done - if (numRuntimeRowsLeftInPartition == 0) { - _runtimeExceptions[curRuntimePartitionInd]->unlock(); - _runtimeExceptions[curRuntimePartitionInd]->invalidate(); - curRuntimePartitionInd++; - // More exceptions to process - if (curRuntimePartitionInd < _runtimeExceptions.size()) { - numRuntimeRowsLeftInPartition = _runtimeExceptions[curRuntimePartitionInd]->getNumRows(); - runPtr = _runtimeExceptions[curRuntimePartitionInd]->lock(); - } else { - // processed all exceptions - runPtr = nullptr; + exceptionsRemaining--; + 
_exceptionCounter++; + + if (exceptionsRemaining == 0) { + _exceptionPartitions[curExceptionInd]->unlock(); + _exceptionPartitions[curExceptionInd]->invalidate(); + curExceptionInd++; + if (curExceptionInd < _exceptionPartitions.size()) { + exceptionsRemaining = _exceptionPartitions[curExceptionInd]->getNumRows(); + expPtr = _exceptionPartitions[curExceptionInd]->lock(); } } } - // Process remaining input exceptions if any exist - while (inputPtr) { - const uint8_t *ebuf = nullptr; + while (_generalCounter < generalNumRows) { + const uint8_t *buf = nullptr; int64_t ecCode = -1, operatorID = -1; size_t eSize = 0; - auto delta = deserializeExceptionFromMemory(inputPtr, &ecCode, &operatorID, &_currentRowNumber, &ebuf, - &eSize); - processExceptionRow(ecCode, operatorID, ebuf, eSize); - inputPtr += delta; + auto delta = deserializeExceptionFromMemory(generalPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize); + _currentRowNumber += _fallbackCounter; + + generalPtr += delta; + + processExceptionRow(ecCode, operatorID, buf, eSize); _rowNumber++; - numInputRowsLeftInPartition--; - inputRowsProcessed++; - // Exhausted current input exceptions, need to switch partitions - if (numInputRowsLeftInPartition == 0 || inputRowsProcessed == _numInputExceptions) { - _inputExceptions[curInputPartitionInd]->unlock(); - curInputPartitionInd++; - // Still have more exceptions - if (curInputPartitionInd < _inputExceptions.size() && inputRowsProcessed < _numInputExceptions) { - numInputRowsLeftInPartition = _inputExceptions[curInputPartitionInd]->getNumRows(); - inputPtr = _inputExceptions[curInputPartitionInd]->lock(); - } else { - // processed all exceptions - inputPtr = nullptr; + generalRemaining--; + _generalCounter++; + + if (generalRemaining == 0) { + _generalPartitions[curGeneralInd]->unlock(); + _generalPartitions[curGeneralInd]->invalidate(); + curGeneralInd++; + if (curGeneralInd < _generalPartitions.size()) { + generalRemaining = _generalPartitions[curGeneralInd]->getNumRows(); + generalPtr = _generalPartitions[curGeneralInd]->lock(); + } + } + } + + while (_fallbackCounter < fallbackNumRows) { + const uint8_t *buf = nullptr; + int64_t ecCode = -1, operatorID = -1; + size_t eSize = 0; + auto delta = deserializeExceptionFromMemory(fallPtr, &ecCode, &operatorID, &_currentRowNumber, &buf, &eSize); + fallPtr += delta; + + processExceptionRow(ecCode, operatorID, buf, eSize); + _rowNumber++; + + fallbackRemaining--; + _fallbackCounter++; + + if (fallbackRemaining == 0) { + _fallbackPartitions[curFallbackInd]->unlock(); + _fallbackPartitions[curFallbackInd]->invalidate(); + curFallbackInd++; + if (curFallbackInd < _fallbackPartitions.size()) { + fallbackRemaining = _fallbackPartitions[curFallbackInd]->getNumRows(); + fallPtr = _fallbackPartitions[curFallbackInd]->lock(); } } } @@ -1054,7 +1233,9 @@ namespace tuplex { // clear exceptions, because they have been resolved (or put to new exceptions!) // if task produced exceptions, they are stored in the IExceptionableTask class! // => no need to overwrite them, getter for iexceptionabletask has all info! 
- _runtimeExceptions.clear(); + _exceptionPartitions.clear(); + _generalPartitions.clear(); + _fallbackPartitions.clear(); } void ResolveTask::sendStatusToHistoryServer() { @@ -1071,6 +1252,7 @@ namespace tuplex { void ResolveTask::unlockAll() { _mergedRowsSink.unlock(); _generalCaseSink.unlock(); + _fallbackSink.unlock(); // unlock exceptionable task IExceptionableTask::unlockAll(); diff --git a/tuplex/core/src/physical/ResultSet.cc b/tuplex/core/src/physical/ResultSet.cc index 0f7bf7319..5086a1e58 100644 --- a/tuplex/core/src/physical/ResultSet.cc +++ b/tuplex/core/src/physical/ResultSet.cc @@ -13,97 +13,175 @@ namespace tuplex { ResultSet::ResultSet(const Schema& schema, - const std::vector& partitions, - const std::vector& exceptions, - const std::unordered_map& partitionToExceptionsMap, - const std::vector> pyobjects, + const std::vector& normalPartitions, + const std::vector& generalPartitions, + const std::vector& fallbackPartitions, + const std::vector& partitionGroups, int64_t maxRows) : ResultSet::ResultSet() { - for(Partition *p : partitions) - _partitions.push_back(p); - - _pyobjects = std::deque>(pyobjects.begin(), pyobjects.end()); - _exceptions = exceptions; - _partitionToExceptionsMap = partitionToExceptionsMap; - _curRowCounter = 0; + for (const auto &group : partitionGroups) + _partitionGroups.push_back(group); + + for (const auto &p : normalPartitions) + _remainingNormalPartitions.push_back(p); + for (const auto &p : generalPartitions) + _remainingGeneralPartitions.push_back(p); + for (const auto &p : fallbackPartitions) + _remainingFallbackPartitions.push_back(p); + + _curNormalRowCounter = 0; + _curNormalByteCounter = 0; + _curGeneralRowCounter = 0; + _curGeneralByteCounter = 0; + _curFallbackRowCounter = 0; + _curFallbackByteCounter = 0; + _normalRowCounter = 0; + _generalRowCounter = 0; + _fallbackRowCounter = 0; _totalRowCounter = 0; - _byteCounter = 0; + _schema = schema; _maxRows = maxRows < 0 ? std::numeric_limits::max() : maxRows; - _rowsRetrieved = 0; } - void ResultSet::clear() { - for(auto partition : _partitions) - partition->invalidate(); - _partitions.clear(); - for(auto partition : _exceptions) + void clearPartitions(std::list& partitions) { + for (auto &partition : partitions) { partition->invalidate(); + } + partitions.clear(); + } - _curRowCounter = 0; - _byteCounter = 0; + void ResultSet::clear() { + clearPartitions(_remainingNormalPartitions); + clearPartitions(_currentNormalPartitions); + clearPartitions(_remainingGeneralPartitions); + clearPartitions(_currentGeneralPartitions); + clearPartitions(_remainingFallbackPartitions); + clearPartitions(_currentFallbackPartitions); + _partitionGroups.clear(); + + _curNormalRowCounter = 0; + _curNormalByteCounter = 0; + _curGeneralRowCounter = 0; + _curGeneralByteCounter = 0; + _curFallbackRowCounter = 0; + _curFallbackByteCounter = 0; + _normalRowCounter = 0; + _generalRowCounter = 0; + _fallbackRowCounter = 0; + _totalRowCounter = 0; _maxRows = 0; - _rowsRetrieved = 0; } - bool ResultSet::hasNextRow() { - + bool ResultSet::hasNextNormalPartition() const { // all rows already retrieved? - if(_rowsRetrieved >= _maxRows) + if (_totalRowCounter >= _maxRows) return false; // empty? - if(_partitions.empty() && _pyobjects.empty()) + if (_currentNormalPartitions.empty() && _remainingNormalPartitions.empty()) { return false; - else { - // partitions empty? 
- if(_partitions.empty()) - return true; - else if(_pyobjects.empty()) { - assert(_partitions.size() > 0); - assert(_partitions.front()); - - // still one row left? - return _curRowCounter < _partitions.front()->getNumRows(); - } else { - return true; // there's for sure at least one object left! - } + } else if (!_currentNormalPartitions.empty()) { + return _curNormalRowCounter < _currentNormalPartitions.front()->getNumRows(); + } else { + return _remainingNormalPartitions.front()->getNumRows() > 0; } - } + bool ResultSet::hasNextGeneralPartition() const { + // all rows already retrieved? + if (_totalRowCounter >= _maxRows) + return false; + + // empty? + if (_currentGeneralPartitions.empty() && _remainingGeneralPartitions.empty()) { + return false; + } else if (!_currentGeneralPartitions.empty()) { + return _curGeneralRowCounter < _currentGeneralPartitions.front()->getNumRows(); + } else { + return _remainingGeneralPartitions.front()->getNumRows() > 0; + } + } - bool ResultSet::hasNextPartition() const { + bool ResultSet::hasNextFallbackPartition() const { // all rows already retrieved? - if(_rowsRetrieved >= _maxRows) + if (_totalRowCounter >= _maxRows) return false; // empty? - if(_partitions.empty()) + if (_currentFallbackPartitions.empty() && _remainingFallbackPartitions.empty()) { return false; - else { - assert(_partitions.size() > 0); - assert(_partitions.front()); + } else if (!_currentFallbackPartitions.empty()) { + return _curFallbackRowCounter < _currentFallbackPartitions.front()->getNumRows(); + } else { + return _remainingFallbackPartitions.front()->getNumRows() > 0; + } + } + + Partition* ResultSet::getNextGeneralPartition() { + if (_currentGeneralPartitions.empty() && _remainingGeneralPartitions.empty()) + return nullptr; - // still one row left? 
- return _curRowCounter < _partitions.front()->getNumRows(); + Partition *first = nullptr; + if (!_currentGeneralPartitions.empty()) { + first = _currentGeneralPartitions.front(); + _currentGeneralPartitions.pop_front(); + } else { + first = _remainingGeneralPartitions.front(); + _remainingGeneralPartitions.pop_front(); } + + auto numRows = first->getNumRows(); + _totalRowCounter += numRows; + _generalRowCounter += numRows; + + _curGeneralRowCounter = 0; + _curGeneralByteCounter = 0; + + return first; } - Partition* ResultSet::getNextPartition() { - if(_partitions.empty()) + Partition* ResultSet::getNextFallbackPartition() { + if (_currentFallbackPartitions.empty() && _remainingFallbackPartitions.empty()) return nullptr; - assert(_partitions.size() > 0); + Partition *first = nullptr; + if (!_currentFallbackPartitions.empty()) { + first = _currentFallbackPartitions.front(); + _currentFallbackPartitions.pop_front(); + } else { + first = _remainingFallbackPartitions.front(); + _remainingFallbackPartitions.pop_front(); + } + + auto numRows = first->getNumRows(); + _totalRowCounter += numRows; + _fallbackRowCounter += numRows; - Partition *first = _partitions.front(); - assert(_schema == first->schema()); + _curFallbackRowCounter = 0; + _curFallbackByteCounter = 0; + + return first; + } + + Partition* ResultSet::getNextNormalPartition() { + if (_currentNormalPartitions.empty() && _remainingNormalPartitions.empty()) + return nullptr; + + Partition *first = nullptr; + if (!_currentNormalPartitions.empty()) { + first = _currentNormalPartitions.front(); + _currentNormalPartitions.pop_front(); + } else { + first = _remainingNormalPartitions.front(); + _remainingNormalPartitions.pop_front(); + } auto numRows = first->getNumRows(); - _rowsRetrieved += numRows; + _totalRowCounter += numRows; + _normalRowCounter += numRows; - _partitions.pop_front(); - _curRowCounter = 0; - _byteCounter = 0; + _curNormalRowCounter = 0; + _curNormalByteCounter = 0; return first; } @@ -121,23 +199,25 @@ namespace tuplex { v.reserve(limit); // do a quick check whether there are ANY pyobjects, if not deserialize quickly! - if(_pyobjects.empty()) { - - if(_partitions.empty()) + if(_currentGeneralPartitions.empty() && _remainingGeneralPartitions.empty() && _currentFallbackPartitions.empty() && _remainingFallbackPartitions.empty()) { + if (_currentNormalPartitions.empty() && _remainingNormalPartitions.empty()) return vector{}; + for (const auto &p : _remainingNormalPartitions) + _currentNormalPartitions.push_back(p); + Deserializer ds(_schema); for(int i = 0; i < limit;) { // all exhausted - if(_partitions.empty()) + if(_currentNormalPartitions.empty()) break; // get number of rows in first partition - Partition *first = _partitions.front(); + Partition *first = _currentNormalPartitions.front(); auto num_rows = first->getNumRows(); // how many left to retrieve? 
- auto num_to_retrieve_from_partition = std::min(limit - i, num_rows - _curRowCounter); + auto num_to_retrieve_from_partition = std::min(limit - i, num_rows - _curNormalRowCounter); if(num_to_retrieve_from_partition <= 0) break; @@ -148,11 +228,11 @@ namespace tuplex { // get next element of partition const uint8_t* ptr = first->lock(); for(int j = 0; j < num_to_retrieve_from_partition; ++j) { - auto row = Row::fromMemory(ds, ptr + _byteCounter, first->capacity() - _byteCounter); - _byteCounter += row.serializedLength(); - _curRowCounter++; - _rowsRetrieved++; + auto row = Row::fromMemory(ds, ptr + _curNormalByteCounter, first->capacity() - _curNormalByteCounter); + _curNormalByteCounter += row.serializedLength(); + _curNormalRowCounter++; _totalRowCounter++; + _normalRowCounter++; v.push_back(row); } @@ -163,17 +243,13 @@ namespace tuplex { i += num_to_retrieve_from_partition; // get next Partition ready when current one is exhausted - if(_curRowCounter == first->getNumRows()) - removeFirstPartition(); + if(_curNormalRowCounter == first->getNumRows()) + removeFirstNormalPartition(); } v.shrink_to_fit(); return v; } else { - // fallback solution: - // @TODO: write faster version with proper merging! - - std::vector v; while (hasNextRow() && v.size() < limit) { v.push_back(getNextRow()); } @@ -182,81 +258,252 @@ namespace tuplex { } } - Row ResultSet::getNextRow() { - // merge rows from objects - if(!_pyobjects.empty()) { - auto row_number = std::get<0>(_pyobjects.front()); - auto obj = std::get<1>(_pyobjects.front()); - - // partitions empty? - // => simply return next row. no fancy merging possible - // else merge based on row number. - if(_partitions.empty() || row_number <= _totalRowCounter) { - // merge - python::lockGIL(); - auto row = python::pythonToRow(obj); - python::unlockGIL(); - _pyobjects.pop_front(); - _rowsRetrieved++; - - // update row counter (not for double indices which could occur from flatMap!) - if(_pyobjects.empty()) - _totalRowCounter++; - else { - auto next_row_number = std::get<0>(_pyobjects.front()); - if(next_row_number != row_number) - _totalRowCounter++; - } + bool ResultSet::hasNextNormalRow() { + if (!_currentNormalPartitions.empty() && _curNormalRowCounter < _currentNormalPartitions.front()->getNumRows()) + return true; + for (const auto &p : _remainingNormalPartitions) + if (p->getNumRows() > 0) + return true; + return false; + } + + bool ResultSet::hasNextGeneralRow() { + if (!_currentGeneralPartitions.empty() && _curGeneralRowCounter < _currentGeneralPartitions.front()->getNumRows()) + return true; + for (const auto &p : _remainingGeneralPartitions) + if (p->getNumRows() > 0) + return true; + return false; + } + + bool ResultSet::hasNextFallbackRow() { + if (!_currentFallbackPartitions.empty() && _curFallbackRowCounter < _currentFallbackPartitions.front()->getNumRows()) + return true; + for (const auto &p : _remainingFallbackPartitions) + if (p->getNumRows() > 0) + return true; + return false; + } + + bool ResultSet::hasNextRow() { + // all rows already retrieved? 
+ if(_totalRowCounter >= _maxRows) + return false; + + return hasNextNormalRow() || hasNextGeneralRow() || hasNextFallbackRow(); + } - return row; + Row ResultSet::getNextRow() { + if (_currentNormalPartitions.empty() && _currentFallbackPartitions.empty() && _currentGeneralPartitions.empty()) { + // all partitions are exhausted, return an empty row as the default value + if (_partitionGroups.empty()) + return Row(); + _normalRowCounter = 0; + _generalRowCounter = 0; + _fallbackRowCounter = 0; + auto group = _partitionGroups.front(); + _partitionGroups.pop_front(); + for (int i = group.normalPartitionStartIndex; i < group.normalPartitionStartIndex + group.numNormalPartitions; ++i) { + _currentNormalPartitions.push_back(_remainingNormalPartitions.front()); + _remainingNormalPartitions.pop_front(); + } + for (int i = group.generalPartitionStartIndex; i < group.generalPartitionStartIndex + group.numGeneralPartitions; ++i) { + _currentGeneralPartitions.push_back(_remainingGeneralPartitions.front()); + _remainingGeneralPartitions.pop_front(); + } + for (int i = group.fallbackPartitionStartIndex; i < group.fallbackPartitionStartIndex + group.numFallbackPartitions; ++i) { + _currentFallbackPartitions.push_back(_remainingFallbackPartitions.front()); + _remainingFallbackPartitions.pop_front(); + } + return getNextRow(); + } else if (_currentNormalPartitions.empty() && _currentFallbackPartitions.empty()) { + // only general rows remain, return the next general row + return getNextGeneralRow(); + } else if (_currentNormalPartitions.empty() && _currentGeneralPartitions.empty()) { + // only fallback rows remain, return the next fallback row + return getNextFallbackRow(); + } else if (_currentFallbackPartitions.empty() && _currentGeneralPartitions.empty()) { + // only normal rows remain, return the next normal row + return getNextNormalRow(); + } else if (_currentFallbackPartitions.empty()) { + // only normal and general rows remain, compare row indices: + // emit normal rows until the current general row index is reached + if (_normalRowCounter + _generalRowCounter < currentGeneralRowInd()) { + return getNextNormalRow(); + } else { + return getNextGeneralRow(); + } + } else if (_currentGeneralPartitions.empty()) { + // only normal and fallback rows remain, compare row indices: + // emit normal rows until the current fallback row index is reached + if (_normalRowCounter + _generalRowCounter + _fallbackRowCounter < currentFallbackRowInd()) { + return getNextNormalRow(); + } else { + return getNextFallbackRow(); + } + } else { + // all three cases remain, do a three-way row comparison + auto generalRowInd = currentGeneralRowInd(); + auto fallbackRowInd = currentFallbackRowInd(); + if (_normalRowCounter + _generalRowCounter < generalRowInd && _normalRowCounter + _generalRowCounter + _fallbackRowCounter < fallbackRowInd) { + return getNextNormalRow(); + } else if (generalRowInd <= fallbackRowInd) { + return getNextGeneralRow(); + } else { + return getNextFallbackRow(); } } + } - // check whether entry is available, else return empty row - if(_partitions.empty()) - return Row(); + int64_t ResultSet::currentFallbackRowInd() { + assert(!_currentFallbackPartitions.empty()); + auto p = _currentFallbackPartitions.front(); + auto ptr = p->lock() + _curFallbackByteCounter; + auto rowInd = *((int64_t*) ptr); + p->unlock(); + return rowInd; + } - assert(_partitions.size() > 0); - Partition *first = _partitions.front(); + int64_t ResultSet::currentGeneralRowInd() { + assert(!_currentGeneralPartitions.empty()); + auto p = _currentGeneralPartitions.front(); + auto ptr =
p->lock() + _curGeneralByteCounter; + auto rowInd = *((int64_t*) ptr); + p->unlock(); + return rowInd; + } - // make sure partition schema matches stored schema - assert(_schema == first->schema()); + Row ResultSet::getNextNormalRow() { + assert (!_currentNormalPartitions.empty()); + auto p = _currentNormalPartitions.front(); + assert(_schema == p->schema()); - Row row; + auto ptr = p->lock() + _curNormalByteCounter; + auto capacity = p->capacity() - _curNormalByteCounter; + auto row = Row::fromMemory(_schema, ptr, capacity); + p->unlock(); - // thread safe version (slow) - // get next element of partition - const uint8_t* ptr = first->lock(); + _curNormalByteCounter += row.serializedLength(); + _curNormalRowCounter++; + _totalRowCounter++; + _normalRowCounter++; - row = Row::fromMemory(_schema, ptr + _byteCounter, first->capacity() - _byteCounter); + if (_curNormalRowCounter == p->getNumRows()) { + removeFirstNormalPartition(); + } - // thread safe version (slow) - // deserialize - first->unlock(); + return row; + } + + Row ResultSet::getNextGeneralRow() { + assert (!_currentGeneralPartitions.empty()); + auto p = _currentGeneralPartitions.front(); + assert(_schema == p->schema()); + + auto prevRowInd = currentGeneralRowInd(); + _curGeneralByteCounter += 4 * sizeof(int64_t); + auto ptr = p->lock() + _curGeneralByteCounter; + auto capacity = p->capacity() - _curGeneralByteCounter; + auto row = Row::fromMemory(_schema, ptr, capacity); + p->unlock(); + + _curGeneralByteCounter += row.serializedLength(); + _curGeneralRowCounter++; + + if (_curGeneralRowCounter == p->getNumRows()) { + removeFirstGeneralPartition(); + } - _byteCounter += row.serializedLength(); - _curRowCounter++; - _rowsRetrieved++; _totalRowCounter++; + if (_currentGeneralPartitions.empty() || currentGeneralRowInd() > prevRowInd) { + _generalRowCounter++; + } + + return row; + } + + Row ResultSet::getNextFallbackRow() { + assert (!_currentFallbackPartitions.empty()); + + auto prevRowInd = currentFallbackRowInd(); + auto p = _currentFallbackPartitions.front(); + auto ptr = p->lock() + _curFallbackByteCounter; + auto pyObjectSize = ((int64_t *) ptr)[3]; ptr += 4 * sizeof(int64_t); + + python::lockGIL(); + auto row = python::pythonToRow(python::deserializePickledObject(python::getMainModule(), (char *) ptr, pyObjectSize)); + python::unlockGIL(); + + p->unlock(); + + _curFallbackByteCounter += pyObjectSize + 4*sizeof(int64_t); + _curFallbackRowCounter++; + + if (_curFallbackRowCounter == p->getNumRows()) { + removeFirstFallbackPartition(); + } - // get next Partition ready when current one is exhausted - if(_curRowCounter == first->getNumRows()) - removeFirstPartition(); + _totalRowCounter++; + if (_currentFallbackPartitions.empty() || currentFallbackRowInd() > prevRowInd) { + _fallbackRowCounter++; + } return row; } size_t ResultSet::rowCount() const { size_t count = 0; - for(const auto& partition : _partitions) { + for (const auto& partition : _currentNormalPartitions) count += partition->getNumRows(); - } - return count + _pyobjects.size(); + for (const auto& partition : _remainingNormalPartitions) + count += partition->getNumRows(); + for (const auto& partition : _currentGeneralPartitions) + count += partition->getNumRows(); + for (const auto& partition : _remainingGeneralPartitions) + count += partition->getNumRows(); + for (const auto& partition : _currentFallbackPartitions) + count += partition->getNumRows(); + for (const auto& partition : _remainingFallbackPartitions) + count += partition->getNumRows(); + return count; } - 
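The merge in getNextRow() above restores the original row order across the three row classes: normal rows carry no stored index, so the counters of rows emitted so far are compared against the row index sitting at the head of the current general and fallback partitions, with ties going to the general-case row. A minimal standalone sketch of this ordering rule, under simplified assumptions (plain queues instead of Tuplex partitions, and one shared emitted counter instead of the per-class counters; mergeRows and IndexedRow are hypothetical names, not part of this patch):

    #include <cstdint>
    #include <deque>
    #include <iostream>
    #include <limits>
    #include <string>

    // Hypothetical stand-in for a general/fallback row that carries its
    // original row index, as stored in the first header field of a record.
    struct IndexedRow { int64_t index; std::string payload; };

    // Emit rows in ascending original row order. Normal rows have no stored
    // index; we count emitted rows and prefer normal rows until the next
    // general/fallback index is reached, mirroring getNextRow() above.
    void mergeRows(std::deque<std::string> normal,
                   std::deque<IndexedRow> general,
                   std::deque<IndexedRow> fallback) {
        const int64_t kNone = std::numeric_limits<int64_t>::max();
        int64_t emitted = 0; // simplified stand-in for the _*RowCounter sums
        while (!normal.empty() || !general.empty() || !fallback.empty()) {
            int64_t gInd = general.empty() ? kNone : general.front().index;
            int64_t fInd = fallback.empty() ? kNone : fallback.front().index;
            if (!normal.empty() && emitted < gInd && emitted < fInd) {
                std::cout << normal.front() << "\n"; normal.pop_front();
            } else if (gInd <= fInd) { // ties prefer the general-case row
                std::cout << general.front().payload << "\n"; general.pop_front();
            } else {
                std::cout << fallback.front().payload << "\n"; fallback.pop_front();
            }
            ++emitted;
        }
    }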
void ResultSet::removeFirstPartition() { - assert(_partitions.size() > 0); - Partition *first = _partitions.front(); + void ResultSet::removeFirstGeneralPartition() { + assert(!_currentGeneralPartitions.empty()); + Partition *first = _currentGeneralPartitions.front(); + assert(first); + + // invalidate partition +#ifndef NDEBUG + Logger::instance().defaultLogger().info("ResultSet invalidates partition " + hexAddr(first) + " uuid " + uuidToString(first->uuid())); +#endif + first->invalidate(); + + _currentGeneralPartitions.pop_front(); + _curGeneralRowCounter = 0; + _curGeneralByteCounter = 0; + } + + void ResultSet::removeFirstFallbackPartition() { + assert(!_currentFallbackPartitions.empty()); + Partition *first = _currentFallbackPartitions.front(); + assert(first); + + // invalidate partition +#ifndef NDEBUG + Logger::instance().defaultLogger().info("ResultSet invalidates partition " + hexAddr(first) + " uuid " + uuidToString(first->uuid())); +#endif + first->invalidate(); + + // remove partition (is now processed) + _currentFallbackPartitions.pop_front(); + _curFallbackRowCounter = 0; + _curFallbackByteCounter = 0; + } + + void ResultSet::removeFirstNormalPartition() { + assert(!_currentNormalPartitions.empty()); + Partition *first = _currentNormalPartitions.front(); assert(first); // invalidate partition @@ -266,8 +513,18 @@ namespace tuplex { first->invalidate(); // remove partition (is now processed) - _partitions.pop_front(); - _curRowCounter = 0; - _byteCounter = 0; + + _currentNormalPartitions.pop_front(); + _curNormalRowCounter = 0; + _curNormalByteCounter = 0; + } + + size_t ResultSet::fallbackRowCount() const { + size_t count = 0; + for (const auto &p : _currentFallbackPartitions) + count += p->getNumRows(); + for (const auto&p : _remainingFallbackPartitions) + count += p->getNumRows(); + return count; } } \ No newline at end of file diff --git a/tuplex/core/src/physical/StageBuilder.cc b/tuplex/core/src/physical/StageBuilder.cc index 72f01e2b8..8844e27b0 100644 --- a/tuplex/core/src/physical/StageBuilder.cc +++ b/tuplex/core/src/physical/StageBuilder.cc @@ -1051,7 +1051,7 @@ namespace tuplex { bool requireSlowPath = _nullValueOptimization; // per default, slow path is always required when null-value opt is enabled. // special case: input source is cached and no exceptions happened => no resolve path necessary if there are no resolvers! 
- if(_inputNode->type() == LogicalOperatorType::CACHE && dynamic_cast(_inputNode)->cachedExceptions().empty()) + if(_inputNode->type() == LogicalOperatorType::CACHE && dynamic_cast(_inputNode)->cachedGeneralPartitions().empty() && dynamic_cast(_inputNode)->cachedFallbackPartitions().empty()) requireSlowPath = false; if (numResolveOperators > 0 || requireSlowPath) { diff --git a/tuplex/core/src/physical/TransformStage.cc b/tuplex/core/src/physical/TransformStage.cc index b61f9cbe2..f753e6e41 100644 --- a/tuplex/core/src/physical/TransformStage.cc +++ b/tuplex/core/src/physical/TransformStage.cc @@ -118,29 +118,28 @@ namespace tuplex { _rs = emptyResultSet(); } - void TransformStage::setMemoryResult(const std::vector &partitions, - const std::vector& generalCase, - const std::unordered_map& partitionToExceptionsMap, - const std::vector>& interpreterRows, - const std::vector& remainingExceptions, - const std::unordered_map, size_t> &ecounts) { - setExceptionCounts(ecounts); - - if (partitions.empty() && interpreterRows.empty() && generalCase.empty()) + void TransformStage::setMemoryResult(const std::vector& normalPartitions, + const std::vector& generalPartitions, + const std::vector& fallbackPartitions, + const std::vector& partitionGroups, + const std::unordered_map, size_t>& exceptionCounts) { + setExceptionCounts(exceptionCounts); + + if (normalPartitions.empty() && generalPartitions.empty() && fallbackPartitions.empty()) _rs = emptyResultSet(); else { std::vector limitedPartitions; auto schema = Schema::UNKNOWN; - if(!partitions.empty()) { - schema = partitions.front()->schema(); - for (auto partition : partitions) { + if(!normalPartitions.empty()) { + schema = normalPartitions.front()->schema(); + for (auto partition : normalPartitions) { assert(schema == partition->schema()); } // check output limit, adjust partitions if necessary size_t numOutputRows = 0; - for (auto partition : partitions) { + for (auto partition : normalPartitions) { numOutputRows += partition->getNumRows(); if (numOutputRows >= outputLimit()) { // clip last partition & leave loop @@ -157,10 +156,7 @@ namespace tuplex { } } - // put ALL partitions to result set - _rs = std::make_shared(schema, limitedPartitions, - generalCase, partitionToExceptionsMap, interpreterRows, - outputLimit()); + _rs = std::make_shared(schema, limitedPartitions, generalPartitions, fallbackPartitions, partitionGroups, outputLimit()); } } @@ -654,7 +650,7 @@ namespace tuplex { } case EndPointMode::MEMORY: case EndPointMode::FILE: { - auto p = stage->resultSet()->partitions(); + auto p = stage->resultSet()->normalPartitions(); std::copy(std::begin(p), std::end(p), std::back_inserter(partitions)); break; } diff --git a/tuplex/core/src/physical/TransformTask.cc b/tuplex/core/src/physical/TransformTask.cc index c560c4af4..ffcb6022a 100644 --- a/tuplex/core/src/physical/TransformTask.cc +++ b/tuplex/core/src/physical/TransformTask.cc @@ -543,35 +543,29 @@ namespace tuplex { auto functor = reinterpret_cast(_functor); - auto numInputExceptions = _inputExceptionInfo.numExceptions; - auto inputExceptionIndex = _inputExceptionInfo.exceptionIndex; - auto inputExceptionRowOffset = _inputExceptionInfo.exceptionRowOffset; - auto inputExceptionByteOffset = _inputExceptionInfo.exceptionByteOffset; - - // First, prepare the input exception partitions to pass into the code-gen - // This is done to simplify the LLVM code. 
We will end up passing it an - // array of expPtrs which point to the first exception in their partition - // and expPtrSizes which tell how many exceptions are in that partition. - auto arrSize = _inputExceptions.size() - inputExceptionIndex; - auto expPtrs = new uint8_t*[arrSize]; - auto expPtrSizes = new int64_t[arrSize]; - int expInd = 0; - // Iterate through all exception partitions beginning at the one specified by the starting index - for (int i = inputExceptionIndex; i < _inputExceptions.size(); ++i) { - auto numRows = _inputExceptions[i]->getNumRows(); - auto ptr = _inputExceptions[i]->lock(); - // If its the first partition, we need to account for the offset - if (i == inputExceptionIndex) { - numRows -= inputExceptionRowOffset; - ptr += inputExceptionByteOffset; - } - expPtrSizes[expInd] = numRows; - expPtrs[expInd] = (uint8_t *) ptr; - expInd++; - } + int64_t totalNormalRowCounter = 0; + int64_t totalGeneralRowCounter = 0; + int64_t totalFallbackRowCounter = 0; + int64_t totalFilterCounter = 0; + + std::vector generalPartitions(_generalPartitions.size(), nullptr); + for (int i = 0; i < _generalPartitions.size(); ++i) + generalPartitions[i] = _generalPartitions[i]->lockWriteRaw(); + int64_t numGeneralPartitions = _generalPartitions.size(); + int64_t generalIndexOffset = 0; + int64_t generalRowOffset = 0; + int64_t generalByteOffset = 0; + + std::vector fallbackPartitions(_fallbackPartitions.size(), nullptr); + for (int i = 0; i < _fallbackPartitions.size(); ++i) + fallbackPartitions[i] = _fallbackPartitions[i]->lockWriteRaw(); + int64_t numFallbackPartitions = _fallbackPartitions.size(); + int64_t fallbackIndexOffset = 0; + int64_t fallbackRowOffset = 0; + int64_t fallbackByteOffset = 0; // go over all input partitions. - for(auto inputPartition : _inputPartitions) { + for(auto &inputPartition : _inputPartitions) { // lock ptr, extract number of rows ==> store them // lock raw & call functor! 
int64_t inSize = inputPartition->size(); @@ -579,7 +573,10 @@ namespace tuplex { _numInputRowsRead += static_cast(*((int64_t*)inPtr)); // call functor - auto bytesParsed = functor(this, inPtr, inSize, expPtrs, expPtrSizes, numInputExceptions, &num_normal_rows, &num_bad_rows, false); + auto bytesParsed = functor(this, inPtr, inSize, &num_normal_rows, &num_bad_rows, false, + &totalFilterCounter, &totalNormalRowCounter, &totalGeneralRowCounter, &totalFallbackRowCounter, + &generalPartitions[0], numGeneralPartitions, &generalIndexOffset, &generalRowOffset, &generalByteOffset, + &fallbackPartitions[0], numFallbackPartitions, &fallbackIndexOffset, &fallbackRowOffset, &fallbackByteOffset); // save number of normal rows to output rows written if not writeTofile if(hasMemorySink()) @@ -595,13 +592,52 @@ namespace tuplex { inputPartition->invalidate(); } - delete[] expPtrs; - delete[] expPtrSizes; + if (generalIndexOffset < numGeneralPartitions) { + auto curGeneralPtr = generalPartitions[generalIndexOffset]; + auto numRowsInPartition = *((int64_t*)curGeneralPtr); + curGeneralPtr += sizeof(int64_t) + generalByteOffset; + while (generalRowOffset < numRowsInPartition) { + *((int64_t*)curGeneralPtr) -= totalFilterCounter; + curGeneralPtr += 4 * sizeof(int64_t) + ((int64_t*)curGeneralPtr)[3]; + generalRowOffset += 1; + + if (generalRowOffset == numRowsInPartition && generalIndexOffset < numGeneralPartitions - 1) { + generalIndexOffset += 1; + curGeneralPtr = generalPartitions[generalIndexOffset]; + numRowsInPartition = *((int64_t*)curGeneralPtr); + curGeneralPtr += sizeof(int64_t); + generalByteOffset = 0; + generalRowOffset = 0; + } + } + } - for (int i = inputExceptionIndex; i < _inputExceptions.size(); ++i) { - _inputExceptions[i]->unlock(); + if (fallbackIndexOffset < numFallbackPartitions) { + auto curFallbackPtr = fallbackPartitions[fallbackIndexOffset]; + auto numRowsInPartition = *((int64_t*)curFallbackPtr); + curFallbackPtr += sizeof(int64_t) + fallbackByteOffset; + while (fallbackRowOffset < numRowsInPartition) { + *((int64_t*)curFallbackPtr) -= totalFilterCounter; + curFallbackPtr += 4 * sizeof(int64_t) + ((int64_t*)curFallbackPtr)[3]; + fallbackRowOffset += 1; + + if (fallbackRowOffset == numRowsInPartition && fallbackIndexOffset < numFallbackPartitions - 1) { + fallbackIndexOffset += 1; + curFallbackPtr = fallbackPartitions[fallbackIndexOffset]; + numRowsInPartition = *((int64_t*)curFallbackPtr); + curFallbackPtr += sizeof(int64_t); + fallbackByteOffset = 0; + fallbackRowOffset = 0; + } + } } + for (auto &p : _generalPartitions) + p->unlockWrite(); + + for (auto &p : _fallbackPartitions) + p->unlockWrite(); + #ifndef NDEBUG owner()->info("Trafo task memory source exhausted (" + pluralize(_inputPartitions.size(), "partition") + ", " + pluralize(num_normal_rows, "normal row") + ", " + pluralize(num_bad_rows, "exceptional row") + ")"); @@ -963,4 +999,4 @@ namespace tuplex { _htableFormat = fmt; _outputDataSetID = outputDataSetID; } -} \ No newline at end of file +} diff --git a/tuplex/python/include/PythonContext.h b/tuplex/python/include/PythonContext.h index ef4b1d9c2..de2cf5bc5 100644 --- a/tuplex/python/include/PythonContext.h +++ b/tuplex/python/include/PythonContext.h @@ -117,6 +117,15 @@ namespace tuplex { pds.wrap(&_context->makeError(message)); return pds; } + + /*! 
+ * Serialize a vector of PyObjects into Tuplex-format fallback rows, each laid out as: + * row index, exception code, operator ID, pickled object size, pickled object payload + * @param fallbackRows tuples of (row index, fallback row) + * @param executor the executor on which the fallback rows originated + * @return vector of partitions holding the pickled fallback rows + */ + std::vector serializeFallbackRows(const std::vector>& fallbackRows, Executor* executor); public: /*! @@ -225,4 +234,4 @@ } -#endif //TUPLEX_PYTHONCONTEXT_H \ No newline at end of file +#endif //TUPLEX_PYTHONCONTEXT_H diff --git a/tuplex/python/src/PythonContext.cc b/tuplex/python/src/PythonContext.cc index 0e81b667e..35e387e11 100644 --- a/tuplex/python/src/PythonContext.cc +++ b/tuplex/python/src/PythonContext.cc @@ -39,35 +39,38 @@ namespace tuplex { Schema schema(Schema::MemoryLayout::ROW, python::Type::makeTupleType({python::Type::F64})); - std::vector> badParallelizeObjects; - std::vector numExceptionsInPartition; - // check if empty? if(0 == numElements) - return _context->fromPartitions(schema, std::vector(), columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, std::vector(), std::vector(), std::vector(), columns); // create new partition on driver auto driver = _context->getDriver(); + // tuples of (row index, python object) for rows that do not conform to the schema + std::vector> fallbackRows; + // fallback rows after they have been serialized to the Tuplex format + std::vector fallbackPartitions; + // maps fallback partitions to the normal partitions they originated from + std::vector partitionMergeInfo; + std::vector partitions; Partition* partition = driver->allocWritablePartition(allocMinSize, schema, -1, _context->id()); int64_t* rawPtr = (int64_t*)partition->lockWriteRaw(); *rawPtr = 0; double* ptr = (double*)(rawPtr + 1); size_t numBytesSerialized = 0; - size_t prevNumExceptions = 0; - size_t prevNumRows = 0; + auto rowDelta = 0; for(unsigned i = 0; i < numElements; ++i) { auto obj = PyList_GET_ITEM(listObj, i); Py_XINCREF(obj); // check capacity and realloc if necessary get a new partition if(partition->capacity() < numBytesSerialized + sizeof(double)) { - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); - prevNumExceptions = badParallelizeObjects.size(); - prevNumRows += numNewExceptions + *rawPtr; + rowDelta += *rawPtr + fallbackRows.size(); + auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver()); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); @@ -89,15 +92,15 @@ namespace tuplex { val = (double)PyLong_AsLongLong(obj); if(PyErr_Occurred()) { // too large integer?
PyErr_Clear(); - assert(i >= prevNumRows); - badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); + assert(i >= rowDelta); + fallbackRows.emplace_back(std::make_tuple(i - rowDelta, obj)); continue; } } } else { - assert(i >= prevNumRows); - badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); + assert(i >= rowDelta); + fallbackRows.emplace_back(std::make_tuple(i - rowDelta, obj)); continue; } } @@ -108,15 +111,16 @@ namespace tuplex { numBytesSerialized += sizeof(double); } - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); + auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver()); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); // create dataset from partitions. - return _context->fromPartitions(schema, partitions, columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, partitions, fallbackPartitions, partitionMergeInfo, columns); } DataSet& PythonContext::fastI64Parallelize(PyObject* listObj, const std::vector& columns, bool upcast) { @@ -127,16 +131,17 @@ namespace tuplex { Schema schema(Schema::MemoryLayout::ROW, python::Type::makeTupleType({python::Type::I64})); - std::vector> badParallelizeObjects; - std::vector numExceptionsInPartition; - // check if empty? if(0 == numElements) - return _context->fromPartitions(schema, std::vector(), columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, std::vector(), std::vector(), std::vector(), columns); // create new partition on driver auto driver = _context->getDriver(); + std::vector> fallbackRows; + std::vector fallbackPartitions; + std::vector partitionMergeInfo; + std::vector partitions; Partition* partition = driver->allocWritablePartition(std::max(sizeof(int64_t), allocMinSize), schema, -1, _context->id()); int64_t* rawPtr = (int64_t*)partition->lockWriteRaw(); @@ -144,18 +149,18 @@ namespace tuplex { int64_t* ptr = rawPtr + 1; size_t numBytesSerialized = 0; size_t prevNumExceptions = 0; - size_t prevNumRows = 0; + auto rowDelta = 0; for(unsigned i = 0; i < numElements; ++i) { auto obj = PyList_GET_ITEM(listObj, i); Py_XINCREF(obj); // check capacity and realloc if necessary get a new partition if(partition->capacity() < numBytesSerialized + sizeof(int64_t)) { - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); - prevNumExceptions = badParallelizeObjects.size(); - prevNumRows += numNewExceptions + *rawPtr; + rowDelta += *rawPtr + fallbackRows.size(); + auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver()); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); @@ -171,8 +176,8 @@ namespace tuplex { val = PyLong_AsLongLong(obj); if(PyErr_Occurred()) { // too large integer? 
PyErr_Clear(); - assert(i >= prevNumRows); - badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); + assert(i >= rowDelta); + fallbackRows.emplace_back(std::make_tuple(i - rowDelta, obj)); continue; } } else { @@ -180,8 +185,8 @@ namespace tuplex { if(upcast && (obj == Py_True || obj == Py_False)) val = obj == Py_True; else { - assert(i >= prevNumRows); - badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); + assert(i >= rowDelta); + fallbackRows.emplace_back(std::make_tuple(i - rowDelta, obj)); continue; } } @@ -191,15 +196,16 @@ namespace tuplex { *rawPtr = *rawPtr + 1; numBytesSerialized += sizeof(int64_t); } - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); + auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver()); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); // create dataset from partitions. - return _context->fromPartitions(schema, partitions, columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, partitions, fallbackPartitions, partitionMergeInfo, columns); } DataSet& PythonContext::fastMixedSimpleTypeTupleTransfer(PyObject *listObj, const python::Type &majType, @@ -215,12 +221,9 @@ namespace tuplex { // now create partitions super fast Schema schema(Schema::MemoryLayout::ROW, majType); - std::vector> badParallelizeObjects; - std::vector numExceptionsInPartition; - // check if empty? 
if(0 == numElements) - return _context->fromPartitions(schema, std::vector(), columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, std::vector(), std::vector(), std::vector(), columns); // encode type of tuple quickly into string @@ -232,6 +235,10 @@ namespace tuplex { // create new partition on driver auto driver = _context->getDriver(); + std::vector> fallbackRows; + std::vector fallbackPartitions; + std::vector partitionMergeInfo; + std::vector partitions; Partition* partition = driver->allocWritablePartition(allocMinSize, schema, -1, _context->id()); int64_t* rawPtr = (int64_t*)partition->lockWriteRaw(); @@ -239,7 +246,7 @@ namespace tuplex { uint8_t* ptr = (uint8_t*)(rawPtr + 1); size_t numBytesSerialized = 0; size_t prevNumExceptions = 0; - size_t prevNumRows = 0; + auto rowDelta = 0; for(unsigned i = 0; i < numElements; ++i) { auto obj = PyList_GET_ITEM(listObj, i); Py_XINCREF(obj); @@ -266,19 +273,19 @@ namespace tuplex { } } if (nonConforming) { - assert(i >= prevNumRows); - badParallelizeObjects.emplace_back(i - prevNumRows, obj); + assert(i >= rowDelta); + fallbackRows.emplace_back(i - rowDelta, obj); continue; } } // get new partition if capacity exhausted if(partition->capacity() < numBytesSerialized + requiredBytes) { - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); - prevNumExceptions = badParallelizeObjects.size(); - prevNumRows += numNewExceptions + *rawPtr; + rowDelta += *rawPtr + fallbackRows.size(); + auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver()); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); @@ -358,11 +365,11 @@ namespace tuplex { // special part when bad row encountered bad_element: ptr = rowStartPtr; - assert(i >= prevNumRows); - badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); + assert(i >= rowDelta); + fallbackRows.emplace_back(i - rowDelta, obj); } else { - assert(i >= prevNumRows); - badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); + assert(i >= rowDelta); + fallbackRows.emplace_back(i - rowDelta, obj); } // serialization code here is a little bit more complicated @@ -371,9 +378,10 @@ namespace tuplex { // (2) is the field containing total varlength // (3) is the actual string content (incl. '\0' delimiter) } - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); + auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver()); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); @@ -381,7 +389,7 @@ namespace tuplex { delete [] typeStr; // create dataset from partitions. 
- return _context->fromPartitions(schema, partitions, columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, partitions, fallbackPartitions, partitionMergeInfo, columns); } DataSet& PythonContext::fastBoolParallelize(PyObject *listObj, const std::vector& columns) { @@ -392,17 +400,18 @@ namespace tuplex { Schema schema(Schema::MemoryLayout::ROW, python::Type::makeTupleType({python::Type::BOOLEAN})); - std::vector> badParallelizeObjects; - std::vector numExceptionsInPartition; - // check if empty? if(0 == numElements) - return _context->fromPartitions(schema, std::vector(), columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, std::vector(), std::vector(), std::vector(), columns); // create new partition on driver auto driver = _context->getDriver(); + std::vector> fallbackRows; + std::vector fallbackPartitions; + std::vector partitionMergeInfo; + std::vector partitions; Partition* partition = driver->allocWritablePartition(std::max(sizeof(int64_t), allocMinSize), schema, -1, _context->id()); int64_t* rawPtr = (int64_t*)partition->lockWriteRaw(); @@ -410,18 +419,18 @@ namespace tuplex { int64_t* ptr = rawPtr + 1; size_t numBytesSerialized = 0; size_t prevNumExceptions = 0; - size_t prevNumRows = 0; + auto rowDelta = 0; for(unsigned i = 0; i < numElements; ++i) { auto obj = PyList_GET_ITEM(listObj, i); Py_XINCREF(obj); // check capacity and realloc if necessary get a new partition if(partition->capacity() < numBytesSerialized + sizeof(int64_t)) { - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); - prevNumExceptions = badParallelizeObjects.size(); - prevNumRows += numNewExceptions + *rawPtr; + rowDelta += *rawPtr + fallbackRows.size(); + auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver()); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); @@ -438,20 +447,20 @@ namespace tuplex { *rawPtr = *rawPtr + 1; numBytesSerialized += sizeof(int64_t); } else { - assert(i >= prevNumRows); - badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); + assert(i >= rowDelta); + fallbackRows.emplace_back(i - rowDelta, obj); } } - - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); + auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver()); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); // create dataset from partitions. 
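// Note on the bookkeeping above, which repeats across all of these
// parallelize fast paths: rowDelta counts the rows (normal and fallback
// alike) already flushed to earlier partitions, so a non-conforming object
// at list position i is recorded as (i - rowDelta, obj), i.e. with a row
// index relative to the partition group currently being filled. Judging
// from ResultSet::getNextRow(), these relative indices appear to be what
// the merge compares against when interleaving rows within a group.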
- return _context->fromPartitions(schema, partitions, columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, partitions, fallbackPartitions, partitionMergeInfo, columns); } DataSet& PythonContext::fastStrParallelize(PyObject* listObj, const std::vector& columns) { @@ -462,17 +471,18 @@ namespace tuplex { Schema schema(Schema::MemoryLayout::ROW, python::Type::makeTupleType({python::Type::STRING})); - std::vector> badParallelizeObjects; - std::vector numExceptionsInPartition; - // check if empty? if(0 == numElements) - return _context->fromPartitions(schema, std::vector(), columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, std::vector(), std::vector(), std::vector(), columns); // create new partition on driver auto driver = _context->getDriver(); + std::vector> fallbackRows; + std::vector fallbackPartitions; + std::vector partitionMergeInfo; + std::vector partitions; Partition* partition = driver->allocWritablePartition(allocMinSize, schema, -1, _context->id()); int64_t* rawPtr = (int64_t*)partition->lockWriteRaw(); @@ -480,7 +490,7 @@ namespace tuplex { uint8_t* ptr = (uint8_t*)(rawPtr + 1); size_t numBytesSerialized = 0; size_t prevNumExceptions = 0; - size_t prevNumRows = 0; + auto rowDelta = 0; for(unsigned i = 0; i < numElements; ++i) { auto obj = PyList_GET_ITEM(listObj, i); Py_XINCREF(obj); @@ -500,11 +510,11 @@ namespace tuplex { // check capacity and realloc if necessary get a new partition if(partition->capacity() < numBytesSerialized + requiredBytes) { - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); - prevNumExceptions = badParallelizeObjects.size(); - prevNumRows += numNewExceptions + *rawPtr; + rowDelta += *rawPtr + fallbackRows.size(); + auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver()); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); @@ -530,19 +540,20 @@ namespace tuplex { *rawPtr = *rawPtr + 1; numBytesSerialized += requiredBytes; } else { - assert(i >= prevNumRows); - badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); + assert(i >= rowDelta); + fallbackRows.emplace_back(i - rowDelta, obj); } } - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); + auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver()); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); // create dataset from partitions. 
- return _context->fromPartitions(schema, partitions, columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, partitions, fallbackPartitions, partitionMergeInfo, columns); } // Returns true if t1 can be considered a subtype of t2, specifically in the context of Option types @@ -579,12 +590,9 @@ namespace tuplex { auto numElements = PyList_Size(listObj); logger.debug("transferring " + std::to_string(numElements) + " elements. "); - std::vector> badParallelizeObjects; - std::vector numExceptionsInPartition; - // check if empty? if(0 == numElements) - return _context->fromPartitions(schema, std::vector(), columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, std::vector(), std::vector(), std::vector(), columns); auto firstRow = PyList_GET_ITEM(listObj, 0); Py_XINCREF(firstRow); @@ -593,6 +601,10 @@ namespace tuplex { // create new partition on driver auto driver = _context->getDriver(); + std::vector> fallbackRows; + std::vector fallbackPartitions; + std::vector partitionMergeInfo; + std::vector partitions; Partition* partition = driver->allocWritablePartition(allocMinSize, schema, -1, _context->id()); int64_t* rawPtr = (int64_t*)partition->lockWriteRaw(); @@ -600,7 +612,7 @@ namespace tuplex { uint8_t* ptr = (uint8_t*)(rawPtr + 1); size_t numBytesSerialized = 0; size_t prevNumExceptions = 0; - size_t prevNumRows = 0; + auto rowDelta = 0; for (unsigned i = 0; i < numElements; ++i) { // because this a slow transfer loop, check explicitly for signals and free anything if there's something... @@ -612,10 +624,10 @@ namespace tuplex { logger.warn("slow transfer to backend interrupted."); // free items (decref) - for(auto t : badParallelizeObjects) { + for(auto t : fallbackRows) { Py_XDECREF(std::get<1>(t)); } - badParallelizeObjects.clear(); + fallbackRows.clear(); return _context->makeError("interrupted transfer"); } @@ -633,11 +645,11 @@ namespace tuplex { auto requiredBytes = row.serializedLength(); if(partition->capacity() < numBytesSerialized + requiredBytes) { - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); - prevNumExceptions = badParallelizeObjects.size(); - prevNumRows += numNewExceptions + *rawPtr; + rowDelta += *rawPtr + fallbackRows.size(); + auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver()); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); @@ -654,17 +666,18 @@ namespace tuplex { *rawPtr = *rawPtr + 1; numBytesSerialized += requiredBytes; } else - badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, item)); + fallbackRows.emplace_back(std::make_tuple(i - rowDelta, item)); } - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); + auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver()); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), 
std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); // serialize in main memory - return _context->fromPartitions(schema, partitions, columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, partitions, fallbackPartitions, partitionMergeInfo, columns); } DataSet& PythonContext::strDictParallelize(PyObject *listObj, const python::Type &rowType, @@ -680,16 +693,17 @@ namespace tuplex { assert(rowType.parameters().size() == columns.size()); // also very important!!! Schema schema(Schema::MemoryLayout::ROW, rowType); - std::vector> badParallelizeObjects; - std::vector numExceptionsInPartition; - // check if empty? if(0 == numElements) - return _context->fromPartitions(schema, std::vector(), columns, badParallelizeObjects, numExceptionsInPartition); + return _context->fromPartitions(schema, std::vector(), std::vector(), std::vector(), columns); // create new partition on driver auto driver = _context->getDriver(); + std::vector> fallbackRows; + std::vector fallbackPartitions; + std::vector partitionMergeInfo; + std::vector partitions; Partition* partition = driver->allocWritablePartition(allocMinSize, schema, -1, _context->id()); int64_t* rawPtr = (int64_t*)partition->lockWriteRaw(); @@ -697,7 +711,7 @@ namespace tuplex { uint8_t* ptr = (uint8_t*)(rawPtr + 1); size_t numBytesSerialized = 0; size_t prevNumExceptions = 0; - size_t prevNumRows = 0; + auto rowDelta = 0; for(unsigned i = 0; i < numElements; ++i) { auto obj = PyList_GET_ITEM(listObj, i); Py_XINCREF(obj); @@ -725,11 +739,11 @@ namespace tuplex { size_t requiredBytes = row.serializedLength(); // check capacity and realloc if necessary get a new partition if (partition->capacity() < numBytesSerialized + allocMinSize) { - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); - prevNumExceptions = badParallelizeObjects.size(); - prevNumRows += numNewExceptions + *rawPtr; + rowDelta += *rawPtr + fallbackRows.size(); + auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver()); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); partition->unlockWrite(); partitions.push_back(partition); @@ -745,24 +759,25 @@ namespace tuplex { *rawPtr = *rawPtr + 1; numBytesSerialized += requiredBytes; } catch (const std::exception& e) { - assert(i >= prevNumRows); - badParallelizeObjects.emplace_back(i - prevNumRows, obj); + assert(i >= rowDelta); + fallbackRows.emplace_back(std::make_tuple(i - rowDelta, obj)); } } else { - assert(i >= prevNumRows); - badParallelizeObjects.emplace_back(i - prevNumRows, obj); + assert(i >= rowDelta); + fallbackRows.emplace_back(i - rowDelta, obj); } } - assert(badParallelizeObjects.size() >= prevNumExceptions); - auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; - numExceptionsInPartition.push_back(numNewExceptions); + auto serializedRows = serializeFallbackRows(fallbackRows, _context->getDriver()); + fallbackRows.clear(); + partitionMergeInfo.push_back(PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size())); + std::copy(serializedRows.begin(), serializedRows.end(), std::back_inserter(fallbackPartitions)); 
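// Note: judging from how ResultSet::getNextRow() unpacks a group, the
// constructor call above appears to take its arguments in the order
//   PartitionGroup(numNormalPartitions, normalPartitionStartIndex,
//                  numGeneralPartitions, generalPartitionStartIndex,
//                  numFallbackPartitions, fallbackPartitionStartIndex),
// i.e. each flushed normal partition forms a group with zero general
// partitions and exactly the fallback partitions that were serialized
// while that normal partition was being filled.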
 
         partition->unlockWrite();
         partitions.push_back(partition);
 
         // create dataset from partitions.
-        return _context->fromPartitions(schema, partitions, columns, badParallelizeObjects, numExceptionsInPartition);
+        return _context->fromPartitions(schema, partitions, fallbackPartitions, partitionMergeInfo, columns);
     }
 
     PythonDataSet PythonContext::parallelize(py::list L,
@@ -1278,7 +1293,54 @@ namespace tuplex {
             return co;
     }
 
-    // // running with another python version might lead to severe issues
+    std::vector<Partition*> PythonContext::serializeFallbackRows(const std::vector<std::tuple<size_t, PyObject*>>& fallbackRows, Executor* executor) {
+        std::vector<Partition*> fallbackPartitions;
+        if (fallbackRows.empty()) {
+            return fallbackPartitions;
+        }
+
+        Schema schema(Schema::MemoryLayout::ROW, python::Type::makeTupleType({python::Type::STRING}));
+        auto partition = executor->allocWritablePartition(allocMinSize, schema, -1, _context->id());
+        int64_t* rawPtr = (int64_t*)partition->lockWriteRaw();
+        *rawPtr = 0;
+        uint8_t* ptr = (uint8_t*)(rawPtr + 1);
+        size_t numBytesSerialized = 0;
+
+        for (const auto& row: fallbackRows) {
+            auto rowNum = std::get<0>(row);
+            auto pythonObject = std::get<1>(row);
+            auto ecCode = ecToI64(ExceptionCode::PYTHON_PARALLELIZE);
+            auto pickledObject = python::pickleObject(python::getMainModule(), pythonObject);
+            auto pickledObjectSize = pickledObject.size();
+            size_t requiredBytes = sizeof(int64_t) * 4 + pickledObjectSize;
+
+            if (partition->capacity() < numBytesSerialized + requiredBytes) {
+                partition->unlockWrite();
+                fallbackPartitions.push_back(partition);
+                partition = executor->allocWritablePartition(allocMinSize, schema, -1, _context->id());
+                rawPtr = (int64_t*)partition->lockWriteRaw();
+                *rawPtr = 0;
+                ptr = (uint8_t*)(rawPtr + 1);
+                numBytesSerialized = 0;
+            }
+
+            *((int64_t*)(ptr)) = rowNum; ptr += sizeof(int64_t);
+            *((int64_t*)(ptr)) = ecCode; ptr += sizeof(int64_t);
+            *((int64_t*)(ptr)) = -1; ptr += sizeof(int64_t);
+            *((int64_t*)(ptr)) = pickledObjectSize; ptr += sizeof(int64_t);
+            memcpy(ptr, pickledObject.c_str(), pickledObjectSize); ptr += pickledObjectSize;
+
+            *rawPtr = *rawPtr + 1;
+            numBytesSerialized += requiredBytes;
+        }
+
+        partition->unlockWrite();
+        fallbackPartitions.push_back(partition);
+
+        return fallbackPartitions;
+    }
+
+    // // running with another python version might lead to severe issues
     // // hence, perform check at context startup!
     // bool checkPythonVersion() {
     //     using namespace std;
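Aside (a sketch, not part of the patch): serializeFallbackRows() above writes one variable-length record per fallback row. The layout implied by the pointer arithmetic is four little-endian int64 header fields followed by the pickled payload; the third field is an operator ID, written as -1 because no operator exists yet at parallelize time. The `42` below is a stand-in for the real exception-code value, which this excerpt does not define. A self-contained toy that writes one such record and reads a field back:

```cpp
#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>

int main() {
    // stand-in for the output of python::pickleObject(...)
    std::string pickled = "\x80\x04N.";
    std::vector<uint8_t> buf(sizeof(int64_t) * 4 + pickled.size());

    uint8_t* p = buf.data();
    auto writeI64 = [&p](int64_t v) { std::memcpy(p, &v, sizeof v); p += sizeof v; };
    writeI64(7);                        // rowNum: position of the row within its partition group
    writeI64(42);                       // exception code (PYTHON_PARALLELIZE in the patch)
    writeI64(-1);                       // operator ID; -1 since no operator exists at parallelize time
    writeI64((int64_t)pickled.size());  // length of the pickled payload
    std::memcpy(p, pickled.data(), pickled.size());

    int64_t rowNum;                     // read the first header field back
    std::memcpy(&rowNum, buf.data(), sizeof rowNum);
    std::cout << "record for row " << rowNum << ", " << buf.size() << " bytes total\n";
}
```

The same format is mirrored by the pyObjectsToPartitions() test helper further down, which is why the two write loops look nearly identical.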
diff --git a/tuplex/python/src/PythonDataSet.cc b/tuplex/python/src/PythonDataSet.cc
index 36f9a392b..3dd65d262 100644
--- a/tuplex/python/src/PythonDataSet.cc
+++ b/tuplex/python/src/PythonDataSet.cc
@@ -909,8 +909,8 @@ namespace tuplex {
         // retrieve full partitions for speed
         Partition *partition = nullptr;
         size_t pos = 0;
-        while (rs->hasNextPartition() && pos < maxRowCount) {
-            partition = rs->getNextPartition();
+        while (rs->hasNextNormalPartition() && pos < maxRowCount) {
+            partition = rs->getNextNormalPartition();
 
             auto schema = partition->schema();
             // single value? --> reset rowtype by one level
             auto type = schema.getRowType();
@@ -964,8 +964,8 @@
 
         Partition *partition = nullptr;
         size_t pos = 0;
-        while (rs->hasNextPartition() && pos < maxRowCount) {
-            partition = rs->getNextPartition();
+        while (rs->hasNextNormalPartition() && pos < maxRowCount) {
+            partition = rs->getNextNormalPartition();
 
             // add memory towards list object
             auto ptr = partition->lockRaw();
@@ -1002,8 +1002,8 @@
 
         Partition *partition = nullptr;
         size_t pos = 0;
-        while (rs->hasNextPartition() && pos < maxRowCount) {
-            partition = rs->getNextPartition();
+        while (rs->hasNextNormalPartition() && pos < maxRowCount) {
+            partition = rs->getNextNormalPartition();
 
             // add memory towards list object
             auto ptr = partition->lockRaw();
@@ -1042,8 +1042,8 @@
 
         Partition *partition = nullptr;
         size_t pos = 0;
-        while (rs->hasNextPartition() && pos < maxRowCount) {
-            partition = rs->getNextPartition();
+        while (rs->hasNextNormalPartition() && pos < maxRowCount) {
+            partition = rs->getNextNormalPartition();
 
             // add memory towards list object
             auto ptr = partition->lockRaw();
@@ -1091,8 +1091,8 @@
 
         Partition *partition = nullptr;
         size_t pos = 0;
-        while (rs->hasNextPartition() && pos < maxRowCount) {
-            partition = rs->getNextPartition();
+        while (rs->hasNextNormalPartition() && pos < maxRowCount) {
+            partition = rs->getNextNormalPartition();
 
             // add memory towards list object
             auto ptr = partition->lockRaw();
@@ -1147,8 +1147,8 @@
 
         Partition *partition = nullptr;
         size_t pos = 0;
-        while (rs->hasNextPartition() && pos < maxRowCount) {
-            partition = rs->getNextPartition();
+        while (rs->hasNextNormalPartition() && pos < maxRowCount) {
+            partition = rs->getNextNormalPartition();
 
             // add memory towards list object
             auto ptr = partition->lockRaw();
@@ -1191,8 +1191,8 @@
 
         Partition *partition = nullptr;
         size_t pos = 0;
-        while (rs->hasNextPartition() && pos < maxRowCount) {
-            partition = rs->getNextPartition();
+        while (rs->hasNextNormalPartition() && pos < maxRowCount) {
+            partition = rs->getNextNormalPartition();
 
             // add memory towards list object
             auto ptr = partition->lockRaw();
@@ -1251,8 +1251,8 @@
 
         Partition* partition = nullptr;
         size_t pos = 0;
-        while(rs->hasNextPartition() && pos < maxRowCount) {
-            partition = rs->getNextPartition();
+        while(rs->hasNextNormalPartition() && pos < maxRowCount) {
+            partition = rs->getNextNormalPartition();
 
             // add memory towards list object
             auto ptr = partition->lockRaw();
@@ -1348,7 +1348,7 @@
     // b.c. merging of arbitrary python objects is not implemented yet, whenever they're present, use general
     // version
     // @TODO: this could be optimized!
-    if(rs->pyobject_count() != 0)
+    if(rs->fallbackRowCount() != 0)
         return anyToCPythonWithPyObjects(rs, maxRowCount);
 
     auto type = rs->schema().getRowType();
diff --git a/tuplex/python/src/PythonWrappers.cc b/tuplex/python/src/PythonWrappers.cc
index e680765ba..ba68cc6df 100644
--- a/tuplex/python/src/PythonWrappers.cc
+++ b/tuplex/python/src/PythonWrappers.cc
@@ -172,8 +172,8 @@
 
     Partition* partition = nullptr;
     size_t pos = 0;
-    while(rs->hasNextPartition()) {
-        partition = rs->getNextPartition();
+    while(rs->hasNextNormalPartition()) {
+        partition = rs->getNextNormalPartition();
 
         // add memory towards list object
         auto ptr = partition->lockRaw();
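Aside (an inference, not copied from PartitionGroup.h, which this excerpt does not show): the six-argument PartitionGroup constructor used above and throughout the tests below reads naturally as three (count, start-index) pairs, one per partition class. A call like PartitionGroup(1, partitions.size(), 0, 0, serializedRows.size(), fallbackPartitions.size()) records each count together with the index at which that run begins, taken *before* the corresponding push_back/copy, so the group points at the partitions it just sealed. As a sketch under that assumption:

```cpp
#include <cstddef>
#include <iostream>
#include <vector>

// Inferred field order -- three (count, start-index) pairs over the normal,
// general, and fallback partition vectors; names are illustrative only.
struct PartitionGroupSketch {
    size_t numNormalPartitions,   normalPartitionStartIndex;
    size_t numGeneralPartitions,  generalPartitionStartIndex;
    size_t numFallbackPartitions, fallbackPartitionStartIndex;
};

int main() {
    std::vector<int> partitions{0, 1};       // stand-ins for Partition*
    std::vector<int> fallbackPartitions;     // empty: offsets taken before the copy
    // mirrors PartitionGroup(1, partitions.size(), 0, 0, 1, fallbackPartitions.size()):
    // one normal partition starting where the normal vector currently ends,
    // no general partitions, one fallback partition about to be appended
    PartitionGroupSketch g{1, partitions.size(), 0, 0, 1, fallbackPartitions.size()};
    std::cout << "group covers normal[" << g.normalPartitionStartIndex
              << "] and fallback[" << g.fallbackPartitionStartIndex << "]\n";
}
```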
diff --git a/tuplex/test/core/ResultSetTest.cc b/tuplex/test/core/ResultSetTest.cc
index 4acd38921..4aedd0649 100644
--- a/tuplex/test/core/ResultSetTest.cc
+++ b/tuplex/test/core/ResultSetTest.cc
@@ -51,6 +51,57 @@ class ResultSetTest : public PyTest {
         return pw.getOutputPartitions();
     }
 
+    std::vector<Partition*> pyObjectsToPartitions(const std::vector<std::tuple<size_t, PyObject*>>& pyObjects) {
+        using namespace tuplex;
+
+        std::vector<Partition*> partitions;
+        if (pyObjects.empty()) {
+            return partitions;
+        }
+
+        Schema schema(Schema::MemoryLayout::ROW, python::Type::makeTupleType({python::Type::STRING}));
+        Partition* partition = allocPartition(schema.getRowType(), -1);
+        auto rawPtr = (int64_t*)partition->lockWriteRaw();
+        *rawPtr = 0;
+        auto ptr = (uint8_t*)(rawPtr + 1);
+        size_t numBytesSerialized = 0;
+
+        python::lockGIL();
+        for (auto &row: pyObjects) {
+            auto rowNum = std::get<0>(row);
+            auto pyObj = std::get<1>(row);
+            auto ecCode = -1;
+            auto opID = -1;
+            auto pickledObject = python::pickleObject(python::getMainModule(), pyObj);
+            auto pickledObjectSize = pickledObject.size();
+            size_t requiredBytes = sizeof(int64_t) * 4 + pickledObjectSize;
+
+            if (partition->capacity() < numBytesSerialized + requiredBytes) {
+                partition->unlockWrite();
+                partitions.push_back(partition);
+                partition = allocPartition(schema.getRowType(), -1);
+                rawPtr = (int64_t*)partition->lockWriteRaw();
+                *rawPtr = 0;
+                ptr = (uint8_t*)(rawPtr + 1);
+                numBytesSerialized = 0;
+            }
+
+            *((int64_t*)ptr) = rowNum; ptr += sizeof(int64_t);
+            *((int64_t*)ptr) = ecCode; ptr += sizeof(int64_t);
+            *((int64_t*)ptr) = opID; ptr += sizeof(int64_t);
+            *((int64_t*)ptr) = pickledObjectSize; ptr += sizeof(int64_t);
+            memcpy(ptr, pickledObject.c_str(), pickledObjectSize); ptr += pickledObjectSize;
+
+            *rawPtr += 1;
+            numBytesSerialized += requiredBytes;
+        }
+        python::unlockGIL();
+
+        partition->unlockWrite();
+        partitions.push_back(partition);
+
+        return partitions;
+    }
 };
 
 TEST_F(ResultSetTest, NoPyObjects) {
@@ -68,10 +119,13 @@
         sample_rows.push_back(Row(rand() % 256, rand() % 256 * 0.1 - 1.0, strs[rand() % strs.size()]));
     }
     auto partitions = rowsToPartitions(sample_rows);
-    for(auto p : partitions)
-        p->makeImmortal();
+    std::vector<PartitionGroup> partitionGroups;
+    for(int i = 0; i < partitions.size(); ++i) {
+        partitions[i]->makeImmortal();
+        partitionGroups.push_back(PartitionGroup(1, i, 0, 0, 0, 0));
+    }
 
-    auto rsA = make_shared<ResultSet>(Schema(Schema::MemoryLayout::ROW, sample_rows.front().getRowType()), partitions);
+    auto rsA = make_shared<ResultSet>(Schema(Schema::MemoryLayout::ROW, sample_rows.front().getRowType()), partitions, std::vector<Partition*>{}, std::vector<Partition*>{}, partitionGroups);
     EXPECT_EQ(rsA->rowCount(), sample_rows.size());
 
     // check correct order returned
@@ -79,13 +133,14 @@
     while(rsA->hasNextRow()) {
         EXPECT_EQ(rsA->getNextRow().toPythonString(),
                   sample_rows[pos++].toPythonString());
     }
+    EXPECT_EQ(pos, sample_rows.size());
 
     // now limit result set to 17 rows, check this works as well!
     int Nlimit = 17;
     auto rsB = make_shared<ResultSet>(Schema(Schema::MemoryLayout::ROW, sample_rows.front().getRowType()),
                                       partitions,
                                       std::vector<Partition*>{},
-                                      std::unordered_map<std::string, ExceptionInfo>(),
-                                      vector<tuple<size_t, PyObject*>>{},
+                                      std::vector<Partition*>{},
+                                      partitionGroups,
                                       Nlimit);
     pos = 0;
     while(rsB->hasNextRow()) {
@@ -137,13 +192,15 @@ TEST_F(ResultSetTest, WithPyObjects) {
     vector<Row> refC = {Row(10), Row(20), Row(30), Row(35), Row(37)};
     vector<Row> refD = {Row(-1), Row(0), Row(1)};
 
+    auto partitionGroups = std::vector<PartitionGroup>{PartitionGroup(1, 0, 0, 0, 1, 0)};
+
     // TEST A:
     // -----------------
     auto rsA = make_shared<ResultSet>(Schema(Schema::MemoryLayout::ROW, rows.front().getRowType()),
                                       partitions,
                                       std::vector<Partition*>{},
-                                      std::unordered_map<std::string, ExceptionInfo>(),
-                                      objsA);
+                                      pyObjectsToPartitions(objsA),
+                                      partitionGroups);
     EXPECT_EQ(rsA->rowCount(), objsA.size() + rows.size());
     pos = 0;
     while(rsA->hasNextRow()) {
@@ -156,8 +213,8 @@
     auto rsB = make_shared<ResultSet>(Schema(Schema::MemoryLayout::ROW, rows.front().getRowType()),
                                       partitions,
                                       std::vector<Partition*>{},
-                                      std::unordered_map<std::string, ExceptionInfo>(),
-                                      objsB);
+                                      pyObjectsToPartitions(objsB),
+                                      partitionGroups);
     EXPECT_EQ(rsB->rowCount(), objsB.size() + rows.size());
     pos = 0;
     while(rsB->hasNextRow()) {
@@ -171,8 +228,8 @@
     auto rsC = make_shared<ResultSet>(Schema(Schema::MemoryLayout::ROW, rows.front().getRowType()),
                                       partitions,
                                       std::vector<Partition*>{},
-                                      std::unordered_map<std::string, ExceptionInfo>(),
-                                      objsC);
+                                      pyObjectsToPartitions(objsC),
+                                      partitionGroups);
     EXPECT_EQ(rsC->rowCount(), objsC.size() + rows.size());
     pos = 0;
     while(rsC->hasNextRow()) {
@@ -180,6 +237,8 @@
         EXPECT_EQ(rsC->getNextRow().toPythonString(), refC[pos++].toPythonString());
     }
 
+    partitionGroups = std::vector<PartitionGroup>{PartitionGroup(0, 0, 0, 0, 1, 0)};
+
     // TEST D:
     // -------
     // only pyobjects.
@@ -188,8 +247,8 @@ TEST_F(ResultSetTest, WithPyObjects) {
     auto rsD = make_shared<ResultSet>(Schema(Schema::MemoryLayout::ROW, rows.front().getRowType()),
                                       std::vector<Partition*>{},
                                       std::vector<Partition*>{},
-                                      std::unordered_map<std::string, ExceptionInfo>(),
-                                      objsD);
+                                      pyObjectsToPartitions(objsD),
+                                      partitionGroups);
     EXPECT_EQ(rsD->rowCount(), objsD.size());
     pos = 0;
     while(rsD->hasNextRow()) {
diff --git a/tuplex/test/wrappers/WrapperTest.cc b/tuplex/test/wrappers/WrapperTest.cc
index 5e855e4d5..52d3fc137 100644
--- a/tuplex/test/wrappers/WrapperTest.cc
+++ b/tuplex/test/wrappers/WrapperTest.cc
@@ -17,60 +17,23 @@
 #include
 #include
 #include
+#include "../core/TestUtils.h"
 #include
 
 // need for these tests a running python interpreter, so spin it up
-class WrapperTest : public ::testing::Test {
-protected:
-    std::string testName;
-    std::string scratchDir;
-
+class WrapperTest : public TuplexTest {
     void SetUp() override {
-        testName = std::string(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()) + std::string(::testing::UnitTest::GetInstance()->current_test_info()->name());
-        scratchDir = "/tmp/" + testName;
+        TuplexTest::SetUp();
         python::initInterpreter();
-
-        // hold GIL
         assert(python::holdsGIL());
     }
 
     void TearDown() override {
-
-        // important to get GIL for this
+        TuplexTest::TearDown();
         python::closeInterpreter();
     }
-
-    inline void remove_temp_files() {
-        tuplex::Timer timer;
-        boost::filesystem::remove_all(scratchDir.c_str());
-        std::cout<<"removed temp files in "<
+        auto list = py::reinterpret_borrow<py::list>(listObj);
+        auto res = c.parallelize(list).map("lambda x: 1 // x if x == 0 else x", "").resolve(ecToI64(ExceptionCode::ZERODIVISIONERROR), "lambda x: -1", "").collect();
+        auto resObj = res.ptr();
+
+        ASSERT_EQ(PyList_Size(resObj), PyList_Size(expectedResult));
+        for (int i = 0; i < PyList_Size(expectedResult); ++i) {
+            EXPECT_EQ(python::pythonToRow(PyList_GetItem(resObj, i)).toPythonString(), python::pythonToRow(PyList_GetItem(expectedResult, i)).toPythonString());
+        }
+    }
+}
+
 TEST_F(WrapperTest, StringTuple) {
     using namespace tuplex;
-    PythonContext c("c", "", testOptions());
+    PythonContext c("c", "", microTestOptions().asJSON());
 
     PyObject *listObj = PyList_New(4);
     PyObject *tupleObj1 = PyTuple_New(2);
@@ -131,7 +130,7 @@ TEST_F(WrapperTest, StringTuple) {
 
 TEST_F(WrapperTest, MixedSimpleTupleTuple) {
     using namespace tuplex;
-    PythonContext c("c", "", testOptions());
+    PythonContext c("c", "", microTestOptions().asJSON());
 
     PyObject *listObj = PyList_New(4);
     PyObject *tupleObj1 = PyTuple_New(2);
@@ -170,7 +169,7 @@ TEST_F(WrapperTest, MixedSimpleTupleTuple) {
 
 TEST_F(WrapperTest, StringParallelize) {
     using namespace tuplex;
-    PythonContext c("c", "", testOptions());
+    PythonContext c("c", "", microTestOptions().asJSON());
 
     PyObject * listObj = PyList_New(3);
     PyList_SET_ITEM(listObj, 0, python::PyString_FromString("Hello"));
@@ -194,7 +193,7 @@
 
 TEST_F(WrapperTest, DictionaryParallelize) {
     using namespace tuplex;
-    PythonContext c("c", "", testOptions());
+    PythonContext c("c", "", microTestOptions().asJSON());
 
     PyObject * dictObj1 = PyDict_New();
     PyDict_SetItem(dictObj1, python::PyString_FromString("a"), PyFloat_FromDouble(0.0));
@@ -243,7 +242,7 @@ TEST_F(WrapperTest, SimpleCSVParse) {
     PyDict_SetItemString(pyopt, "tuplex.webui.enable", Py_False);
 
     // RAII, destruct python context!
-    PythonContext c("c", "", testOptions());
+    PythonContext c("c", "", microTestOptions().asJSON());
 
     // weird block syntax due to RAII problems.
     {
@@ -274,7 +273,7 @@ TEST_F(WrapperTest, SimpleCSVParse) {
 
 TEST_F(WrapperTest, GetOptions) {
     using namespace tuplex;
-    PythonContext c("c", "", testOptions());
+    PythonContext c("c", "", microTestOptions().asJSON());
 
     // weird RAII problems of boost python
     {
@@ -290,8 +289,8 @@ TEST_F(WrapperTest, GetOptions) {
 
 TEST_F(WrapperTest, TwoContexts) {
     using namespace tuplex;
-    PythonContext c("", "", testOptions());
-    PythonContext c2("", "", testOptions());
+    PythonContext c("", "", microTestOptions().asJSON());
+    PythonContext c2("", "", microTestOptions().asJSON());
 
     {
         auto opt1 = c.options();
@@ -315,7 +314,7 @@ TEST_F(WrapperTest, Show) {
     PyDict_SetItemString(pyopt, "tuplex.webui.enable", Py_False);
 
     // RAII, destruct python context!
-    PythonContext c("python", "", testOptions());
+    PythonContext c("python", "", microTestOptions().asJSON());
 
     // weird block syntax due to RAII problems.
     {
@@ -340,7 +339,7 @@ TEST_F(WrapperTest, GoogleTrace) {
     PyDict_SetItemString(pyopt, "tuplex.webui.enable", Py_False);
 
     // RAII, destruct python context!
-    PythonContext c("python", "", testOptions());
+    PythonContext c("python", "", testOptions().asJSON());
     /// Based on Google trace data, this mini pipeline serves as CSV parsing test ground.
     /// c.csv(file_path) \
     ///  .filter(lambda x: x[3] == 0) \
@@ -487,7 +486,7 @@ TEST_F(WrapperTest, extractPriceExample) {
     auto cols = py::reinterpret_borrow<py::list>(colObj);
 
     // RAII, destruct python context!
-    PythonContext c("python", "", testOptions());
+    PythonContext c("python", "", testOptions().asJSON());
 
     {
         // all calls go here...
@@ -587,7 +586,7 @@ TEST_F(WrapperTest, DictListParallelize) {
     using namespace tuplex;
 
     // RAII, destruct python context!
-    PythonContext c("python", "", testOptions());
+    PythonContext c("python", "", microTestOptions().asJSON());
 
     // weird block syntax due to RAII problems.
     {
@@ -624,9 +623,9 @@ TEST_F(WrapperTest, UpcastParallelizeI) {
     using namespace tuplex;
 
     // RAII, destruct python context!
-    auto opts = testOptions();
-    opts = opts.substr(0, opts.length() - 1) + ", \"tuplex.autoUpcast\":\"True\"}";
-    PythonContext c("python", "", opts);
+    auto opts = microTestOptions();
+    opts.set("tuplex.autoUpcast", "true");
+    PythonContext c("python", "", opts.asJSON());
 
     // weird block syntax due to RAII problems.
     {
@@ -656,9 +655,9 @@ TEST_F(WrapperTest, UpcastParallelizeII) {
     using namespace tuplex;
 
     // RAII, destruct python context!
-    auto opts = testOptions();
-    opts = opts.substr(0, opts.length() - 1) + ", \"tuplex.autoUpcast\":\"True\"}";
-    PythonContext c("python", "", opts);
+    auto opts = microTestOptions();
+    opts.set("tuplex.autoUpcast", "true");
+    PythonContext c("python", "", opts.asJSON());
 
     // weird block syntax due to RAII problems.
     {
@@ -689,8 +688,8 @@ TEST_F(WrapperTest, OptionListTest) {
 
     // RAII, destruct python context!
     auto opts = testOptions();
-    opts = opts.substr(0, opts.length() - 1) + ", \"tuplex.autoUpcast\":\"True\"}";
-    PythonContext c("python", "", opts);
+    opts.set("tuplex.autoUpcast", "true");
+    PythonContext c("python", "", opts.asJSON());
 
     // weird block syntax due to RAII problems.
     {
@@ -741,9 +740,9 @@ TEST_F(WrapperTest, FilterAll) {
     using namespace tuplex;
 
     // RAII, destruct python context!
-    auto opts = testOptions();
-    opts = opts.substr(0, opts.length() - 1) + ",\"tuplex.autoUpcast\":\"True\"}";
-    PythonContext c("python", "", opts);
+    auto opts = microTestOptions();
+    opts.set("tuplex.autoUpcast", "true");
+    PythonContext c("python", "", opts.asJSON());
 
     // weird block syntax due to RAII problems.
     {
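Aside (a note on the recurring test change above and below): instead of splicing extra keys into the tail of the JSON string returned by testOptions(), the tests now mutate an options object via set() and serialize it once via asJSON(); testOptions()/microTestOptions() evidently return such an object after this patch. The stand-in below only illustrates the shape of that pattern — the real helpers live in the Tuplex test utilities and are not defined in this excerpt:

```cpp
#include <iostream>
#include <map>
#include <sstream>
#include <string>

// Minimal stand-in for the typed-options pattern: mutate key/value pairs,
// serialize to JSON once at the context boundary (illustrative names only).
struct Options {
    std::map<std::string, std::string> kv;
    Options& set(const std::string& k, const std::string& v) { kv[k] = v; return *this; }
    std::string asJSON() const {
        std::ostringstream os;
        os << "{";
        bool first = true;
        for (auto& [k, v] : kv) { os << (first ? "" : ",") << "\"" << k << "\":\"" << v << "\""; first = false; }
        os << "}";
        return os.str();
    }
};

int main() {
    Options opts;
    opts.set("tuplex.autoUpcast", "true");  // was: substr()-based surgery on a JSON string
    std::cout << opts.asJSON() << "\n";     // {"tuplex.autoUpcast":"true"}
}
```

The design win is that option keys are set through one code path instead of hand-edited JSON, which is what the removed substr() lines were doing.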
@@ -768,7 +767,7 @@
 
 TEST_F(WrapperTest, ColumnNames) {
     using namespace tuplex;
 
     // RAII, destruct python context!
-    PythonContext c("python", "", testOptions());
+    PythonContext c("python", "", microTestOptions().asJSON());
 
     // weird block syntax due to RAII problems.
     {
@@ -830,9 +829,9 @@ TEST_F(WrapperTest, IntegerTuple) {
     PyDict_SetItemString(pyopt, "tuplex.autoUpcast", Py_True);
 
     // RAII, destruct python context!
-    auto opts = testOptions();
-    opts = opts.substr(0, opts.length() - 1) + ",\"tuplex.autoUpcast\":\"True\"}";
-    PythonContext c("python", "", opts);
+    auto opts = microTestOptions();
+    opts.set("tuplex.autoUpcast", "true");
+    PythonContext c("python", "", opts.asJSON());
 
     // weird block syntax due to RAII problems.
     {
@@ -887,8 +886,9 @@ TEST_F(WrapperTest, IfWithNull) {
 
     // RAII, destruct python context!
     auto opts = testOptions();
-    opts = opts.substr(0, opts.length() - 1) + ",\"tuplex.useLLVMOptimizer\" : \"False\", \"tuplex.executorCount\":0}";
-    PythonContext c("python", "", opts);
+    opts.set("tuplex.useLLVMOptimizer", "false");
+    opts.set("tuplex.executorCount", "0");
+    PythonContext c("python", "", opts.asJSON());
     // execute mini part of pipeline and output csv to file
     // pipeline is
     // df = ctx.csv(perf_path)
@@ -962,8 +962,9 @@ TEST_F(WrapperTest, FlightData) {
 
     // RAII, destruct python context!
     auto opts = testOptions();
-    opts = opts.substr(0, opts.length() - 1) + ",\"tuplex.useLLVMOptimizer\" : \"False\", \"tuplex.executorCount\":0}";
-    PythonContext c("python", "", opts);
+    opts.set("tuplex.useLLVMOptimizer", "false");
+    opts.set("tuplex.executorCount", "0");
+    PythonContext c("python", "", opts.asJSON());
     // execute mini part of pipeline and output csv to file
     // pipeline is
     // df = ctx.csv(perf_path)
@@ -1171,7 +1172,7 @@ TEST_F(WrapperTest, Airport) {
 
     // RAII, destruct python context!
PythonContext c("python", "", - testOptions()); + testOptions().asJSON()); // execute mini part of pipeline and output csv to file // pipeline is @@ -1215,7 +1216,7 @@ TEST_F(WrapperTest, Airport) { TEST_F(WrapperTest, OptionParallelizeI) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(5); PyList_SET_ITEM(listObj, 0, PyLong_FromLong(112)); @@ -1245,7 +1246,7 @@ TEST_F(WrapperTest, OptionParallelizeI) { TEST_F(WrapperTest, OptionParallelizeII) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(5); @@ -1288,7 +1289,7 @@ TEST_F(WrapperTest, OptionParallelizeII) { TEST_F(WrapperTest, NoneParallelize) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(2); PyList_SET_ITEM(listObj, 0, Py_None); @@ -1312,7 +1313,7 @@ TEST_F(WrapperTest, NoneParallelize) { TEST_F(WrapperTest, EmptyMapI) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(4); PyList_SET_ITEM(listObj, 0, PyLong_FromLong(1)); @@ -1340,7 +1341,7 @@ TEST_F(WrapperTest, EmptyMapI) { TEST_F(WrapperTest, EmptyMapII) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(4); PyList_SET_ITEM(listObj, 0, PyLong_FromLong(1)); @@ -1372,7 +1373,7 @@ TEST_F(WrapperTest, EmptyMapII) { TEST_F(WrapperTest, EmptyMapIII) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(4); PyList_SET_ITEM(listObj, 0, PyLong_FromLong(1)); @@ -1404,7 +1405,7 @@ TEST_F(WrapperTest, EmptyMapIII) { TEST_F(WrapperTest, EmptyOptionMapI) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(4); PyList_SET_ITEM(listObj, 0, PyLong_FromLong(1)); @@ -1434,7 +1435,7 @@ TEST_F(WrapperTest, EmptyOptionMapI) { TEST_F(WrapperTest, EmptyOptionMapII) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(4); PyList_SET_ITEM(listObj, 0, PyLong_FromLong(1)); @@ -1464,7 +1465,7 @@ TEST_F(WrapperTest, EmptyOptionMapII) { TEST_F(WrapperTest, OptionTupleParallelizeI) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(3); @@ -1513,7 +1514,7 @@ TEST_F(WrapperTest, OptionTupleParallelizeI) { TEST_F(WrapperTest, OptionTupleParallelizeII) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(3); @@ -1562,7 +1563,7 @@ TEST_F(WrapperTest, OptionTupleParallelizeII) { TEST_F(WrapperTest, OptionTupleParallelizeIII) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = PyList_New(3); @@ -1611,7 +1612,7 @@ TEST_F(WrapperTest, OptionTupleParallelizeIII) { TEST_F(WrapperTest, parallelizeOptionTypeI) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + 
PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = python::runAndGet( "test_input = [(1.0, '2', 3, '4', 5, 6, True, 8, 9, None), (None, '2', 3, None, 5, 6, True, 8, 9, None)" @@ -1638,7 +1639,7 @@ TEST_F(WrapperTest, parallelizeOptionTypeI) { TEST_F(WrapperTest, parallelizeNestedSlice) { using namespace tuplex; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); PyObject * listObj = python::runAndGet( "test_input = [((), (\"hello\",), 123, \"oh no\", (1, 2)), ((), (\"goodbye\",), 123, \"yes\", (-10, 2)),\n" @@ -1670,7 +1671,7 @@ TEST_F(WrapperTest, TPCHQ6) { " 'l_discount', 'l_tax', 'l_returnflag', 'l_linestatus',\n" " 'l_shipdate', 'l_commitdate', 'l_receiptdate',\n" " 'l_shipinstruct', 'l_shipmode', 'l_comment']", "listitem_columns"); - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", testOptions().asJSON()); { @@ -1692,7 +1693,7 @@ TEST_F(WrapperTest, TupleParallelizeI) { PyObject* listObj = python::runAndGet("L = [('hello', 'world', 'hi', 1, 2, 3), ('foo', 'bar', 'baz', 4, 5, 6), ('blank', '', 'not', 7, 8, 9)]", "L"); - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); { auto list = py::reinterpret_borrow(listObj); c.parallelize(list).map("lambda x: ({x[0]: x[3], x[1]: x[4], x[2]: x[5]},)", "").show(); @@ -1704,7 +1705,7 @@ TEST_F(WrapperTest, TupleParallelizeII) { PyObject* listObj = python::runAndGet("L = [({}, {}, {}), ({}, {}, {}), ({}, {}, {})]", "L"); - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); { auto list = py::reinterpret_borrow(listObj); c.parallelize(list).map("lambda x, y, z: [x, y, z]", "").show(); @@ -1721,7 +1722,7 @@ TEST_F(WrapperTest, DictParallelizeRefTest) { PyObject* strings = python::runAndGet("strings = [('hello', 'world', 'hi'), ('foo', 'bar', 'baz'), ('blank', '', 'not')]\n", "strings"); PyObject* floats = python::runAndGet("floats = [(1.2, 3.4, -100.2), (5.6, 7.8, -1.234), (9.0, 0.1, 2.3)]\n", "floats"); ASSERT_TRUE(floats->ob_refcnt > 0); - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); { @@ -1764,7 +1765,7 @@ TEST_F(WrapperTest, DictParallelizeRefTest) { TEST_F(WrapperTest, BuiltinModule) { using namespace tuplex; using namespace std; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); { PyObject* L = PyList_New(3); @@ -1796,7 +1797,7 @@ TEST_F(WrapperTest, SwapIII) { " return a, b\n" "\n"; - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); { PyObject* L = PyList_New(2); auto tuple1 = PyTuple_New(2); @@ -2205,7 +2206,7 @@ TEST_F(WrapperTest, BitwiseAnd) { PyObject* listObj = python::runAndGet("L = [(False, False), (False, True), (True, False), (True, True)]", "L"); - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); { auto list = py::reinterpret_borrow(listObj); auto res_list = c.parallelize(list).map("lambda a, b: a & b", "").collect(); @@ -2221,7 +2222,7 @@ TEST_F(WrapperTest, MetricsTest) { PyObject* listObj = python::runAndGet("L = [(False, False), (False, True), (True, False), (True, True)]", "L"); - PythonContext c("c", "", testOptions()); + PythonContext c("c", "", microTestOptions().asJSON()); { auto list = py::reinterpret_borrow(listObj); auto res_list = c.parallelize(list).map("lambda a, b: a & b", "").collect(); @@ -2413,9 +2414,9 @@ TEST_F(WrapperTest, 
 TEST_F(WrapperTest, MixedTypesIsWithNone) {
     using namespace tuplex;
     using namespace std;
 
-    auto opts = testOptions();
-    opts = opts.substr(0, opts.length() - 1) + ",\"tuplex.optimizer.mergeExceptionsInOrder\":\"True\"}";
-    PythonContext c("python", "", opts);
+    auto opts = microTestOptions();
+    opts.set("tuplex.optimizer.mergeExceptionsInOrder", "true");
+    PythonContext c("python", "", opts.asJSON());
 
     PyObject *listObj = PyList_New(8);
     PyList_SetItem(listObj, 0, Py_None);
@@ -2677,4 +2678,4 @@ TEST_F(WrapperTest, PartitionRelease) {
 //    Py_XDECREF(pResult);
 //    Py_Finalize();
 //    return return_value;
-//}
\ No newline at end of file
+//}
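Aside (a sketch of the invariant behind the bookkeeping, not code from the patch): the `i - rowDelta` counters in PythonContext.cc number every fallback row relative to its partition group, which is what lets a result set built from (normal partitions, fallback partitions, partition groups) restore the original parallelize() order with a single linear merge — the behavior MixedTypesIsWithNone exercises via mergeExceptionsInOrder. A self-contained toy model of one group:

```cpp
#include <cstddef>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Toy model (not Tuplex types): one partition group of normal rows plus
// fallback rows, where each fallback row carries the group-relative row
// number the patch maintains via `i - rowDelta`.
int main() {
    std::vector<std::string> normalRows = {"(1,)", "(2,)", "(4,)"};          // rows 0, 1, 3
    std::vector<std::pair<size_t, std::string>> fallbackRows = {{2, "'a'"}}; // row 2 fell back

    size_t total = normalRows.size() + fallbackRows.size();
    size_t n = 0, f = 0;
    for (size_t i = 0; i < total; ++i) {
        // emit each fallback row exactly at its recorded position, otherwise
        // the next normal row -- this restores the original input order
        if (f < fallbackRows.size() && fallbackRows[f].first == i)
            std::cout << fallbackRows[f++].second << "\n";
        else
            std::cout << normalRows[n++] << "\n";
    }
}
```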