这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions tuplex/core/include/DataSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ namespace tuplex {
*/
virtual DataSet &renameColumn(const std::string &oldColumnName, const std::string &newColumnName);

/*!
* rename a column identified by its position in the dataset. An invalid index is reported
* by returning an error dataset.
* @param index zero-based position, 0 <= index < #columns
* @param newColumnName new column name
* @return Dataset or Errordataset
*/
virtual DataSet &renameColumn(int index, const std::string& newColumnName);

/*!
* add a new column to dataset, whose result is defined through the given udf
* @param columnName
Expand Down Expand Up @@ -209,6 +217,12 @@ namespace tuplex {
*/
Schema schema() const;

/*!
* number of columns of this dataset (at least 1), taken from the row type of its schema
* @return number of columns
*/
size_t numColumns() const;

/*!
* join dataset with other dataset, either based on (K, V), (K, W) layout or via column names(equijoin)
* @param other
Expand Down
1 change: 1 addition & 0 deletions tuplex/core/include/EmptyDataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ namespace tuplex {
virtual DataSet& selectColumns(const std::vector<std::string>& columnNames) override { return *this; }
virtual DataSet& selectColumns(const std::vector<size_t>& columnIndices) override { return *this; }
virtual DataSet& renameColumn(const std::string& oldColumnName, const std::string& newColumnName) override { return *this; }
virtual DataSet& renameColumn(int index, const std::string& newColumnName) override { return *this; }
virtual DataSet& withColumn(const std::string& columnName, const UDF& udf) override { return *this; }
virtual void tofile(FileFormat fmt,
const URI& uri,
Expand Down
1 change: 1 addition & 0 deletions tuplex/core/include/ErrorDataSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ namespace tuplex {
DataSet& selectColumns(const std::vector<std::string>& columnNames) override { return *this; }
DataSet& selectColumns(const std::vector<size_t>& columnIndices) override { return *this; }
DataSet& renameColumn(const std::string& oldColumnName, const std::string& newColumnName) override { return *this; }
DataSet& renameColumn(int, const std::string& newColumnName) override { return *this; }
DataSet& withColumn(const std::string& columnName, const UDF& udf) override { return *this; }
void tofile(FileFormat fmt,
const URI& uri,
Expand Down
54 changes: 51 additions & 3 deletions tuplex/core/src/DataSet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -378,17 +378,56 @@ namespace tuplex {
assert(_context);
assert(_operator);

if(_columnNames.empty()) {
return _context->makeError("Dataset has no column names specified, try to use position based renameColumn function");
}

// find old column in current columns
auto it = std::find(_columnNames.begin(), _columnNames.end(), oldColumnName);
if(it == _columnNames.end())
return _context->makeError("renameColumn: could not find column '" + oldColumnName + "' in dataset's columns");
if(it == _columnNames.end()) {
// fuzzy match against existing columns
assert(_columnNames.size() >= 1);
auto closest_index = fuzzyMatch(oldColumnName, _columnNames);
assert(closest_index >= 0 && closest_index < _columnNames.size());
auto closest_name = _columnNames[closest_index];
return _context->makeError("renameColumn: could not find column '" + oldColumnName + "' in dataset's columns. Did you mean \"" + closest_name + "\"?");
}

// position?
auto idx = it - _columnNames.begin();
return renameColumn(idx, newColumnName);
}

/*!
 * number of columns, i.e. the number of parameters of the schema's row type.
 * The row type is expected to be a tuple type (checked via assert).
 * @return number of columns
 */
size_t DataSet::numColumns() const {
    auto rowType = schema().getRowType();
    assert(rowType.isTupleType());
    return rowType.parameters().size();
}

DataSet& DataSet::renameColumn(int index, const std::string &newColumnName) {
using namespace std;

if(isError())
return *this;

assert(_context);
assert(_operator);

auto num_columns = numColumns();
if(index < 0)
return _context->makeError("index must be non-negative number");
if(index >= num_columns)
return _context->makeError("Dataset contains only " + std::to_string(num_columns) + ", can't rename the " +
ordinal(index + 1) + " column");

// make copy
vector<string> columnNames(_columnNames.begin(), _columnNames.end());
columnNames[idx] = newColumnName;

// are column names empty? If so, fill in with blanks!
if(columnNames.empty()) {
columnNames = vector<string>(num_columns, "");
}

columnNames[index] = newColumnName;

// create dummy map operator
// now it is a simple map operator
Expand All @@ -409,6 +448,15 @@ namespace tuplex {
return _context->makeError("job aborted (signal received)");
}

// emit warning if non-unique names anymore
// ==> only for non-empty strings
std::vector<std::string> non_empty_names;
std::copy_if(columnNames.begin(), columnNames.end(), std::back_inserter(non_empty_names), [](const std::string& name) { return name != ""; });
std::set<std::string> unique_names(non_empty_names.begin(), non_empty_names.end());
if(unique_names.size() != non_empty_names.size()) {
Logger::instance().defaultLogger().info("Found duplicate column names. Note that this can negatively impact UDFs and subsequent operators.");
}

return ds;
}

Expand Down
2 changes: 2 additions & 0 deletions tuplex/python/include/PythonDataSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ namespace tuplex {

PythonDataSet renameColumn(const std::string& oldName, const std::string& newName);

PythonDataSet renameColumnByPosition(int index, const std::string& newName);

PythonDataSet ignore(const int64_t exceptionCode);

PythonDataSet join(const PythonDataSet& right, const std::string& leftKeyColumn, const std::string& rightKeyColumn,
Expand Down
1 change: 1 addition & 0 deletions tuplex/python/src/PythonBindings.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ PYMODULE {
.def("withColumn", &tuplex::PythonDataSet::withColumn)
.def("selectColumns", &tuplex::PythonDataSet::selectColumns)
.def("renameColumn", &tuplex::PythonDataSet::renameColumn)
.def("renameColumnByPosition", &tuplex::PythonDataSet::renameColumnByPosition)
.def("join", &tuplex::PythonDataSet::join)
.def("leftJoin", &tuplex::PythonDataSet::leftJoin)
.def("columns", &tuplex::PythonDataSet::columns)
Expand Down
37 changes: 37 additions & 0 deletions tuplex/python/src/PythonDataSet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,43 @@ namespace tuplex {
return pds;
}

/*!
 * rename a column by position by forwarding to renameColumn(int, ...) of the wrapped dataset.
 * The GIL is released while the C++ call runs and reacquired afterwards. Any C++ exception is
 * logged and turned into an error dataset created via the dataset's context.
 * @param index column position, passed through to the wrapped dataset
 * @param newName new column name
 * @return wrapped result; if the wrapped dataset is already an error, it is returned as is
 */
PythonDataSet PythonDataSet::renameColumnByPosition(int index, const std::string &newName) {
    assert(_dataset);

    // an error dataset is simply passed through
    if (_dataset->isError()) {
        PythonDataSet passthrough;
        passthrough.wrap(this->_dataset);
        return passthrough;
    }

    PythonDataSet wrapped;

    // this thread must hold the GIL here; release it for the duration of the C++ call
    assert(PyGILState_Check());
    python::unlockGIL();

    DataSet *renamed = nullptr;
    std::string failure = "";

    // remember the message and log it right away
    auto recordFailure = [&failure](const std::string &what) {
        failure = what;
        Logger::instance().defaultLogger().error(failure);
    };

    try {
        renamed = &_dataset->renameColumn(index, newName);
    } catch(const std::exception& e) {
        recordFailure(e.what());
    } catch(...) {
        recordFailure("unknown C++ exception occurred, please change type.");
    }

    python::lockGIL();

    // no result or a recorded failure? then hand back an error dataset
    const bool failed = (nullptr == renamed) || !failure.empty();
    if (failed) {
        Logger::instance().flushAll();
        assert(_dataset->getContext());
        renamed = &_dataset->getContext()->makeError(failure);
    }

    wrapped.wrap(renamed);
    Logger::instance().flushAll();
    return wrapped;
}

PythonDataSet PythonDataSet::selectColumns(boost::python::list L) {
// check dataset is valid & perform error check.
assert(this->_dataset);
Expand Down
15 changes: 14 additions & 1 deletion tuplex/python/tests/test_columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,17 @@ def test_withColumnUnnamed(self):
.withColumn('newcol', lambda a, b: (a + b)/10) \
.collect()

self.assertEqual(res, [(1, 2, 3/10), (3, 2, 5/10)])
self.assertEqual(res, [(1, 2, 3/10), (3, 2, 5/10)])

def test_renameColumn(self):
    """Rename columns by position first, then by name, checking ``columns`` after each step."""
    unnamed = self.c.parallelize([(1, 2), (3, 2)])

    # position based renames
    first_named = unnamed.renameColumn(0, 'first')
    self.assertEqual(first_named.columns[0], 'first')
    both_named = first_named.renameColumn(1, 'second')
    self.assertEqual(both_named.columns[1], 'second')

    # name based renames on the columns assigned above
    first_renamed = both_named.renameColumn("first", '1')
    self.assertEqual(first_renamed.columns, ['1', 'second'])
    both_renamed = first_renamed.renameColumn('second', '2')
    self.assertEqual(both_renamed.columns, ['1', '2'])
13 changes: 9 additions & 4 deletions tuplex/python/tuplex/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,10 @@ def selectColumns(self, columns):
ds._dataSet = self._dataSet.selectColumns(columns)
return ds

def renameColumn(self, oldColumnName, newColumnName):
def renameColumn(self, key, newColumnName):
""" rename a column in dataset
Args:
oldColumnName: str, old column name. Must exist.
key: str|int, old column name or (0-indexed) position.
newColumnName: str, new column name

Returns:
Expand All @@ -273,11 +273,16 @@ def renameColumn(self, oldColumnName, newColumnName):

assert self._dataSet is not None, 'internal API error, datasets must be created via context object'

assert isinstance(oldColumnName, str), 'oldColumnName must be a string'
assert isinstance(key, (str, int)), 'key must be a string or integer'
assert isinstance(newColumnName, str), 'newColumnName must be a string'

ds = DataSet()
ds._dataSet = self._dataSet.renameColumn(oldColumnName, newColumnName)
if isinstance(key, str):
ds._dataSet = self._dataSet.renameColumn(key, newColumnName)
elif isinstance(key, int):
ds._dataSet = self._dataSet.renameColumnByPosition(key, newColumnName)
else:
raise TypeError('key must be int or str')
return ds

def ignore(self, eclass):
Expand Down
25 changes: 25 additions & 0 deletions tuplex/test/core/DataFrameOperations.cc
Original file line number Diff line number Diff line change
Expand Up @@ -382,4 +382,29 @@ TEST_F(DataFrameTest, FolderOutput) {
// load file from disk
auto content = fileToString(URI("output/part0.csv"));
EXPECT_EQ(content, "A,B\n11,20\n11,40\n");
}

// Renames columns by position on a dataset without column names, then checks that
// renaming via a misspelled column name yields an error dataset.
TEST_F(DataFrameTest, RenameColumns) {
    using namespace tuplex;
    Context ctx(microTestOptions());

    // start from a dataset that has no column names defined
    auto& unnamed = ctx.parallelize({Row(1, 2), Row(3, 4)});
    auto initial_columns = unnamed.columns();
    EXPECT_EQ(initial_columns.size(), 0);

    // name the first column by position; the second one stays blank
    auto& first_named = unnamed.renameColumn(0, "first");
    ASSERT_EQ(first_named.columns().size(), 2);
    EXPECT_EQ(first_named.columns()[0], "first");
    EXPECT_EQ(first_named.columns()[1], "");

    // name the second column by position as well
    auto& both_named = first_named.renameColumn(1, "second");
    ASSERT_EQ(both_named.columns().size(), 2);
    EXPECT_EQ(both_named.columns()[0], "first");
    EXPECT_EQ(both_named.columns()[1], "second");

    // a misspelled column name must produce an error dataset
    auto& misspelled = both_named.renameColumn("secund", "+1");
    EXPECT_TRUE(misspelled.isError());
}
32 changes: 32 additions & 0 deletions tuplex/utils/include/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ namespace std {
}
#endif

#include "third_party/levenshtein-sse.h"

// helper code to allow tuples in maps.
#include <boost/functional/hash.hpp>
Expand Down Expand Up @@ -299,6 +300,37 @@ namespace tuplex {
return std::to_string(i) + "th";
}

/*!
 * match needle against dictionary and find closest match based on Levenshtein distance.
 * Every entry of the dictionary (including the first one) is scored; if several entries
 * share the lowest distance, the one with the smallest index is returned.
 * @param needle string to look up
 * @param dictionary candidate strings
 * @return position of the closest entry in dictionary, or -1 if dictionary is empty
 */
inline int fuzzyMatch(const std::string& needle, const std::vector<std::string>& dictionary) {
    using namespace levenshteinSSE;

    if(dictionary.empty())
        return -1;

    // Seed the search with the first entry so it takes part in the comparison.
    // (Starting the scan at index 1 with a "max" sentinel would never select index 0.)
    std::size_t best_index = 0;
    auto lowest_score = levenshtein(needle, dictionary[0]);

    for(std::size_t i = 1; i < dictionary.size(); ++i) {
        const auto score = levenshtein(needle, dictionary[i]);

        // strict comparison keeps the earliest entry on ties
        if(score < lowest_score) {
            best_index = i;
            lowest_score = score;
        }
    }
    return static_cast<int>(best_index);
}


template<typename T> std::ostream &operator <<(std::ostream &os, const std::vector<T> &v) {
using namespace std;

Expand Down
Loading