这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion tuplex/core/include/JobMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ namespace tuplex {
// size_t fast_path_output_row_count;
// size_t slow_path_input_row_count;
// size_t slow_path_output_row_count;

// disk spilling metrics
int partitions_swapin_count = 0;
int partitions_swapout_count = 0;
size_t partitions_bytes_swapped_out = 0;
size_t partitions_bytes_swapped_in = 0;
};
std::vector<StageMetrics> _stage_metrics;

Expand Down Expand Up @@ -172,6 +178,26 @@ namespace tuplex {
it->fast_path_per_row_time_ns = fast_path_per_row_time_ns;
}

/*!
 * record the disk-spill (partition swapping) statistics of a single stage.
 * Any swap statistics previously stored for that stage are overwritten.
 * NOTE(review): the counts are taken as `int`, while LocalBackend passes the `size_t` values returned by
 * Partition::statisticSwapInCount()/statisticSwapOutCount(), so very large counts are narrowed.
 * @param stageNo number of the stage the statistics belong to
 * @param partitions_swapin_count how many times partitions were swapped back in from disk
 * @param partitions_bytes_swapped_in total number of bytes read back during swap-ins
 * @param partitions_swapout_count how many times partitions were swapped out to disk
 * @param partitions_bytes_swapped_out total number of bytes written during swap-outs
 */
void setDiskSpillStatistics(int stageNo,
                            int partitions_swapin_count,
                            size_t partitions_bytes_swapped_in,
                            int partitions_swapout_count,
                            size_t partitions_bytes_swapped_out) {
    // entry for this stage (get_or_create_stage_metrics presumably creates it on demand)
    auto stage = get_or_create_stage_metrics(stageNo);

    // swap-in side: disk -> memory
    stage->partitions_swapin_count = partitions_swapin_count;
    stage->partitions_bytes_swapped_in = partitions_bytes_swapped_in;

    // swap-out side: memory -> disk
    stage->partitions_swapout_count = partitions_swapout_count;
    stage->partitions_bytes_swapped_out = partitions_bytes_swapped_out;
}

/*!
* set sampling time in s for specific operator
* @param time
Expand Down Expand Up @@ -204,7 +230,11 @@ namespace tuplex {
ss<<"\"fast_path_per_row_time_ns\":"<<s.fast_path_per_row_time_ns<<",";
ss<<"\"slow_path_wall_time_s\":"<<s.slow_path_wall_time_s<<",";
ss<<"\"slow_path_time_s\":"<<s.slow_path_time_s<<",";
ss<<"\"slow_path_per_row_time_ns\":"<<s.slow_path_per_row_time_ns;
ss<<"\"slow_path_per_row_time_ns\":"<<s.slow_path_per_row_time_ns<<",";
ss<<"\"partitions_swapin_count\":"<<s.partitions_swapin_count<<",";
ss<<"\"partitions_swapout_count\":"<<s.partitions_swapout_count<<",";
ss<<"\"partitions_bytes_swapped_in\":"<<s.partitions_bytes_swapped_in<<",";
ss<<"\"partitions_bytes_swapped_out\":"<<s.partitions_bytes_swapped_out;
ss<<"}";
if(s.stageNo != _stage_metrics.back().stageNo)
ss<<",";
Expand Down
42 changes: 42 additions & 0 deletions tuplex/core/include/Partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ namespace tuplex {
// variables when being swapped out.
std::string _localFilePath;
bool _swappedToFile;

// metrics
// static variables holding information on how many swapIns/swapOuts happened
static std::atomic_int64_t _swapInCount;
static std::atomic_int64_t _swapOutCount;
static std::atomic_int64_t _swapInBytesRead;
static std::atomic_int64_t _swapOutBytesWritten;
public:

Partition(Executor* const owner,
Expand Down Expand Up @@ -254,6 +261,41 @@ namespace tuplex {

const Executor* owner() const { return _owner; }

/*!
 * reset all Partition swap statistics (counts and byte totals) to zero.
 * Each counter is cleared by its own atomic store, but the four stores together are NOT a single atomic
 * operation (no mutex is taken). Do not call this while computation is active: concurrent swap-ins/-outs
 * could interleave with the reset and leave the counters inconsistent with each other.
 */
