这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
97ae216
[HOTFIX] Fix for key with optional tuplex. prefixing #69
LeonhardFS Jul 3, 2022
4bef8f5
fix hashing for rows with more complex types
LeonhardFS Jul 3, 2022
f56043a
test fix
LeonhardFS Jul 3, 2022
81ecd8f
another fix
LeonhardFS Jul 3, 2022
3b87b98
wip fixing aggregateByKey (except hybrid)
LeonhardFS Jul 5, 2022
c04c63f
memcpy fix
LeonhardFS Jul 5, 2022
d395e09
another test getting fixed
LeonhardFS Jul 5, 2022
c63c4e9
key fix
LeonhardFS Jul 5, 2022
42f4947
adding agg test
LeonhardFS Jul 5, 2022
77f5cff
multi-user test fix
LeonhardFS Jul 5, 2022
32db3c3
compile fix
LeonhardFS Jul 5, 2022
e602092
more python finding magic
Jul 6, 2022
ceba0a4
remove unused function from test
Jul 6, 2022
7f523fa
remove unused function
Jul 6, 2022
e91f7da
fix for missing module, or import failure
Jul 6, 2022
2c1f0bd
typo
Jul 6, 2022
62b3eff
another mod import fallback
Jul 6, 2022
e11f185
get rid off printing unless wrong access
Jul 6, 2022
8f8dc99
adding a dedicated test for bug99
LeonhardFS Jul 6, 2022
cd101bd
adding typing.Optional[...] support and catch errors when extracting …
LeonhardFS Jul 6, 2022
c76c789
compile fix
LeonhardFS Jul 6, 2022
b8af607
typo
LeonhardFS Jul 6, 2022
e7b58c8
optional fix
LeonhardFS Jul 6, 2022
2db5470
test fix
LeonhardFS Jul 6, 2022
488823f
fix test
Jul 6, 2022
84b6446
another code extract fallback using new ast.unparse feature
LeonhardFS Jul 6, 2022
92b1d7c
wip, failing test
Jul 6, 2022
b12e394
merged
Jul 6, 2022
7ae4b77
fixes
Jul 6, 2022
eb0abb8
fixing wrong memory deref
Jul 6, 2022
910f06c
cleanup
Jul 6, 2022
ab35c61
add new test for bug94
Jul 6, 2022
4a55008
fix bug94 by making fallback partitions immortal as well
LeonhardFS Jul 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions tuplex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -565,12 +565,35 @@ if(PYTHON3_VERSION STREQUAL "")
message(STATUS "Found full python3-dev installation")
set(Python3_Embed_FOUND TRUE)
else()
find_package(Python3 COMPONENTS Interpreter REQUIRED)
# python3 -c 'import distutils.sysconfig; print(distutils.sysconfig.get_python_lib(plat_specific=False,standard_lib=True))'
# try to get module libs at least
find_package(Python3 COMPONENTS Interpreter QUIET)
if(Python3_FOUND)
# python3 -c 'import distutils.sysconfig; print(distutils.sysconfig.get_python_lib(plat_specific=False,standard_lib=True))'
# try to get module libs at least

# mark embed lib as not found
unset(Python3_Embed_FOUND)
# mark embed lib as not found
unset(Python3_Embed_FOUND)
else()
# use interpreter find script to detect python interpreter
include(FindPythonInterpreter)
find_python_interpreter(VERSION 3
INTERPRETER_OUT_VAR PYTHON_INTERPRETER
VERSION_OUT_VAR PYTHON_VERSION
REQUIRED
)
message(STATUS "Found interpreter ${PYTHON_INTERPRETER} version ${PYTHON_VERSION}")
set(Python3_EXECUTABLE ${PYTHON_INTERPRETER})
set(PYTHON3_VERSION ${PYTHON_VERSION})
unset(Python3_Embed_FOUND)
set(Python3_FOUND TRUE)

# check if embed artifacts are there...
set(Python3_FIND_VIRTUALENV "FIRST")
find_package(Python3 COMPONENTS Interpreter Development QUIET)
if(Python3_FOUND)
message(STATUS "Found full python3-dev installation")
set(Python3_Embed_FOUND TRUE)
endif()
endif()
endif()
else()
set(Python3_FIND_VIRTUALENV "FIRST")
Expand Down
9 changes: 0 additions & 9 deletions tuplex/adapters/cpython/include/PythonHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,6 @@ namespace python {
return version;
}

