这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 10 additions & 20 deletions tuplex/core/include/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <logical/LogicalOperator.h>
#include <DataSet.h>
#include "Partition.h"
#include <PartitionGroup.h>
#include "Row.h"
#include "HistoryServerClasses.h"
#include <initializer_list>
Expand All @@ -37,7 +38,7 @@ namespace tuplex {
class Executor;
class Partition;
class IBackend;
class ExceptionInfo;
class PartitionGroup;

class Context {
private:
Expand All @@ -59,24 +60,14 @@ namespace tuplex {

// needed because of C++ template issues
void addPartition(DataSet* ds, Partition *partition);
void addParallelizeNode(DataSet *ds, const std::vector<std::tuple<size_t, PyObject*>> &badParallelizeObjects=std::vector<std::tuple<size_t, PyObject*>>(), const std::vector<size_t> &numExceptionsInPartition=std::vector<size_t>()); //! adds a paralellize node to the computation graph

/*!
* serialize python objects as pickled objects into exception partitions. Set the python objects map to
* map all normalPartitions to the exceptions that occurred within them.
* @param pythonObjects normal case schema violations and their initial row numbers
* @param numExceptionsInPartition number of exceptions in each normal partition
* @param normalPartitions normal partitions created
* @param opID parallelize operator ID
* @param serializedPythonObjects output vector for partitions
* @param pythonObjectsMap output for mapping
* Add parallelize logical operator to dataset
* @param ds dataset
* @param fallbackPartitions fallback partitions from python parallelize
* @param partitionGroups partition mapping information
*/
void serializePythonObjects(const std::vector<std::tuple<size_t, PyObject*>>& pythonObjects,
const std::vector<size_t> &numExceptionsInPartition,
const std::vector<Partition*> &normalPartitions,
const int64_t opID,
std::vector<Partition*> &serializedPythonObjects,
std::unordered_map<std::string, ExceptionInfo> &pythonObjectsMap);
void addParallelizeNode(DataSet *ds, const std::vector<Partition*>& fallbackPartitions=std::vector<Partition*>{}, const std::vector<PartitionGroup>& partitionGroups=std::vector<PartitionGroup>{}); //! adds a paralellize node to the computation graph

Partition* requestNewPartition(const Schema& schema, const int dataSetID, size_t minBytesRequired);
uint8_t* partitionLockRaw(Partition *partition);
Expand Down Expand Up @@ -270,13 +261,12 @@ namespace tuplex {
* @param partitions partitions to assign to dataset. These should NOT be reused later.
* Also, partitions need to hold data in supplied schema. If empty vector is given,
* empty dataset will be created.
*
* @param fallbackPartitions fallback partitions to assign to dataset
* @param partitionGroups mapping of partitions to fallback partitions
* @param columns optional column names
* @param badParallelizeObjects schema violations found during parallelization of partitions
* @param numExceptionsInPartition number of schema violations that occurred in each of the partitions
* @return reference to newly created dataset.
*/
DataSet& fromPartitions(const Schema& schema, const std::vector<Partition*>& partitions, const std::vector<std::string>& columns, const std::vector<std::tuple<size_t, PyObject*>> &badParallelizeObjects, const std::vector<size_t> &numExceptionsInPartition);
DataSet& fromPartitions(const Schema& schema, const std::vector<Partition*>& partitions, const std::vector<Partition*>& fallbackPartitions, const std::vector<PartitionGroup>& partitionGroups, const std::vector<std::string>& columns);
};
// needed for template mechanism to work
#include <DataSet.h>
Expand Down
47 changes: 0 additions & 47 deletions tuplex/core/include/ExceptionInfo.h

This file was deleted.

53 changes: 53 additions & 0 deletions tuplex/core/include/PartitionGroup.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//--------------------------------------------------------------------------------------------------------------------//
// //
// Tuplex: Blazing Fast Python Data Science //
// //
// //
// (c) 2017 - 2021, Tuplex team //
// Created by Benjamin Givertz first on 1/1/2021 //
// License: Apache 2.0 //
//--------------------------------------------------------------------------------------------------------------------//

#ifndef TUPLEX_PARTITIONGROUP_H
#define TUPLEX_PARTITIONGROUP_H

/*!
* Maintains a grouping of normal, fallback, and general partitions that arise from the
* same executor in order to be reprocessed after a cache occurs.
*/
namespace tuplex {
struct PartitionGroup {
    /*!
     * Groups normal, general, and fallback partitions into groups, with all partitions that originated from a single
     * task being grouped together. Each kind of partition is described by a count and a start index into the
     * list of all partitions of that kind.
     * @param numNormalPartitions number of normal partitions in group
     * @param normalPartitionStartIndex starting index in list of all normal partitions
     * @param numGeneralPartitions number of general partitions in group
     * @param generalPartitionStartIndex starting index in list of all general partitions
     * @param numFallbackPartitions number of fallback partitions in group
     * @param fallbackPartitionStartIndex starting index in list of all fallback partitions
     */
    PartitionGroup(size_t numNormalPartitions, size_t normalPartitionStartIndex,
                   size_t numGeneralPartitions, size_t generalPartitionStartIndex,
                   size_t numFallbackPartitions, size_t fallbackPartitionStartIndex):
            numNormalPartitions(numNormalPartitions), normalPartitionStartIndex(normalPartitionStartIndex),
            numGeneralPartitions(numGeneralPartitions), generalPartitionStartIndex(generalPartitionStartIndex),
            numFallbackPartitions(numFallbackPartitions), fallbackPartitionStartIndex(fallbackPartitionStartIndex) {}

    /*!
     * Initialize empty struct with all values set to zero.
     */
    // Initializers are listed in member declaration order: members are always
    // initialized in that order regardless of how the list is written, and a
    // mismatching list triggers -Wreorder.
    PartitionGroup() :
            numNormalPartitions(0), normalPartitionStartIndex(0),
            numGeneralPartitions(0), generalPartitionStartIndex(0),
            numFallbackPartitions(0), fallbackPartitionStartIndex(0) {}

    size_t numNormalPartitions;         //! number of normal partitions in group
    size_t normalPartitionStartIndex;   //! starting index in list of all normal partitions
    size_t numGeneralPartitions;        //! number of general partitions in group
    size_t generalPartitionStartIndex;  //! starting index in list of all general partitions
    size_t numFallbackPartitions;       //! number of fallback partitions in group
    size_t fallbackPartitionStartIndex; //! starting index in list of all fallback partitions
};
}

#endif //TUPLEX_PARTITIONGROUP_H
18 changes: 9 additions & 9 deletions tuplex/core/include/ee/local/LocalBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ namespace tuplex {
return std::accumulate(counts.begin(), counts.end(), 0, [](size_t acc, std::pair<std::tuple<int64_t, ExceptionCode>, size_t> val) { return acc + val.second; });
}

inline std::vector<Partition*> getOutputPartitions(IExecutorTask* task) {
inline std::vector<Partition*> getNormalPartitions(IExecutorTask* task) const {
if(!task)
return std::vector<Partition*>();

Expand All @@ -113,7 +113,7 @@ namespace tuplex {
return std::vector<Partition*>();
}

inline std::vector<Partition*> getRemainingExceptions(IExecutorTask* task) {
inline std::vector<Partition*> getExceptionPartitions(IExecutorTask* task) const {
if(!task)
return std::vector<Partition*>();

Expand All @@ -127,7 +127,7 @@ namespace tuplex {
return std::vector<Partition*>();
}

inline std::vector<Partition*> generalCasePartitions(IExecutorTask* task) {
inline std::vector<Partition*> getGeneralPartitions(IExecutorTask* task) const {
if(!task)
return std::vector<Partition*>();

Expand All @@ -141,7 +141,7 @@ namespace tuplex {
return std::vector<Partition*>();
}

inline std::unordered_map<std::tuple<int64_t, ExceptionCode>, size_t> getExceptionCounts(IExecutorTask* task) {
inline std::unordered_map<std::tuple<int64_t, ExceptionCode>, size_t> getExceptionCounts(IExecutorTask* task) const {
if(!task)
return std::unordered_map<std::tuple<int64_t, ExceptionCode>, size_t>();

Expand All @@ -155,18 +155,18 @@ namespace tuplex {
return std::unordered_map<std::tuple<int64_t, ExceptionCode>, size_t>();
}

inline std::vector<std::tuple<size_t, PyObject*>> getNonConformingRows(IExecutorTask* task) {
inline std::vector<Partition*> getFallbackPartitions(IExecutorTask* task) const {
if(!task)
return std::vector<std::tuple<size_t, PyObject*>>();
return std::vector<Partition*>();

if(task->type() == TaskType::UDFTRAFOTASK)
return std::vector<std::tuple<size_t, PyObject*>>(); // none here, can be only result from ResolveTask.
return std::vector<Partition*>(); // none here, can be only result from ResolveTask.

if(task->type() == TaskType::RESOLVE)
return dynamic_cast<ResolveTask*>(task)->getNonConformingRows();
return dynamic_cast<ResolveTask*>(task)->getOutputFallbackPartitions();

throw std::runtime_error("unknown task type seen in " + std::string(__FILE_NAME__) + ":" + std::to_string(__LINE__));
return std::vector<std::tuple<size_t, PyObject*>>();
return std::vector<Partition*>();
}

std::vector<IExecutorTask*> resolveViaSlowPath(std::vector<IExecutorTask*>& tasks,
Expand Down
32 changes: 12 additions & 20 deletions tuplex/core/include/logical/CacheOperator.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace tuplex {

CacheOperator(LogicalOperator* parent, bool storeSpecialized,
const Schema::MemoryLayout& memoryLayout=Schema::MemoryLayout::ROW) : LogicalOperator(parent), _storeSpecialized(storeSpecialized), _memoryLayout(memoryLayout), _cached(false),
_columns(parent->columns()) {
_columns(parent->columns()), _normalRowCount(0), _fallbackRowCount(0), _generalRowCount(0) {
setSchema(this->parent()->getOutputSchema()); // inherit schema from parent
_optimizedSchema = getOutputSchema();
if(memoryLayout != Schema::MemoryLayout::ROW)
Expand Down Expand Up @@ -79,9 +79,10 @@ namespace tuplex {
* @return
*/
bool isCached() const { return _cached; }
std::vector<Partition*> cachedPartitions() const { return _normalCasePartitions; }
std::vector<Partition*> cachedExceptions() const { return _generalCasePartitions; }
std::unordered_map<std::string, ExceptionInfo> partitionToExceptionsMap() const { return _partitionToExceptionsMap; }
std::vector<Partition*> cachedNormalPartitions() const { return _normalPartitions; }
std::vector<Partition*> cachedGeneralPartitions() const { return _generalPartitions; }
std::vector<Partition*> cachedFallbackPartitions() const { return _fallbackPartitions; }
std::vector<PartitionGroup> partitionGroups() const { return _partitionGroups; }

size_t getTotalCachedRows() const;

Expand All @@ -107,28 +108,19 @@ namespace tuplex {
// or merge them.
bool _cached;
bool _storeSpecialized;
std::vector<Partition*> _normalCasePartitions; //! holds all data conforming to the normal case schema
std::vector<Partition*> _generalCasePartitions; //! holds all data which is considered to be a normal-case violation,
//! i.e. which does not adhere to the normal case schema, but did not produce
//! an exception while being processed through the pipeline before
std::unordered_map<std::string, ExceptionInfo> _partitionToExceptionsMap; //! maps normal case partitions to corresponding general case ones
std::vector<PyObject*> _py_objects; //! all python objects who do not adhere to the general case schema
std::vector<Partition*> _normalPartitions; //! holds all data conforming to the normal case schema
std::vector<Partition*> _generalPartitions; //! holds all data which is considered to be a normal-case violation,
std::vector<Partition*> _fallbackPartitions; //! holds all data which is output as a python object from interpreter processing
std::vector<PartitionGroup> _partitionGroups; //! groups together partitions for correct row ordering
std::vector<std::string> _columns;

// internal sample of normal case rows, used for tracing & Co.
std::vector<Row> _sample;

// number of rows need to be stored for cost estimates
size_t _normalCaseRowCount;
size_t _generalCaseRowCount;

// @TODO: there should be 3 things stored
// 1.) common case => i.e.
// 2.) general case => i.e. what in general can be done (null-values & Co, wide integers, ...)
// 3.) python case => i.e. things that don't fit into either case (interpreter objects serialized via pickle)

// Note: the pickling could be parallelized by simply matching python types & Co...
// ==> store python data as tuple of elements!
size_t _normalRowCount;
size_t _generalRowCount;
size_t _fallbackRowCount;
};
}

Expand Down
19 changes: 9 additions & 10 deletions tuplex/core/include/logical/ParallelizeOperator.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
namespace tuplex {
class ParallelizeOperator : public LogicalOperator {

std::vector<Partition*> _partitions; // data, conforming to majority type
std::vector<Partition*> _pythonObjects; // schema violations stored for interpreter processing as python objects
// maps partitions to their corresponding python objects
std::unordered_map<std::string, ExceptionInfo> _inputPartitionToPythonObjectsMap;
std::vector<Partition*> _normalPartitions; // data, conforming to majority type
std::vector<Partition*> _fallbackPartitions; // schema violations stored for interpreter processing as python objects
std::vector<PartitionGroup> _partitionGroups; // maps normal partitions to their corresponding fallback partitions
std::vector<std::string> _columnNames;

std::vector<Row> _sample; // sample, not necessary conforming to one type
Expand All @@ -31,7 +30,7 @@ namespace tuplex {

// this a root node
ParallelizeOperator(const Schema& schema,
const std::vector<Partition*>& partitions,
const std::vector<Partition*>& normalPartitions,
const std::vector<std::string>& columns);

std::string name() override { return "parallelize"; }
Expand All @@ -47,13 +46,13 @@ namespace tuplex {
* get the partitions where the parallelized data is stored.
* @return vector of partitions.
*/
std::vector<tuplex::Partition*> getPartitions();
std::vector<tuplex::Partition*> getNormalPartitions();

void setPythonObjects(const std::vector<Partition*> &pythonObjects) { _pythonObjects = pythonObjects; }
std::vector<Partition *> getPythonObjects() { return _pythonObjects; }
void setFallbackPartitions(const std::vector<Partition*> &fallbackPartitions) { _fallbackPartitions = fallbackPartitions; }
std::vector<Partition *> getFallbackPartitions() { return _fallbackPartitions; }

void setInputPartitionToPythonObjectsMap(const std::unordered_map<std::string, ExceptionInfo>& pythonObjectsMap) { _inputPartitionToPythonObjectsMap = pythonObjectsMap; }
std::unordered_map<std::string, ExceptionInfo> getInputPartitionToPythonObjectsMap() { return _inputPartitionToPythonObjectsMap; }
void setPartitionGroups(const std::vector<PartitionGroup>& partitionGroups) { _partitionGroups = partitionGroups; }
std::vector<PartitionGroup> getPartitionGroups() { return _partitionGroups; }

Schema getInputSchema() const override { return getOutputSchema(); }

Expand Down
Loading