这是indexloc提供的服务,不要输入任何密码
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion tuplex/codegen/src/LLVMEnvironment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,10 @@ namespace tuplex {

// this function itself is super-slow, so cache its results
// getTupleIndices looks entries up in this cache by tupleType.hash().
static std::unordered_map<int, std::vector<std::tuple<unsigned, unsigned, unsigned>>> _tupleIndexCache;
// Taken via std::lock_guard at the top of getTupleIndices, before the cache is searched.
static std::mutex tupleIndexCacheMutex;

std::tuple<size_t, size_t, size_t> getTupleIndices(const python::Type &tupleType, size_t index) {

std::lock_guard<std::mutex> lock(tupleIndexCacheMutex);
// find cache
auto it = _tupleIndexCache.find(tupleType.hash());
if (it == _tupleIndexCache.end()) {
Expand Down
14 changes: 14 additions & 0 deletions tuplex/core/include/JobMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace tuplex {
double _llvm_compilation_time_s = 0.0;
double _total_compilation_time_s = 0.0;
double _sampling_time_s = 0.0;
double _generate_llvm_time_s = 0.0; // how long StageBuilder::build takes

// numbers per stage, can get combined in case.
struct StageMetrics {
Expand Down Expand Up @@ -116,6 +117,12 @@ namespace tuplex {
_total_compilation_time_s = time;
}
/*!
* setter for LLVM generation time (per the member's comment: how long StageBuilder::build takes)
* @param time a double representing LLVM generation time in s
*/
void setGenerateLLVMTime(double time) {
_generate_llvm_time_s = time;
}
/*!
* getter for logical optimization time
* @returns a double representing logical optimization time in s
*/
Expand Down Expand Up @@ -143,6 +150,12 @@ namespace tuplex {
double getTotalCompilationTime() {
return _total_compilation_time_s;
}
/*!
* getter for LLVM generation time (per the member's comment: how long StageBuilder::build takes)
* @returns a double representing LLVM generation time in s
*/
double getGenerateLLVMTime() {
return _generate_llvm_time_s;
}

/*!
* set slow path timing info
Expand Down Expand Up @@ -219,6 +232,7 @@ namespace tuplex {
ss<<"\"llvm_compilation_time_s\":"<<_llvm_compilation_time_s<<",";
ss<<"\"total_compilation_time_s\":"<<_total_compilation_time_s<<",";
ss<<"\"sampling_time_s\":"<<_sampling_time_s<<",";
ss<<"\"generate_llvm_time_s\":"<<_generate_llvm_time_s<<",";

// per stage numbers
ss<<"\"stages\":[";
Expand Down
77 changes: 54 additions & 23 deletions tuplex/core/include/physical/StageBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,34 +80,62 @@ namespace tuplex {
bool _nullValueOptimization;
std::vector<LogicalOperator*> _operators;

// codegen strings
std::string _funcFileWriteCallbackName;
std::string _funcMemoryWriteCallbackName;
std::string _funcExceptionCallback;

int64_t _stageNumber;
int64_t _outputDataSetID;

std::string _resolveRowFunctionName;
std::string _resolveRowWriteCallbackName;
std::string _resolveRowExceptionCallbackName;
std::string _resolveHashCallbackName;
/*!
* Bundle of values handed to the code generators: generateFastCodePath and
* generateResolveCodePath both take a const CodeGenerationContext&.
* Presumably a snapshot of the builder's own members (several fields share names with
* StageBuilder members such as _hashColKeys, _inputNode, _columnsToRead) -- confirm against the .cc file.
* NOTE(review): the bool, pointer and int64_t members have no default initializers, so a
* default-initialized instance leaves them indeterminate until they are assigned.
*/
struct CodeGenerationContext {
// Common variables
bool allowUndefinedBehavior;
bool sharedObjectPropagation;
bool nullValueOptimization;

// output side of the stage
EndPointMode outputMode;
FileFormat outputFileFormat;
int64_t outputNodeID;

std::unordered_map<std::string, std::string> fileOutputParameters; // parameters specific for a file output format
Schema outputSchema; //! output schema of stage

// hash-table related settings
std::vector<size_t> hashColKeys; // the column to use as hash key
python::Type hashKeyType;
bool hashSaveOthers; // whether to save other columns than the key or not. => TODO: UDAs, meanByKey etc. all will require similar things...
bool hashAggregate; // whether the hashtable is an aggregate

// input side of the stage
EndPointMode inputMode;
std::unordered_map<std::string, std::string> fileInputParameters; // parameters specific for a file input format

// Resolve variables
std::vector<LogicalOperator*> resolveOperators;
LogicalOperator* inputNode; // raw pointer; ownership is not visible here -- presumably non-owning, confirm

Schema resolveReadSchema; //! schema for reading input
Schema resolveInputSchema; //! schema after applying projection pushdown to input source code
Schema resolveOutputSchema; //! output schema of stage

// NOTE(review): the next two underscore-prefixed names do not follow the naming of the other
// fields; in the diff view this text was taken from they may be removed lines that ended up
// inside the struct -- confirm whether they belong here.
std::string _initStageFuncName;
std::string _releaseStageFuncName;
Schema normalCaseInputSchema; //! schema after applying normal case optimizations

python::Type hashBucketType;

// Fast Path
std::vector<LogicalOperator*> fastOperators;
python::Type fastReadSchema;
python::Type fastInSchema;
python::Type fastOutSchema;

bool isRootStage;
bool generateParser;

std::vector<bool> columnsToRead;
int64_t inputNodeID;
};

std::string _aggregateInitFuncName;
std::string _aggregateCombineFuncName;
std::string _aggregateAggregateFuncName;
std::string _aggregateCallbackName;

std::string _pyPipelineName;
std::string _irBitCode; // store code as bitcode (faster for parsing)
std::string _pyCode; // python backup code

std::string _writerFuncName; // name of the function where to write stuff to.

std::string _funcStageName;
std::unordered_map<std::string, std::string> _fileInputParameters; // parameters specific for a file input format
std::unordered_map<std::string, std::string> _fileOutputParameters; // parameters specific for a file output format

Expand All @@ -126,7 +154,6 @@ namespace tuplex {
LogicalOperator* _inputNode;
std::vector<bool> _columnsToRead;

std::string _funcHashWriteCallbackName; // callback for writing to hash table
std::vector<size_t> _hashColKeys; // the column to use as hash key
python::Type _hashKeyType;
python::Type _hashBucketType;
Expand Down Expand Up @@ -164,16 +191,16 @@ namespace tuplex {
* @param udfop
* @return string
*/
std::string formatBadUDFNode(UDFOperator* udfop);
std::string formatBadUDFNode(UDFOperator* udfop) const;

/*!
* generate LLVM IR code
* @param fastCodePath whether to generate for fastCodePath or not. When false, always generates mem2mem.
* @return
*/
bool generateFastCodePath(); // file2mem always
TransformStage::TransformStageCodePath generateFastCodePath(const CodeGenerationContext& fastLocalVariables) const; // file2mem always

size_t resolveOperatorCount() {
size_t resolveOperatorCount() const {
return std::count_if(_operators.begin(), _operators.end(), [](const LogicalOperator* op) {
return op && op->type() == LogicalOperatorType::RESOLVE;
});
Expand All @@ -185,14 +212,18 @@ namespace tuplex {
/*!
* code path for mem2mem exception resolution => sh
*/
bool generateResolveCodePath(const std::shared_ptr<codegen::LLVMEnvironment>& env); //! generates mix of LLVM / python code for slow code path including resolvers
TransformStage::TransformStageCodePath generateResolveCodePath(const CodeGenerationContext& resolveLocalVariables) const; //! generates mix of LLVM / python code for slow code path including resolvers

void generatePythonCode(); //! generates fallback pipeline in pure python. => i.e. special case here...

python::Type intermediateType() const;

std::vector<int64_t> getOperatorIDsAffectedByResolvers(const std::vector<LogicalOperator *> &operators);
};

/*
* Returns the intermediate schema if the output node of the list of operators is an aggregate.
* @param operators
*/
python::Type intermediateType(const std::vector<LogicalOperator*>& operators);
}
}

Expand Down
123 changes: 82 additions & 41 deletions tuplex/core/include/physical/TransformStage.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,36 +174,52 @@ namespace tuplex {
ecounts);
}

std::string bitCode() const {
return _irBitCode;
std::string fastPathBitCode() const {
return _fastCodePath._fastPathIRBitCode;
}

// to retrieve actual IR, use code
std::string code() const {
if(_irBitCode.empty())
std::string slowPathBitCode() const {
return _slowCodePath._slowPathIRBitCode;
}

// Retrieve fast path IR code
std::string fastPathCode() const {
if(fastPathBitCode().empty())
return "";

// parse bit code & convert
llvm::LLVMContext ctx;
auto mod = codegen::bitCodeToModule(ctx, fastPathBitCode());
if(!mod)
return "";
return codegen::moduleToString(*mod.get());
}

// Retrieve slow path IR code
std::string slowPathCode() const {
if(slowPathBitCode().empty())
return "";

// parse bit code & convert
llvm::LLVMContext ctx;
auto mod = codegen::bitCodeToModule(ctx, _irBitCode);
auto mod = codegen::bitCodeToModule(ctx, slowPathBitCode());
if(!mod)
return "";
return codegen::moduleToString(*mod.get());
}

std::string funcName() const { return _funcStageName; }
std::string writerFuncName() const { return _writerFuncName; }
std::string writeMemoryCallbackName() const { return _funcMemoryWriteCallbackName; }
std::string writeFileCallbackName() const { return _funcFileWriteCallbackName; }
std::string writeHashCallbackName() const { return _funcHashWriteCallbackName; }
std::string exceptionCallbackName() const { return _funcExceptionCallback; }
std::string aggCombineCallbackName() const { return _aggregateCallbackName; }
std::string funcName() const { return _fastCodePath._funcStageName; }
std::string writerFuncName() const { return _fastCodePath._writerFuncName; }
std::string writeMemoryCallbackName() const { return _fastCodePath._funcMemoryWriteCallbackName; }
std::string writeFileCallbackName() const { return _fastCodePath._funcFileWriteCallbackName; }
std::string exceptionCallbackName() const { return _fastCodePath._funcExceptionCallback; }
std::string aggCombineCallbackName() const { return _fastCodePath._aggregateCallbackName; }

// std::string resolveCode() const { return _irResolveCode; }
std::string resolveRowName() const { return _resolveRowFunctionName; }
std::string resolveWriteCallbackName() const { return _resolveRowWriteCallbackName; }
std::string resolveHashCallbackName() const { return _resolveHashCallbackName; }
std::string resolveExceptionCallbackName() const { return _resolveRowExceptionCallbackName; }
std::string resolveRowName() const { return _slowCodePath._resolveRowFunctionName; }
std::string resolveWriteCallbackName() const { return _slowCodePath._resolveRowWriteCallbackName; }
std::string resolveHashCallbackName() const { return _slowCodePath._resolveHashCallbackName; }
std::string resolveExceptionCallbackName() const { return _slowCodePath._resolveRowExceptionCallbackName; }

std::string purePythonCode() const { return _pyCode; }
std::string pythonPipelineName() const { return _pyPipelineName; }
Expand Down Expand Up @@ -256,6 +272,16 @@ namespace tuplex {

// JITSymbols for this stage
struct JITSymbols {
// Per-code-path JIT symbols: the init and release stage functors, both starting out as nullptr.
struct CodePath {
    CodePath() : initStageFunctor{nullptr}, releaseStageFunctor{nullptr} {}

    codegen::init_stage_f initStageFunctor;
    codegen::release_stage_f releaseStageFunctor;
};

CodePath _fastCodePath;
CodePath _slowCodePath;

codegen::read_block_f functor; // can be memory2memory or file2memory
codegen::read_block_f writeFunctor; // memory2file
codegen::resolve_f resolveFunctor; // always memory2memory
Expand All @@ -268,8 +294,8 @@ namespace tuplex {

JITSymbols() : functor(nullptr), writeFunctor(nullptr),
resolveFunctor(nullptr),
initStageFunctor(nullptr),
releaseStageFunctor(nullptr),
_fastCodePath(),
_slowCodePath(),
aggInitFunctor(nullptr),
aggCombineFunctor(nullptr),
aggAggregateFunctor(nullptr) {}
Expand All @@ -280,10 +306,12 @@ namespace tuplex {
* @param jit JIT instance
* @param optimizer optional instance of optimizer to run over code first. No optimization run when set to 0
* @param excludeSlowPath if true, only fast path functions are compiled. This helps to have threads already busy when slow path still need to get compiled.
* @param registerSymbols whether to add task symbols to JIT compiler
* @return a struct of all function pointers
*/
std::shared_ptr<JITSymbols> compile(JITCompiler& jit, LLVMOptimizer *optimizer=nullptr, bool excludeSlowPath=false, bool registerSymbols=true);
std::shared_ptr<JITSymbols> compile(JITCompiler& jit, LLVMOptimizer *optimizer=nullptr, bool excludeSlowPath=false);

void compileSlowPath(JITCompiler& jit, LLVMOptimizer *optimizer=nullptr);
void compileFastPath(JITCompiler& jit, LLVMOptimizer *optimizer=nullptr);

EndPointMode outputMode() const override { return _outputMode; }
EndPointMode inputMode() const override { return _inputMode; }
Expand Down Expand Up @@ -390,18 +418,39 @@ namespace tuplex {
int64_t number,
bool allowUndefinedBehavior);

std::string _irBitCode; //! llvm IR bitcode
std::string _funcStageName; //! llvm function name of the stage function
std::string _funcMemoryWriteCallbackName; //! llvm function name of the write callback
std::string _funcFileWriteCallbackName; //! llvm function name of the write callback used for file output.
std::string _funcExceptionCallback; //! llvm function of the exception callback function
std::string _funcHashWriteCallbackName; //! the function to call when saving to hash table
std::string _initStageFuncName; //! init function for a stage (sets up globals & Co)
std::string _releaseStageFuncName; //! release function for a stage (releases globals & Co)
std::string _aggregateInitFuncName; //! to initiate aggregate (allocates via C-malloc)
std::string _aggregateCombineFuncName; //! to combine two aggregates (allocates & frees via C-malloc).
std::string _aggregateCallbackName; //! the callback to call with an aggregate
std::string _aggregateAggregateFuncName; //! to combine aggregate and result (allocates & frees via C-malloc).
/*!
* Bitcode plus the names of the generated functions / callbacks for a code path.
* TransformStage holds two instances, _fastCodePath and _slowCodePath. The visible accessors read
* the fast-path fields from _fastCodePath (e.g. fastPathBitCode(), funcName()) and the
* slow-path / resolve fields from _slowCodePath (e.g. slowPathBitCode(), resolveRowName()).
* NOTE(review): every instance carries both the fast- and the slow-path fields; whether the
* "other" half is ever populated on an instance is not visible here -- confirm.
*/
struct TransformStageCodePath {
// Fast path variables
std::string _fastPathIRBitCode; //! llvm IR bitcode for fast path
std::string _fastPathInitStageFuncName; //! init function for a stage (sets up globals & Co)
std::string _fastPathReleaseStageFuncName; //! release function for a stage (releases globals & Co)

std::string _funcStageName; //! llvm function name of the stage function
std::string _funcMemoryWriteCallbackName; //! llvm function name of the write callback
std::string _funcFileWriteCallbackName; //! llvm function name of the write callback used for file output.
std::string _funcExceptionCallback; //! llvm function of the exception callback function
std::string _funcHashWriteCallbackName; //! the function to call when saving to hash table
std::string _aggregateInitFuncName; //! to initiate aggregate (allocates via C-malloc)
std::string _aggregateCombineFuncName; //! to combine two aggregates (allocates & frees via C-malloc).

std::string _writerFuncName; //! returned by writerFuncName(); presumably the name of the function that writes the output -- confirm
std::string _aggregateCallbackName; //! the callback to call with an aggregate

// Slow path variables
std::string _slowPathIRBitCode; //! llvm IR bitcode for slow path
std::string _slowPathInitStageFuncName; //! init function for a stage (sets up globals & Co)
std::string _slowPathReleaseStageFuncName; //! release function for a stage (releases globals & Co)

std::string _resolveRowFunctionName; //! returned by resolveRowName()
std::string _resolveRowWriteCallbackName; //! returned by resolveWriteCallbackName()
std::string _resolveRowExceptionCallbackName; //! returned by resolveExceptionCallbackName()
std::string _resolveHashCallbackName; //! returned by resolveHashCallbackName()

// Common variables
std::string _aggregateAggregateFuncName; //! to combine aggregate and result (allocates & frees via C-malloc).
};

TransformStageCodePath _fastCodePath;
TransformStageCodePath _slowCodePath;
EndPointMode _inputMode; //! indicate whether stage takes hash table, serialized mem or files as input
EndPointMode _outputMode; //! analog

Expand Down Expand Up @@ -435,17 +484,9 @@ namespace tuplex {

URI _outputURI; //! the output uri for file mode of this stage

// resolve/exception handling code
// std::string _irResolveCode;
std::string _resolveRowFunctionName;
std::string _resolveRowWriteCallbackName;
std::string _resolveRowExceptionCallbackName;
std::string _resolveHashCallbackName;

// pure python pipeline code & names
std::string _pyCode;
std::string _pyPipelineName;
std::string _writerFuncName;

std::shared_ptr<ResultSet> emptyResultSet() const;

Expand Down
6 changes: 3 additions & 3 deletions tuplex/core/src/ee/aws/AWSLambdaBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ namespace tuplex {
Timer timer;
llvm::LLVMContext ctx;
LLVMOptimizer opt;
auto mod = codegen::bitCodeToModule(ctx, tstage->bitCode());
auto mod = codegen::bitCodeToModule(ctx, tstage->fastPathBitCode());
opt.optimizeModule(*mod);
optimizedBitcode = codegen::moduleToBitCodeString(*mod);
logger().info("client-side LLVM IR optimization took " + std::to_string(timer.time()) + "s");
Expand All @@ -393,7 +393,7 @@ namespace tuplex {
if(_options.USE_LLVM_OPTIMIZER() && !optimizedBitcode.empty())
pb_stage->set_bitcode(optimizedBitcode);
else
pb_stage->set_bitcode(tstage->bitCode());
pb_stage->set_bitcode(tstage->fastPathBitCode());

req.set_allocated_stage(pb_stage.release());

Expand Down Expand Up @@ -944,4 +944,4 @@ namespace tuplex {
// other reset? @TODO.
}
}
#endif
#endif
Loading