/*!
* find python standardlib location.
* @param version python version string. E.g., "3" or "3.7". Per default search for version build against.
* @param prefix_list list of paths where to search for <prefix>/lib/python<version> pattern.
* @return string for first match found, empty string else
*/
std::string find_stdlib_location(const std::string& version=python_version(true, false),
const std::vector<std::string>& prefix_list=std::vector<std::string>{"/usr/local"});

/*!
* retrieves main module and loads cloudpickle module. Exits program if cloudpickle is not found.
* @return module object holding the main module
Expand Down
50 changes: 10 additions & 40 deletions tuplex/adapters/cpython/src/PythonHelpers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,44 +52,6 @@ namespace python {
Py_SetPythonHome(&vec[0]);
}

/*!
 * Searches each prefix for a directory matching <prefix>/lib/python<version>*.
 * @param version version string the folder's version suffix must start with (e.g. "3" or "3.7").
 *                An empty string accepts the first python* folder found.
 *                NOTE(review): this is a plain prefix match, so "3.1" also matches "python3.10".
 * @param prefix_list prefixes to probe, in order.
 * @return the first matching path exactly as returned by tuplex::glob, or an empty string if nothing matched.
 */
std::string find_stdlib_location(const std::string& version,
                                 const std::vector<std::string>& prefix_list) {
    // loop-invariant folder name prefix
    const std::string python_prefix = "python";

    for(const auto& prefix : prefix_list) {
        // check whether folder <prefix>/lib exists
        if(!tuplex::dirExists(prefix + "/lib"))
            continue;

        // list all entries under dir with python
        const auto paths = tuplex::glob(prefix + "/lib/python*");
        for(const auto& original_path : paths) {
            // only check for folders (original note: glob appends '/' to them)
            if(original_path.empty() || original_path.back() != '/')
                continue;

            // strip the trailing slash and isolate the last path component
            const auto path = original_path.substr(0, original_path.length() - 1);
            const auto idx = path.rfind('/');
            // rfind returns an unsigned value; "not found" is npos, never a negative number
            if(idx == std::string::npos)
                continue;
            const auto folder_name = path.substr(idx + 1);

            // starts with python? => should because of globbing!
            if(folder_name.compare(0, python_prefix.length(), python_prefix) != 0)
                continue;

            // whatever follows "python" is treated as the version of this folder
            const auto this_version = folder_name.substr(python_prefix.length());
            if(version.empty() || this_version.compare(0, version.length(), version) == 0)
                return original_path;
        }
    }

    return "";
}

void handle_and_throw_py_error() {
if(PyErr_Occurred()) {
PyObject *ptype = NULL, *pvalue = NULL, *ptraceback = NULL;
Expand Down Expand Up @@ -1717,7 +1679,7 @@ namespace python {
} else if(strStartsWith(typeStr, "typing.List")) {
python::Type elementType = decodePythonSchema(PyList_GetItem(args, 0));
return python::Type::makeListType(elementType);
} else if(strStartsWith(typeStr, "typing.Union")) {
} else if(strStartsWith(typeStr, "typing.Union") || strStartsWith(typeStr, "typing.Optional")) {
if(PyTuple_Size(args) == 2) {
auto c1 = PyTuple_GetItem(args, 0);
auto c2 = PyTuple_GetItem(args, 1);
Expand All @@ -1741,8 +1703,14 @@ namespace python {
": only Optional unions are understood right now");
}
} else {
throw std::runtime_error("Tuplex can't understand typing module annotation " + typeStr);

// whichever other typing annotations to decode...

// Add them here...
}

// unknown typing module annotation
throw std::runtime_error("Tuplex can't understand typing module annotation " + typeStr);
}

return python::Type::UNKNOWN;
Expand Down Expand Up @@ -1772,6 +1740,8 @@ namespace python {
#warning "better python error formatting needed!"
}

// important to incref!
Py_XINCREF(obj);
return obj;
}

Expand Down
131 changes: 131 additions & 0 deletions tuplex/cmake/FindPythonInterpreter.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Copyright 2021, Robert Adam. All rights reserved.
# Use of this source code is governed by a BSD-style license
# that can be found in the LICENSE file at the root of the
# source tree.
# from https://github.com/Krzmbrzl/FindPythonInterpreter/blob/main/FindPythonInterpreter.cmake

cmake_minimum_required(VERSION 3.5)

