diff --git a/tuplex/python/src/PythonContext.cc b/tuplex/python/src/PythonContext.cc index cb126dd80..3d9645c1e 100644 --- a/tuplex/python/src/PythonContext.cc +++ b/tuplex/python/src/PythonContext.cc @@ -211,9 +211,22 @@ namespace tuplex { // first get how many bytes are required size_t requiredBytes = baseRequiredBytes; if(varLenField) { - for(int j = 0; j < numTupleElements; ++j) - if(typeStr[j] == 's') - requiredBytes += PyUnicode_GET_SIZE(PyTuple_GetItem(obj, j)) + 1; // +1 for '\0' + bool nonConforming = false; + for(int j = 0; j < numTupleElements; ++j) { + if (typeStr[j] == 's') { + auto tupleItem = PyTuple_GetItem(obj, j); + if (PyUnicode_Check(tupleItem)) { + requiredBytes += PyUnicode_GET_SIZE(tupleItem) + 1; // +1 for '\0' + } else { + nonConforming = true; + break; + } + } + } + if (nonConforming) { + _badParallelizeObjects.emplace_back(i, obj); + continue; + } } // get new partition if capacity exhausted diff --git a/tuplex/test/wrappers/WrapperTest.cc b/tuplex/test/wrappers/WrapperTest.cc index 00603c5eb..9df251b0d 100644 --- a/tuplex/test/wrappers/WrapperTest.cc +++ b/tuplex/test/wrappers/WrapperTest.cc @@ -52,6 +52,90 @@ TEST_F(WrapperTest, LambdaBackend) { // Important detail: RAII of boost python requires call to all boost::python destructors before closing the interpreter. 
+TEST_F(WrapperTest, StringTuple) {
+    // Parallelizes four (str, str) tuples through an identity map; every row is kept, so four results are expected.
+    using namespace tuplex;
+
+    PythonContext c("");
+
+    // PyTuple_SET_ITEM steals the reference passed to it, so each tuple takes over its elements.
+    PyObject *listObj = PyList_New(4);
+    PyObject *tupleObj1 = PyTuple_New(2);
+    PyTuple_SET_ITEM(tupleObj1, 0, python::PyString_FromString("a"));
+    PyTuple_SET_ITEM(tupleObj1, 1, python::PyString_FromString("a"));
+
+    PyObject *tupleObj2 = PyTuple_New(2);
+    PyTuple_SET_ITEM(tupleObj2, 0, python::PyString_FromString("b"));
+    PyTuple_SET_ITEM(tupleObj2, 1, python::PyString_FromString("b"));
+
+    PyObject *tupleObj3 = PyTuple_New(2);
+    PyTuple_SET_ITEM(tupleObj3, 0, python::PyString_FromString("c"));
+    PyTuple_SET_ITEM(tupleObj3, 1, python::PyString_FromString("c"));
+
+    PyObject *tupleObj4 = PyTuple_New(2);
+    PyTuple_SET_ITEM(tupleObj4, 0, python::PyString_FromString("d"));
+    PyTuple_SET_ITEM(tupleObj4, 1, python::PyString_FromString("d"));
+
+    PyList_SetItem(listObj, 0, tupleObj1);
+    PyList_SetItem(listObj, 1, tupleObj2);
+    PyList_SetItem(listObj, 2, tupleObj3);
+    PyList_SetItem(listObj, 3, tupleObj4);
+
+    {
+        // Inner scope: the boost::python objects below are destroyed before the test body ends.
+        auto list = boost::python::list(boost::python::handle<>(listObj));
+        auto res = c.parallelize(list).map("lambda x: x", "").collect();
+        auto resObj = res.ptr();
+
+        ASSERT_TRUE(PyList_Check(resObj));
+        // All four input rows are (str, str), so none is dropped.
+        ASSERT_EQ(PyList_GET_SIZE(resObj), 4);
+        PyObject_Print(resObj, stdout, 0);
+    }
+}
+
+TEST_F(WrapperTest, MixedSimpleTupleTuple) {
+    // Parallelizes three (str, int) tuples plus one (None, int) tuple; the test currently expects three rows back.
+    using namespace tuplex;
+
+    PythonContext c("");
+
+    PyObject *listObj = PyList_New(4);
+    PyObject *tupleObj1 = PyTuple_New(2);
+    PyTuple_SET_ITEM(tupleObj1, 0, python::PyString_FromString("a"));
+    PyTuple_SET_ITEM(tupleObj1, 1, PyLong_FromLong(1));
+
+    PyObject *tupleObj2 = PyTuple_New(2);
+    PyTuple_SET_ITEM(tupleObj2, 0, python::PyString_FromString("b"));
+    PyTuple_SET_ITEM(tupleObj2, 1, PyLong_FromLong(2));
+
+    PyObject *tupleObj3 = PyTuple_New(2);
+    PyTuple_SET_ITEM(tupleObj3, 0, python::PyString_FromString("c"));
+    PyTuple_SET_ITEM(tupleObj3, 1, PyLong_FromLong(3));
+
+    // PyTuple_SET_ITEM steals a reference and Py_None is only borrowed here, so take a reference first.
+    PyObject *tupleObj4 = PyTuple_New(2);
+    Py_INCREF(Py_None);
+    PyTuple_SET_ITEM(tupleObj4, 0, Py_None);
+    PyTuple_SET_ITEM(tupleObj4, 1, PyLong_FromLong(4));
+
+    PyList_SetItem(listObj, 0, tupleObj1);
+    PyList_SetItem(listObj, 1, tupleObj2);
+    PyList_SetItem(listObj, 2, tupleObj3);
+    PyList_SetItem(listObj, 3, tupleObj4);
+
+    {
+        auto list = boost::python::list(boost::python::handle<>(listObj));
+        auto res = c.parallelize(list).collect();
+        auto resObj = res.ptr();
+
+        ASSERT_TRUE(PyList_Check(resObj));
+        // TODO: expect 4 once the pending parallelize changes are merged; today the (None, int) row is not returned.
+        ASSERT_EQ(PyList_GET_SIZE(resObj), 3);
+        PyObject_Print(resObj, stdout, 0);
+    }
+}
+
 TEST_F(WrapperTest, StringParallelize) {
     using namespace tuplex;