diff --git a/tuplex/codegen/src/LLVMEnvironment.cc b/tuplex/codegen/src/LLVMEnvironment.cc
index f9e76ca02..32e6b4e9b 100644
--- a/tuplex/codegen/src/LLVMEnvironment.cc
+++ b/tuplex/codegen/src/LLVMEnvironment.cc
@@ -146,9 +146,10 @@ namespace tuplex {
     // this function itself is super-slow, so cache its results
     static std::unordered_map>> _tupleIndexCache;
+    static std::mutex tupleIndexCacheMutex;
 
     std::tuple getTupleIndices(const python::Type &tupleType, size_t index) {
-
+        std::lock_guard<std::mutex> lock(tupleIndexCacheMutex);
         // find cache
         auto it = _tupleIndexCache.find(tupleType.hash());
         if (it == _tupleIndexCache.end()) {
diff --git a/tuplex/core/include/JobMetrics.h b/tuplex/core/include/JobMetrics.h
index 54a9cabec..fe07374ef 100644
--- a/tuplex/core/include/JobMetrics.h
+++ b/tuplex/core/include/JobMetrics.h
@@ -28,6 +28,7 @@ namespace tuplex {
        double _llvm_compilation_time_s = 0.0;
        double _total_compilation_time_s = 0.0;
        double _sampling_time_s = 0.0;
+       double _generate_llvm_time_s = 0.0; // how long StageBuilder::build takes
 
        // numbers per stage, can get combined in case.
        struct StageMetrics {
@@ -116,6 +117,12 @@ namespace tuplex {
            _total_compilation_time_s = time;
        }
        /*!
+        * @param time a double representing time in s
+        */
+       void setGenerateLLVMTime(double time) {
+           _generate_llvm_time_s = time;
+       }
+       /*!
         * getter for logical optimization time
         * @returns a double representing logical optimization time in s
         */
@@ -143,6 +150,12 @@ namespace tuplex {
        double getTotalCompilationTime() {
            return _total_compilation_time_s;
        }
+       /*!
+        * @returns a double representing time in s
+        */
+       double getGenerateLLVMTime() {
+           return _generate_llvm_time_s;
+       }
 
        /*!
         * set slow path timing info
@@ -219,6 +232,7 @@ namespace tuplex {
            ss<<"\"llvm_compilation_time_s\":"<<_llvm_compilation_time_s<<",";
            ss<<"\"total_compilation_time_s\":"<<_total_compilation_time_s<<",";
            ss<<"\"sampling_time_s\":"<<_sampling_time_s<<",";
+           ss<<"\"generate_llvm_time_s\":"<<_generate_llvm_time_s<<",";
 
            // per stage numbers
            ss<<"\"stages\":[";
diff --git a/tuplex/core/include/physical/StageBuilder.h b/tuplex/core/include/physical/StageBuilder.h
index b8c682e43..b6741550a 100644
--- a/tuplex/core/include/physical/StageBuilder.h
+++ b/tuplex/core/include/physical/StageBuilder.h
@@ -80,34 +80,62 @@ namespace tuplex {
        bool _nullValueOptimization;
        std::vector _operators;
 
-       // codegen strings
-       std::string _funcFileWriteCallbackName;
-       std::string _funcMemoryWriteCallbackName;
-       std::string _funcExceptionCallback;
-
        int64_t _stageNumber;
        int64_t _outputDataSetID;
 
-       std::string _resolveRowFunctionName;
-       std::string _resolveRowWriteCallbackName;
-       std::string _resolveRowExceptionCallbackName;
-       std::string _resolveHashCallbackName;
+       struct CodeGenerationContext {
+           // Common variables
+           bool allowUndefinedBehavior;
+           bool sharedObjectPropagation;
+           bool nullValueOptimization;
+
+           EndPointMode outputMode;
+           FileFormat outputFileFormat;
+           int64_t outputNodeID;
+
+           std::unordered_map fileOutputParameters; // parameters specific for a file output format
+           Schema outputSchema; //! output schema of stage
+
+           std::vector hashColKeys; // the column to use as hash key
+           python::Type hashKeyType;
+           bool hashSaveOthers; // whether to save other columns than the key or not. => TODO: UDAs, meanByKey etc. all will require similar things...
+           bool hashAggregate; // whether the hashtable is an aggregate
+
+           EndPointMode inputMode;
+           std::unordered_map fileInputParameters; // parameters specific for a file input format
+
+           // Resolve variables
+           std::vector resolveOperators;
+           LogicalOperator* inputNode;
+
+           Schema resolveReadSchema; //! schema for reading input
+           Schema resolveInputSchema; //! schema after applying projection pushdown to input source code
+           Schema resolveOutputSchema; //! output schema of stage
 
-       std::string _initStageFuncName;
-       std::string _releaseStageFuncName;
+           Schema normalCaseInputSchema; //! schema after applying normal case optimizations
+
+           python::Type hashBucketType;
+
+           // Fast Path
+           std::vector fastOperators;
+           python::Type fastReadSchema;
+           python::Type fastInSchema;
+           python::Type fastOutSchema;
+
+           bool isRootStage;
+           bool generateParser;
+
+           std::vector columnsToRead;
+           int64_t inputNodeID;
+       };
 
-       std::string _aggregateInitFuncName;
-       std::string _aggregateCombineFuncName;
-       std::string _aggregateAggregateFuncName;
        std::string _aggregateCallbackName;
        std::string _pyPipelineName;
 
-       std::string _irBitCode; // store code as bitcode (faster for parsing)
        std::string _pyCode; // python backup code
        std::string _writerFuncName; // name of the function where to write stuff to.
-       std::string _funcStageName;
 
        std::unordered_map _fileInputParameters; // parameters specific for a file input format
        std::unordered_map _fileOutputParameters; // parameters specific for a file output format
@@ -126,7 +154,6 @@ namespace tuplex {
        LogicalOperator* _inputNode;
        std::vector _columnsToRead;
 
-       std::string _funcHashWriteCallbackName; // callback for writing to hash table
        std::vector _hashColKeys; // the column to use as hash key
        python::Type _hashKeyType;
        python::Type _hashBucketType;
@@ -164,16 +191,16 @@ namespace tuplex {
         * @param udfop
         * @return string
         */
-       std::string formatBadUDFNode(UDFOperator* udfop);
+       std::string formatBadUDFNode(UDFOperator* udfop) const;
 
        /*!
         * generate LLVM IR code
         * @param fastCodePath whether to generate for fastCodePath or not. When false, always generates mem2mem.
         * @return
         */
-       bool generateFastCodePath(); // file2mem always
+       TransformStage::TransformStageCodePath generateFastCodePath(const CodeGenerationContext& fastLocalVariables) const; // file2mem always
 
-       size_t resolveOperatorCount() {
+       size_t resolveOperatorCount() const {
            return std::count_if(_operators.begin(), _operators.end(), [](const LogicalOperator* op) {
                return op && op->type() == LogicalOperatorType::RESOLVE;
            });
@@ -185,14 +212,18 @@ namespace tuplex {
        /*!
         * code path for mem2mem exception resolution => sh
         */
-       bool generateResolveCodePath(const std::shared_ptr& env); //! generates mix of LLVM / python code for slow code path including resolvers
+       TransformStage::TransformStageCodePath generateResolveCodePath(const CodeGenerationContext& resolveLocalVariables) const; //! generates mix of LLVM / python code for slow code path including resolvers
 
        void generatePythonCode(); //! generates fallback pipeline in pure python. => i.e. special case here...
 
-       python::Type intermediateType() const;
-
        std::vector getOperatorIDsAffectedByResolvers(const std::vector &operators);
    };
+
+   /*
+    * Returns the intermediate schema if the output node of the list of operators is an aggregate.
+    * @param operators
+    */
+   python::Type intermediateType(const std::vector& operators);
 }
}
diff --git a/tuplex/core/include/physical/TransformStage.h b/tuplex/core/include/physical/TransformStage.h
index d37fabf6e..4ced66d9e 100644
--- a/tuplex/core/include/physical/TransformStage.h
+++ b/tuplex/core/include/physical/TransformStage.h
@@ -174,36 +174,52 @@ namespace tuplex {
                                ecounts);
        }
 
-       std::string bitCode() const {
-           return _irBitCode;
+       std::string fastPathBitCode() const {
+           return _fastCodePath._fastPathIRBitCode;
        }
 
-       // to retrieve actual IR, use code
-       std::string code() const {
-           if(_irBitCode.empty())
+       std::string slowPathBitCode() const {
+           return _slowCodePath._slowPathIRBitCode;
+       }
+
+       // Retrieve fast path IR code
+       std::string fastPathCode() const {
+           if(fastPathBitCode().empty())
+               return "";
+
+           // parse bit code & convert
+           llvm::LLVMContext ctx;
+           auto mod = codegen::bitCodeToModule(ctx, fastPathBitCode());
+           if(!mod)
+               return "";
+           return codegen::moduleToString(*mod.get());
+       }
+
+       // Retrieve slow path IR code
+       std::string slowPathCode() const {
+           if(slowPathBitCode().empty())
                return "";
 
            // parse bit code & convert
            llvm::LLVMContext ctx;
-           auto mod = codegen::bitCodeToModule(ctx, _irBitCode);
+           auto mod = codegen::bitCodeToModule(ctx, slowPathBitCode());
            if(!mod)
                return "";
            return codegen::moduleToString(*mod.get());
        }
 
-       std::string funcName() const { return _funcStageName; }
-       std::string writerFuncName() const { return _writerFuncName; }
-       std::string writeMemoryCallbackName() const { return _funcMemoryWriteCallbackName; }
-       std::string writeFileCallbackName() const { return _funcFileWriteCallbackName; }
-       std::string writeHashCallbackName() const { return _funcHashWriteCallbackName; }
-       std::string exceptionCallbackName() const { return _funcExceptionCallback; }
-       std::string aggCombineCallbackName() const { return _aggregateCallbackName; }
+       std::string funcName() const { return _fastCodePath._funcStageName; }
+       std::string writerFuncName() const { return _fastCodePath._writerFuncName; }
+       std::string writeMemoryCallbackName() const { return _fastCodePath._funcMemoryWriteCallbackName; }
+       std::string writeFileCallbackName() const { return _fastCodePath._funcFileWriteCallbackName; }
+       std::string exceptionCallbackName() const { return _fastCodePath._funcExceptionCallback; }
+       std::string aggCombineCallbackName() const { return _fastCodePath._aggregateCallbackName; }
 
        // std::string resolveCode() const { return _irResolveCode; }
-       std::string resolveRowName() const { return _resolveRowFunctionName; }
-       std::string resolveWriteCallbackName() const { return _resolveRowWriteCallbackName; }
-       std::string resolveHashCallbackName() const { return _resolveHashCallbackName; }
-       std::string resolveExceptionCallbackName() const { return _resolveRowExceptionCallbackName; }
+       std::string resolveRowName() const { return _slowCodePath._resolveRowFunctionName; }
+       std::string resolveWriteCallbackName() const { return _slowCodePath._resolveRowWriteCallbackName; }
+       std::string resolveHashCallbackName() const { return _slowCodePath._resolveHashCallbackName; }
+       std::string resolveExceptionCallbackName() const { return _slowCodePath._resolveRowExceptionCallbackName; }
 
        std::string purePythonCode() const { return _pyCode; }
        std::string pythonPipelineName() const { return _pyPipelineName; }
@@ -256,6 +272,16 @@ namespace tuplex {
        // JITSymbols for this stage
        struct JITSymbols {
+           struct CodePath {
+               codegen::init_stage_f initStageFunctor;
+               codegen::release_stage_f releaseStageFunctor;
+
+               CodePath() : initStageFunctor(nullptr), releaseStageFunctor(nullptr) {}
+           };
+
+           CodePath _fastCodePath;
+           CodePath _slowCodePath;
+
            codegen::read_block_f functor; // can be memory2memory or file2memory
            codegen::read_block_f writeFunctor; // memory2file
            codegen::resolve_f resolveFunctor; // always memory2memory
@@ -268,8 +294,8 @@
            JITSymbols() : functor(nullptr),
                           writeFunctor(nullptr),
                           resolveFunctor(nullptr),
-                          initStageFunctor(nullptr),
-                          releaseStageFunctor(nullptr),
+                          _fastCodePath(),
+                          _slowCodePath(),
                           aggInitFunctor(nullptr),
                           aggCombineFunctor(nullptr),
                           aggAggregateFunctor(nullptr) {}
@@ -280,10 +306,12 @@
         * @param jit JIT instance
         * @param optimizer optional instance of optimizer to run over code first. No optimization run when set to 0
         * @param excludeSlowPath if true, only fast path functions are compiled. This helps to have threads already busy when slow path still need to get compiled.
-        * @param registerSymbols whether to add task symbols to JIT compiler
         * @return a struct of all function pointers
         */
-       std::shared_ptr compile(JITCompiler& jit, LLVMOptimizer *optimizer=nullptr, bool excludeSlowPath=false, bool registerSymbols=true);
+       std::shared_ptr compile(JITCompiler& jit, LLVMOptimizer *optimizer=nullptr, bool excludeSlowPath=false);
+
+       void compileSlowPath(JITCompiler& jit, LLVMOptimizer *optimizer=nullptr);
+       void compileFastPath(JITCompiler& jit, LLVMOptimizer *optimizer=nullptr);
 
        EndPointMode outputMode() const override { return _outputMode; }
        EndPointMode inputMode() const override { return _inputMode; }
@@ -390,18 +418,39 @@
                          int64_t number,
                          bool allowUndefinedBehavior);
 
-       std::string _irBitCode; //! llvm IR bitcode
-       std::string _funcStageName; //! llvm function name of the stage function
-       std::string _funcMemoryWriteCallbackName; //! llvm function name of the write callback
-       std::string _funcFileWriteCallbackName; //! llvm function name of the write callback used for file output.
-       std::string _funcExceptionCallback; //! llvm function of the exception callback function
-       std::string _funcHashWriteCallbackName; //! the function to call when saving to hash table
-       std::string _initStageFuncName; //! init function for a stage (sets up globals & Co)
-       std::string _releaseStageFuncName; //! release function for a stage (releases globals & Co)
-       std::string _aggregateInitFuncName; //! to initiate aggregate (allocates via C-malloc)
-       std::string _aggregateCombineFuncName; //! to combine two aggregates (allocates & frees via C-malloc).
-       std::string _aggregateCallbackName; //! the callback to call with an aggregate
-       std::string _aggregateAggregateFuncName; //! to combine aggregate and result (allocates & frees via C-malloc).
+       struct TransformStageCodePath {
+           // Fast path variables
+           std::string _fastPathIRBitCode; //! llvm IR bitcode for fast path
+           std::string _fastPathInitStageFuncName; //! init function for a stage (sets up globals & Co)
+           std::string _fastPathReleaseStageFuncName; //! release function for a stage (releases globals & Co)
+
+           std::string _funcStageName; //! llvm function name of the stage function
+           std::string _funcMemoryWriteCallbackName; //! llvm function name of the write callback
+           std::string _funcFileWriteCallbackName; //! llvm function name of the write callback used for file output.
+           std::string _funcExceptionCallback; //! llvm function of the exception callback function
+           std::string _funcHashWriteCallbackName; //! the function to call when saving to hash table
+           std::string _aggregateInitFuncName; //! to initiate aggregate (allocates via C-malloc)
+           std::string _aggregateCombineFuncName; //! to combine two aggregates (allocates & frees via C-malloc).
+
+           std::string _writerFuncName;
+           std::string _aggregateCallbackName; //! the callback to call with an aggregate
+
+           // Slow path variables
+           std::string _slowPathIRBitCode; //! llvm IR bitcode for slow path
+           std::string _slowPathInitStageFuncName; //! init function for a stage (sets up globals & Co)
+           std::string _slowPathReleaseStageFuncName; //! release function for a stage (releases globals & Co)
+
+           std::string _resolveRowFunctionName;
+           std::string _resolveRowWriteCallbackName;
+           std::string _resolveRowExceptionCallbackName;
+           std::string _resolveHashCallbackName;
+
+           // Common variables
+           std::string _aggregateAggregateFuncName; //! to combine aggregate and result (allocates & frees via C-malloc).
+       };
+
+       TransformStageCodePath _fastCodePath;
+       TransformStageCodePath _slowCodePath;
 
        EndPointMode _inputMode; //! indicate whether stage takes hash table, serialized mem or files as input
        EndPointMode _outputMode; //! analog
@@ -435,17 +484,9 @@
        URI _outputURI; //! the output uri for file mode of this stage
 
-       // resolve/exception handling code
-       // std::string _irResolveCode;
-       std::string _resolveRowFunctionName;
-       std::string _resolveRowWriteCallbackName;
-       std::string _resolveRowExceptionCallbackName;
-       std::string _resolveHashCallbackName;
-
        // pure python pipeline code & names
        std::string _pyCode;
        std::string _pyPipelineName;
-       std::string _writerFuncName;
 
        std::shared_ptr emptyResultSet() const;
diff --git a/tuplex/core/src/ee/aws/AWSLambdaBackend.cc b/tuplex/core/src/ee/aws/AWSLambdaBackend.cc
index c38c0454d..905772ed7 100644
--- a/tuplex/core/src/ee/aws/AWSLambdaBackend.cc
+++ b/tuplex/core/src/ee/aws/AWSLambdaBackend.cc
@@ -366,7 +366,7 @@ namespace tuplex {
            Timer timer;
            llvm::LLVMContext ctx;
            LLVMOptimizer opt;
-           auto mod = codegen::bitCodeToModule(ctx, tstage->bitCode());
+           auto mod = codegen::bitCodeToModule(ctx, tstage->fastPathBitCode());
            opt.optimizeModule(*mod);
            optimizedBitcode = codegen::moduleToBitCodeString(*mod);
            logger().info("client-side LLVM IR optimization took " + std::to_string(timer.time()) + "s");
@@ -393,7 +393,7 @@ namespace tuplex {
        if(_options.USE_LLVM_OPTIMIZER() && !optimizedBitcode.empty())
            pb_stage->set_bitcode(optimizedBitcode);
        else
-           pb_stage->set_bitcode(tstage->bitCode());
+           pb_stage->set_bitcode(tstage->fastPathBitCode());
 
        req.set_allocated_stage(pb_stage.release());
 
@@ -944,4 +944,4 @@ namespace tuplex {
        // other reset? @TODO.
    }
}
-#endif
\ No newline at end of file
+#endif
diff --git a/tuplex/core/src/ee/local/LocalBackend.cc b/tuplex/core/src/ee/local/LocalBackend.cc
index 63aadc9b1..d6489502c 100644
--- a/tuplex/core/src/ee/local/LocalBackend.cc
+++ b/tuplex/core/src/ee/local/LocalBackend.cc
@@ -760,7 +760,7 @@ namespace tuplex {
        }
 
        // special case: skip stage, i.e.
empty code and mem2mem - if(tstage->code().empty() && !tstage->fileInputMode() && !tstage->fileOutputMode()) { + if(tstage->fastPathCode().empty() && !tstage->fileInputMode() && !tstage->fileOutputMode()) { tstage->setMemoryResult(tstage->inputPartitions()); // skip stage Logger::instance().defaultLogger().info("[Transform Stage] skipped stage " + std::to_string(tstage->number()) + " because there is nothing todo here."); @@ -825,10 +825,10 @@ namespace tuplex { // ==> init using optionally hashmaps from dependents int64_t init_rc = 0; - if((init_rc = syms->initStageFunctor(tstage->initData().numArgs, + if((init_rc = syms->_fastCodePath.initStageFunctor(tstage->initData().numArgs, reinterpret_cast(tstage->initData().hash_maps), reinterpret_cast(tstage->initData().null_buckets))) != 0) - throw std::runtime_error("initStage() failed for stage " + std::to_string(tstage->number()) + " with code " + std::to_string(init_rc)); + throw std::runtime_error("fastPathInitStage() failed for stage " + std::to_string(tstage->number()) + " with code " + std::to_string(init_rc)); // init aggregate by key @@ -887,6 +887,7 @@ namespace tuplex { // => there are fallback mechanisms... bool executeSlowPath = true; + bool slowPathActuallyExecuted = false; //TODO: implement pure python resolution here... // exceptions found or slowpath data given? @@ -968,6 +969,15 @@ namespace tuplex { // => this can be achieved by setting functor to nullptr! auto resolveFunctor = _options.RESOLVE_WITH_INTERPRETER_ONLY() ? nullptr : syms->resolveFunctor; + if (resolveFunctor != nullptr) { + slowPathActuallyExecuted = true; + int64_t init_rc = 0; + if((init_rc = syms->_slowCodePath.initStageFunctor(tstage->initData().numArgs, + reinterpret_cast(tstage->initData().hash_maps), + reinterpret_cast(tstage->initData().null_buckets))) != 0) + throw std::runtime_error("slowPathInitStage() failed for stage " + std::to_string(tstage->number()) + " with code " + std::to_string(init_rc)); + } + // cout<<"*** num tasks before resolution: "<releaseStageFunctor() != 0) - throw std::runtime_error("releaseStage() failed for stage " + std::to_string(tstage->number())); + if(syms->_fastCodePath.releaseStageFunctor() != 0) + throw std::runtime_error("fastPathReleaseStage() failed for stage " + std::to_string(tstage->number())); + + if(slowPathActuallyExecuted && syms->_slowCodePath.releaseStageFunctor() != 0) + throw std::runtime_error("slowPathReleaseStage() failed for stage " + std::to_string(tstage->number())); // add exception counts from previous stages to current one // @TODO: need to add test for this. I.e. the whole exceptions + joins needs to revised... 
@@ -1446,6 +1459,8 @@ namespace tuplex { // add all resolved tasks to the result // cout<<"*** need to compute "<PhysicalStage::plan()->getContext().metrics(); + Timer interpretedPathTimer; auto resolvedTasks = performTasks(resolveTasks); // cout<<"*** git "< #include #include +#include #include #include #include @@ -20,6 +21,7 @@ #include #include #include +#include #include #include @@ -234,7 +236,7 @@ namespace tuplex { _inputNode = csvop; } - std::string StageBuilder::formatBadUDFNode(tuplex::UDFOperator *udfop) { + std::string StageBuilder::formatBadUDFNode(tuplex::UDFOperator *udfop) const { assert(udfop); assert(hasUDF(udfop)); @@ -528,78 +530,27 @@ namespace tuplex { return opt_ops; } - bool StageBuilder::generateFastCodePath() { + TransformStage::TransformStageCodePath StageBuilder::generateFastCodePath(const CodeGenerationContext& fastLocalVariables) const { using namespace std; - - // special case: no operators and mem2mem - // ==> no code needed, i.e. empty stage - bool mem2mem = _inputMode == EndPointMode::MEMORY && _outputMode == EndPointMode::MEMORY; - if (_operators.empty() && mem2mem) { - // _irCode = ""; - // _irResolveCode = ""; - _irBitCode = ""; - _pyCode = ""; - return true; - } - - // null-value optimization? - // => specialize operators of THIS stage then. - // => joins take the general type - auto operators = specializePipeline(_nullValueOptimization, _inputNode, _operators); + TransformStage::TransformStageCodePath ret; string env_name = "tuplex_fastCodePath"; string func_prefix = ""; // name for function processing a row (include stage number) string funcStageName = func_prefix + "Stage_" + to_string(number()); string funcProcessRowName = func_prefix + "processRow_Stage_" + to_string(number()); - _funcFileWriteCallbackName = func_prefix + "writeOut_Stage_" + to_string(number()); - _funcMemoryWriteCallbackName = func_prefix + "memOut_Stage_" + to_string(number()); - _funcHashWriteCallbackName = func_prefix + "hashOut_Stage_" + to_string(number()); - _funcExceptionCallback = func_prefix + "except_Stage_" + to_string(number()); + ret._funcFileWriteCallbackName = func_prefix + "writeOut_Stage_" + to_string(number()); + ret._funcMemoryWriteCallbackName = func_prefix + "memOut_Stage_" + to_string(number()); + ret._funcHashWriteCallbackName = func_prefix + "hashOut_Stage_" + to_string(number()); + ret._funcExceptionCallback = func_prefix + "except_Stage_" + to_string(number()); + ret._writerFuncName = _writerFuncName; auto &logger = Logger::instance().logger("codegen"); - auto readSchema = _readSchema.getRowType(); // what to read from files (before projection pushdown) - auto inSchema = _inputSchema.getRowType(); // with what to start the pipeline (after projection pushdown) - auto outSchema = _outputSchema.getRowType(); // what to output from pipeline auto env = make_shared(env_name); - // per default, set normalcase to output types! - _normalCaseInputSchema = _inputSchema; - _normalCaseOutputSchema = _outputSchema; - Row intermediateInitialValue; // filled by aggregate operator, if needed. - // null-value optimization? => use input schema from operators! 
- if(_nullValueOptimization) { - if(_inputMode == EndPointMode::FILE) { - readSchema = dynamic_cast(_inputNode)->getOptimizedInputSchema().getRowType(); - _normalCaseInputSchema = dynamic_cast(_inputNode)->getOptimizedOutputSchema(); - inSchema = _normalCaseInputSchema.getRowType(); - _normalCaseOutputSchema = _normalCaseInputSchema; - } else if(_inputMode == EndPointMode::MEMORY && _inputNode && _inputNode->type() == LogicalOperatorType::CACHE) { - _normalCaseInputSchema = dynamic_cast(_inputNode)->getOptimizedOutputSchema(); - inSchema = _normalCaseInputSchema.getRowType(); - } else { - _normalCaseOutputSchema = _outputSchema; - } - - // output schema stays the same unless it's the most outer stage... - // i.e. might need type upgrade in the middle for inner stages as last operator - if(_isRootStage) { - if(!operators.empty()) { - _normalCaseOutputSchema = operators.back()->getOutputSchema(); - - // special case: CacheOperator => use optimized schema! - auto lastOp = operators.back(); - if(lastOp->type() == LogicalOperatorType::CACHE) - _normalCaseOutputSchema = ((CacheOperator*)lastOp)->getOptimizedOutputSchema(); - } - - outSchema = _normalCaseOutputSchema.getRowType(); - } - } - #ifdef VERBOSE_BUILD { stringstream ss; @@ -615,29 +566,28 @@ namespace tuplex { #endif #ifndef NDEBUG - if(!operators.empty()) { + if(!fastLocalVariables.fastOperators.empty()) { stringstream ss; - ss<<"output type of specialized pipeline is: "<getOutputSchema().getRowType().desc()<getOutputSchema().getRowType().desc()< " - + outSchema.desc() + " (" + pluralize(_operators.size(), "operator") + " pipelined)"); + logger.info("generating pipeline for " + fastLocalVariables.fastInSchema.desc() + " -> " + + fastLocalVariables.fastOutSchema.desc() + " (" + pluralize(_operators.size(), "operator") + " pipelined)"); // first node determines the data source @@ -646,77 +596,77 @@ namespace tuplex { // create initstage/release stage functions (LLVM) using namespace llvm; - _initStageFuncName = func_prefix + "initStage" + to_string(number()); - _releaseStageFuncName = func_prefix + "releaseStage" + to_string(number()); - auto initStageFuncType = FunctionType::get(env->i64Type(), + ret._fastPathInitStageFuncName = func_prefix + "fastPathInitStage" + to_string(number()); + ret._fastPathReleaseStageFuncName = func_prefix + "fastPathReleaseStage" + to_string(number()); + auto fastPathInitStageFuncType = FunctionType::get(env->i64Type(), {env->i64Type(), env->i8ptrType()->getPointerTo(), env->i8ptrType()->getPointerTo()}, false); - auto releaseStageFuncType = FunctionType::get(env->i64Type(), false); + auto fastPathReleaseStageFuncType = FunctionType::get(env->i64Type(), false); // create functions + builders - auto initStageFunc = cast( - env->getModule()->getOrInsertFunction(_initStageFuncName, initStageFuncType).getCallee()); - auto releaseStageFunc = cast( - env->getModule()->getOrInsertFunction(_releaseStageFuncName, releaseStageFuncType).getCallee()); + auto fastPathInitStageFunc = cast( + env->getModule()->getOrInsertFunction(ret._fastPathInitStageFuncName, fastPathInitStageFuncType).getCallee()); + auto fastPathReleaseStageFunc = cast( + env->getModule()->getOrInsertFunction(ret._fastPathReleaseStageFuncName, fastPathReleaseStageFuncType).getCallee()); - BasicBlock *bbISBody = BasicBlock::Create(env->getContext(), "", initStageFunc); - BasicBlock *bbRSBody = BasicBlock::Create(env->getContext(), "", releaseStageFunc); + BasicBlock *bbISBody = BasicBlock::Create(env->getContext(), "", fastPathInitStageFunc); + 
BasicBlock *bbRSBody = BasicBlock::Create(env->getContext(), "", fastPathReleaseStageFunc); IRBuilder<> isBuilder(bbISBody); IRBuilder<> rsBuilder(bbRSBody); - auto isArgs = codegen::mapLLVMFunctionArgs(initStageFunc, {"num_args", "hashmaps", "null_buckets"}); + auto isArgs = codegen::mapLLVMFunctionArgs(fastPathInitStageFunc, {"num_args", "hashmaps", "null_buckets"}); // step 1. build pipeline, i.e. how to process data - auto pip = std::make_shared(env, inSchema, intermediateType(), funcProcessRowName); + auto pip = std::make_shared(env, fastLocalVariables.fastInSchema, intermediateType(fastLocalVariables.fastOperators), funcProcessRowName); // Note: the pipeline function will return whether an exception occured. // if that happens, then call to handler in transform task builder // pip->addExceptionHandler(_funcExceptionCallback); // don't add a exception handler here. // sanity check: output of last op should match schema! - if(!operators.empty() && outSchema != operators.back()->getOutputSchema().getRowType()) { + if(!fastLocalVariables.fastOperators.empty() && fastLocalVariables.fastOutSchema != fastLocalVariables.fastOperators.back()->getOutputSchema().getRowType()) { cout<<"outSchema is different than last operator's schema:"<getOutputSchema().getRowType().desc()<getOutputSchema().getRowType().desc()<(node); switch (node->type()) { case LogicalOperatorType::MAP: { - if (!pip->mapOperation(node->getID(), udfop->getUDF(), _normalCaseThreshold, _allowUndefinedBehavior, - _sharedObjectPropagation)) { + if (!pip->mapOperation(node->getID(), udfop->getUDF(), _normalCaseThreshold, fastLocalVariables.allowUndefinedBehavior, + fastLocalVariables.sharedObjectPropagation)) { logger.error(formatBadUDFNode(udfop)); - return false; + return ret; } break; } case LogicalOperatorType::FILTER: { - if (!pip->filterOperation(node->getID(), udfop->getUDF(), _normalCaseThreshold, _allowUndefinedBehavior, - _sharedObjectPropagation)) { + if (!pip->filterOperation(node->getID(), udfop->getUDF(), _normalCaseThreshold, fastLocalVariables.allowUndefinedBehavior, + fastLocalVariables.sharedObjectPropagation)) { logger.error(formatBadUDFNode(udfop)); - return false; + return ret; } break; } case LogicalOperatorType::MAPCOLUMN: { auto mop = dynamic_cast(node); if (!pip->mapColumnOperation(node->getID(), mop->getColumnIndex(), udfop->getUDF(), - _normalCaseThreshold, _allowUndefinedBehavior, _sharedObjectPropagation)) { + _normalCaseThreshold, fastLocalVariables.allowUndefinedBehavior, fastLocalVariables.sharedObjectPropagation)) { logger.error(formatBadUDFNode(udfop)); - return false; + return ret; } break; } case LogicalOperatorType::WITHCOLUMN: { auto wop = dynamic_cast(node); if (!pip->withColumnOperation(node->getID(), wop->getColumnIndex(), udfop->getUDF(), - _normalCaseThreshold, _allowUndefinedBehavior, _sharedObjectPropagation)) { + _normalCaseThreshold, fastLocalVariables.allowUndefinedBehavior, fastLocalVariables.sharedObjectPropagation)) { logger.error(formatBadUDFNode(udfop)); - return false; + return ret; } break; } @@ -756,9 +706,6 @@ namespace tuplex { auto null_bucket_global = env->createNullInitializedGlobal(null_bucket_global_name, env->i8ptrType()); - // add to lookup map for slow case - _hashmap_vars[jop->getID()] = make_tuple(hash_map_global, null_bucket_global); - isBuilder.CreateStore(isBuilder.CreateLoad( isBuilder.CreateGEP(isArgs["hashmaps"], env->i32Const(global_var_cnt))), hash_map_global); @@ -774,7 +721,7 @@ namespace tuplex { auto rightRowType = 
jop->right()->getOutputSchema().getRowType(); // if null-value optimization is used, might need to adjust the type for the normal path! - if(_nullValueOptimization) { + if(fastLocalVariables.nullValueOptimization) { // build right or left? if(jop->buildRight()) { // i.e. @@ -792,7 +739,7 @@ namespace tuplex { hash_map_global, null_bucket_global)) { logger.error(formatBadUDFNode(udfop)); - return false; + return ret; } break; } @@ -807,17 +754,17 @@ namespace tuplex { // NOTE: these functions need to be generated only once for the general case type! auto aggType = aop->aggregateOutputType(); - _aggregateInitFuncName = "init_aggregate_stage" + std::to_string(number()); - _aggregateCombineFuncName = "combine_aggregate_stage" + std::to_string(number()); + ret._aggregateInitFuncName = "init_aggregate_stage" + std::to_string(number()); + ret._aggregateCombineFuncName = "combine_aggregate_stage" + std::to_string(number()); if(aop->aggType() == AggregateType::AGG_BYKEY) - _aggregateAggregateFuncName = "aggregate_aggregate_stage" + std::to_string(number()); - _aggregateCallbackName = "aggregate_callback_stage" + std::to_string(number()); + ret._aggregateAggregateFuncName = "aggregate_aggregate_stage" + std::to_string(number()); + ret._aggregateCallbackName = "aggregate_callback_stage" + std::to_string(number()); auto aggregateInitFunc = codegen::createAggregateInitFunction(env.get(), - _aggregateInitFuncName, + ret._aggregateInitFuncName, aop->initialValue(), aggType); // use c-malloc! auto combFunc = codegen::createAggregateCombineFunction(env.get(), - _aggregateCombineFuncName, + ret._aggregateCombineFuncName, aop->combinerUDF(), aggType, malloc); @@ -826,28 +773,28 @@ namespace tuplex { if(!combFunc) throw std::runtime_error("error compiling combiner function for aggregate"); // update func names, to avoid duplicates - _aggregateInitFuncName = aggregateInitFunc->getName().str(); - _aggregateCombineFuncName = combFunc->getName().str(); + ret._aggregateInitFuncName = aggregateInitFunc->getName().str(); + ret._aggregateCombineFuncName = combFunc->getName().str(); if(aop->aggType() == AggregateType::AGG_BYKEY) { // need to make the aggregate functor auto aggregateFunc = codegen::createAggregateFunction(env.get(), - _aggregateAggregateFuncName, + ret._aggregateAggregateFuncName, aop->aggregatorUDF(), aggType, aop->parent()->getOutputSchema().getRowType(), malloc); if(!aggregateFunc) throw std::runtime_error("error compiling aggregate function"); - _aggregateAggregateFuncName = aggregateFunc->getName().str(); + ret._aggregateAggregateFuncName = aggregateFunc->getName().str(); } else { // init intermediate within Stage process function. intermediateInitialValue = aop->initialValue(); if (!pip->addAggregate(aop->getID(), aop->aggregatorUDF(), aop->getOutputSchema().getRowType(), _normalCaseThreshold, - _allowUndefinedBehavior, - _sharedObjectPropagation)) { + fastLocalVariables.allowUndefinedBehavior, + fastLocalVariables.sharedObjectPropagation)) { logger.error(formatBadAggNode(aop)); - return false; + return ret; } } } else if(aop->aggType() == AggregateType::AGG_UNIQUE) { @@ -886,23 +833,23 @@ namespace tuplex { // only fast - switch(_outputMode) { + switch(fastLocalVariables.outputMode) { case EndPointMode::FILE: { // for file mode, can directly merge output rows //pip->buildWithTuplexWriter(_funcMemoryWriteCallbackName, _outputNodeID); //output node id - switch (_outputFileFormat) { + switch (fastLocalVariables.outputFileFormat) { case FileFormat::OUTFMT_CSV: { // i.e. write to memory writer! 
- pip->buildWithCSVRowWriter(_funcMemoryWriteCallbackName, - _outputNodeID, - hasOutputLimit(), - _fileOutputParameters["null_value"], + pip->buildWithCSVRowWriter(ret._funcMemoryWriteCallbackName, + fastLocalVariables.outputNodeID, + hasOutputLimit(), + fastLocalVariables.fileOutputParameters.at("null_value"), true, csvOutputDelimiter(), csvOutputQuotechar()); break; } case FileFormat::OUTFMT_ORC: { - pip->buildWithTuplexWriter(_funcMemoryWriteCallbackName, _outputNodeID, hasOutputLimit()); + pip->buildWithTuplexWriter(ret._funcMemoryWriteCallbackName, fastLocalVariables.outputNodeID, hasOutputLimit()); break; } default: @@ -924,23 +871,23 @@ namespace tuplex { // special case: join is executed on top of a .cache() // => - if(_nullValueOptimization) { + if(fastLocalVariables.nullValueOptimization) { if(!leaveNormalCase) { - if (!pip->addTypeUpgrade(_outputSchema.getRowType())) + if (!pip->addTypeUpgrade(fastLocalVariables.outputSchema.getRowType())) throw std::runtime_error( - "type upgrade from " + outSchema.desc() + " to " + - _outputSchema.getRowType().desc() + "failed."); + "type upgrade from " + fastLocalVariables.fastOutSchema.desc() + " to " + + fastLocalVariables.outputSchema.getRowType().desc() + "failed."); // set normal case output type to general case _normalCaseOutputSchema = _outputSchema; } } - pip->buildWithHashmapWriter(_funcHashWriteCallbackName, _hashColKeys, hashtableKeyWidth(_hashKeyType), _hashSaveOthers, _hashAggregate); + pip->buildWithHashmapWriter(ret._funcHashWriteCallbackName, fastLocalVariables.hashColKeys, hashtableKeyWidth(fastLocalVariables.hashKeyType), fastLocalVariables.hashSaveOthers, fastLocalVariables.hashAggregate); break; } case EndPointMode::MEMORY: { // special case: writing intermediate output - if(intermediateType() == python::Type::UNKNOWN) { + if(intermediateType(fastLocalVariables.fastOperators) == python::Type::UNKNOWN) { // NOTE: forcing output to be general case is not necessary for cache operator! // => i.e. may manually convert... @@ -948,21 +895,21 @@ namespace tuplex { // => always pass cache node! bool leaveNormalCase = false; - if(!operators.empty()) - leaveNormalCase = operators.back()->type() == LogicalOperatorType::CACHE; + if(!fastLocalVariables.fastOperators.empty()) + leaveNormalCase = fastLocalVariables.fastOperators.back()->type() == LogicalOperatorType::CACHE; // force output type to be always general case (=> so merging works easily!) - if(_nullValueOptimization) { + if(fastLocalVariables.nullValueOptimization) { if(!leaveNormalCase) { - if (!pip->addTypeUpgrade(_outputSchema.getRowType())) + if (!pip->addTypeUpgrade(fastLocalVariables.outputSchema.getRowType())) throw std::runtime_error( - "type upgrade from " + outSchema.desc() + " to " + - _outputSchema.getRowType().desc() + "failed."); + "type upgrade from " + fastLocalVariables.fastOutSchema.desc() + " to " + + fastLocalVariables.outputSchema.getRowType().desc() + "failed."); // set normal case output type to general case - _normalCaseOutputSchema = _outputSchema; + // _normalCaseOutputSchema = _outputSchema; } } - pip->buildWithTuplexWriter(_funcMemoryWriteCallbackName, _outputNodeID, hasOutputLimit()); + pip->buildWithTuplexWriter(ret._funcMemoryWriteCallbackName, fastLocalVariables.outputNodeID, hasOutputLimit()); } else { // build w/o writer pip->build(); @@ -978,39 +925,39 @@ namespace tuplex { // step 2. build connector to data source, i.e. 
generated parser or simply iterating over stuff std::shared_ptr tb; - if (_inputMode == EndPointMode::FILE) { + if (fastLocalVariables.inputMode == EndPointMode::FILE) { // only CSV supported yet // throw std::runtime_error("found unknown data-source operator " + node->name() + " for which a pipeline could not be generated"); // input schema holds for CSV node the original, unoptimized number of columns. // if pushdown is performed, outputschema holds whatever is left. - char delimiter = _fileInputParameters.at("delimiter")[0]; - char quotechar = _fileInputParameters.at("quotechar")[0]; + char delimiter = fastLocalVariables.fileInputParameters.at("delimiter")[0]; + char quotechar = fastLocalVariables.fileInputParameters.at("quotechar")[0]; // note: null_values may be empty! - auto null_values = jsonToStringArray(_fileInputParameters.at("null_values")); + auto null_values = jsonToStringArray(fastLocalVariables.fileInputParameters.at("null_values")); switch (_inputFileFormat) { case FileFormat::OUTFMT_CSV: case FileFormat::OUTFMT_TEXT: { - if (_generateParser) { + if (fastLocalVariables.generateParser) { tb = make_shared(env, - readSchema, - _columnsToRead, + fastLocalVariables.fastReadSchema, + fastLocalVariables.columnsToRead, funcStageName, - _inputNodeID, + fastLocalVariables.inputNodeID, null_values, delimiter, quotechar); } else { - tb = make_shared(env, readSchema, _columnsToRead, + tb = make_shared(env, fastLocalVariables.fastReadSchema, fastLocalVariables.columnsToRead, funcStageName, - _inputNodeID, null_values); + fastLocalVariables.inputNodeID, null_values); } break; } case FileFormat::OUTFMT_ORC: { - tb = make_shared(env, inSchema, funcStageName); + tb = make_shared(env, fastLocalVariables.fastInSchema, funcStageName); break; } default: @@ -1018,19 +965,19 @@ namespace tuplex { } } else { // tuplex (in-memory) reader - tb = make_shared(env, inSchema, funcStageName); + tb = make_shared(env, fastLocalVariables.fastInSchema, funcStageName); } // set pipeline and // add ignore codes & exception handler - tb->setExceptionHandler(_funcExceptionCallback); + tb->setExceptionHandler(ret._funcExceptionCallback); tb->setIgnoreCodes(ignoreCodes); tb->setPipeline(pip); // special case: intermediate - if(intermediateType() != python::Type::UNKNOWN) { - tb->setIntermediateInitialValueByRow(intermediateType(), intermediateInitialValue); - tb->setIntermediateWriteCallback(_aggregateCallbackName); + if(intermediateType(fastLocalVariables.fastOperators) != python::Type::UNKNOWN) { + tb->setIntermediateInitialValueByRow(intermediateType(fastLocalVariables.fastOperators), intermediateInitialValue); + tb->setIntermediateWriteCallback(ret._aggregateCallbackName); } // create code for "wrap-around" function @@ -1038,22 +985,6 @@ namespace tuplex { if (!func) throw std::runtime_error("could not build codegen csv parser"); - // compile on top of this pipeline resolve code path if several conditons are met - // 1.) compile if resolve function is present - // 2.) compile if null-value optimization is present (could be done lazily) - auto numResolveOperators = resolveOperatorCount(); - // if resolvers are present, compile a slowPath. - bool requireSlowPath = _nullValueOptimization; // per default, slow path is always required when null-value opt is enabled. - - // special case: input source is cached and no exceptions happened => no resolve path necessary if there are no resolvers! 
- if(_inputNode->type() == LogicalOperatorType::CACHE && dynamic_cast(_inputNode)->cachedExceptions().empty()) - requireSlowPath = false; - - if (numResolveOperators > 0 || requireSlowPath) { - // generate in same env b.c. need to have access to globals... - generateResolveCodePath(env); - } - assert(func); // close initStage/releaseStage functions @@ -1069,25 +1000,36 @@ namespace tuplex { // save into variables (allows to serialize stage etc.) // IR is generated. Save into stage. - _funcStageName = func->getName(); - _irBitCode = codegen::moduleToBitCodeString(*env->getModule()); // trafo stage takes ownership of module + ret._funcStageName = func->getName(); + ret._fastPathIRBitCode = codegen::moduleToBitCodeString(*env->getModule()); // trafo stage takes ownership of module // @TODO: lazy & fast codegen of the different paths + lowering of them // generate interpreter fallback path (always) --> probably do that lazily or parallelize it... - generatePythonCode(); - - return true; + return ret; } - bool StageBuilder::generateResolveCodePath(const std::shared_ptr &env) { + TransformStage::TransformStageCodePath StageBuilder::generateResolveCodePath(const CodeGenerationContext& resolveLocalVariables) const { using namespace std; using namespace llvm; + TransformStage::TransformStageCodePath ret; + + // Compile if resolve function is present or if null-value optimization is present + auto numResolveOperators = resolveOperatorCount(); + bool requireSlowPath = resolveLocalVariables.nullValueOptimization; // per default, slow path is always required when null-value opt is enabled. + + // special case: input source is cached and no exceptions happened => no resolve path necessary if there are no resolvers! + if(resolveLocalVariables.inputNode->type() == LogicalOperatorType::CACHE && dynamic_cast(resolveLocalVariables.inputNode)->cachedExceptions().empty()) + requireSlowPath = false; + + if (numResolveOperators == 0 && !requireSlowPath) { + return ret; + } // when there are no operators present, there is no way to generate a resolve path // => skip - if(_operators.empty() && !_nullValueOptimization) // when null value optimization is done, need to always generate resolve path. - return true; + if(resolveLocalVariables.resolveOperators.empty() && !resolveLocalVariables.nullValueOptimization) // when null value optimization is done, need to always generate resolve path. + return ret; // @TODO: one needs to add here somewhere an option where bad input rows/data get resolved when they do not fit the initial schema! // r/n it's all squashed together in a pipeline. 
@@ -1097,6 +1039,36 @@ namespace tuplex { auto &logger = Logger::instance().logger("codegen"); + // Create environment + string env_name = "tuplex_slowCodePath"; + string func_prefix = ""; + + auto readSchema = resolveLocalVariables.resolveReadSchema.getRowType(); // what to read from files (before projection pushdown) + auto inSchema = resolveLocalVariables.resolveInputSchema.getRowType(); // with what to start the pipeline (after projection pushdown) + auto resolveInSchema = inSchema; + auto outSchema = resolveLocalVariables.resolveOutputSchema.getRowType(); // what to output from pipeline + + auto env = make_shared(env_name); + + ret._slowPathInitStageFuncName = func_prefix + "slowPathInitStage" + to_string(number()); + ret._slowPathReleaseStageFuncName = func_prefix + "slowPathReleaseStage" + to_string(number()); + auto slowPathInitStageFuncType = FunctionType::get(env->i64Type(), + {env->i64Type(), env->i8ptrType()->getPointerTo(), + env->i8ptrType()->getPointerTo()}, false); + auto slowPathReleaseStageFuncType = FunctionType::get(env->i64Type(), false); + + // create functions + builders + auto slowPathInitStageFunc = cast( + env->getModule()->getOrInsertFunction(ret._slowPathInitStageFuncName, slowPathInitStageFuncType).getCallee()); + auto slowPathReleaseStageFunc = cast( + env->getModule()->getOrInsertFunction(ret._slowPathReleaseStageFuncName, slowPathReleaseStageFuncType).getCallee()); + + BasicBlock *bbISBody = BasicBlock::Create(env->getContext(), "", slowPathInitStageFunc); + BasicBlock *bbRSBody = BasicBlock::Create(env->getContext(), "", slowPathReleaseStageFunc); + IRBuilder<> isBuilder(bbISBody); + IRBuilder<> rsBuilder(bbRSBody); + auto isArgs = codegen::mapLLVMFunctionArgs(slowPathInitStageFunc, {"num_args", "hashmaps", "null_buckets"}); + // Note: this here is quite confusing, because for map operator when tuples are needed, this will return not the row schema but the UDF input schema =? fix that // @TODO: fix getInputSchema for MapOperator!!! 
auto inSchema = _inputSchema.getRowType(); // old: _operators.front()->getInputSchema().getRowType(); @@ -1106,32 +1078,37 @@ namespace tuplex { string slowPathHashWriteCallback = "hashOutViaSlowPath_Stage_" + to_string(number()); string slowPathExceptionCallback = "exceptionOutViaSlowPath_Stage_" + to_string(number()); - auto slowPip = std::make_shared(env, inSchema, intermediateType(), funcSlowPathName); - for (auto it = _operators.begin(); it != _operators.end(); ++it) { - auto node = *it; + logger.debug("input schema for general case is: " + resolveInSchema.desc()); + logger.debug("intermediate type for general case is: " + intermediateType(resolveLocalVariables.resolveOperators).desc()); + + auto slowPip = std::make_shared(env, resolveInSchema, intermediateType(resolveLocalVariables.resolveOperators), funcSlowPathName); + int global_var_cnt = 0; + auto num_operators = resolveLocalVariables.resolveOperators.size(); + for (int i = 0; i < num_operators; ++i) { + auto node = resolveLocalVariables.resolveOperators[i]; assert(node); UDFOperator *udfop = dynamic_cast(node); switch (node->type()) { case LogicalOperatorType::MAP: { - slowPip->mapOperation(node->getID(), udfop->getUDF(), _normalCaseThreshold, _allowUndefinedBehavior, - _sharedObjectPropagation); + slowPip->mapOperation(node->getID(), udfop->getUDF(), _normalCaseThreshold, resolveLocalVariables.allowUndefinedBehavior, + resolveLocalVariables.sharedObjectPropagation); break; } case LogicalOperatorType::FILTER: { - slowPip->filterOperation(node->getID(), udfop->getUDF(), _normalCaseThreshold, _allowUndefinedBehavior, - _sharedObjectPropagation); + slowPip->filterOperation(node->getID(), udfop->getUDF(), _normalCaseThreshold, resolveLocalVariables.allowUndefinedBehavior, + resolveLocalVariables.sharedObjectPropagation); break; } case LogicalOperatorType::MAPCOLUMN: { auto mop = dynamic_cast(node); slowPip->mapColumnOperation(node->getID(), mop->getColumnIndex(), udfop->getUDF(), - _normalCaseThreshold, _allowUndefinedBehavior, _sharedObjectPropagation); + _normalCaseThreshold, resolveLocalVariables.allowUndefinedBehavior, resolveLocalVariables.sharedObjectPropagation); break; } case LogicalOperatorType::WITHCOLUMN: { auto wop = dynamic_cast(node); slowPip->withColumnOperation(node->getID(), wop->getColumnIndex(), udfop->getUDF(), - _normalCaseThreshold, _allowUndefinedBehavior, _sharedObjectPropagation); + _normalCaseThreshold, resolveLocalVariables.allowUndefinedBehavior, resolveLocalVariables.sharedObjectPropagation); break; } case LogicalOperatorType::CACHE: @@ -1143,8 +1120,8 @@ namespace tuplex { case LogicalOperatorType::RESOLVE: { // ==> this means slow code path needs to be generated as well! 
auto rop = dynamic_cast(node); - slowPip->addResolver(rop->ecCode(), rop->getID(), rop->getUDF(), _normalCaseThreshold, _allowUndefinedBehavior, - _sharedObjectPropagation); + slowPip->addResolver(rop->ecCode(), rop->getID(), rop->getUDF(), _normalCaseThreshold, resolveLocalVariables.allowUndefinedBehavior, + resolveLocalVariables.sharedObjectPropagation); break; } case LogicalOperatorType::IGNORE: { @@ -1169,12 +1146,30 @@ namespace tuplex { auto jop = dynamic_cast(node); assert(jop); - auto entry = _hashmap_vars.at(jop->getID()); + string hashmap_global_name = + func_prefix + "hash_map_" + to_string(global_var_cnt) + "_stage" + to_string(number()); + string null_bucket_global_name = + func_prefix + "null_bucket_" + to_string(global_var_cnt) + "_stage" + to_string(number()); // add two new globals + init code to init/release func - auto hash_map_global = std::get<0>(entry); - auto null_bucket_global = std::get<1>(entry); + auto hash_map_global = env->createNullInitializedGlobal(hashmap_global_name, env->i8ptrType()); + auto null_bucket_global = env->createNullInitializedGlobal(null_bucket_global_name, + env->i8ptrType()); + isBuilder.CreateStore(isBuilder.CreateLoad( + isBuilder.CreateGEP(isArgs["hashmaps"], env->i32Const(global_var_cnt))), + hash_map_global); + isBuilder.CreateStore(isBuilder.CreateLoad( + isBuilder.CreateGEP(isArgs["null_buckets"], env->i32Const(global_var_cnt))), + null_bucket_global); + + rsBuilder.CreateStore(env->i8nullptr(), hash_map_global); + rsBuilder.CreateStore(env->i8nullptr(), null_bucket_global); + + auto leftRowType = jop->left()->getOutputSchema().getRowType(); + auto rightRowType = jop->right()->getOutputSchema().getRowType(); + + global_var_cnt++; if (!slowPip->addHashJoinProbe(jop->leftKeyIndex(), jop->left()->getOutputSchema().getRowType(), jop->rightKeyIndex(), @@ -1184,38 +1179,73 @@ namespace tuplex { hash_map_global, null_bucket_global)) { logger.error(formatBadUDFNode(udfop)); - return false; + return ret; } break; } case LogicalOperatorType::AGGREGATE: { -// auto aop = dynamic_cast(node); assert(aop); -// auto entry = _hashmap_vars.at(aop->getID()); -// -// if(aop->aggType() == AggregateType::AGG_UNIQUE) { -// auto hash_map_global = std::get<0>(entry); -// if (!slowPip->addHashUnique(aop->parent()->getOutputSchema().getRowType(), hash_map_global)) { -// logger.error(formatBadUDFNode(udfop)); -// return false; -// } -// } else { -// throw runtime_error("Unsupported aggregate type!"); -// } - - // for byKey separate logic is required to key properly - auto aop = dynamic_cast(node); assert(aop); - switch(aop->aggType()) { - case AggregateType::AGG_UNIQUE: { - // this is supported => do not need to specify further! - break; + // @TODO: needs more support for full aggregate fallback code + auto aop = dynamic_cast(node); assert(aop); + if(aop->aggType() == AggregateType::AGG_GENERAL || aop->aggType() == AggregateType::AGG_BYKEY) { + + // right now aggregation is done using a global variable. + // this is because of the overall compilation design + // in the future we should prob. rewrite this to compile better plans... + // writing to a pointer seems like a bad idea... + + // NOTE: these functions need to be generated only once for the general case type! 
+ auto aggType = aop->aggregateOutputType(); + ret._aggregateInitFuncName = "init_aggregate_stage" + std::to_string(number()); + ret._aggregateCombineFuncName = "combine_aggregate_stage" + std::to_string(number()); + if(aop->aggType() == AggregateType::AGG_BYKEY) + ret._aggregateAggregateFuncName = "aggregate_aggregate_stage" + std::to_string(number()); + ret._aggregateCallbackName = "aggregate_callback_stage" + std::to_string(number()); + auto aggregateInitFunc = codegen::createAggregateInitFunction(env.get(), + ret._aggregateInitFuncName, + aop->initialValue(), + aggType); // use c-malloc! + auto combFunc = codegen::createAggregateCombineFunction(env.get(), + ret._aggregateCombineFuncName, + aop->combinerUDF(), + aggType, + malloc, + _allowUndefinedBehavior, + _sharedObjectPropagation); + if(!aggregateInitFunc) + throw std::runtime_error("error compiling aggregate initialize function"); + if(!combFunc) + throw std::runtime_error("error compiling combiner function for aggregate"); + // update func names, to avoid duplicates + ret._aggregateInitFuncName = aggregateInitFunc->getName().str(); + ret._aggregateCombineFuncName = combFunc->getName().str(); + + if(aop->aggType() == AggregateType::AGG_BYKEY) { // need to make the aggregate functor + auto aggregateFunc = codegen::createAggregateFunction(env.get(), + ret._aggregateAggregateFuncName, + aop->aggregatorUDF(), aggType, + aop->parent()->getOutputSchema().getRowType(), + malloc, _allowUndefinedBehavior, + _sharedObjectPropagation); + if(!aggregateFunc) + throw std::runtime_error("error compiling aggregate function"); + ret._aggregateAggregateFuncName = aggregateFunc->getName().str(); + } else { + // init intermediate within Stage process function. +// intermediateInitialValue = aop->initialValue(); + if (!slowPip->addAggregate(aop->getID(), aop->aggregatorUDF(), + aop->getOutputSchema().getRowType(), + _allowUndefinedBehavior, _sharedObjectPropagation)) { + logger.error(formatBadAggNode(aop)); + return ret; + } } - default: - // --> need to codegen aggregate for resolve?? - // or should this be handled directly in ResolveTask incl. function to extract potentially the key - // from the output buffer? - throw std::runtime_error("aggregateResolve not yet supported, have to think about this..."); + } else if(aop->aggType() == AggregateType::AGG_UNIQUE) { + // nothing to do... + } else { + throw std::runtime_error("unsupported aggregate type"); } + break; // aggregate isn't codegen'd } @@ -1234,40 +1264,44 @@ namespace tuplex { // => else, error! 
- bool useRawOutput = _outputMode == EndPointMode::FILE && _outputFileFormat == FileFormat::OUTFMT_CSV; + bool useRawOutput = resolveLocalVariables.outputMode == EndPointMode::FILE && resolveLocalVariables.outputFileFormat == FileFormat::OUTFMT_CSV; // build slow path with mem writer or to CSV llvm::Function* slowPathFunc = nullptr; if(useRawOutput) { - slowPathFunc = slowPip->buildWithCSVRowWriter(slowPathMemoryWriteCallback, _outputNodeID, + slowPathFunc = slowPip->buildWithCSVRowWriter(slowPathMemoryWriteCallback, resolveLocalVariables.outputNodeID, hasOutputLimit(), - _fileOutputParameters["null_value"], true, - csvOutputDelimiter(), csvOutputQuotechar()); + resolveLocalVariables.fileOutputParameters.at("null_value"), true, + resolveLocalVariables.fileOutputParameters.at("delimiter")[0], resolveLocalVariables.fileOutputParameters.at("quotechar")[0]); } else { // @TODO: hashwriter if hash output desired - if(_outputMode == EndPointMode::HASHTABLE) { - slowPathFunc = slowPip->buildWithHashmapWriter(slowPathHashWriteCallback, - _hashColKeys, - hashtableKeyWidth(_hashKeyType), - _hashSaveOthers, _hashAggregate); + if(resolveLocalVariables.outputMode == EndPointMode::HASHTABLE) { + slowPathFunc = slowPip->buildWithHashmapWriter(slowPathHashWriteCallback, resolveLocalVariables.hashColKeys, hashtableKeyWidth(resolveLocalVariables.hashKeyType), resolveLocalVariables.hashSaveOthers, resolveLocalVariables.hashAggregate); } else { - slowPathFunc = slowPip->buildWithTuplexWriter(slowPathMemoryWriteCallback, _outputNodeID, hasOutputLimit()); + slowPathFunc = slowPip->buildWithTuplexWriter(slowPathMemoryWriteCallback, resolveLocalVariables.outputNodeID, hasOutputLimit()); } } // create wrapper which decodes automatically normal-case rows with optimized types... - auto normalCaseType = _normalCaseInputSchema.getRowType(); + auto normalCaseType = resolveLocalVariables.normalCaseInputSchema.getRowType(); auto null_values = - _inputMode == EndPointMode::FILE ? jsonToStringArray(_fileInputParameters.at("null_values")) + resolveLocalVariables.inputMode == EndPointMode::FILE ? jsonToStringArray(resolveLocalVariables.fileInputParameters.at("null_values")) : std::vector{"None"}; auto rowProcessFunc = codegen::createProcessExceptionRowWrapper(*slowPip, funcResolveRowName, normalCaseType, null_values); - _resolveRowFunctionName = rowProcessFunc->getName(); - _resolveRowWriteCallbackName = slowPathMemoryWriteCallback; - _resolveRowExceptionCallbackName = slowPathExceptionCallback; - _resolveHashCallbackName = slowPathHashWriteCallback; + ret._resolveRowFunctionName = rowProcessFunc->getName(); + ret._resolveRowWriteCallbackName = slowPathMemoryWriteCallback; + ret._resolveRowExceptionCallbackName = slowPathExceptionCallback; + ret._resolveHashCallbackName = slowPathHashWriteCallback; + + // close initStage/releaseStage functions + // => call global init function of llvm env + isBuilder.CreateRet(env->callGlobalsInit(isBuilder)); + rsBuilder.CreateRet(env->callGlobalsRelease(rsBuilder)); + + ret._slowPathIRBitCode = codegen::moduleToBitCodeString(*env->getModule()); // transform stage takes ownership of module - return true; + return ret; } void StageBuilder::addFileOutput(FileOutputOperator *fop) { @@ -1385,12 +1419,98 @@ namespace tuplex { } TransformStage *StageBuilder::build(PhysicalPlan *plan, IBackend *backend) { + TransformStage *stage = new TransformStage(plan, backend, _stageNumber, _allowUndefinedBehavior); - // generate fast code path - // --> slow code path is generated within this function (i.e. 
resolve & Co) - generateFastCodePath(); + bool mem2mem = _inputMode == EndPointMode::MEMORY && _outputMode == EndPointMode::MEMORY; - TransformStage *stage = new TransformStage(plan, backend, _stageNumber, _allowUndefinedBehavior); + JobMetrics& metrics = stage->PhysicalStage::plan()->getContext().metrics(); + Timer timer; + if (_operators.empty() && mem2mem) { + _pyCode = ""; + + TransformStage::TransformStageCodePath slow; + TransformStage::TransformStageCodePath fast; + stage->_slowCodePath = slow; + stage->_fastCodePath = fast; + } else { + + auto operators = specializePipeline(_nullValueOptimization, _inputNode, _operators); + + auto readSchema = _readSchema.getRowType(); // what to read from files (before projection pushdown) + auto inSchema = _inputSchema.getRowType(); // with what to start the pipeline (after projection pushdown) + auto outSchema = _outputSchema.getRowType(); // what to output from pipeline + + // per default, set normalcase to output types! + _normalCaseInputSchema = _inputSchema; + _normalCaseOutputSchema = _outputSchema; + + // null-value optimization? => use input schema from operators! + if(_nullValueOptimization) { + if(_inputMode == EndPointMode::FILE) { + readSchema = dynamic_cast(_inputNode)->getOptimizedInputSchema().getRowType(); + _normalCaseInputSchema = dynamic_cast(_inputNode)->getOptimizedOutputSchema(); + inSchema = _normalCaseInputSchema.getRowType(); + _normalCaseOutputSchema = _normalCaseInputSchema; + } else if(_inputMode == EndPointMode::MEMORY && _inputNode && _inputNode->type() == LogicalOperatorType::CACHE) { + _normalCaseInputSchema = dynamic_cast(_inputNode)->getOptimizedOutputSchema(); + inSchema = _normalCaseInputSchema.getRowType(); + } else { + _normalCaseOutputSchema = _outputSchema; + } + + // output schema stays the same unless it's the most outer stage... + // i.e. might need type upgrade in the middle for inner stages as last operator + if(_isRootStage) { + if(!operators.empty()) { + _normalCaseOutputSchema = operators.back()->getOutputSchema(); + + // special case: CacheOperator => use optimized schema! 
+                            auto lastOp = operators.back();
+                            if(lastOp->type() == LogicalOperatorType::CACHE)
+                                _normalCaseOutputSchema = ((CacheOperator*)lastOp)->getOptimizedOutputSchema();
+                        }
+
+                        outSchema = _normalCaseOutputSchema.getRowType();
+                    }
+
+                    if (_outputMode == EndPointMode::HASHTABLE) {
+                        _normalCaseOutputSchema = _outputSchema;
+                    } else if (_outputMode == EndPointMode::MEMORY && intermediateType(operators) == python::Type::UNKNOWN) {
+                        bool leaveNormalCase = false;
+
+                        // If outputNode is not a cache operator then leave normal case as is
+                        if (!operators.empty())
+                            leaveNormalCase = operators.back()->type() == LogicalOperatorType::CACHE;
+
+                        if (!leaveNormalCase)
+                            _normalCaseOutputSchema = _outputSchema;
+                    }
+                }
+
+                auto& logger = Logger::instance().defaultLogger();
+
+                CodeGenerationContext codeGenerationContext {
+                    _allowUndefinedBehavior, _sharedObjectPropagation,
+                    _nullValueOptimization, _outputMode, _outputFileFormat, _outputNodeID,
+                    _fileOutputParameters, _outputSchema, _hashColKeys, _hashKeyType,
+                    _hashSaveOthers, _hashAggregate, _inputMode, _fileInputParameters, _operators,
+                    _inputNode, _readSchema, _inputSchema, _outputSchema, _normalCaseInputSchema,
+                    _hashBucketType, operators, readSchema, inSchema, outSchema, _isRootStage,
+                    _generateParser, _columnsToRead, _inputNodeID
+                };
+
+                std::shared_future<TransformStage::TransformStageCodePath> slowCodePath_f = std::async(std::launch::async, [this, &codeGenerationContext]() {
+                    return generateResolveCodePath(codeGenerationContext);
+                });
+
+                generatePythonCode();
+
+                stage->_fastCodePath = generateFastCodePath(codeGenerationContext);
+
+                stage->_slowCodePath = slowCodePath_f.get();
+
+            }

             stage->_inputColumns = _inputColumns;
             stage->_outputColumns = _outputColumns;
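In the build() hunk above the two code-generation passes are overlapped: the resolve (slow) path is launched on a worker thread via std::async while the fast path is generated on the calling thread, and slowCodePath_f.get() joins the two before the stage is assembled. A stripped-down sketch of that pattern with simplified, hypothetical types and function names:

    #include <future>
    #include <string>

    struct CodePath { std::string bitCode; };                   // stand-in for TransformStage::TransformStageCodePath

    CodePath generateSlowPath() { return {"; slow path IR"}; }  // stands in for generateResolveCodePath
    CodePath generateFastPath() { return {"; fast path IR"}; }  // stands in for generateFastCodePath

    void buildBoth(CodePath& fast, CodePath& slow) {
        // launch the slow path on a worker thread...
        std::shared_future<CodePath> slow_f = std::async(std::launch::async, [] { return generateSlowPath(); });
        // ...generate the fast path concurrently on this thread...
        fast = generateFastPath();
        // ...then join; get() rethrows any exception raised inside the worker
        slow = slow_f.get();
    }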
@@ -1435,7 +1555,6 @@
         }
         // => https://llvm.org/doxygen/BitcodeWriter_8cpp_source.html#l04457
         // stage->_irCode = _irCode;
         // stage->_irResolveCode = _irResolveCode;
-        stage->_irBitCode = _irBitCode;
         stage->_pyCode = _pyCode;
         stage->_pyPipelineName = _pyPipelineName;
@@ -1447,40 +1566,22 @@
         // DEBUG, write out generated trafo code...
#ifndef NDEBUG
-        stringToFile(URI("transform_stage_" + std::to_string(_stageNumber) + ".txt"), stage->code());
+        stringToFile(URI("fastpath_transform_stage_" + std::to_string(_stageNumber) + ".txt"), stage->fastPathCode());
+        stringToFile(URI("slowpath_transform_stage_" + std::to_string(_stageNumber) + ".txt"), stage->slowPathCode());
#endif

-        // code names/symbols
-        stage->_funcStageName = _funcStageName;
-        stage->_funcMemoryWriteCallbackName = _funcMemoryWriteCallbackName;
-        stage->_funcExceptionCallback = _funcExceptionCallback;
-        stage->_funcFileWriteCallbackName = _funcFileWriteCallbackName;
-        stage->_funcHashWriteCallbackName = _funcHashWriteCallbackName;
-        stage->_writerFuncName = _writerFuncName;
-
-        stage->_initStageFuncName = _initStageFuncName;
-        stage->_releaseStageFuncName = _releaseStageFuncName;
-        stage->_resolveRowFunctionName = _resolveRowFunctionName;
-        stage->_resolveRowWriteCallbackName = _resolveRowWriteCallbackName;
-        stage->_resolveRowExceptionCallbackName = _resolveRowExceptionCallbackName;
-        stage->_resolveHashCallbackName = _resolveHashCallbackName;
-
-        stage->_aggregateInitFuncName = _aggregateInitFuncName;
-        stage->_aggregateCombineFuncName = _aggregateCombineFuncName;
-        stage->_aggregateCallbackName = _aggregateCallbackName;
-        stage->_aggregateAggregateFuncName = _aggregateAggregateFuncName;
-
         stage->_operatorIDsWithResolvers = getOperatorIDsAffectedByResolvers(_operators);
         stage->setInitData();
+        metrics.setGenerateLLVMTime(timer.time());
         return stage;
     }

-    python::Type StageBuilder::intermediateType() const {
+    python::Type intermediateType(const std::vector<LogicalOperator*>& operators) {
         // output node aggregate? --> save intermediate schema!
-        if(!_operators.empty() && _operators.back()) {
-            auto output_node = _operators.back();
+        if(!operators.empty() && operators.back()) {
+            auto output_node = operators.back();
             if(output_node->type() == LogicalOperatorType::AGGREGATE) {
                 auto aop = dynamic_cast<AggregateOperator*>(output_node);
                 if(aop->aggType() == AggregateType::AGG_GENERAL) {
@@ -1488,7 +1589,6 @@
                 }
             }
         }
-
         return python::Type::UNKNOWN;
     }
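intermediateType() above becomes a free function over an operator list instead of a StageBuilder member, so it can be applied directly to the specialized pipeline computed inside build(). Usage looks roughly like this (a sketch, not taken verbatim from the patch):

    // returns the aggregate's intermediate type when the pipeline ends in a
    // general aggregate, python::Type::UNKNOWN otherwise
    auto intermediate = intermediateType(operators);
    if(intermediate != python::Type::UNKNOWN) {
        // the stage produces an aggregation intermediate rather than plain output rows
    }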
diff --git a/tuplex/core/src/physical/TransformStage.cc b/tuplex/core/src/physical/TransformStage.cc
index 80d17fce1..bd33af83b 100644
--- a/tuplex/core/src/physical/TransformStage.cc
+++ b/tuplex/core/src/physical/TransformStage.cc
@@ -723,149 +723,152 @@ namespace tuplex {
         return fields;
     }

-    std::shared_ptr<TransformStage::JITSymbols> TransformStage::compile(JITCompiler &jit, LLVMOptimizer *optimizer, bool excludeSlowPath, bool registerSymbols) {
+    void TransformStage::compileSlowPath(JITCompiler &jit, LLVMOptimizer *optimizer) {
+        Timer timer;
+        JobMetrics& metrics = PhysicalStage::plan()->getContext().metrics();
         auto& logger = Logger::instance().defaultLogger();
-        // lazy compile
-        if(!_syms) {
-            logger.debug("lazy init symbols");
-            _syms = std::make_shared<JITSymbols>();
-        }
-
-        Timer timer;
+//        _slowCodePath = _slowCodePath_f.get();
         llvm::LLVMContext ctx;
-        auto bit_code = bitCode();
-        if(bit_code.empty())
-            return _syms;
+        auto slow_path_bit_code = slowPathBitCode();
+        auto slow_path_mod = slow_path_bit_code.empty() ? nullptr : codegen::bitCodeToModule(ctx, slow_path_bit_code);
+
+        if(optimizer) {
+            if (slow_path_mod) {
+                Timer pathTimer;
+                pathTimer.reset();
+                optimizer->optimizeModule(*slow_path_mod.get());
+            }
+        }
+
+        // compile & link with resolve tasks
+        if(!resolveWriteCallbackName().empty())
+            jit.registerSymbol(resolveWriteCallbackName(), ResolveTask::mergeRowCallback());
+        if(!resolveExceptionCallbackName().empty())
+            jit.registerSymbol(resolveExceptionCallbackName(), ResolveTask::exceptionCallback());
+        if(outputMode() == EndPointMode::HASHTABLE && !resolveExceptionCallbackName().empty()) {
+            if(hashtableKeyByteWidth() == 8) {
+                if(_slowCodePath._aggregateAggregateFuncName.empty())
+                    jit.registerSymbol(resolveHashCallbackName(), ResolveTask::writeInt64HashTableCallback());
+                else jit.registerSymbol(resolveHashCallbackName(), ResolveTask::writeInt64HashTableAggregateCallback());
+            }
+            else {
+                if(_slowCodePath._aggregateAggregateFuncName.empty())
+                    jit.registerSymbol(resolveHashCallbackName(), ResolveTask::writeStringHashTableCallback());
+                else jit.registerSymbol(resolveHashCallbackName(), ResolveTask::writeStringHashTableAggregateCallback());
+            }
+        }
-        auto mod = codegen::bitCodeToModule(ctx, bit_code);
-        if(!mod)
-            throw std::runtime_error("invalid bitcode");
+        // compile slow code path if desired
+        if(slow_path_mod && !jit.compile(std::move(slow_path_mod)))
+            throw std::runtime_error("could not compile slow code for stage " + std::to_string(number()));
+        Timer llvmLowerTimer;
+        if(!_syms->resolveFunctor)
+            _syms->resolveFunctor = !resolveWriteCallbackName().empty() ? reinterpret_cast(jit.getAddrOfSymbol(resolveRowName())) : nullptr;
+
+        if(!_syms->_slowCodePath.initStageFunctor)
+            _syms->_slowCodePath.initStageFunctor = reinterpret_cast(jit.getAddrOfSymbol(_slowCodePath._slowPathInitStageFuncName));
+        if(!_syms->_slowCodePath.releaseStageFunctor)
+            _syms->_slowCodePath.releaseStageFunctor = reinterpret_cast(jit.getAddrOfSymbol(_slowCodePath._slowPathReleaseStageFuncName));
+    }
-        logger.debug("parse module in " + std::to_string(timer.time()));
+    void TransformStage::compileFastPath(JITCompiler &jit, LLVMOptimizer *optimizer) {
+        Timer timer;
+        JobMetrics& metrics = PhysicalStage::plan()->getContext().metrics();
+        auto& logger = Logger::instance().defaultLogger();
-        // because in Lambda there's no context yet, use some dummy object...
-        JobMetrics dummy_metrics;
-        JobMetrics& metrics = PhysicalStage::plan() ? PhysicalStage::plan()->getContext().metrics() : dummy_metrics;
+        llvm::LLVMContext ctx;
+        auto fast_path_bit_code = fastPathBitCode();
+        if(fast_path_bit_code.empty())
+            return;
-        std::string unoptimizedIR;
-        std::string optimizedIR = "Not currently optimized.";
-        if (_historyServer) {
-            unoptimizedIR = code();
-        }
+        auto fast_path_mod = codegen::bitCodeToModule(ctx, fast_path_bit_code);
+        if(!fast_path_mod)
+            throw std::runtime_error("invalid fast path bitcode");

         // step 1: run optimizer if desired
         if(optimizer) {
-            optimizer->optimizeModule(*mod.get());
-            if (_historyServer) {
-                optimizedIR = code();
-            }
+            Timer pathTimer;
+            optimizer->optimizeModule(*fast_path_mod.get());
+            double llvm_optimization_time = timer.time();
             metrics.setLLVMOptimizationTime(llvm_optimization_time);
-            logger.info("Optimization via LLVM passes took " + std::to_string(llvm_optimization_time) + " ms");
+            logger.info("TransformStage - Optimization via LLVM passes took " + std::to_string(llvm_optimization_time) + " ms");
+            timer.reset();
         }
-        logger.debug("registering symbols...");
         // step 2: register callback functions with compiler
-        if(registerSymbols && !writeMemoryCallbackName().empty())
-            jit.registerSymbol(writeMemoryCallbackName(), TransformTask::writeRowCallback(hasOutputLimit(), false));
-        if(registerSymbols && !exceptionCallbackName().empty())
+        if(!writeMemoryCallbackName().empty())
+            jit.registerSymbol(writeMemoryCallbackName(), TransformTask::writeRowCallback(false));
+        if(!exceptionCallbackName().empty())
             jit.registerSymbol(exceptionCallbackName(), TransformTask::exceptionCallback(false));
-        if(registerSymbols && !writeFileCallbackName().empty())
-            jit.registerSymbol(writeFileCallbackName(), TransformTask::writeRowCallback(hasOutputLimit(), true));
-
-        if(outputMode() == EndPointMode::HASHTABLE && !_funcHashWriteCallbackName.empty()) {
+        if(!writeFileCallbackName().empty())
+            jit.registerSymbol(writeFileCallbackName(), TransformTask::writeRowCallback(true));
+        if(outputMode() == EndPointMode::HASHTABLE && !_fastCodePath._funcHashWriteCallbackName.empty()) {
             if (hashtableKeyByteWidth() == 8) {
-                if(_aggregateAggregateFuncName.empty())
-                    jit.registerSymbol(_funcHashWriteCallbackName, TransformTask::writeInt64HashTableCallback());
-                else jit.registerSymbol(_funcHashWriteCallbackName, TransformTask::writeInt64HashTableAggregateCallback());
+                if(_fastCodePath._aggregateAggregateFuncName.empty())
+                    jit.registerSymbol(_fastCodePath._funcHashWriteCallbackName, TransformTask::writeInt64HashTableCallback());
+                else jit.registerSymbol(_fastCodePath._funcHashWriteCallbackName, TransformTask::writeInt64HashTableAggregateCallback());
             } else {
-                if(_aggregateAggregateFuncName.empty())
-                    jit.registerSymbol(_funcHashWriteCallbackName, TransformTask::writeStringHashTableCallback());
-                else jit.registerSymbol(_funcHashWriteCallbackName, TransformTask::writeStringHashTableAggregateCallback());
+                if(_fastCodePath._aggregateAggregateFuncName.empty())
+                    jit.registerSymbol(_fastCodePath._funcHashWriteCallbackName, TransformTask::writeStringHashTableCallback());
+                else jit.registerSymbol(_fastCodePath._funcHashWriteCallbackName, TransformTask::writeStringHashTableAggregateCallback());
             }
         }
-        assert(!_initStageFuncName.empty() && !_releaseStageFuncName.empty());
-        if(registerSymbols && !_aggregateCombineFuncName.empty())
+        assert(!_fastCodePath._fastPathInitStageFuncName.empty() && !_fastCodePath._fastPathReleaseStageFuncName.empty());
+        if(!_fastCodePath._aggregateCombineFuncName.empty())
            jit.registerSymbol(aggCombineCallbackName(), TransformTask::aggCombineCallback());
-        // compile & link with resolve tasks
-        if(registerSymbols && !resolveWriteCallbackName().empty())
-            jit.registerSymbol(resolveWriteCallbackName(), ResolveTask::mergeRowCallback());
-        if(registerSymbols && !resolveExceptionCallbackName().empty())
-            jit.registerSymbol(resolveExceptionCallbackName(), ResolveTask::exceptionCallback());
-
-        if(outputMode() == EndPointMode::HASHTABLE && !resolveExceptionCallbackName().empty()) {
-            if(hashtableKeyByteWidth() == 8) {
-                if(_aggregateAggregateFuncName.empty())
-                    jit.registerSymbol(resolveHashCallbackName(), ResolveTask::writeInt64HashTableCallback());
-                else jit.registerSymbol(resolveHashCallbackName(), ResolveTask::writeInt64HashTableAggregateCallback());
-            }
-            else {
-                if(_aggregateAggregateFuncName.empty())
-                    jit.registerSymbol(resolveHashCallbackName(), ResolveTask::writeStringHashTableCallback());
-                else jit.registerSymbol(resolveHashCallbackName(), ResolveTask::writeStringHashTableAggregateCallback());
-            }
-        }
-
-        logger.info("starting code compilation");
-
         // 3. compile code
         // @TODO: use bitcode or llvm Module for more efficiency...
-        if(!jit.compile(std::move(mod))) {
-            logger.error("could not compile code for stage " + std::to_string(number()));
-            throw std::runtime_error("could not compile code for stage " + std::to_string(number()));
-        }
+        if(!jit.compile(std::move(fast_path_mod)))
+            throw std::runtime_error("could not compile fast code for stage " + std::to_string(number()));

         std::stringstream ss;
-        logger.info("first compile done");
-        // fetch symbols (this actually triggers the compilation first with register alloc etc.)
+        Timer llvmLowerTimer;
         if(!_syms->functor)
            _syms->functor = reinterpret_cast(jit.getAddrOfSymbol(funcName()));
-        logger.info("functor " + funcName() + " retrieved from llvm");
         if(_outputMode == EndPointMode::FILE && !_syms->writeFunctor)
-            _syms->writeFunctor = reinterpret_cast(jit.getAddrOfSymbol(writerFuncName()));
-        logger.info("retrieving init/release stage functors");
-        if(!_syms->initStageFunctor)
-            _syms->initStageFunctor = reinterpret_cast(jit.getAddrOfSymbol(_initStageFuncName));
-        if(!_syms->releaseStageFunctor)
-            _syms->releaseStageFunctor = reinterpret_cast(jit.getAddrOfSymbol(_releaseStageFuncName));
+            _syms->writeFunctor = reinterpret_cast(jit.getAddrOfSymbol(writerFuncName()));
+        if(!_syms->_fastCodePath.initStageFunctor)
+            _syms->_fastCodePath.initStageFunctor = reinterpret_cast(jit.getAddrOfSymbol(_fastCodePath._fastPathInitStageFuncName));
+        if(!_syms->_fastCodePath.releaseStageFunctor)
+            _syms->_fastCodePath.releaseStageFunctor = reinterpret_cast(jit.getAddrOfSymbol(_fastCodePath._fastPathReleaseStageFuncName));

         // get aggregate functors
-        if(!_aggregateInitFuncName.empty())
-            _syms->aggInitFunctor = reinterpret_cast(jit.getAddrOfSymbol(_aggregateInitFuncName));
-        if(!_aggregateCombineFuncName.empty())
-            _syms->aggCombineFunctor = reinterpret_cast(jit.getAddrOfSymbol(_aggregateCombineFuncName));
-        if(!_aggregateAggregateFuncName.empty())
-            _syms->aggAggregateFunctor = reinterpret_cast(jit.getAddrOfSymbol(_aggregateAggregateFuncName));
+        if(!_fastCodePath._aggregateInitFuncName.empty())
+            _syms->aggInitFunctor = reinterpret_cast(jit.getAddrOfSymbol(_fastCodePath._aggregateInitFuncName));
+        if(!_fastCodePath._aggregateCombineFuncName.empty())
+            _syms->aggCombineFunctor = reinterpret_cast(jit.getAddrOfSymbol(_fastCodePath._aggregateCombineFuncName));
+        if(!_fastCodePath._aggregateAggregateFuncName.empty())
+            _syms->aggAggregateFunctor = reinterpret_cast(jit.getAddrOfSymbol(_fastCodePath._aggregateAggregateFuncName));
+    }
-        // compile slow code path if desired
-        if(!excludeSlowPath) {
-            if(!_syms->resolveFunctor)
-                _syms->resolveFunctor = !resolveWriteCallbackName().empty() ? reinterpret_cast(jit.getAddrOfSymbol(resolveRowName())) : nullptr;
-        }
+    std::shared_ptr<TransformStage::JITSymbols> TransformStage::compile(JITCompiler &jit, LLVMOptimizer *optimizer, bool excludeSlowPath) {
+        // lazy compile
+        if(!_syms)
+            _syms = std::make_shared<JITSymbols>();
-        // check symbols are valid...
-        if(!(_syms->functor && _syms->initStageFunctor && _syms->releaseStageFunctor)) {
-            logger.error("invalid pointer address for JIT code returned");
-            throw std::runtime_error("invalid pointer address for JIT code returned");
+        Timer timer;
+        JobMetrics& metrics = PhysicalStage::plan()->getContext().metrics();
+        if (!excludeSlowPath) {
+            compileSlowPath(jit, optimizer);
         }
+        compileFastPath(jit, optimizer);

         double compilation_time_via_llvm_this_number = timer.time();
         double compilation_time_via_llvm_thus_far = compilation_time_via_llvm_this_number +
-                metrics.getLLVMCompilationTime();
+                                                    metrics.getLLVMCompilationTime();
         metrics.setLLVMCompilationTime(compilation_time_via_llvm_thus_far);

+        std::stringstream ss;
+        ss<<"Compiled code paths for stage "<<number()<<" in "<<compilation_time_via_llvm_this_number<<"s";
+        Logger::instance().defaultLogger().info(ss.str());
-        if (_historyServer) {
-            _historyServer->sendStagePlan("Stage" + std::to_string(number()), unoptimizedIR, optimizedIR, "");
-        }
         return _syms;
     }
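With the split above, compile() becomes a thin wrapper: it lazily allocates the symbol table, calls compileSlowPath() unless excludeSlowPath is set, always calls compileFastPath(), and folds the elapsed time into the LLVM compilation metric. A hedged sketch of a call site for the new three-argument overload (the variable names jit and opt are assumptions for the example):

    // fast path only, e.g. when the slow/resolve path is not needed
    auto fastOnlySyms = stage->compile(jit, &opt, /* excludeSlowPath = */ true);

    // fast + slow path
    auto allSyms = stage->compile(jit, &opt, false);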
diff --git a/tuplex/python/include/PythonMetrics.h b/tuplex/python/include/PythonMetrics.h
index 8cabc56f7..cf65a767b 100644
--- a/tuplex/python/include/PythonMetrics.h
+++ b/tuplex/python/include/PythonMetrics.h
@@ -70,7 +70,15 @@ namespace tuplex {
        double getTotalCompilationTime() {
            return _metrics->getTotalCompilationTime();
        }
-
+        /*!
+         * getter for the total time it takes to generate the LLVM IR (i.e.
+         * for StageBuilder::build to run)
+         * @returns a double representing time in seconds
+         */
+        double getGenerateLLVMTime() {
+            return _metrics->getGenerateLLVMTime();
+        }
+        /*!
         * returns metrics as json string
         * @return string object with all metrics encoded
diff --git a/tuplex/python/src/PythonBindings.cc b/tuplex/python/src/PythonBindings.cc
index 3eebbe109..1e6ca07cc 100644
--- a/tuplex/python/src/PythonBindings.cc
+++ b/tuplex/python/src/PythonBindings.cc
@@ -80,6 +80,7 @@ PYMODULE {
            .def("getLLVMOptimizationTime", &tuplex::PythonMetrics::getLLVMOptimizationTime)
            .def("getLLVMCompilationTime", &tuplex::PythonMetrics::getLLVMCompilationTime)
            .def("getTotalCompilationTime", &tuplex::PythonMetrics::getTotalCompilationTime)
+           .def("getGenerateLLVMTime", &tuplex::PythonMetrics::getGenerateLLVMTime)
            .def("getTotalExceptionCount", &tuplex::PythonMetrics::getTotalExceptionCount)
            .def("getJSONString", &tuplex::PythonMetrics::getJSONString);
diff --git a/tuplex/python/tuplex/metrics.py b/tuplex/python/tuplex/metrics.py
index eb2286c95..6a143a188 100644
--- a/tuplex/python/tuplex/metrics.py
+++ b/tuplex/python/tuplex/metrics.py
@@ -82,6 +82,15 @@ def totalCompilationTime(self) -> float:
        assert self._metrics
        return self._metrics.getTotalCompilationTime()

+    @property
+    def generateLLVMTime(self) -> float:
+        """
+        Returns:
+            float: the time in seconds
+        """
+        assert self._metrics
+        return self._metrics.getGenerateLLVMTime()
+
    def as_json(self) -> str:
        """
        all measurements as json encoded string
diff --git a/tuplex/utils/include/TypeSystem.h b/tuplex/utils/include/TypeSystem.h
index dc0321d3f..5fed5d2e5 100644
--- a/tuplex/utils/include/TypeSystem.h
+++ b/tuplex/utils/include/TypeSystem.h
@@ -16,6 +16,7 @@
 #include
 #include
 #include
+#include <mutex>
 #include

 namespace python {
@@ -309,6 +310,7 @@ namespace python {
        // need threadsafe hashmap here...
        // either tbb's or the one from folly...
        std::map _typeMap;
+        mutable std::mutex _typeMapMutex;

        TypeFactory() : _hash_generator(0) {}

        std::string getDesc(const int _hash) const;
@@ -584,4 +586,4 @@ namespace std {
 }

-#endif //TUPLEX_TYPES_H
\ No newline at end of file
+#endif //TUPLEX_TYPES_H
diff --git a/tuplex/utils/src/TypeSystem.cc b/tuplex/utils/src/TypeSystem.cc
index 867573a75..c09e4ce41 100644
--- a/tuplex/utils/src/TypeSystem.cc
+++ b/tuplex/utils/src/TypeSystem.cc
@@ -74,6 +74,7 @@ namespace python {
                                       const python::Type& retval,
                                       const std::vector& baseClasses,
                                       bool isVarLen) {
+        const std::lock_guard<std::mutex> lock(_typeMapMutex);
        auto it = std::find_if(_typeMap.begin(), _typeMap.end(),
                               [name](const std::pair& p) {
@@ -344,6 +345,7 @@ namespace python {

    std::vector TypeFactory::parameters(const Type& t) const {
+        const std::lock_guard<std::mutex> lock(_typeMapMutex);
        auto it = _typeMap.find(t._hash);
        assert(it != _typeMap.end());
        // exclude dictionary here, but internal reuse.
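The TypeSystem changes serialize every access to the shared _typeMap behind a mutable std::mutex, so type registration and const lookups such as parameters() can safely run from the concurrent code-generation threads launched in StageBuilder::build. The guarded-map pattern in isolation, with generic names rather than the Tuplex types:

    #include <map>
    #include <mutex>
    #include <string>

    // generic stand-in for the guarded-map pattern used by TypeFactory above
    class Registry {
    public:
        void add(int key, std::string desc) {
            const std::lock_guard<std::mutex> lock(_mutex);  // released automatically at scope exit
            _map[key] = std::move(desc);
        }
        std::string find(int key) const {
            const std::lock_guard<std::mutex> lock(_mutex);  // mutable mutex: lockable from const methods
            auto it = _map.find(key);
            return it != _map.end() ? it->second : std::string{};
        }
    private:
        std::map<int, std::string> _map;
        mutable std::mutex _mutex;
    };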