# find_python_interpreter(
#     INTERPRETER_OUT_VAR <var>      (mandatory) receives the interpreter path, or "NOTFOUND"
#     [VERSION_OUT_VAR <var>]        receives the version reported by `<interpreter> --version`, or "NOTFOUND"
#     [VERSION <major[.minor[.patch]]>]  minimum version; defaults to "0.0.0" (any Python 2 or 3)
#     [EXACT]                        reject interpreters newer than VERSION (requires VERSION)
#     [REQUIRED]                     raise a FATAL_ERROR if no suitable interpreter is found
#     [HINTS <path>...]              extra locations handed to find_program
# )
#
# Looks for an interpreter via find_program first and falls back to the
# FindPython/FindPython2/FindPython3 modules when that yields nothing.
function(find_python_interpreter)
    set(options REQUIRED EXACT)
    set(oneValueArgs VERSION INTERPRETER_OUT_VAR VERSION_OUT_VAR)
    set(multiValueArgs HINTS)
    cmake_parse_arguments(FIND_PYTHON_INTERPRETER "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN})


    # Error handling
    if (FIND_PYTHON_INTERPRETER_UNPARSED_ARGUMENTS)
        message(FATAL_ERROR "Unrecognized arguments to find_python_interpreter: \"${FIND_PYTHON_INTERPRETER_UNPARSED_ARGUMENTS}\"")
    endif()
    if (NOT FIND_PYTHON_INTERPRETER_INTERPRETER_OUT_VAR)
        message(FATAL_ERROR "Called find_python_interpreter without the INTERPRETER_OUT_VAR parameter!")
    endif()
    if (FIND_PYTHON_INTERPRETER_EXACT AND NOT FIND_PYTHON_INTERPRETER_VERSION)
        message(FATAL_ERROR "Specified EXACT but did not specify VERSION!")
    endif()


    # Defaults
    if (NOT FIND_PYTHON_INTERPRETER_VERSION)
        set(FIND_PYTHON_INTERPRETER_VERSION "0.0.0")
    endif()
    if (NOT FIND_PYTHON_INTERPRETER_HINTS)
        set(FIND_PYTHON_INTERPRETER_HINTS "")
    endif()


    # Validate
    # NOTE: inside a quoted argument "\." collapses to a plain "." (which matches any
    # character in a regex), so the backslash has to be doubled to match a literal dot.
    if (NOT FIND_PYTHON_INTERPRETER_VERSION MATCHES "^[0-9]+(\\.[0-9]+(\\.[0-9]+)?)?$")
        message(FATAL_ERROR "Invalid VERSION \"${FIND_PYTHON_INTERPRETER_VERSION}\" - must follow RegEx \"^[0-9]+(\\.[0-9]+(\\.[0-9]+)?)?$\"")
    endif()


    # "parse" version (first append 0.0.0 in case only a part of the version scheme was set by the user)
    string(CONCAT VERSION_HELPER "${FIND_PYTHON_INTERPRETER_VERSION}" ".0.0.0")
    string(REPLACE "." ";" VERSION_LIST "${VERSION_HELPER}")
    list(GET VERSION_LIST 0 FIND_PYTHON_INTERPRETER_VERSION_MAJOR)
    list(GET VERSION_LIST 1 FIND_PYTHON_INTERPRETER_VERSION_MINOR)
    # patch level is the third component (index 2), not the minor version again
    list(GET VERSION_LIST 2 FIND_PYTHON_INTERPRETER_VERSION_PATCH)


    # Create names for the interpreter to search for (most specific name first)
    set(INTERPRETER_NAMES "")
    if (FIND_PYTHON_INTERPRETER_VERSION_MAJOR STREQUAL "0")
        # Search for either Python 2 or 3
        list(APPEND INTERPRETER_NAMES "python3")
        list(APPEND INTERPRETER_NAMES "python")
        list(APPEND INTERPRETER_NAMES "python2")
    else()
        # Search for specified version
        list(APPEND INTERPRETER_NAMES "python${FIND_PYTHON_INTERPRETER_VERSION_MAJOR}")
        list(APPEND INTERPRETER_NAMES "python")

        if (NOT FIND_PYTHON_INTERPRETER_VERSION_MINOR EQUAL 0)
            # list(INSERT ... 0 ...) is used instead of list(PREPEND ...), which only exists since CMake 3.15
            list(INSERT INTERPRETER_NAMES 0 "python${FIND_PYTHON_INTERPRETER_VERSION_MAJOR}.${FIND_PYTHON_INTERPRETER_VERSION_MINOR}")

            if (NOT FIND_PYTHON_INTERPRETER_VERSION_PATCH EQUAL 0)
                list(INSERT INTERPRETER_NAMES 0
                    "python${FIND_PYTHON_INTERPRETER_VERSION_MAJOR}.${FIND_PYTHON_INTERPRETER_VERSION_MINOR}.${FIND_PYTHON_INTERPRETER_VERSION_PATCH}")
            endif()
        endif()
    endif()


    # Start by trying to search for a python executable in PATH and HINTS
    find_program(PYTHON_INTERPRETER NAMES ${INTERPRETER_NAMES} HINTS ${FIND_PYTHON_INTERPRETER_HINTS})


    if (NOT PYTHON_INTERPRETER)
        # Fall back to find_package
        message(VERBOSE "Can't find Python interpreter in PATH -> Falling back to find_package")
        if (FIND_PYTHON_INTERPRETER_VERSION_MAJOR EQUAL 0)
            # Search arbitrary version
            find_package(Python COMPONENTS Interpreter QUIET)
            set(PYTHON_INTERPRETER "${Python_EXECUTABLE}")
        else()
            # Search specific version (Python 2 or 3)
            find_package(Python${FIND_PYTHON_INTERPRETER_VERSION_MAJOR} COMPONENTS Interpreter QUIET)
            set(PYTHON_INTERPRETER "${Python${FIND_PYTHON_INTERPRETER_VERSION_MAJOR}_EXECUTABLE}")
        endif()
    endif()


    if (PYTHON_INTERPRETER)
        # Verify that the version found is the one that is wanted
        execute_process(
            COMMAND ${PYTHON_INTERPRETER} "--version"
            OUTPUT_VARIABLE PYTHON_INTERPRETER_VERSION
            ERROR_VARIABLE PYTHON_INTERPRETER_VERSION # Python 2 reports the version on stderr
        )

        # Remove leading "Python " from version information
        string(REPLACE "Python " "" PYTHON_INTERPRETER_VERSION "${PYTHON_INTERPRETER_VERSION}")
        string(STRIP "${PYTHON_INTERPRETER_VERSION}" PYTHON_INTERPRETER_VERSION)


        if (PYTHON_INTERPRETER_VERSION VERSION_LESS FIND_PYTHON_INTERPRETER_VERSION)
            message(STATUS "Found Python version ${PYTHON_INTERPRETER_VERSION} but required at least ${FIND_PYTHON_INTERPRETER_VERSION}")
            set(PYTHON_INTERPRETER "NOTFOUND")
            set(PYTHON_INTERPRETER_VERSION "NOTFOUND")
        elseif(PYTHON_INTERPRETER_VERSION VERSION_GREATER FIND_PYTHON_INTERPRETER_VERSION AND FIND_PYTHON_INTERPRETER_EXACT)
            message(STATUS "Found Python interpreter version ${PYTHON_INTERPRETER_VERSION} but required exactly ${FIND_PYTHON_INTERPRETER_VERSION}")
            set(PYTHON_INTERPRETER "NOTFOUND")
            set(PYTHON_INTERPRETER_VERSION "NOTFOUND")
        else()
            message(STATUS "Found Python interpreter version ${PYTHON_INTERPRETER_VERSION}")
        endif()
    else()
        set(PYTHON_INTERPRETER_VERSION "NOTFOUND")
    endif()


    # Set "return" values
    set(${FIND_PYTHON_INTERPRETER_INTERPRETER_OUT_VAR} "${PYTHON_INTERPRETER}" PARENT_SCOPE)
    if (FIND_PYTHON_INTERPRETER_VERSION_OUT_VAR)
        set(${FIND_PYTHON_INTERPRETER_VERSION_OUT_VAR} "${PYTHON_INTERPRETER_VERSION}" PARENT_SCOPE)
    endif()

    # The out variables are populated before failing so the message is the only difference for REQUIRED
    if (NOT PYTHON_INTERPRETER AND FIND_PYTHON_INTERPRETER_REQUIRED)
        message(FATAL_ERROR "Did NOT find Python interpreter")
    endif()
endfunction()
10 changes: 8 additions & 2 deletions tuplex/core/include/ee/local/LocalBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ namespace tuplex {
* @param combine whether this is an aggregate (e.g. if we should call the aggregate combiner, rather than simply merging the hashtables)
* @return the final hashtable sink
*/
HashTableSink createFinalHashmap(const std::vector<const IExecutorTask*>& tasks, int hashtableKeyByteWidth, bool combine);
HashTableSink createFinalHashmap(const std::vector<const IExecutorTask*>& tasks,
int hashtableKeyByteWidth,
bool combine,
codegen::agg_init_f init_aggregate,
codegen::agg_combine_f combine_aggregate);

// hash join stage
void executeHashJoinStage(HashJoinStage* hstage);
Expand Down Expand Up @@ -172,7 +176,9 @@ namespace tuplex {
std::vector<IExecutorTask*> resolveViaSlowPath(std::vector<IExecutorTask*>& tasks,
bool merge_rows_in_order,
codegen::resolve_f functor,
TransformStage* tstage, bool combineHashmaps);
TransformStage* tstage, bool combineHashmaps,
codegen::agg_init_f init_aggregate,
codegen::agg_combine_f combine_aggregate);
};

/*!
Expand Down
7 changes: 6 additions & 1 deletion tuplex/core/include/logical/ParallelizeOperator.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ namespace tuplex {
*/
std::vector<tuplex::Partition*> getNormalPartitions();

void setFallbackPartitions(const std::vector<Partition*> &fallbackPartitions) { _fallbackPartitions = fallbackPartitions; }
// Stores the given fallback partitions on this operator and marks each one immortal.
void setFallbackPartitions(const std::vector<Partition*> &fallbackPartitions) {
    _fallbackPartitions = fallbackPartitions;

    // The parallelize operator does not own these partitions; making them immortal
    // allows multiple calls involving this operator to keep using them.
    for(auto it = _fallbackPartitions.begin(); it != _fallbackPartitions.end(); ++it) {
        Partition* partition = *it;
        partition->makeImmortal();
    }
}
// Returns (by value) the fallback partition pointers currently stored in _fallbackPartitions.
std::vector<Partition *> getFallbackPartitions() { return _fallbackPartitions; }

// Assigns the given partition groups to _partitionGroups, replacing whatever was stored before.
void setPartitionGroups(const std::vector<PartitionGroup>& partitionGroups) { _partitionGroups = partitionGroups; }
Expand Down
24 changes: 22 additions & 2 deletions tuplex/core/src/ContextOptions.cc
Original file line number Diff line number Diff line change
Expand Up @@ -677,10 +677,30 @@ namespace tuplex {
// check that key exists, else issue warning!
auto it = _store.find(key);

auto lookup_key = key;

// if end, also check tuplex. version of it!
if(it == _store.end()) {
it = _store.find("tuplex." + key);
if(it != _store.end()) // success, so set key to tuplex. + key!
lookup_key = "tuplex." + key;
}

// still not found? check lowercase versions
if(it == _store.end()) {
for(it = _store.begin(); it != _store.end(); ++it) {
auto normalized_ref = tolower(it->first);
if(normalized_ref == tolower(key) || normalized_ref == tolower("tuplex." + key)) {
lookup_key = it->first;
break; // found valid iterator!
}
}
}

if(it == _store.end())
Logger::instance().defaultLogger().error("could not find key '" + key + "'");
Logger::instance().defaultLogger().error("could not find key '" + key + "', ignoring set attempt.");
else
_store[key] = value;
_store[lookup_key] = value;
}

size_t ContextOptions::DRIVER_MEMORY() const {
Expand Down
2 changes: 1 addition & 1 deletion tuplex/core/src/Partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ namespace tuplex {

uint8_t* Partition::lockWriteRaw(bool allowForeignOwnerAccess) {
// must be the thread who allocated this
if(!allowForeignOwnerAccess) {
if(!allowForeignOwnerAccess && _owner->getThreadID() != std::this_thread::get_id()) {
_owner->error("non-owner thread accessing partition");
assert(_owner->getThreadID() == std::this_thread::get_id());
}
Expand Down
19 changes: 16 additions & 3 deletions tuplex/core/src/UDF.cc
Original file line number Diff line number Diff line change
Expand Up @@ -462,18 +462,31 @@ namespace tuplex {
// for C-API ref check https://docs.python.org/3/library/functions.html#__import__

// modules first
for(auto m : ce.modules()) {
for(const auto& m : ce.modules()) {
std::stringstream ss;
ss<<"failed to import module " + m.original_identifier;
if(m.identifier != m.original_identifier)
ss<<" as "<<m.identifier;
auto sub_mod = PyImport_ImportModule(m.original_identifier.c_str());
auto nameList = PyTuple_New(0);
if(!sub_mod)
sub_mod = PyImport_ImportModuleEx(m.original_identifier.c_str(), nullptr, nullptr, nameList);

if(!sub_mod) {
python::unlockGIL();
throw std::runtime_error(ss.str());
}
// failure to set string?
PyDict_SetItemString(main_dict, m.identifier.c_str(), sub_mod);
if(PyErr_Occurred()) {
PyErr_Print();
PyErr_Clear();
python::unlockGIL();
throw std::runtime_error("failed to import module " + m.original_identifier);
throw std::runtime_error(ss.str());
}
}
// then functions
for(auto f : ce.functions()) {
for(const auto& f : ce.functions()) {
// need to fetch from imported module and add it
// basically this is encoded
// from ... import ...
Expand Down
Loading