static void resetStatistics() {
    _swapInCount.store(0);
    _swapOutCount.store(0);
    _swapInBytesRead.store(0);
    _swapOutBytesWritten.store(0);
}

/*!
 * number of partition swap-ins (partitions read back from the disk spill) since the last resetStatistics()
 * @return swap-in count
 */
static size_t statisticSwapInCount() {
    const auto count = _swapInCount.load();
    return static_cast<size_t>(count);
}

/*!
 * number of partition swap-outs (partitions written to the disk spill) since the last resetStatistics()
 * @return swap-out count
 */
static size_t statisticSwapOutCount() {
    const auto count = _swapOutCount.load();
    return static_cast<size_t>(count);
}

/*!
 * total bytes read back from the disk spill since the last resetStatistics().
 * Each swap-in contributes the partition buffer size plus the leading uint64_t field stored in the file.
 * @return number of bytes
 */
static size_t statisticSwapInBytesRead() {
    const auto bytes = _swapInBytesRead.load();
    return static_cast<size_t>(bytes);
}

/*!
 * total bytes written to the disk spill since the last resetStatistics().
 * Each swap-out contributes the partition buffer size plus the leading uint64_t field stored in the file.
 * @return number of bytes
 */
static size_t statisticSwapOutBytesWritten() {
    const auto bytes = _swapOutBytesWritten.load();
    return static_cast<size_t>(bytes);
}


// for debug asserts.
friend class Executor;
Expand Down
14 changes: 14 additions & 0 deletions tuplex/core/include/physical/ResultSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,23 @@ namespace tuplex {
const std::vector<std::tuple<size_t, PyObject*>> pyobjects=std::vector<std::tuple<size_t, PyObject*>>{},
int64_t maxRows=std::numeric_limits<int64_t>::max());

/*!
* check whether at least one more row can be retrieved from this result set
*/
bool hasNextRow();

/*!
* get next row. If exhausted, return dummy empty row.
*/
Row getNextRow();

/*!
* retrieve up to limit rows, consuming them: the internal read position advances and fully
* consumed partitions are released, so a subsequent retrieval continues after the last row returned here.
* @param limit how many rows to retrieve at most; 0 yields an empty vector
* @return vector of rows with up to limit rows (fewer if the result set runs out)
*/
std::vector<Row> getRows(size_t limit);

bool hasNextPartition() const;

