这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
2 changes: 1 addition & 1 deletion doc/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
# The short X.Y version
version="0.3"
# The full version, including alpha/beta/rc tags
release="0.3.4dev"
release="0.3.4"


# -- General configuration ---------------------------------------------------
Expand Down
30 changes: 30 additions & 0 deletions scripts/install_debug_python.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/usr/bin/env bash
#
# Downloads the CPython ${PYTHON_VERSION} source tarball into ./.cache (relative to
# the directory the script is started from), configures an out-of-tree debug
# build in Python-<version>/debug and runs `make` followed by `make test`.
#
# NOTE(review): PREFIX is handed to configure, but this script never runs
# `make install`. Run it manually from .cache/Python-<version>/debug if the
# debug interpreter should actually end up under PREFIX.

# abort on the first failing command, on use of unset variables and on failures
# inside pipelines, so that e.g. a failed download does not lead to configuring
# or building in the wrong directory.
set -euo pipefail

PYTHON_VERSION="3.9.14"
URL="https://www.python.org/ftp/python/${PYTHON_VERSION}/Python-${PYTHON_VERSION}.tar.xz"

# specify here with what flags to build the version (cf. https://pythonextensionpatterns.readthedocs.io/en/latest/debugging/debug_in_ide.html)
# and https://pythonextensionpatterns.readthedocs.io/en/latest/debugging/debug_python.html
# (kept as an array so each flag is passed as exactly one argument)
DEBUG_OPTIONS=(--with-pydebug --without-pymalloc --with-valgrind)
PREFIX="$HOME/.local/python${PYTHON_VERSION}-dbg"

# number of parallel make jobs: nproc is not available everywhere (e.g. macOS),
# so fall back to sysctl and finally to a single job.
JOBS="$(nproc 2>/dev/null || sysctl -n hw.ncpu 2>/dev/null || echo 1)"

# always start from an empty cache directory
rm -rf .cache
mkdir -p .cache

# save current working dir
CWD="$PWD"

cd .cache

# download python
echo ">>> downloading python ${PYTHON_VERSION}"
wget "$URL"
echo ">>> extracting python"
tar xf "Python-${PYTHON_VERSION}.tar.xz"
cd "Python-${PYTHON_VERSION}"

# out-of-tree build: configure, compile, then run the CPython test suite
mkdir -p debug
cd debug
../configure --prefix="${PREFIX}" "${DEBUG_OPTIONS[@]}"
make -j"${JOBS}"
make test

cd "$CWD"

2 changes: 1 addition & 1 deletion scripts/set_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def LooseVersion(v):


# to create a testpypi version use X.Y.devN
version = '0.3.4dev'
version = '0.3.4'

