diff --git a/doc/source/conf.py b/doc/source/conf.py index 29e9d36a6..52275332d 100755 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -36,7 +36,7 @@ # The short X.Y version version="0.3" # The full version, including alpha/beta/rc tags -release="0.3.4dev" +release="0.3.4" # -- General configuration --------------------------------------------------- diff --git a/scripts/install_debug_python.sh b/scripts/install_debug_python.sh new file mode 100755 index 000000000..209ebeb9f --- /dev/null +++ b/scripts/install_debug_python.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash + +PYTHON_VERSION="3.9.14" +URL="https://www.python.org/ftp/python/${PYTHON_VERSION}/Python-${PYTHON_VERSION}.tar.xz" + +# specify here with what flags to build the version (cf. https://pythonextensionpatterns.readthedocs.io/en/latest/debugging/debug_in_ide.html) +# and https://pythonextensionpatterns.readthedocs.io/en/latest/debugging/debug_python.html +DEBUG_OPTIONS="--with-pydebug --without-pymalloc --with-valgrind" +PREFIX="$HOME/.local/python${PYTHON_VERSION}-dbg" + +[ -d .cache ] && rm -rf .cache +mkdir -p .cache + +# save current working dir +CWD=$PWD + +cd .cache + +# download python +echo ">>> downloading python ${PYTHON_VERSION}" +wget $URL +echo ">>> extracting python" +tar xf Python-${PYTHON_VERSION}.tar.xz +cd Python-${PYTHON_VERSION} + +mkdir -p debug && cd debug && ../configure --prefix=${PREFIX} ${DEBUG_OPTIONS} && make -j$(nproc) && make test + + +cd $CWD + diff --git a/scripts/set_version.py b/scripts/set_version.py index 9ca7eac05..d1b6a08c0 100755 --- a/scripts/set_version.py +++ b/scripts/set_version.py @@ -15,7 +15,7 @@ def LooseVersion(v): # to create a testpypi version use X.Y.devN -version = '0.3.4dev' +version = '0.3.4' # https://pypi.org/simple/tuplex/ # or https://test.pypi.org/simple/tuplex/ diff --git a/setup.py b/setup.py index 82646935e..6f1144b0e 100644 --- a/setup.py +++ b/setup.py @@ -653,7 +653,7 @@ def tplx_package_data(): # logic and declaration, and simpler if you include description/version in a file. setup(name="tuplex", python_requires='>=3.7.0', - version="0.3.4dev", + version="0.3.4", author="Leonhard Spiegelberg", author_email="tuplex@cs.brown.edu", description="Tuplex is a novel big data analytics framework incorporating a Python UDF compiler based on LLVM " diff --git a/tuplex/CMakeLists.txt b/tuplex/CMakeLists.txt index 44249fe8b..9fd0a9ef4 100755 --- a/tuplex/CMakeLists.txt +++ b/tuplex/CMakeLists.txt @@ -567,12 +567,13 @@ function(FindPython3Exe NAMES VERSION EXECUTABLE) # check version (must match VERSION) execute_process(COMMAND "${TEMP_EXE}" -c "import platform;print(platform.python_version())" RESULT_VARIABLE _result OUTPUT_VARIABLE TEMP_VERSION OUTPUT_STRIP_TRAILING_WHITESPACE) # check if version matches - - compare_version_strings(${VERSION} ${TEMP_VERSION} _result) - if(result EQUAL 0) - message(STATUS "Found ${TEMP_EXE} with version ${TEMP_VERSION} matching desired version ${VERSION}") - set(${EXECUTABLE} ${TEMP_EXE} PARENT_SCOPE) # write out - endif() + if(VERSION AND TEMP_VERSION) + compare_version_strings(${VERSION} ${TEMP_VERSION} _result) + if(result EQUAL 0) + message(STATUS "Found ${TEMP_EXE} with version ${TEMP_VERSION} matching desired version ${VERSION}") + set(${EXECUTABLE} ${TEMP_EXE} PARENT_SCOPE) # write out + endif() + endif() endif() endfunction() diff --git a/tuplex/adapters/cpython/include/PythonHelpers.h b/tuplex/adapters/cpython/include/PythonHelpers.h index b3abf40a9..4ed86197e 100644 --- a/tuplex/adapters/cpython/include/PythonHelpers.h +++ b/tuplex/adapters/cpython/include/PythonHelpers.h @@ -354,16 +354,17 @@ namespace python { */ extern void unlockGIL(); + /*! + * needs to be called if using as C-extension to setup gil etc. MUST BE CALLED FROM MAIN THREAD + */ + extern void registerWithInterpreter(); + /*! * check whether this thread holds the GIL or not * @return */ extern bool holdsGIL(); - /*! - * required in python extension module. - */ - extern void acquireGIL(); /*! * runs python code (throws runtime error if err occurred) and retrieves object with name diff --git a/tuplex/adapters/cpython/src/PythonGIL.cc b/tuplex/adapters/cpython/src/PythonGIL.cc index 1a1b705ab..54754a5d0 100644 --- a/tuplex/adapters/cpython/src/PythonGIL.cc +++ b/tuplex/adapters/cpython/src/PythonGIL.cc @@ -14,6 +14,8 @@ #include #include +#include + namespace python { // GIL details: @@ -27,58 +29,110 @@ namespace python { ss.flush(); auto thread_id = ss.str(); int64_t id = -1; +#ifndef LINUX sscanf(thread_id.c_str(), "%lld", &id); +#else + sscanf(thread_id.c_str(), "%ld", &id); +#endif return id; } // GIL management here static std::atomic_bool gil(false); // true if a thread holds the gil, false else static std::mutex gilMutex; // access to all the properties below + PyGILState_STATE gstate; // for non-main thread lock + + // cf. https://pythonextensionpatterns.readthedocs.io/en/latest/thread_safety.html#f1 + static PyThread_type_lock gil_lock(nullptr); + + static void acquire_lock() { + // lazy init lock -> called on first entry. + if(!gil_lock) { + gil_lock = PyThread_allocate_lock(); + if(!gil_lock) { + std::cerr<<"failed to initialize lock"< convert to uint64_t and use this for thread safe access - static std::atomic_int64_t gilID(-1); // id of thread who holds gil - static std::atomic_int64_t interpreterID(-1); // thread which holds the interpreter static std::atomic_bool interpreterInitialized(false); // checks whether interpreter is initialized or not + std::thread::id gil_main_thread_id; // id of the main thread. + std::thread::id gil_id; // id of the thread holding the gil right now. // vars for python management static std::atomic gilState(nullptr); + void registerWithInterpreter() { + if(!interpreterInitialized) { + interpreterInitialized = true; + gil_main_thread_id = std::this_thread::get_id(); + gil_id = gil_main_thread_id; + gilState = PyGILState_GetThisThreadState(); + } + } + void lockGIL() { - gilMutex.lock(); - assert(gilState); - PyEval_RestoreThread(gilState); // acquires GIL! + gilMutex.lock(); // <-- acquire the managing lock. No other thread can lock the gil! => what if another thread tries to unlock? -> security concern... + + // what is the current thread id? is it the main thread? => then lock the gil via restore thread etc. + // if not, need to use GILState_Ensure + if(std::this_thread::get_id() == gil_main_thread_id) { + if(!gilState) + gilState = PyGILState_GetThisThreadState(); + assert(gilState); + PyEval_RestoreThread(gilState); // acquires GIL! + } else { + assert(interpreterInitialized); + gstate = PyGILState_Ensure(); + } + assert(PyGILState_Check()); + gil_id = std::this_thread::get_id(); gil = true; - gilID = thisThreadID(); + gilState = nullptr; } void unlockGIL() { - gilMutex.unlock(); + // is it the main thread? and does it hold the manipulation lock? + if(std::this_thread::get_id() == gil_main_thread_id) { + gilState = PyEval_SaveThread(); + } else { + assert(interpreterInitialized); + PyGILState_Release(gstate); + gstate = PyGILState_UNLOCKED; + } + gil_id = std::thread::id(); gil = false; - gilState = PyEval_SaveThread(); - gilID = thisThreadID(); + gilMutex.unlock(); } bool holdsGIL() { - return gil; - } - - void acquireGIL() { - gilMutex.lock(); - PyEval_AcquireLock(); - PyEval_AcquireThread(gilState); // acquires GIL! - gil = true; - gilID = thisThreadID(); + // thread holds gil if it is hold in general and thread ids match. + return gil && std::this_thread::get_id() == gil_id; } void initInterpreter() { + gil_main_thread_id = std::this_thread::get_id(); + if(interpreterInitialized) throw std::runtime_error("interpreter was already initialized, abort"); // check if this function is called within a python interpreter or not if(!Py_IsInitialized()) { - Py_InitializeEx(0); // 0 to skip initialization of signal handlers, 1 would register them. #if (PY_MAJOR_VERSION >= 3 && PY_MINOR_VERSION < 7) @@ -86,27 +140,24 @@ namespace python { PyEval_InitThreads(); assert(PyEval_ThreadsInitialized()); #endif - // assume we are calling from python process/shared object - gilMutex.lock(); - gil = true; - gilID = interpreterID = thisThreadID(); } else { - // make sure this thread rn holds the GIL! if(!PyGILState_Check()) throw std::runtime_error("when initializing the thread, initInterpreter MUST hold the GIL"); - - // assume we are calling from python process/shared object - gilMutex.lock(); - gil = true; - gilID = interpreterID = thisThreadID(); } + gil_lock = nullptr; // this is the start, we're in the interpreter... + // acquire and release to initialize, works b.c. single-threaded interpreter... + acquire_lock(); + release_lock(); + gil = true; + + gil_id = std::this_thread::get_id(); + gilMutex.lock(); interpreterInitialized = true; } void closeInterpreter() { - if(!PyGILState_Check() || !holdsGIL()) throw std::runtime_error("to shutdown interpreter, GIL must be hold the calling thread..."); @@ -120,11 +171,17 @@ namespace python { PyErr_Clear(); } Py_FinalizeEx(); - + // now set to uninitialized. interpreterInitialized = false; - // unlock - if(gil) - gilMutex.unlock(); + if (gil_lock) { + PyThread_free_lock(gil_lock); + gil_lock = NULL; + } + gilMutex.unlock(); + + // reset vars (except main thread id!) + gil = false; + gil_lock = nullptr; } } \ No newline at end of file diff --git a/tuplex/adapters/cpython/src/PythonHelpers.cc b/tuplex/adapters/cpython/src/PythonHelpers.cc index aa87c22bd..eb5f8ebff 100644 --- a/tuplex/adapters/cpython/src/PythonHelpers.cc +++ b/tuplex/adapters/cpython/src/PythonHelpers.cc @@ -1363,6 +1363,8 @@ namespace python { // mapping type to internal types, unknown as default python::Type mapPythonClassToTuplexType(PyObject *o, bool autoUpcast) { + assert(o); + if(Py_None == o) return python::Type::NULLVALUE; diff --git a/tuplex/core/CMakeLists.txt b/tuplex/core/CMakeLists.txt index d14ad349f..da224299a 100755 --- a/tuplex/core/CMakeLists.txt +++ b/tuplex/core/CMakeLists.txt @@ -83,4 +83,5 @@ target_link_libraries(libcore Boost::thread Boost::system Boost::filesystem - ) \ No newline at end of file + util + ) diff --git a/tuplex/core/src/TraceVisitor.cc b/tuplex/core/src/TraceVisitor.cc index 29d6f0b5d..9e2ad891a 100644 --- a/tuplex/core/src/TraceVisitor.cc +++ b/tuplex/core/src/TraceVisitor.cc @@ -120,6 +120,7 @@ namespace tuplex { auto sym = PyDict_GetItemString(mainDict, node->_name.c_str()); if(sym) { + Py_XINCREF(sym); addTraceResult(node, TraceItem(sym, node->_name)); } else { @@ -128,9 +129,10 @@ namespace tuplex { auto builtinDict = PyModule_GetDict(builtins); assert(builtinDict); sym = PyDict_GetItemString(builtinDict, node->_name.c_str()); - if(sym) + if(sym) { + Py_XINCREF(sym); addTraceResult(node, TraceItem(sym, node->_name)); - else { + } else { PyErr_SetString(PyExc_NameError, ("could not find identifier " + node->_name).c_str()); // i.e., could early exit function... diff --git a/tuplex/core/src/ee/local/LocalBackend.cc b/tuplex/core/src/ee/local/LocalBackend.cc index 3b303a5fe..f014e766a 100644 --- a/tuplex/core/src/ee/local/LocalBackend.cc +++ b/tuplex/core/src/ee/local/LocalBackend.cc @@ -28,6 +28,7 @@ #include #include #include +#include namespace tuplex { @@ -1284,14 +1285,16 @@ namespace tuplex { auto input_intermediates = tstage->initData(); // lazy init hybrids - if(!input_intermediates.hybrids) { + if(!input_intermediates.hybrids && input_intermediates.numArgs > 0) { auto num_predecessors = tstage->predecessors().size(); input_intermediates.hybrids = new PyObject*[num_predecessors]; // @TODO: free these intermediates. Where is this done? for(int i = 0; i < num_predecessors; ++i) input_intermediates.hybrids[i] = nullptr; + } else { + assert(input_intermediates.hybrids == nullptr); + input_intermediates.hybrids = nullptr; } - python::lockGIL(); // construct intermediates from predecessors @@ -1321,7 +1324,6 @@ namespace tuplex { python::unlockGIL(); // check whether hybrids exist. If not, create them quickly! - assert(input_intermediates.hybrids); logger().info("creating hybrid intermediates took " + std::to_string(timer.time()) + "s"); timer.reset(); diff --git a/tuplex/core/src/physical/ResolveTask.cc b/tuplex/core/src/physical/ResolveTask.cc index 0e03a0bcf..fa9ab3312 100644 --- a/tuplex/core/src/physical/ResolveTask.cc +++ b/tuplex/core/src/physical/ResolveTask.cc @@ -457,270 +457,279 @@ namespace tuplex { // fallback 2: interpreter path // --> only go there if a non-true exception was recorded. Else, it will be dealt with above if(resCode == -1 && _interpreterFunctor) { + assert(!python::holdsGIL()); // acquire GIL python::lockGIL(); - PyCallable_Check(_interpreterFunctor); - - // holds the pythonized data - PyObject* tuple = nullptr; - - bool parse_cells = false; - - // there are different data reps for certain error codes. - // => decode the correct object from memory & then feed it into the pipeline... - if(ecCode == ecToI64(ExceptionCode::BADPARSE_STRING_INPUT)) { - // it's a string! - tuple = tupleFromParseException(ebuf, eSize); - parse_cells = true; // need to parse cells in python mode. - } else if(ecCode == ecToI64(ExceptionCode::NORMALCASEVIOLATION)) { - // changed, why are these names so random here? makes no sense... - auto row = Row::fromMemory(exceptionsInputSchema(), ebuf, eSize); - - tuple = python::rowToPython(row, true); - parse_cells = false; - // called below... - } else if (ecCode == ecToI64(ExceptionCode::PYTHON_PARALLELIZE)) { - auto pyObj = python::deserializePickledObject(python::getMainModule(), (char *) ebuf, eSize); - tuple = pyObj; - parse_cells = false; - } else { - // normal case, i.e. an exception occurred somewhere. - // --> this means if pipeline is using string as input, we should convert - auto row = Row::fromMemory(exceptionsInputSchema(), ebuf, eSize); - // cell source automatically takes input, i.e. no need to convert. simply get tuple from row object - tuple = python::rowToPython(row, true); + // catch any exceptions here + try { + PyCallable_Check(_interpreterFunctor); + + // holds the pythonized data + PyObject* tuple = nullptr; + bool parse_cells = false; + + // there are different data reps for certain error codes. + // => decode the correct object from memory & then feed it into the pipeline... + if(ecCode == ecToI64(ExceptionCode::BADPARSE_STRING_INPUT)) { + // it's a string! + tuple = tupleFromParseException(ebuf, eSize); + parse_cells = true; // need to parse cells in python mode. + } else if(ecCode == ecToI64(ExceptionCode::NORMALCASEVIOLATION)) { + // changed, why are these names so random here? makes no sense... + auto row = Row::fromMemory(exceptionsInputSchema(), ebuf, eSize); + + tuple = python::rowToPython(row, true); + parse_cells = false; + // called below... + } else if (ecCode == ecToI64(ExceptionCode::PYTHON_PARALLELIZE)) { + auto pyObj = python::deserializePickledObject(python::getMainModule(), (char *) ebuf, eSize); + tuple = pyObj; + parse_cells = false; + } else { + // normal case, i.e. an exception occurred somewhere. + // --> this means if pipeline is using string as input, we should convert + auto row = Row::fromMemory(exceptionsInputSchema(), ebuf, eSize); + + // cell source automatically takes input, i.e. no need to convert. simply get tuple from row object + tuple = python::rowToPython(row, true); #ifndef NDEBUG - if(PyTuple_Check(tuple)) { - // make sure tuple is valid... - for(unsigned i = 0; i < PyTuple_Size(tuple); ++i) { - auto elemObj = PyTuple_GET_ITEM(tuple, i); - assert(elemObj); + if(PyTuple_Check(tuple)) { + // make sure tuple is valid... + for(unsigned i = 0; i < PyTuple_Size(tuple); ++i) { + auto elemObj = PyTuple_GET_ITEM(tuple, i); + assert(elemObj); + } } - } #endif - parse_cells = false; - } + parse_cells = false; + } - // compute - // @TODO: we need to encode the hashmaps as these hybrid objects! - // ==> for more efficiency we prob should store one per executor! - // the same goes for any hashmap... + // compute + // @TODO: we need to encode the hashmaps as these hybrid objects! + // ==> for more efficiency we prob should store one per executor! + // the same goes for any hashmap... - assert(tuple); + assert(tuple); #ifndef NDEBUG - if(!tuple) { - owner()->error("bad decode, using () as dummy..."); - tuple = PyTuple_New(0); // empty tuple. - } + if(!tuple) { + owner()->error("bad decode, using () as dummy..."); + tuple = PyTuple_New(0); // empty tuple. + } #endif - // note: current python pipeline always expects a tuple arg. hence pack current element. - if(PyTuple_Check(tuple) && PyTuple_Size(tuple) > 1) { - // nothing todo... - } else { - auto tmp_tuple = PyTuple_New(1); - PyTuple_SET_ITEM(tmp_tuple, 0, tuple); - tuple = tmp_tuple; - } + // note: current python pipeline always expects a tuple arg. hence pack current element. + if(PyTuple_Check(tuple) && PyTuple_Size(tuple) > 1) { + // nothing todo... + } else { + auto tmp_tuple = PyTuple_New(1); + PyTuple_SET_ITEM(tmp_tuple, 0, tuple); + tuple = tmp_tuple; + } #ifndef NDEBUG - // // to print python object - // Py_XINCREF(tuple); - // PyObject_Print(tuple, stdout, 0); - // std::cout< note: unify handling this with the other cases... - assert(_htable->hybrid_hm); - Py_XINCREF(_htable->hybrid_hm); - PyTuple_SET_ITEM(args, num_python_args - 1, _htable->hybrid_hm); - } + // special case unique, no arg required (done via output) + if(hasHashTableSink() && _hash_agg_type == AggregateType::AGG_UNIQUE) + num_python_args -= 1; - auto kwargs = PyDict_New(); PyDict_SetItemString(kwargs, "parse_cells", python::boolean(parse_cells)); - auto pcr = python::callFunctionEx(_interpreterFunctor, args, kwargs); + PyObject* args = PyTuple_New(num_python_args); + PyTuple_SET_ITEM(args, 0, tuple); + for(unsigned i = 0; i < _py_intermediates.size(); ++i) { + Py_XINCREF(_py_intermediates[i]); + PyTuple_SET_ITEM(args, i + 1, _py_intermediates[i]); + } + // set hash table sink + if(hasHashTableSink() && _hash_agg_type != AggregateType::AGG_UNIQUE) { // special case: unique -> note: unify handling this with the other cases... + assert(_htable->hybrid_hm); + Py_XINCREF(_htable->hybrid_hm); + PyTuple_SET_ITEM(args, num_python_args - 1, _htable->hybrid_hm); + } - if(pcr.exceptionCode != ExceptionCode::SUCCESS) { - // this should not happen, bad internal error. codegen'ed python should capture everything. - owner()->error("bad internal python error: " + pcr.exceptionMessage); - python::unlockGIL(); - return; - } else { - // all good, row is fine. exception occurred? - assert(pcr.res); + auto kwargs = PyDict_New(); + auto py_parse_cells = python::boolean(parse_cells); + PyDict_SetItemString(kwargs, "parse_cells", py_parse_cells); + auto pcr = python::callFunctionEx(_interpreterFunctor, args, kwargs); - // type check: save to regular rows OR save to python row collection - if(!pcr.res) { - owner()->error("bad internal python error, NULL object returned"); + if(pcr.exceptionCode != ExceptionCode::SUCCESS) { + // this should not happen, bad internal error. codegen'ed python should capture everything. + owner()->error("bad internal python error: " + pcr.exceptionMessage); + python::unlockGIL(); + return; } else { + // all good, row is fine. exception occurred? + assert(pcr.res); -#ifndef NDEBUG - // // uncomment to print res obj - // Py_XINCREF(pcr.res); - // PyObject_Print(pcr.res, stdout, 0); - // std::cout<error("bad internal python error, NULL object returned"); + } else { #ifndef NDEBUG - // // debug printing of exception and what the reason is... - // // print res obj + // // uncomment to print res obj // Py_XINCREF(pcr.res); - // std::cout<<"exception occurred while processing using python: "<(cptr), strlen(cptr), BUF_FORMAT_NORMAL_OUTPUT); // don't write '\0'! - } else { - - // there are three options where to store the result now - - // 1. fits targetOutputSchema (i.e. row becomes normalcase row) - bool outputAsNormalRow = python::Type::UNKNOWN != unifyTypes(rowType, _targetOutputSchema.getRowType(), _allowNumericTypeUnification) - && canUpcastToRowType(rowType, _targetOutputSchema.getRowType()); - // 2. fits generalCaseOutputSchema (i.e. row becomes generalcase row) - bool outputAsGeneralRow = python::Type::UNKNOWN != unifyTypes(rowType, - commonCaseOutputSchema().getRowType(), _allowNumericTypeUnification) - && canUpcastToRowType(rowType, commonCaseOutputSchema().getRowType()); - - // 3. doesn't fit, store as python object. => we should use block storage for this as well. Then data can be shared. - - // can upcast? => note that the && is necessary because of cases where outputSchema is - // i64 but the given row type f64. We can cast up i64 to f64 but not the other way round. - if(outputAsNormalRow) { - Row resRow = python::pythonToRow(rowObj).upcastedRow(_targetOutputSchema.getRowType()); - assert(resRow.getRowType() == _targetOutputSchema.getRowType()); - - // write to buffer & perform callback - auto buf_size = 2 * resRow.serializedLength(); - uint8_t *buf = new uint8_t[buf_size]; - memset(buf, 0, buf_size); - auto serialized_length = resRow.serializeToMemory(buf, buf_size); - // call row func! - // --> merge row distinguishes between those two cases. Distinction has to be done there - // because of compiled functor who calls mergeRow in the write function... - mergeRow(buf, serialized_length, BUF_FORMAT_NORMAL_OUTPUT); - delete [] buf; - } else if(outputAsGeneralRow) { - Row resRow = python::pythonToRow(rowObj).upcastedRow(commonCaseOutputSchema().getRowType()); - assert(resRow.getRowType() == commonCaseOutputSchema().getRowType()); - - // write to buffer & perform callback - auto buf_size = 2 * resRow.serializedLength(); - uint8_t *buf = new uint8_t[buf_size]; - memset(buf, 0, buf_size); - auto serialized_length = resRow.serializeToMemory(buf, buf_size); - // call row func! - // --> merge row distinguishes between those two cases. Distinction has to be done there - // because of compiled functor who calls mergeRow in the write function... - mergeRow(buf, serialized_length, BUF_FORMAT_GENERAL_OUTPUT); - delete [] buf; + auto rowType = python::mapPythonClassToTuplexType(rowObj, false); + + // special case output schema is str (fileoutput!) + if(rowType == python::Type::STRING) { + // write to file, no further type check necessary b.c. + // if it was the object string it would be within a tuple! + auto cptr = PyUnicode_AsUTF8(rowObj); + Py_XDECREF(rowObj); + mergeRow(reinterpret_cast(cptr), strlen(cptr), BUF_FORMAT_NORMAL_OUTPUT); // don't write '\0'! } else { - // Unwrap single element tuples before writing them to the fallback sink - if(PyTuple_Check(rowObj) && PyTuple_Size(rowObj) == 1) { - writePythonObjectToFallbackSink(PyTuple_GetItem(rowObj, 0)); + + // there are three options where to store the result now + + // 1. fits targetOutputSchema (i.e. row becomes normalcase row) + bool outputAsNormalRow = python::Type::UNKNOWN != unifyTypes(rowType, _targetOutputSchema.getRowType(), _allowNumericTypeUnification) + && canUpcastToRowType(rowType, _targetOutputSchema.getRowType()); + // 2. fits generalCaseOutputSchema (i.e. row becomes generalcase row) + bool outputAsGeneralRow = python::Type::UNKNOWN != unifyTypes(rowType, + commonCaseOutputSchema().getRowType(), _allowNumericTypeUnification) + && canUpcastToRowType(rowType, commonCaseOutputSchema().getRowType()); + + // 3. doesn't fit, store as python object. => we should use block storage for this as well. Then data can be shared. + + // can upcast? => note that the && is necessary because of cases where outputSchema is + // i64 but the given row type f64. We can cast up i64 to f64 but not the other way round. + if(outputAsNormalRow) { + Row resRow = python::pythonToRow(rowObj).upcastedRow(_targetOutputSchema.getRowType()); + assert(resRow.getRowType() == _targetOutputSchema.getRowType()); + + // write to buffer & perform callback + auto buf_size = 2 * resRow.serializedLength(); + uint8_t *buf = new uint8_t[buf_size]; + memset(buf, 0, buf_size); + auto serialized_length = resRow.serializeToMemory(buf, buf_size); + // call row func! + // --> merge row distinguishes between those two cases. Distinction has to be done there + // because of compiled functor who calls mergeRow in the write function... + mergeRow(buf, serialized_length, BUF_FORMAT_NORMAL_OUTPUT); + delete [] buf; + } else if(outputAsGeneralRow) { + Row resRow = python::pythonToRow(rowObj).upcastedRow(commonCaseOutputSchema().getRowType()); + assert(resRow.getRowType() == commonCaseOutputSchema().getRowType()); + + // write to buffer & perform callback + auto buf_size = 2 * resRow.serializedLength(); + uint8_t *buf = new uint8_t[buf_size]; + memset(buf, 0, buf_size); + auto serialized_length = resRow.serializeToMemory(buf, buf_size); + // call row func! + // --> merge row distinguishes between those two cases. Distinction has to be done there + // because of compiled functor who calls mergeRow in the write function... + mergeRow(buf, serialized_length, BUF_FORMAT_GENERAL_OUTPUT); + delete [] buf; } else { - writePythonObjectToFallbackSink(rowObj); + // Unwrap single element tuples before writing them to the fallback sink + if(PyTuple_Check(rowObj) && PyTuple_Size(rowObj) == 1) { + writePythonObjectToFallbackSink(PyTuple_GetItem(rowObj, 0)); + } else { + writePythonObjectToFallbackSink(rowObj); + } } + // Py_XDECREF(rowObj); } - // Py_XDECREF(rowObj); } - } #ifndef NDEBUG - if(PyErr_Occurred()) { - // print out the otber objects... - std::cout<<__FILE__<<":"<<__LINE__<<" python error not cleared properly!"< #include #include #include "PythonWrappers.h" +#include namespace tuplex { // wrappers hold the actual objects @@ -41,7 +42,7 @@ namespace tuplex { // convert a flat tuple type fast to list of tuples PyObject* simpleTupleToCPython(ResultSet* rs, const python::Type& type, size_t maxRowCount); public: - PythonDataSet(): _dataset(nullptr) {} + PythonDataSet(): _dataset(nullptr) { python::registerWithInterpreter(); } void wrap(DataSet *dataset) { _dataset = dataset; } diff --git a/tuplex/python/include/PythonException.h b/tuplex/python/include/PythonException.h index f05220aa2..1f82bd236 100644 --- a/tuplex/python/include/PythonException.h +++ b/tuplex/python/include/PythonException.h @@ -32,6 +32,7 @@ namespace tuplex { PythonException(const std::string& message, const std::string& data = "") : _message(message), _data(data) { + python::registerWithInterpreter(); } const char *what() const throw() { diff --git a/tuplex/python/include/PythonMetrics.h b/tuplex/python/include/PythonMetrics.h index 111c73c3e..13844e44f 100644 --- a/tuplex/python/include/PythonMetrics.h +++ b/tuplex/python/include/PythonMetrics.h @@ -25,7 +25,7 @@ namespace tuplex { friend class PythonContext; public: - PythonMetrics(): _metrics(nullptr) {} + PythonMetrics(): _metrics(nullptr) { python::registerWithInterpreter(); } /*! * wraps JobMetrics object in PythonMetrics object * @param metrics pointer to JobMetrics object diff --git a/tuplex/python/setup.py b/tuplex/python/setup.py index 2d5eeac45..512413d6e 100644 --- a/tuplex/python/setup.py +++ b/tuplex/python/setup.py @@ -29,7 +29,7 @@ setup( name="Tuplex", - version="0.3.4dev", + version="0.3.4", packages=find_packages(), package_data={ # include libs in libexec diff --git a/tuplex/python/src/PythonBindings.cc b/tuplex/python/src/PythonBindings.cc index 6b3683853..7909e5e8f 100644 --- a/tuplex/python/src/PythonBindings.cc +++ b/tuplex/python/src/PythonBindings.cc @@ -39,6 +39,8 @@ PYMODULE { m.attr("__version__") = "dev"; #endif + // Note: before constructing any object - call registerWithInterpreter to setup GIL properly! + py::class_(m, "_DataSet") .def("show", &tuplex::PythonDataSet::show) .def("collect", &tuplex::PythonDataSet::collect) @@ -90,4 +92,6 @@ PYMODULE { // global method to register a new logging function m.def("registerLoggingCallback", &tuplex::registerPythonLoggingCallback); + + m.def("registerWithInterpreter", &python::registerWithInterpreter); } \ No newline at end of file diff --git a/tuplex/python/src/PythonCommon.cc b/tuplex/python/src/PythonCommon.cc index 80e8f7e08..affc009cf 100644 --- a/tuplex/python/src/PythonCommon.cc +++ b/tuplex/python/src/PythonCommon.cc @@ -12,6 +12,8 @@ namespace tuplex { py::object registerPythonLoggingCallback(py::object callback_functor) { + python::registerWithInterpreter(); + // get object callback_functor.inc_ref(); auto functor_obj = callback_functor.ptr(); diff --git a/tuplex/python/src/PythonContext.cc b/tuplex/python/src/PythonContext.cc index 65f6ae04f..2d9b11acb 100644 --- a/tuplex/python/src/PythonContext.cc +++ b/tuplex/python/src/PythonContext.cc @@ -1374,6 +1374,9 @@ namespace tuplex { const std::string &runtimeLibraryPath, const std::string& options) : _context(nullptr) { + python::registerWithInterpreter(); + + using namespace std; TUPLEX_TRACE("entering PythonContext"); diff --git a/tuplex/python/tuplex/utils/version.py b/tuplex/python/tuplex/utils/version.py index be3ddfc75..1f80cd420 100644 --- a/tuplex/python/tuplex/utils/version.py +++ b/tuplex/python/tuplex/utils/version.py @@ -1,2 +1,2 @@ # (c) L.Spiegelberg 2017 - 2022 -__version__="0.3.4dev" \ No newline at end of file +__version__="0.3.4" \ No newline at end of file diff --git a/tuplex/test/adapters/cpython/PythonHelperTest.cc b/tuplex/test/adapters/cpython/PythonHelperTest.cc index db070c9bc..5c071ee12 100644 --- a/tuplex/test/adapters/cpython/PythonHelperTest.cc +++ b/tuplex/test/adapters/cpython/PythonHelperTest.cc @@ -391,18 +391,24 @@ TEST_F(PythonHelperTest, FunctionGlobals) { PyObject* pyFunc = python::runAndGet(code, "func"); + // convert globals object to lookup dictionary + unordered_map> globals; + auto pyGlobals = PyFunction_GetGlobals(pyFunc); + + // fetch globals, locals cout<> globals; - auto pyGlobals = PyFunction_GetGlobals(pyFunc); // iterate over dictionary PyObject *key = nullptr, *val = nullptr; Py_ssize_t pos = 0; // must be initialized to 0 to start iteration, however internal iterator variable. Don't use semantically. while(PyDict_Next(pyGlobals, &pos, &key, &val)) { + // b. key/value will be used twice, inc refcount by one + Py_XINCREF(key); + Py_XINCREF(val); auto curKeyType = mapPythonClassToTuplexType(key, false); auto curValType = mapPythonClassToTuplexType(val, false); assert(curKeyType == python::Type::STRING); @@ -416,11 +422,10 @@ TEST_F(PythonHelperTest, FunctionGlobals) { for(auto item : globals) cout<(item.second).desc()<<"]: "<(item.second)< worth it?? prob not really... - - int N = 1001000; - - Timer timer; - auto listObj = PyList_New(N); - for(int i = 0; i < N; ++i) { - auto tupleObj = PyTuple_New(4); - PyTuple_SET_ITEM(tupleObj, 0, python::PyString_FromString("hello world")); - PyTuple_SET_ITEM(tupleObj, 1, python::PyString_FromString("hello whkljdkhjorld")); - PyTuple_SET_ITEM(tupleObj, 2, python::PyString_FromString("dkfjopdjfophjhello world")); - PyTuple_SET_ITEM(tupleObj, 3, PyLong_FromLongLong(12345)); - PyList_SET_ITEM(listObj, i, tupleObj); - } - std::cout<<"w/o opt took: "<ob_item[0] = python::PyString_FromString("hello world"); - tp->ob_item[1] = python::PyString_FromString("hello whkljdkhjorld"); - tp->ob_item[2] = python::PyString_FromString("dkfjopdjfophjhello world"); - tp->ob_item[3] = PyLong_FromLongLong(12345); - - PyList_SET_ITEM(listObj, i, (PyObject*)tp); - } - std::cout<<"tuple construction optimized, took: "<ob_item = (PyObject**)PyMem_Calloc(N, sizeof(PyObject*)); - // TODO: mem check... - Py_SIZE(lo) = N; - lo->allocated = N; - PyObject_GC_Track(lo); - for(int i = 0; i < N; ++i) { - - // directly call python functions without all the crap - auto tp = (PyTupleObject*)PyObject_GC_NewVar(PyTupleObject, &PyTuple_Type, tupleSize); - PyObject_GC_Track(tp); - if(!tp) // out of mem.. - break; - - tp->ob_item[0] = python::PyString_FromString("hello world"); - tp->ob_item[1] = python::PyString_FromString("hello whkljdkhjorld"); - tp->ob_item[2] = python::PyString_FromString("dkfjopdjfophjhello world"); - tp->ob_item[3] = PyLong_FromLongLong(12345); - - lo->ob_item[i] = (PyObject*)tp; - } - std::cout<<"tuple + list construction optimized, took: "< worth it?? prob not really... +// +// int N = 1001000; +// +// Timer timer; +// auto listObj = PyList_New(N); +// for(int i = 0; i < N; ++i) { +// auto tupleObj = PyTuple_New(4); +// PyTuple_SET_ITEM(tupleObj, 0, python::PyString_FromString("hello world")); +// PyTuple_SET_ITEM(tupleObj, 1, python::PyString_FromString("hello whkljdkhjorld")); +// PyTuple_SET_ITEM(tupleObj, 2, python::PyString_FromString("dkfjopdjfophjhello world")); +// PyTuple_SET_ITEM(tupleObj, 3, PyLong_FromLongLong(12345)); +// PyList_SET_ITEM(listObj, i, tupleObj); +// } +// std::cout<<"w/o opt took: "<ob_item[0] = python::PyString_FromString("hello world"); +// tp->ob_item[1] = python::PyString_FromString("hello whkljdkhjorld"); +// tp->ob_item[2] = python::PyString_FromString("dkfjopdjfophjhello world"); +// tp->ob_item[3] = PyLong_FromLongLong(12345); +// +// PyList_SET_ITEM(listObj, i, (PyObject*)tp); +// } +// std::cout<<"tuple construction optimized, took: "<ob_item = (PyObject**)PyMem_Calloc(N, sizeof(PyObject*)); +// // TODO: mem check... +// Py_SIZE(lo) = N; +// lo->allocated = N; +// PyObject_GC_Track(lo); +// for(int i = 0; i < N; ++i) { +// +// // directly call python functions without all the crap +// auto tp = (PyTupleObject*)PyObject_GC_NewVar(PyTupleObject, &PyTuple_Type, tupleSize); +// PyObject_GC_Track(tp); +// if(!tp) // out of mem.. +// break; +// +// tp->ob_item[0] = python::PyString_FromString("hello world"); +// tp->ob_item[1] = python::PyString_FromString("hello whkljdkhjorld"); +// tp->ob_item[2] = python::PyString_FromString("dkfjopdjfophjhello world"); +// tp->ob_item[3] = PyLong_FromLongLong(12345); +// +// lo->ob_item[i] = (PyObject*)tp; +// } +// std::cout<<"tuple + list construction optimized, took: "<