/*! user needs to invalidate then!
Expand Down
10 changes: 6 additions & 4 deletions tuplex/core/src/DataSet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,12 @@ namespace tuplex {
#warning "limiting should make this hack irrelevant..."
if (numElements < 0)
numElements = std::numeric_limits<int64_t>::max();
std::vector<Row> v;
while (rs->hasNextRow() && v.size() < numElements) {
v.push_back(rs->getNextRow());
}

// std::vector<Row> v;
// while (rs->hasNextRow() && v.size() < numElements) {
// v.push_back(rs->getNextRow());
// }
auto v = rs->getRows(numElements); // faster version instead of the loop above.

Logger::instance().defaultLogger().debug("Result set converted to " + pluralize(v.size(), "row"));
Logger::instance().defaultLogger().info(
Expand Down
15 changes: 14 additions & 1 deletion tuplex/core/src/Partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
#include <Utils.h>

namespace tuplex {

// storage for the process-wide swap metric counters shared by all Partition instances; all start at zero
std::atomic_int64_t Partition::_swapInCount{0};
std::atomic_int64_t Partition::_swapOutCount{0};
std::atomic_int64_t Partition::_swapInBytesRead{0};
std::atomic_int64_t Partition::_swapOutBytesWritten{0};

const uint8_t* Partition::lockRaw() {

assert(_owner);
Expand Down Expand Up @@ -116,6 +123,9 @@ namespace tuplex {

fclose(pFile);

_swapOutCount++;
_swapOutBytesWritten += _size + sizeof(uint64_t);

return true;
}

Expand All @@ -133,7 +143,6 @@ namespace tuplex {
return;
}


// read from file
fread(&_bytesWritten, sizeof(uint64_t), 1, pFile);
fread(_arena, _size, 1, pFile);
Expand All @@ -145,6 +154,10 @@ namespace tuplex {
throw std::runtime_error("failed removing file from path " + path);
}

// update metric counters
_swapInCount++;
_swapInBytesRead += _size + sizeof(uint64_t);

// auto vfs = VirtualFileSystem::fromURI(uri);
// uint64_t file_size = 0;
// vfs.file_size(uri, file_size);
Expand Down
10 changes: 10 additions & 0 deletions tuplex/core/src/ee/local/LocalBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,9 @@ namespace tuplex {
Timer stageTimer;
Timer timer; // for detailed measurements.

// reset Partition stats
Partition::resetStatistics();

// special case: no input, return & set empty result
// Note: file names & sizes are also saved in input partition!
if (tstage->inputMode() != EndPointMode::HASHTABLE
Expand Down Expand Up @@ -1130,6 +1133,13 @@ namespace tuplex {

freeTasks(completedTasks);

// update metrics
metrics.setDiskSpillStatistics(tstage->number(),
Partition::statisticSwapInCount(),
Partition::statisticSwapInBytesRead(),
Partition::statisticSwapOutCount(),
Partition::statisticSwapOutBytesWritten());

// info how long the trafo stage took
std::stringstream ss;
ss<<"[Transform Stage] Stage "<<tstage->number()<<" took "<<stageTimer.time()<<"s";
Expand Down
74 changes: 74 additions & 0 deletions tuplex/core/src/physical/ResultSet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,80 @@ namespace tuplex {
return first;
}

std::vector<Row> ResultSet::getRows(size_t limit) {
using namespace std;

if(0 == limit)
return vector<Row>{};

vector<Row> v;

// reserve up to 32k (do NOT always execute because special case upper limit is used)
if(limit < 32000)
v.reserve(limit);

// do a quick check whether there are ANY pyobjects, if not deserialize quickly!
if(_pyobjects.empty()) {

if(_partitions.empty())
return vector<Row>{};

Deserializer ds(_schema);
for(int i = 0; i < limit;) {

// all exhausted
if(_partitions.empty())
break;

// get number of rows in first partition
Partition *first = _partitions.front();
auto num_rows = first->getNumRows();
// how many left to retrieve?
auto num_to_retrieve_from_partition = std::min(limit - i, num_rows - _curRowCounter);
if(num_to_retrieve_from_partition <= 0)
break;

// make sure partition schema matches stored schema
assert(_schema == first->schema());

// thread safe version (slow)
// get next element of partition
const uint8_t* ptr = first->lock();
for(int j = 0; j < num_to_retrieve_from_partition; ++j) {
auto row = Row::fromMemory(ds, ptr + _byteCounter, first->capacity() - _byteCounter);
_byteCounter += row.serializedLength();
_curRowCounter++;
_rowsRetrieved++;
_totalRowCounter++;
v.push_back(row);
}

// thread safe version (slow)
// deserialize
first->unlock();

i += num_to_retrieve_from_partition;

// get next Partition ready when current one is exhausted
if(_curRowCounter == first->getNumRows())
removeFirstPartition();
}

v.shrink_to_fit();
return v;
} else {
// fallback solution:
// @TODO: write faster version with proper merging!

std::vector<Row> v;
while (hasNextRow() && v.size() < limit) {
v.push_back(getNextRow());
}
v.shrink_to_fit();
return v;
}
}

Row ResultSet::getNextRow() {
// merge rows from objects
if(!_pyobjects.empty()) {
Expand Down
14 changes: 8 additions & 6 deletions tuplex/core/src/physical/StageBuilder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -612,18 +612,20 @@ namespace tuplex {
}
#endif


#ifndef NDEBUG
if(!operators.empty()) {
cout<<"output type of specialized pipeline is: "<<outSchema.desc()<<endl;
cout<<"is this the most outer stage?: "<<_isRootStage<<endl;
stringstream ss;
ss<<"output type of specialized pipeline is: "<<outSchema.desc()<<endl;
ss<<"is this the most outer stage?: "<<_isRootStage<<endl;
if(!_isRootStage)
cout<<"need to upgrade output type to "<<_operators.back()->getOutputSchema().getRowType().desc()<<endl;
ss<<"need to upgrade output type to "<<_operators.back()->getOutputSchema().getRowType().desc()<<endl;

logger.debug(ss.str());
}
#endif

#ifndef NDEBUG
assert(inSchema != python::Type::UNKNOWN);
assert(outSchema != python::Type::UNKNOWN);
#endif

// special case: empty pipeline
if (outSchema.parameters().empty() && inSchema.parameters().empty()) {
Expand Down
1 change: 1 addition & 0 deletions tuplex/test/core/DataSetCollect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ TEST_F(DataSetTest, BuiltinPowerWithComplexResult) {
.resolve(ExceptionCode::ZERODIVISIONERROR, UDF("lambda x: 42"))
.collectAsVector();


ASSERT_EQ(res.size(), 7);
// to python string!
}
Expand Down
2 changes: 1 addition & 1 deletion tuplex/test/core/IsKeywordTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -288,4 +288,4 @@ TEST_F(IsKeywordTest, MultipleIs4) {
assert(m[0].toPythonString() == "(False,)");
assert(m[1].toPythonString() == "(False,)");
assert(m[2].toPythonString() == "(False,)");
}
}
24 changes: 23 additions & 1 deletion tuplex/test/core/LargerThanMemoryDataSet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ TEST_F(DiskSwapping, MicroSwap) {
// validate result
for(int64_t i = 0; i < numIntegers; i++)
EXPECT_EQ(i, res[i].getInt(0));

// print metrics
std::cout<<"METRICS:\n======="<<std::endl;
std::cout<<c.metrics().to_json()<<std::endl;
using namespace std;
cout<<"swaps in: "<<Partition::statisticSwapInCount()<<endl;
cout<<"swaps out: "<<Partition::statisticSwapOutCount()<<endl;
cout<<"swap bytes read: "<<sizeToMemString(Partition::statisticSwapInBytesRead())<<endl;
cout<<"swap bytes written: "<<sizeToMemString(Partition::statisticSwapOutBytesWritten())<<endl;
std::cout<<std::endl;
}

TEST_F(DiskSwapping, MiniSwap) {
Expand All @@ -57,14 +67,16 @@ TEST_F(DiskSwapping, MiniSwap) {
co.set("tuplex.partitionSize", "200KB");
//co.set("tuplex.executorMemory", "1MB");
co.set("tuplex.executorMemory", "400KB");
co.set("tuplex.driverMemory", "400KB");
co.set("tuplex.useLLVMOptimizer", "false");
co.set("tuplex.executorCount", "4"); // fix to 4 threads.

Context c(co);


// one integer needs 8 bytes, given here are 200KB partitions
// i.e. in order to go over the limit (1MB) following number of integers are needed
int64_t numIntegers = static_cast<int64_t>(1.1 * (400 * 1024 / 8)); //1.1 * (1024 * 1024 / 8); // give 10% more
int64_t numIntegers = static_cast<int64_t>(5.7 * 1.1 * (400 * 1024 / 8)); //1.1 * (1024 * 1024 / 8); // give 10% more

std::vector<int64_t> data;
data.reserve(numIntegers);
Expand All @@ -84,6 +96,16 @@ TEST_F(DiskSwapping, MiniSwap) {
// validate result
for(int64_t i = 0; i < numIntegers; i++)
EXPECT_EQ(i, res[i].getInt(0));

// print metrics
std::cout<<"METRICS:\n======="<<std::endl;
std::cout<<c.metrics().to_json()<<std::endl;
using namespace std;
cout<<"swaps in: "<<Partition::statisticSwapInCount()<<endl;
cout<<"swaps out: "<<Partition::statisticSwapOutCount()<<endl;
cout<<"swap bytes read: "<<sizeToMemString(Partition::statisticSwapInBytesRead())<<endl;
cout<<"swap bytes written: "<<sizeToMemString(Partition::statisticSwapOutBytesWritten())<<endl;
std::cout<<std::endl;
}

TEST_F(DiskSwapping, SwapWithLambda) {
Expand Down
Loading