# https://pypi.org/simple/tuplex/
# or https://test.pypi.org/simple/tuplex/
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ def tplx_package_data():
# logic and declaration, and simpler if you include description/version in a file.
setup(name="tuplex",
python_requires='>=3.7.0',
version="0.3.4dev",
version="0.3.4",
author="Leonhard Spiegelberg",
author_email="tuplex@cs.brown.edu",
description="Tuplex is a novel big data analytics framework incorporating a Python UDF compiler based on LLVM "
Expand Down
13 changes: 7 additions & 6 deletions tuplex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -567,12 +567,13 @@ function(FindPython3Exe NAMES VERSION EXECUTABLE)
# check version (must match VERSION)
execute_process(COMMAND "${TEMP_EXE}" -c "import platform;print(platform.python_version())" RESULT_VARIABLE _result OUTPUT_VARIABLE TEMP_VERSION OUTPUT_STRIP_TRAILING_WHITESPACE)
# check if version matches

compare_version_strings(${VERSION} ${TEMP_VERSION} _result)
if(result EQUAL 0)
message(STATUS "Found ${TEMP_EXE} with version ${TEMP_VERSION} matching desired version ${VERSION}")
set(${EXECUTABLE} ${TEMP_EXE} PARENT_SCOPE) # write out
endif()
if(VERSION AND TEMP_VERSION)
compare_version_strings(${VERSION} ${TEMP_VERSION} _result)
if(result EQUAL 0)
message(STATUS "Found ${TEMP_EXE} with version ${TEMP_VERSION} matching desired version ${VERSION}")
set(${EXECUTABLE} ${TEMP_EXE} PARENT_SCOPE) # write out
endif()
endif()
endif()

endfunction()
Expand Down
9 changes: 5 additions & 4 deletions tuplex/adapters/cpython/include/PythonHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,16 +354,17 @@ namespace python {
*/
extern void unlockGIL();

/*!
* needs to be called if using as C-extension to setup gil etc. MUST BE CALLED FROM MAIN THREAD
*/
extern void registerWithInterpreter();

/*!
* check whether this thread holds the GIL or not
* @return
*/
extern bool holdsGIL();

/*!
* required in python extension module.
*/
extern void acquireGIL();

/*!
* runs python code (throws runtime error if err occurred) and retrieves object with name
Expand Down
125 changes: 91 additions & 34 deletions tuplex/adapters/cpython/src/PythonGIL.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#include <Base.h>
#include <mutex>

#include <pythread.h>

namespace python {

// GIL details:
Expand All @@ -27,86 +29,135 @@ namespace python {
ss.flush();
auto thread_id = ss.str();
int64_t id = -1;
#ifndef LINUX
sscanf(thread_id.c_str(), "%lld", &id);
#else
sscanf(thread_id.c_str(), "%ld", &id);
#endif
return id;
}

// GIL management here
static std::atomic_bool gil(false); // true if a thread holds the gil, false else
static std::mutex gilMutex; // access to all the properties below
PyGILState_STATE gstate; // for non-main thread lock

// cf. https://pythonextensionpatterns.readthedocs.io/en/latest/thread_safety.html#f1
static PyThread_type_lock gil_lock(nullptr);

static void acquire_lock() {
// lazy init lock -> called on first entry.
if(!gil_lock) {
gil_lock = PyThread_allocate_lock();
if(!gil_lock) {
std::cerr<<"failed to initialize lock"<<std::endl;
}
}

if (! PyThread_acquire_lock(gil_lock, NOWAIT_LOCK)) {
{
PyThreadState *_save;
_save = PyEval_SaveThread();
PyThread_acquire_lock(gil_lock, WAIT_LOCK);
PyEval_RestoreThread(_save);
}
}
}

// counterpart to acquire_lock(): releases the file-level gil_lock via PyThread_release_lock.
// gil_lock is passed through unchecked, i.e. acquire_lock() (which allocates it) has to have run before.
static void release_lock() {
PyThread_release_lock(gil_lock);
}

// Note: thread::id can't be atomic yet, this is an ongoing proposal
// ==> convert to uint64_t and use this for thread safe access
static std::atomic_int64_t gilID(-1); // id of thread who holds gil
static std::atomic_int64_t interpreterID(-1); // thread which holds the interpreter
static std::atomic_bool interpreterInitialized(false); // checks whether interpreter is initialized or not
std::thread::id gil_main_thread_id; // id of the main thread.
std::thread::id gil_id; // id of the thread holding the gil right now.

// vars for python management
static std::atomic<PyThreadState*> gilState(nullptr);

/*!
 * one-time registration of the calling thread with the GIL bookkeeping of this file.
 * On the first call the calling thread is stored as main thread and as current gil thread,
 * and its thread state (PyGILState_GetThisThreadState) is kept in gilState.
 * Any later call, or a call after interpreterInitialized was set elsewhere, is a no-op.
 */
void registerWithInterpreter() {
    // nothing to do if the interpreter has been marked as initialized already.
    if(interpreterInitialized)
        return;

    interpreterInitialized = true;

    // calling thread becomes both the main thread and the thread recorded in gil_id.
    const auto caller = std::this_thread::get_id();
    gil_main_thread_id = caller;
    gil_id = caller;

    gilState = PyGILState_GetThisThreadState();
}

void lockGIL() {
gilMutex.lock();
assert(gilState);
PyEval_RestoreThread(gilState); // acquires GIL!
gilMutex.lock(); // <-- acquire the managing lock. No other thread can lock the gil! => what if another thread tries to unlock? -> security concern...

// what is the current thread id? is it the main thread? => then lock the gil via restore thread etc.
// if not, need to use GILState_Ensure
if(std::this_thread::get_id() == gil_main_thread_id) {
if(!gilState)
gilState = PyGILState_GetThisThreadState();
assert(gilState);
PyEval_RestoreThread(gilState); // acquires GIL!
} else {
assert(interpreterInitialized);
gstate = PyGILState_Ensure();
}
assert(PyGILState_Check());
gil_id = std::this_thread::get_id();
gil = true;
gilID = thisThreadID();
gilState = nullptr;
}

void unlockGIL() {
gilMutex.unlock();
// is it the main thread? and does it hold the manipulation lock?
if(std::this_thread::get_id() == gil_main_thread_id) {
gilState = PyEval_SaveThread();
} else {
assert(interpreterInitialized);
PyGILState_Release(gstate);
gstate = PyGILState_UNLOCKED;
}
gil_id = std::thread::id();
gil = false;
gilState = PyEval_SaveThread();
gilID = thisThreadID();
gilMutex.unlock();
}

bool holdsGIL() {
return gil;
}

void acquireGIL() {
gilMutex.lock();
PyEval_AcquireLock();
PyEval_AcquireThread(gilState); // acquires GIL!
gil = true;
gilID = thisThreadID();
// thread holds gil if it is hold in general and thread ids match.
return gil && std::this_thread::get_id() == gil_id;
}

void initInterpreter() {
gil_main_thread_id = std::this_thread::get_id();

if(interpreterInitialized)
throw std::runtime_error("interpreter was already initialized, abort");

// check if this function is called within a python interpreter or not
if(!Py_IsInitialized()) {

Py_InitializeEx(0); // 0 to skip initialization of signal handlers, 1 would register them.

#if (PY_MAJOR_VERSION >= 3 && PY_MINOR_VERSION < 7)
// init threads (not necessary from Python 3.7 onwards)
PyEval_InitThreads();
assert(PyEval_ThreadsInitialized());
#endif
// assume we are calling from python process/shared object
gilMutex.lock();
gil = true;
gilID = interpreterID = thisThreadID();
} else {

// make sure this thread rn holds the GIL!
if(!PyGILState_Check())
throw std::runtime_error("when initializing the thread, initInterpreter MUST hold the GIL");

// assume we are calling from python process/shared object
gilMutex.lock();
gil = true;
gilID = interpreterID = thisThreadID();
}

gil_lock = nullptr; // this is the start, we're in the interpreter...
// acquire and release to initialize, works b.c. single-threaded interpreter...
acquire_lock();
release_lock();
gil = true;

gil_id = std::this_thread::get_id();
gilMutex.lock();
interpreterInitialized = true;
}

void closeInterpreter() {

if(!PyGILState_Check() || !holdsGIL())
throw std::runtime_error("to shutdown interpreter, GIL must be hold the calling thread...");

Expand All @@ -120,11 +171,17 @@ namespace python {
PyErr_Clear();
}
Py_FinalizeEx();

// now set to uninitialized.
interpreterInitialized = false;

// unlock
if(gil)
gilMutex.unlock();
if (gil_lock) {
PyThread_free_lock(gil_lock);
gil_lock = NULL;
}
gilMutex.unlock();

// reset vars (except main thread id!)
gil = false;
gil_lock = nullptr;
}
}
2 changes: 2 additions & 0 deletions tuplex/adapters/cpython/src/PythonHelpers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1363,6 +1363,8 @@ namespace python {

// mapping type to internal types, unknown as default
python::Type mapPythonClassToTuplexType(PyObject *o, bool autoUpcast) {
assert(o);

if(Py_None == o)
return python::Type::NULLVALUE;

Expand Down
3 changes: 2 additions & 1 deletion tuplex/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,5 @@ target_link_libraries(libcore
Boost::thread
Boost::system
Boost::filesystem
)
util
)
6 changes: 4 additions & 2 deletions tuplex/core/src/TraceVisitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ namespace tuplex {
auto sym = PyDict_GetItemString(mainDict, node->_name.c_str());

if(sym) {
Py_XINCREF(sym);
addTraceResult(node, TraceItem(sym, node->_name));
} else {

Expand All @@ -128,9 +129,10 @@ namespace tuplex {
auto builtinDict = PyModule_GetDict(builtins); assert(builtinDict);

sym = PyDict_GetItemString(builtinDict, node->_name.c_str());
if(sym)
if(sym) {
Py_XINCREF(sym);
addTraceResult(node, TraceItem(sym, node->_name));
else {
} else {
PyErr_SetString(PyExc_NameError, ("could not find identifier " + node->_name).c_str());

// i.e., could early exit function...
Expand Down
8 changes: 5 additions & 3 deletions tuplex/core/src/ee/local/LocalBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <physical/LLVMOptimizer.h>
#include <HybridHashTable.h>
#include <int_hashmap.h>
#include <PythonHelpers.h>

namespace tuplex {

Expand Down Expand Up @@ -1284,14 +1285,16 @@ namespace tuplex {
auto input_intermediates = tstage->initData();

// lazy init hybrids
if(!input_intermediates.hybrids) {
if(!input_intermediates.hybrids && input_intermediates.numArgs > 0) {
auto num_predecessors = tstage->predecessors().size();
input_intermediates.hybrids = new PyObject*[num_predecessors]; // @TODO: free these intermediates. Where is this done?
for(int i = 0; i < num_predecessors; ++i)
input_intermediates.hybrids[i] = nullptr;
} else {
assert(input_intermediates.hybrids == nullptr);
input_intermediates.hybrids = nullptr;
}


python::lockGIL();

// construct intermediates from predecessors
Expand Down Expand Up @@ -1321,7 +1324,6 @@ namespace tuplex {
python::unlockGIL();

// check whether hybrids exist. If not, create them quickly!
assert(input_intermediates.hybrids);
logger().info("creating hybrid intermediates took " + std::to_string(timer.time()) + "s");
timer.reset();

Expand Down
Loading