diff --git a/tuplex/awslambda/src/lambda_main.cc b/tuplex/awslambda/src/lambda_main.cc index 8e1c83f58..6f68cb83c 100644 --- a/tuplex/awslambda/src/lambda_main.cc +++ b/tuplex/awslambda/src/lambda_main.cc @@ -329,7 +329,7 @@ tuplex::messages::InvocationResponse lambda_main(aws::lambda_runtime::invocation // parse protobuf request tuplex::messages::InvocationRequest req; auto rc = google::protobuf::util::JsonStringToMessage(lambda_req.payload, &req); - if(rc != google::protobuf::util::Status::OK) + if(!rc.ok()) throw std::runtime_error("could not parse json into protobuf message, bad parse for request"); timer.mark_time("decode_request"); diff --git a/tuplex/core/include/Context.h b/tuplex/core/include/Context.h index 5de874b42..b75409533 100644 --- a/tuplex/core/include/Context.h +++ b/tuplex/core/include/Context.h @@ -37,6 +37,7 @@ namespace tuplex { class Executor; class Partition; class IBackend; + class ExceptionInfo; class Context { private: @@ -58,7 +59,25 @@ namespace tuplex { // needed because of C++ template issues void addPartition(DataSet* ds, Partition *partition); - void addParallelizeNode(DataSet *ds); //! adds a parallelize node to the computation graph + void addParallelizeNode(DataSet *ds, const std::vector<std::tuple<size_t, PyObject*>> &badParallelizeObjects=std::vector<std::tuple<size_t, PyObject*>>(), const std::vector<size_t> &numExceptionsInPartition=std::vector<size_t>()); //! adds a parallelize node to the computation graph + + /*! + * serialize python objects as pickled objects into exception partitions. Set the python objects map to + * map all normalPartitions to the exceptions that occurred within them. + * @param pythonObjects normal case schema violations and their initial row numbers + * @param numExceptionsInPartition number of exceptions in each normal partition + * @param normalPartitions normal partitions created + * @param opID parallelize operator ID + * @param serializedPythonObjects output vector for partitions + * @param pythonObjectsMap output for mapping + */ + void serializePythonObjects(const std::vector<std::tuple<size_t, PyObject*>>& pythonObjects, + const std::vector<size_t> &numExceptionsInPartition, + const std::vector<Partition*> &normalPartitions, + const int64_t opID, + std::vector<Partition*> &serializedPythonObjects, + std::unordered_map<std::string, ExceptionInfo> &pythonObjectsMap); + Partition* requestNewPartition(const Schema& schema, const int dataSetID, size_t minBytesRequired); uint8_t* partitionLockRaw(Partition *partition); void partitionUnlock(Partition *partition); @@ -111,7 +130,8 @@ namespace tuplex { columnNames); } - DataSet& parallelize(const std::vector<Row>& L, const std::vector<std::string>& columnNames=std::vector<std::string>()); + DataSet& parallelize(const std::vector<Row>& L, + const std::vector<std::string>& columnNames=std::vector<std::string>()); DataSet& parallelize(std::initializer_list<Row> L, const std::vector<std::string>& columnNames=std::vector<std::string>()) { if(!columnNames.empty()) @@ -252,9 +272,11 @@ namespace tuplex { * empty dataset will be created. * * @param columns optional column names + * @param badParallelizeObjects schema violations found during parallelization of partitions + * @param numExceptionsInPartition number of schema violations that occurred in each of the partitions * @return reference to newly created dataset.
*/ - DataSet& fromPartitions(const Schema& schema, const std::vector<Partition*>& partitions, const std::vector<std::string>& columns); + DataSet& fromPartitions(const Schema& schema, const std::vector<Partition*>& partitions, const std::vector<std::string>& columns, const std::vector<std::tuple<size_t, PyObject*>> &badParallelizeObjects, const std::vector<size_t> &numExceptionsInPartition); }; // needed for template mechanism to work #include diff --git a/tuplex/core/include/ExceptionInfo.h b/tuplex/core/include/ExceptionInfo.h new file mode 100644 index 000000000..d6ee35886 --- /dev/null +++ b/tuplex/core/include/ExceptionInfo.h @@ -0,0 +1,47 @@ +//--------------------------------------------------------------------------------------------------------------------// +// // +// Tuplex: Blazing Fast Python Data Science // +// // +// // +// (c) 2017 - 2021, Tuplex team // +// Created by Benjamin Givertz first on 1/1/2022 // +// License: Apache 2.0 // +//--------------------------------------------------------------------------------------------------------------------// + +#ifndef TUPLEX_EXCEPTIONINFO_H +#define TUPLEX_EXCEPTIONINFO_H + +namespace tuplex { + /*! + * Struct to hold information that maps input partitions to input exceptions that occur within them. + * + * Explanation: + * Each input partition is passed the same vector of all input exceptions that occurred during data parallelization + * or caching. Thus, each input partition must know how many input exceptions occur in its partition, the index + * of the input exception partition where its first exception occurs, and the offset into that partition where the + * first exception occurs. These values are held in this struct and each input partition is mapped to an ExceptionInfo. + */ + struct ExceptionInfo { + size_t numExceptions; //! number of exception rows that occur within a single input partition + size_t exceptionIndex; //! index into a vector of input exception partitions that holds the first input exception + size_t exceptionRowOffset; //! offset in rows into the first input exception partition where the first exception occurs. + size_t exceptionByteOffset; //!
offset in bytes into the first input exception partition where the first exception occurs + + ExceptionInfo() : + numExceptions(0), + exceptionIndex(0), + exceptionRowOffset(0), + exceptionByteOffset(0) {} + + ExceptionInfo(size_t numExps, + size_t expIndex, + size_t expRowOffset, + size_t expByteOffset) : + numExceptions(numExps), + exceptionIndex(expIndex), + exceptionRowOffset(expRowOffset), + exceptionByteOffset(expByteOffset) {} + }; +} + +#endif //TUPLEX_EXCEPTIONINFO_H \ No newline at end of file diff --git a/tuplex/core/include/ee/local/LocalBackend.h b/tuplex/core/include/ee/local/LocalBackend.h index 33f7a1237..77d375aed 100644 --- a/tuplex/core/include/ee/local/LocalBackend.h +++ b/tuplex/core/include/ee/local/LocalBackend.h @@ -66,7 +66,7 @@ namespace tuplex { void freeExecutors(); - std::vector<IExecutorTask*> createLoadAndTransformToMemoryTasks(TransformStage* tstage, const ContextOptions& options, codegen::read_block_f functor); + std::vector<IExecutorTask*> createLoadAndTransformToMemoryTasks(TransformStage* tstage, const ContextOptions& options, const std::shared_ptr<TransformStage::JITSymbols>& syms); void executeTransformStage(TransformStage* tstage); diff --git a/tuplex/core/include/logical/CacheOperator.h b/tuplex/core/include/logical/CacheOperator.h index 31346bbfd..563aa8f0b 100644 --- a/tuplex/core/include/logical/CacheOperator.h +++ b/tuplex/core/include/logical/CacheOperator.h @@ -81,6 +81,8 @@ namespace tuplex { bool isCached() const { return _cached; } std::vector<Partition*> cachedPartitions() const { return _normalCasePartitions; } std::vector<Partition*> cachedExceptions() const { return _generalCasePartitions; } + std::unordered_map<std::string, ExceptionInfo> partitionToExceptionsMap() const { return _partitionToExceptionsMap; } + size_t getTotalCachedRows() const; int64_t cost() const override; @@ -109,7 +111,8 @@ namespace tuplex { std::vector<Partition*> _generalCasePartitions; //! holds all data which is considered to be a normal-case violation, //! i.e. which does not adhere to the normal case schema, but did not produce //! an exception while being processed through the pipeline before - std::vector<PyObject*> _py_objects; //! all python objects that do not adhere to the general case schema ( + std::unordered_map<std::string, ExceptionInfo> _partitionToExceptionsMap; //! maps normal case partitions to corresponding general case ones + std::vector<PyObject*> _py_objects; //! all python objects that do not adhere to the general case schema std::vector<std::string> _columns; // internal sample of normal case rows, used for tracing & Co.
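The ExceptionInfo fields above are easiest to see in action with a small walk-through. Below is a minimal, self-contained C++ sketch (hypothetical stand-in types, not part of this diff) showing how a consumer would use numExceptions, exceptionIndex and exceptionRowOffset to visit exactly the exceptions belonging to one input partition, spilling over into subsequent exception partitions as needed:

#include <cstddef>
#include <iostream>
#include <vector>

// Hypothetical stand-in for a Tuplex Partition holding serialized exceptions;
// here each exception is reduced to just its original row number.
struct ExcPartition { std::vector<long> rows; };

struct ExceptionInfo {
    size_t numExceptions = 0;      // exceptions belonging to one input partition
    size_t exceptionIndex = 0;     // exception partition holding the first of them
    size_t exceptionRowOffset = 0; // row offset of that first exception
};

// Visit every exception belonging to a single input partition. The walk may
// cross into later exception partitions because all input partitions share
// the same vector of exception partitions.
void visitExceptions(const std::vector<ExcPartition>& exps, const ExceptionInfo& info) {
    size_t part = info.exceptionIndex, row = info.exceptionRowOffset;
    for (size_t seen = 0; seen < info.numExceptions; ++seen) {
        if (row >= exps[part].rows.size()) { ++part; row = 0; } // partition exhausted
        std::cout << "exception at input row " << exps[part].rows[row++] << "\n";
    }
}

int main() {
    std::vector<ExcPartition> exps{{{1, 4}}, {{7, 9}}};
    visitExceptions(exps, {3, 0, 1}); // prints rows 4, 7, 9 -> spans both partitions
}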
diff --git a/tuplex/core/include/logical/ParallelizeOperator.h b/tuplex/core/include/logical/ParallelizeOperator.h index f65ea1c9a..0960baf89 100644 --- a/tuplex/core/include/logical/ParallelizeOperator.h +++ b/tuplex/core/include/logical/ParallelizeOperator.h @@ -18,7 +18,9 @@ namespace tuplex { class ParallelizeOperator : public LogicalOperator { std::vector<Partition*> _partitions; // data, conforming to majority type - //@TODO: missing: python objects & general case data + std::vector<Partition*> _pythonObjects; // schema violations stored for interpreter processing as python objects + // maps partitions to their corresponding python objects + std::unordered_map<std::string, ExceptionInfo> _inputPartitionToPythonObjectsMap; std::vector<std::string> _columnNames; std::vector<Row> _sample; // sample, not necessarily conforming to one type @@ -28,7 +30,9 @@ namespace tuplex { LogicalOperator *clone() override; // this is a root node - ParallelizeOperator(const Schema& schema, std::vector<Partition*> partitions, const std::vector<std::string>& columns); + ParallelizeOperator(const Schema& schema, + const std::vector<Partition*>& partitions, + const std::vector<std::string>& columns); std::string name() override { return "parallelize"; } LogicalOperatorType type() const override { return LogicalOperatorType::PARALLELIZE; } @@ -45,6 +49,12 @@ namespace tuplex { */ std::vector<Partition*> getPartitions(); + void setPythonObjects(const std::vector<Partition*> &pythonObjects) { _pythonObjects = pythonObjects; } + std::vector<Partition*> getPythonObjects() { return _pythonObjects; } + + void setInputPartitionToPythonObjectsMap(const std::unordered_map<std::string, ExceptionInfo>& pythonObjectsMap) { _inputPartitionToPythonObjectsMap = pythonObjectsMap; } + std::unordered_map<std::string, ExceptionInfo> getInputPartitionToPythonObjectsMap() { return _inputPartitionToPythonObjectsMap; } + Schema getInputSchema() const override { return getOutputSchema(); } bool good() const override; diff --git a/tuplex/core/include/physical/BlockBasedTaskBuilder.h b/tuplex/core/include/physical/BlockBasedTaskBuilder.h index 5cbf95835..7f111ca83 100644 --- a/tuplex/core/include/physical/BlockBasedTaskBuilder.h +++ b/tuplex/core/include/physical/BlockBasedTaskBuilder.h @@ -30,6 +30,13 @@ namespace tuplex { llvm::Function *createFunction(); + /*! + * create function to process blocks of data along with any input exceptions. Used when filters are present + * to update the indices of the exceptions. + * @return llvm function + */ + llvm::Function *createFunctionWithExceptions(); + python::Type _inputRowType; //@TODO: make this private?? std::string _intermediateCallbackName; @@ -48,7 +55,6 @@ namespace tuplex { * @return */ inline llvm::Value *arg(const std::string &name) { - assert(_args.size() == 6); auto it = _args.find(name); if (it == _args.end()) throw std::runtime_error("unknown arg " + name + " requested"); diff --git a/tuplex/core/include/physical/CodeDefs.h b/tuplex/core/include/physical/CodeDefs.h index 0e94625af..b8c3cd76b 100644 --- a/tuplex/core/include/physical/CodeDefs.h +++ b/tuplex/core/include/physical/CodeDefs.h @@ -47,6 +47,10 @@ namespace tuplex { // parameters are userData, array of globals (e.g. hashmaps), block, blocksize, normalrowsout, badrowsout, lastRow typedef int64_t(*read_block_f)(void*, const uint8_t*, int64_t, int64_t*, int64_t*, int8_t); + // prototype of the function generated by the below builder + // parameters are userData, block, blocksize, expPtrs, expPtrSizes, numExceptions, normalrowsout, badrowsout, lastRow + typedef int64_t(*read_block_exp_f)(void*, const uint8_t*, int64_t, uint8_t **, int64_t *, int64_t, int64_t*, int64_t*, bool); + /*!
* prototype for processing a single row (with callbacks etc.). Returns how many bytes were processed */ diff --git a/tuplex/core/include/physical/ExceptionSourceTaskBuilder.h b/tuplex/core/include/physical/ExceptionSourceTaskBuilder.h new file mode 100644 index 000000000..750100617 --- /dev/null +++ b/tuplex/core/include/physical/ExceptionSourceTaskBuilder.h @@ -0,0 +1,63 @@ +//--------------------------------------------------------------------------------------------------------------------// +// // +// Tuplex: Blazing Fast Python Data Science // +// // +// // +// (c) 2017 - 2021, Tuplex team // +// Created by Benjamin Givertz first on 1/1/2021 // +// License: Apache 2.0 // +//--------------------------------------------------------------------------------------------------------------------// + +#ifndef TUPLEX_EXCEPTIONSOURCETASKBUILDER_H +#define TUPLEX_EXCEPTIONSOURCETASKBUILDER_H + +#include "BlockBasedTaskBuilder.h" + +namespace tuplex { + namespace codegen { + /*! + * Class used to process tuplex partitions through the pipeline when both input exceptions and filters are + * present. This is necessary to update the row indices of any input exceptions if rows are filtered out. + */ + class ExceptionSourceTaskBuilder : public BlockBasedTaskBuilder { + private: + void createMainLoop(llvm::Function* read_block_func, bool terminateEarlyOnLimitCode); + + /*! + * generates code to process a row depending on parse result... + * @param builder + * @param userData a value for userData (i.e. the class ptr of the task typically) to be passed to callback functions + * @param tuple (flattened) tuple representation of current tuple (LLVM) + * @param normalRowCountVar where to store normal row counts + * @param badRowCountVar where to store bad row counts + * @param processRowFunc (optional) function to be called before output is written. + * Most likely this is not a nullptr, because users want to transform data. + */ + void processRow(llvm::IRBuilder<>& builder, + llvm::Value* userData, + const FlattenedTuple& tuple, + llvm::Value *normalRowCountVar, + llvm::Value *badRowCountVar, + llvm::Value *outputRowNumberVar, + llvm::Value *inputRowPtr, + llvm::Value *inputRowSize, + bool terminateEarlyOnLimitCode, + llvm::Function* processRowFunc=nullptr); + + void callProcessFuncWithHandler(llvm::IRBuilder<> &builder, llvm::Value *userData, + const FlattenedTuple &tuple, + llvm::Value *normalRowCountVar, llvm::Value *badRowCountVar, llvm::Value *rowNumberVar, + llvm::Value *inputRowPtr, llvm::Value *inputRowSize, + bool terminateEarlyOnLimitCode, + llvm::Function *processRowFunc); + public: + ExceptionSourceTaskBuilder() = delete; + + explicit ExceptionSourceTaskBuilder(const std::shared_ptr<LLVMEnvironment>& env, const python::Type& rowType, const std::string& name) : BlockBasedTaskBuilder::BlockBasedTaskBuilder(env, rowType, name) {} + + llvm::Function* build(bool terminateEarlyOnFailureCode) override; + }; + } +} + +#endif //TUPLEX_EXCEPTIONSOURCETASKBUILDER_H \ No newline at end of file diff --git a/tuplex/core/include/physical/ResolveTask.h b/tuplex/core/include/physical/ResolveTask.h index 47a7dc5e1..2044a5699 100644 --- a/tuplex/core/include/physical/ResolveTask.h +++ b/tuplex/core/include/physical/ResolveTask.h @@ -35,8 +35,12 @@ namespace tuplex { /*!
* create a new resolve task * @param stageID to which task belongs to + * @param contextID context to which this task belongs * @param partitions input rows with normal case - * @param exceptions input rows for exceptions, in exception format + * @param runtimeExceptions input rows for exceptions, in exception format + * @param inputExceptions schema violations that occur during data loading + * @param inputExceptionInfo values to map input partitions to their input exceptions + * @param operatorIDsAffectedByResolvers operators that are followed by resolvers in the pipeline * @param inputSchema input schema of exception rows * @param outputSchema output schema which resolution must adhere to * @param mergeRows whether to merge rows in order (makes only sense when no hashjoin is involved) @@ -57,7 +61,9 @@ namespace tuplex { ResolveTask(int64_t stageID, int64_t contextID, const std::vector<Partition*>& partitions, - const std::vector<Partition*>& exceptions, + const std::vector<Partition*>& runtimeExceptions, + const std::vector<Partition*>& inputExceptions, + ExceptionInfo inputExceptionInfo, const std::vector<int64_t>& operatorIDsAffectedByResolvers, //! used to identify which exceptions DO require reprocessing because there might be a resolver in the slow path for them. Schema exceptionInputSchema, //! schema of the input rows in which both user exceptions and normal-case violations are stored in. This is also the schema in which rows which on the slow path produce again an exception will be stored in. Schema resolverOutputSchema, //! schema of rows that the resolve function outputs if it doesn't rethrow exceptions @@ -72,7 +78,12 @@ namespace tuplex { PyObject* interpreterFunctor=nullptr) : IExceptionableTask::IExceptionableTask(exceptionInputSchema, contextID), _stageID(stageID), _partitions(partitions), - _exceptions(exceptions), + _runtimeExceptions(runtimeExceptions), + _inputExceptions(inputExceptions), + _numInputExceptions(inputExceptionInfo.numExceptions), + _inputExceptionIndex(inputExceptionInfo.exceptionIndex), + _inputExceptionRowOffset(inputExceptionInfo.exceptionRowOffset), + _inputExceptionByteOffset(inputExceptionInfo.exceptionByteOffset), _resolverOutputSchema(resolverOutputSchema), _targetOutputSchema(targetNormalCaseOutputSchema), _mergeRows(mergeRows), @@ -88,7 +99,8 @@ namespace tuplex { _htableFormat(HashTableFormat::UNKNOWN), _outputRowNumber(0), _wallTime(0.0), - _numInputRowsRead(0) { + _numInputRowsRead(0), + _numUnresolved(0) { // copy the IDs and sort them so binary search can be used. std::sort(_operatorIDsAffectedByResolvers.begin(), _operatorIDsAffectedByResolvers.end()); _normalPtrBytesRemaining = 0; @@ -202,7 +214,12 @@ namespace tuplex { private: int64_t _stageID; /// to which stage does this task belong to. std::vector<Partition*> _partitions; - std::vector<Partition*> _exceptions; + std::vector<Partition*> _runtimeExceptions; + std::vector<Partition*> _inputExceptions; + size_t _numInputExceptions; + size_t _inputExceptionIndex; + size_t _inputExceptionRowOffset; + size_t _inputExceptionByteOffset; inline Schema commonCaseInputSchema() const { return _deserializerGeneralCaseOutput->getSchema(); } Schema _resolverOutputSchema; //! what the resolve functor produces Schema _targetOutputSchema; //! which schema the final rows should be in...
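The four _inputException* members the constructor copies out of ExceptionInfo let a ResolveTask position itself directly at its first relevant exception record instead of scanning the shared partitions from the start. The records themselves use the fixed layout written by Context::serializePythonObjects later in this diff (rowNum, ecCode, opID, pickledObjectSize, pickled bytes). A hedged, self-contained sketch of decoding one record under that layout (names are illustrative, not Tuplex API):

#include <cstdint>
#include <string>

// One decoded input-exception record (illustrative struct; Tuplex stores these
// packed back-to-back in a Partition behind a leading int64 row count).
struct ExcRecord {
    int64_t rowNum;      // original row number of the violating row
    int64_t ecCode;      // exception code, e.g. PYTHON_PARALLELIZE
    int64_t opID;        // operator that produced the exception
    std::string payload; // pickled Python object (or serialized fallback row)
};

// Decode the record at ptr and advance ptr past it. With an ExceptionInfo in
// hand, decoding can start at base + sizeof(int64_t) + exceptionByteOffset.
inline ExcRecord decodeOne(const uint8_t*& ptr) {
    ExcRecord r;
    r.rowNum = *reinterpret_cast<const int64_t*>(ptr); ptr += sizeof(int64_t);
    r.ecCode = *reinterpret_cast<const int64_t*>(ptr); ptr += sizeof(int64_t);
    r.opID   = *reinterpret_cast<const int64_t*>(ptr); ptr += sizeof(int64_t);
    auto size = *reinterpret_cast<const int64_t*>(ptr); ptr += sizeof(int64_t);
    r.payload.assign(reinterpret_cast<const char*>(ptr), size); ptr += size;
    return r;
}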
@@ -216,6 +233,8 @@ namespace tuplex { char _csvDelimiter; char _csvQuotechar; + size_t _numUnresolved; + int64_t _currentRowNumber; // std::vector<Partition*> _mergedPartitions; diff --git a/tuplex/core/include/physical/ResultSet.h b/tuplex/core/include/physical/ResultSet.h index 00b1c7cec..e94b8f1ae 100644 --- a/tuplex/core/include/physical/ResultSet.h +++ b/tuplex/core/include/physical/ResultSet.h @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -26,6 +27,7 @@ namespace tuplex { private: std::list<Partition*> _partitions; std::vector<Partition*> _exceptions; // unresolved exceptions + std::unordered_map<std::string, ExceptionInfo> _partitionToExceptionsMap; // @TODO: use here rows instead? would make it potentially cleaner... std::deque<std::tuple<size_t, PyObject*>> _pyobjects; // python objects remaining whose type // did not conform to the one of partitions. Maybe use Row here instead? @@ -52,6 +54,7 @@ namespace tuplex { ResultSet(const Schema& _schema, const std::vector<Partition*>& partitions, const std::vector<Partition*>& exceptions=std::vector<Partition*>{}, + const std::unordered_map<std::string, ExceptionInfo>& partitionToExceptionsMap=std::unordered_map<std::string, ExceptionInfo>(), const std::vector<std::tuple<size_t, PyObject*>> pyobjects=std::vector<std::tuple<size_t, PyObject*>>{}, int64_t maxRows=std::numeric_limits<int64_t>::max()); @@ -105,6 +108,8 @@ namespace tuplex { */ std::vector<Partition*> exceptions() const { return _exceptions; } + std::unordered_map<std::string, ExceptionInfo> partitionToExceptionsMap() const { return _partitionToExceptionsMap; } + /*! * returns/removes all objects * @return diff --git a/tuplex/core/include/physical/StageBuilder.h b/tuplex/core/include/physical/StageBuilder.h index b8c682e43..63b94bd57 100644 --- a/tuplex/core/include/physical/StageBuilder.h +++ b/tuplex/core/include/physical/StageBuilder.h @@ -25,13 +25,25 @@ namespace tuplex { public: StageBuilder() = delete; + /*! + * Create new StageBuilder + * @param stage_number number of the stage + * @param rootStage whether this is a root stage + * @param allowUndefinedBehavior whether undefined behavior is allowed + * @param generateParser whether to generate a parser + * @param normalCaseThreshold normal-case threshold between 0 and 1 + * @param sharedObjectPropagation whether to use shared object propagation + * @param nullValueOptimization whether to use null value optimization + * @param updateInputExceptions whether input exception indices need to be updated + */ StageBuilder(int64_t stage_number, bool rootStage, bool allowUndefinedBehavior, bool generateParser, double normalCaseThreshold, bool sharedObjectPropagation, - bool nullValueOptimization); + bool nullValueOptimization, + bool updateInputExceptions); // builder functions void addMemoryInput(const Schema& schema, LogicalOperator* node); @@ -78,6 +90,7 @@ namespace tuplex { double _normalCaseThreshold; bool _sharedObjectPropagation; bool _nullValueOptimization; + bool _updateInputExceptions; std::vector<LogicalOperator*> _operators; // codegen strings diff --git a/tuplex/core/include/physical/TransformStage.h b/tuplex/core/include/physical/TransformStage.h index d37fabf6e..22d7f5fb4 100644 --- a/tuplex/core/include/physical/TransformStage.h +++ b/tuplex/core/include/physical/TransformStage.h @@ -13,6 +13,7 @@ #include #include +#include #include "PhysicalStage.h" #include "LLVMOptimizer.h" #include @@ -94,14 +95,16 @@ namespace tuplex { } /*! - * set unresolved exceptions, i.e. rows that could come from e.g. a CSV operator - * @param partitions + * set input exceptions, i.e. rows that could come from a parallelize or csv operator.
+ * @param inputExceptions input exception partitions */ - void setInputExceptions(const std::vector<Partition*>& partitions) { - _unresolved_exceptions = partitions; - } + void setInputExceptions(const std::vector<Partition*>& inputExceptions) { _inputExceptions = inputExceptions; } + + std::vector<Partition*> inputExceptions() { return _inputExceptions; } + + void setPartitionToExceptionsMap(const std::unordered_map<std::string, ExceptionInfo>& partitionToExceptionsMap) { _partitionToExceptionsMap = partitionToExceptionsMap; } - std::vector<Partition*> inputExceptions() const { return _unresolved_exceptions; } + std::unordered_map<std::string, ExceptionInfo> partitionToExceptionsMap() { return _partitionToExceptionsMap; } /*! * sets maximum number of rows this pipeline will produce @@ -156,6 +159,7 @@ void setMemoryResult(const std::vector<Partition*>& partitions, const std::vector<Partition*>& generalCase=std::vector<Partition*>{}, + const std::unordered_map<std::string, ExceptionInfo>& partitionToExceptionsMap=std::unordered_map<std::string, ExceptionInfo>(), const std::vector<std::tuple<size_t, PyObject*>>& interpreterRows=std::vector<std::tuple<size_t, PyObject*>>{}, const std::vector<Partition*>& remainingExceptions=std::vector<Partition*>{}, const std::unordered_map<std::tuple<int64_t, ExceptionCode>, size_t>& ecounts=std::unordered_map<std::tuple<int64_t, ExceptionCode>, size_t>()); // creates local result set? @@ -169,6 +173,7 @@ setMemoryResult( std::vector<Partition*>(), std::vector<Partition*>(), + std::unordered_map<std::string, ExceptionInfo>(), std::vector<std::tuple<size_t, PyObject*>>(), std::vector<Partition*>(), ecounts); @@ -257,6 +262,7 @@ // JITSymbols for this stage struct JITSymbols { codegen::read_block_f functor; // can be memory2memory or file2memory + codegen::read_block_exp_f functorWithExp; codegen::read_block_f writeFunctor; // memory2file codegen::resolve_f resolveFunctor; // always memory2memory codegen::init_stage_f initStageFunctor; @@ -266,7 +272,9 @@ codegen::agg_agg_f aggAggregateFunctor; - JITSymbols() : functor(nullptr), writeFunctor(nullptr), + JITSymbols() : functor(nullptr), + functorWithExp(nullptr), + writeFunctor(nullptr), resolveFunctor(nullptr), initStageFunctor(nullptr), releaseStageFunctor(nullptr), @@ -362,6 +370,11 @@ return _persistSeparateCases; } + /*! + * whether to update indices of input exceptions during row processing + */ + bool updateInputExceptions() const { return _updateInputExceptions; } + /*! * @return Returns the type of the hash-grouped data. Hash-grouped data refers to when the operator is a * pipeline breaker that needs the previous stage's hashmap to be converted to partitions @@ -446,6 +459,7 @@ std::string _pyCode; std::string _pyPipelineName; std::string _writerFuncName; + bool _updateInputExceptions; std::shared_ptr<ResultSet> emptyResultSet() const; @@ -456,7 +470,8 @@ //void pushDownOutputLimit(); //! enable optimizations for limited pipeline by restricting input read! // unresolved exceptions. Important i.e. when no IO interleave is used...
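partitionToExceptionsMap keys its ExceptionInfo entries by the string form of each normal partition's UUID, so the offsets amount to a running prefix sum of per-partition exception counts. A simplified sketch of that bookkeeping (assuming, purely for illustration, exception partitions with a fixed row capacity; the real code in Context.cc below tracks byte offsets and variable partition capacities):

#include <cstddef>
#include <string>
#include <unordered_map>
#include <vector>

struct ExceptionInfo {
    size_t numExceptions, exceptionIndex, exceptionRowOffset;
};

// Map each normal partition (by UUID) to where its exceptions start, given
// how many exceptions fall into each partition. firstExc is a prefix sum.
std::unordered_map<std::string, ExceptionInfo>
buildPartitionToExceptionsMap(const std::vector<std::string>& partitionUuids,
                              const std::vector<size_t>& numExcPerPartition,
                              size_t rowsPerExcPartition) {
    std::unordered_map<std::string, ExceptionInfo> map;
    size_t firstExc = 0; // exceptions occurring before the current partition
    for (size_t i = 0; i < partitionUuids.size(); ++i) {
        map[partitionUuids[i]] = ExceptionInfo{
            numExcPerPartition[i],
            firstExc / rowsPerExcPartition,  // exception partition index
            firstExc % rowsPerExcPartition}; // row offset inside it
        firstExc += numExcPerPartition[i];
    }
    return map;
}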
- std::vector<Partition*> _unresolved_exceptions; + std::vector<Partition*> _inputExceptions; + std::unordered_map<std::string, ExceptionInfo> _partitionToExceptionsMap; // for hash output, the key and bucket type diff --git a/tuplex/core/include/physical/TransformTask.h b/tuplex/core/include/physical/TransformTask.h index efc176205..2868ba668 100644 --- a/tuplex/core/include/physical/TransformTask.h +++ b/tuplex/core/include/physical/TransformTask.h @@ -113,7 +113,8 @@ namespace tuplex { _functor(nullptr), _stageID(-1), _htableFormat(HashTableFormat::UNKNOWN), - _wallTime(0.0) { + _wallTime(0.0), + _updateInputExceptions(false) { resetSinks(); resetSources(); } @@ -123,6 +124,10 @@ void sinkExceptionsToMemory(const Schema& inputSchema) { _inputSchema = inputSchema; } void sinkExceptionsToFile(const Schema& inputSchema, const URI& exceptionOutputURI) { throw std::runtime_error("not yet supported"); } + void setFunctor(codegen::read_block_exp_f functor) { + _functor = (void*)functor; + } + void setFunctor(codegen::read_block_f functor) { _functor = (void*)functor; // @TODO: update other vars too... } @@ -234,6 +239,13 @@ */ std::unordered_map<std::tuple<int64_t, ExceptionCode>, size_t> exceptionCounts() const { return _exceptionCounts; } + ExceptionInfo inputExceptionInfo() { return _inputExceptionInfo; } + std::vector<Partition*> inputExceptions() { return _inputExceptions; } + + void setInputExceptionInfo(ExceptionInfo info) { _inputExceptionInfo = info; } + void setInputExceptions(const std::vector<Partition*>& inputExceptions) { _inputExceptions = inputExceptions; } + void setUpdateInputExceptions(bool updateInputExceptions) { _updateInputExceptions = updateInputExceptions; } + double wallTime() const override { return _wallTime; } size_t output_rows_written() const { return _numOutputRowsWritten; } @@ -280,6 +292,9 @@ MemorySink _exceptions; Schema _inputSchema; + ExceptionInfo _inputExceptionInfo; + std::vector<Partition*> _inputExceptions; + bool _updateInputExceptions; // hash table sink HashTableSink _htable; @@ -297,6 +312,7 @@ _exceptions.unlock(); } + void processMemorySourceWithExp(); void processMemorySource(); void processFileSource(); diff --git a/tuplex/core/src/Context.cc b/tuplex/core/src/Context.cc index ea5fdd81d..2d46608be 100644 --- a/tuplex/core/src/Context.cc +++ b/tuplex/core/src/Context.cc @@ -202,7 +202,7 @@ } - DataSet& Context::fromPartitions(const Schema& schema, const std::vector<Partition*>& partitions, const std::vector<std::string>& columns) { + DataSet& Context::fromPartitions(const Schema& schema, const std::vector<Partition*>& partitions, const std::vector<std::string>& columns, const std::vector<std::tuple<size_t, PyObject*>> &badParallelizeObjects, const std::vector<size_t> &numExceptionsInPartition) { auto dataSetID = getNextDataSetID(); DataSet *dsptr = createDataSet(schema); @@ -214,7 +214,7 @@ // empty?
if(partitions.empty()) { dsptr->setColumns(columns); - addParallelizeNode(dsptr); + addParallelizeNode(dsptr, badParallelizeObjects, numExceptionsInPartition); return *dsptr; } else { size_t numRows = 0; @@ -230,7 +230,7 @@ // set rows dsptr->setColumns(columns); - addParallelizeNode(dsptr); + addParallelizeNode(dsptr, badParallelizeObjects, numExceptionsInPartition); // signal check if(check_and_forward_signals()) { @@ -243,7 +243,8 @@ } } - DataSet& Context::parallelize(const std::vector<Row>& rows, const std::vector<std::string>& columnNames) { + DataSet& Context::parallelize(const std::vector<Row>& rows, + const std::vector<std::string>& columnNames) { Schema schema; int dataSetID = getNextDataSetID(); @@ -348,7 +349,94 @@ return op; } - void Context::addParallelizeNode(DataSet *ds) { + void Context::serializePythonObjects(const std::vector<std::tuple<size_t, PyObject*>>& pythonObjects, + const std::vector<size_t> &numExceptionsInPartition, + const std::vector<Partition*> &normalPartitions, + const int64_t opID, + std::vector<Partition*> &serializedPythonObjects, + std::unordered_map<std::string, ExceptionInfo> &pythonObjectsMap) { + if (pythonObjects.empty()) { + for (const auto &p : normalPartitions) { + pythonObjectsMap[uuidToString(p->uuid())] = ExceptionInfo(); + } + return; + } + + Schema schema(Schema::MemoryLayout::ROW, python::Type::makeTupleType({python::Type::STRING})); + const size_t allocMinSize = 1024 * 64; // 64KB + + Partition* partition = requestNewPartition(schema, -1, allocMinSize); + int64_t* rawPtr = (int64_t*)partition->lockWriteRaw(); + *rawPtr = 0; + uint8_t* ptr = (uint8_t*)(rawPtr + 1); + size_t numBytesSerialized = 0; + + auto prevExpByteOffset = 0; + auto prevExpRowOffset = 0; + auto prevExpInd = 0; + auto curNormalPartitionInd = 0; + auto numNewExps = 0; + + // Serialize each exception to a partition using the following schema: + // (1) is the field containing rowNum + // (2) is the field containing ecCode + // (3) is the field containing opID + // (4) is the field containing pickledObjectSize + // (5) is the field containing pickledObject + for(auto &exception : pythonObjects) { + auto rowNum = std::get<0>(exception); + auto pyObj = std::get<1>(exception); + auto ecCode = ecToI64(ExceptionCode::PYTHON_PARALLELIZE); + auto pickledObject = python::pickleObject(python::getMainModule(), pyObj); + auto pickledObjectSize = pickledObject.size(); + size_t requiredBytes = sizeof(int64_t) * 4 + pickledObjectSize; + + if (partition->capacity() < numBytesSerialized + requiredBytes) { + partition->unlockWrite(); + serializedPythonObjects.push_back(partition); + partition = requestNewPartition(schema, -1, allocMinSize); + rawPtr = (int64_t *) partition->lockWriteRaw(); + *rawPtr = 0; + ptr = (uint8_t * )(rawPtr + 1); + numBytesSerialized = 0; + } + + // Check if we have reached the number of exceptions in the input partition + // Record the current exception index and offset and iterate to next one + auto curNormalPartition = normalPartitions[curNormalPartitionInd]; + auto normalUUID = uuidToString(curNormalPartition->uuid()); + auto numExps = numExceptionsInPartition[curNormalPartitionInd]; + if (numNewExps >= numExps) { + pythonObjectsMap[normalUUID] = ExceptionInfo(numExps, prevExpInd, prevExpRowOffset, prevExpByteOffset); + prevExpRowOffset = *rawPtr; + prevExpByteOffset = numBytesSerialized; + prevExpInd = serializedPythonObjects.size(); + numNewExps = 0; + curNormalPartitionInd++; + } + + *((int64_t*)(ptr)) = rowNum; ptr += sizeof(int64_t); + *((int64_t*)(ptr)) = ecCode; ptr += sizeof(int64_t); + *((int64_t*)(ptr)) = opID; ptr +=
sizeof(int64_t); + *((int64_t*)(ptr)) = pickledObjectSize; ptr += sizeof(int64_t); + memcpy(ptr, pickledObject.c_str(), pickledObjectSize); ptr += pickledObjectSize; + + *rawPtr = *rawPtr + 1; + numBytesSerialized += requiredBytes; + numNewExps += 1; + } + + // Record the mapping for the last normal partition + auto curNormalPartition = normalPartitions[curNormalPartitionInd]; + auto normalUUID = uuidToString(curNormalPartition->uuid()); + auto numExceptions = numExceptionsInPartition[curNormalPartitionInd]; + pythonObjectsMap[normalUUID] = ExceptionInfo(numExceptions, prevExpInd, prevExpRowOffset, prevExpByteOffset); + + partition->unlockWrite(); + serializedPythonObjects.push_back(partition); + } + + void Context::addParallelizeNode(DataSet *ds, const std::vector<std::tuple<size_t, PyObject*>> &badParallelizeObjects, const std::vector<size_t> &numExceptionsInPartition) { assert(ds); // @TODO: make empty list as special case work. Also true for empty files. @@ -357,8 +445,15 @@ assert(ds->_schema.getRowType() != python::Type::UNKNOWN); + auto op = new ParallelizeOperator(ds->_schema, ds->getPartitions(), ds->columns()); + std::vector<Partition*> serializedPythonObjects; + std::unordered_map<std::string, ExceptionInfo> pythonObjectsMap; + serializePythonObjects(badParallelizeObjects, numExceptionsInPartition, ds->getPartitions(), op->getID(), serializedPythonObjects, pythonObjectsMap); + op->setPythonObjects(serializedPythonObjects); + op->setInputPartitionToPythonObjectsMap(pythonObjectsMap); + // add new (root) node - ds->_operator = addOperator(new ParallelizeOperator(ds->_schema, ds->getPartitions(), ds->columns())); + ds->_operator = addOperator(op); // set dataset ds->_operator->setDataSet(ds); diff --git a/tuplex/core/src/ee/local/LocalBackend.cc b/tuplex/core/src/ee/local/LocalBackend.cc index 63aadc9b1..bed96ec5a 100644 --- a/tuplex/core/src/ee/local/LocalBackend.cc +++ b/tuplex/core/src/ee/local/LocalBackend.cc @@ -467,16 +467,14 @@ } std::vector<IExecutorTask*> LocalBackend::createLoadAndTransformToMemoryTasks( - tuplex::TransformStage *tstage, const tuplex::ContextOptions &options, - tuplex::codegen::read_block_f functor) { - - // @TODO: if an output limit is given for this tage, could use a shared atomic variable to check and early abort - // tasks. => that's probably the easiest design! - // --> need to make sure that mechanism doesn't cause weird overheads in the end...
+ TransformStage *tstage, + const tuplex::ContextOptions &options, + const std::shared_ptr& syms) { using namespace std; vector tasks; assert(tstage); + assert(syms); size_t readBufferSize = options.READ_BUFFER_SIZE(); bool normalCaseEnabled = options.OPT_NULLVALUE_OPTIMIZATION(); // this is important so exceptions get upgraded to internal ones @@ -532,7 +530,7 @@ namespace tuplex { if(options.INPUT_SPLIT_SIZE() == 0) { // one task per URI auto task = new TransformTask(); - task->setFunctor(functor); + task->setFunctor(syms->functor); task->setInputFileSource(uri, normalCaseEnabled, tstage->fileInputOperatorID(), inputRowType, header, !options.OPT_GENERATE_PARSER(), numColumns, 0, 0, delimiter, quotechar, colsToKeep, options.PARTITION_SIZE(), tstage->inputFormat()); @@ -565,7 +563,7 @@ namespace tuplex { if(file_size <= splitSize) { // 1 task (range 0,0 to indicate full file) auto task = new TransformTask(); - task->setFunctor(functor); + task->setFunctor(syms->functor); task->setInputFileSource(uri, normalCaseEnabled, tstage->fileInputOperatorID(), inputRowType, header, !options.OPT_GENERATE_PARSER(), numColumns, 0, 0, delimiter, @@ -602,7 +600,7 @@ namespace tuplex { rangeEnd = file_size; auto task = new TransformTask(); - task->setFunctor(functor); + task->setFunctor(syms->functor); task->setInputFileSource(uri, normalCaseEnabled, tstage->fileInputOperatorID(), inputRowType, header, !options.OPT_GENERATE_PARSER(), numColumns, rangeStart, rangeEnd - rangeStart, delimiter, @@ -657,7 +655,12 @@ namespace tuplex { for(int i = 0; i < inputPartitions.size(); ++i) { auto partition = inputPartitions[i]; auto task = new TransformTask(); - task->setFunctor(functor); + if (tstage->updateInputExceptions()) { + task->setFunctor(syms->functorWithExp); + } else { + task->setFunctor(syms->functor); + } + task->setUpdateInputExceptions(tstage->updateInputExceptions()); task->setInputMemorySource(partition, !partition->isImmortal()); // hash table or memory output? 
if(tstage->outputMode() == EndPointMode::HASHTABLE) { @@ -673,6 +676,11 @@ namespace tuplex { tstage->outputMode() == EndPointMode::MEMORY); task->sinkOutputToMemory(outputSchema, tstage->outputDataSetID(), tstage->context().id()); } + + auto partitionId = uuidToString(partition->uuid()); + auto info = tstage->partitionToExceptionsMap()[partitionId]; + task->setInputExceptionInfo(info); + task->setInputExceptions(tstage->inputExceptions()); task->sinkExceptionsToMemory(inputSchema); task->setStageID(tstage->getID()); task->setOutputLimit(tstage->outputLimit()); @@ -742,6 +750,92 @@ namespace tuplex { return pip_object; } + std::vector> inputExceptionsToPythonObjects(const std::vector& partitions, Schema schema) { + using namespace tuplex; + + std::vector> pyObjects; + for (const auto &partition : partitions) { + auto numRows = partition->getNumRows(); + const uint8_t* ptr = partition->lock(); + + python::lockGIL(); + for (int i = 0; i < numRows; ++i) { + int64_t rowNum = *((int64_t*)ptr); + ptr += sizeof(int64_t); + int64_t ecCode = *((int64_t*)ptr); + ptr += 2 * sizeof(int64_t); + int64_t objSize = *((int64_t*)ptr); + ptr += sizeof(int64_t); + + PyObject* pyObj = nullptr; + if (ecCode == ecToI64(ExceptionCode::PYTHON_PARALLELIZE)) { + pyObj = python::deserializePickledObject(python::getMainModule(), (char *) ptr, objSize); + } else { + pyObj = python::rowToPython(Row::fromMemory(schema, ptr, objSize), true); + } + + ptr += objSize; + pyObjects.emplace_back(rowNum, pyObj); + } + python::unlockGIL(); + + partition->unlock(); + partition->invalidate(); + } + + return pyObjects; + } + + void setExceptionInfo(const std::vector &normalOutput, const std::vector &exceptions, std::unordered_map &partitionToExceptionsMap) { + if (exceptions.empty()) { + for (const auto &p : normalOutput) { + partitionToExceptionsMap[uuidToString(p->uuid())] = ExceptionInfo(); + } + return; + } + + auto expRowCount = 0; + auto expInd = 0; + auto expRowOff = 0; + auto expByteOff = 0; + + auto expNumRows = exceptions[0]->getNumRows(); + auto expPtr = exceptions[0]->lockWrite(); + auto rowsProcessed = 0; + for (const auto &p : normalOutput) { + auto pNumRows = p->getNumRows(); + auto curNumExps = 0; + auto curExpOff = expRowOff; + auto curExpInd = expInd; + auto curExpByteOff = expByteOff; + + while (*((int64_t *) expPtr) - rowsProcessed <= pNumRows + curNumExps && expRowCount < expNumRows) { + *((int64_t *) expPtr) -= rowsProcessed; + curNumExps++; + expRowOff++; + auto eSize = ((int64_t *)expPtr)[3] + 4*sizeof(int64_t); + expPtr += eSize; + expByteOff += eSize; + expRowCount++; + + if (expRowOff == expNumRows && expInd < exceptions.size() - 1) { + exceptions[expInd]->unlockWrite(); + expInd++; + expPtr = exceptions[expInd]->lockWrite(); + expNumRows = exceptions[expInd]->getNumRows(); + expRowOff = 0; + expByteOff = 0; + expRowCount = 0; + } + } + + rowsProcessed += curNumExps + pNumRows; + partitionToExceptionsMap[uuidToString(p->uuid())] = ExceptionInfo(curNumExps, curExpInd, curExpOff, curExpByteOff); + } + + exceptions[expInd]->unlockWrite(); + } + void LocalBackend::executeTransformStage(tuplex::TransformStage *tstage) { Timer stageTimer; @@ -760,8 +854,10 @@ namespace tuplex { } // special case: skip stage, i.e. 
empty code and mem2mem - if(tstage->code().empty() && !tstage->fileInputMode() && !tstage->fileOutputMode()) { - tstage->setMemoryResult(tstage->inputPartitions()); + if(tstage->code().empty() && !tstage->fileInputMode() && !tstage->fileOutputMode()) { + auto pyObjects = inputExceptionsToPythonObjects(tstage->inputExceptions(), tstage->normalCaseInputSchema()); + tstage->setMemoryResult(tstage->inputPartitions(), std::vector{}, std::unordered_map(), pyObjects); + pyObjects.clear(); // skip stage Logger::instance().defaultLogger().info("[Transform Stage] skipped stage " + std::to_string(tstage->number()) + " because there is nothing todo here."); return; @@ -841,7 +937,7 @@ namespace tuplex { } } - auto tasks = createLoadAndTransformToMemoryTasks(tstage, _options, syms->functor); + auto tasks = createLoadAndTransformToMemoryTasks(tstage, _options, syms); auto completedTasks = performTasks(tasks); // Note: this doesn't work yet because of the globals. @@ -887,7 +983,6 @@ namespace tuplex { // => there are fallback mechanisms... bool executeSlowPath = true; - //TODO: implement pure python resolution here... // exceptions found or slowpath data given? if(totalECountsBeforeResolution > 0 || !tstage->inputExceptions().empty()) { @@ -913,13 +1008,13 @@ namespace tuplex { lines.push_back(Row((int64_t)opid, exceptionCodeToPythonClass(ec), (int64_t)keyval.second)); } } - // input partitions given? + if(!tstage->inputExceptions().empty()) { - size_t numSlowPath = 0; - for(auto& p : tstage->inputExceptions()) - numSlowPath += p->getNumRows(); - lines.push_back(Row("(cached)", exceptionCodeToPythonClass(ExceptionCode::NORMALCASEVIOLATION), (int64_t)numSlowPath)); - totalECountsBeforeResolution += numSlowPath; + size_t numExceptions = 0; + for (auto &p : tstage->inputExceptions()) + numExceptions += p->getNumRows(); + lines.push_back(Row("(input)", exceptionCodeToPythonClass(ExceptionCode::NORMALCASEVIOLATION), (int64_t)numExceptions)); + totalECountsBeforeResolution += numExceptions; } printTable(ss, headers, lines, false); @@ -935,13 +1030,6 @@ namespace tuplex { merge_except_rows = false; } - // were initial exceptions (general case) given? - if(!tstage->inputExceptions().empty() && merge_except_rows) { - auto err_msg = "when using cache with normal/general optimization, set mergeRowsInOrder=false. Not yet supported"; - logger().error(err_msg); - throw std::runtime_error(err_msg); - } - // should slow path get executed executeSlowPath = syms->resolveFunctor || !tstage->purePythonCode().empty(); @@ -1036,7 +1124,8 @@ namespace tuplex { case EndPointMode::MEMORY: { // memory output, fetch partitions & ecounts vector output; - vector generalOutput; // partitions which violate the normal case + vector generalOutput; + unordered_map partitionToExceptionsMap; vector remainingExceptions; vector> nonConformingRows; // rows where the output type does not fit, // need to manually merged. @@ -1067,20 +1156,21 @@ namespace tuplex { if(task->type() == TaskType::RESOLVE) task_name = "resolve"; + setExceptionInfo(taskOutput, taskGeneralOutput, partitionToExceptionsMap); std::copy(taskOutput.begin(), taskOutput.end(), std::back_inserter(output)); std::copy(taskRemainingExceptions.begin(), taskRemainingExceptions.end(), std::back_inserter(remainingExceptions)); std::copy(taskGeneralOutput.begin(), taskGeneralOutput.end(), std::back_inserter(generalOutput)); std::copy(taskNonConformingRows.begin(), taskNonConformingRows.end(), std::back_inserter(nonConformingRows)); // compute the delta used to offset records! 
- for(auto p : taskOutput) + for (const auto &p : taskOutput) rowDelta += p->getNumRows(); - for(auto p : taskGeneralOutput) + for (const auto &p : taskGeneralOutput) rowDelta += p->getNumRows(); rowDelta += taskNonConformingRows.size(); } - tstage->setMemoryResult(output, generalOutput, nonConformingRows, remainingExceptions, ecounts); + tstage->setMemoryResult(output, generalOutput, partitionToExceptionsMap, nonConformingRows, remainingExceptions, ecounts); break; } case EndPointMode::HASHTABLE: { @@ -1301,7 +1391,7 @@ namespace tuplex { else if(compareOrders(maxOrder, tt->getOrder())) maxOrder = tt->getOrder(); - if (tt->exceptionCounts().size() > 0) { + if (tt->exceptionCounts().size() > 0 || tt->inputExceptionInfo().numExceptions > 0) { // task found with exceptions in it => exception partitions need to be resolved using special functor // hash-table output not yet supported @@ -1317,6 +1407,8 @@ namespace tuplex { tstage->context().id(), tt->getOutputPartitions(), tt->getExceptionPartitions(), + tt->inputExceptions(), + tt->inputExceptionInfo(), opsToCheck, exceptionInputSchema, compiledSlowPathOutputSchema, @@ -1396,50 +1488,6 @@ namespace tuplex { } } - // are input exceptions given? create tasks here... - // @TODO: python objects?? - if(!tstage->inputExceptions().empty()) { - // add into ops to check the dummy 0 which is used in Caching tasks... - opsToCheck.push_back(0); - - assert(!merge_rows_in_order); // @TODO: no support for this yet... -> it might be complicated because of filters & Co to figure out the right row numbers... - for(auto& p : tstage->inputExceptions()) { - maxOrder.back()++; - - auto rtask = new ResolveTask(stageID, - tstage->context().id(), - std::vector(), - vector{p}, - opsToCheck, - tstage->inputSchema(), - compiledSlowPathOutputSchema, - targetNormalCaseOutputSchema, - targetGeneralCaseOutputSchema, - merge_rows_in_order, - allowNumericTypeUnification, - outFormat, - csvDelimiter, - csvQuotechar, - functor, - pip_object); - - // to implement, store i.e. tables within tasks... - rtask->setHybridIntermediateHashTables(tstage->predecessors().size(), input_intermediates.hybrids); - - rtask->setOrder(maxOrder); // this is arbitrary, just put the slow path rows at the end - // hash output? - if(hashOutput) { - if (tstage->hashtableKeyByteWidth() == 8) { - auto h = tstage->dataAggregationMode(); - rtask->sinkOutputToHashTable(HashTableFormat::UINT64, tstage->dataAggregationMode(), tstage->hashOutputKeyType().withoutOptions(), tstage->hashOutputBucketType()); - } else { - rtask->sinkOutputToHashTable(HashTableFormat::BYTES, tstage->dataAggregationMode(), tstage->hashOutputKeyType().withoutOptions(), tstage->hashOutputBucketType()); - } - } - resolveTasks.push_back(rtask); - } - } - logger().info("Created " + pluralize(resolveTasks.size(), "resolve task") + " in " + std::to_string(timer.time()) + "s"); logger().info(std::to_string(resolveTasks.size()) + "/" + pluralize(tasks.size(), "task") + " require executing the slow path."); timer.reset(); @@ -1450,6 +1498,11 @@ namespace tuplex { // cout<<"*** git "<inputExceptions()) { + p->invalidate(); + } + // cout<<"*** total number of tasks to return is "<getOutputSchema()); _normalCasePartitions = cop->cachedPartitions(); _generalCasePartitions = cop->cachedExceptions(); + _partitionToExceptionsMap = cop->partitionToExceptionsMap(); // copy python objects and incref for each! 
_py_objects = cop->_py_objects; python::lockGIL(); @@ -85,6 +86,8 @@ namespace tuplex { for(auto p : _generalCasePartitions) p->makeImmortal(); + _partitionToExceptionsMap = rs->partitionToExceptionsMap(); + // check whether partitions have different schema than the currently set one // => i.e. they have been specialized. if(!_normalCasePartitions.empty()) { diff --git a/tuplex/core/src/logical/ParallelizeOperator.cc b/tuplex/core/src/logical/ParallelizeOperator.cc index 5215ea7b6..770ac2d4f 100644 --- a/tuplex/core/src/logical/ParallelizeOperator.cc +++ b/tuplex/core/src/logical/ParallelizeOperator.cc @@ -12,7 +12,7 @@ namespace tuplex { ParallelizeOperator::ParallelizeOperator(const Schema& schema, - std::vector partitions, + const std::vector& partitions, const std::vector& columns) : _partitions(partitions), _columnNames(columns) { @@ -112,6 +112,8 @@ namespace tuplex { auto copy = new ParallelizeOperator(getOutputSchema(), _partitions, columns()); copy->setDataSet(getDataSet()); copy->copyMembers(this); + copy->setPythonObjects(_pythonObjects); + copy->setInputPartitionToPythonObjectsMap(_inputPartitionToPythonObjectsMap); assert(getID() == copy->getID()); return copy; } diff --git a/tuplex/core/src/physical/BlockBasedTaskBuilder.cc b/tuplex/core/src/physical/BlockBasedTaskBuilder.cc index fbc443080..80e21c0a1 100644 --- a/tuplex/core/src/physical/BlockBasedTaskBuilder.cc +++ b/tuplex/core/src/physical/BlockBasedTaskBuilder.cc @@ -50,6 +50,47 @@ namespace tuplex { return _func; } + llvm::Function* BlockBasedTaskBuilder::createFunctionWithExceptions() { + using namespace llvm; + using namespace std; + + auto& context = env().getContext(); + FunctionType* read_block_type = FunctionType::get(env().i64Type(), {env().i8ptrType(), + env().i8ptrType(), + env().i64Type(), + env().i8ptrType()->getPointerTo(0), + env().i64Type()->getPointerTo(0), + env().i64Type(), + env().i64Type()->getPointerTo(0), + env().i64Type()->getPointerTo(0), + env().getBooleanType()}, false); + // create function and set argNames + Function* read_block_func = Function::Create(read_block_type, Function::ExternalLinkage, _desiredFuncName, env().getModule().get()); + + std::vector args; + for(auto& arg : read_block_func->args()) { + args.push_back(&arg); + } + + // rename args + vector argNames{"userData", + "inPtr", + "inSize", + "expPtrs", + "expPtrSizes", + "numExps", + "outNormalRowCount", + "outBadRowCount", + "ignoreLastRow"}; + for(int i = 0; i < argNames.size(); ++i) { + args[i]->setName(argNames[i]); + _args[argNames[i]] = args[i]; + } + + _func = read_block_func; + return _func; + } + void BlockBasedTaskBuilder::setIntermediateInitialValueByRow(const python::Type &intermediateType, const Row &row) { _intermediateInitialValue = row; diff --git a/tuplex/core/src/physical/ExceptionSourceTaskBuilder.cc b/tuplex/core/src/physical/ExceptionSourceTaskBuilder.cc new file mode 100644 index 000000000..b3bd3847f --- /dev/null +++ b/tuplex/core/src/physical/ExceptionSourceTaskBuilder.cc @@ -0,0 +1,345 @@ +//--------------------------------------------------------------------------------------------------------------------// +// // +// Tuplex: Blazing Fast Python Data Science // +// // +// // +// (c) 2017 - 2021, Tuplex team // +// Created by Benjamin Givertz first on 1/1/2021 // +// License: Apache 2.0 // +//--------------------------------------------------------------------------------------------------------------------// + +#include + +namespace tuplex { + namespace codegen { + llvm::Function* 
ExceptionSourceTaskBuilder::build(bool terminateEarlyOnFailureCode) { + auto func = createFunctionWithExceptions(); + + // create main loop + createMainLoop(func, terminateEarlyOnFailureCode); + + return func; + } + + void ExceptionSourceTaskBuilder::processRow(llvm::IRBuilder<> &builder, llvm::Value *userData, + const FlattenedTuple &tuple, + llvm::Value *normalRowCountVar, + llvm::Value *badRowCountVar, + llvm::Value *rowNumberVar, + llvm::Value *inputRowPtr, + llvm::Value *inputRowSize, + bool terminateEarlyOnLimitCode, + llvm::Function *processRowFunc) { + using namespace llvm; + + // call pipeline function, then increase normalcounter + if(processRowFunc) { + callProcessFuncWithHandler(builder, userData, tuple, normalRowCountVar, badRowCountVar, rowNumberVar, inputRowPtr, + inputRowSize, terminateEarlyOnLimitCode, processRowFunc); + } else { + Value *normalRowCount = builder.CreateLoad(normalRowCountVar, "normalRowCount"); + builder.CreateStore(builder.CreateAdd(normalRowCount, env().i64Const(1)), normalRowCountVar); + } + } + + void ExceptionSourceTaskBuilder::callProcessFuncWithHandler(llvm::IRBuilder<> &builder, llvm::Value *userData, + const FlattenedTuple& tuple, + llvm::Value *normalRowCountVar, + llvm::Value *badRowCountVar, + llvm::Value *rowNumberVar, + llvm::Value *inputRowPtr, + llvm::Value *inputRowSize, + bool terminateEarlyOnLimitCode, + llvm::Function *processRowFunc) { + auto& context = env().getContext(); + auto pip_res = PipelineBuilder::call(builder, processRowFunc, tuple, userData, builder.CreateLoad(rowNumberVar), initIntermediate(builder)); + + // create if based on resCode to go into exception block + auto ecCode = builder.CreateZExtOrTrunc(pip_res.resultCode, env().i64Type()); + auto ecOpID = builder.CreateZExtOrTrunc(pip_res.exceptionOperatorID, env().i64Type()); + auto numRowsCreated = builder.CreateZExtOrTrunc(pip_res.numProducedRows, env().i64Type()); + + if(terminateEarlyOnLimitCode) + generateTerminateEarlyOnCode(builder, ecCode, ExceptionCode::OUTPUT_LIMIT_REACHED); + + // add number of rows created to output row number variable + auto outputRowNumber = builder.CreateLoad(rowNumberVar); + builder.CreateStore(builder.CreateAdd(builder.CreateLoad(rowNumberVar), numRowsCreated), rowNumberVar); + + auto exceptionRaised = builder.CreateICmpNE(ecCode, env().i64Const(ecToI32(ExceptionCode::SUCCESS))); + + llvm::BasicBlock* bbPipelineFailedUpdate = llvm::BasicBlock::Create(context, "pipeline_failed", builder.GetInsertBlock()->getParent()); + llvm::BasicBlock* bbPipelineOK = llvm::BasicBlock::Create(context, "pipeline_ok", builder.GetInsertBlock()->getParent()); + llvm::BasicBlock* curBlock = builder.GetInsertBlock(); + llvm::BasicBlock* bbPipelineFailed = exceptionBlock(builder, userData, ecCode, ecOpID, outputRowNumber, inputRowPtr, inputRowSize); // generate exception block (incl. 
ignore & handler if necessary) + + + llvm::BasicBlock* lastExceptionBlock = builder.GetInsertBlock(); + llvm::BasicBlock* bbPipelineDone = llvm::BasicBlock::Create(context, "pipeline_done", builder.GetInsertBlock()->getParent()); + + builder.SetInsertPoint(curBlock); + builder.CreateCondBr(exceptionRaised, bbPipelineFailedUpdate, bbPipelineOK); + + builder.SetInsertPoint(bbPipelineFailedUpdate); + builder.CreateStore(builder.CreateAdd(builder.CreateLoad(badRowCountVar), env().i64Const(1)), badRowCountVar); + builder.CreateBr(bbPipelineFailed); + + // pipeline ok + builder.SetInsertPoint(bbPipelineOK); + llvm::Value *normalRowCount = builder.CreateLoad(normalRowCountVar, "normalRowCount"); + builder.CreateStore(builder.CreateAdd(normalRowCount, env().i64Const(1)), normalRowCountVar); + builder.CreateBr(bbPipelineDone); + + // connect exception block to pipeline failure + builder.SetInsertPoint(lastExceptionBlock); + builder.CreateBr(bbPipelineDone); + + builder.SetInsertPoint(bbPipelineDone); + + // call runtime free all + _env->freeAll(builder); + } + + void ExceptionSourceTaskBuilder::createMainLoop(llvm::Function *read_block_func, bool terminateEarlyOnLimitCode) { + using namespace std; + using namespace llvm; + + assert(read_block_func); + + auto& context = env().getContext(); + + auto argUserData = arg("userData"); + auto argInPtr = arg("inPtr"); + auto argInSize = arg("inSize"); + auto argExpPtrs = arg("expPtrs"); + auto argExpPtrSizes = arg("expPtrSizes"); + auto argNumExps = arg("numExps"); + auto argOutNormalRowCount = arg("outNormalRowCount"); + auto argOutBadRowCount = arg("outBadRowCount"); + auto argIgnoreLastRow = arg("ignoreLastRow"); + + BasicBlock *bbBody = BasicBlock::Create(context, "entry", read_block_func); + + IRBuilder<> builder(bbBody); + + + // there should be a check if argInSize is 0 + // if so -> handle separately, i.e. 
return immediately +#warning "add here argInSize > 0 check" + + // compute endptr from args + Value *endPtr = builder.CreateGEP(argInPtr, argInSize, "endPtr"); + Value *currentPtrVar = builder.CreateAlloca(env().i8ptrType(), 0, nullptr, "readPtrVar"); + // later use combi of normal & bad rows + Value *outRowCountVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "outRowCountVar"); // counter for output row number (used for exception resolution) + builder.CreateStore(argInPtr, currentPtrVar); + + Value *normalRowCountVar = argOutNormalRowCount; + Value *badRowCountVar = argOutBadRowCount; + builder.CreateStore(builder.CreateAdd(builder.CreateLoad(argOutBadRowCount), + builder.CreateLoad(argOutNormalRowCount)), outRowCountVar); + + // current index into array of exception partitions + auto curExpIndVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "curExpIndVar"); + builder.CreateStore(env().i64Const(0), curExpIndVar); + + // current partition pointer + auto curExpPtrVar = builder.CreateAlloca(env().i8ptrType(), 0, nullptr, "curExpPtrVar"); + builder.CreateStore(builder.CreateLoad(argExpPtrs), curExpPtrVar); + + // number of rows total in current partition + auto curExpNumRowsVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "curExpNumRowsVar"); + builder.CreateStore(builder.CreateLoad(argExpPtrSizes), curExpNumRowsVar); + + // current row number in current partition + auto curExpCurRowVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "curExpCurRowVar"); + builder.CreateStore(env().i64Const(0), curExpCurRowVar); + + // accumulator used to update exception indices when rows are filtered, counts number of previously filtered rows + auto expAccVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "expAccVar"); + builder.CreateStore(env().i64Const(0), expAccVar); + + // used to see if rows are filtered during pipeline execution + auto prevRowNumVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "prevRowNumVar"); + auto prevBadRowNumVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "prevBadRowNumVar"); + + // current number of exceptions processed across all partitions + auto expCurRowVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "expCurRowVar"); + builder.CreateStore(env().i64Const(0), expCurRowVar); + + // get num rows to read & process in loop + Value *numRowsVar = builder.CreateAlloca(env().i64Type(), 0, nullptr, "numRowsVar"); + Value *input_ptr = builder.CreatePointerCast(argInPtr, env().i64Type()->getPointerTo(0)); + builder.CreateStore(builder.CreateLoad(input_ptr), numRowsVar); + // store current input ptr + Value *currentInputPtrVar = builder.CreateAlloca(env().i8ptrType(), 0, nullptr, "ptr"); + builder.CreateStore(builder.CreateGEP(argInPtr, env().i32Const(sizeof(int64_t))), currentInputPtrVar); + + + // variable for current row number...
+ Value *rowVar = builder.CreateAlloca(env().i64Type(), 0, nullptr); + builder.CreateStore(env().i64Const(0), rowVar); + + BasicBlock* bbLoopCondition = BasicBlock::Create(context, "loop_cond", read_block_func); + BasicBlock* bbLoopBody = BasicBlock::Create(context, "loop_body", read_block_func); + BasicBlock* bbLoopDone = BasicBlock::Create(context, "loop_done", read_block_func); + + // go from entry block to loop body + builder.CreateBr(bbLoopBody); + + // -------------- + // loop condition + builder.SetInsertPoint(bbLoopCondition); + Value *row = builder.CreateLoad(rowVar, "row"); + Value* nextRow = builder.CreateAdd(env().i64Const(1), row); + Value* numRows = builder.CreateLoad(numRowsVar, "numRows"); + builder.CreateStore(nextRow, rowVar, "row"); + auto cond = builder.CreateICmpSLT(nextRow, numRows); + builder.CreateCondBr(cond, bbLoopBody, bbLoopDone); + + + // --------- + // loop body + builder.SetInsertPoint(bbLoopBody); + // decode tuple from input ptr + FlattenedTuple ft(_env.get()); + ft.init(_inputRowType); + Value* oldInputPtr = builder.CreateLoad(currentInputPtrVar, "ptr"); + ft.deserializationCode(builder, oldInputPtr); + Value* newInputPtr = builder.CreateGEP(oldInputPtr, ft.getSize(builder)); // @TODO: maybe use inbounds + builder.CreateStore(newInputPtr, currentInputPtrVar); + + builder.CreateStore(builder.CreateLoad(outRowCountVar), prevRowNumVar); + builder.CreateStore(builder.CreateLoad(badRowCountVar), prevBadRowNumVar); + + // call function --> incl. exception handling + // process row here -- BEGIN + Value *inputRowSize = ft.getSize(builder); + processRow(builder, argUserData, ft, normalRowCountVar, badRowCountVar, outRowCountVar, oldInputPtr, inputRowSize, terminateEarlyOnLimitCode, pipeline() ? pipeline()->getFunction() : nullptr); + + // After row is processed we need to update exceptions if the row was filtered + // We check that: outRowCountVar == prevRowNumVar (no new row was emitted) + // badRowCountVar == prevBadRowNumVar (it was filtered, not just an exception) + // expCurRowVar < argNumExps (we still have exceptions that need updating) + // if (outRowCountVar == prevRowNumVar && badRowCountVar == prevBadRowNumVar && expCurRowVar < argNumExps) + auto bbExpUpdate = llvm::BasicBlock::Create(context, "exp_update", builder.GetInsertBlock()->getParent()); + auto expCond = builder.CreateICmpEQ(builder.CreateLoad(outRowCountVar), builder.CreateLoad(prevRowNumVar)); + auto badCond = builder.CreateICmpEQ(builder.CreateLoad(badRowCountVar), builder.CreateLoad(prevBadRowNumVar)); + auto remCond = builder.CreateICmpSLT(builder.CreateLoad(expCurRowVar), argNumExps); + builder.CreateCondBr(builder.CreateAnd(remCond, builder.CreateAnd(badCond, expCond)), bbExpUpdate, bbLoopCondition); + + // We have determined a row is filtered so we can iterate through all the input exceptions that occurred before this + // row and decrement their row index by the number of previously filtered rows (expAccVar). + // This is a while loop that iterates over all exceptions that occurred before this filtered row + // + // while (expCurRowVar < numExps && ((*outNormalRowCount - 1) + expCurRowVar) >= *((int64_t *) curExpPtrVar)) + // + // *outNormalRowCount - 1 converts the row count into a row index; we add the number of previously processed + // exceptions because the normal rows do not know about the exceptions to obtain the correct index. It's then compared
It's then compared + // against the row index of the exception currently pointed to in our partition + builder.SetInsertPoint(bbExpUpdate); + auto bbIncrement = llvm::BasicBlock::Create(context, "increment", builder.GetInsertBlock()->getParent()); + auto bbIncrementDone = llvm::BasicBlock::Create(context, "increment_done", builder.GetInsertBlock()->getParent()); + auto curExpRowIndPtr = builder.CreatePointerCast(builder.CreateLoad(curExpPtrVar), env().i64ptrType()); + auto incCond = builder.CreateICmpSGE(builder.CreateAdd(builder.CreateLoad(badRowCountVar), builder.CreateAdd(builder.CreateSub(builder.CreateLoad(normalRowCountVar), env().i64Const(1)), builder.CreateLoad(expCurRowVar))), builder.CreateLoad(curExpRowIndPtr)); + auto remCond2 = builder.CreateICmpSLT(builder.CreateLoad(expCurRowVar), argNumExps); + builder.CreateCondBr(builder.CreateAnd(remCond2, incCond), bbIncrement, bbIncrementDone); + + // In the body of the while loop we need to: + // 1. decrement the current exception row index by the expAccVar (all rows previously filtered) + // 2. increment our partition pointer to the next exception + // 3. change partitions if we've exhausted all exceptions in the current one, but still have more remaining in total + // + // Increment to the next exception by adding eSize and 4*sizeof(int64_t) to the partition pointer + // *((int64_t *) curExpPtrVar) -= expAccVar; + // curExpPtrVar += 4 * sizeof(int64_t) + ((int64_t *)curExpPtrVar)[3]; + // expCurRowVar += 1; + // curExpCurRowVar += 1; + // + // Finally we check to see if a partition change is required + // if (expCurRowVar < numExps && curExpCurRowVar >= curExpNumRowsVar) + builder.SetInsertPoint(bbIncrement); + // Change row index and go to next exception in partition + auto curExpRowIndPtr2 = builder.CreatePointerCast(builder.CreateLoad(curExpPtrVar), env().i64Type()->getPointerTo(0)); + builder.CreateStore(builder.CreateSub(builder.CreateLoad(curExpRowIndPtr2), builder.CreateLoad(expAccVar)), curExpRowIndPtr2); + auto curOffset = builder.CreateAlloca(env().i64Type(), 0, nullptr, "curOffset"); + builder.CreateStore(builder.CreateLoad(builder.CreateGEP(curExpRowIndPtr2, env().i64Const(3))), curOffset); + builder.CreateStore(builder.CreateAdd(builder.CreateLoad(curOffset), env().i64Const(4 * sizeof(int64_t))), curOffset); + builder.CreateStore(builder.CreateGEP(builder.CreateLoad(curExpPtrVar), builder.CreateLoad(curOffset)), curExpPtrVar); + builder.CreateStore(builder.CreateAdd(builder.CreateLoad(curExpCurRowVar), env().i64Const(1)), curExpCurRowVar); + builder.CreateStore(builder.CreateAdd(builder.CreateLoad(expCurRowVar), env().i64Const(1)), expCurRowVar); + // See if partition change needs to occur + auto bbChange = llvm::BasicBlock::Create(context, "change", builder.GetInsertBlock()->getParent()); + auto changeCond = builder.CreateICmpSGE(builder.CreateLoad(curExpCurRowVar), builder.CreateLoad(curExpNumRowsVar)); + auto leftCond = builder.CreateICmpSLT(builder.CreateLoad(expCurRowVar), argNumExps); + builder.CreateCondBr(builder.CreateAnd(leftCond, changeCond), bbChange, bbExpUpdate); + + // This block changes to the next partition + // curExpCurRowVar = 0; + // curExpIndVar = curExpIndVar + 1; + // curExpPtrVar = expPtrs[curExpIndVar]; + // curExpNumRowsVar = expPtrSizes[curExpIndVar]; + builder.SetInsertPoint(bbChange); + builder.CreateStore(env().i64Const(0), curExpCurRowVar); + builder.CreateStore(builder.CreateAdd(builder.CreateLoad(curExpIndVar), env().i64Const(1)), curExpIndVar); +
builder.CreateStore(builder.CreateLoad(builder.CreateGEP(argExpPtrs, builder.CreateLoad(curExpIndVar))), curExpPtrVar); + builder.CreateStore(builder.CreateLoad(builder.CreateGEP(argExpPtrSizes, builder.CreateLoad(curExpIndVar))), curExpNumRowsVar); + builder.CreateBr(bbExpUpdate); + + // Finally increment the expAccVar by 1 because a row was filtered + // expAccVar += 1; + builder.SetInsertPoint(bbIncrementDone); + builder.CreateStore(builder.CreateAdd(builder.CreateLoad(expAccVar), env().i64Const(1)), expAccVar); + builder.CreateBr(bbLoopCondition); + + // --------- + // loop done + builder.SetInsertPoint(bbLoopDone); + auto bbRemainingExceptions = llvm::BasicBlock::Create(context, "remaining_exceptions", builder.GetInsertBlock()->getParent()); + auto bbRemainingDone = llvm::BasicBlock::Create(context, "remaining_done", builder.GetInsertBlock()->getParent()); + auto expRemaining = builder.CreateICmpSLT(builder.CreateLoad(expCurRowVar), argNumExps); + builder.CreateCondBr(expRemaining, bbRemainingExceptions, bbRemainingDone); + + // We have processed all of the normal rows. If we have not exhausted all of our exceptions + // we just iterate through the remaining exceptions and decrement their row index by the final + // value of expAccVar counting our filtered rows. + // Same code as above, but we just don't need to keep updating expAccVar by 1. + builder.SetInsertPoint(bbRemainingExceptions); + auto curExpRowIndPtr3 = builder.CreatePointerCast(builder.CreateLoad(curExpPtrVar), env().i64Type()->getPointerTo(0)); + builder.CreateStore(builder.CreateSub(builder.CreateLoad(curExpRowIndPtr3), builder.CreateLoad(expAccVar)), curExpRowIndPtr3); + auto curOffset2 = builder.CreateAlloca(env().i64Type(), 0, nullptr, "curOffset2"); + builder.CreateStore(builder.CreateLoad(builder.CreateGEP(curExpRowIndPtr3, env().i64Const(3))), curOffset2); + builder.CreateStore(builder.CreateAdd(builder.CreateLoad(curOffset2), env().i64Const(4 * sizeof(int64_t))), curOffset2); + builder.CreateStore(builder.CreateGEP(builder.CreateLoad(curExpPtrVar), builder.CreateLoad(curOffset2)), curExpPtrVar); + builder.CreateStore(builder.CreateAdd(builder.CreateLoad(curExpCurRowVar), env().i64Const(1)), curExpCurRowVar); + builder.CreateStore(builder.CreateAdd(builder.CreateLoad(expCurRowVar), env().i64Const(1)), expCurRowVar); + + auto bbChange2 = llvm::BasicBlock::Create(context, "change2", builder.GetInsertBlock()->getParent()); + auto changeCond2 = builder.CreateICmpSGE(builder.CreateLoad(curExpCurRowVar), builder.CreateLoad(curExpNumRowsVar)); + auto leftCond2 = builder.CreateICmpSLT(builder.CreateLoad(expCurRowVar), argNumExps); + builder.CreateCondBr(builder.CreateAnd(leftCond2, changeCond2), bbChange2, bbLoopDone); + + builder.SetInsertPoint(bbChange2); + builder.CreateStore(env().i64Const(0), curExpCurRowVar); + builder.CreateStore(builder.CreateAdd(builder.CreateLoad(curExpIndVar), env().i64Const(1)), curExpIndVar); + builder.CreateStore(builder.CreateLoad(builder.CreateGEP(argExpPtrs, builder.CreateLoad(curExpIndVar))), curExpPtrVar); + builder.CreateStore(builder.CreateLoad(builder.CreateGEP(argExpPtrSizes, builder.CreateLoad(curExpIndVar))), curExpNumRowsVar); + builder.CreateBr(bbLoopDone); + + builder.SetInsertPoint(bbRemainingDone);
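For reference, a plain-C++ sketch of what the exp_update/increment/change blocks above compute per filtered row (variable names follow the allocas; nextExceptionRecord() is the illustrative helper from the layout sketch earlier, and the whole function is a model, not code from the repository):

    #include <cstdint>

    // decrement the stored row index of every exception that occurred before the
    // filtered row by the number of rows filtered so far (expAcc)
    void updateExceptionIndicesForFilteredRow(uint8_t** expPtrs, int64_t* expPtrSizes,
                                              int64_t numExps,
                                              int64_t normalRowCount, int64_t badRowCount,
                                              int64_t& expCurRow, int64_t& curExpInd,
                                              uint8_t*& curExpPtr, int64_t& curExpNumRows,
                                              int64_t& curExpCurRow, int64_t& expAcc) {
        while (expCurRow < numExps &&
               badRowCount + (normalRowCount - 1) + expCurRow >= *reinterpret_cast<int64_t*>(curExpPtr)) {
            *reinterpret_cast<int64_t*>(curExpPtr) -= expAcc;  // pull the index back
            curExpPtr = nextExceptionRecord(curExpPtr);
            ++curExpCurRow;
            ++expCurRow;
            // switch partitions once the current one is exhausted but exceptions remain
            if (expCurRow < numExps && curExpCurRow >= curExpNumRows) {
                curExpCurRow = 0;
                ++curExpInd;
                curExpPtr = expPtrs[curExpInd];
                curExpNumRows = expPtrSizes[curExpInd];
            }
        }
        ++expAcc;  // this row was filtered; later exceptions shift by one more
    }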
+ // if intermediate callback desired, perform! + if(_intermediateType != python::Type::UNKNOWN && !_intermediateCallbackName.empty()) { + writeIntermediate(builder, argUserData, _intermediateCallbackName); + } + + env().storeIfNotNull(builder, builder.CreateLoad(normalRowCountVar), argOutNormalRowCount); + env().storeIfNotNull(builder, builder.CreateLoad(badRowCountVar), argOutBadRowCount); + + // return bytes read + Value* curPtr = builder.CreateLoad(currentInputPtrVar, "ptr"); + Value* bytesRead = builder.CreateSub(builder.CreatePtrToInt(curPtr, env().i64Type()), builder.CreatePtrToInt(argInPtr, env().i64Type())); + builder.CreateRet(bytesRead); + } + } +} \ No newline at end of file diff --git a/tuplex/core/src/physical/PhysicalPlan.cc b/tuplex/core/src/physical/PhysicalPlan.cc index 22bdad166..2399edf6f 100644 --- a/tuplex/core/src/physical/PhysicalPlan.cc +++ b/tuplex/core/src/physical/PhysicalPlan.cc @@ -62,6 +62,9 @@ namespace tuplex { EndPointMode outMode=EndPointMode::UNKNOWN) { using namespace std; + // Indicators for stage + bool hasFilter = false; + bool hasInputExceptions = false; // step 1: break up pipeline according to cost model // go through nodes queue q; q.push(root); @@ -113,6 +116,8 @@ namespace tuplex { throw std::runtime_error("unsupported aggregate type found"); } } else { // follow tree + if (node->type() == LogicalOperatorType::FILTER) + hasFilter = true; q.push(node->parent()); } @@ -193,6 +198,10 @@ namespace tuplex { // type should be parallelize or cache! auto t = ops.front()->type(); assert(t == LogicalOperatorType::PARALLELIZE || t == LogicalOperatorType::CACHE); + if (t == LogicalOperatorType::PARALLELIZE) + hasInputExceptions = !((ParallelizeOperator *)ops.front())->getPythonObjects().empty(); + if (t == LogicalOperatorType::CACHE) + hasInputExceptions = !((CacheOperator *)ops.front())->cachedExceptions().empty(); } } @@ -226,6 +235,10 @@ namespace tuplex { outputMode = outMode; } + // Need to update the indices of input exceptions in the case that input exceptions exist, the pipeline has filters, and the
+ bool updateInputExceptions = hasFilter && hasInputExceptions && _context.getOptions().OPT_MERGE_EXCEPTIONS_INORDER(); + // create trafostage via builder pattern auto builder = codegen::StageBuilder(_num_stages++, isRootStage, @@ -233,7 +246,8 @@ namespace tuplex { _context.getOptions().OPT_GENERATE_PARSER(), _context.getOptions().NORMALCASE_THRESHOLD(), _context.getOptions().OPT_SHARED_OBJECT_PROPAGATION(), - _context.getOptions().OPT_NULLVALUE_OPTIMIZATION()); + _context.getOptions().OPT_NULLVALUE_OPTIMIZATION(), + updateInputExceptions); // start code generation // first, add input @@ -388,10 +402,13 @@ namespace tuplex { if (inputNode->type() == LogicalOperatorType::PARALLELIZE) { auto pop = dynamic_cast(inputNode); assert(inputNode); stage->setInputPartitions(pop->getPartitions()); + stage->setInputExceptions(pop->getPythonObjects()); + stage->setPartitionToExceptionsMap(pop->getInputPartitionToPythonObjectsMap()); } else if(inputNode->type() == LogicalOperatorType::CACHE) { auto cop = dynamic_cast(inputNode); assert(inputNode); stage->setInputPartitions(cop->cachedPartitions()); stage->setInputExceptions(cop->cachedExceptions()); + stage->setPartitionToExceptionsMap(cop->partitionToExceptionsMap()); } else if(inputNode->type() == LogicalOperatorType::FILEINPUT) { auto csvop = dynamic_cast(inputNode); stage->setInputFiles(csvop->getURIs(), csvop->getURISizes()); diff --git a/tuplex/core/src/physical/PythonPipelineBuilder.cc b/tuplex/core/src/physical/PythonPipelineBuilder.cc index 532174abf..7b945effd 100644 --- a/tuplex/core/src/physical/PythonPipelineBuilder.cc +++ b/tuplex/core/src/physical/PythonPipelineBuilder.cc @@ -108,13 +108,8 @@ namespace tuplex { " else:\n" " return f(row)\n" " else:\n" - " try:\n" - " # call with default mode using tuple as base element essentially\n" - " return f(row)\n" - " except Exception as te:\n" - " # single op error?\n" - " # try unwrapped...\n" - " return f(row.data[0])"; + " # unwrap single element tuples.\n" + " return f(row.data[0])\n"; _header += applyCode; // some standard packages to import so stuff works... diff --git a/tuplex/core/src/physical/ResolveTask.cc b/tuplex/core/src/physical/ResolveTask.cc index 3932b2e88..6ae6723f0 100644 --- a/tuplex/core/src/physical/ResolveTask.cc +++ b/tuplex/core/src/physical/ResolveTask.cc @@ -228,7 +228,8 @@ namespace tuplex { // needs to be put into separate list of python objects... // save index as well to merge back in order. - _py_nonconfirming.push_back(std::make_tuple(_rowNumber, out_row)); + assert(_rowNumber >= _numUnresolved); + _py_nonconfirming.push_back(std::make_tuple(_rowNumber - _numUnresolved, out_row)); } int64_t ResolveTask::mergeNormalRow(const uint8_t *buf, int64_t bufSize) { @@ -364,6 +365,7 @@ namespace tuplex { switch(ec) { case ExceptionCode::BADPARSE_STRING_INPUT: case ExceptionCode::NORMALCASEVIOLATION: + case ExceptionCode::PYTHON_PARALLELIZE: return true; default: return false; @@ -410,13 +412,14 @@ namespace tuplex { // and true exception, i.e. no resolvers available. // => need a list of for which opIds/codes resolvers are available... ///.... 
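The _numUnresolved counter introduced above and incremented in the hunks below exists because rows that vanish during resolution (unresolved exceptions, or fallback rows that produce no output) never reach the merged output, so a raw _rowNumber overshoots the merge position. A hypothetical example: with input rows 0..4 where row 2 stays unresolved, a Python object produced at _rowNumber == 4 belongs at output position 4 - 1 == 3, which is exactly what the _rowNumber - _numUnresolved stored into _py_nonconfirming computes.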
+ _numUnresolved++; exceptionCallback(ecCode, operatorID, _rowNumber, ebuf, eSize); return; } // fallback 1: slow, compiled code path int resCode = -1; - if(_functor) { + if(_functor && ecCode != ecToI32(ExceptionCode::PYTHON_PARALLELIZE)) { resCode = _functor(this, _rowNumber, ecCode, ebuf, eSize); // uncomment to print out details on demand // if(resCode != 0) { @@ -431,6 +434,9 @@ #endif } resCode = -1; + // exception occurred that is not a schema violation, so the row will not be present in the output + } else if (resCode != 0) { + _numUnresolved++; } } @@ -460,6 +466,10 @@ tuple = python::rowToPython(row, true); parse_cells = false; // called below... + } else if (ecCode == ecToI64(ExceptionCode::PYTHON_PARALLELIZE)) { + auto pyObj = python::deserializePickledObject(python::getMainModule(), (char *) ebuf, eSize); + tuple = pyObj; + parse_cells = false; } else { // normal case, i.e. an exception occurred somewhere. // --> this means if pipeline is using string as input, we should convert @@ -563,7 +573,14 @@ // normal, check type and either merge to normal set back OR onto python set together with row number? auto resultRows = PyDict_GetItemString(pcr.res, "outputRows"); assert(PyList_Check(resultRows)); - for(int i = 0; i < PyList_Size(resultRows); ++i) { + + auto listSize = PyList_Size(resultRows); + // No rows were created, meaning the row was filtered out + if (0 == listSize) { + _numUnresolved++; + } + + for(int i = 0; i < listSize; ++i) { // type check w. output schema // cf. https://pythonextensionpatterns.readthedocs.io/en/latest/refcount.html auto rowObj = PyList_GetItem(resultRows, i); @@ -658,6 +675,7 @@ // fallback 3: still exception? save... if(resCode == -1) { + _numUnresolved++; exceptionCallback(ecCode, operatorID, _rowNumber, ebuf, eSize); } } @@ -693,7 +711,7 @@ } // abort if no exceptions! - if(_exceptions.empty()) + if(_runtimeExceptions.empty() && _numInputExceptions == 0) return; // special case: no functor & no python pipeline functor given @@ -706,12 +724,12 @@ #endif // copy _generalCasePartitions over to base class - IExceptionableTask::setExceptions(_exceptions); + IExceptionableTask::setExceptions(_runtimeExceptions); // clear exceptions, because they have been resolved (or put to new exceptions!) // if task produced exceptions, they are stored in the IExceptionableTask class! // => no need to overwrite them, getter for iexceptionabletask has all info! - _exceptions.clear(); + _runtimeExceptions.clear(); _wallTime = timer.time(); return; @@ -724,7 +742,7 @@ // merge exceptions with normal rows after calling slow code over them...
// basic idea is go over all exception partitions, execute row wise the resolution function // and merge the result back to the partitions - for(auto partition : _exceptions) { + for(auto partition : _runtimeExceptions) { const uint8_t *ptr = partition->lockRaw(); int64_t numRows = *((int64_t *) ptr); ptr += sizeof(int64_t); @@ -766,6 +784,38 @@ partition->invalidate(); } + // now process all of the input exceptions + if (_numInputExceptions > 0) { + // Initialize the input exception partition at the starting index + auto partition = _inputExceptions[_inputExceptionIndex]; + auto rowsLeftInPartition = partition->getNumRows() - _inputExceptionRowOffset; + const uint8_t *ptr = partition->lock() + _inputExceptionByteOffset; + + // Iterate over all input exceptions, which may span multiple partitions + for (int i = 0; i < _numInputExceptions; ++i) { + // Change partition once exhausted + if (rowsLeftInPartition == 0) { + partition->unlock(); + _inputExceptionIndex++; + partition = _inputExceptions[_inputExceptionIndex]; + rowsLeftInPartition = partition->getNumRows(); + ptr = partition->lock(); + } + + const uint8_t *ebuf = nullptr; + int64_t ecCode = -1, operatorID = -1; + size_t eSize = 0; + auto delta = deserializeExceptionFromMemory(ptr, &ecCode, &operatorID, &_currentRowNumber, &ebuf, + &eSize); + processExceptionRow(ecCode, operatorID, ebuf, eSize); + ptr += delta; + _rowNumber++; + rowsLeftInPartition--; + } + // Unlock but wait to invalidate until all resolve tasks have finished + partition->unlock(); + } + // merging is done, unlock the last partition & copy the others over. unlockAll(); @@ -782,7 +832,8 @@ // clear exceptions, because they have been resolved (or put to new exceptions!) // if task produced exceptions, they are stored in the IExceptionableTask class! // => no need to overwrite them, getter for iexceptionabletask has all info! - _exceptions.clear(); + _runtimeExceptions.clear(); + _inputExceptions.clear(); } else { executeInOrder(); } @@ -806,8 +857,7 @@ void ResolveTask::executeInOrder() { auto& logger = Logger::instance().logger("resolve task"); - // two options: either only exceptions OR - // both exceptions and normal rows + // Determine if normal partitions exist if(!_partitions.empty()) { // merge normal partitions and resolved ones (incl. lookup) @@ -817,113 +867,185 @@ // ready normal partition for merge _currentNormalPartitionIdx = 0; _normalPtr = _partitions[_currentNormalPartitionIdx]->lockRaw(); - _normalNumRows = *((int64_t*)_normalPtr); _normalPtr += sizeof(int64_t); + _normalNumRows = *((int64_t *) _normalPtr); + _normalPtr += sizeof(int64_t); _normalPtrBytesRemaining = _partitions[_currentNormalPartitionIdx]->bytesWritten(); _normalRowNumber = 0; _rowNumber = 0; + } else { + _currentNormalPartitionIdx = 0; + _normalPtr = nullptr; + _normalPtrBytesRemaining = 0; + _normalNumRows = 0; + _normalRowNumber = 0; + _rowNumber = 0; + } - // merge exceptions with normal rows after calling slow code over them...
- // basic idea is go over all exception partitions, execute row wise the resolution function - // and merge the result back to the partitions - for(auto partition : _exceptions) { - const uint8_t* ptr = partition->lockRaw(); - int64_t numRows = *((int64_t*)ptr); ptr += sizeof(int64_t); - - for(int i = 0; i < numRows; ++i) { + // Initialize runtime exception variables + size_t curRuntimePartitionInd = 0; // current index into vector of runtime exception partitions + int64_t numRuntimeRowsLeftInPartition = 0; // number of rows remaining in partition + const uint8_t *runPtr = nullptr; + if (_runtimeExceptions.size() > 0) { + curRuntimePartitionInd = 0; + numRuntimeRowsLeftInPartition = _runtimeExceptions[curRuntimePartitionInd]->getNumRows(); + runPtr = _runtimeExceptions[curRuntimePartitionInd]->lock(); + } - // old - // _currentRowNumber = *((int64_t*)ptr); - // ptr += sizeof(int64_t); - // int64_t ecCode = *((int64_t*)ptr); - // ptr += sizeof(int64_t); - // int64_t operatorID = *((int64_t*)ptr); - // ptr += sizeof(int64_t); - // int64_t eSize = *((int64_t*)ptr); - // ptr += sizeof(int64_t); + // Initialize input exception variables + size_t curInputPartitionInd = 0; // current index into vector of input exception partitions + int64_t numInputRowsLeftInPartition = 0; // number of rows remaining in partition + const uint8_t *inputPtr = nullptr; + if (_numInputExceptions > 0) { + curInputPartitionInd = _inputExceptionIndex; + numInputRowsLeftInPartition = _inputExceptions[curInputPartitionInd]->getNumRows() - _inputExceptionRowOffset; + inputPtr = _inputExceptions[curInputPartitionInd]->lock() + _inputExceptionByteOffset; + } - const uint8_t *ebuf = nullptr; - int64_t ecCode = -1, operatorID = -1; - size_t eSize = 0; - auto delta = deserializeExceptionFromMemory(ptr, &ecCode, &operatorID, &_currentRowNumber, &ebuf, - &eSize); + // Merge input and runtime exceptions in order. To do so, we can compare the row indices of the + // current runtime and input exception and process the one that occurs first. The saved row indices of + // runtime exceptions do not account for the existence of input exceptions, so we need to add the number + // of previously processed input exceptions to recover the true row number. + size_t inputRowsProcessed = 0; + const uint8_t *ptr = nullptr; + while (runPtr && inputPtr) { + auto runRowInd = *((int64_t *) runPtr); // get current runtime row index + auto inputRowInd = *((int64_t *) inputPtr); // get current input row index + bool isRuntimeException = false; + // compare indices, accounting for previous input exceptions + if (runRowInd + inputRowsProcessed < inputRowInd) { + ptr = runPtr; + numRuntimeRowsLeftInPartition--; + isRuntimeException = true; + } else { + ptr = inputPtr; + numInputRowsLeftInPartition--; + inputRowsProcessed++; + } - processExceptionRow(ecCode, operatorID, ebuf, eSize); - ptr += delta; + const uint8_t *ebuf = nullptr; + int64_t ecCode = -1, operatorID = -1; + size_t eSize = 0; + auto delta = deserializeExceptionFromMemory(ptr, &ecCode, &operatorID, &_currentRowNumber, &ebuf, + &eSize); - // logger.debug("processing potential output row #" + std::to_string(_currentRowNumber)); + if (isRuntimeException) { + _currentRowNumber += inputRowsProcessed; + runPtr += delta; + } else { + inputPtr += delta; + } - // only inc row number if exception occurred or row was written...
- _rowNumber++; + processExceptionRow(ecCode, operatorID, ebuf, eSize); + _rowNumber++; + + // Exhausted current runtime exceptions, need to switch partitions + if (numRuntimeRowsLeftInPartition == 0) { + _runtimeExceptions[curRuntimePartitionInd]->unlock(); + _runtimeExceptions[curRuntimePartitionInd]->invalidate(); + curRuntimePartitionInd++; + // Still have more exceptions to go through + if (curRuntimePartitionInd < _runtimeExceptions.size()) { + numRuntimeRowsLeftInPartition = _runtimeExceptions[curRuntimePartitionInd]->getNumRows(); + runPtr = _runtimeExceptions[curRuntimePartitionInd]->lock(); + } else { + // processed all exceptions + runPtr = nullptr; } - partition->unlock(); } - // add remaining normal rows & partitions to merged partitions - while(_normalRowNumber < _normalNumRows) { - // trick: to get row size, you know number of normal elements + variable length! - // ==> can be used for quick merging! - size_t size = readOutputRowSize(_normalPtr, _normalPtrBytesRemaining); - - writeRow(_normalPtr, size); - _normalPtr += size; - _normalPtrBytesRemaining -= size; - _normalRowNumber++; + // Exhausted current input exceptions, need to switch partitions + if (numInputRowsLeftInPartition == 0 || inputRowsProcessed == _numInputExceptions) { + _inputExceptions[curInputPartitionInd]->unlock(); + curInputPartitionInd++; + // Still have more exceptions to go through + if (curInputPartitionInd < _inputExceptions.size() && inputRowsProcessed < _numInputExceptions) { + numInputRowsLeftInPartition = _inputExceptions[curInputPartitionInd]->getNumRows(); + inputPtr = _inputExceptions[curInputPartitionInd]->lock(); + } else { + // processed all exceptions + inputPtr = nullptr; + } } + } - _partitions[_currentNormalPartitionIdx]->unlock(); - - // merging is done, unlock the last partition & copy the others over. - unlockAll(); - - for(int i = _currentNormalPartitionIdx + 1; i < _partitions.size(); ++i) { - _mergedRowsSink.unlock(); - _mergedRowsSink.partitions.push_back(_partitions[i]); - //_mergedPartitions.emplace_back(_normalCasePartitions[i]); + // Process remaining runtime exceptions if any exist + while (runPtr) { + const uint8_t *ebuf = nullptr; + int64_t ecCode = -1, operatorID = -1; + size_t eSize = 0; + auto delta = deserializeExceptionFromMemory(runPtr, &ecCode, &operatorID, &_currentRowNumber, &ebuf, + &eSize); + _currentRowNumber += inputRowsProcessed; + processExceptionRow(ecCode, operatorID, ebuf, eSize); + runPtr += delta; + _rowNumber++; + + numRuntimeRowsLeftInPartition--; + // Exhausted runtime exceptions in the current partition; need to switch partitions or we may be done + if (numRuntimeRowsLeftInPartition == 0) { + _runtimeExceptions[curRuntimePartitionInd]->unlock(); + _runtimeExceptions[curRuntimePartitionInd]->invalidate(); + curRuntimePartitionInd++; + // More exceptions to process + if (curRuntimePartitionInd < _runtimeExceptions.size()) { + numRuntimeRowsLeftInPartition = _runtimeExceptions[curRuntimePartitionInd]->getNumRows(); + runPtr = _runtimeExceptions[curRuntimePartitionInd]->lock(); + } else { + // processed all exceptions + runPtr = nullptr; + } } - } else { - // only exceptions, trivial to resolve. => i.e. convert them all to normal partitions...
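To see why the interleaving above adds inputRowsProcessed before comparing indices, here is a self-contained toy model (plain vectors instead of serialized partitions; the index values are invented):

    #include <cstddef>
    #include <cstdint>
    #include <cstdio>
    #include <vector>

    // runtime exception indices were assigned without knowledge of the input
    // exceptions interleaved into the stream, so every comparison shifts them by
    // the number of input exceptions already merged
    int main() {
        std::vector<int64_t> runtimeRows = {1, 3}; // stored runtime exception row indices
        std::vector<int64_t> inputRows   = {2, 5}; // stored input exception row indices
        std::size_t r = 0, in = 0;
        int64_t inputRowsProcessed = 0;
        while (r < runtimeRows.size() && in < inputRows.size()) {
            if (runtimeRows[r] + inputRowsProcessed < inputRows[in]) {
                std::printf("runtime exception at true row %lld\n",
                            static_cast<long long>(runtimeRows[r++] + inputRowsProcessed));
            } else {
                std::printf("input exception at row %lld\n",
                            static_cast<long long>(inputRows[in++]));
                ++inputRowsProcessed;
            }
        }
        for (; r < runtimeRows.size(); ++r) // leftovers, mirroring the trailing loops above
            std::printf("runtime exception at true row %lld\n",
                        static_cast<long long>(runtimeRows[r] + inputRowsProcessed));
        for (; in < inputRows.size(); ++in)
            std::printf("input exception at row %lld\n",
                        static_cast<long long>(inputRows[in]));
        return 0;
    }

This prints rows 1, 2, 4, 5 in order: the second runtime exception was stored as index 3, but one input exception precedes it, so its true position is 4.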
+ } - _currentNormalPartitionIdx = 0; - _normalPtr = nullptr; - _normalPtrBytesRemaining = 0; - _normalNumRows = 0; - _normalRowNumber = 0; - _rowNumber = 0; + // Process remaining input exceptions if any exist + while (inputPtr) { + const uint8_t *ebuf = nullptr; + int64_t ecCode = -1, operatorID = -1; + size_t eSize = 0; + auto delta = deserializeExceptionFromMemory(inputPtr, &ecCode, &operatorID, &_currentRowNumber, &ebuf, + &eSize); + processExceptionRow(ecCode, operatorID, ebuf, eSize); + inputPtr += delta; + _rowNumber++; + + numInputRowsLeftInPartition--; + inputRowsProcessed++; + // Exhausted current input exceptions, need to switch partitions + if (numInputRowsLeftInPartition == 0 || inputRowsProcessed == _numInputExceptions) { + _inputExceptions[curInputPartitionInd]->unlock(); + curInputPartitionInd++; + // Still have more exceptions + if (curInputPartitionInd < _inputExceptions.size() && inputRowsProcessed < _numInputExceptions) { + numInputRowsLeftInPartition = _inputExceptions[curInputPartitionInd]->getNumRows(); + inputPtr = _inputExceptions[curInputPartitionInd]->lock(); + } else { + // processed all exceptions + inputPtr = nullptr; + } + } + } - // merge exceptions with normal rows after calling slow code over them... - // basic idea is go over all exception partitions, execute row wise the resolution function - // and merge the result back to the partitions - for(auto partition : _exceptions) { - const uint8_t* ptr = partition->lockRaw(); - int64_t numRows = *((int64_t*)ptr); ptr += sizeof(int64_t); + // add remaining normal rows & partitions to merged partitions + while(_normalRowNumber < _normalNumRows) { + // trick: to get row size, you know number of normal elements + variable length! + // ==> can be used for quick merging! + size_t size = readOutputRowSize(_normalPtr, _normalPtrBytesRemaining); - for(int i = 0; i < numRows; ++i) { + writeRow(_normalPtr, size); + _normalPtr += size; + _normalPtrBytesRemaining -= size; + _normalRowNumber++; + } - // old - // _currentRowNumber = *((int64_t*)ptr); - // ptr += sizeof(int64_t); - // int64_t ecCode = *((int64_t*)ptr); - // ptr += sizeof(int64_t); - // int64_t operatorID = *((int64_t*)ptr); - // ptr += sizeof(int64_t); - // int64_t eSize = *((int64_t*)ptr); - // ptr += sizeof(int64_t); - const uint8_t *ebuf = nullptr; - int64_t ecCode = -1, operatorID = -1; - size_t eSize = 0; - auto delta = deserializeExceptionFromMemory(ptr, &ecCode, &operatorID, &_currentRowNumber, &ebuf, - &eSize); - processExceptionRow(ecCode, operatorID, ebuf, eSize); - ptr += delta; - // old - // ptr += eSize; - } - partition->unlock(); - } + if (!_partitions.empty()) + _partitions[_currentNormalPartitionIdx]->unlock(); + // merging is done, unlock the last partition & copy the others over. + unlockAll(); - // done, unlock all partitions - unlockAll(); + for(int i = _currentNormalPartitionIdx + 1; i < _partitions.size(); ++i) { + _mergedRowsSink.unlock(); + _mergedRowsSink.partitions.push_back(_partitions[i]); } // overwrite merged partitions (& in future also exceptions!!!) @@ -932,7 +1054,7 @@ namespace tuplex { // clear exceptions, because they have been resolved (or put to new exceptions!) // if task produced exceptions, they are stored in the IExceptionableTask class! // => no need to overwrite them, getter for iexceptionabletask has all info! 
- _exceptions.clear(); + _runtimeExceptions.clear(); } void ResolveTask::sendStatusToHistoryServer() { diff --git a/tuplex/core/src/physical/ResultSet.cc b/tuplex/core/src/physical/ResultSet.cc index c893af81c..0f7bf7319 100644 --- a/tuplex/core/src/physical/ResultSet.cc +++ b/tuplex/core/src/physical/ResultSet.cc @@ -15,6 +15,7 @@ namespace tuplex { ResultSet::ResultSet(const Schema& schema, const std::vector& partitions, const std::vector& exceptions, + const std::unordered_map& partitionToExceptionsMap, const std::vector> pyobjects, int64_t maxRows) : ResultSet::ResultSet() { for(Partition *p : partitions) @@ -22,6 +23,7 @@ namespace tuplex { _pyobjects = std::deque>(pyobjects.begin(), pyobjects.end()); _exceptions = exceptions; + _partitionToExceptionsMap = partitionToExceptionsMap; _curRowCounter = 0; _totalRowCounter = 0; _byteCounter = 0; diff --git a/tuplex/core/src/physical/StageBuilder.cc b/tuplex/core/src/physical/StageBuilder.cc index 77473d565..72f01e2b8 100644 --- a/tuplex/core/src/physical/StageBuilder.cc +++ b/tuplex/core/src/physical/StageBuilder.cc @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -44,10 +45,11 @@ namespace tuplex { bool generateParser, double normalCaseThreshold, bool sharedObjectPropagation, - bool nullValueOptimization) + bool nullValueOptimization, + bool updateInputExceptions) : _stageNumber(stage_number), _isRootStage(rootStage), _allowUndefinedBehavior(allowUndefinedBehavior), _generateParser(generateParser), _normalCaseThreshold(normalCaseThreshold), _sharedObjectPropagation(sharedObjectPropagation), - _nullValueOptimization(nullValueOptimization), + _nullValueOptimization(nullValueOptimization), _updateInputExceptions(updateInputExceptions), _inputNode(nullptr), _outputLimit(std::numeric_limits::max()) { } @@ -1018,7 +1020,10 @@ namespace tuplex { } } else { // tuplex (in-memory) reader - tb = make_shared(env, inSchema, funcStageName); + if (_updateInputExceptions) + tb = make_shared(env, inSchema, funcStageName); + else + tb = make_shared(env, inSchema, funcStageName); } // set pipeline and @@ -1438,6 +1443,7 @@ namespace tuplex { stage->_irBitCode = _irBitCode; stage->_pyCode = _pyCode; stage->_pyPipelineName = _pyPipelineName; + stage->_updateInputExceptions = _updateInputExceptions; // if last op is CacheOperator, check whether normal/exceptional case should get cached separately // or an upcasting step should be performed. diff --git a/tuplex/core/src/physical/TransformStage.cc b/tuplex/core/src/physical/TransformStage.cc index 80d17fce1..b61f9cbe2 100644 --- a/tuplex/core/src/physical/TransformStage.cc +++ b/tuplex/core/src/physical/TransformStage.cc @@ -120,11 +120,11 @@ namespace tuplex { void TransformStage::setMemoryResult(const std::vector &partitions, const std::vector& generalCase, + const std::unordered_map& partitionToExceptionsMap, const std::vector>& interpreterRows, const std::vector& remainingExceptions, const std::unordered_map, size_t> &ecounts) { setExceptionCounts(ecounts); - _unresolved_exceptions = generalCase; if (partitions.empty() && interpreterRows.empty() && generalCase.empty()) _rs = emptyResultSet(); @@ -159,7 +159,7 @@ namespace tuplex { // put ALL partitions to result set _rs = std::make_shared(schema, limitedPartitions, - generalCase, interpreterRows, + generalCase, partitionToExceptionsMap, interpreterRows, outputLimit()); } } @@ -674,7 +674,6 @@ namespace tuplex { // execute stage via backend backend()->execute(this); - // free hashmaps of dependents (b.c. 
it's a tree this is ok) if(numArgs > 0) { for(int i = 0; i < numPreds; ++i) { @@ -824,8 +823,10 @@ logger.info("first compile done"); // fetch symbols (this actually triggers the compilation first with register alloc etc.) - if(!_syms->functor) + if(!_syms->functor && !_updateInputExceptions) _syms->functor = reinterpret_cast(jit.getAddrOfSymbol(funcName())); + if(!_syms->functorWithExp && _updateInputExceptions) + _syms->functorWithExp = reinterpret_cast(jit.getAddrOfSymbol(funcName())); logger.info("functor " + funcName() + " retrieved from llvm"); if(_outputMode == EndPointMode::FILE && !_syms->writeFunctor) _syms->writeFunctor = reinterpret_cast(jit.getAddrOfSymbol(writerFuncName())); @@ -850,7 +851,12 @@ } // check symbols are valid... - if(!(_syms->functor && _syms->initStageFunctor && _syms->releaseStageFunctor)) { + bool hasValidFunctor = true; + if (_updateInputExceptions && !_syms->functorWithExp) + hasValidFunctor = false; + if (!_updateInputExceptions && !_syms->functor) + hasValidFunctor = false; + if(!hasValidFunctor || !_syms->initStageFunctor || !_syms->releaseStageFunctor) { logger.error("invalid pointer address for JIT code returned"); throw std::runtime_error("invalid pointer address for JIT code returned"); } diff --git a/tuplex/core/src/physical/TransformTask.cc b/tuplex/core/src/physical/TransformTask.cc index 494cb5964..c560c4af4 100644 --- a/tuplex/core/src/physical/TransformTask.cc +++ b/tuplex/core/src/physical/TransformTask.cc @@ -405,7 +405,10 @@ } else if(hasMemorySource()) { if(_inputPartitions.empty()) throw std::runtime_error("no input partition assigned!"); - processMemorySource(); + if (_updateInputExceptions) + processMemorySourceWithExp(); + else + processMemorySource(); } else { throw std::runtime_error("no source (file/memory) specified, error!"); } @@ -529,6 +532,82 @@ _outputRowCounter = 0; } + void TransformTask::processMemorySourceWithExp() { + assert(!_inputPartitions.empty()); + assert(_functor); + + _numInputRowsRead = 0; + _numOutputRowsWritten = 0; + + int64_t num_normal_rows = 0, num_bad_rows = 0; + + auto functor = reinterpret_cast(_functor); + + auto numInputExceptions = _inputExceptionInfo.numExceptions; + auto inputExceptionIndex = _inputExceptionInfo.exceptionIndex; + auto inputExceptionRowOffset = _inputExceptionInfo.exceptionRowOffset; + auto inputExceptionByteOffset = _inputExceptionInfo.exceptionByteOffset; + + // First, prepare the input exception partitions to pass into the code-gen. + // This is done to simplify the LLVM code. We will end up passing it an + // array of expPtrs which point to the first exception in their partition + // and expPtrSizes which tells how many exceptions are in each partition. + auto arrSize = _inputExceptions.size() - inputExceptionIndex; + auto expPtrs = new uint8_t*[arrSize]; + auto expPtrSizes = new int64_t[arrSize]; + int expInd = 0; + // Iterate through all exception partitions beginning at the one specified by the starting index + for (int i = inputExceptionIndex; i < _inputExceptions.size(); ++i) { + auto numRows = _inputExceptions[i]->getNumRows(); + auto ptr = _inputExceptions[i]->lock(); + // If it's the first partition, we need to account for the offset + if (i == inputExceptionIndex) { + numRows -= inputExceptionRowOffset; + ptr += inputExceptionByteOffset; + } + expPtrSizes[expInd] = numRows; + expPtrs[expInd] = (uint8_t *) ptr; + expInd++; + }
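The template argument of the reinterpret_cast above is stripped in this diff view; judging from the call below and the arguments of the generated read block (argInPtr/argInSize, argExpPtrs/argExpPtrSizes, argNumExps, and the two out-counters), the functor type is presumably along these lines (the name and exact parameter types are an assumption, not copied from the codebase):

    #include <cstdint>

    // assumed signature of the exception-aware read-block functor; mirrors
    // functor(this, inPtr, inSize, expPtrs, expPtrSizes, numInputExceptions,
    //         &num_normal_rows, &num_bad_rows, false) below
    typedef int64_t (*read_block_exp_f)(void* userData,
                                        const uint8_t* inPtr, int64_t inSize,
                                        uint8_t** expPtrs, int64_t* expPtrSizes,
                                        int64_t numInputExceptions,
                                        int64_t* outNormalRowCount,
                                        int64_t* outBadRowCount,
                                        bool ignoreLastRow);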
+ + // go over all input partitions. + for(auto inputPartition : _inputPartitions) { + // lock ptr, extract number of rows ==> store them + // lock raw & call functor! + int64_t inSize = inputPartition->size(); + const uint8_t *inPtr = inputPartition->lockRaw(); + _numInputRowsRead += static_cast(*((int64_t*)inPtr)); + + // call functor + auto bytesParsed = functor(this, inPtr, inSize, expPtrs, expPtrSizes, numInputExceptions, &num_normal_rows, &num_bad_rows, false); + + // save number of normal rows to output rows written if not writing to file + if(hasMemorySink()) + _numOutputRowsWritten += num_normal_rows; + + // unlock memory sinks if necessary + unlockAllMemorySinks(); + + inputPartition->unlock(); + + // delete partition if desired... + if(_invalidateSourceAfterUse) + inputPartition->invalidate(); + } + + delete[] expPtrs; + delete[] expPtrSizes; + + for (int i = inputExceptionIndex; i < _inputExceptions.size(); ++i) { + _inputExceptions[i]->unlock(); + } + +#ifndef NDEBUG + owner()->info("Trafo task memory source exhausted (" + pluralize(_inputPartitions.size(), "partition") + ", " + + pluralize(num_normal_rows, "normal row") + ", " + pluralize(num_bad_rows, "exceptional row") + ")"); +#endif + } + void TransformTask::processMemorySource() { assert(!_inputPartitions.empty()); assert(_functor); @@ -541,7 +620,7 @@ auto functor = reinterpret_cast(_functor); // go over all input partitions. - for(auto inputPartition : _inputPartitions) { + for(const auto &inputPartition : _inputPartitions) { // lock ptr, extract number of rows ==> store them // lock raw & call functor! int64_t inSize = inputPartition->size(); diff --git a/tuplex/python/CMakeLists.txt b/tuplex/python/CMakeLists.txt index 451ae7399..c5285c29d 100644 --- a/tuplex/python/CMakeLists.txt +++ b/tuplex/python/CMakeLists.txt @@ -138,6 +138,7 @@ FILE(COPY ${CMAKE_CURRENT_SOURCE_DIR}/tests/test_tuples.py ${CMAKE_CURRENT_SOURCE_DIR}/tests/test_math.py ${CMAKE_CURRENT_SOURCE_DIR}/tests/test_aggregates.py ${CMAKE_CURRENT_SOURCE_DIR}/tests/test_webui.py + ${CMAKE_CURRENT_SOURCE_DIR}/tests/test_exceptions.py ${CMAKE_CURRENT_SOURCE_DIR}/tests/helper.py DESTINATION ${PYTHON_DIST_DIR}/tests) FILE(COPY ${CMAKE_CURRENT_SOURCE_DIR}/tuplex/libexec/__init__.py DESTINATION ${PYTHON_DIST_DIR}/tuplex/libexec) diff --git a/tuplex/python/include/PythonContext.h b/tuplex/python/include/PythonContext.h index 972d731ed..57746bf7b 100644 --- a/tuplex/python/include/PythonContext.h +++ b/tuplex/python/include/PythonContext.h @@ -112,9 +112,6 @@ namespace tuplex { DataSet & strDictParallelize(PyObject *listObj, const python::Type &rowType, const std::vector &columns); - // bad parallelize objects, i.e those who don't fit the inferred type - std::vector> _badParallelizeObjects; - inline PythonDataSet makeError(const std::string& message) { PythonDataSet pds; pds.wrap(&_context->makeError(message)); @@ -156,7 +153,7 @@ * @return PythonDataSet wrapper around internal DataSet class */ PythonDataSet parallelize(boost::python::list L, boost::python::object cols = boost::python::object(), - boost::python::object schema = boost::python::object()); + boost::python::object schema = boost::python::object(), bool autoUnpack = true); /*!
* reads one (or multiple) csv files into memory diff --git a/tuplex/python/src/PythonContext.cc b/tuplex/python/src/PythonContext.cc index 09b26db67..310bb1a6d 100644 --- a/tuplex/python/src/PythonContext.cc +++ b/tuplex/python/src/PythonContext.cc @@ -39,9 +39,12 @@ namespace tuplex { Schema schema(Schema::MemoryLayout::ROW, python::Type::makeTupleType({python::Type::F64})); + std::vector> badParallelizeObjects; + std::vector numExceptionsInPartition; + // check if empty? if(0 == numElements) - return _context->fromPartitions(schema, std::vector(), columns); + return _context->fromPartitions(schema, std::vector(), columns, badParallelizeObjects, numExceptionsInPartition); // create new partition on driver auto driver = _context->getDriver(); @@ -52,12 +55,20 @@ namespace tuplex { *rawPtr = 0; double* ptr = (double*)(rawPtr + 1); size_t numBytesSerialized = 0; + size_t prevNumExceptions = 0; + size_t prevNumRows = 0; for(unsigned i = 0; i < numElements; ++i) { auto obj = PyList_GET_ITEM(listObj, i); Py_XINCREF(obj); // check capacity and realloc if necessary get a new partition if(partition->capacity() < numBytesSerialized + sizeof(double)) { + assert(badParallelizeObjects.size() >= prevNumExceptions); + auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; + numExceptionsInPartition.push_back(numNewExceptions); + prevNumExceptions = badParallelizeObjects.size(); + prevNumRows += numNewExceptions + *rawPtr; + partition->unlockWrite(); partitions.push_back(partition); partition = driver->allocWritablePartition(std::max(sizeof(double), allocMinSize), schema, -1, _context->id()); @@ -78,13 +89,15 @@ namespace tuplex { val = (double)PyLong_AsLongLong(obj); if(PyErr_Occurred()) { // too large integer? PyErr_Clear(); - _badParallelizeObjects.emplace_back(std::make_tuple(i, obj)); + assert(i >= prevNumRows); + badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); continue; } } } else { - _badParallelizeObjects.emplace_back(std::make_tuple(i, obj)); + assert(i >= prevNumRows); + badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); continue; } } @@ -95,11 +108,15 @@ namespace tuplex { numBytesSerialized += sizeof(double); } + assert(badParallelizeObjects.size() >= prevNumExceptions); + auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; + numExceptionsInPartition.push_back(numNewExceptions); + partition->unlockWrite(); partitions.push_back(partition); // create dataset from partitions. - return _context->fromPartitions(schema, partitions, columns); + return _context->fromPartitions(schema, partitions, columns, badParallelizeObjects, numExceptionsInPartition); } DataSet& PythonContext::fastI64Parallelize(PyObject* listObj, const std::vector& columns, bool upcast) { @@ -110,9 +127,12 @@ namespace tuplex { Schema schema(Schema::MemoryLayout::ROW, python::Type::makeTupleType({python::Type::I64})); + std::vector> badParallelizeObjects; + std::vector numExceptionsInPartition; + // check if empty? 
if(0 == numElements) - return _context->fromPartitions(schema, std::vector(), columns); + return _context->fromPartitions(schema, std::vector(), columns, badParallelizeObjects, numExceptionsInPartition); // create new partition on driver auto driver = _context->getDriver(); @@ -123,12 +143,20 @@ namespace tuplex { *rawPtr = 0; int64_t* ptr = rawPtr + 1; size_t numBytesSerialized = 0; + size_t prevNumExceptions = 0; + size_t prevNumRows = 0; for(unsigned i = 0; i < numElements; ++i) { auto obj = PyList_GET_ITEM(listObj, i); Py_XINCREF(obj); // check capacity and realloc if necessary get a new partition if(partition->capacity() < numBytesSerialized + sizeof(int64_t)) { + assert(badParallelizeObjects.size() >= prevNumExceptions); + auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; + numExceptionsInPartition.push_back(numNewExceptions); + prevNumExceptions = badParallelizeObjects.size(); + prevNumRows += numNewExceptions + *rawPtr; + partition->unlockWrite(); partitions.push_back(partition); partition = driver->allocWritablePartition(std::max(sizeof(int64_t), allocMinSize), schema, -1, _context->id()); @@ -143,7 +171,8 @@ namespace tuplex { val = PyLong_AsLongLong(obj); if(PyErr_Occurred()) { // too large integer? PyErr_Clear(); - _badParallelizeObjects.emplace_back(std::make_tuple(i, obj)); + assert(i >= prevNumRows); + badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); continue; } } else { @@ -151,7 +180,8 @@ namespace tuplex { if(upcast && (obj == Py_True || obj == Py_False)) val = obj == Py_True; else { - _badParallelizeObjects.emplace_back(std::make_tuple(i, obj)); + assert(i >= prevNumRows); + badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); continue; } } @@ -161,12 +191,15 @@ namespace tuplex { *rawPtr = *rawPtr + 1; numBytesSerialized += sizeof(int64_t); } + assert(badParallelizeObjects.size() >= prevNumExceptions); + auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; + numExceptionsInPartition.push_back(numNewExceptions); partition->unlockWrite(); partitions.push_back(partition); // create dataset from partitions. - return _context->fromPartitions(schema, partitions, columns); + return _context->fromPartitions(schema, partitions, columns, badParallelizeObjects, numExceptionsInPartition); } DataSet& PythonContext::fastMixedSimpleTypeTupleTransfer(PyObject *listObj, const python::Type &majType, @@ -182,9 +215,13 @@ namespace tuplex { // now create partitions super fast Schema schema(Schema::MemoryLayout::ROW, majType); + std::vector> badParallelizeObjects; + std::vector numExceptionsInPartition; + // check if empty? 
if(0 == numElements) - return _context->fromPartitions(schema, std::vector(), columns); + return _context->fromPartitions(schema, std::vector(), columns, badParallelizeObjects, numExceptionsInPartition); + // encode type of tuple quickly into string char *typeStr = new char[numTupleElements]; @@ -201,6 +238,8 @@ namespace tuplex { *rawPtr = 0; uint8_t* ptr = (uint8_t*)(rawPtr + 1); size_t numBytesSerialized = 0; + size_t prevNumExceptions = 0; + size_t prevNumRows = 0; for(unsigned i = 0; i < numElements; ++i) { auto obj = PyList_GET_ITEM(listObj, i); Py_XINCREF(obj); @@ -227,13 +266,20 @@ namespace tuplex { } } if (nonConforming) { - _badParallelizeObjects.emplace_back(i, obj); + assert(i >= prevNumRows); + badParallelizeObjects.emplace_back(i - prevNumRows, obj); continue; } } // get new partition if capacity exhausted if(partition->capacity() < numBytesSerialized + requiredBytes) { + assert(badParallelizeObjects.size() >= prevNumExceptions); + auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; + numExceptionsInPartition.push_back(numNewExceptions); + prevNumExceptions = badParallelizeObjects.size(); + prevNumRows += numNewExceptions + *rawPtr; + partition->unlockWrite(); partitions.push_back(partition); partition = driver->allocWritablePartition(std::max(allocMinSize, requiredBytes), schema, -1, _context->id()); @@ -312,9 +358,11 @@ namespace tuplex { // special part when bad row encountered bad_element: ptr = rowStartPtr; - _badParallelizeObjects.emplace_back(std::make_tuple(i, obj)); + assert(i >= prevNumRows); + badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); } else { - _badParallelizeObjects.emplace_back(std::make_tuple(i, obj)); + assert(i >= prevNumRows); + badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); } // serialization code here is a little bit more complicated @@ -323,6 +371,9 @@ namespace tuplex { // (2) is the field containing total varlength // (3) is the actual string content (incl. '\0' delimiter) } + assert(badParallelizeObjects.size() >= prevNumExceptions); + auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; + numExceptionsInPartition.push_back(numNewExceptions); partition->unlockWrite(); partitions.push_back(partition); @@ -330,7 +381,7 @@ namespace tuplex { delete [] typeStr; // create dataset from partitions. - return _context->fromPartitions(schema, partitions, columns); + return _context->fromPartitions(schema, partitions, columns, badParallelizeObjects, numExceptionsInPartition); } DataSet& PythonContext::fastBoolParallelize(PyObject *listObj, const std::vector& columns) { @@ -341,9 +392,13 @@ namespace tuplex { Schema schema(Schema::MemoryLayout::ROW, python::Type::makeTupleType({python::Type::BOOLEAN})); + std::vector> badParallelizeObjects; + std::vector numExceptionsInPartition; + // check if empty? 
if(0 == numElements) - return _context->fromPartitions(schema, std::vector(), columns); + return _context->fromPartitions(schema, std::vector(), columns, badParallelizeObjects, numExceptionsInPartition); + // create new partition on driver auto driver = _context->getDriver(); @@ -354,12 +409,20 @@ namespace tuplex { *rawPtr = 0; int64_t* ptr = rawPtr + 1; size_t numBytesSerialized = 0; + size_t prevNumExceptions = 0; + size_t prevNumRows = 0; for(unsigned i = 0; i < numElements; ++i) { auto obj = PyList_GET_ITEM(listObj, i); Py_XINCREF(obj); // check capacity and realloc if necessary get a new partition if(partition->capacity() < numBytesSerialized + sizeof(int64_t)) { + assert(badParallelizeObjects.size() >= prevNumExceptions); + auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; + numExceptionsInPartition.push_back(numNewExceptions); + prevNumExceptions = badParallelizeObjects.size(); + prevNumRows += numNewExceptions + *rawPtr; + partition->unlockWrite(); partitions.push_back(partition); partition = driver->allocWritablePartition(std::max(sizeof(int64_t), allocMinSize), schema, -1, _context->id()); @@ -375,15 +438,20 @@ namespace tuplex { *rawPtr = *rawPtr + 1; numBytesSerialized += sizeof(int64_t); } else { - _badParallelizeObjects.emplace_back(std::make_tuple(i, obj)); + assert(i >= prevNumRows); + badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); } } + assert(badParallelizeObjects.size() >= prevNumExceptions); + auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; + numExceptionsInPartition.push_back(numNewExceptions); + partition->unlockWrite(); partitions.push_back(partition); // create dataset from partitions. - return _context->fromPartitions(schema, partitions, columns); + return _context->fromPartitions(schema, partitions, columns, badParallelizeObjects, numExceptionsInPartition); } DataSet& PythonContext::fastStrParallelize(PyObject* listObj, const std::vector& columns) { @@ -394,9 +462,13 @@ namespace tuplex { Schema schema(Schema::MemoryLayout::ROW, python::Type::makeTupleType({python::Type::STRING})); + std::vector> badParallelizeObjects; + std::vector numExceptionsInPartition; + // check if empty? 
if(0 == numElements) - return _context->fromPartitions(schema, std::vector(), columns); + return _context->fromPartitions(schema, std::vector(), columns, badParallelizeObjects, numExceptionsInPartition); + // create new partition on driver auto driver = _context->getDriver(); @@ -407,6 +479,8 @@ namespace tuplex { *rawPtr = 0; uint8_t* ptr = (uint8_t*)(rawPtr + 1); size_t numBytesSerialized = 0; + size_t prevNumExceptions = 0; + size_t prevNumRows = 0; for(unsigned i = 0; i < numElements; ++i) { auto obj = PyList_GET_ITEM(listObj, i); Py_XINCREF(obj); @@ -426,6 +500,12 @@ namespace tuplex { // check capacity and realloc if necessary get a new partition if(partition->capacity() < numBytesSerialized + requiredBytes) { + assert(badParallelizeObjects.size() >= prevNumExceptions); + auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; + numExceptionsInPartition.push_back(numNewExceptions); + prevNumExceptions = badParallelizeObjects.size(); + prevNumRows += numNewExceptions + *rawPtr; + partition->unlockWrite(); partitions.push_back(partition); partition = driver->allocWritablePartition(std::max(allocMinSize, requiredBytes), schema, -1, _context->id()); @@ -450,15 +530,19 @@ namespace tuplex { *rawPtr = *rawPtr + 1; numBytesSerialized += requiredBytes; } else { - _badParallelizeObjects.emplace_back(std::make_tuple(i, obj)); + assert(i >= prevNumRows); + badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, obj)); } } + assert(badParallelizeObjects.size() >= prevNumExceptions); + auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; + numExceptionsInPartition.push_back(numNewExceptions); partition->unlockWrite(); partitions.push_back(partition); // create dataset from partitions. - return _context->fromPartitions(schema, partitions, columns); + return _context->fromPartitions(schema, partitions, columns, badParallelizeObjects, numExceptionsInPartition); } // Returns true if t1 can be considered a subtype of t2, specifically in the context of Option types @@ -494,8 +578,28 @@ namespace tuplex { auto numElements = PyList_Size(listObj); logger.debug("transferring " + std::to_string(numElements) + " elements. "); - // decode tuple - std::vector v; + std::vector> badParallelizeObjects; + std::vector numExceptionsInPartition; + + // check if empty? + if(0 == numElements) + return _context->fromPartitions(schema, std::vector(), columns, badParallelizeObjects, numExceptionsInPartition); + + auto firstRow = PyList_GET_ITEM(listObj, 0); + Py_XINCREF(firstRow); + schema = Schema(Schema::MemoryLayout::ROW, python::pythonToRow(firstRow, majType).getRowType()); + + // create new partition on driver + auto driver = _context->getDriver(); + + std::vector partitions; + Partition* partition = driver->allocWritablePartition(allocMinSize, schema, -1, _context->id()); + int64_t* rawPtr = (int64_t*)partition->lockWriteRaw(); + *rawPtr = 0; + uint8_t* ptr = (uint8_t*)(rawPtr + 1); + size_t numBytesSerialized = 0; + size_t prevNumExceptions = 0; + size_t prevNumRows = 0; for (unsigned i = 0; i < numElements; ++i) { // because this a slow transfer loop, check explicitly for signals and free anything if there's something... 
@@ -507,11 +611,10 @@ namespace tuplex { logger.warn("slow transfer to backend interrupted."); // free items (decref) - for(auto t : _badParallelizeObjects) { + for(auto t : badParallelizeObjects) { Py_XDECREF(std::get<1>(t)); } - _badParallelizeObjects.clear(); - v.clear(); + badParallelizeObjects.clear(); return _context->makeError("interrupted transfer"); } @@ -525,14 +628,42 @@ namespace tuplex { if(isSubOptionType(t, majType)) { // In this case, t is a subtype of the majority type; this accounts for the case where the majority type // is an option (e.g. majType=Option[int] should encompass both t=I64 and t=NULLVALUE). - v.push_back(python::pythonToRow(item, majType)); + auto row = python::pythonToRow(item, majType); + auto requiredBytes = row.serializedLength(); + + if(partition->capacity() < numBytesSerialized + requiredBytes) { + assert(badParallelizeObjects.size() >= prevNumExceptions); + auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; + numExceptionsInPartition.push_back(numNewExceptions); + prevNumExceptions = badParallelizeObjects.size(); + prevNumRows += numNewExceptions + *rawPtr; + + partition->unlockWrite(); + partitions.push_back(partition); + partition = driver->allocWritablePartition(std::max(allocMinSize, requiredBytes), schema, -1, _context->id()); + rawPtr = (int64_t*)partition->lockWriteRaw(); + *rawPtr = 0; + ptr = (uint8_t*)(rawPtr + 1); + numBytesSerialized = 0; + } + + row.serializeToMemory(ptr, partition->capacity() - numBytesSerialized); + ptr += requiredBytes; + *rawPtr = *rawPtr + 1; + numBytesSerialized += requiredBytes; } else - _badParallelizeObjects.emplace_back(std::make_tuple(i, item)); + badParallelizeObjects.emplace_back(std::make_tuple(i - prevNumRows, item)); } + assert(badParallelizeObjects.size() >= prevNumExceptions); + auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; + numExceptionsInPartition.push_back(numNewExceptions); + + partition->unlockWrite(); + partitions.push_back(partition); // serialize in main memory - return _context->parallelize(v, columns); + return _context->fromPartitions(schema, partitions, columns, badParallelizeObjects, numExceptionsInPartition); } DataSet& PythonContext::strDictParallelize(PyObject *listObj, const python::Type &rowType, @@ -548,9 +679,12 @@ namespace tuplex { assert(rowType.parameters().size() == columns.size()); // also very important!!! Schema schema(Schema::MemoryLayout::ROW, rowType); + std::vector> badParallelizeObjects; + std::vector numExceptionsInPartition; + // check if empty? if(0 == numElements) - return _context->fromPartitions(schema, std::vector(), columns); + return _context->fromPartitions(schema, std::vector(), columns, badParallelizeObjects, numExceptionsInPartition); // create new partition on driver auto driver = _context->getDriver(); @@ -561,93 +695,82 @@ namespace tuplex { *rawPtr = 0; uint8_t* ptr = (uint8_t*)(rawPtr + 1); size_t numBytesSerialized = 0; + size_t prevNumExceptions = 0; + size_t prevNumRows = 0; for(unsigned i = 0; i < numElements; ++i) { auto obj = PyList_GET_ITEM(listObj, i); Py_XINCREF(obj); // check that it is a dict! - if(PyDict_Check(obj)) { - auto numDictElements = PyDict_Size(obj); - - // first check, do sizes match? - if(numDictElements != rowType.parameters().size()) - _badParallelizeObjects.emplace_back(std::make_tuple(i, obj)); - else { - // same number of elements. - // ==> need to get columns etc. 
out - bool good = true; - PyObject *tupleObj = PyTuple_New(rowType.parameters().size()); - int j = 0; - for(const auto& c : columns) { - auto item = PyDict_GetItemString(obj, c.c_str()); - - // item is borrowed, reference. So incref! - // https://docs.python.org/3/c-api/dict.html#c.PyDict_GetItemString - Py_XINCREF(item); - - if(!item) { - _badParallelizeObjects.emplace_back(std::make_tuple(i, obj)); - good = false; - // set dummy - Py_XINCREF(Py_None); - PyTuple_SET_ITEM(tupleObj, j, Py_None); - } else - PyTuple_SET_ITEM(tupleObj, j, item); - ++j; + if (PyDict_Check(obj)) { + PyObject * tupleObj = PyTuple_New(rowType.parameters().size()); + int j = 0; + for (const auto &c: columns) { + auto item = PyDict_GetItemString(obj, c.c_str()); + Py_XINCREF(item); + + if (item) { + PyTuple_SET_ITEM(tupleObj, j, item); + } else { + Py_XINCREF(Py_None); + PyTuple_SET_ITEM(tupleObj, j, Py_None); } - // check if all good still or there was an issue with a column... - if(!good) - continue; - - // all the item are extracted into a tuple. - // ==> convert to row object & check type - Row row = python::pythonToRow(tupleObj); - - // Py_XDECREF(tupleObj); // remove temporary tupleObject - - if(row.getRowType() != rowType) - _badParallelizeObjects.emplace_back(std::make_tuple(i, obj)); - else { - // write to partition - - size_t requiredBytes = row.serializedLength(); - // check capacity and realloc if necessary get a new partition - if(partition->capacity() < numBytesSerialized + allocMinSize) { - partition->unlockWrite(); - partitions.push_back(partition); - partition = driver->allocWritablePartition(allocMinSize, schema, -1, _context->id()); - rawPtr = (int64_t*)partition->lockWriteRaw(); - *rawPtr = 0; - ptr = (uint8_t*)(rawPtr + 1); - numBytesSerialized = 0; - } + ++j; + } - row.serializeToMemory(ptr, partition->capacity()); - ptr += requiredBytes; - *rawPtr = *rawPtr + 1; - numBytesSerialized += requiredBytes; + try { + Row row = python::pythonToRow(tupleObj, rowType); + size_t requiredBytes = row.serializedLength(); + // check capacity and realloc if necessary get a new partition + if (partition->capacity() < numBytesSerialized + allocMinSize) { + assert(badParallelizeObjects.size() >= prevNumExceptions); + auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; + numExceptionsInPartition.push_back(numNewExceptions); + prevNumExceptions = badParallelizeObjects.size(); + prevNumRows += numNewExceptions + *rawPtr; + + partition->unlockWrite(); + partitions.push_back(partition); + partition = driver->allocWritablePartition(allocMinSize, schema, -1, _context->id()); + rawPtr = (int64_t *) partition->lockWriteRaw(); + *rawPtr = 0; + ptr = (uint8_t *) (rawPtr + 1); + numBytesSerialized = 0; } + + row.serializeToMemory(ptr, partition->capacity()); + ptr += requiredBytes; + *rawPtr = *rawPtr + 1; + numBytesSerialized += requiredBytes; + } catch (const std::exception& e) { + assert(i >= prevNumRows); + badParallelizeObjects.emplace_back(i - prevNumRows, obj); } + + } else { + assert(i >= prevNumRows); + badParallelizeObjects.emplace_back(i - prevNumRows, obj); } } + assert(badParallelizeObjects.size() >= prevNumExceptions); + auto numNewExceptions = badParallelizeObjects.size() - prevNumExceptions; + numExceptionsInPartition.push_back(numNewExceptions); partition->unlockWrite(); partitions.push_back(partition); // create dataset from partitions. 
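All of the parallelize paths above share the same rollover bookkeeping: a bad object is recorded with an index relative to the partition in which it occurred (i - prevNumRows), and whenever a partition is sealed, prevNumRows advances by the rows and exceptions just attributed to it. A minimal model of that invariant (a sketch mirroring the names in the diff; this class is not part of the codebase):

    #include <cstddef>
    #include <vector>

    struct ParallelizeBookkeeping {
        std::size_t prevNumRows = 0;        // rows (normal + bad) attributed to earlier partitions
        std::size_t prevNumExceptions = 0;  // bad objects attributed to earlier partitions
        std::vector<std::size_t> numExceptionsInPartition;

        // index stored alongside a bad object: relative to the current partition
        std::size_t localBadIndex(std::size_t globalIndex) const {
            return globalIndex - prevNumRows;
        }

        // called when a partition is sealed; rowsInPartition corresponds to *rawPtr
        void rollOver(std::size_t totalBadSoFar, std::size_t rowsInPartition) {
            std::size_t numNew = totalBadSoFar - prevNumExceptions;
            numExceptionsInPartition.push_back(numNew);
            prevNumExceptions = totalBadSoFar;
            prevNumRows += numNew + rowsInPartition;
        }
    };

For instance, if elements 2 and 5 of a list are bad and the first partition is sealed after three serialized rows, rollOver(1, 3) sets prevNumRows to 4, so element 5 is recorded with local index 5 - 4 == 1 in the second partition.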
-        return _context->fromPartitions(schema, partitions, columns);
+        return _context->fromPartitions(schema, partitions, columns, badParallelizeObjects, numExceptionsInPartition);
     }

     PythonDataSet PythonContext::parallelize(boost::python::list L, boost::python::object cols,
-                                             boost::python::object schema) {
+                                             boost::python::object schema,
+                                             bool autoUnpack) {
         assert(_context);

-        // clear bad parallelize cache
-        _badParallelizeObjects.clear(); // <-- this holds all objects who don't comply with majority type
-
         auto& logger = Logger::instance().logger("python");
         auto columns = extractFromListOfStrings(cols.ptr(), "columns ");
         PythonDataSet pds;
@@ -675,7 +798,7 @@
         majType = inferType(L);

         // special case: majType is a dict with strings as key, i.e. perform String Dict unpacking
-        if((majType.isDictionaryType() && majType != python::Type::EMPTYDICT && majType != python::Type::GENERICDICT) && majType.keyType() == python::Type::STRING) {
+        if(autoUnpack && (majType.isDictionaryType() && majType != python::Type::EMPTYDICT && majType != python::Type::GENERICDICT) && majType.keyType() == python::Type::STRING) {
             // automatic unpacking!
             // ==> first check if columns are defined, if not infer columns from sample!
             auto dictTypes = inferColumnsFromDictObjects(L, _context->getOptions().NORMALCASE_THRESHOLD());
@@ -758,19 +881,6 @@
                         + std::to_string(timer.time()) + " seconds (materialized: " + sizeToMemString(sizeInMemory) + ")");
         }

-        // warning about bad objects
-        if(!_badParallelizeObjects.empty()) {
-            // warn!
-            logger.warn("Found " + pluralize(_badParallelizeObjects.size(), "row") + " not complying with inferred type " + majType.desc()
-                        + ", ignoring for now.");
-
-            // @TODO: later save these rows as cloudpickled objects to a partition together with the row number.
-            // they then need to be passed through the pure python pipeline & merged back if possible.
-
-            // remove all..
-            _badParallelizeObjects.clear();
-        }
-
         // assign dataset to wrapper
         pds.wrap(ds);

@@ -883,7 +993,7 @@
         return majType;
     }
-
+
     python::Type PythonContext::inferType(const boost::python::list &L) const {
         // elements must be either simple objects, i.e. str/int/float
         // or tuples of simple objects
@@ -919,66 +1029,49 @@
         auto& logger = Logger::instance().logger("python");

         auto numSample = sampleSize(L);
-
-#warning "use here and for other sample based inferences a global infer method!"
-
-        // count occurences of columns, decide on heuristic which are normal and which should become exceptions
-        unordered_map<string, size_t> counts;
-        unordered_map<string, std::vector<PyObject*>> cols;
-        size_t num_dicts = 0;

         PyObject* listObj = L.ptr();
         assert(listObj);
         assert(PyList_Check(listObj));

-        for(int i = 0; i < numSample; ++i) {
-            auto item = PyList_GET_ITEM(listObj, i);
-            Py_INCREF(item); // borrowed reference?
+        std::unordered_map<std::string, std::vector<PyObject*>> columns;
+        for (int i = 0; i < numSample; ++i) {
+            auto item = PyList_GET_ITEM(listObj, i);

-            if(PyDict_Check(item)) {
-                num_dicts++;
+            Py_INCREF(item);

-                // go through keys...
+            if (PyDict_Check(item)) {
                 PyObject *key = nullptr, *val = nullptr;
-                Py_ssize_t pos = 0; // must be initialized to 0 to start iteration, however internal iterator variable. Don't use semantically.
-                while(PyDict_Next(item, &pos, &key, &val)) {
-                    // check if key is string
-                    if(PyUnicode_Check(key)) {
+                Py_ssize_t pos = 0;
+                while (PyDict_Next(item, &pos, &key, &val)) {
+                    if (PyUnicode_Check(key)) {
                         auto skey = python::PyString_AsString(key);
-                        auto it = counts.find(skey);
-                        if(it == counts.end()) {
-                            counts[skey] = 0;
-                            cols[skey] = std::vector<PyObject*>();
+                        auto it = columns.find(skey);
+                        if (it == columns.end()) {
+                            columns[skey] = std::vector<PyObject*>();
                         }
-                        counts[skey]++;
-                        Py_XINCREF(val); // val is borrowed according to https://docs.python.org/3/c-api/dict.html#c.PyDict_Next
-                        cols[skey].push_back(val);
+                        Py_XINCREF(val);
+                        columns[skey].push_back(val);
                     }
                 }
             }
         }

-        // normal case decision time!
-        vector<string> columns;
-        for(const auto& keyval : counts) {
-            // met threshold?
-            if(keyval.second >= ceil(normalThreshold * num_dicts))
-                columns.emplace_back(keyval.first);
-        }
-
-        // infer for each of the columns the most likely type!
-        unordered_map<string, python::Type> m;
-        for(const auto& c : columns) {
-            PyObject* listColObj = PyList_New(cols[c].size());
-            for(int i = 0; i < cols[c].size(); ++i) {
-                Py_XINCREF(cols[c][i]);
-                PyList_SET_ITEM(listColObj, i, cols[c][i]);
+        std::unordered_map<std::string, python::Type> m;
+        for (const auto &c : columns) {
+            PyObject *listColObj = PyList_New(numSample);
+            int i = 0;
+            while (i < columns[c.first].size()) {
+                Py_XINCREF(columns[c.first][i]);
+                PyList_SET_ITEM(listColObj, i, columns[c.first][i]);
+                ++i;
+            }
+            while (i < numSample) {
+                Py_XINCREF(Py_None);
+                PyList_SET_ITEM(listColObj, i, Py_None);
+                ++i;
             }
-
-            // hand-off to infer type function
-            // ==> note: boost::python::handle transfers ownership!
             auto type = inferType(boost::python::list(boost::python::handle<>(listColObj)));
-
-            m[c] = type;
+            m[c.first] = type;
         }

-        // special case: no inference was possible ==> take as backup the first row as schema. warn message.
         if(m.empty()) {
             logger.warn("could not infer column names from sample according to threshold. Defaulting to schema defined by first row.");
diff --git a/tuplex/python/src/PythonDataSet.cc b/tuplex/python/src/PythonDataSet.cc
index 1c39324a2..ffe14fdd6 100644
--- a/tuplex/python/src/PythonDataSet.cc
+++ b/tuplex/python/src/PythonDataSet.cc
@@ -1344,18 +1344,18 @@ namespace tuplex {
     }

     PyObject *PythonDataSet::resultSetToCPython(tuplex::ResultSet *rs, size_t maxRowCount) {
-        auto type = rs->schema().getRowType();
-        // if single type, reset by one
-        assert(type.isTupleType());
-        if (type.parameters().size() == 1)
-            type = type.parameters().front();
-
         // b.c. merging of arbitrary python objects is not implemented yet, whenever they're present, use general
         // version
         // @TODO: this could be optimized!
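+        // note: this early exit stays ahead of the row-type unwrapping (moved below it in this
+        // change), so the general object path sees the row type unmodified.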
if(rs->pyobject_count() != 0) return anyToCPythonWithPyObjects(rs, maxRowCount); + auto type = rs->schema().getRowType(); + // if single type, reset by one + assert(type.isTupleType()); + if (type.parameters().size() == 1) + type = type.parameters().front(); + if (python::Type::BOOLEAN == type) { return boolToCPython(rs, maxRowCount); } else if (python::Type::I64 == type) { diff --git a/tuplex/python/tests/resources/type_violations.csv b/tuplex/python/tests/resources/type_violations.csv new file mode 100644 index 000000000..76536a490 --- /dev/null +++ b/tuplex/python/tests/resources/type_violations.csv @@ -0,0 +1,7 @@ +1,2 +3,4 +5,6 +7,8 +9,10 +100, +200, \ No newline at end of file diff --git a/tuplex/python/tests/test_arithmetic.py b/tuplex/python/tests/test_arithmetic.py index 3694d34db..15b3a1a8a 100644 --- a/tuplex/python/tests/test_arithmetic.py +++ b/tuplex/python/tests/test_arithmetic.py @@ -18,7 +18,7 @@ class TestArithmetic(unittest.TestCase): def setUp(self): - self.conf = {"webui.enable": False, "driverMemory": "8MB", "partitionSize": "256KB"} + self.conf = {"webui.enable": False, "driverMemory": "8MB", "partitionSize": "256KB", "tuplex.optimizer.mergeExceptionsInOrder": True} def test_add(self): c = Context(self.conf) diff --git a/tuplex/python/tests/test_exceptions.py b/tuplex/python/tests/test_exceptions.py new file mode 100644 index 000000000..6086c2ce9 --- /dev/null +++ b/tuplex/python/tests/test_exceptions.py @@ -0,0 +1,514 @@ +#!/usr/bin/env python3 +#----------------------------------------------------------------------------------------------------------------------# +# # +# Tuplex: Blazing Fast Python Data Science # +# # +# # +# (c) 2017 - 2021, Tuplex team # +# Created by Leonhard Spiegelberg first on 10/16/2021 # +# License: Apache 2.0 # +#----------------------------------------------------------------------------------------------------------------------# + +import unittest +from tuplex import Context +from random import randint, sample, shuffle +from math import floor + +class TestExceptions(unittest.TestCase): + + def setUp(self): + self.conf = {"tuplex.webui.enable": False, "executorCount": 8, "executorMemory": "256MB", "driverMemory": "256MB", "partitionSize": "256KB", "tuplex.optimizer.mergeExceptionsInOrder": False} + self.conf_in_order = {"tuplex.webui.enable": False, "executorCount": 8, "executorMemory": "256MB", "driverMemory": "256MB", "partitionSize": "256KB", "tuplex.optimizer.mergeExceptionsInOrder": True} + + def test_merge_with_filter(self): + c = Context(self.conf_in_order) + + output = c.parallelize([0, "e1", 0]).filter(lambda x: x != 0).collect() + self.compare_in_order(["e1"], output) + output = c.parallelize([0, 0, "e1"]).filter(lambda x: x != 0).collect() + self.compare_in_order(["e1"], output) + output = c.parallelize(["e1", 0, 0]).filter(lambda x: x != 0).collect() + self.compare_in_order(["e1"], output) + + output = c.parallelize([-1.1, 1, 2, -2.2, 4, 5, -6.6]).filter(lambda x: x < 0 or x > 3).collect() + self.compare_in_order([-1.1, -2.2, 4, 5, -6.6], output) + + input = list(range(1, 100001)) + sampled = sample(input, 40000) + for i in sampled: + ind = randint(0, 1) + if ind == 0: + input[i - 1] = str(input[i - 1]) + elif ind == 1: + input[i - 1] = 0 + + output = c.parallelize(input).filter(lambda x: x != 0).collect() + self.compare_in_order(list(filter(lambda x: x != 0, input)), output) + + def process(self, input_size, num_filtered, num_schema, num_resolved, num_unresolved): + inds = list(range(input_size)) + shuffle(inds) + inds = 
iter(inds) + + input = list(range(1, input_size + 1)) + + for _ in range(floor(num_filtered * input_size)): + ind = next(inds) + input[ind] = -1 + + for _ in range(floor(num_schema * input_size)): + ind = next(inds) + input[ind] = "E" + + for _ in range(floor(num_resolved * input_size)): + ind = next(inds) + input[ind] = -2 + + for _ in range(floor(num_unresolved * input_size)): + ind = next(inds) + input[ind] = -3 + + def filter_udf(x): + return x != -1 + + def map_udf(x): + if x == -2 or x == -3: + return 1 // (x - x) + else: + return x + + def resolve_udf(x): + if x == -3: + return 1 // (x - x) + else: + return x + + c = Context(self.conf_in_order) + output = c.parallelize(input).filter(filter_udf).map(map_udf).resolve(ZeroDivisionError, resolve_udf).collect() + + self.assertEqual(list(filter(lambda x: x != -3 and x != -1, input)), output) + + def test_everything(self): + self.process(100, 0.25, 0.25, 0.25, 0.25) + self.process(1000, 0.25, 0.25, 0.25, 0.25) + self.process(10000, 0.25, 0.25, 0.25, 0.25) + self.process(100000, 0.25, 0.25, 0.25, 0.25) + + def test_merge_with_filter_on_exps(self): + c = Context(self.conf_in_order) + + output = c.parallelize([0, 1.1, 2.2, 1, 3.3, 4, 5]).filter(lambda x: x != 0 and x != 1.1).collect() + self.compare_in_order([2.2, 1, 3.3, 4, 5], output) + + def test_merge_runtime_only(self): + c = Context(self.conf_in_order) + + output = c.parallelize([1, 0, 0, 4]).map(lambda x: 1 // x).resolve(ZeroDivisionError, lambda x: -1).collect() + self.compare_in_order([1, -1, -1, 0], output) + + output = c.parallelize([0 for i in range(100000)]).map(lambda x: 1 // x).resolve(ZeroDivisionError, lambda x: -1).collect() + self.compare_in_order([-1 for i in range(100000)], output) + + input = [] + for i in range(100000): + if i % 100 == 0: + input.append(0) + else: + input.append(i) + + output = c.parallelize(input).map(lambda x: 1 // x).resolve(ZeroDivisionError, lambda x: -1).collect() + + expectedOutput = [] + for i in range(100000): + if i % 100 == 0: + expectedOutput.append(-1) + else: + expectedOutput.append(1 // i) + + self.compare_in_order(expectedOutput, output) + + def test_merge_some_fail(self): + c = Context(self.conf_in_order) + + input = [1, 2, -1, 5, 6, 7, -2, 10, 11, 12, -3, 15] + output = c.parallelize(input) \ + .map(lambda x: 1 // (x - x) if x == -1 or x == -2 or x == -3 else x) \ + .resolve(ZeroDivisionError, lambda x: 1 // (x - x) if x == -2 else x) \ + .collect() + self.compare_in_order([1, 2, -1, 5, 6, 7, 10, 11, 12, -3, 15], output) + + def test_merge_both_but_no_resolve(self): + c = Context(self.conf_in_order) + + input = [1, 2, -1, "a", 5, 6, 7, -2, "b", 10, 11, 12, -3, "c", 15] + output = c.parallelize(input) \ + .map(lambda x: 1 // (x - x) if x == -1 or x == -2 or x == -3 else x) \ + .resolve(ZeroDivisionError, lambda x: 1 // (x - x) if x == -2 else x) \ + .collect() + self.compare_in_order([1, 2, -1, "a", 5, 6, 7, "b", 10, 11, 12, -3, "c", 15], output) + + input = list(range(1, 100001)) + sampled = sample(input, 40000) + for i in sampled: + ind = randint(0, 2) + if ind == 0: + input[i - 1] = str(input[i - 1]) + elif ind == 1: + input[i - 1] = 0 + else: + input[i - 1] = -1 + expectedOutput = list(filter(lambda x: x != 0, input)) + + output = c.parallelize(input).map(lambda x: 1 // (x - x) if x == -1 or x == 0 else x).resolve(ZeroDivisionError, lambda x: 1 // x if x == 0 else x).collect() + self.compare_in_order(expectedOutput, output) + + def test_merge_both(self): + c = Context(self.conf_in_order) + + input = [1, 2, 0, "a", 5, 6, 7, 0, "b", 
10, 11, 12, 0, "c", 15] + output = c.parallelize(input).map(lambda x: 1 // x if x == 0 else x).resolve(ZeroDivisionError, lambda x: -1).collect() + self.compare_in_order([1, 2, -1, "a", 5, 6, 7, -1, "b", 10, 11, 12, -1, "c", 15], output) + + input = [1, 2, "a", 0, 5, 6, 7, "b", 0, 10, 11, 12, "c", 0, 15] + output = c.parallelize(input).map(lambda x: 1 // x if x == 0 else x).resolve(ZeroDivisionError, lambda x: -1).collect() + self.compare_in_order([1, 2, "a", -1, 5, 6, 7, "b", -1, 10, 11, 12, "c", -1, 15], output) + + input = list(range(1, 100001)) + sampled = sample(input, 40000) + for i in sampled: + if randint(0, 1) == 0: + input[i - 1] = str(input[i - 1]) + else: + input[i - 1] = 0 + + output = c.parallelize(input).map(lambda x: 1 // x if x == 0 else x).resolve(ZeroDivisionError, lambda x: x).collect() + self.compare_in_order(input, output) + + def test_merge_input_only(self): + c = Context(self.conf_in_order) + + input = [1, 2, "a", 4, 5, "b", 6, 7, 8, 9, 10, "d"] + output = c.parallelize([1, 2, "a", 4, 5, "b", 6, 7, 8, 9, 10, "d"]).map(lambda x: x).collect() + self.compare_in_order(input, output) + + input = [] + for i in range(40000): + if i % 100 == 0: + input.append(str(i)) + else: + input.append(i) + + output = c.parallelize(input).map(lambda x: x).collect() + self.compare_in_order(input, output) + + def test_no_normal_rows_in_result(self): + c = Context(self.conf) + + output = c.parallelize([1, None, "a", 1.2, 3, 4]).filter(lambda x: x != 1 and x != 3 and x != 4).collect() + self.compare([None, "a", 1.2], output) + + def test_empty_result(self): + c = Context(self.conf) + + output = c.parallelize([1, None, "a", 1.2, 3, 4]).filter(lambda x: x == -1).collect() + self.compare([], output) + + def test_no_pipeline(self): + c = Context(self.conf) + + output = c.parallelize([1, 2, 3, 4, "abc"]).collect() + self.compare([1, 2, 3, 4, "abc"], output) + + output = c.parallelize([1, 2, "abc", 4, 5]).collect() + self.compare([1, 2, "abc", 4, 5], output) + + output = c.parallelize(["abc", 2, 3, 4, 5]).collect() + self.compare(["abc", 2, 3, 4, 5], output) + + output = c.parallelize(["abc", 2.4, 4, 5, True]).collect() + self.compare(["abc", 2.4, 4, 5, True], output) + + def test_single_tuples_unwrapped(self): + c = Context(self.conf) + + output = c.parallelize([(1,), (2,), (3,)]).collect() + self.compare([1, 2, 3], output) + + def test_parallelize_exceptions_unwrapped(self): + c = Context(self.conf) + + output = c.parallelize([1, 2, 3, 4, (None,)]).map(lambda x: x).collect() + self.compare([1, 2, 3, 4, None], output) + + def test_no_merge_some_fail(self): + c = Context(self.conf) + + input = [1, 2, -1, 5, 6, 7, -2, 10, 11, 12, -3, 15] + output = c.parallelize(input) \ + .map(lambda x: 1 // (x - x) if x == -1 or x == -2 or x == -3 else x) \ + .resolve(ZeroDivisionError, lambda x: 1 // (x - x) if x == -2 else x) \ + .collect() + self.compare([1, 2, -1, 5, 6, 7, 10, 11, 12, -3, 15], output) + + def test_no_merge_both_but_no_resolve(self): + c = Context(self.conf) + + input = [1, 2, -1, "a", 5, 6, 7, -2, "b", 10, 11, 12, -3, "c", 15] + output = c.parallelize(input) \ + .map(lambda x: 1 // (x - x) if x == -1 or x == -2 or x == -3 else x) \ + .resolve(ZeroDivisionError, lambda x: 1 // (x - x) if x == -2 else x) \ + .collect() + self.compare([1, 2, -1, "a", 5, 6, 7, "b", 10, 11, 12, -3, "c", 15], output) + + input = list(range(1, 100001)) + sampled = sample(input, 40000) + for i in sampled: + ind = randint(0, 2) + if ind == 0: + input[i - 1] = str(input[i - 1]) + elif ind == 1: + input[i - 1] = 0 + 
else: + input[i - 1] = -1 + expectedOutput = list(filter(lambda x: x != 0, input)) + + output = c.parallelize(input).map(lambda x: 1 // (x - x) if x == -1 or x == 0 else x).resolve(ZeroDivisionError, lambda x: 1 // x if x == 0 else x).collect() + self.compare(expectedOutput, output) + + def test_no_merge_both(self): + c = Context(self.conf) + + input = [1, 2, 0, "a", 5, 6, 7, 0, "b", 10, 11, 12, 0, "c", 15] + output = c.parallelize(input).map(lambda x: 1 // x if x == 0 else x).resolve(ZeroDivisionError, lambda x: -1).collect() + self.compare([1, 2, -1, "a", 5, 6, 7, -1, "b", 10, 11, 12, -1, "c", 15], output) + + input = [1, 2, "a", 0, 5, 6, 7, "b", 0, 10, 11, 12, "c", 0, 15] + output = c.parallelize(input).map(lambda x: 1 // x if x == 0 else x).resolve(ZeroDivisionError, lambda x: -1).collect() + self.compare([1, 2, "a", -1, 5, 6, 7, "b", -1, 10, 11, 12, "c", -1, 15], output) + + input = list(range(1, 100001)) + sampled = sample(input, 40000) + for i in sampled: + if randint(0, 1) == 0: + input[i - 1] = str(input[i - 1]) + else: + input[i - 1] = 0 + + output = c.parallelize(input).map(lambda x: 1 // x if x == 0 else x).resolve(ZeroDivisionError, lambda x: x).collect() + self.compare(input, output) + + def test_no_merge_input_only(self): + c = Context(self.conf) + + input = [1, 2, "a", 4, 5, "b", 6, 7, 8, 9, 10, "d"] + output = c.parallelize([1, 2, "a", 4, 5, "b", 6, 7, 8, 9, 10, "d"]).map(lambda x: x).collect() + self.compare(input, output) + + input = [] + for i in range(40000): + if i % 100 == 0: + input.append(str(i)) + else: + input.append(i) + + output = c.parallelize(input).map(lambda x: x).collect() + self.compare(input, output) + + def test_no_merge_runtime_only(self): + c = Context(self.conf) + + output = c.parallelize([1, 0, 0, 4]).map(lambda x: 1 // x).resolve(ZeroDivisionError, lambda x: -1).collect() + self.compare([1, -1, -1, 0], output) + + input = [] + for i in range(100000): + if i % 100 == 0: + input.append(0) + else: + input.append(i) + + output = c.parallelize(input).map(lambda x: 1 // x).resolve(ZeroDivisionError, lambda x: -1).collect() + + expectedOutput = [] + for i in range(100000): + if i % 100 == 0: + expectedOutput.append(-1) + else: + expectedOutput.append(1 // i) + + self.compare(expectedOutput, output) + + def test_parallelize_exceptions_no_merge(self): + c = Context(self.conf) + + output = c.parallelize([1, 2, 3, 4, None]).map(lambda x: x).collect() + self.compare([1, 2, 3, 4, None], output) + + output = c.parallelize([1, 2, 3, "a", 4]).map(lambda x: x).collect() + self.compare([1, 2, 3, 4, "a"], output) + + output = c.parallelize([1, 0.3, 2, 3, 4]).map(lambda x: x).collect() + self.compare([1, 2, 3, 4, 0.3], output) + + output = c.parallelize([(-1, -1), 1, 2, 3, 4]).map(lambda x: x).collect() + self.compare([1, 2, 3, 4, (-1, -1)], output) + + output = c.parallelize([(True, 1), (True, 2), (True, 3), ("abc", "def")]).map(lambda x: x).collect() + self.compare([(True, 1), (True, 2), (True, 3), ("abc", "def")], output) + + l1 = [] + l2 = [] + input = [] + for i in range(50000): + if i % 100 == 0: + l2.append(str(i)) + input.append(str(i)) + else: + l1.append(i) + input.append(i) + output = c.parallelize(input).map(lambda x: x).collect() + l1.extend(l2) + self.compare(l1, output) + + def compare(self, expectedOutput, output): + self.assertEqual(len(expectedOutput), len(output)) + expectedOutput = set(expectedOutput) + output = set(output) + for elt in expectedOutput: + self.assertTrue(elt in output) + + def compare_in_order(self, expectedOutput, output): + 
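"""order-sensitive check: expected and actual outputs must match element by element."""
+        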
self.assertEqual(len(expectedOutput), len(output)) + for i in range(len(expectedOutput)): + self.assertEqual(expectedOutput[i], output[i]) + + def test_withColumn(self): + c = Context(self.conf_in_order) + + ds = c.parallelize([(1, "a", True), (0, "b", False), (3, "c", True)])\ + .withColumn("new", lambda x, y, z: str(1 // x) + y) + output = ds.collect() + ecounts = ds.exception_counts + + self.assertEqual(2, len(output)) + self.assertEqual((1, "a", True, "1a"), output[0]) + self.assertEqual((3, "c", True, "0c"), output[1]) + + self.assertEqual(1, len(ecounts)) + self.assertEqual(1, ecounts["ZeroDivisionError"]) + + ds = ds.resolve(ZeroDivisionError, lambda x, y, z: "NULL") + output = ds.collect() + ecounts = ds.exception_counts + + self.assertEqual(3, len(output)) + self.assertEqual((1, "a", True, "1a"), output[0]) + self.assertEqual((0, "b", False, "NULL"), output[1]) + self.assertEqual((3, "c", True, "0c"), output[2]) + + self.assertEqual(0, len(ecounts)) + + def test_mapColumn(self): + c = Context(self.conf_in_order) + + ds = c.parallelize([(1, "a"), (0, "b"), (3, "c")], columns=["int", "str"]) \ + .mapColumn("int", lambda x: 1 // x) + output = ds.collect() + ecounts = ds.exception_counts + + self.assertEqual(2, len(output)) + self.assertEqual((1, "a"), output[0]) + self.assertEqual((0, "c"), output[1]) + + self.assertEqual(1, len(ecounts)) + self.assertEqual(1, ecounts["ZeroDivisionError"]) + + ds = ds.resolve(ZeroDivisionError, lambda x: -1) + output = ds.collect() + ecounts = ds.exception_counts + + self.assertEqual(3, len(output)) + self.assertEqual((1, "a"), output[0]) + self.assertEqual((-1, "b"), output[1]) + self.assertEqual((0, "c"), output[2]) + + self.assertEqual(0, len(ecounts)) + + def test_withColumn_replace(self): + c = Context(self.conf_in_order) + + ds = c.parallelize([(1, "a", True), (0, "b", False), (3, "c", True)], columns=["num", "str", "bool"]) \ + .withColumn("str", lambda x, y, z: str(1 // x) + y) + output = ds.collect() + ecounts = ds.exception_counts + + self.assertEqual(2, len(output)) + self.assertEqual((1, "1a", True), output[0]) + self.assertEqual((3, "0c", True), output[1]) + + self.assertEqual(1, len(ecounts)) + self.assertEqual(1, ecounts["ZeroDivisionError"]) + + ds = ds.resolve(ZeroDivisionError, lambda x, y, z: "NULL") + output = ds.collect() + ecounts = ds.exception_counts + + self.assertEqual(3, len(output)) + self.assertEqual((1, "1a", True), output[0]) + self.assertEqual((0, "NULL", False), output[1]) + self.assertEqual((3, "0c", True), output[2]) + + self.assertEqual(0, len(ecounts)) + + def test_map(self): + c = Context(self.conf_in_order) + + ds = c.parallelize([1, 0, 0, 2]).map(lambda x: 1 // x) + output = ds.collect() + ecounts = ds.exception_counts + + self.assertEqual(2, len(output)) + self.assertEqual(1, output[0]) + self.assertEqual(0, output[1]) + + self.assertEqual(1, len(ecounts)) + self.assertEqual(2, ecounts["ZeroDivisionError"]) + + ds = ds.resolve(ZeroDivisionError, lambda x: -1) + output = ds.collect() + ecounts = ds.exception_counts + + self.assertEqual(4, len(output)) + self.assertEqual(1, output[0]) + self.assertEqual(-1, output[1]) + self.assertEqual(-1, output[2]) + self.assertEqual(0, output[3]) + + self.assertEqual(0, len(ecounts)) + + def test_filter(self): + c = Context(self.conf_in_order) + + ds = c.parallelize([1, 0, 0, 2]).filter(lambda x: (1 // x) < 5) + output = ds.collect() + ecounts = ds.exception_counts + + self.assertEqual(2, len(output)) + self.assertEqual(1, output[0]) + self.assertEqual(2, output[1]) + + 
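# both rows that raise ZeroDivisionError are counted as exceptions until a resolver is attached
+        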
self.assertEqual(1, len(ecounts)) + self.assertEqual(2, ecounts["ZeroDivisionError"]) + + ds = ds.resolve(ZeroDivisionError, lambda x: True) + output = ds.collect() + ecounts = ds.exception_counts + + self.assertEqual(4, len(output)) + self.assertEqual(1, output[0]) + self.assertEqual(0, output[1]) + self.assertEqual(0, output[2]) + self.assertEqual(2, output[3]) + + self.assertEqual(0, len(ecounts)) + diff --git a/tuplex/python/tests/test_parallelize.py b/tuplex/python/tests/test_parallelize.py index 7f7b5772f..1b9d66030 100644 --- a/tuplex/python/tests/test_parallelize.py +++ b/tuplex/python/tests/test_parallelize.py @@ -90,6 +90,24 @@ def testTupleOptionTypeI(self): assert res == ref + def testAutoUnpack(self): + c = Context(self.conf) + input = [{"a":1,"b":2,"c":3},{"a":4,"b":5,"c":6},{"a":7,"b":8,"c":9}] + output = c.parallelize(input).map(lambda x: (x["a"], x["b"], x["c"])).collect() + self.assertEqual([(1, 2, 3), (4, 5, 6), (7, 8, 9)], output) + + input = [{"a":1,"b":2,"c":3},{"a":4,"b":5,"c":6},{"a":7,"b":8,"c":9}] + output = c.parallelize(input, auto_unpack=False).collect() + self.assertEqual(input, output) + + input = [{"a":1,"b":2,"c":3},{"a":4,"b":5,"c":6},{"a":7,"b":8,"c":9},{"a": 1, "b":2}, {"c":11}] + output = c.parallelize(input).map(lambda x: x["a"]).collect() + self.assertEqual([1, 4, 7, 1], output) + + input = [{"a":1,"b":2,"c":3},{"d":4,"e":5,"f":6}] + output = c.parallelize(input).map(lambda x: (x["a"], x["b"], x["c"], x["d"], x["e"], x["f"])).collect() + self.assertEqual([(1, 2, 3, None, None, None), (None, None, None, 4, 5, 6)], output) + def testTupleOptionTypeII(self): c = Context(self.conf) ref = [(1.0, '2'), (None, '2'), (1.0, '2')] diff --git a/tuplex/python/tuplex/context.py b/tuplex/python/tuplex/context.py index 5b5051cbc..0cd3b9f0c 100644 --- a/tuplex/python/tuplex/context.py +++ b/tuplex/python/tuplex/context.py @@ -191,7 +191,7 @@ def __init__(self, conf=None, name="", **kwargs): self.metrics = Metrics(python_metrics) assert self.metrics - def parallelize(self, value_list, columns=None, schema=None): + def parallelize(self, value_list, columns=None, schema=None, auto_unpack=True): """ passes data to the Tuplex framework. Must be a list of primitive objects (e.g. of type bool, int, float, str) or a list of (nested) tuples of these types. @@ -200,11 +200,13 @@ def parallelize(self, value_list, columns=None, schema=None): columns (list): a list of strings or None to pass to the Tuplex backend in order to name the columns. Allows for dict access in functions then. schema: a schema defined as tuple of typing types. If None, then most likely schema will be inferred. + auto_unpack: whether or not to automatically unpack dictionaries with string keys. 
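+                Columns are inferred from the string keys of a sample; a row missing a column may
+                receive None for it or be tracked as an exception (cf. testAutoUnpack).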
        Returns:
            Tuplex.dataset.DataSet: A Tuplex Dataset object that allows further ETL operations
        """
        assert isinstance(value_list, list), "data must be given as a list of objects"
+        assert isinstance(auto_unpack, bool), "auto_unpack must be given as a boolean"

        cols = []
        if not columns:
@@ -221,7 +223,7 @@ def parallelize(self, value_list, columns=None, schema=None):

        ds = DataSet()
-        ds._dataSet = self._context.parallelize(value_list, columns, schema)
+        ds._dataSet = self._context.parallelize(value_list, columns, schema, auto_unpack)
        return ds

    def csv(self, pattern, columns=None, header=None, delimiter=None, quotechar='"', null_values=[''], type_hints={}):
diff --git a/tuplex/test/core/CacheTest.cc b/tuplex/test/core/CacheTest.cc
index d00e83a1a..81454defa 100644
--- a/tuplex/test/core/CacheTest.cc
+++ b/tuplex/test/core/CacheTest.cc
@@ -16,6 +16,101 @@ class CacheTest : public PyTest {
 using namespace tuplex;
 using namespace std;

+TEST_F(CacheTest, MergeInOrderWithFilter) {
+    using namespace std;
+    using namespace tuplex;
+
+    auto fileURI = URI(testName + ".csv");
+
+    auto opt = microTestOptions();
+    // enable NullValue Optimization
+    opt.set("tuplex.useLLVMOptimizer", "true");
+    opt.set("tuplex.optimizer.generateParser", "true");
+    opt.set("tuplex.executorCount", "0");
+    opt.set("tuplex.optimizer.mergeExceptionsInOrder", "true");
+    opt.set("tuplex.optimizer.nullValueOptimization", "true");
+    opt.set("tuplex.normalcaseThreshold", "0.6");
+    opt.set("tuplex.resolveWithInterpreterOnly", "false");
+
+    Context c(opt);
+
+    stringstream ss;
+    for (int i = 0; i < 500; ++i) {
+        if (i % 4 == 0) {
+            ss << ",-1\n";
+        } else {
+            ss << to_string(i) << "," << to_string(i) << "\n";
+        }
+    }
+    stringToFile(fileURI, ss.str());
+
+    auto ds_cached = c.csv(fileURI.toPath()).cache();
+
+    auto res = ds_cached.filter(UDF("lambda x, y: y % 3 != 0")).map(UDF("lambda x, y: y")).collectAsVector();
+
+    std::vector<Row> expectedOutput;
+    for (int i = 0; i < 500; ++i) {
+        if (i % 4 == 0) {
+            expectedOutput.push_back(Row(-1));
+        } else if (i % 3 != 0) {
+            expectedOutput.push_back(Row(i));
+        }
+    }
+
+    ASSERT_EQ(expectedOutput.size(), res.size());
+    for (int i = 0; i < expectedOutput.size(); ++i) {
+        EXPECT_EQ(expectedOutput[i].toPythonString(), res[i].toPythonString());
+    }
+}
+
+TEST_F(CacheTest, MergeInOrder) {
+    using namespace std;
+    using namespace tuplex;
+
+    auto fileURI = URI(testName + ".csv");
+
+    auto opt = microTestOptions();
+    // enable NullValue Optimization
+    opt.set("tuplex.useLLVMOptimizer", "true");
+    opt.set("tuplex.optimizer.generateParser", "true");
+    opt.set("tuplex.executorCount", "0");
+    opt.set("tuplex.optimizer.mergeExceptionsInOrder", "true");
+    opt.set("tuplex.optimizer.nullValueOptimization", "true");
+    opt.set("tuplex.normalcaseThreshold", "0.6");
+    opt.set("tuplex.resolveWithInterpreterOnly", "false");
+
+    Context c(opt);
+
+    auto size = 403;
+    auto mod1 = 5;
+    auto mod2 = 6;
+
+    stringstream ss;
+    for (int i = 0; i < size; ++i) {
+        if (i % mod1 == 0 || i % mod2 == 0) {
+            ss << ",,,,-1\n";
+        } else {
+            ss << to_string(i) << "," << to_string(i) << "," << to_string(i) << "," << to_string(i) << "," << to_string(i) << "\n";
+        }
+    }
+
+    stringToFile(fileURI, ss.str());
+
+    auto ds_cached = c.csv(fileURI.toPath()).cache();
+
+    auto res = ds_cached.map(UDF("lambda x: x[4]")).collectAsVector();
+    printRows(res);
+
+    ASSERT_EQ(res.size(), size);
+    for (int i = 0; i < res.size(); ++i) {
+        if (i % mod1 == 0 || i % mod2 == 0) {
+            EXPECT_EQ(res[i].toPythonString(), Row(-1).toPythonString());
+        } else {
+            EXPECT_EQ(res[i].toPythonString(), Row(i).toPythonString());
+        }
+    }
+}
+
 // Note: only this test here fails...
 TEST_F(CacheTest, SimpleCSVLoad) {
     using namespace std;
diff --git a/tuplex/test/core/ResultSetTest.cc b/tuplex/test/core/ResultSetTest.cc
index 49b24e27c..4acd38921 100644
--- a/tuplex/test/core/ResultSetTest.cc
+++ b/tuplex/test/core/ResultSetTest.cc
@@ -84,6 +84,7 @@ TEST_F(ResultSetTest, NoPyObjects) {
     int Nlimit = 17;
     auto rsB = make_shared<ResultSet>(Schema(Schema::MemoryLayout::ROW, sample_rows.front().getRowType()),
                                       partitions, std::vector<Partition*>{},
+                                      std::unordered_map<std::string, ExceptionInfo>(),
                                       vector<tuple<size_t, PyObject*>>{}, Nlimit);
     pos = 0;
@@ -141,6 +142,7 @@ TEST_F(ResultSetTest, WithPyObjects) {
     auto rsA = make_shared<ResultSet>(Schema(Schema::MemoryLayout::ROW, rows.front().getRowType()),
                                       partitions, std::vector<Partition*>{},
+                                      std::unordered_map<std::string, ExceptionInfo>(),
                                       objsA);
     EXPECT_EQ(rsA->rowCount(), objsA.size() + rows.size());
     pos = 0;
@@ -154,6 +156,7 @@ TEST_F(ResultSetTest, WithPyObjects) {
     auto rsB = make_shared<ResultSet>(Schema(Schema::MemoryLayout::ROW, rows.front().getRowType()),
                                       partitions, std::vector<Partition*>{},
+                                      std::unordered_map<std::string, ExceptionInfo>(),
                                       objsB);
     EXPECT_EQ(rsB->rowCount(), objsB.size() + rows.size());
     pos = 0;
@@ -168,6 +171,7 @@ TEST_F(ResultSetTest, WithPyObjects) {
     auto rsC = make_shared<ResultSet>(Schema(Schema::MemoryLayout::ROW, rows.front().getRowType()),
                                       partitions, std::vector<Partition*>{},
+                                      std::unordered_map<std::string, ExceptionInfo>(),
                                       objsC);
     EXPECT_EQ(rsC->rowCount(), objsC.size() + rows.size());
     pos = 0;
@@ -184,6 +188,7 @@ TEST_F(ResultSetTest, WithPyObjects) {
     auto rsD = make_shared<ResultSet>(Schema(Schema::MemoryLayout::ROW, rows.front().getRowType()),
                                       std::vector<Partition*>{}, std::vector<Partition*>{},
+                                      std::unordered_map<std::string, ExceptionInfo>(),
                                       objsD);
     EXPECT_EQ(rsD->rowCount(), objsD.size());
     pos = 0;
diff --git a/tuplex/test/wrappers/WrapperTest.cc b/tuplex/test/wrappers/WrapperTest.cc
index 9779e90ed..c8a819c16 100644
--- a/tuplex/test/wrappers/WrapperTest.cc
+++ b/tuplex/test/wrappers/WrapperTest.cc
@@ -73,7 +73,6 @@ class WrapperTest : public ::testing::Test {
     }
 };
-
 #ifdef BUILD_WITH_AWS
 TEST_F(WrapperTest, LambdaBackend) {
     using namespace tuplex;
@@ -166,8 +165,7 @@ TEST_F(WrapperTest, MixedSimpleTupleTuple) {
     auto resObj = res.ptr();
     ASSERT_TRUE(PyList_Check(resObj));
-    // Change to 4 when parallelize changes are merged
-    ASSERT_EQ(PyList_GET_SIZE(resObj), 3);
+    ASSERT_EQ(PyList_GET_SIZE(resObj), 4);
     PyObject_Print(resObj, stdout, 0);
 }
@@ -205,14 +203,14 @@ TEST_F(WrapperTest, DictionaryParallelize) {
     PythonContext c("c", "", testOptions());
     PyObject * dictObj1 = PyDict_New();
-    PyDict_SetItem(dictObj1, python::PyString_FromString("Hello"), PyFloat_FromDouble(0.0));
-    PyDict_SetItem(dictObj1, python::PyString_FromString("World"), PyFloat_FromDouble(1.345));
-    PyDict_SetItem(dictObj1, python::PyString_FromString("!"), PyFloat_FromDouble(-2.234));
+    PyDict_SetItem(dictObj1, python::PyString_FromString("a"), PyFloat_FromDouble(0.0));
+    PyDict_SetItem(dictObj1, python::PyString_FromString("b"), PyFloat_FromDouble(1.345));
+    PyDict_SetItem(dictObj1, python::PyString_FromString("c"), PyFloat_FromDouble(-2.234));
     PyObject * dictObj2 = PyDict_New();
-    PyDict_SetItem(dictObj2, python::PyString_FromString("a"), PyFloat_FromDouble(1.23));
-    PyDict_SetItem(dictObj2, python::PyString_FromString("b"), PyFloat_FromDouble(2.34));
-    PyDict_SetItem(dictObj2, python::PyString_FromString("c"), PyFloat_FromDouble(-3.45));
+    PyDict_SetItem(dictObj2, python::PyString_FromString("d"), PyFloat_FromDouble(1.23));
+    PyDict_SetItem(dictObj2, python::PyString_FromString("e"), PyFloat_FromDouble(2.34));
+    PyDict_SetItem(dictObj2, python::PyString_FromString("f"),
PyFloat_FromDouble(-3.45));

     PyObject * listObj = PyList_New(2);
     PyList_SET_ITEM(listObj, 0, dictObj1);
@@ -227,14 +225,14 @@ TEST_F(WrapperTest, DictionaryParallelize) {
     auto resObj = res.ptr();
     ASSERT_TRUE(PyList_Check(resObj));
-    ASSERT_EQ(PyList_Size(resObj), 1);
+    ASSERT_EQ(PyList_Size(resObj), 2);

     // check contents
     // ==> there will be only one result though, because of the type inference. Basically the first row will be taken :)
     // --> the other row will be stored as bad input row.
     auto tuple1 = PyList_GetItem(resObj, 0);
     ASSERT_TRUE(PyTuple_Check(tuple1));
-    ASSERT_EQ(PyTuple_Size(tuple1), 3);
+    ASSERT_EQ(PyTuple_Size(tuple1), 6);
     }
 }
@@ -2395,6 +2393,63 @@ TEST_F(WrapperTest, NYC311) {
     }
 }

+TEST_F(WrapperTest, MixedTypesIsWithNone) {
+    using namespace tuplex;
+    using namespace std;
+
+    auto opts = testOptions();
+    opts = opts.substr(0, opts.length() - 1) + ",\"tuplex.optimizer.mergeExceptionsInOrder\":\"True\"}";
+    PythonContext c("python", "", opts);
+
+    PyObject *listObj = PyList_New(8);
+    PyList_SetItem(listObj, 0, Py_None);
+    PyList_SetItem(listObj, 1, PyLong_FromLong(255));
+    PyList_SetItem(listObj, 2, PyLong_FromLong(400));
+    PyList_SetItem(listObj, 3, Py_True);
+    PyList_SetItem(listObj, 4, PyFloat_FromDouble(2.7));
+    PyList_SetItem(listObj, 5, PyTuple_New(0)); // empty tuple
+    PyList_SetItem(listObj, 6, PyList_New(0)); // empty list
+    PyList_SetItem(listObj, 7, PyDict_New()); // empty dict
+
+    auto ref = vector<bool>{true, false, false, false, false, false, false, false};
+
+    Py_IncRef(listObj);
+
+    {
+        auto list = boost::python::list(boost::python::handle<>(listObj));
+        auto res = c.parallelize(list).map("lambda x: (x, x is None)", "").collect();
+        auto resObj = res.ptr();
+        PyObject_Print(resObj, stdout, 0);
+        std::cout<