[FEATURE] Parallelize support for normal case violations #57
Merged
Commits
104 commits
a717d22
test for buggy behavior
LeonhardFS 83cc4f8
Test files
bgivertz fd9720d
Merge branch 'tuplex:master' into exceptions
bgivertz 9258803
proper test
LeonhardFS a52e70c
Merge pull request #1 from LeonhardFS/nonefix
bgivertz 67bb6d0
Merge branch 'tuplex:master' into exceptions
bgivertz 66ce12a
Implement input exceptions
bgivertz 3cea525
Merge branch 'tuplex:master' into exceptions
bgivertz aead15a
Merge branch 'tuplex:master' into exceptions
bgivertz 8948850
Input exceptions without merge, auto unpack dicts
bgivertz 8b8131d
Input exceptions without merge, auto unpack dicts
bgivertz 1e2057c
Input partition to python objects map
bgivertz e408620
Merge not in order
bgivertz 8942662
Merge not in order
bgivertz 1130e85
Merge not in order
bgivertz 9fa2a6c
Merge python objects in order
bgivertz 2bba5d7
Dict test
bgivertz 243542d
Dict test
bgivertz 9c72c3e
Merge branch 'tuplex:master' into exceptions
bgivertz 7eb8774
Update exceptions code gen
bgivertz b9c0580
Detect when exceptions should be updated
bgivertz 55c69e5
Add filter tests
bgivertz 58531aa
Account for unresolved exceptions
bgivertz 9ee19c7
Define functor for updating exceptions
bgivertz 1f55d68
Update exceptions when filters are present
bgivertz 6fc03a7
Fix tests
bgivertz 3d390df
Fix tests
bgivertz bbb1531
Force parallelize exceptions to use interpreter
bgivertz 2f4b52e
Save exceptions from generalized path
bgivertz 837c244
Normalize types in tests
bgivertz 4e71406
Normalize types in tests
bgivertz e03b785
Multiple partition exceptions test
bgivertz 7b26395
Multiple partition exceptions test
bgivertz fc12d63
Bug fix
bgivertz ddcbb5e
Bug fix
bgivertz 5837b44
Bug fix
bgivertz fd48b5c
Bug fix
bgivertz 5acb93f
Test fix
bgivertz d126e81
Test fix
bgivertz c37d9ce
Small PR changes
bgivertz fe9b422
Code cleanup
bgivertz 4978456
Merge branch 'tuplex:master' into exceptions
bgivertz c13ed53
Support for cached exceptions
bgivertz 671a2ea
Support for cached exceptions
bgivertz a154bcf
Support for cached exceptions
bgivertz d9037fb
Support for cached exceptions
bgivertz df200ac
Support for cached exceptions
bgivertz 5cbefeb
Support for cached exceptions
bgivertz 23b1213
Support for cached exceptions
bgivertz dcc091d
Support for cached exceptions
bgivertz 2fc93c9
Support for cached exceptions
bgivertz f8e54f9
Support for cached exceptions
bgivertz fc971a0
merged in master
LeonhardFS 756a4d5
build fixes
LeonhardFS 71c2570
Merge master bug fix
bgivertz d5eec30
Code cleanup
bgivertz 26f05a6
Code cleanup
bgivertz 34ed9cf
PR Comments
bgivertz 320e88b
PR Comments
bgivertz f463e4a
PR Comments
bgivertz 6331c84
PR Comments
bgivertz 6ada257
PR Comments
bgivertz 4bca0ec
PR Comments
bgivertz 5402613
PR Comments
bgivertz 86b9365
PR Comments
bgivertz c79d471
PR Comments
bgivertz 6c12312
PR Comments
bgivertz 7777886
PR Comments
bgivertz e8ee439
Change pointer to struct
bgivertz cbd69eb
Change pointer to struct
bgivertz dc167f3
Change pointer to struct
bgivertz 9b2ca38
Change pointer to struct
bgivertz 1ebe1ac
Revert "Change pointer to struct"
bgivertz 38f1522
Revert "Change pointer to struct"
bgivertz ac21393
Revert "PR Comments"
bgivertz 02a0dde
Revert "PR Comments"
bgivertz 3f8615e
Revert "PR Comments"
bgivertz 7452dda
Revert "PR Comments"
bgivertz ea391a6
Revert "PR Comments"
bgivertz bea8006
Revert "PR Comments"
bgivertz e6bae6e
Revert "PR Comments"
bgivertz 2cd5ac1
Revert "PR Comments"
bgivertz adb86ed
Revert "PR Comments"
bgivertz f60bfdb
Revert "PR Comments"
bgivertz e09af5c
Revert "PR Comments"
bgivertz b274e08
Revert "Code cleanup"
bgivertz db57a09
Revert "Code cleanup"
bgivertz 40c20bc
Documentation updates
bgivertz 2d6cd37
PR comment fixes
bgivertz d781d32
Change tuple to struct
bgivertz 9201eca
Lambda fix
bgivertz c965f86
Documentation for exception processing and mapping
bgivertz 2934f0a
Documentation and simplify resolve logic
bgivertz feb17da
Transform task documentation and logic simplification
bgivertz da0dc3b
Simplify Python Context
bgivertz 2317d84
Simplify Python Context
bgivertz 15506cf
Simplify Python Context
bgivertz 7fff790
PR Changes
bgivertz 60d0ec6
PR Changes
bgivertz c5df6c3
PR Changes
bgivertz 52e59ae
Refactor python object from parallelize
bgivertz 1fc75f5
Refactor python object from parallelize
bgivertz f6da529
Refactor python object from parallelize
bgivertz 79b44cb
Refactor python object from parallelize
bgivertz
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| //--------------------------------------------------------------------------------------------------------------------// | ||
| // // | ||
| // Tuplex: Blazing Fast Python Data Science // | ||
| // // | ||
| // // | ||
| // (c) 2017 - 2021, Tuplex team // | ||
| // Created by Benjamin Givertz first on 1/1/2022 // | ||
| // License: Apache 2.0 // | ||
| //--------------------------------------------------------------------------------------------------------------------// | ||
|
|
||
| #ifndef TUPLEX_EXCEPTIONINFO_H | ||
| #define TUPLEX_EXCEPTIONINFO_H | ||
|
|
||
| namespace tuplex { | ||
| /*! | ||
| * Struct to hold information that maps input partitions to input exceptions that occur within them. | ||
| * | ||
| * Explanation: | ||
| * Each input partition is passed the same vector of all input exceptions that occurred during data parallelization | ||
| * or caching. Thus, each input partition must know how many input exceptions occur in its partition, the index | ||
| * of the input exception partition where its first exception occurs, and the offset into that partition where the | ||
| * first exception occurs. These values are held in this struct and each input partition is mapped to an ExceptionInfo. | ||
| */ | ||
| struct ExceptionInfo { | ||
| size_t numExceptions; //! number of exception rows that occur within a single input partition | ||
| size_t exceptionIndex; //! index into a vector of input exception partitions that holds the first input exception | ||
| size_t exceptionRowOffset; //! offset in rows into the first input exception partition where the first exception occurs. | ||
| size_t exceptionByteOffset; //! offset in bytes into the first input exception partition where the first exception occurs | ||
|
|
||
| ExceptionInfo() : | ||
| numExceptions(0), | ||
| exceptionIndex(0), | ||
| exceptionRowOffset(0), | ||
| exceptionByteOffset(0) {} | ||
|
|
||
| ExceptionInfo(size_t numExps, | ||
| size_t expIndex, | ||
| size_t expRowOffset, | ||
| size_t expByteOffset) : | ||
| numExceptions(numExps), | ||
| exceptionIndex(expIndex), | ||
| exceptionRowOffset(expRowOffset), | ||
| exceptionByteOffset(expByteOffset) {} | ||
| }; | ||
| } | ||
|
|
||
| #endif //TUPLEX_EXCEPTIONINFO_H | ||
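The mapping described in the struct comment can be illustrated with a small standalone sketch. It is not the PR's implementation: `buildExceptionInfos` is a hypothetical helper, the struct below only mirrors the fields of `ExceptionInfo`, and a fixed `bytesPerRow` is assumed purely to keep the byte offset easy to follow (real rows may well be variable-length).

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Mirrors the fields of tuplex::ExceptionInfo from the diff above.
struct ExceptionInfo {
    size_t numExceptions = 0;       // exception rows belonging to one input partition
    size_t exceptionIndex = 0;      // index of the exception partition holding the first one
    size_t exceptionRowOffset = 0;  // row offset of the first exception in that partition
    size_t exceptionByteOffset = 0; // byte offset of the first exception in that partition
};

// Hypothetical helper (not part of Tuplex): walks the exception partitions once and
// records, for every input partition, where its first exception lives.
// Assumes exceptions are stored in input order and every row takes bytesPerRow bytes.
std::vector<ExceptionInfo> buildExceptionInfos(
        const std::vector<size_t>& exceptionsPerInput,
        const std::vector<size_t>& rowsPerExceptionPartition,
        size_t bytesPerRow) {
    std::vector<ExceptionInfo> infos;
    size_t partIdx = 0; // current exception partition
    size_t rowOff = 0;  // rows already consumed in the current exception partition
    for (size_t n : exceptionsPerInput) {
        // Move past exception partitions that are fully consumed (or empty).
        while (partIdx < rowsPerExceptionPartition.size() &&
               rowOff == rowsPerExceptionPartition[partIdx]) {
            ++partIdx;
            rowOff = 0;
        }
        ExceptionInfo info;
        info.numExceptions = n;
        if (n > 0) {
            assert(partIdx < rowsPerExceptionPartition.size());
            info.exceptionIndex = partIdx;
            info.exceptionRowOffset = rowOff;
            info.exceptionByteOffset = rowOff * bytesPerRow;
        }
        infos.push_back(info);
        // Consume n exception rows, possibly crossing into later exception partitions.
        size_t remaining = n;
        while (remaining > 0) {
            assert(partIdx < rowsPerExceptionPartition.size());
            size_t take = std::min(remaining, rowsPerExceptionPartition[partIdx] - rowOff);
            rowOff += take;
            remaining -= take;
            if (remaining > 0) {
                ++partIdx;
                rowOff = 0;
            }
        }
    }
    return infos;
}
```

For example, with input partitions holding 2, 0 and 3 exceptions and exception partitions of 3 and 2 rows, the third input partition would start at exception partition 0, row offset 2, and its remaining exceptions spill over into exception partition 1.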
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -81,6 +81,8 @@ namespace tuplex { | ||
| bool isCached() const { return _cached; } | ||
| std::vector<Partition*> cachedPartitions() const { return _normalCasePartitions; } | ||
| std::vector<Partition*> cachedExceptions() const { return _generalCasePartitions; } | ||
| std::unordered_map<std::string, ExceptionInfo> partitionToExceptionsMap() const { return _partitionToExceptionsMap; } | ||
|
|
||
| size_t getTotalCachedRows() const; | ||
|
|
||
| int64_t cost() const override; | ||
| @@ -109,7 +111,8 @@ namespace tuplex { | ||
| std::vector<Partition*> _generalCasePartitions; //! holds all data which is considered to be a normal-case violation, | ||
| //! i.e. which does not adhere to the normal case schema, but did not produce | ||
| //! an exception while being processed through the pipeline before | ||
| std::vector<PyObject*> _py_objects; //! all python objects who do not adhere to the general case schema ( | ||
| std::unordered_map<std::string, ExceptionInfo> _partitionToExceptionsMap; //! maps normal case partitions to corresponding general case ones | ||
| std::vector<PyObject*> _py_objects; //! all python objects who do not adhere to the general case schema | ||
|
Contributor
they do not need row numbers, right?
||
| std::vector<std::string> _columns; | ||
|
|
||
| // internal sample of normal case rows, used for tracing & Co. | ||
|
|
||
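A caller of `partitionToExceptionsMap()` would presumably look up one entry per normal-case partition. The sketch below shows one way such a lookup could behave; `lookupExceptionInfo` is a hypothetical helper, and the assumption that the string key identifies a partition is not confirmed by the diff. A missing key falls back to a default-constructed `ExceptionInfo`, i.e. "no exceptions for this partition".

```cpp
#include <cstddef>
#include <string>
#include <unordered_map>

// Mirrors tuplex::ExceptionInfo from this PR (fields and constructors only).
struct ExceptionInfo {
    size_t numExceptions;
    size_t exceptionIndex;
    size_t exceptionRowOffset;
    size_t exceptionByteOffset;

    ExceptionInfo() :
        numExceptions(0), exceptionIndex(0),
        exceptionRowOffset(0), exceptionByteOffset(0) {}

    ExceptionInfo(size_t numExps, size_t expIndex,
                  size_t expRowOffset, size_t expByteOffset) :
        numExceptions(numExps), exceptionIndex(expIndex),
        exceptionRowOffset(expRowOffset), exceptionByteOffset(expByteOffset) {}
};

// Hypothetical helper (not part of Tuplex): returns the entry for a partition key,
// or an empty ExceptionInfo when the partition has no recorded exceptions.
// Taking the map by const reference avoids copying it on every lookup.
ExceptionInfo lookupExceptionInfo(
        const std::unordered_map<std::string, ExceptionInfo>& partitionToExceptions,
        const std::string& partitionKey) {
    auto it = partitionToExceptions.find(partitionKey);
    return it == partitionToExceptions.end() ? ExceptionInfo() : it->second;
}
```

Note that the getter in the diff returns the map by value, so a caller doing many lookups may want to bind the result to a local variable once rather than calling the getter per partition.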
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| //--------------------------------------------------------------------------------------------------------------------// | ||
| // // | ||
| // Tuplex: Blazing Fast Python Data Science // | ||
| // // | ||
| // // | ||
| // (c) 2017 - 2021, Tuplex team // | ||
| // Created by Benjamin Givertz first on 1/1/2021 // | ||
| // License: Apache 2.0 // | ||
| //--------------------------------------------------------------------------------------------------------------------// | ||
|
|
||
| #ifndef TUPLEX_EXCEPTIONSOURCETASKBUILDER_H | ||
| #define TUPLEX_EXCEPTIONSOURCETASKBUILDER_H | ||
|
|
||
| #include "BlockBasedTaskBuilder.h" | ||
|
|
||
| namespace tuplex { | ||
| namespace codegen { | ||
| /*! | ||
| * Class used to process tuplex partitions through the pipeline when both input exceptions and filters are | ||
| * present. This is necessary to update the row indices of any input exceptions if rows are filtered out. | ||
| */ | ||
| class ExceptionSourceTaskBuilder : public BlockBasedTaskBuilder { | ||
bgivertz marked this conversation as resolved.
|
||
| private: | ||
| void createMainLoop(llvm::Function* read_block_func, bool terminateEarlyOnLimitCode); | ||
|
|
||
| /*! | ||
| * generates code to process a row depending on parse result... | ||
| * @param builder | ||
| * @param userData a value for userData (i.e. the class ptr of the task typically) to be passed to callback functions | ||
| * @param tuple (flattened) tuple representation of current tuple (LLVM) | ||
| * @param normalRowCountVar where to store normal row counts | ||
| * @param badRowCountVar where to store bad row counts | ||
| * @param processRowFunc (optional) function to be called before output is written. | ||
| * Most likely this is not a nullptr, because users want to transform data. | ||
| */ | ||
| void processRow(llvm::IRBuilder<>& builder, | ||
| llvm::Value* userData, | ||
| const FlattenedTuple& tuple, | ||
| llvm::Value *normalRowCountVar, | ||
| llvm::Value *badRowCountVar, | ||
| llvm::Value *outputRowNumberVar, | ||
| llvm::Value *inputRowPtr, | ||
| llvm::Value *inputRowSize, | ||
| bool terminateEarlyOnLimitCode, | ||
| llvm::Function* processRowFunc=nullptr); | ||
|
|
||
| void callProcessFuncWithHandler(llvm::IRBuilder<> &builder, llvm::Value *userData, | ||
| const FlattenedTuple &tuple, | ||
| llvm::Value *normalRowCountVar, llvm::Value *badRowCountVar, llvm::Value *rowNumberVar, | ||
| llvm::Value *inputRowPtr, llvm::Value *inputRowSize, | ||
| bool terminateEarlyOnLimitCode, | ||
| llvm::Function *processRowFunc); | ||
| public: | ||
| ExceptionSourceTaskBuilder() = delete; | ||
|
|
||
| explicit ExceptionSourceTaskBuilder(const std::shared_ptr<LLVMEnvironment>& env, const python::Type& rowType, const std::string& name) : BlockBasedTaskBuilder::BlockBasedTaskBuilder(env, rowType, name) {} | ||
|
|
||
| llvm::Function* build(bool terminateEarlyOnFailureCode) override; | ||
| }; | ||
| } | ||
| } | ||
|
|
||
| #endif //TUPLEX_EXCEPTIONSOURCETASKBUILDER_H | ||