Merged
104 commits
a717d22
test for buggy behavior
LeonhardFS Nov 3, 2021
83cc4f8
Test files
bgivertz Nov 3, 2021
fd9720d
Merge branch 'tuplex:master' into exceptions
bgivertz Nov 3, 2021
9258803
proper test
LeonhardFS Nov 3, 2021
a52e70c
Merge pull request #1 from LeonhardFS/nonefix
bgivertz Nov 3, 2021
67bb6d0
Merge branch 'tuplex:master' into exceptions
bgivertz Nov 12, 2021
66ce12a
Implement input exceptions
bgivertz Nov 16, 2021
3cea525
Merge branch 'tuplex:master' into exceptions
bgivertz Nov 18, 2021
aead15a
Merge branch 'tuplex:master' into exceptions
bgivertz Dec 9, 2021
8948850
Input exceptions without merge, auto unpack dicts
bgivertz Dec 9, 2021
8b8131d
Input exceptions without merge, auto unpack dicts
bgivertz Dec 10, 2021
1e2057c
Input partition to python objects map
bgivertz Dec 12, 2021
e408620
Merge not in order
bgivertz Dec 12, 2021
8942662
Merge not in order
bgivertz Dec 12, 2021
1130e85
Merge not in order
bgivertz Dec 12, 2021
9fa2a6c
Merge python objects in order
bgivertz Dec 12, 2021
2bba5d7
Dict test
bgivertz Dec 13, 2021
243542d
Dict test
bgivertz Dec 13, 2021
9c72c3e
Merge branch 'tuplex:master' into exceptions
bgivertz Jan 3, 2022
7eb8774
Update exceptions code gen
bgivertz Jan 7, 2022
b9c0580
Detect when exceptions should be updated
bgivertz Jan 7, 2022
55c69e5
Add filter tests
bgivertz Jan 7, 2022
58531aa
Account for unresolved exceptions
bgivertz Jan 7, 2022
9ee19c7
Define functor for updating exceptions
bgivertz Jan 7, 2022
1f55d68
Update exceptions when filters are present
bgivertz Jan 7, 2022
6fc03a7
Fix tests
bgivertz Jan 7, 2022
3d390df
Fix tests
bgivertz Jan 7, 2022
bbb1531
Force parallelize exceptions to use interpreter
bgivertz Jan 7, 2022
2f4b52e
Save exceptions from generalized path
bgivertz Jan 7, 2022
837c244
Normalize types in tests
bgivertz Jan 7, 2022
4e71406
Normalize types in tests
bgivertz Jan 7, 2022
e03b785
Multiple partition exceptions test
bgivertz Jan 7, 2022
7b26395
Multiple partition exceptions test
bgivertz Jan 7, 2022
fc12d63
Bug fix
bgivertz Jan 8, 2022
ddcbb5e
Bug fix
bgivertz Jan 9, 2022
5837b44
Bug fix
bgivertz Jan 9, 2022
fd48b5c
Bug fix
bgivertz Jan 9, 2022
5acb93f
Test fix
bgivertz Jan 9, 2022
d126e81
Test fix
bgivertz Jan 9, 2022
c37d9ce
Small PR changes
bgivertz Jan 9, 2022
fe9b422
Code cleanup
bgivertz Jan 9, 2022
4978456
Merge branch 'tuplex:master' into exceptions
bgivertz Jan 13, 2022
c13ed53
Support for cached exceptions
bgivertz Jan 14, 2022
671a2ea
Support for cached exceptions
bgivertz Jan 14, 2022
a154bcf
Support for cached exceptions
bgivertz Jan 14, 2022
d9037fb
Support for cached exceptions
bgivertz Jan 14, 2022
df200ac
Support for cached exceptions
bgivertz Jan 14, 2022
5cbefeb
Support for cached exceptions
bgivertz Jan 14, 2022
23b1213
Support for cached exceptions
bgivertz Jan 14, 2022
dcc091d
Support for cached exceptions
bgivertz Jan 14, 2022
2fc93c9
Support for cached exceptions
bgivertz Jan 14, 2022
f8e54f9
Support for cached exceptions
bgivertz Jan 14, 2022
fc971a0
merged in master
LeonhardFS Jan 16, 2022
756a4d5
build fixes
LeonhardFS Jan 16, 2022
71c2570
Merge master bug fix
bgivertz Jan 19, 2022
d5eec30
Code cleanup
bgivertz Jan 19, 2022
26f05a6
Code cleanup
bgivertz Jan 19, 2022
34ed9cf
PR Comments
bgivertz Jan 19, 2022
320e88b
PR Comments
bgivertz Jan 19, 2022
f463e4a
PR Comments
bgivertz Jan 19, 2022
6331c84
PR Comments
bgivertz Jan 19, 2022
6ada257
PR Comments
bgivertz Jan 19, 2022
4bca0ec
PR Comments
bgivertz Jan 19, 2022
5402613
PR Comments
bgivertz Jan 19, 2022
86b9365
PR Comments
bgivertz Jan 19, 2022
c79d471
PR Comments
bgivertz Jan 19, 2022
6c12312
PR Comments
bgivertz Jan 19, 2022
7777886
PR Comments
bgivertz Jan 19, 2022
e8ee439
Change pointer to struct
bgivertz Jan 19, 2022
cbd69eb
Change pointer to struct
bgivertz Jan 19, 2022
dc167f3
Change pointer to struct
bgivertz Jan 19, 2022
9b2ca38
Change pointer to struct
bgivertz Jan 19, 2022
1ebe1ac
Revert "Change pointer to struct"
bgivertz Jan 19, 2022
38f1522
Revert "Change pointer to struct"
bgivertz Jan 19, 2022
ac21393
Revert "PR Comments"
bgivertz Jan 19, 2022
02a0dde
Revert "PR Comments"
bgivertz Jan 19, 2022
3f8615e
Revert "PR Comments"
bgivertz Jan 19, 2022
7452dda
Revert "PR Comments"
bgivertz Jan 19, 2022
ea391a6
Revert "PR Comments"
bgivertz Jan 19, 2022
bea8006
Revert "PR Comments"
bgivertz Jan 19, 2022
e6bae6e
Revert "PR Comments"
bgivertz Jan 19, 2022
2cd5ac1
Revert "PR Comments"
bgivertz Jan 19, 2022
adb86ed
Revert "PR Comments"
bgivertz Jan 19, 2022
f60bfdb
Revert "PR Comments"
bgivertz Jan 19, 2022
e09af5c
Revert "PR Comments"
bgivertz Jan 19, 2022
b274e08
Revert "Code cleanup"
bgivertz Jan 19, 2022
db57a09
Revert "Code cleanup"
bgivertz Jan 19, 2022
40c20bc
Documentation updates
bgivertz Jan 20, 2022
2d6cd37
PR comment fixes
bgivertz Jan 20, 2022
d781d32
Change tuple to struct
bgivertz Jan 20, 2022
9201eca
Lambda fix
bgivertz Jan 20, 2022
c965f86
Documentation for exception processing and mapping
bgivertz Jan 20, 2022
2934f0a
Documentation and simplify resolve logic
bgivertz Jan 20, 2022
feb17da
Transform task documentation and logic simplification
bgivertz Jan 20, 2022
da0dc3b
Simplify Python Context
bgivertz Jan 20, 2022
2317d84
Simplify Python Context
bgivertz Jan 20, 2022
15506cf
Simplify Python Context
bgivertz Jan 20, 2022
7fff790
PR Changes
bgivertz Jan 20, 2022
60d0ec6
PR Changes
bgivertz Jan 20, 2022
c5df6c3
PR Changes
bgivertz Jan 20, 2022
52e59ae
Refactor python object from parallelize
bgivertz Jan 22, 2022
1fc75f5
Refactor python object from parallelize
bgivertz Jan 22, 2022
f6da529
Refactor python object from parallelize
bgivertz Jan 22, 2022
79b44cb
Refactor python object from parallelize
bgivertz Jan 22, 2022
2 changes: 1 addition & 1 deletion tuplex/awslambda/src/lambda_main.cc
@@ -329,7 +329,7 @@ tuplex::messages::InvocationResponse lambda_main(aws::lambda_runtime::invocation
// parse protobuf request
tuplex::messages::InvocationRequest req;
auto rc = google::protobuf::util::JsonStringToMessage(lambda_req.payload, &req);
if(rc != google::protobuf::util::Status::OK)
if(!rc.ok())
throw std::runtime_error("could not parse json into protobuf message, bad parse for request");

timer.mark_time("decode_request");
28 changes: 25 additions & 3 deletions tuplex/core/include/Context.h
@@ -37,6 +37,7 @@ namespace tuplex {
class Executor;
class Partition;
class IBackend;
class ExceptionInfo;

class Context {
private:
@@ -58,7 +59,25 @@ namespace tuplex {

// needed because of C++ template issues
void addPartition(DataSet* ds, Partition *partition);
void addParallelizeNode(DataSet *ds); //! adds a parallelize node to the computation graph
void addParallelizeNode(DataSet *ds, const std::vector<std::tuple<size_t, PyObject*>> &badParallelizeObjects=std::vector<std::tuple<size_t, PyObject*>>(), const std::vector<size_t> &numExceptionsInPartition=std::vector<size_t>()); //! adds a parallelize node to the computation graph

/*!
* serialize python objects as pickled objects into exception partitions. Set the python objects map to
* map all normalPartitions to the exceptions that occurred within them.
* @param pythonObjects normal case schema violations and their initial row numbers
* @param numExceptionsInPartition number of exceptions in each normal partition
* @param normalPartitions normal partitions created
* @param opID parallelize operator ID
* @param serializedPythonObjects output vector for partitions
* @param pythonObjectsMap output for mapping
*/
void serializePythonObjects(const std::vector<std::tuple<size_t, PyObject*>>& pythonObjects,
const std::vector<size_t> &numExceptionsInPartition,
const std::vector<Partition*> &normalPartitions,
const int64_t opID,
std::vector<Partition*> &serializedPythonObjects,
std::unordered_map<std::string, ExceptionInfo> &pythonObjectsMap);

Partition* requestNewPartition(const Schema& schema, const int dataSetID, size_t minBytesRequired);
uint8_t* partitionLockRaw(Partition *partition);
void partitionUnlock(Partition *partition);
@@ -111,7 +130,8 @@ namespace tuplex {
columnNames);
}

DataSet& parallelize(const std::vector<Row>& L, const std::vector<std::string>& columnNames=std::vector<std::string>());
DataSet& parallelize(const std::vector<Row>& L,
const std::vector<std::string>& columnNames=std::vector<std::string>());

DataSet& parallelize(std::initializer_list<double> L, const std::vector<std::string>& columnNames=std::vector<std::string>()) {
if(!columnNames.empty())
@@ -252,9 +272,11 @@ namespace tuplex {
* empty dataset will be created.
*
* @param columns optional column names
* @param badParallelizeObjects schema violations found during parallelization of partitions
* @param numExceptionsInPartition number of schema violations that occurred in each of the partitions
* @return reference to newly created dataset.
*/
DataSet& fromPartitions(const Schema& schema, const std::vector<Partition*>& partitions, const std::vector<std::string>& columns);
DataSet& fromPartitions(const Schema& schema, const std::vector<Partition*>& partitions, const std::vector<std::string>& columns, const std::vector<std::tuple<size_t, PyObject*>> &badParallelizeObjects, const std::vector<size_t> &numExceptionsInPartition);
};
// needed for template mechanism to work
#include <DataSet.h>
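
The doc comment on `serializePythonObjects` says the bad objects arrive as one flat vector together with a per-partition exception count. A minimal sketch of how such a flat vector could be split back into per-partition groups follows. It assumes the flat vector is ordered by partition, which the diff does not state. `BadObject` (with a `std::string` payload standing in for `PyObject*`) and `groupByPartition` are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <tuple>
#include <vector>

// Hypothetical stand-in for std::tuple<size_t, PyObject*>:
// (initial row number, payload). A string replaces the Python object.
using BadObject = std::tuple<size_t, std::string>;

// Split a flat vector of bad objects into one group per normal partition.
// Assumption: badObjects is ordered by partition, so consecutive slices of
// length numExceptionsInPartition[i] belong to partition i.
std::vector<std::vector<BadObject>> groupByPartition(
        const std::vector<BadObject>& badObjects,
        const std::vector<size_t>& numExceptionsInPartition) {
    std::vector<std::vector<BadObject>> groups;
    groups.reserve(numExceptionsInPartition.size());
    size_t start = 0;
    for (size_t n : numExceptionsInPartition) {
        // the counts must never claim more objects than exist
        assert(start + n <= badObjects.size());
        groups.emplace_back(badObjects.begin() + start,
                            badObjects.begin() + start + n);
        start += n;
    }
    // every bad object should be assigned to exactly one partition
    assert(start == badObjects.size());
    return groups;
}
```

A partition with a count of zero simply receives an empty group, so the result always has one entry per normal partition.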
47 changes: 47 additions & 0 deletions tuplex/core/include/ExceptionInfo.h
@@ -0,0 +1,47 @@
//--------------------------------------------------------------------------------------------------------------------//
// //
// Tuplex: Blazing Fast Python Data Science //
// //
// //
// (c) 2017 - 2021, Tuplex team //
// Created by Benjamin Givertz first on 1/1/2022 //
// License: Apache 2.0 //
//--------------------------------------------------------------------------------------------------------------------//

#ifndef TUPLEX_EXCEPTIONINFO_H
#define TUPLEX_EXCEPTIONINFO_H

namespace tuplex {
/*!
* Struct to hold information that maps input partitions to input exceptions that occur within them.
*
* Explanation:
* Each input partition is passed the same vector of all input exceptions that occurred during data parallelization
* or caching. Thus, each input partition must know how many input exceptions occur in its partition, the index
* of the input exception partition where its first exception occurs, and the offset into that partition where the
* first exception occurs. These values are held in this struct and each input partition is mapped to an ExceptionInfo.
*/
struct ExceptionInfo {
size_t numExceptions; //! number of exception rows that occur within a single input partition
size_t exceptionIndex; //! index into a vector of input exception partitions that holds the first input exception
size_t exceptionRowOffset; //! offset in rows into the first input exception partition where the first exception occurs.
size_t exceptionByteOffset; //! offset in bytes into the first input exception partition where the first exception occurs

ExceptionInfo() :
numExceptions(0),
exceptionIndex(0),
exceptionRowOffset(0),
exceptionByteOffset(0) {}

ExceptionInfo(size_t numExps,
size_t expIndex,
size_t expRowOffset,
size_t expByteOffset) :
numExceptions(numExps),
exceptionIndex(expIndex),
exceptionRowOffset(expRowOffset),
exceptionByteOffset(expByteOffset) {}
};
}

#endif //TUPLEX_EXCEPTIONINFO_H
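
The explanation in the `ExceptionInfo` doc comment can be illustrated with a small, self-contained sketch. It is not the code from this PR. It assumes exceptions are laid out back to back across the exception partitions in the same order as the input partitions. Each exception partition is modeled only by the serialized size of its rows. `ExceptionPartition` and `buildExceptionInfos` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Same four fields as the struct in the diff, kept as a plain aggregate.
struct ExceptionInfo {
    size_t numExceptions;
    size_t exceptionIndex;
    size_t exceptionRowOffset;
    size_t exceptionByteOffset;
};

// Hypothetical model of an exception partition: the byte size of each row.
using ExceptionPartition = std::vector<size_t>;

// For every input partition, record where its first exception lives.
std::vector<ExceptionInfo> buildExceptionInfos(
        const std::vector<size_t>& numExceptionsInPartition,
        const std::vector<ExceptionPartition>& exceptionPartitions) {
    std::vector<ExceptionInfo> infos;
    size_t partIdx = 0, rowOff = 0, byteOff = 0;  // cursor into the exceptions

    // move the cursor to the next exception partition once one is exhausted
    auto normalize = [&]() {
        while (partIdx < exceptionPartitions.size() &&
               rowOff == exceptionPartitions[partIdx].size()) {
            ++partIdx;
            rowOff = 0;
            byteOff = 0;
        }
    };

    for (size_t n : numExceptionsInPartition) {
        normalize();
        infos.push_back(ExceptionInfo{n, partIdx, rowOff, byteOff});
        for (size_t i = 0; i < n; ++i) {
            normalize();
            assert(partIdx < exceptionPartitions.size());  // counts must fit
            byteOff += exceptionPartitions[partIdx][rowOff];
            ++rowOff;
        }
    }
    return infos;
}
```

Storing both a row offset and a byte offset, as the struct does, lets a consumer jump straight to the first relevant row without re-scanning the preceding variable-length rows.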
2 changes: 1 addition & 1 deletion tuplex/core/include/ee/local/LocalBackend.h
@@ -66,7 +66,7 @@ namespace tuplex {
void freeExecutors();


std::vector<IExecutorTask*> createLoadAndTransformToMemoryTasks(TransformStage* tstage, const ContextOptions& options, codegen::read_block_f functor);
std::vector<IExecutorTask*> createLoadAndTransformToMemoryTasks(TransformStage* tstage, const ContextOptions& options, const std::shared_ptr<TransformStage::JITSymbols>& syms);
void executeTransformStage(TransformStage* tstage);


5 changes: 4 additions & 1 deletion tuplex/core/include/logical/CacheOperator.h
@@ -81,6 +81,8 @@ namespace tuplex {
bool isCached() const { return _cached; }
std::vector<Partition*> cachedPartitions() const { return _normalCasePartitions; }
std::vector<Partition*> cachedExceptions() const { return _generalCasePartitions; }
std::unordered_map<std::string, ExceptionInfo> partitionToExceptionsMap() const { return _partitionToExceptionsMap; }

size_t getTotalCachedRows() const;

int64_t cost() const override;
@@ -109,7 +111,8 @@ namespace tuplex {
std::vector<Partition*> _generalCasePartitions; //! holds all data which is considered to be a normal-case violation,
//! i.e. which does not adhere to the normal case schema, but did not produce
//! an exception while being processed through the pipeline before
std::vector<PyObject*> _py_objects; //! all python objects who do not adhere to the general case schema (
std::unordered_map<std::string, ExceptionInfo> _partitionToExceptionsMap; //! maps normal case partitions to corresponding general case ones
std::vector<PyObject*> _py_objects; //! all python objects who do not adhere to the general case schema
Review comment (Contributor): they do not need row numbers, right?

std::vector<std::string> _columns;

// internal sample of normal case rows, used for tracing & Co.
14 changes: 12 additions & 2 deletions tuplex/core/include/logical/ParallelizeOperator.h
@@ -18,7 +18,9 @@ namespace tuplex {
class ParallelizeOperator : public LogicalOperator {

std::vector<Partition*> _partitions; // data, conforming to majority type
//@TODO: missing: python objects & general case data
std::vector<Partition*> _pythonObjects; // schema violations stored for interpreter processing as python objects
// maps partitions to their corresponding python objects
std::unordered_map<std::string, ExceptionInfo> _inputPartitionToPythonObjectsMap;
std::vector<std::string> _columnNames;

std::vector<Row> _sample; // sample, not necessarily conforming to one type
@@ -28,7 +30,9 @@ namespace tuplex {
LogicalOperator *clone() override;

// this a root node
ParallelizeOperator(const Schema& schema, std::vector<Partition*> partitions, const std::vector<std::string>& columns);
ParallelizeOperator(const Schema& schema,
const std::vector<Partition*>& partitions,
const std::vector<std::string>& columns);

std::string name() override { return "parallelize"; }
LogicalOperatorType type() const override { return LogicalOperatorType::PARALLELIZE; }
@@ -45,6 +49,12 @@ namespace tuplex {
*/
std::vector<tuplex::Partition*> getPartitions();

void setPythonObjects(const std::vector<Partition*> &pythonObjects) { _pythonObjects = pythonObjects; }
std::vector<Partition *> getPythonObjects() { return _pythonObjects; }

void setInputPartitionToPythonObjectsMap(const std::unordered_map<std::string, ExceptionInfo>& pythonObjectsMap) { _inputPartitionToPythonObjectsMap = pythonObjectsMap; }
std::unordered_map<std::string, ExceptionInfo> getInputPartitionToPythonObjectsMap() { return _inputPartitionToPythonObjectsMap; }

Schema getInputSchema() const override { return getOutputSchema(); }

bool good() const override;
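
Both `ParallelizeOperator` and `CacheOperator` now expose a `std::unordered_map<std::string, ExceptionInfo>`. A sketch of how a consumer might look up the entry for one partition is shown below. The diff does not say what the string key is, so the key used here is a placeholder. `lookupExceptionInfo` is a hypothetical helper, and falling back to a default-constructed `ExceptionInfo` (zero exceptions) for unknown keys is an assumption.

```cpp
#include <cstddef>
#include <string>
#include <unordered_map>

// Reduced copy of the struct added in ExceptionInfo.h.
struct ExceptionInfo {
    size_t numExceptions;
    size_t exceptionIndex;
    size_t exceptionRowOffset;
    size_t exceptionByteOffset;

    ExceptionInfo() : numExceptions(0), exceptionIndex(0),
                      exceptionRowOffset(0), exceptionByteOffset(0) {}
    ExceptionInfo(size_t numExps, size_t expIndex,
                  size_t expRowOffset, size_t expByteOffset)
        : numExceptions(numExps), exceptionIndex(expIndex),
          exceptionRowOffset(expRowOffset), exceptionByteOffset(expByteOffset) {}
};

// Hypothetical helper: return the info for a partition key, or an empty
// ExceptionInfo (no exceptions) when the key is not in the map.
ExceptionInfo lookupExceptionInfo(
        const std::unordered_map<std::string, ExceptionInfo>& map,
        const std::string& partitionKey) {
    auto it = map.find(partitionKey);
    return it == map.end() ? ExceptionInfo() : it->second;
}
```

Using `find` instead of `operator[]` keeps the map unmodified on a miss. Note that the getters in the diff return the map by value, so each call copies it; taking the result by `const` reference at the call site does not avoid that copy.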
8 changes: 7 additions & 1 deletion tuplex/core/include/physical/BlockBasedTaskBuilder.h
@@ -30,6 +30,13 @@ namespace tuplex {

llvm::Function *createFunction();

/*!
* create function to process blocks of data along with any input exceptions. Used when filters are present
* to update the indices of the exceptions.
* @return llvm function
*/
llvm::Function *createFunctionWithExceptions();

python::Type _inputRowType; //@TODO: make this private??

std::string _intermediateCallbackName;
@@ -48,7 +55,6 @@ namespace tuplex {
* @return
*/
inline llvm::Value *arg(const std::string &name) {
assert(_args.size() == 6);
auto it = _args.find(name);
if (it == _args.end())
throw std::runtime_error("unknown arg " + name + " requested");
4 changes: 4 additions & 0 deletions tuplex/core/include/physical/CodeDefs.h
@@ -47,6 +47,10 @@ namespace tuplex {
// parameters are userData, block, blocksize, normalrowsout, badrowsout, lastRow
typedef int64_t(*read_block_f)(void*, const uint8_t*, int64_t, int64_t*, int64_t*, int8_t);

// prototype of the function generated by the below builder
// parameters are userData, block, blocksize, expPtrs, expPtrSizes, numExceptions, normalrowsout, badrowsout, lastRow
typedef int64_t(*read_block_exp_f)(void*, const uint8_t*, int64_t, uint8_t **, int64_t *, int64_t, int64_t*, int64_t*, bool);

/*!
* prototype for processing a single row (with callbacks etc.). Returns how many bytes were processed
*/
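
To make the new `read_block_exp_f` prototype easier to read, here is a sketch of a hand-written function with a matching signature and a helper that invokes it through the function pointer. In Tuplex the real function is generated by the task builder; `stubReadBlock` and `runWithoutExceptions` are hypothetical. What the stub writes to the counters and its return value are assumptions, since the diff does not document either.

```cpp
#include <cstdint>

// Typedef as added in CodeDefs.h.
typedef int64_t (*read_block_exp_f)(void*, const uint8_t*, int64_t,
                                    uint8_t**, int64_t*, int64_t,
                                    int64_t*, int64_t*, bool);

// Hypothetical stub with the same parameter order as the comment in the diff:
// userData, block, blocksize, expPtrs, expPtrSizes, numExceptions,
// normalrowsout, badrowsout, lastRow.
int64_t stubReadBlock(void* userData, const uint8_t* block, int64_t blockSize,
                      uint8_t** expPtrs, int64_t* expPtrSizes,
                      int64_t numExceptions,
                      int64_t* normalRowsOut, int64_t* badRowsOut,
                      bool lastRow) {
    (void)userData; (void)block; (void)expPtrs;
    (void)expPtrSizes; (void)numExceptions; (void)lastRow;
    *normalRowsOut = 0;   // the stub processes no rows
    *badRowsOut = 0;
    return blockSize;     // assumption: pretend the whole block was consumed
}

// Call any read_block_exp_f for a block that has no input exceptions.
int64_t runWithoutExceptions(read_block_exp_f fn, const uint8_t* block,
                             int64_t blockSize,
                             int64_t* normalRowsOut, int64_t* badRowsOut) {
    return fn(nullptr, block, blockSize,
              nullptr, nullptr, 0,
              normalRowsOut, badRowsOut, true);
}
```

Compared with `read_block_f`, the extra three arguments carry the exception data (`expPtrs`, `expPtrSizes`, `numExceptions`), and the last parameter is a `bool` rather than an `int8_t`.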
63 changes: 63 additions & 0 deletions tuplex/core/include/physical/ExceptionSourceTaskBuilder.h
@@ -0,0 +1,63 @@
//--------------------------------------------------------------------------------------------------------------------//
// //
// Tuplex: Blazing Fast Python Data Science //
// //
// //
// (c) 2017 - 2021, Tuplex team //
// Created by Benjamin Givertz first on 1/1/2021 //
// License: Apache 2.0 //
//--------------------------------------------------------------------------------------------------------------------//

#ifndef TUPLEX_EXCEPTIONSOURCETASKBUILDER_H
#define TUPLEX_EXCEPTIONSOURCETASKBUILDER_H

#include "BlockBasedTaskBuilder.h"

namespace tuplex {
namespace codegen {
/*!
* Class used to process tuplex partitions through the pipeline when both input exceptions and filters are
* present. This is necessary to update the row indices of any input exceptions if rows are filtered out.
*/
class ExceptionSourceTaskBuilder : public BlockBasedTaskBuilder {
private:
void createMainLoop(llvm::Function* read_block_func, bool terminateEarlyOnLimitCode);

/*!
* generates code to process a row depending on parse result...
* @param builder
* @param userData a value for userData (i.e. the class ptr of the task typically) to be passed to callback functions
* @param tuple (flattened) tuple representation of current tuple (LLVM)
* @param normalRowCountVar where to store normal row counts
* @param badRowCountVar where to store bad row counts
* @param processRowFunc (optional) function to be called before output is written.
* Most likely this is not a nullptr, because users want to transform data.
*/
void processRow(llvm::IRBuilder<>& builder,
llvm::Value* userData,
const FlattenedTuple& tuple,
llvm::Value *normalRowCountVar,
llvm::Value *badRowCountVar,
llvm::Value *outputRowNumberVar,
llvm::Value *inputRowPtr,
llvm::Value *inputRowSize,
bool terminateEarlyOnLimitCode,
llvm::Function* processRowFunc=nullptr);

void callProcessFuncWithHandler(llvm::IRBuilder<> &builder, llvm::Value *userData,
const FlattenedTuple &tuple,
llvm::Value *normalRowCountVar, llvm::Value *badRowCountVar, llvm::Value *rowNumberVar,
llvm::Value *inputRowPtr, llvm::Value *inputRowSize,
bool terminateEarlyOnLimitCode,
llvm::Function *processRowFunc);
public:
ExceptionSourceTaskBuilder() = delete;

explicit ExceptionSourceTaskBuilder(const std::shared_ptr<LLVMEnvironment>& env, const python::Type& rowType, const std::string& name) : BlockBasedTaskBuilder::BlockBasedTaskBuilder(env, rowType, name) {}

llvm::Function* build(bool terminateEarlyOnFailureCode) override;
};
}
}

#endif //TUPLEX_EXCEPTIONSOURCETASKBUILDER_H
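
The class comment for `ExceptionSourceTaskBuilder` states that exception row indices must be updated when a filter removes rows. The generated LLVM code is not part of this excerpt, so the following is only a plain C++ sketch of the idea. It assumes each exception stores the index of the input row at which it occurs, that these indices are sorted ascending, and that the updated index is the original index minus the number of rows dropped before it. `shiftExceptionRows` is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// keep[i] tells whether input row i survives the filter.
// exceptionRows holds sorted row indices of input exceptions, in [0, keep.size()].
// Returns the indices the exceptions have after the filtered rows are removed.
std::vector<int64_t> shiftExceptionRows(const std::vector<bool>& keep,
                                        const std::vector<int64_t>& exceptionRows) {
    std::vector<int64_t> shifted;
    shifted.reserve(exceptionRows.size());
    const int64_t numRows = static_cast<int64_t>(keep.size());
    size_t next = 0;       // next exception to place
    int64_t dropped = 0;   // rows filtered out so far
    for (int64_t row = 0; row <= numRows; ++row) {
        // emit every exception that sits at this row position
        while (next < exceptionRows.size() && exceptionRows[next] == row) {
            shifted.push_back(row - dropped);
            ++next;
        }
        if (row < numRows && !keep[static_cast<size_t>(row)])
            ++dropped;
    }
    // all exceptions must lie within the processed row range
    assert(next == exceptionRows.size());
    return shifted;
}
```

A single pass suffices because both the rows and the exception indices are visited in ascending order, which matches the block-wise, row-by-row processing that the builder's `createMainLoop` and `processRow` names suggest.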