diff --git a/tuplex/adapters/cpython/include/PythonHelpers.h b/tuplex/adapters/cpython/include/PythonHelpers.h index bf61c8e9b..e332f5f83 100644 --- a/tuplex/adapters/cpython/include/PythonHelpers.h +++ b/tuplex/adapters/cpython/include/PythonHelpers.h @@ -257,17 +257,19 @@ namespace python { /*! * converts a python object to tuplex object, obj must be not null. * @param obj if it can not be mapped to a tuplex type, stored as PYOBJECT (cloudpickled). + * @param autoUpcast whether to upcast numeric types to a unified type when type conflicts, false by default * @return C++ Tuplex Field object */ - extern tuplex::Field pythonToField(PyObject* obj); + extern tuplex::Field pythonToField(PyObject *obj, bool autoUpcast=false); /*! * converts python object to Row using row type supplied in type. * @param obj * @param type specify what type of objects should be serialized, may contain options. + * @param autoUpcast whether to upcast numeric types to a unified type when type conflicts, false by default * @return Tuplex C++ row object. */ - extern tuplex::Row pythonToRow(PyObject *obj, const python::Type &type); + extern tuplex::Row pythonToRow(PyObject *obj, const python::Type &type, bool autoUpcast=false); /*! * converts a Tuplex C++ object to a python object @@ -318,10 +320,11 @@ namespace python { /*! * get corresponding tuplex type for python object - * @param o - * @return + * @param o python object to map to Tuplex type + * @param autoUpcast whether to upcast numeric types to a unified type when type conflicts, false by default + * @return internal Tuplex type corresponding to given python object. */ - extern python::Type mapPythonClassToTuplexType(PyObject *o); + extern python::Type mapPythonClassToTuplexType(PyObject *o, bool autoUpcast=false); /*! 
* Tuplex's python API provides a paramter to (optionally) specify a schema, this functions decodes that PyObject @@ -391,4 +394,4 @@ namespace python { } -#endif //TUPLEX_PYTHONHELPERS_H \ No newline at end of file +#endif //TUPLEX_PYTHONHELPERS_H diff --git a/tuplex/adapters/cpython/include/PythonSerializer.h b/tuplex/adapters/cpython/include/PythonSerializer.h index 61f82c793..8d89c1b79 100644 --- a/tuplex/adapters/cpython/include/PythonSerializer.h +++ b/tuplex/adapters/cpython/include/PythonSerializer.h @@ -26,10 +26,13 @@ namespace tuplex { * @param nextptr ptr position after deserialization * @return bool for if deserialization was successful or not */ - extern bool fromSerializedMemory(const uint8_t *ptr, int64_t capacity, const Schema &schema, PyObject **obj, + extern bool fromSerializedMemory(const uint8_t *ptr, + size_t capacity, + const Schema &schema, + PyObject **obj, const uint8_t **nextptr = nullptr); } } -#endif //TUPLEX_PYTHONSERIALIZER_H \ No newline at end of file +#endif //TUPLEX_PYTHONSERIALIZER_H diff --git a/tuplex/adapters/cpython/include/PythonSerializer_private.h b/tuplex/adapters/cpython/include/PythonSerializer_private.h index 2dde0b9d2..e7bba4cfb 100644 --- a/tuplex/adapters/cpython/include/PythonSerializer_private.h +++ b/tuplex/adapters/cpython/include/PythonSerializer_private.h @@ -21,28 +21,38 @@ namespace tuplex { * Creates Python object from raw memory (deserialize) * @param ptr memory location to where serialized data is * @param row_type holding information on types of input memory + * @param capacity size of buffer * @param bitmap pointer to bitmap (i.e. 
multiple 64bit blocks) * @param index of the element within the bitmap * @return Python object holding deserialized elements */ - PyObject *createPyObjectFromMemory(const uint8_t *ptr, const python::Type &row_type, const uint8_t *bitmap = nullptr, int index = 0); + PyObject* createPyObjectFromMemory(const uint8_t *ptr, const python::Type &row_type, size_t capacity, + const uint8_t *bitmap = nullptr, unsigned index = 0); /*! * Creates Python tuple object from raw memory (deserialize) * @param ptr memory location to where serialized data is * @param row_type holding information on types of input memory + * @param capacity size of buffer * @return Python object holding deserialized elements */ - PyObject *createPyTupleFromMemory(const uint8_t *ptr, const python::Type &row_type); + PyObject *createPyTupleFromMemory(const uint8_t *ptr, const python::Type &row_type, size_t capacity); PyObject *createPyDictFromMemory(const uint8_t *ptr); - PyObject *createPyListFromMemory(const uint8_t *ptr, const python::Type &row_type); + /*! + * Creates Python list object from raw memory (deserialize) + * @param ptr memory location to where serialized data is + * @param row_type holding information on types of input memory + * @param capacity size of buffer + * @return Python object holding deserialized elements + */ + PyObject *createPyListFromMemory(const uint8_t *ptr, const python::Type &row_type, size_t capacity); /*! * Checks if capacity for buffer with schema is valid (if it is possible for buffer to hold such data given schema) * @param ptr memory location to where serialized data is - * @param capacity size of buffer + * @param capacity size of buffer, negative values are invalid. 
* @param row_type holding information on types of input memory * @return bool for if capacity is valid or not */ @@ -55,10 +65,19 @@ namespace tuplex { * @param row_type holding information on types of input memory * @return -1 if invalid, size of serialized data if valid */ - int64_t checkTupleCapacity(const uint8_t *ptr, int64_t capacity, const python::Type &row_type); + int64_t checkTupleCapacity(const uint8_t *ptr, size_t capacity, const python::Type &row_type); + + /*! + * map bitmap of the object at ptr to a vector with numElements true/false values + * @param objectType current object type that contains optional value + * @param ptr memory location to where the start of bitmap + * @param numElements number of elements in objectType for which bitmap is needed + * @return vector of booleans representing a bitmap indicating whether element is null (true) or not (false). + */ + std::vector getBitmapFromType(const python::Type &objectType, const uint8_t *&ptr, size_t numElements); } } -#endif //TUPLEX_PYTHONSERIALIZER_PRIVATE_H \ No newline at end of file +#endif //TUPLEX_PYTHONSERIALIZER_PRIVATE_H diff --git a/tuplex/adapters/cpython/src/PythonHelpers.cc b/tuplex/adapters/cpython/src/PythonHelpers.cc index 9c05a65ca..388d0ff7a 100644 --- a/tuplex/adapters/cpython/src/PythonHelpers.cc +++ b/tuplex/adapters/cpython/src/PythonHelpers.cc @@ -473,7 +473,7 @@ namespace python { return f; } - tuplex::Field pythonToField(PyObject* obj) { + tuplex::Field pythonToField(PyObject *obj, bool autoUpcast) { using namespace tuplex; using namespace std; @@ -498,7 +498,7 @@ namespace python { vector v; v.reserve(numElements); for(unsigned i = 0; i < numElements; ++i) { - v.push_back(pythonToField(PyTuple_GetItem(obj, i))); + v.push_back(pythonToField(PyTuple_GetItem(obj, i), autoUpcast)); } return Field(Tuple::from_vector(v)); } else if(PyBool_Check(obj)) { // important to call this before isinstance long since isinstance also return long for bool @@ -529,7 +529,7 @@ namespace 
python { if(PyDict_Size(obj) == 0) return Field::empty_dict(); - auto dictType = mapPythonClassToTuplexType(obj); + auto dictType = mapPythonClassToTuplexType(obj, autoUpcast); std::string dictStr; PyObject *key = nullptr, *val = nullptr; Py_ssize_t pos = 0; // must be initialized to 0 to start iteration, however internal iterator variable. Don't use semantically. @@ -537,7 +537,7 @@ namespace python { while(PyDict_Next(obj, &pos, &key, &val)) { // create key auto keyStr = PyString_AsString(PyObject_Str(key)); - auto keyType = mapPythonClassToTuplexType(key); + auto keyType = mapPythonClassToTuplexType(key, autoUpcast); python::Type valType; // create value, mimicking cJSON printing standards @@ -625,7 +625,7 @@ namespace python { vector v; v.reserve(numElements); for(unsigned i = 0; i < numElements; ++i) { - v.push_back(pythonToField(PyList_GET_ITEM(obj, i))); + v.push_back(pythonToField(PyList_GET_ITEM(obj, i), autoUpcast)); } return Field(List::from_vector(v)); } else if(obj == Py_None) { @@ -660,9 +660,10 @@ namespace python { * converts object to field of specified type. * @param obj * @param type + * @param autoUpcast whether to upcast numeric types to a unified type when type conflicts, false by default * @return Field object */ - tuplex::Field pythonToField(PyObject *obj, const python::Type &type) { + tuplex::Field pythonToField(PyObject *obj, const python::Type &type, bool autoUpcast=false) { assert(obj); // TODO: check assumptions about whether nonempty tuple can be an option @@ -671,8 +672,15 @@ namespace python { return tuplex::Field::null(type); } else { tuplex::Field f; - f = pythonToField(obj); - f = fieldCastTo(f, type.getReturnType()); + auto rtType = type.getReturnType(); + if(rtType.isListType() || rtType.isTupleType()) { + // type still needed to correctly construct field + f = pythonToField(obj, rtType, autoUpcast); + } else { + // simple types + f = pythonToField(obj, autoUpcast); + f = autoUpcast? 
fieldCastTo(f, type.getReturnType()) : f; + } f.makeOptional(); return f; } @@ -682,19 +690,30 @@ namespace python { std::vector v; for(unsigned i = 0; i < numElements; ++i) { - v.push_back(pythonToField(PyTuple_GetItem(obj, i), type.parameters()[i])); + v.push_back(pythonToField(PyTuple_GetItem(obj, i), type.parameters()[i], autoUpcast)); } return tuplex::Field(tuplex::Tuple::from_vector(v)); + } else if(type.isListType() && type != python::Type::EMPTYLIST) { + auto numElements = PyList_Size(obj); + auto elementType = type.elementType(); + std::vector v; + v.reserve(numElements); + for(unsigned i = 0; i < numElements; ++i) { + auto currListItem = PyList_GetItem(obj, i); + v.push_back(pythonToField(currListItem, elementType, autoUpcast)); + Py_IncRef(currListItem); + } + return tuplex::Field(tuplex::List::from_vector(v)); } else { - auto f = pythonToField(obj); - return fieldCastTo(f, type); + auto f = pythonToField(obj, autoUpcast); + return autoUpcast? fieldCastTo(f, type) : f; } } - tuplex::Row pythonToRow(PyObject *obj, const python::Type &type) { + tuplex::Row pythonToRow(PyObject *obj, const python::Type &type, bool autoUpcast) { assert(obj); - tuplex::Field f = pythonToField(obj, type); + tuplex::Field f = pythonToField(obj, type, autoUpcast); // unpack the tuples one level if(f.getType().isTupleType() && f.getType() != python::Type::EMPTYTUPLE) { @@ -1365,7 +1384,7 @@ namespace python { } // mapping type to internal types, unknown as default - python::Type mapPythonClassToTuplexType(PyObject *o) { + python::Type mapPythonClassToTuplexType(PyObject *o, bool autoUpcast) { if(Py_None == o) return python::Type::NULLVALUE; @@ -1393,7 +1412,7 @@ namespace python { for(int j = 0; j < numElements; j++) { auto item = PyTuple_GET_ITEM(o, j); // borrowed reference assert(item->ob_refcnt > 0); // important!!! 
- elementTypes.push_back(mapPythonClassToTuplexType(item)); + elementTypes.push_back(mapPythonClassToTuplexType(item, autoUpcast)); } return python::TypeFactory::instance().createOrGetTupleType(elementTypes); } @@ -1410,8 +1429,8 @@ namespace python { Py_ssize_t pos = 0; // must be initialized to 0 to start iteration, however internal iterator variable. Don't use semantically. bool types_set = false; // need extra var here b/c vals could be unknown. while(PyDict_Next(o, &pos, &key, &val)) { - auto curKeyType = mapPythonClassToTuplexType(key); - auto curValType = mapPythonClassToTuplexType(val); + auto curKeyType = mapPythonClassToTuplexType(key, autoUpcast); + auto curValType = mapPythonClassToTuplexType(val, autoUpcast); if(!types_set) { types_set = true; keyType = curKeyType; @@ -1431,13 +1450,18 @@ namespace python { if(numElements == 0) return python::Type::EMPTYLIST; - python::Type elementType = mapPythonClassToTuplexType(PyList_GetItem(o, 0)); + python::Type elementType = mapPythonClassToTuplexType(PyList_GetItem(o, 0), autoUpcast); // verify that all elements have the same type for(int j = 0; j < numElements; j++) { - if(elementType != mapPythonClassToTuplexType(PyList_GetItem(o, j))) { - Logger::instance().defaultLogger().error("lists with variable type elements are not supported."); - return python::Type::UNKNOWN; - // TODO: the general case should return python::Type::PyObject in the future + python::Type currElementType = mapPythonClassToTuplexType(PyList_GetItem(o, j), autoUpcast); + if(elementType != currElementType) { + // possible to use nullable type as element type? 
+ auto newElementType = unifyTypes(elementType, currElementType, autoUpcast); + if (newElementType == python::Type::UNKNOWN) { + Logger::instance().defaultLogger().error("list with variable element type " + elementType.desc() + " and " + currElementType.desc() + " not supported."); + return python::Type::PYOBJECT; + } + elementType = newElementType; } } return python::Type::makeListType(elementType); @@ -1500,6 +1524,23 @@ namespace python { // typing.Optional[str] which is equal to typing.Union[str, NoneType] auto typing_optional = PyDict_GetItemString(typing_dict, "Optional"); assert(typing_optional); + if (t.getReturnType().isTupleType()) { + // https://docs.python.org/3/library/typing.html#typing.Tuple +#if (PY_MAJOR_VERSION >= 3 && PY_MINOR_VERSION >= 9) + // use builtin.tuple[...] + auto builtin_mod = PyImport_AddModule("builtins"); + assert(builtin_mod); + auto builtin_dict = PyModule_GetDict(builtin_mod); + assert(builtin_dict); + auto tuple = PyDict_GetItemString(builtin_dict, "tuple"); + tobj = PyObject_GetItem(tuple, tobj); +#else + // use Tuple[...] 
+ auto typing_tuple = PyDict_GetItemString(typing_dict, "Tuple"); + assert(typing_tuple); + tobj = PyObject_GetItem(typing_tuple, tobj); +#endif + } auto opt_type = PyObject_GetItem(typing_optional, tobj); return opt_type; } diff --git a/tuplex/adapters/cpython/src/PythonSerializer.cc b/tuplex/adapters/cpython/src/PythonSerializer.cc index 621a97db3..868e6cf59 100644 --- a/tuplex/adapters/cpython/src/PythonSerializer.cc +++ b/tuplex/adapters/cpython/src/PythonSerializer.cc @@ -77,7 +77,7 @@ namespace tuplex { return dictObj; } - PyObject *createPyTupleFromMemory(const uint8_t *ptr, const python::Type &row_type) { + PyObject *createPyTupleFromMemory(const uint8_t *ptr, const python::Type &row_type, size_t capacity) { int64_t current_buffer_index = 0; auto tree = tuplex::TupleTree(row_type); @@ -88,7 +88,7 @@ namespace tuplex { std::vector curr; std::vector prev; - int bitmap_index = 0; + unsigned bitmap_index = 0; const uint8_t *bitmap = ptr; auto num_bitmap_fields = core::ceilToMultiple(python::numOptionalFields(row_type), 64ul)/64; ptr += sizeof(int64_t) * num_bitmap_fields; @@ -129,7 +129,19 @@ namespace tuplex { int curr_index = curr.back(); python::Type current_type = tree.fieldType(curr); - PyObject *elem_to_insert = createPyObjectFromMemory(ptr + current_buffer_index, current_type, bitmap, bitmap_index); + PyObject *elem_to_insert = nullptr; + if (current_type.isOptionType() && current_type.getReturnType().isTupleType()) { + // createPyTupleFromMemory requires a ptr to start of the actual tuple data, so need to decode and add offset here + uint64_t offset = *((uint64_t *)(ptr + current_buffer_index)); + assert((uintptr_t)(ptr + current_buffer_index + offset) < capacity); + elem_to_insert = createPyObjectFromMemory(ptr + current_buffer_index + offset, current_type, + capacity, bitmap, bitmap_index); + } else { + // otherwise, simply pass ptr to the current field + elem_to_insert = createPyObjectFromMemory(ptr + current_buffer_index, current_type, capacity, + 
bitmap, bitmap_index); + } + if(current_type.isOptionType()) bitmap_index++; if (elem_to_insert == nullptr) { return nullptr; @@ -166,7 +178,7 @@ namespace tuplex { return test; } - PyObject *createPyListFromMemory(const uint8_t *ptr, const python::Type &row_type) { + PyObject *createPyListFromMemory(const uint8_t *ptr, const python::Type &row_type, size_t capacity) { assert(row_type.isListType() && row_type != python::Type::EMPTYLIST); auto elementType = row_type.elementType(); if(elementType.isSingleValued()) { @@ -193,9 +205,9 @@ namespace tuplex { // access the field element auto elem = *(uint64_t *) ptr; auto offset = (uint32_t) elem; - auto length = (uint32_t) (elem >> 32ul); // move to varlen field + assert((uintptr_t)(ptr + offset) < capacity); ptr = &ptr[offset]; // get number of elements @@ -203,42 +215,107 @@ namespace tuplex { ptr += sizeof(int64_t); auto ret = PyList_New(numElements); + // get bitmap + std::vector bitmapV = getBitmapFromType(row_type, ptr, numElements); + for(size_t i=0; i(ptr)); + ptr += sizeof(int64_t); } else if(elementType == python::Type::F64) { element = PyFloat_FromDouble(*reinterpret_cast(ptr)); + ptr += sizeof(int64_t); } else if(elementType == python::Type::BOOLEAN) { element = PyBool_FromLong(*reinterpret_cast(ptr)); + ptr += sizeof(int64_t); } else if (elementType == python::Type::STRING) { char *string_errors = nullptr; - auto curStrOffset = *reinterpret_cast(ptr); - int64_t curStrLen; - // string lists are serialized (in the varlen section) as | num elements | offset 1 | ... | offset n | string 1 | ... 
| string n - // we need to use the offsets to calculate the lengths of the strings - if(i == numElements-1) { - // for the final string, we need to calculate the length by subtracting the offset from the total length of the varlen section (minus the spaces used for the first n-1 offsets and the number of elements) - curStrLen = (length - (numElements*sizeof(int64_t))) - curStrOffset; - } else { - // for any string other than the final string, we calculate the length by taking the difference between consecutive offsets (and accounting for the 8 byte shift in where the offsets start from) - auto nextStrOffset = *reinterpret_cast(ptr+sizeof(int64_t)); - curStrLen = nextStrOffset - (curStrOffset-sizeof(int64_t)); - } - element = PyUnicode_DecodeUTF8(reinterpret_cast(&ptr[curStrOffset]), curStrLen-1, string_errors); + // get offset for string + auto currOffset = *reinterpret_cast(ptr); + assert((uintptr_t)(ptr + currOffset) < capacity); + auto currStr = reinterpret_cast(&ptr[currOffset]); + element = PyUnicode_DecodeUTF8(currStr, (long)(strlen(currStr)), string_errors); + ptr += sizeof(int64_t); } else if(elementType.isTupleType()) { - element = createPyTupleFromMemory(ptr, elementType); + auto currOffset = *(uint64_t *)ptr; + assert((uintptr_t)(ptr + currOffset) < capacity); + element = createPyTupleFromMemory(ptr + currOffset, elementType, capacity); + ptr += sizeof(int64_t); + } else if(elementType.isListType()) { + element = createPyListFromMemory(ptr, elementType, capacity); + ptr += sizeof(int64_t); } else if(elementType.isDictionaryType()) { element = createPyDictFromMemory(ptr); + ptr += sizeof(int64_t); + } else if(elementType.isOptionType()) { + auto underlyingType = elementType.getReturnType(); + if(underlyingType == python::Type::BOOLEAN) { + if(bitmapV[i]) { + Py_XINCREF(Py_None); + element = Py_None; + } else { + element = PyBool_FromLong(*reinterpret_cast(ptr)); + ptr += sizeof(int64_t); + } + } else if(underlyingType == python::Type::I64) { + 
if(bitmapV[i]) { + Py_XINCREF(Py_None); + element = Py_None; + } else { + element = PyLong_FromLong(*reinterpret_cast(ptr)); + ptr += sizeof(int64_t); + } + } else if(underlyingType == python::Type::F64) { + if(bitmapV[i]) { + Py_XINCREF(Py_None); + element = Py_None; + } else { + element = PyFloat_FromDouble(*reinterpret_cast(ptr)); + ptr += sizeof(int64_t); + } + } else if(underlyingType == python::Type::STRING) { + if(bitmapV[i]) { + Py_XINCREF(Py_None); + element = Py_None; + } else { + char *string_errors = nullptr; + // get offset for string + auto currOffset = *reinterpret_cast(ptr); + assert((uintptr_t)(ptr + currOffset) < capacity); + auto currStr = reinterpret_cast(&ptr[currOffset]); + element = PyUnicode_DecodeUTF8(currStr, (long)(strlen(currStr)), string_errors); + ptr += sizeof(int64_t); + } + } else if(underlyingType.isListType()) { + if(bitmapV[i]) { + Py_XINCREF(Py_None); + element = Py_None; + } else { + element = createPyListFromMemory(ptr, underlyingType, capacity); + ptr += sizeof(int64_t); + } + } else if(underlyingType.isTupleType()) { + if(bitmapV[i]) { + Py_XINCREF(Py_None); + element = Py_None; + } else { + uint64_t currOffset = *((uint64_t *)(ptr)); + assert((uintptr_t)(ptr + currOffset) < capacity); + element = createPyTupleFromMemory(ptr + currOffset, underlyingType, capacity); + ptr += sizeof(int64_t); + } + } else throw std::runtime_error("Invalid list type: " + row_type.desc()); } else throw std::runtime_error("Invalid list type: " + row_type.desc()); PyList_SET_ITEM(ret, i, element); - ptr += sizeof(int64_t); } return ret; } } - PyObject *createPyObjectFromMemory(const uint8_t *ptr, const python::Type &row_type, const uint8_t *bitmap, int index) { + PyObject * + createPyObjectFromMemory(const uint8_t *ptr, const python::Type &row_type, size_t capacity, + const uint8_t *bitmap, unsigned index) { if (row_type == python::Type::BOOLEAN) { return PyBool_FromLong(ptr[0]); } else if (row_type == python::Type::I64) { @@ -249,13 +326,14 @@ 
namespace tuplex { auto elem = *(uint64_t *) (ptr); auto offset = (uint32_t) elem; auto length = (uint32_t) (elem >> 32ul); + assert((uintptr_t)(ptr + offset + length) < capacity); auto str = reinterpret_cast(&ptr[offset]); char *string_errors = nullptr; return PyUnicode_DecodeUTF8(str, length - 1, string_errors); } else if (row_type == python::Type::EMPTYTUPLE) { return PyTuple_New(0); } else if (row_type.isTupleType()) { - return createPyTupleFromMemory(ptr, row_type); + return createPyTupleFromMemory(ptr, row_type, capacity); } else if (row_type == python::Type::EMPTYDICT) { return PyDict_New(); } else if (row_type.isDictionaryType() || row_type == python::Type::GENERICDICT) { @@ -263,10 +341,12 @@ namespace tuplex { } else if(row_type == python::Type::EMPTYLIST) { return PyList_New(0); } else if(row_type.isListType()) { - return createPyListFromMemory(ptr, row_type); + return createPyListFromMemory(ptr, row_type, capacity); } else if(row_type.isOptionType()) { // TODO: should this be [isOptional()]? 
+ bool singleValue = false; if(!bitmap) { // If bitmap was null, this means that it was a single value, not part of a tuple + singleValue = true; bitmap = ptr; index = 0; ptr += (sizeof(uint64_t)/sizeof(uint8_t)); @@ -278,12 +358,20 @@ namespace tuplex { } auto t = row_type.getReturnType(); - return createPyObjectFromMemory(ptr, t); + if (t.isTupleType() && singleValue) { + // offset exists + uint64_t sizeOffset = *((uint64_t *)ptr); + uint64_t offset = sizeOffset & 0xFFFFFFFF; + assert((uintptr_t)(ptr + offset) < capacity); + ptr += offset; + } + return createPyObjectFromMemory(ptr, t, capacity); } else if(row_type == python::Type::PYOBJECT) { // cloudpickle, deserialize auto elem = *(uint64_t *) (ptr); auto offset = (uint32_t) elem; auto buf_size = (uint32_t) (elem >> 32ul); + assert((uintptr_t)(ptr + offset + buf_size) < capacity); auto buf = reinterpret_cast(&ptr[offset]); return python::deserializePickledObject(python::getMainModule(), buf, buf_size); } else { @@ -295,7 +383,7 @@ namespace tuplex { return Py_None; } - int64_t checkTupleCapacity(const uint8_t *ptr, int64_t capacity, const python::Type &row_type) { + int64_t checkTupleCapacity(const uint8_t *ptr, size_t capacity, const python::Type &row_type) { auto tree = tuplex::TupleTree(row_type); auto indices = tree.getMultiIndices(); auto num_bytes = static_cast(indices.size() * sizeof(int64_t)); @@ -316,7 +404,7 @@ namespace tuplex { return num_bytes; } - int64_t serializationSize(const uint8_t *ptr, int64_t capacity, const python::Type &row_type) { + int64_t serializationSize(const uint8_t *ptr, size_t capacity, const python::Type &row_type) { // should be identical to Deserializer.inferlength... 
@@ -377,7 +465,7 @@ namespace tuplex { // TODO: check for errors when creating PyObjects - bool fromSerializedMemory(const uint8_t *ptr, int64_t capacity, const tuplex::Schema &schema, PyObject **obj, + bool fromSerializedMemory(const uint8_t *ptr, size_t capacity, const tuplex::Schema &schema, PyObject **obj, const uint8_t **nextptr) { python::Type row_type = schema.getRowType(); @@ -390,12 +478,30 @@ namespace tuplex { // return false; // } - *obj = createPyObjectFromMemory(ptr, row_type); + *obj = createPyObjectFromMemory(ptr, row_type, capacity); if (nextptr) { *nextptr = ptr + serializationSize(ptr, capacity, row_type); } return *obj != nullptr; } + + std::vector getBitmapFromType(const python::Type &objectType, const uint8_t *&ptr, size_t numElements) { + std::vector bitmapV; + bitmapV.reserve(numElements); + if (objectType.isListType()) { + if (objectType.elementType().isOptionType()) { + auto numBitmapFields = core::ceilToMultiple((unsigned long)numElements, 64ul) / 64; + auto bitmapSize = numBitmapFields * sizeof(uint64_t); + auto *bitmapAddr = (uint64_t *)ptr; + ptr += bitmapSize; + for (size_t i = 0; i < numElements; i++) { + bool currBit = (bitmapAddr[i / 64] >> (i % 64)) & 0x1; + bitmapV.push_back(currBit); + } + } + } + return bitmapV; + } } -} \ No newline at end of file +} diff --git a/tuplex/codegen/include/TypeAnnotatorVisitor.h b/tuplex/codegen/include/TypeAnnotatorVisitor.h index cbf564f29..922c5f1e2 100644 --- a/tuplex/codegen/include/TypeAnnotatorVisitor.h +++ b/tuplex/codegen/include/TypeAnnotatorVisitor.h @@ -19,55 +19,6 @@ #include namespace tuplex { - inline python::Type unifyTypes(const python::Type& a, const python::Type& b, bool allowNumbers) { - using namespace std; - if(a == b) - return a; - - if(!allowNumbers) { - // only NULL to any or element and option type allowed - if (a == python::Type::NULLVALUE) - return python::Type::makeOptionType(b); - if (b == python::Type::NULLVALUE) - return python::Type::makeOptionType(a); - - // one 
is option type, the other not but the elementtype of the option type! - if (a.isOptionType() && !b.isOptionType() && a.elementType() == b) - return a; - if (b.isOptionType() && !a.isOptionType() && b.elementType() == a) - return b; - } else { - auto t = python::Type::superType(a, b); - if(t != python::Type::UNKNOWN) - return t; - } - - // tuples, lists, dicts... - if(a.isTupleType() && b.isTupleType() && a.parameters().size() == b.parameters().size()) { - vector v; - for(unsigned i = 0; i < a.parameters().size(); ++i) { - v.push_back(unifyTypes(a.parameters()[i], b.parameters()[i], allowNumbers)); - if(v.back() == python::Type::UNKNOWN) - return python::Type::UNKNOWN; - } - return python::Type::makeTupleType(v); - } - - if(a.isListType() && b.isListType()) { - auto el = unifyTypes(a.elementType(), b.elementType(), allowNumbers); - if(el == python::Type::UNKNOWN) - return python::Type::UNKNOWN; - return python::Type::makeListType(el); - } - - if(a.isDictionaryType() && b.isDictionaryType()) { - auto key_t = unifyTypes(a.keyType(), b.keyType(), allowNumbers); - auto val_t = unifyTypes(a.valueType(), b.valueType(), allowNumbers); - if(key_t != python::Type::UNKNOWN && val_t != python::Type::UNKNOWN) - return python::Type::makeDictionaryType(key_t, val_t); - } - return python::Type::UNKNOWN; - } class TypeAnnotatorVisitor : public ApatheticVisitor, public IFailable { private: diff --git a/tuplex/codegen/src/BlockGeneratorVisitor.cc b/tuplex/codegen/src/BlockGeneratorVisitor.cc index 50ffbb379..ef447cc72 100644 --- a/tuplex/codegen/src/BlockGeneratorVisitor.cc +++ b/tuplex/codegen/src/BlockGeneratorVisitor.cc @@ -5048,7 +5048,7 @@ namespace tuplex { // if not, error. Type annotation failed then! 
auto& slot = it->second; - auto uni_type = unifyTypes(slot.type, var.second.type, allowNumericUpcasting); + auto uni_type = python::unifyTypes(slot.type, var.second.type, allowNumericUpcasting); if(uni_type == python::Type::UNKNOWN) { error("variable " + name + " declared in " + branch_name + " with type " + var.second.type.desc() + " conflicts with slot type" + diff --git a/tuplex/codegen/src/TypeAnnotatorVisitor.cc b/tuplex/codegen/src/TypeAnnotatorVisitor.cc index 73be0346f..dd19474e7 100644 --- a/tuplex/codegen/src/TypeAnnotatorVisitor.cc +++ b/tuplex/codegen/src/TypeAnnotatorVisitor.cc @@ -93,7 +93,8 @@ namespace tuplex { // go through all func types, and check whether they can be unified. auto combined_ret_type = _funcReturnTypes.front(); for(int i = 1; i < _funcReturnTypes.size(); ++i) - combined_ret_type = unifyTypes(combined_ret_type, _funcReturnTypes[i], _policy.allowNumericTypeUnification); + combined_ret_type = python::unifyTypes(combined_ret_type, _funcReturnTypes[i], + _policy.allowNumericTypeUnification); if(combined_ret_type == python::Type::UNKNOWN) { @@ -122,7 +123,8 @@ namespace tuplex { auto best_so_far = std::get<0>(v.front()); for(int i = 1; i < v.size(); ++i) { - auto u_type = unifyTypes(best_so_far, std::get<0>(v[i]), _policy.allowNumericTypeUnification); + auto u_type = python::unifyTypes(best_so_far, std::get<0>(v[i]), + _policy.allowNumericTypeUnification); if(u_type != python::Type::UNKNOWN) best_so_far = u_type; } @@ -158,7 +160,8 @@ namespace tuplex { if(n.getInferredType() == python::Type::UNKNOWN) // i.e. code that is never visited return; - auto uni_type = unifyTypes(n.getInferredType(), combined_ret_type, autoUpcast); + auto uni_type = python::unifyTypes(n.getInferredType(), combined_ret_type, + autoUpcast); if(uni_type != python::Type::UNKNOWN) n.setInferredType(combined_ret_type); }); @@ -1290,7 +1293,7 @@ namespace tuplex { if(_nameTable.find(name) != _nameTable.end()) { if(_nameTable[name] != type) { // can we unify types? 
- auto uni_type = unifyTypes(type, _nameTable[name], _policy.allowNumericTypeUnification); + auto uni_type = python::unifyTypes(type, _nameTable[name], _policy.allowNumericTypeUnification); if(uni_type != python::Type::UNKNOWN) _nameTable[name] = uni_type; else { @@ -1327,7 +1330,7 @@ namespace tuplex { if(if_type != else_type) { // check if they can be unified - auto uni_type = unifyTypes(if_type, else_type, _policy.allowNumericTypeUnification); + auto uni_type = python::unifyTypes(if_type, else_type, _policy.allowNumericTypeUnification); if(uni_type != python::Type::UNKNOWN) { if_table[name] = uni_type; else_table[name] = else_type; @@ -1474,7 +1477,7 @@ namespace tuplex { if(ifelse->isExpression()) { // check: if (iftype != elsetype) { - auto combined_type = unifyTypes(iftype, elsetype, _policy.allowNumericTypeUnification); + auto combined_type = python::unifyTypes(iftype, elsetype, _policy.allowNumericTypeUnification); if(combined_type == python::Type::UNKNOWN) error("could not combine type " + iftype.desc() + " of if-branch with type " + elsetype.desc() + " of else-branch in if-else expression"); diff --git a/tuplex/core/src/HybridHashTable.cc b/tuplex/core/src/HybridHashTable.cc index 42212951e..e7af2b4a5 100644 --- a/tuplex/core/src/HybridHashTable.cc +++ b/tuplex/core/src/HybridHashTable.cc @@ -35,7 +35,7 @@ namespace tuplex { // get type of key => is elementType? => fetch from internal hashmap. // else, use python dict - auto key_type = python::mapPythonClassToTuplexType(key); + auto key_type = python::mapPythonClassToTuplexType(key, false); PyObject* ret_list = nullptr; @@ -160,7 +160,7 @@ namespace tuplex { return -1; } - auto key_type = python::mapPythonClassToTuplexType(key); + auto key_type = python::mapPythonClassToTuplexType(key, false); // could be direct key_type == hmElementType comparison, // yet, let's be lazy so objects can be properly extracted... 
@@ -256,8 +256,8 @@ namespace tuplex { } // decoce types of both key and val - auto key_type = python::mapPythonClassToTuplexType(key); - auto val_type = python::mapPythonClassToTuplexType(value); + auto key_type = python::mapPythonClassToTuplexType(key, false); + auto val_type = python::mapPythonClassToTuplexType(value, false); // @TODO: upcasting b.c. of NVO! @@ -273,7 +273,7 @@ namespace tuplex { // serialize content of value auto bucket_type = this->hmBucketType; - auto bucket_row = python::pythonToRow(value, bucket_type); + auto bucket_row = python::pythonToRow(value, bucket_type, false); // serialize to buffer auto buf_length = bucket_row.serializedLength(); auto buf = new uint8_t [buf_length + 32]; // some security bytes diff --git a/tuplex/core/src/TraceVisitor.cc b/tuplex/core/src/TraceVisitor.cc index 57b20e5e4..11670d649 100644 --- a/tuplex/core/src/TraceVisitor.cc +++ b/tuplex/core/src/TraceVisitor.cc @@ -325,7 +325,7 @@ namespace tuplex { // record input types for schema inference! std::vector types; for(auto a : extractedArgs) { - types.emplace_back(python::mapPythonClassToTuplexType(a)); + types.emplace_back(python::mapPythonClassToTuplexType(a, false)); } _colTypes.emplace_back(types); } else throw std::runtime_error("no nested functions supported in tracer yet!"); @@ -372,7 +372,7 @@ namespace tuplex { // @TODO: add annotation object (ptr) to astnodes! 
// record type - auto retType = python::mapPythonClassToTuplexType(_retValue.value); + auto retType = python::mapPythonClassToTuplexType(_retValue.value, false); if(retType.isTupleType() && !retType.parameters().empty()) { _retColTypes.emplace_back(retType.parameters()); } else { @@ -553,7 +553,7 @@ namespace tuplex { _evalStack.pop_back(); // record type - auto retType = python::mapPythonClassToTuplexType(_retValue.value); + auto retType = python::mapPythonClassToTuplexType(_retValue.value, false); if(retType.isTupleType() && !retType.parameters().empty()) { _retColTypes.emplace_back(retType.parameters()); } else { @@ -907,7 +907,7 @@ namespace tuplex { node->annotation().numTimesVisited++; // translate type - node->annotation().types.push_back(python::mapPythonClassToTuplexType(item.value)); + node->annotation().types.push_back(python::mapPythonClassToTuplexType(item.value, false)); } // add to instruction stack. diff --git a/tuplex/core/src/physical/ResolveTask.cc b/tuplex/core/src/physical/ResolveTask.cc index 6ae6723f0..23ba34d26 100644 --- a/tuplex/core/src/physical/ResolveTask.cc +++ b/tuplex/core/src/physical/ResolveTask.cc @@ -594,7 +594,7 @@ namespace tuplex { continue; } - auto rowType = python::mapPythonClassToTuplexType(rowObj); + auto rowType = python::mapPythonClassToTuplexType(rowObj, false); // special case output schema is str (fileoutput!) if(rowType == python::Type::STRING) { @@ -1120,7 +1120,7 @@ namespace tuplex { switch(_hash_agg_type) { case AggregateType::AGG_UNIQUE: { - auto rowType = python::mapPythonClassToTuplexType(rowObject); + auto rowType = python::mapPythonClassToTuplexType(rowObject, false); // special case: Is it single element column? => this is the only supported right now for unique... 
if(rowType.parameters().size() == 1) { diff --git a/tuplex/python/include/PythonContext.h b/tuplex/python/include/PythonContext.h index 66e87523b..ef4b1d9c2 100644 --- a/tuplex/python/include/PythonContext.h +++ b/tuplex/python/include/PythonContext.h @@ -87,9 +87,9 @@ namespace tuplex { DataSet &parallelizeAnyType(const py::list &L, const python::Type &majType, - const std::vector &columns); + const std::vector &columns, bool autoUpcast); - python::Type inferType(const py::list &L) const; + python::Type inferType(const py::list &L, bool autoUpcast) const; /*! * infer what are the columns in a probabilistic fashion @@ -98,7 +98,7 @@ namespace tuplex { * @return map of column name and most likely type for each */ std::unordered_map - inferColumnsFromDictObjects(const py::list &L, double normalThreshold); + inferColumnsFromDictObjects(const py::list &L, double normalThreshold, bool autoUpcast); inline size_t sampleSize(const py::list &L) const { // sample size to determine how many entries should be scanned to get python types diff --git a/tuplex/python/src/PythonContext.cc b/tuplex/python/src/PythonContext.cc index b70be013a..0e81b667e 100644 --- a/tuplex/python/src/PythonContext.cc +++ b/tuplex/python/src/PythonContext.cc @@ -562,7 +562,8 @@ namespace tuplex { return false; } - DataSet & PythonContext::parallelizeAnyType(const py::list &L, const python::Type &majType, const std::vector& columns) { + DataSet & PythonContext::parallelizeAnyType(const py::list &L, const python::Type &majType, + const std::vector &columns, bool autoUpcast) { auto& logger = Logger::instance().logger("python"); logger.info("using slow transfer to backend"); @@ -587,7 +588,7 @@ namespace tuplex { auto firstRow = PyList_GET_ITEM(listObj, 0); Py_XINCREF(firstRow); - schema = Schema(Schema::MemoryLayout::ROW, python::pythonToRow(firstRow, majType).getRowType()); + schema = Schema(Schema::MemoryLayout::ROW, python::pythonToRow(firstRow, majType, autoUpcast).getRowType()); // create new partition 
on driver auto driver = _context->getDriver(); @@ -624,11 +625,11 @@ namespace tuplex { // cf. http://www.cse.psu.edu/~gxt29/papers/refcount.pdf Py_XINCREF(item); - python::Type t = python::mapPythonClassToTuplexType(item); - if(isSubOptionType(t, majType)) { + python::Type t = python::mapPythonClassToTuplexType(item, autoUpcast); + if(unifyTypes(majType, t, autoUpcast) == majType) { // In this case, t is a subtype of the majority type; this accounts for the case where the majority type // is an option (e.g. majType=Option[int] should encompass both t=I64 and t=NULLVALUE). - auto row = python::pythonToRow(item, majType); + auto row = python::pythonToRow(item, majType, autoUpcast); auto requiredBytes = row.serializedLength(); if(partition->capacity() < numBytesSerialized + requiredBytes) { @@ -720,7 +721,7 @@ namespace tuplex { } try { - Row row = python::pythonToRow(tupleObj, rowType); + Row row = python::pythonToRow(tupleObj, rowType, false); size_t requiredBytes = row.serializedLength(); // check capacity and realloc if necessary get a new partition if (partition->capacity() < numBytesSerialized + allocMinSize) { @@ -795,13 +796,13 @@ namespace tuplex { if(hasExplicitSchema) { majType = python::decodePythonSchema(schemaObj); } else - majType = inferType(L); + majType = inferType(L, autoUpcast); // special case: majType is a dict with strings as key, i.e. perform String Dict unpacking if(autoUnpack && (majType.isDictionaryType() && majType != python::Type::EMPTYDICT && majType != python::Type::GENERICDICT) && majType.keyType() == python::Type::STRING) { // automatic unpacking! // ==> first check if columns are defined, if not infer columns from sample! - auto dictTypes = inferColumnsFromDictObjects(L, _context->getOptions().NORMALCASE_THRESHOLD()); + auto dictTypes = inferColumnsFromDictObjects(L, _context->getOptions().NORMALCASE_THRESHOLD(), autoUpcast); // are columns empty? ==> keys are new columns, create type out of that! 
if(columns.empty()) { @@ -843,19 +844,19 @@ namespace tuplex { ds = &fastMixedSimpleTypeTupleTransfer(L.ptr(), majType, columns); } else { // general slow transfer... - ds = &parallelizeAnyType(L, majType, columns);} + ds = &parallelizeAnyType(L, majType, columns, autoUpcast);} } else if(majType.isDictionaryType() || majType == python::Type::GENERICDICT) { - ds = &parallelizeAnyType(L, majType, columns); + ds = &parallelizeAnyType(L, majType, columns, autoUpcast); } else if(majType.isOptionType()) { // TODO: special case to fast conversion for the option types with fast underlying types - ds = &parallelizeAnyType(L, majType, columns); + ds = &parallelizeAnyType(L, majType, columns, autoUpcast); } else if(majType == python::Type::NULLVALUE) { // TODO: special case to fast conversion for the option types with fast underlying types - ds = &parallelizeAnyType(L, majType, columns); + ds = &parallelizeAnyType(L, majType, columns, autoUpcast); } else if(majType.isListType()) { - ds = &parallelizeAnyType(L, majType, columns); + ds = &parallelizeAnyType(L, majType, columns, autoUpcast); } else if(majType == python::Type::PYOBJECT) { - ds = &parallelizeAnyType(L, majType, columns); + ds = &parallelizeAnyType(L, majType, columns, autoUpcast); } else { std::string msg = "unsupported type '" + majType.desc() + "' found, could not transfer data to backend"; Logger::instance().logger("python").error(msg); @@ -896,45 +897,16 @@ namespace tuplex { // If it returns true, it places the "super option" type into the parameter [super]. 
// For example, t1=int, t2=None -> super = Option[int] // Similarly, t1=(int, none), t2=(none, int) -> super = (Option[int], Option[int]) - bool hasSuperOptionType(python::Type t1, python::Type t2, python::Type &super) { - // same type - if(t1 == t2) { - super = t1; + bool hasSuperOptionType(python::Type t1, python::Type t2, python::Type &super, bool autoUpcast) { + auto newType = unifyTypes(t1, t2, autoUpcast); + if (newType != python::Type::UNKNOWN) { + super = newType; return true; } - if(t1.isOptionType() && (t1.getReturnType() == t2 || python::Type::NULLVALUE == t2)) { - super = t1; - return true; - } - if(t2.isOptionType() && (t2.getReturnType() == t1 || python::Type::NULLVALUE == t1)) { - super = t2; - return true; - } - - // one of them is null - if(t1 == python::Type::NULLVALUE) { - super = python::Type::makeOptionType(t2); - return true; - } - if(t2 == python::Type::NULLVALUE) { - super = python::Type::makeOptionType(t1); - return true; - } - - // both tuples, recurse - if (t1.isTupleType() && t2.isTupleType() && t1.parameters().size() == t2.parameters().size()) { - std::vector types(t1.parameters().size()); - for(int i=0; i &colTypes, int numSamples, double threshold) { + python::Type buildRowTypeFromSamples(const std::map &colTypes, int numSamples, double threshold, bool autoUpcast) { Logger::instance().logger("python").info("inferring type!"); std::map tupleLengthCounter; // count for each length of tuples how often it was seen in the sample @@ -975,7 +947,7 @@ namespace tuplex { int num = 0; // the number of elements that will go under the new type for (const auto &it : colTypes) { // recurse on each of the fields - if(hasSuperOptionType(it.first, superTuple, superTuple)) { + if(hasSuperOptionType(it.first, superTuple, superTuple, autoUpcast)) { num += it.second; } } @@ -990,11 +962,10 @@ namespace tuplex { majType = python::Type::makeOptionType(majType); } } - return majType; } - python::Type PythonContext::inferType(const py::list &L) const { + 
python::Type PythonContext::inferType(const py::list &L, bool autoUpcast) const { // elements must be either simple objects, i.e. str/int/float // or tuples of simple objects // ==> no support for lists yet!!! @@ -1008,22 +979,39 @@ namespace tuplex { py::object o = L[i]; // describe using internal types - python::Type t = python::mapPythonClassToTuplexType(o.ptr()); - - if(mTypes.find(t) == mTypes.end()) - mTypes[t] = 1; - else + python::Type t = python::mapPythonClassToTuplexType(o.ptr(), autoUpcast); + + if(mTypes.find(t) == mTypes.end()) { + // is there a compatible type + bool foundCompatible = false; + for (auto it = mTypes.begin(); it != mTypes.end(); it++) { + auto newType = unifyTypes(it->first, t, autoUpcast); + if(newType != python::Type::UNKNOWN) { + // update map KEY to compatible type + auto mTypeNew = std::make_pair(newType, it->second + 1); + mTypes.erase(it); + mTypes.insert(mTypeNew); + foundCompatible = true; + break; + } + } + if (!foundCompatible) { + mTypes[t] = 1; + } + } else { mTypes[t] += 1; + } } // be sure to also collapse types to supertypes if possible... 
if(mTypes.size() > 1) Logger::instance().logger("python").warn("more than one type in column found"); - return buildRowTypeFromSamples(mTypes, numSample, _context->getOptions().OPTIONAL_THRESHOLD()); + return buildRowTypeFromSamples(mTypes, numSample, _context->getOptions().OPTIONAL_THRESHOLD(), autoUpcast); } - std::unordered_map PythonContext::inferColumnsFromDictObjects(const py::list &L, double normalThreshold) { + std::unordered_map + PythonContext::inferColumnsFromDictObjects(const py::list &L, double normalThreshold, bool autoUpcast) { using namespace std; auto& logger = Logger::instance().logger("python"); @@ -1069,7 +1057,7 @@ namespace tuplex { ++i; } - auto type = inferType(py::reinterpret_borrow(listColObj)); + auto type = inferType(py::reinterpret_borrow(listColObj), autoUpcast); m[c.first] = type; } @@ -1113,7 +1101,7 @@ namespace tuplex { auto key = PyTuple_GET_ITEM(keyval, 0); auto val = PyTuple_GET_ITEM(keyval, 1); assert(PyUnicode_Check(key)); - m[python::PyString_AsString(key)] = python::mapPythonClassToTuplexType(val); + m[python::PyString_AsString(key)] = python::mapPythonClassToTuplexType(val, false); } } diff --git a/tuplex/python/src/PythonWrappers.cc b/tuplex/python/src/PythonWrappers.cc index 8e35d5d4c..e680765ba 100644 --- a/tuplex/python/src/PythonWrappers.cc +++ b/tuplex/python/src/PythonWrappers.cc @@ -318,7 +318,7 @@ namespace tuplex { } else { ClosureEnvironment::Constant c; c.identifier = identifier; - c.type = python::mapPythonClassToTuplexType(val); + c.type = python::mapPythonClassToTuplexType(val, false); c.value = python::pythonToField(val); // // call json.dumps diff --git a/tuplex/python/tests/test_parallelize.py b/tuplex/python/tests/test_parallelize.py index 1b9d66030..78803117a 100644 --- a/tuplex/python/tests/test_parallelize.py +++ b/tuplex/python/tests/test_parallelize.py @@ -102,7 +102,7 @@ def testAutoUnpack(self): input = [{"a":1,"b":2,"c":3},{"a":4,"b":5,"c":6},{"a":7,"b":8,"c":9},{"a": 1, "b":2}, {"c":11}] output = 
c.parallelize(input).map(lambda x: x["a"]).collect() - self.assertEqual([1, 4, 7, 1], output) + self.assertEqual([1, 4, 7, 1, None], output) input = [{"a":1,"b":2,"c":3},{"d":4,"e":5,"f":6}] output = c.parallelize(input).map(lambda x: (x["a"], x["b"], x["c"], x["d"], x["e"], x["f"])).collect() @@ -120,4 +120,46 @@ def testNoneType(self): ref = [None, None] res = c.parallelize(ref).collect() + assert res == ref + + +class TestParallelizeAnyType(unittest.TestCase): + + def __init__(self, *args, **kwargs): + self.conf = {"webui.enable" : False, "driverMemory" : "8MB", "partitionSize" : "256KB"} + super(TestParallelizeAnyType, self).__init__(*args, **kwargs) + + def testListTupleI(self): + c = Context(self.conf) + ref = [([(1, 2), (3, 4)], [(-1, -2), (-3, -4)])] + res = c.parallelize(ref).collect() + + assert res == ref + + def testListTupleII(self): + c = Context(self.conf) + ref = [("a", [("b", [1, 2]), ("c", [1, 2, 3, 4])]), ("....", [("d", [100, 200, -10000000]), ("e", [1000, 2000, 3000, 4000, 5000])])] + res = c.parallelize(ref).collect() + + assert res == ref + + def testOptionTypeIII(self): + c = Context(self.conf) + ref = [(1, 2), None, (3, 4)] + res = c.parallelize(ref).collect() + + assert res == ref + + def testOptionTypeIV(self): + c = Context(self.conf) + ref = [None, ["a", "b"], None] + res = c.parallelize(ref).collect() + + assert res == ref + + def testOptionTypeV(self): + c = Context(self.conf) + ref = [[(1, None), None, (3, (4, None))]] + res = c.parallelize(ref).collect() + assert res == ref \ No newline at end of file diff --git a/tuplex/test/adapters/cpython/PythonHelperTest.cc b/tuplex/test/adapters/cpython/PythonHelperTest.cc index 7df569eeb..d5874f1b7 100644 --- a/tuplex/test/adapters/cpython/PythonHelperTest.cc +++ b/tuplex/test/adapters/cpython/PythonHelperTest.cc @@ -125,23 +125,24 @@ TEST_F(PythonHelperTest, pythonToTuplexNestedTupleII) { TEST_F(PythonHelperTest, typeMap) { // primitive types - EXPECT_EQ(python::Type::BOOLEAN, 
python::mapPythonClassToTuplexType(Py_True)); - EXPECT_EQ(python::Type::BOOLEAN, python::mapPythonClassToTuplexType(Py_False)); + EXPECT_EQ(python::Type::BOOLEAN, python::mapPythonClassToTuplexType(Py_True, false)); + EXPECT_EQ(python::Type::BOOLEAN, python::mapPythonClassToTuplexType(Py_False, false)); - EXPECT_EQ(python::Type::I64, python::mapPythonClassToTuplexType(PyLong_FromLong(0))); - EXPECT_EQ(python::Type::I64, python::mapPythonClassToTuplexType(PyLong_FromLong(-42))); - EXPECT_EQ(python::Type::I64, python::mapPythonClassToTuplexType(PyLong_FromLong(1234560))); + EXPECT_EQ(python::Type::I64, python::mapPythonClassToTuplexType(PyLong_FromLong(0), false)); + EXPECT_EQ(python::Type::I64, python::mapPythonClassToTuplexType(PyLong_FromLong(-42), false)); + EXPECT_EQ(python::Type::I64, python::mapPythonClassToTuplexType(PyLong_FromLong(1234560), false)); - EXPECT_EQ(python::Type::F64, python::mapPythonClassToTuplexType(PyFloat_FromDouble(0.0))); - EXPECT_EQ(python::Type::F64, python::mapPythonClassToTuplexType(PyFloat_FromDouble(-1.0))); - EXPECT_EQ(python::Type::F64, python::mapPythonClassToTuplexType(PyFloat_FromDouble(3.123456789))); + EXPECT_EQ(python::Type::F64, python::mapPythonClassToTuplexType(PyFloat_FromDouble(0.0), false)); + EXPECT_EQ(python::Type::F64, python::mapPythonClassToTuplexType(PyFloat_FromDouble(-1.0), false)); + EXPECT_EQ(python::Type::F64, python::mapPythonClassToTuplexType(PyFloat_FromDouble(3.123456789), false)); - EXPECT_EQ(python::Type::STRING, python::mapPythonClassToTuplexType(python::PyString_FromString(""))); - EXPECT_EQ(python::Type::STRING, python::mapPythonClassToTuplexType(python::PyString_FromString("hello world"))); + EXPECT_EQ(python::Type::STRING, python::mapPythonClassToTuplexType(python::PyString_FromString(""), false)); + EXPECT_EQ(python::Type::STRING, python::mapPythonClassToTuplexType(python::PyString_FromString("hello world"), + false)); // compound types PyObject* c1 = PyTuple_New(0); - 
EXPECT_EQ(python::Type::EMPTYTUPLE, python::mapPythonClassToTuplexType(c1)); + EXPECT_EQ(python::Type::EMPTYTUPLE, python::mapPythonClassToTuplexType(c1, false)); PyObject* c2 = PyTuple_New(5); PyTuple_SetItem(c2, 0, Py_True); @@ -150,14 +151,14 @@ TEST_F(PythonHelperTest, typeMap) { PyTuple_SetItem(c2, 3, c1); PyTuple_SetItem(c2, 4, python::PyString_FromString("abc")); auto t2 = python::Type::makeTupleType({python::Type::BOOLEAN, python::Type::I64, python::Type::F64, python::Type::EMPTYTUPLE, python::Type::STRING}); - EXPECT_EQ(t2, python::mapPythonClassToTuplexType(c2)); + EXPECT_EQ(t2, python::mapPythonClassToTuplexType(c2, false)); // dict (keytype, valuetype) PyObject* c3 = PyDict_New(); PyDict_SetItemString(c3, "x", Py_True); PyDict_SetItemString(c3, "y", Py_False); auto t3 = python::Type::makeDictionaryType(python::Type::STRING, python::Type::BOOLEAN); - EXPECT_EQ(t3, python::mapPythonClassToTuplexType(c3)); + EXPECT_EQ(t3, python::mapPythonClassToTuplexType(c3, false)); // @TODO: to represent dicts as struct type, there should be also specific type -> generic type PyObject* c4 = PyDict_New(); @@ -171,7 +172,7 @@ TEST_F(PythonHelperTest, typeMap) { PyDict_SetItem(c5, Py_True, Py_False); PyDict_SetItem(c5, PyLong_FromLong(42), python::PyString_FromString("hello world")); PyDict_SetItemString(c5, "test", PyLong_FromLong(42)); - EXPECT_EQ(python::Type::GENERICDICT, python::mapPythonClassToTuplexType(c5)); + EXPECT_EQ(python::Type::GENERICDICT, python::mapPythonClassToTuplexType(c5, false)); } TEST_F(PythonHelperTest, PythonConversion) { @@ -402,8 +403,8 @@ TEST_F(PythonHelperTest, FunctionGlobals) { PyObject *key = nullptr, *val = nullptr; Py_ssize_t pos = 0; // must be initialized to 0 to start iteration, however internal iterator variable. Don't use semantically. 
while(PyDict_Next(pyGlobals, &pos, &key, &val)) { - auto curKeyType = mapPythonClassToTuplexType(key); - auto curValType = mapPythonClassToTuplexType(val); + auto curKeyType = mapPythonClassToTuplexType(key, false); + auto curValType = mapPythonClassToTuplexType(val, false); assert(curKeyType == python::Type::STRING); // check if value can be dealt with diff --git a/tuplex/test/adapters/cpython/PythonSerializerTest.cc b/tuplex/test/adapters/cpython/PythonSerializerTest.cc index 481e99980..be80daf08 100644 --- a/tuplex/test/adapters/cpython/PythonSerializerTest.cc +++ b/tuplex/test/adapters/cpython/PythonSerializerTest.cc @@ -108,7 +108,7 @@ void checkCreatePyObjectFromMemoryBool(uint8_t *buffer, size_t capacity, bool go EXPECT_EQ(1, PyObject_RichCompareBool( PyBool_FromLong(want), - createPyObjectFromMemory(buffer, python::Type::BOOLEAN), + createPyObjectFromMemory(buffer, python::Type::BOOLEAN, capacity), equal ? Py_EQ : Py_NE)); } @@ -131,7 +131,7 @@ void checkCreatePyObjectFromMemoryLong(uint8_t *buffer, size_t capacity, int64_t EXPECT_EQ(1, PyObject_RichCompareBool( PyLong_FromLong(want), - createPyObjectFromMemory(buffer, python::Type::I64), + createPyObjectFromMemory(buffer, python::Type::I64, capacity), equal ? Py_EQ : Py_NE)); } @@ -160,7 +160,7 @@ void checkCreatePyObjectFromMemoryFloat(uint8_t *buffer, size_t capacity, double EXPECT_EQ(1, PyObject_RichCompareBool( PyFloat_FromDouble(want), - createPyObjectFromMemory(buffer, python::Type::F64), + createPyObjectFromMemory(buffer, python::Type::F64, capacity), equal ? Py_EQ : Py_NE)); } @@ -191,7 +191,7 @@ void checkCreatePyObjectFromMemoryString(uint8_t *buffer, size_t capacity, const EXPECT_EQ(1, PyObject_RichCompareBool( PyUnicode_DecodeASCII(want.c_str(), want.length(), string_errors), - createPyObjectFromMemory(buffer, python::Type::STRING), + createPyObjectFromMemory(buffer, python::Type::STRING, capacity), equal ? 
Py_EQ : Py_NE)); } @@ -216,7 +216,7 @@ void checkCreatePyObjectFromMemoryTuple(uint8_t *buffer, size_t capacity, const EXPECT_EQ(1, PyObject_RichCompareBool( want, - createPyObjectFromMemory(buffer, row.getSchema().getRowType()), + createPyObjectFromMemory(buffer, row.getSchema().getRowType(), capacity), equal ? Py_EQ : Py_NE)); } @@ -311,7 +311,7 @@ TEST(PythonSerializer, TestCreatePyObjectFromMemoryNestedTuple) { PyTuple_SetItem(tuple, 1, inner_tuple_2); PyTuple_SetItem(tuple, 2, PyUnicode_DecodeASCII(sample_str.c_str(), sample_str.length(), string_errors)); - auto reconstructed = createPyObjectFromMemory(buffer, row.getSchema().getRowType()); + auto reconstructed = createPyObjectFromMemory(buffer, row.getSchema().getRowType(), capacity); EXPECT_EQ(1, PyObject_RichCompareBool( tuple, diff --git a/tuplex/test/codegen/TypeSystemTest.cc b/tuplex/test/codegen/TypeSystemTest.cc index 891d7a3ab..ae6bb3e88 100644 --- a/tuplex/test/codegen/TypeSystemTest.cc +++ b/tuplex/test/codegen/TypeSystemTest.cc @@ -161,4 +161,19 @@ TEST(TypeSys, flattenWithPyObject) { auto row_type = python::Type::makeTupleType({python::Type::I64, python::Type::I64, python::Type::PYOBJECT}); auto num_params = tuplex::flattenedType(row_type).parameters().size(); EXPECT_EQ(num_params, 3); +} + +TEST(TypeSys, compatibleType) { + + // [Option[[i64]]] and [[Option[i64]]] ==> [Option[[Option[i64]]]] + auto a1_type = python::Type::makeListType(python::Type::makeOptionType(python::Type::makeListType(python::Type::I64))); + auto b1_type = python::Type::makeListType(python::Type::makeListType(python::Type::makeOptionType(python::Type::I64))); + auto ab1_compatible_type = unifyTypes(a1_type, b1_type, true); + EXPECT_EQ(ab1_compatible_type, python::Type::makeListType(python::Type::makeOptionType(python::Type::makeListType(python::Type::makeOptionType(python::Type::I64))))); + + // Option[[Option[(Option[str], [Option[F64]])]]] and [(str, Option[[F64]])] ==> Option[[Option[(Option[str], 
Option[[Option[F64]]])]]] + auto a2_type = python::Type::makeOptionType(python::Type::makeListType(python::Type::makeOptionType(python::Type::makeTupleType({python::Type::makeOptionType(python::Type::STRING), python::Type::makeListType(python::Type::makeOptionType(python::Type::F64))})))); + auto b2_type = python::Type::makeListType(python::Type::makeTupleType({python::Type::STRING, python::Type::makeOptionType(python::Type::makeListType(python::Type::F64))})); + auto ab2_compatible_type = unifyTypes(a2_type, b2_type, true); + EXPECT_EQ(ab2_compatible_type, python::Type::makeOptionType(python::Type::makeListType(python::Type::makeOptionType(python::Type::makeTupleType({python::Type::makeOptionType(python::Type::STRING), python::Type::makeOptionType(python::Type::makeListType(python::Type::makeOptionType(python::Type::F64)))}))))); } \ No newline at end of file diff --git a/tuplex/test/core/DataSetShow.cc b/tuplex/test/core/DataSetShow.cc index cf50705b8..4ec70c4e6 100644 --- a/tuplex/test/core/DataSetShow.cc +++ b/tuplex/test/core/DataSetShow.cc @@ -14,7 +14,7 @@ #include #include "TestUtils.h" -class DataSetTest : public TuplexTest {}; +class DataSetTest : public PyTest {}; TEST_F(DataSetTest, DataSetShow) { using namespace tuplex; diff --git a/tuplex/test/core/FallbackMode.cc b/tuplex/test/core/FallbackMode.cc index 34ecc683f..fe1d3fc3b 100644 --- a/tuplex/test/core/FallbackMode.cc +++ b/tuplex/test/core/FallbackMode.cc @@ -81,7 +81,7 @@ TEST_F(FallbackTest, ArbitraryPyObjectSerialization) { cout< PYOBJECT!) - auto f = python::pythonToField(np_obj); + auto f = python::pythonToField(np_obj, false); EXPECT_EQ(f.getType(), python::Type::PYOBJECT); // mapped to arbitrary python object @@ -113,7 +113,7 @@ TEST_F(FallbackTest, NonAccessedPyObjectInPipeline) { cout << endl; // convert to tuplex Field (--> PYOBJECT!) 
- auto f = python::pythonToField(np_obj); + auto f = python::pythonToField(np_obj, false); python::unlockGIL(); diff --git a/tuplex/test/core/SerializationTypeTest.cc b/tuplex/test/core/SerializationTypeTest.cc new file mode 100644 index 000000000..f1fb1ace2 --- /dev/null +++ b/tuplex/test/core/SerializationTypeTest.cc @@ -0,0 +1,50 @@ +//--------------------------------------------------------------------------------------------------------------------// +// // +// Tuplex: Blazing Fast Python Data Science // +// // +// // +// (c) 2017 - 2021, Tuplex team // +// Created by Yunzhi Shao first on 4/1/2021 // +// License: Apache 2.0 // +//--------------------------------------------------------------------------------------------------------------------// + +#include "gtest/gtest.h" +#include + +using namespace tuplex; + +class SerializationTypeTest : public ::testing::Test { +protected: + void SetUp() override { + // init python interpreter + python::initInterpreter(); + } + + void TearDown() override { + // close python interpreter + python::closeInterpreter(); + } +}; + +namespace python { + void testRowSerializationType(const std::string& PyLiteral, const std::string& expectedType, bool autoUpcast) { + auto PyObj = python::runAndGet("obj = " + PyLiteral, "obj"); + auto rowType = python::mapPythonClassToTuplexType(PyObj, autoUpcast); + auto row = python::pythonToRow(PyObj, rowType, false); + EXPECT_EQ(row.getRowType().desc(), expectedType); + } +} + +TEST_F(SerializationTypeTest, ListSerializationTest) { + python::testRowSerializationType("([1, 2, None], ['ab', None])", "([Option[i64]],[Option[str]])", false); + python::testRowSerializationType("([([[4444, None], [3, 4]], '!!!!!'), ([[-20000, 400], [3, 4]], None)])", "([([[Option[i64]]],Option[str])])", false); + python::testRowSerializationType("([['aa', 'bbbb', None], None, ['test']])", "([Option[[Option[str]]]])", false); + python::testRowSerializationType("([([('--', [1000, 2000, None])], [1.25, 5544.2211])])", 
"([([(str,[Option[i64]])],[f64])])", false); +} + +TEST_F(SerializationTypeTest, OptionTupleSerializationTest) { + python::testRowSerializationType("([(100, -10000000000), None, (5, 2147483647)])", "([Option[(i64,i64)]])", false); + python::testRowSerializationType("([('string', None, False), (None, (1, [1, 2]), None)])", "([(Option[str],Option[(i64,[i64])],Option[boolean])])", false); + python::testRowSerializationType("[(1, (1, 2)) ,(2, None)])]", "([(Option[str],Option[(i64,[i64])],Option[boolean])])", false); + python::testRowSerializationType("('qwert', [ ((False, 2), True, 'ab'), None, (None, False, None), ((None, None), True, 'efghijk')])", "(str,[Option[(Option[(Option[boolean],Option[i64])],boolean,Option[str])]])", false); +} \ No newline at end of file diff --git a/tuplex/test/core/SerializerTest.cc b/tuplex/test/core/SerializerTest.cc index aedb13883..91aa6156d 100644 --- a/tuplex/test/core/SerializerTest.cc +++ b/tuplex/test/core/SerializerTest.cc @@ -137,4 +137,108 @@ TEST(Serializer,OptI64Format) { EXPECT_EQ(*(int64_t*)buf, 0); EXPECT_EQ(*(((int64_t*)buf) + 1), i); } +} + +TEST(Serializer, ListOfTuples) { + Serializer s; + auto *buffer = (uint8_t*)malloc(2048); + + // ("abcd", [(1234, "hello,", 5.6789, "TUPLEX"), (98, "world!!", 76543.21, "??&&")], True, 10000000) + auto len = s.append("abcd") + .append(List(Tuple(1234, "hello,", 5.6789, "TUPLEX"), Tuple(98, "world!!", 76543.21, "??&&"))) + .append(true) + .append(10000000) + .serialize(buffer, 2048); + + Schema schema = s.getSchema(); + auto et = python::Type::makeTupleType({python::Type::STRING, + python::Type::makeListType(python::Type::makeTupleType({python::Type::I64, python::Type::STRING, python::Type::F64, python::Type::STRING})), + python::Type::BOOLEAN, + python::Type::I64}); + EXPECT_TRUE(schema.getRowType() == et); + + Deserializer d(schema); + d.deserialize(buffer, 2048); + free(buffer); + + auto row = d.getTuple(); + EXPECT_EQ(row.numElements(), 4); + EXPECT_EQ(std::string((char 
*)row.getField(0).getPtr()), "abcd"); + auto lst = *(List *)row.getField(1).getPtr(); + EXPECT_EQ(lst.numElements(), 2); + auto tup1 = *(Tuple *)(lst.getField(0).getPtr()); + auto tup2 = *(Tuple *)(lst.getField(1).getPtr()); + EXPECT_EQ(tup1.numElements(), 4); + EXPECT_EQ(tup2.numElements(), 4); + EXPECT_EQ(tup1.getField(0).getInt(), 1234); + EXPECT_EQ(std::string((char *)tup1.getField(1).getPtr()), "hello,"); + EXPECT_FLOAT_EQ(tup1.getField(2).getDouble(), 5.6789); + EXPECT_EQ(std::string((char *)tup1.getField(3).getPtr()), "TUPLEX"); + EXPECT_EQ(tup2.getField(0).getInt(), 98); + EXPECT_EQ(std::string((char *)tup2.getField(1).getPtr()), "world!!"); + EXPECT_FLOAT_EQ(tup2.getField(2).getDouble(), 76543.21); + EXPECT_EQ(std::string((char *)tup2.getField(3).getPtr()), "??&&"); + EXPECT_EQ(row.getField(2).getInt(), 1); + EXPECT_EQ(row.getField(3).getInt(), 10000000); +} + +TEST(Serializer, ListOfLists) { + Serializer s; + auto *buffer = (uint8_t*)malloc(2048); + + // ([[1, 2, 3, 4], [5, 6]], [["ab"], ["####", "QWERT"]]) + auto len = s.append(List(List(1, 2, 3, 4), List(5, 6))) + .append(List(List("ab"), List("####", "QWERT"))) + .serialize(buffer, 2048); + + Schema schema = s.getSchema(); + auto et = python::Type::makeTupleType({python::Type::makeListType(python::Type::makeListType(python::Type::I64)), + python::Type::makeListType(python::Type::makeListType(python::Type::STRING))}); + EXPECT_TRUE(schema.getRowType() == et); + + Deserializer d(schema); + d.deserialize(buffer, 2048); + free(buffer); + + auto row = d.getTuple(); + EXPECT_EQ(row.numElements(), 2); + auto lst1 = *(List *)row.getField(0).getPtr(); + EXPECT_EQ(lst1.numElements(), 2); + auto lst11 = *(List *)(lst1.getField(0).getPtr()); + EXPECT_EQ(lst11.desc(), "[1,2,3,4]"); + auto lst12 = *(List *)(lst1.getField(1).getPtr()); + EXPECT_EQ(lst12.desc(), "[5,6]"); + auto lst2 = *(List *)row.getField(1).getPtr(); + EXPECT_EQ(lst2.numElements(), 2); + auto lst21 = *(List *)lst2.getField(0).getPtr(); + 
EXPECT_EQ(lst21.desc(), "['ab']"); + auto lst22 = *(List *)lst2.getField(1).getPtr(); + EXPECT_EQ(lst22.desc(), "['####','QWERT']"); +} + +TEST(Serializer, OptionalTuple) { + Serializer s; + auto *buffer = (uint8_t*)malloc(2048); + + // (i64, Option[(i64, i64)], str) + auto len = s.append(5000) + .append(option(Tuple(1234, 9876)), python::Type::makeTupleType({python::Type::I64, python::Type::I64})) + .append("$$$$tuple$$$$") + .serialize(buffer, 2048); + + Schema schema = s.getSchema(); + auto et = python::Type::makeTupleType({python::Type::I64, python::Type::makeOptionType(python::Type::makeTupleType({python::Type::I64, python::Type::I64})), python::Type::STRING}); + + EXPECT_TRUE(schema.getRowType() == et); + Deserializer d(schema); + d.deserialize(buffer, 2048); + free(buffer); + + auto row = d.getTuple(); + EXPECT_EQ(row.numElements(), 3); + EXPECT_EQ(row.getField(0).getInt(), 5000); + auto tuple = *(Tuple *)row.getField(1).getPtr(); + EXPECT_EQ(tuple.numElements(), 2); + EXPECT_EQ(tuple.desc(), "(1234,9876)"); + EXPECT_EQ(std::string((char *)(row.getField(2).getPtr())), "$$$$tuple$$$$"); } \ No newline at end of file diff --git a/tuplex/test/wrappers/WrapperTest.cc b/tuplex/test/wrappers/WrapperTest.cc index ede9dd82d..5e855e4d5 100644 --- a/tuplex/test/wrappers/WrapperTest.cc +++ b/tuplex/test/wrappers/WrapperTest.cc @@ -683,6 +683,55 @@ TEST_F(WrapperTest, UpcastParallelizeII) { } } +TEST_F(WrapperTest, OptionListTest) { + using namespace std; + using namespace tuplex; + + // RAII, destruct python context! + auto opts = testOptions(); + opts = opts.substr(0, opts.length() - 1) + ", \"tuplex.autoUpcast\":\"True\"}"; + PythonContext c("python", "", opts); + + // weird block syntax due to RAII problems. 
+ { + PyObject * listObj1 = PyList_New(2); + PyObject * listObj2 = PyList_New(2); + PyObject * listObj3 = PyList_New(2); + PyObject * listObj4 = PyList_New(2); + PyObject * listObj5 = PyList_New(2); + PyObject * tupleObj1 = PyTuple_New(1); + PyObject * tupleObj2 = PyTuple_New(1); + PyObject * listObj6 = PyList_New(2); + + PyList_SET_ITEM(listObj1, 0, PyLong_FromLong(1)); + PyList_SET_ITEM(listObj1, 1, PyLong_FromLong(2)); + PyList_SET_ITEM(listObj2, 0, PyLong_FromLong(3)); + PyList_SET_ITEM(listObj2, 1, PyLong_FromLong(4)); + PyList_SET_ITEM(listObj3, 0, PyLong_FromLong(5)); + PyList_SET_ITEM(listObj3, 1, PyLong_FromLong(6)); + PyList_SET_ITEM(listObj4, 0, listObj1); + PyList_SET_ITEM(listObj4, 1, listObj2); + PyList_SET_ITEM(listObj5, 0, listObj3); + PyList_SET_ITEM(listObj5, 1, Py_None); + Py_XINCREF(Py_None); + PyTuple_SET_ITEM(tupleObj1, 0, listObj4); + PyTuple_SET_ITEM(tupleObj2, 0, listObj5); + PyList_SET_ITEM(listObj6, 0, tupleObj1); + PyList_SET_ITEM(listObj6, 1, tupleObj2); + + auto list = py::reinterpret_borrow(listObj6); + auto res = c.parallelize(list).collect(); + + auto resObj = res.ptr(); + + ASSERT_TRUE(PyList_Check(resObj)); + ASSERT_EQ(PyList_Size(resObj), 2); + auto rowType = python::Type::makeListType(python::Type::makeOptionType(python::Type::makeListType(python::Type::I64))); + EXPECT_EQ(python::pythonToRow(PyList_GetItem(resObj, 0), rowType, true).toPythonString(), "([[1,2],[3,4]],)"); + EXPECT_EQ(python::pythonToRow(PyList_GetItem(resObj, 1), rowType, true).toPythonString(), "([[5,6],None],)"); + } +} + TEST_F(WrapperTest, FilterAll) { // c = Context() // ds = c.parallelize([1, 2, 3, 4, 5]) diff --git a/tuplex/utils/include/List.h b/tuplex/utils/include/List.h index 88f1ca33e..ae27c8755 100644 --- a/tuplex/utils/include/List.h +++ b/tuplex/utils/include/List.h @@ -52,6 +52,8 @@ namespace tuplex { size_t numElements() const { return _numElements; } Field getField(const int i) const; + size_t numNonNullElements() const; + friend bool operator 
== (const List& rhs, const List& lhs); static List from_vector(const std::vector& elements) { diff --git a/tuplex/utils/include/Serializer.h b/tuplex/utils/include/Serializer.h index bdf22114d..24fdab469 100644 --- a/tuplex/utils/include/Serializer.h +++ b/tuplex/utils/include/Serializer.h @@ -100,7 +100,7 @@ namespace tuplex { Serializer& appendWithoutInference(const option& d); Serializer& appendWithoutInference(const option &str); Serializer& appendWithoutInference(const option &list, const python::Type &listType); - + Serializer& appendWithoutInference(const option &tuple, const python::Type &tupleType); Serializer& appendWithoutInference(const uint8_t* buf, size_t bufSize); Serializer& appendWithoutInference(const Field f); @@ -109,6 +109,16 @@ namespace tuplex { // from _isVarLenField, if any element is set to true return true return std::any_of(_isVarField.begin(), _isVarField.end(), [](bool b) { return b; }); } + + // helper functions to append element to a parent list or non-row tuple + // only writes data to _varLenFields as part of its parent's varLen field + // tuple is serialized as below : + // bitmap for optional fields (8-byte aligned) | data of field_1 (if fixed len field) | offset to field_2 (if varLen field) | ... | field_n | actual data of field_2 | ... + // list is serialized the same as tuple except for an additional "num of elements" field at the beginning + Serializer& appendWithoutInferenceHelper(const Tuple &t); + Serializer& appendWithoutInferenceHelper(const List &l); + Serializer& appendWithoutInferenceHelper(const std::string &str); + public: Serializer(bool autoSchema = true) : _autoSchema(autoSchema), _fixedLenFields(_bufferGrowthConstant), @@ -127,8 +137,8 @@ namespace tuplex { Serializer& append(const option& f) { return append(f.has_value() ? static_cast(f.value()) : option::none); } Serializer& append(const option& i) { return append(i.has_value() ? 
static_cast(i.value()) : option::none); } Serializer& appendEmptyTupleOption(const Field &f); - Serializer& append(const option& t); - Serializer& append(const option& list, python::Type listType); + Serializer& append(const option& t, const python::Type& tupleType); + Serializer& append(const option& list, const python::Type& listType); // data fields which are not nullable Serializer& append(const bool b); @@ -238,6 +248,23 @@ namespace tuplex { std::string getString(const int col) const; std::string getDictionary(const int col) const; List getList(const int col) const; + Tuple getOptionTuple(const int col) const; + + /*! + * given a ptr to the start of the actual tuple data (not offset), return a tuple + * @param tupleType + * @param ptr ptr to the start of tuple data + * @return Tuple + */ + Tuple getTupleHelper(const python::Type &tupleType, const uint8_t *ptr) const; + + /*! + * given a ptr to the start of the actual list data (not offset), return a list + * @param listType + * @param ptr ptr to the start of list data + * @return List + */ + List getListHelper(const python::Type &listType, const uint8_t *ptr) const; const uint8_t* getPtr(const int col) const; size_t getSize(const int col) const; diff --git a/tuplex/utils/include/TupleTree.h b/tuplex/utils/include/TupleTree.h index debc1d075..b643bcd3c 100644 --- a/tuplex/utils/include/TupleTree.h +++ b/tuplex/utils/include/TupleTree.h @@ -63,12 +63,7 @@ namespace tuplex { return root; } else { if(rtype.isTupleType()) { - // recursively call children & append as children! 
- int num_params = rtype.parameters().size(); - for(int i = 0; i < num_params; ++i) { - TupleTreeNode *child = new TupleTreeNode(); - root->children.push_back(createTupleTreeR(child, python::Type::makeOptionType(type.parameters()[i]))); - } + // treat as primitive root->type = type; return root; } else { @@ -306,7 +301,8 @@ namespace tuplex { assert(ret->type.getReturnType().isPrimitiveType() || ret->type.getReturnType() == python::Type::EMPTYTUPLE || ret->type.getReturnType().isDictionaryType() || - ret->type.getReturnType().isListType()); + ret->type.getReturnType().isListType() || + ret->type.getReturnType().isTupleType()); else assert(ret->type.isPrimitiveType() || ret->type == python::Type::EMPTYTUPLE || ret->type.isDictionaryType() || ret->type.isListType() || ret->type == python::Type::PYOBJECT); diff --git a/tuplex/utils/include/TypeSystem.h b/tuplex/utils/include/TypeSystem.h index dc0321d3f..6861f24de 100644 --- a/tuplex/utils/include/TypeSystem.h +++ b/tuplex/utils/include/TypeSystem.h @@ -399,6 +399,17 @@ namespace python { */ extern bool canUpcastToRowType(const python::Type& minor, const python::Type& major); + /*! + * return unified type for both a and b + * e.g. a == [Option[[I64]]] and b == [[Option[I64]]] should return [Option[[Option[I64]]]] + * return python::Type::UNKNOWN if no compatible type can be found + * @param a (optional) primitive or list or tuple type + * @param b (optional) primitive or list or tuple type + * @param autoUpcast whether to upcast numeric types to a unified type when type conflicts, false by default + * @return (optional) compatible type or UNKNOWN + */ + extern Type unifyTypes(const python::Type &a, const python::Type &b, bool autoUpcast=false); + /*! * two types may be combined into one nullable type. 
* @param a diff --git a/tuplex/utils/src/List.cc b/tuplex/utils/src/List.cc index a027520cf..dc2a3e671 100644 --- a/tuplex/utils/src/List.cc +++ b/tuplex/utils/src/List.cc @@ -112,4 +112,12 @@ namespace tuplex { } return true; } + + size_t List::numNonNullElements() const { + size_t num = 0; + for(size_t i = 0; i < _numElements; i++) { + num += !getField(i).isNull(); + } + return num; + } } \ No newline at end of file diff --git a/tuplex/utils/src/Row.cc b/tuplex/utils/src/Row.cc index af6756400..2dfcb24e5 100644 --- a/tuplex/utils/src/Row.cc +++ b/tuplex/utils/src/Row.cc @@ -177,13 +177,16 @@ namespace tuplex { serializer.append(el.isNull() ? option::none : option(std::string((const char*)el.getPtr())), el.getType()); else if(rt.isListType()) serializer.append(el.isNull() ? option::none : option(*(List*)el.getPtr()), el.getType()); - else { - if(rt.isTupleType()) { - - // needs specialized function, depending on tuple type... - throw std::runtime_error(std::string(__FILE__) + " + " + std::to_string(__LINE__) + std::string(": tuple serialization not yet implemented")); - //serializer.append(*((Tuple *)el.getPtr())); - } else throw std::runtime_error("option underlying type " + rt.desc() + " not known"); + else if (rt.isTupleType()) { + if (el.isNull()) { + serializer.append(option::none, el.getType()); + } else { + auto ptrToTuple = (Tuple*)el.getPtr(); + assert(ptrToTuple); + serializer.append(option(*ptrToTuple), el.getType()); + } + } else { + throw std::runtime_error("option underlying type " + rt.desc() + " not known"); } } else if(el.getType() == python::Type::NULLVALUE) { serializer.append(NullType()); diff --git a/tuplex/utils/src/Serializer.cc b/tuplex/utils/src/Serializer.cc index 0d83cf855..2ce64a0ab 100644 --- a/tuplex/utils/src/Serializer.cc +++ b/tuplex/utils/src/Serializer.cc @@ -339,6 +339,43 @@ namespace tuplex { return *this; } + Serializer &Serializer::append(const option &t, const python::Type &tupleType) { + if(_autoSchema) { + 
_types.push_back(python::Type::makeOptionType(tupleType)); + } else { + assert(_schema.getRowType().parameters()[_col++] == python::Type::makeOptionType(tupleType)); + } + + if(tupleType == python::Type::EMPTYTUPLE || (tupleType.isOptionType() && tupleType.getReturnType() == python::Type::EMPTYTUPLE)) { + _isNull.push_back(!t.has_value()); + _requiresBitmap.push_back(true); + _isVarField.push_back(false); + return *this; + } + + return appendWithoutInference(t, tupleType.isOptionType() ? tupleType.getReturnType() : tupleType); + } + + Serializer &Serializer::appendWithoutInference(const option &tuple, const python::Type &tupleType) { + assert(!tupleType.isOptionType() && tupleType != python::Type::EMPTYTUPLE); + _isVarField.push_back(true); + _isNull.push_back(!tuple.has_value()); + _requiresBitmap.push_back(true); + + _fixedLenFields.provideSpace(sizeof(uint64_t)); + *((uint64_t *)_fixedLenFields.ptr()) = 0; + _fixedLenFields.movePtr(sizeof(uint64_t)); + + _varLenFieldOffsets.push_back(_varLenFields.size()); + + if(tuple.has_value()) { + Tuple tupleData = tuple.data(); + appendWithoutInferenceHelper(tupleData); + } + + return *this; + } + Serializer &Serializer::appendWithoutInference(const option &list, const python::Type &listType) { assert(!listType.isOptionType() && listType != python::Type::EMPTYLIST); bool isVar = !(listType.elementType().isSingleValued()); @@ -353,50 +390,12 @@ namespace tuplex { *((int64_t *) _fixedLenFields.ptr()) = (isVar || !list.has_value()) ? 0L : list.data().numElements(); _fixedLenFields.movePtr(sizeof(int64_t)); - if(isVar && list.has_value()) { - // as chars (later UTF8 support here!!!) 
- _varLenFieldOffsets.push_back(_varLenFields.size()); - - // add number of elements - _varLenFields.provideSpace(sizeof(uint64_t)); - *((uint64_t *) _varLenFields.ptr()) = list.data().numElements(); - _varLenFields.movePtr(sizeof(uint64_t)); + _varLenFieldOffsets.push_back(_varLenFields.size()); - // add actual data - auto elementType = listType.elementType(); - if (elementType == python::Type::STRING) { // strings are serialized differently - // offset numbers - size_t current_offset = sizeof(uint64_t) * list.data().numElements(); - for (size_t i = 0; i < list.data().numElements(); i++) { - _varLenFields.provideSpace(sizeof(uint64_t)); - *((uint64_t *) _varLenFields.ptr()) = current_offset; - _varLenFields.movePtr(sizeof(uint64_t)); - // update for next field: move forward one uint64_t, then add on the string - current_offset -= sizeof(uint64_t); - current_offset += strlen((char *) list.data().getField(i).getPtr()) + 1; - } - // string data - for (size_t i = 0; i < list.data().numElements(); i++) { - size_t slen = strlen((char*)list.data().getField(i).getPtr()); - _varLenFields.provideSpace(slen + 1); - std::memcpy(_varLenFields.ptr(), list.data().getField(i).getPtr(), slen); - *((uint8_t *) _varLenFields.ptr() + slen) = 0; - _varLenFields.movePtr(slen + 1); - } - } else { // ints/floats/bools - // values - for(size_t i = 0; i < list.data().numElements(); i++) { - _varLenFields.provideSpace(sizeof(uint64_t)); - if(elementType == python::Type::I64 || elementType == python::Type::BOOLEAN) { - *((uint64_t*)_varLenFields.ptr()) = list.data().getField(i).getInt(); - } else if(elementType == python::Type::F64) { - *((double*)_varLenFields.ptr()) = list.data().getField(i).getDouble(); - } else { - throw std::runtime_error("serializing invalid list type!: " + listType.desc()); - } - _varLenFields.movePtr(sizeof(uint64_t)); - } - } + if(isVar && list.has_value()) { + // get list data + auto l = list.data(); + appendWithoutInferenceHelper(l); } return *this; } @@ -435,7 
+434,7 @@ namespace tuplex { option(std::string((char *) dict.value().c_str()))); } - Serializer &Serializer::append(const option& list, python::Type listType) { + Serializer &Serializer::append(const option& list, const python::Type &listType) { // variable length field if(_autoSchema) { _types.push_back(python::Type::makeOptionType(listType)); @@ -517,11 +516,14 @@ namespace tuplex { _isVarField.push_back(false); _isNull.push_back(f.isNull()); _requiresBitmap.push_back(true); - } else if(t.isListType()) { - throw std::runtime_error("list option not implemented yet!"); + } else if (t.isListType()) { + return appendWithoutInference(f.isNull() ? option::none : option(*(List*)f.getPtr()), t); + } else if (t.isTupleType()) { + return appendWithoutInference(f.isNull() ? option::none : option(*(Tuple*)f.getPtr()), t); } else - throw std::runtime_error("unknown type " + f.getType().desc() + " found, can't serialize."); - + // throw std::runtime_error("unknown type " + f.getType().desc() + " found, can't serialize."); + Logger::instance().logger("serializer").error( + "unknown field type " + f.getType().desc() + " encountered, can't serialize."); } else { Logger::instance().logger("serializer").error( "unknown field type " + f.getType().desc() + " encountered, can't serialize."); @@ -593,51 +595,337 @@ namespace tuplex { // as chars (later UTF8 support here!!!) _varLenFieldOffsets.push_back(_varLenFields.size()); - // add number of elements - _varLenFields.provideSpace(sizeof(uint64_t)); - *((uint64_t *) _varLenFields.ptr()) = l.numElements(); - _varLenFields.movePtr(sizeof(uint64_t)); + appendWithoutInferenceHelper(l); + } + return *this; + } + + Serializer &Serializer::appendWithoutInferenceHelper(const Tuple &t) { + auto tree = tupleToTree(t); + tree.tupleType(); + + // need bitmap field for elements? 
+ std::vector bitmapV; + void *bitmapAddr = nullptr; + size_t bitmapSize = 0; + size_t numOptionalFields = python::numOptionalFields(t.getType()); + if(numOptionalFields) { + auto numBitmapFields = core::ceilToMultiple(numOptionalFields, 64ul)/64; + bitmapSize = numBitmapFields * sizeof(uint64_t); + _varLenFields.provideSpace(bitmapSize); + bitmapAddr = _varLenFields.ptr(); + _varLenFields.movePtr(bitmapSize); + } + + // std::vector: index for a varLen type in the tuple + // void *: address for storing offset to actual data for the varlen type + std::vector, void *>> varLenOffsetAddrStack; + for(const auto &index : tree.getMultiIndices()) { + auto currField = tree.get(index); + auto currFieldType = currField.getType(); + if(currFieldType.isSingleValued()) { + continue; + } + if(currFieldType.isOptionType()) { + // only write to bitmap for optional field in tuple + if(currField.isNull()) { + bitmapV.push_back(true); + } else { + // need actual type + currFieldType = currFieldType.getReturnType(); + bitmapV.push_back(false); + } + } + if (!currField.isNull()) { + // assume 8 bytes for each field + _varLenFields.provideSpace(sizeof(uint64_t)); + // either fill in actual data for fixed length field + // or record current address to varLenOffsetAddr for filling in offset to actual data later + if(currFieldType == python::Type::I64 || currFieldType == python::Type::BOOLEAN) { + *((uint64_t *)_varLenFields.ptr()) = currField.getInt(); + } else if(currFieldType == python::Type::F64) { + // double is 8 bytes + *((double *)_varLenFields.ptr()) = currField.getDouble(); + } else if(currFieldType == python::Type::STRING || currFieldType.isListType() || currFieldType.isTupleType()) { + // skip 8 bytes as placeholder to fill in offset later + varLenOffsetAddrStack.emplace_back(index, _varLenFields.ptr()); + } + _varLenFields.movePtr(sizeof(uint64_t)); + } + } + // append varLen fields + for(const auto &varLenOffsetAddr : varLenOffsetAddrStack) { + auto currVarLenField = 
tree.get(varLenOffsetAddr.first); + auto currVarLenFieldType = currVarLenField.getType(); + if(currVarLenFieldType.isOptionType()) { + currVarLenFieldType = currVarLenFieldType.getReturnType(); + } + void *currVarLenOffsetAddr = varLenOffsetAddr.second; + uint64_t currOffset = (uintptr_t)_varLenFields.ptr() - (uintptr_t)currVarLenOffsetAddr; + // write offset to placeholder then append varLen field + if(currVarLenFieldType == python::Type::STRING) { + // write size | offset to placeholder + uint64_t strSize = strlen((char *)currVarLenField.getPtr()) + 1; + uint64_t sizeOffset = currOffset | (strSize << 32); + *((uint64_t *)currVarLenOffsetAddr) = sizeOffset; + appendWithoutInferenceHelper(std::string((char *) currVarLenField.getPtr())); + } else if(currVarLenFieldType.isListType()) { + // write offset to placeholder + *((uint64_t *)currVarLenOffsetAddr) = currOffset; + appendWithoutInferenceHelper(*(List *) currVarLenField.getPtr()); + } else if(currVarLenFieldType.isTupleType()) { + // has to be an optional tuple otherwise it would have been flattened away + assert(currVarLenField.getType().isOptionType()); + *((uint64_t *)currVarLenOffsetAddr) = currOffset; + appendWithoutInferenceHelper(*(Tuple *) currVarLenField.getPtr()); + } else { + // throw std::runtime_error("element type not support in tuple: " + currVarLenField.getType().desc()); + Logger::instance().logger("serializer").error( + "invalid element type in tuple: " + currVarLenField.getType().desc() + " encountered, can't serialize."); + } + } + + // write bitmap if exists + if (bitmapSize) { + uint64_t bitmap[bitmapSize / sizeof(int64_t)]; + std::memset(bitmap, 0, bitmapSize); + + int opt_counter = 0; + assert(bitmapV.size() == numOptionalFields); + for (int i = 0; i < numOptionalFields; i++) { + // set bit + if (bitmapV[i]) { + bitmap[i / 64] |= (1UL << (i % 64)); + } + } + // write to ptr + std::memcpy(bitmapAddr, bitmap, bitmapSize); + } + return *this; + } + + Serializer 
&Serializer::appendWithoutInferenceHelper(const std::string &str) { + // append the string to of _varLenFields + const char *cStr = str.c_str(); + auto strSize = strlen(cStr) + 1; + _varLenFields.provideSpace(strSize); + std::memcpy(_varLenFields.ptr(), cStr ,strSize); + _varLenFields.movePtr(strSize); + return *this; + } + + Serializer &Serializer::appendWithoutInferenceHelper(const List &l) { + + // add number of elements + _varLenFields.provideSpace(sizeof(uint64_t)); + *((uint64_t *)_varLenFields.ptr()) = l.numElements(); + _varLenFields.movePtr(sizeof(uint64_t)); + + auto elementType = l.getType().elementType(); + if(elementType.isSingleValued()) { + // done. List can be retrieved from numElements and listType + return *this; + } + + // need bitmap field for elements? + std::vector bitmapV; + void *bitmapAddr = nullptr; + size_t bitmapSize = 0; + if(elementType.isOptionType()) { + auto numBitmapFields = core::ceilToMultiple(l.numElements(), 64ul)/64; + bitmapSize = numBitmapFields * sizeof(uint64_t); + _varLenFields.provideSpace(bitmapSize); + bitmapAddr = _varLenFields.ptr(); + _varLenFields.movePtr(bitmapSize); + } - // add actual data - auto elementType = l.getType().elementType(); - if (elementType == python::Type::STRING) { // strings are serialized differently + if(elementType == python::Type::STRING) { // strings are serialized differently + // offset numbers + size_t current_offset = sizeof(uint64_t) * l.numElements(); + for (size_t i = 0; i < l.numElements(); i++) { + _varLenFields.provideSpace(sizeof(uint64_t)); + *((uint64_t *) _varLenFields.ptr()) = current_offset; + _varLenFields.movePtr(sizeof(uint64_t)); + // update for next field: move forward one uint64_t, then add on the string + current_offset -= sizeof(uint64_t); + current_offset += strlen((char *) l.getField(i).getPtr()) + 1; + } + // string data + for (size_t i = 0; i < l.numElements(); i++) { + size_t slen = strlen((char*)l.getField(i).getPtr()); + _varLenFields.provideSpace(slen + 1); + 
std::memcpy(_varLenFields.ptr(), l.getField(i).getPtr(), slen); + *((uint8_t *) _varLenFields.ptr() + slen) = 0; + _varLenFields.movePtr(slen + 1); + } + } else if(elementType.isTupleType()) { + void *varLenOffsetAddr = _varLenFields.ptr(); + // skip #elements * 8 bytes as placeholder for offsets + auto offsetBytes = l.numElements() * sizeof(uint64_t); + _varLenFields.provideSpace(offsetBytes); + _varLenFields.movePtr(offsetBytes); + for (size_t listIndex = 0; listIndex < l.numElements(); ++listIndex) { + // write offset to placeholder + uint64_t currOffset = (uintptr_t)_varLenFields.ptr() - (uintptr_t)varLenOffsetAddr; + *(uint64_t *)varLenOffsetAddr = currOffset; + // increment varLenOffsetAddr by 8 + varLenOffsetAddr = (void *)((uint64_t *)varLenOffsetAddr + 1); + // append tuple + auto currTuple = *(Tuple *)(l.getField(listIndex).getPtr()); + appendWithoutInferenceHelper(currTuple); + } + } else if (elementType.isListType()) { + void *varLenOffsetAddr = _varLenFields.ptr(); + // skip #elements * 8 bytes as placeholder for offsets + auto offsetBytes = l.numElements() * sizeof(uint64_t); + _varLenFields.provideSpace(offsetBytes); + _varLenFields.movePtr(offsetBytes); + for (size_t listIndex = 0; listIndex < l.numElements(); ++listIndex) { + // write offset to placeholder + uint64_t currOffset = (uintptr_t)_varLenFields.ptr() - (uintptr_t)varLenOffsetAddr; + *(uint64_t *)varLenOffsetAddr = currOffset; + // increment varLenOffsetAddr by 8 + varLenOffsetAddr = (void *)((uint64_t *)varLenOffsetAddr + 1); + // append list + auto currList = *(List *)(l.getField(listIndex).getPtr()); + appendWithoutInferenceHelper(currList); + } + } else if(elementType == python::Type::I64 || elementType == python::Type::BOOLEAN) { + for(size_t i = 0; i < l.numElements(); i++) { + _varLenFields.provideSpace(sizeof(uint64_t)); + *((uint64_t*)_varLenFields.ptr()) = l.getField(i).getInt(); + _varLenFields.movePtr(sizeof(uint64_t)); + } + } else if(elementType == python::Type::F64) { + 
for(size_t i = 0; i < l.numElements(); i++) { + _varLenFields.provideSpace(sizeof(uint64_t)); + *((double*)_varLenFields.ptr()) = l.getField(i).getDouble(); + _varLenFields.movePtr(sizeof(uint64_t)); + } + } else if(elementType.isOptionType()) { + auto underlyingElementType = elementType.getReturnType(); + size_t numNonNullElements = l.numNonNullElements(); + if(underlyingElementType == python::Type::STRING) { // offset numbers - size_t current_offset = sizeof(uint64_t) * l.numElements(); - for (size_t i = 0; i < l.numElements(); i++) { - _varLenFields.provideSpace(sizeof(uint64_t)); - *((uint64_t *) _varLenFields.ptr()) = current_offset; - _varLenFields.movePtr(sizeof(uint64_t)); - // update for next field: move forward one uint64_t, then add on the string - current_offset -= sizeof(uint64_t); - current_offset += strlen((char *) l.getField(i).getPtr()) + 1; + size_t currentOffset = sizeof(uint64_t) * numNonNullElements; + for(size_t i = 0; i < l.numElements(); i++) { + if(l.getField(i).isNull()) { + bitmapV.push_back(true); + } else { + bitmapV.push_back(false); + // write offset + _varLenFields.provideSpace(sizeof(uint64_t)); + *((uint64_t *) _varLenFields.ptr()) = currentOffset; + _varLenFields.movePtr(sizeof(uint64_t)); + // update for next field: move forward one uint64_t, then add on the string + currentOffset -= sizeof(uint64_t); + currentOffset += strlen((char *) l.getField(i).getPtr()) + 1; + } } // string data for (size_t i = 0; i < l.numElements(); i++) { - size_t slen = strlen((char*)l.getField(i).getPtr()); - _varLenFields.provideSpace(slen + 1); - std::memcpy(_varLenFields.ptr(), l.getField(i).getPtr(), slen); - *((uint8_t *) _varLenFields.ptr() + slen) = 0; - _varLenFields.movePtr(slen + 1); + if(!l.getField(i).isNull()) { + size_t slen = strlen((char*)l.getField(i).getPtr()); + _varLenFields.provideSpace(slen + 1); + std::memcpy(_varLenFields.ptr(), l.getField(i).getPtr(), slen); + *((uint8_t *) _varLenFields.ptr() + slen) = 0; + 
_varLenFields.movePtr(slen + 1); + } } - } else { // ints/floats/bools - // values + } else if(underlyingElementType.isTupleType()) { + void *varLenOffsetAddr = _varLenFields.ptr(); + // skip #elements * 8 bytes as placeholder for offsets + auto offsetBytes = numNonNullElements * sizeof(uint64_t); + _varLenFields.provideSpace(offsetBytes); + _varLenFields.movePtr(offsetBytes); + for (size_t listIndex = 0; listIndex < l.numElements(); ++listIndex) { + if(l.getField(listIndex).isNull()) { + bitmapV.push_back(true); + } else { + bitmapV.push_back(false); + // write offset to placeholder + uint64_t currOffset = (uintptr_t)_varLenFields.ptr() - (uintptr_t)varLenOffsetAddr; + *(uint64_t *)varLenOffsetAddr = currOffset; + // increment varLenOffsetAddr by 8 + varLenOffsetAddr = (void *)((uint64_t *)varLenOffsetAddr + 1); + // append tuple + auto currTuple = *(Tuple *)(l.getField(listIndex).getPtr()); + appendWithoutInferenceHelper(currTuple); + } + } + } else if(underlyingElementType.isListType()) { + void *varLenOffsetAddr = _varLenFields.ptr(); + // skip #elements * 8 bytes as placeholder for offsets + auto offsetBytes = l.numNonNullElements() * sizeof(uint64_t); + _varLenFields.provideSpace(offsetBytes); + _varLenFields.movePtr(offsetBytes); + for (size_t listIndex = 0; listIndex < l.numElements(); ++listIndex) { + if(l.getField(listIndex).isNull()) { + bitmapV.push_back(true); + } else { + bitmapV.push_back(false); + // write offset to placeholder + uint64_t currOffset = (uintptr_t)_varLenFields.ptr() - (uintptr_t)varLenOffsetAddr; + *(uint64_t *)varLenOffsetAddr = currOffset; + // increment varLenOffsetAddr by 8 + varLenOffsetAddr = (void *)((uint64_t *)varLenOffsetAddr + 1); + // append list + auto currList = *(List *)(l.getField(listIndex).getPtr()); + appendWithoutInferenceHelper(currList); + } + } + } else if(underlyingElementType == python::Type::I64 || underlyingElementType == python::Type::BOOLEAN) { for(size_t i = 0; i < l.numElements(); i++) { - 
_varLenFields.provideSpace(sizeof(uint64_t)); - if(elementType == python::Type::I64 || elementType == python::Type::BOOLEAN) { + if(l.getField(i).isNull()) { + bitmapV.push_back(true); + } else { + bitmapV.push_back(false); + _varLenFields.provideSpace(sizeof(uint64_t)); *((uint64_t*)_varLenFields.ptr()) = l.getField(i).getInt(); - } else if(elementType == python::Type::F64) { - *((double*)_varLenFields.ptr()) = l.getField(i).getDouble(); + _varLenFields.movePtr(sizeof(uint64_t)); + } + } + } else if(underlyingElementType == python::Type::F64) { + for(size_t i = 0; i < l.numElements(); i++) { + if(l.getField(i).isNull()) { + bitmapV.push_back(true); } else { - throw std::runtime_error("serializing invalid list type!: " + l.getType().desc()); + bitmapV.push_back(false); + _varLenFields.provideSpace(sizeof(uint64_t)); + *((double*)_varLenFields.ptr()) = l.getField(i).getDouble(); + _varLenFields.movePtr(sizeof(uint64_t)); } - _varLenFields.movePtr(sizeof(uint64_t)); + } + } else { + // throw std::runtime_error("serializing invalid list type!: " + l.getType().desc()); + Logger::instance().logger("serializer").error( + "invalid list type: " + l.getType().desc() + " encountered, can't serialize."); + } + } else { + // throw std::runtime_error("serializing invalid list type!: " + l.getType().desc()); + Logger::instance().logger("serializer").error( + "invalid list type: " + l.getType().desc() + " encountered, can't serialize."); + } + + // write bitmap if exists + if (bitmapSize) { + uint64_t bitmap[bitmapSize / sizeof(int64_t)]; + std::memset(bitmap, 0, bitmapSize); + + int opt_counter = 0; + for (int i = 0; i < bitmapV.size(); i++) { + // set bit + if (bitmapV[i]) { + bitmap[i / 64] |= (1UL << (i % 64)); } } + // write to ptr + std::memcpy(bitmapAddr, bitmap, bitmapSize); } + return *this; } - size_t Serializer::serialize(void *ptr, const size_t capacityLeft) { // if not done, fix schema @@ -655,6 +943,7 @@ namespace tuplex { size += _varLenFields.size() + 
sizeof(int64_t); // compute bitmap size + // TODO: auto bitmapSize = calcBitmapSize(_requiresBitmap); size += bitmapSize; @@ -790,7 +1079,8 @@ namespace tuplex { type == python::Type::PYOBJECT || type.isDictionaryType() || type == python::Type::GENERICDICT || - (type.isListType() && !type.elementType().isSingleValued())) { + (type.isListType() && !type.elementType().isSingleValued()) || + (el.isOptionType() && type.isTupleType())) { _isVarLenField.push_back(true); } else { Logger::instance().logger("core").error("non deserializable type '" + el.desc() + "' detected"); @@ -1063,40 +1353,312 @@ namespace tuplex { else if(elType == python::Type::EMPTYLIST) els.push_back(Field::empty_list()); else throw std::runtime_error("Unsupported list element type deserialized: " + elType.desc()); } - } else { - assert(phys_col < (inferLength(_buffer) - sizeof(int64_t)) / sizeof(int64_t)); // sharper bound because of varlen - // get offset: offset is in the lower 32bit, the upper are the size of the var entry - int64_t offset = *((int64_t *) ((uint8_t *) _buffer + sizeof(int64_t) * phys_col + calcBitmapSize(_requiresBitmap))); - int64_t len = ((offset & (0xFFFFFFFFl << 32)) >> 32) - 1; - assert(len > 0); - offset = offset & 0xFFFFFFFF; - - // pointer to varlen field - uint8_t *ptr = (uint8_t *) _buffer + sizeof(int64_t) * phys_col + calcBitmapSize(_requiresBitmap) + offset; - - // get number of elements - uint64_t num_elements = *(uint64_t *) ptr; - ptr += sizeof(uint64_t); - if (elType == python::Type::STRING) { - // read each string - for (int i = 0; i < num_elements; i++) { - auto str_offset = *(int64_t *) ptr; - els.push_back(Field((const char *) (ptr + str_offset))); - ptr += sizeof(uint64_t); + return List::from_vector(els); + } + + assert(phys_col < (inferLength(_buffer) - sizeof(int64_t)) / sizeof(int64_t)); // sharper bound because of varlen + // get offset: offset is in the lower 32bit, the upper are the size of the var entry + int64_t offset = *((int64_t *) ((uint8_t *) 
_buffer + sizeof(int64_t) * phys_col + calcBitmapSize(_requiresBitmap))); + int64_t len = ((offset & (0xFFFFFFFFl << 32)) >> 32) - 1; + assert(len > 0); + offset = offset & 0xFFFFFFFF; + + // pointer to varlen field + uint8_t *ptr = (uint8_t *) _buffer + sizeof(int64_t) * phys_col + calcBitmapSize(_requiresBitmap) + offset; + + return getListHelper(list_type, ptr); + } + + Tuple Deserializer::getTupleHelper(const python::Type &tupleType, const uint8_t *ptr) const { + auto tree = TupleTree(tupleType); + Field f; + std::vector bitmapV; + size_t bitmapIndex = 0; + size_t numOptionalFields = python::numOptionalFields(tupleType); + if(numOptionalFields) { + auto numBitmapFields = core::ceilToMultiple(numOptionalFields, 64ul)/64; + auto bitmapSize = numBitmapFields * sizeof(uint64_t); + auto *bitmapAddr = (uint64_t *)ptr; + ptr += bitmapSize; + for (size_t i = 0; i < numOptionalFields; i++) { + bool currBit = (bitmapAddr[i/64] >> (i % 64)) & 1; + bitmapV.push_back(currBit); + } + } + + auto flattenedTupleType = flattenedType(tupleType); + for (size_t i = 0; i < tree.numElements(); i++) { + auto currFieldType = flattenedTupleType.parameters()[i]; + if(currFieldType == python::Type::BOOLEAN) { + f = Field((bool)(*(uint64_t *)ptr)); + ptr += sizeof(uint64_t); + } else if(currFieldType == python::Type::I64) { + f = Field(*(int64_t *)ptr); + ptr += sizeof(uint64_t); + } else if(currFieldType == python::Type::F64) { + f = Field(*(double *)ptr); + ptr += sizeof(uint64_t); + } else if(currFieldType == python::Type::STRING) { + uint64_t sizeOffset = *(uint64_t *)ptr; // offset field for string is (size | offset) + uint64_t strOffset = sizeOffset & 0xFFFFFFFF; + f = Field((const char *)(ptr + strOffset)); + ptr += sizeof(uint64_t); + } else if(currFieldType.isListType()) { + auto listOffset = *(int64_t *)ptr; + f = Field(getListHelper(currFieldType, ptr + listOffset)); + ptr += sizeof(uint64_t); + } else if(currFieldType == python::Type::NULLVALUE) { + f = Field::null(); + } else 
if(currFieldType == python::Type::EMPTYTUPLE) { + f = Field::empty_tuple(); + } else if(currFieldType == python::Type::EMPTYLIST) { + f = Field::empty_list(); + } else if(currFieldType == python::Type::EMPTYDICT) { + f = Field::empty_dict(); + } else if(currFieldType.isOptionType()) { + // need to check bitmapV + auto underlyingType = currFieldType.getReturnType(); + if(underlyingType == python::Type::BOOLEAN) { + if(bitmapV[bitmapIndex]) { + // is None + f = Field(option::none); + } else { + f = Field(option((bool)(*(uint64_t *)ptr))); + ptr += sizeof(uint64_t); + } + } else if(underlyingType == python::Type::I64) { + if(bitmapV[bitmapIndex]) { + // is None + f = Field(option::none); + } else { + f = Field(option(*(int64_t *)ptr)); + ptr += sizeof(uint64_t); + } + } else if(underlyingType == python::Type::F64) { + if(bitmapV[bitmapIndex]) { + // is None + f = Field(option::none); + } else { + f = Field(option(*(double *)ptr)); + ptr += sizeof(uint64_t); + } + } else if(underlyingType == python::Type::STRING) { + if(bitmapV[bitmapIndex]) { + // is None + f = Field(option::none); + } else { + uint64_t sizeOffset = *(uint64_t *)ptr; // offset field for string is (size | offset) + uint64_t strOffset = sizeOffset & 0xFFFFFFFF; + f = Field(option((const char *)(ptr + strOffset))); + ptr += sizeof(uint64_t); + } + } else if(underlyingType.isListType()) { + if(bitmapV[bitmapIndex]) { + // is None + f = Field::null(currFieldType); + } else { + auto listOffset = *(int64_t *)ptr; + f = Field(option(getListHelper(underlyingType, ptr + listOffset))); + ptr += sizeof(uint64_t); + } + } else if(underlyingType.isTupleType()) { + if(bitmapV[bitmapIndex]) { + // is None + f = Field::null(currFieldType); + } else { + auto tupleOffset = *(int64_t *)ptr; + f = Field(option(getTupleHelper(underlyingType, ptr + tupleOffset))); + ptr += sizeof(uint64_t); + } + } else { + Logger::instance().logger("deserializer").error( + "invalid element type in tuple: " + currFieldType.desc() + " 
encountered, can't deserialize."); } + // increment bitmapIndex for optional fields only + bitmapIndex++; } else { - // read each value - for(int i = 0; i< num_elements; i++) { - if(elType == python::Type::BOOLEAN) { - els.push_back(Field((bool)(*(uint64_t *)ptr))); - } else if(elType == python::Type::I64) { - els.push_back(Field(*(int64_t *)ptr)); + Logger::instance().logger("deserializer").error( + "invalid element type in tuple: " + currFieldType.desc() + " encountered, can't deserialize."); + } + tree.set(i, f); + } + return flattenToTuple(tree); + } + + Tuple Deserializer::getOptionTuple(const int col) const { + assert(_buffer); + + // check type + auto tupleType = _flattenedRowType.parameters()[col].getReturnType(); + assert(tupleType.isTupleType() && tupleType != python::Type::EMPTYTUPLE); + + // get physical column + auto phys_col = logicalToPhysicalIndex(col); + assert(phys_col >= 0); + assert(phys_col < (inferLength(_buffer) - sizeof(uint64_t)) / sizeof(uint64_t)); + + uint64_t offset = *((uint64_t *) ((uint8_t *) _buffer + sizeof(uint64_t) * phys_col + calcBitmapSize(_requiresBitmap))); + uint64_t len = ((offset & (0xFFFFFFFFl << 32)) >> 32) - 1; + assert(len > 0); + offset = offset & 0xFFFFFFFF; + + // get ptr to tuple data + uint8_t *ptr = (uint8_t *) _buffer + sizeof(uint64_t) * phys_col + calcBitmapSize(_requiresBitmap) + offset; + + return getTupleHelper(tupleType, ptr); + } + + List Deserializer::getListHelper(const python::Type &listType, const uint8_t *ptr) const { + auto elType = listType.elementType(); + std::vector els; + // get number of elements + uint64_t numElements = *(uint64_t *)ptr; + ptr += sizeof(uint64_t); + + if(listType.isSingleValued()) { + // does not need memory data + // fill els with numElements of the single value + if(elType == python::Type::NULLVALUE) { + els.insert(els.end(), numElements, Field::null()); + } else if(elType == python::Type::EMPTYTUPLE) { + els.insert(els.end(), numElements, Field::empty_tuple()); + } else 
if(elType == python::Type::EMPTYLIST) { + els.insert(els.end(), numElements, Field::empty_list()); + } else if(elType == python::Type::EMPTYDICT) { + els.insert(els.end(), numElements, Field::empty_dict()); + } else { + throw std::runtime_error("Unsupported list element type deserialized: " + elType.desc()); + } + return List::from_vector(els); + } + + std::vector bitmapV; + if (elType.isOptionType()) { + auto numBitmapFields = core::ceilToMultiple((unsigned long)numElements, 64ul)/64; + auto bitmapSize = numBitmapFields * sizeof(uint64_t); + auto *bitmapAddr = (uint64_t *)ptr; + ptr += bitmapSize; + for (size_t i = 0; i < numElements; i++) { + bool currBit = (bitmapAddr[i/64] >> (i % 64)) & 1; + bitmapV.push_back(currBit); + } + } + + if(elType == python::Type::I64) { + for (size_t i = 0; i < numElements; i++) { + els.emplace_back(Field(*(int64_t *)ptr)); + ptr += sizeof(uint64_t); + } + } else if(elType == python::Type::F64) { + for (size_t i = 0; i < numElements; i++) { + els.emplace_back(Field(*(double *)ptr)); + ptr += sizeof(uint64_t); + } + } else if(elType == python::Type::BOOLEAN) { + for (size_t i = 0; i < numElements; i++) { + els.emplace_back(Field(bool(*(uint64_t *)ptr))); + ptr += sizeof(uint64_t); + } + } else if(elType == python::Type::STRING) { + // read each string + for (size_t i = 0; i < numElements; i++) { + uint64_t strOffset = *(uint64_t *)ptr; + els.emplace_back(Field((const char *)(ptr + strOffset))); + ptr += sizeof(uint64_t); + } + } else if(elType.isTupleType()) { + // read each tuple + for (size_t i = 0; i < numElements; i++) { + auto tupleOffset = *(uint64_t *) ptr; + els.emplace_back(Field(getTupleHelper(elType, ptr + tupleOffset))); + ptr += sizeof(uint64_t); + } + } else if(elType.isListType()) { + // read each list + for (size_t i = 0; i < numElements; i++) { + auto listOffset = *(uint64_t *) ptr; + els.emplace_back(Field(getListHelper(elType, ptr + listOffset))); + ptr += sizeof(uint64_t); + } + } else if(elType.isOptionType()) { 
+ auto underlyingElType = elType.getReturnType(); + if(underlyingElType == python::Type::STRING) { + // check for none then read each string + for (size_t i = 0; i < numElements; i++) { + if(bitmapV[i]) { + // is None + els.emplace_back(Field(option::none)); + } else { + uint64_t strOffset = *(uint64_t *)ptr; + els.emplace_back(Field(option((const char *)(ptr + strOffset)))); + ptr += sizeof(uint64_t); + } + } + } else if(underlyingElType.isTupleType()) { + // check for none then read each tuple + for (size_t i = 0; i < numElements; i++) { + if(bitmapV[i]) { + // is None + els.emplace_back(Field::null(elType)); + } else { + auto tupleOffset = *(uint64_t *) ptr; + els.emplace_back(Field(option( + getTupleHelper(underlyingElType, ptr + tupleOffset)))); + ptr += sizeof(uint64_t); + } + } + } else if(underlyingElType.isListType()) { + // check for none then read each list + for (size_t i = 0; i < numElements; i++) { + if(bitmapV[i]) { + // is None + els.emplace_back(Field::null(elType)); } else { - els.push_back(Field((*(double*)ptr))); + auto listOffset = *(uint64_t *) ptr; + els.emplace_back(Field(option(getListHelper(underlyingElType, ptr + listOffset)))); + ptr += sizeof(uint64_t); } - ptr += sizeof(uint64_t); } + } else if(underlyingElType == python::Type::BOOLEAN) { + // check for none then read each value + for (size_t i = 0; i < numElements; i++) { + if(bitmapV[i]) { + // is None + els.emplace_back(Field(option::none)); + } else { + els.emplace_back(Field(option((bool)(*(uint64_t *)ptr)))); + ptr += sizeof(uint64_t); + } + } + } else if(underlyingElType == python::Type::I64) { + // check for none then read each value + for (size_t i = 0; i < numElements; i++) { + if(bitmapV[i]) { + // is None + els.emplace_back(Field(option::none)); + } else { + els.emplace_back(Field(option(*(int64_t *)ptr))); + ptr += sizeof(uint64_t); + } + } + } else if(underlyingElType == python::Type::F64) { + // check for none then read each value + for (size_t i = 0; i < numElements; 
i++) { + if(bitmapV[i]) { + // is None + els.emplace_back(Field(option::none)); + } else { + els.emplace_back(Field(option((double)(*(double *)ptr)))); + ptr += sizeof(uint64_t); + } + } + } else { + Logger::instance().logger("deserializer").error( + "invalid list type: " + listType.desc() + " encountered, can't deserialize."); } + } else { + Logger::instance().logger("deserializer").error( + "invalid list type: " + listType.desc() + " encountered, can't deserialize."); } return List::from_vector(els); } @@ -1165,8 +1727,10 @@ namespace tuplex { else if(rt == python::Type::EMPTYLIST) f = Field(isNull(i) ? option::none : option(List())); else if(rt.isListType()) - f = Field(isNull(i) ? option::none : option(getList(i))); - else { + f = isNull(i) ? Field::null(type) : Field(option(getList(i))); + else if(rt.isTupleType()) { + f = isNull(i) ? Field::null(type) : Field(option(getOptionTuple(i))); + } else { f = Field::null(); // default to NULL Logger::instance().defaultLogger().error( "unknown type '" + type.desc() + "' occurred when trying to attempt deserialization of field"); diff --git a/tuplex/utils/src/TypeSystem.cc b/tuplex/utils/src/TypeSystem.cc index 867573a75..2fd3fe064 100644 --- a/tuplex/utils/src/TypeSystem.cc +++ b/tuplex/utils/src/TypeSystem.cc @@ -960,6 +960,119 @@ namespace python { return true; } + Type unifyTypes(const python::Type &a, const python::Type &b, bool autoUpcast) { + // UNKNOWN type is not compatible + if(a == python::Type::UNKNOWN || b == python::Type::UNKNOWN) { + return python::Type::UNKNOWN; + } + + // same type, return either one + if(a == b) { + return a; + } + + if(a == python::Type::NULLVALUE) { + return python::Type::makeOptionType(b); + } + + if(b == python::Type::NULLVALUE) { + return python::Type::makeOptionType(a); + } + + // check for optional type + bool makeOption = false; + // underlyingType: remove outermost Option if exists + python::Type aUnderlyingType = a; + python::Type bUnderlyingType = b; + if(a.isOptionType()) 
{ + makeOption = true; + aUnderlyingType = a.getReturnType(); + } + + if(b.isOptionType()) { + makeOption = true; + bUnderlyingType = b.getReturnType(); + } + + // same underlying types? make option + if (aUnderlyingType == bUnderlyingType) { + return python::Type::makeOptionType(aUnderlyingType); + } + + // both numeric types? upcast + if(autoUpcast) { + if(aUnderlyingType.isNumericType() && bUnderlyingType.isNumericType()) { + if(aUnderlyingType == python::Type::F64 || bUnderlyingType == python::Type::F64) { + // upcast to F64 if either is F64 + if (makeOption) { + return python::Type::makeOptionType(python::Type::F64); + } else { + return python::Type::F64; + } + } + // at this point underlyingTypes cannot both be bool. Upcast to I64 + if (makeOption) { + return python::Type::makeOptionType(python::Type::I64); + } else { + return python::Type::I64; + } + } + } + + // list type? check if element type compatible + if(aUnderlyingType.isListType() && bUnderlyingType.isListType() && aUnderlyingType != python::Type::EMPTYLIST && bUnderlyingType != python::Type::EMPTYLIST) { + python::Type newElementType = unifyTypes(aUnderlyingType.elementType(), bUnderlyingType.elementType(), + autoUpcast); + if(newElementType == python::Type::UNKNOWN) { + // incompatible list element type + return python::Type::UNKNOWN; + } + if(makeOption) { + return python::Type::makeOptionType(python::Type::makeListType(newElementType)); + } + return python::Type::makeListType(newElementType); + } + + // tuple type? 
check if every parameter type compatible + if(aUnderlyingType.isTupleType() && bUnderlyingType.isTupleType()) { + if (aUnderlyingType.parameters().size() != bUnderlyingType.parameters().size()) { + // tuple length differs + return python::Type::UNKNOWN; + } + std::vector newTuple; + for (size_t i = 0; i < aUnderlyingType.parameters().size(); i++) { + python::Type newElementType = unifyTypes(aUnderlyingType.parameters()[i], + bUnderlyingType.parameters()[i], autoUpcast); + if(newElementType == python::Type::UNKNOWN) { + // incompatible tuple element type + return python::Type::UNKNOWN; + } + newTuple.emplace_back(newElementType); + } + if(makeOption) { + return python::Type::makeOptionType(python::Type::makeTupleType(newTuple)); + } + return python::Type::makeTupleType(newTuple); + } + + // dictionary type + if(aUnderlyingType.isDictionaryType() && bUnderlyingType.isDictionaryType()) { + auto key_t = unifyTypes(aUnderlyingType.keyType(), bUnderlyingType.keyType(), autoUpcast); + auto val_t = unifyTypes(aUnderlyingType.elementType(), bUnderlyingType.elementType(), autoUpcast); + if(key_t == python::Type::UNKNOWN || val_t == python::Type::UNKNOWN) { + return python::Type::UNKNOWN; + } + if(makeOption) { + return python::Type::makeOptionType(python::Type::makeDictionaryType(key_t, val_t)); + } else { + return python::Type::makeDictionaryType(key_t, val_t); + } + } + + // other non-supported types + return python::Type::UNKNOWN; + } + bool python::Type::isZeroSerializationSize() const { if(*this == python::Type::NULLVALUE) return true;