diff --git a/.github/workflows/build_wheels.yml b/.github/workflows/build_wheels.yml index 06c9e6029..d9aa77fae 100644 --- a/.github/workflows/build_wheels.yml +++ b/.github/workflows/build_wheels.yml @@ -38,13 +38,29 @@ jobs: steps: - uses: actions/checkout@v3 + - name: Cache brew dependencies + if: runner.os == 'macOS' + uses: actions/cache@v3.3.2 + with: + # Paths to cache: + # /usr/local/Homebrew - installation folder of Homebrew + # /usr/local/Cellar - installation folder of Homebrew formulae + # /usr/local/Frameworks, /usr/local/bin, /usr/local/opt - contain (links to) binaries installed by Homebrew formulae + path: | + /usr/local/Homebrew + /usr/local/Cellar + /usr/local/Frameworks + /usr/local/bin + /usr/local/opt + key: macos-11-build-cache-${{ hashFiles('./scripts/macos/brew_dependencies.sh') }}-v2 + # need to make this an intermediate step, i.e. build first the different lambda runners on Ubuntu... - name: Build Lambda runner (Linux only) if: runner.os != 'macOS' run: docker pull registry-1.docker.io/tuplex/ci:${{ matrix.python-version }} && export PYTHON3_VERSION=${{ matrix.python-version }}.0 && bash ./scripts/create_lambda_zip.sh && mkdir -p ./tuplex/python/tuplex/other && cp ./build-lambda/tplxlam.zip ./tuplex/python/tuplex/other shell: bash - - name: Build wheels + - name: Build wheel #if: runner.os != 'macOS' uses: pypa/cibuildwheel@fff9ec32ed25a9c576750c91e06b410ed0c15db7 # hash corresponds to v2.16.2 env: @@ -62,6 +78,10 @@ jobs: # requires macOS 10.13 at least to build because of C++17 features.
CIBW_ENVIRONMENT_MACOS: "CMAKE_ARGS='-DBUILD_WITH_AWS=ON -DBUILD_WITH_ORC=ON' JAVA_HOME=${JAVA_HOME_11_X64}" + # run all python tests to make sure wheels are not defunct + CIBW_TEST_REQUIRES: "pytest pytest-timeout numpy nbformat jupyter" + # use 10min (600s) timeout per test and print top 25 slowest tests + CIBW_TEST_COMMAND: "cd {project} && pytest tuplex/python/tests -v --timeout 600 --durations 25" - name: reorganize files run: touch ./scripts/dummy.version && cp ./scripts/*.version ./wheelhouse && cp ./.github/scripts/test_pypi.sh ./wheelhouse @@ -69,4 +89,4 @@ jobs: - uses: actions/upload-artifact@v3 with: path: | - ./wheelhouse/*.whl \ No newline at end of file + ./wheelhouse/*.whl diff --git a/scripts/azure/Dockerfile b/scripts/azure/Dockerfile new file mode 100644 index 000000000..91f428518 --- /dev/null +++ b/scripts/azure/Dockerfile @@ -0,0 +1,39 @@ +FROM ubuntu:22.04 + +ARG GIT_HASH +ARG GIT_REMOTE + +ENV PATH=/opt/bin:$PATH + +RUN mkdir -p /opt/sbin + +RUN echo "Building tuplex for commit ${GIT_HASH} / ${GIT_REMOTE}" + +RUN echo "Install MongoDB" +ADD install_mongodb.sh /opt/sbin/install_mongodb.sh +RUN bash /opt/sbin/install_mongodb.sh + +RUN echo "Install required packages" +ADD install_azure_ci_reqs.sh /opt/sbin/install_azure_ci_reqs.sh +RUN bash /opt/sbin/install_azure_ci_reqs.sh + +RUN echo 'Install python dependencies' +RUN apt-get update -y && apt-get install -y python3 python3-pip python3-setuptools ninja-build && python3 -m pip install pytest 'pygments>=2.4.1' MarkupSafe==2.0 pexpect setuptools astor PyYAML jupyter nbformat pymongo eventlet==0.30.0 gunicorn && jupyter --version + +RUN echo "Clone tuplex and checkout" +# Install git & add github to known hosts +RUN apt-get install -y git && mkdir -p /root/.ssh/ && touch /root/.ssh/known_hosts && ssh-keyscan github.com >>/root/.ssh/known_hosts +RUN mkdir -p /code && cd /code && git clone "${GIT_REMOTE}" tuplex && cd tuplex && git checkout "${GIT_HASH}" + + +RUN echo 'Test local MongoDB' +RUN cd 
/code/tuplex/python && python3 -m pip install -r requirements.txt && python3 mongodb_test.py && (pkill mongod || true) + +RUN echo "Build Tuplex" +RUN cd /code/tuplex && TUPLEX_BUILD_ALL=1 CMAKE_ARGS="-DBUILD_WITH_ORC=ON -DLLVM_ROOT_DIR=/usr/lib/llvm-16 -DCMAKE_BUILD_TYPE=Release -DBUILD_FOR_CI=ON" python3 setup.py install --user + +RUN echo "C++ tests" +RUN cd /code/tuplex && cd build/temp.linux-x86_64-3.10 && ctest --timeout 180 --output-on-failure --repeat until-pass:3 -j 2 + +RUN echo "Python tests" +RUN cd /code/tuplex && cd build/temp.linux-x86_64-3.10/dist/python && python3.10 -m pytest -x --full-trace -l --log-cli-level=DEBUG --capture=tee-sys diff --git a/scripts/azure/simulate_ci_locally.sh b/scripts/azure/simulate_ci_locally.sh new file mode 100755 index 000000000..6097c9727 --- /dev/null +++ b/scripts/azure/simulate_ci_locally.sh @@ -0,0 +1,10 @@ +#!/usr/bin/env bash + +echo "Retrieving last commit hash" +GIT_HASH=`git rev-parse --short HEAD` + +# derive an https GitHub URL from the origin remote (works for https and ssh style remotes) +GIT_REMOTE="https://github.com/$(git remote get-url origin | sed 's/https:\/\/github.com\///' | sed 's/git@github.com://')" +echo "Building docker image for hash ${GIT_HASH}, remote ${GIT_REMOTE}" +docker build --no-cache -t tuplex/azure --build-arg GIT_HASH="${GIT_HASH}" --build-arg GIT_REMOTE="${GIT_REMOTE}" .
+ diff --git a/scripts/build_linux_wheels.sh b/scripts/build_linux_wheels.sh index 17682866d..d78877d72 100755 --- a/scripts/build_linux_wheels.sh +++ b/scripts/build_linux_wheels.sh @@ -39,7 +39,7 @@ fi export CIBW_ENVIRONMENT="TUPLEX_LAMBDA_ZIP='./tuplex/other/tplxlam.zip' CMAKE_ARGS='-DBUILD_WITH_AWS=ON -DBUILD_WITH_ORC=ON' LD_LIBRARY_PATH=/usr/local/lib:/opt/lib" # Use the following line to build only python3.7-3.9 wheel -export CIBW_BUILD="cp3{7,8,9}-*" +export CIBW_BUILD="cp3{8,9,10,11}-*" export CIBW_ARCHS_LINUX="x86_64" # do not build musllinux yet @@ -50,7 +50,10 @@ export CIBW_SKIP="*-musllinux_*" #export CIBW_SKIP="cp3{5,6,7,8}-macosx* pp*" export CIBW_BUILD_VERBOSITY=3 -export CIBW_PROJECT_REQUIRES_PYTHON=">=3.7" +export CIBW_PROJECT_REQUIRES_PYTHON=">=3.8" + +# uncomment to increase verbosity of cibuildwheel +# export CIBW_BUILD_VERBOSITY=3 cibuildwheel --platform linux . diff --git a/scripts/build_linux_wheels_with_test.sh b/scripts/build_linux_wheels_with_test.sh new file mode 100755 index 000000000..6830bb0b2 --- /dev/null +++ b/scripts/build_linux_wheels_with_test.sh @@ -0,0 +1,65 @@ +#!/usr/bin/env bash +# (c) 2023 Tuplex team +# this script invokes the cibuildwheel process with necessary env variables to build the wheel for linux/docker +# builds wheels for python 3.8 - 3.11 + +# check from where script is invoked +CWD="$(cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd -P)" + +echo "Executing buildwheel script located in $CWD" +pushd "$CWD" > /dev/null +cd ..
+ +# delete dir if exists +rm -rf wheelhouse +# delete in tree build files +rm -rf tuplex/python/tuplex/libexec/tuplex*.so + + +# CIBUILDWHEEL CONFIGURATION +export CIBUILDWHEEL=1 +export TUPLEX_BUILD_ALL=0 +export CIBW_ARCHS_LINUX=x86_64 +export CIBW_MANYLINUX_X86_64_IMAGE='registry-1.docker.io/tuplex/ci:latest' + +# uncomment to prefer local image when building locally +# export CIBW_MANYLINUX_X86_64_IMAGE='tuplex/ci' + +# check whether lambda zip was built and stored in build-lambda +TUPLEX_LAMBDA_ZIP=${TUPLEX_LAMBDA_ZIP:-build-lambda/tplxlam.zip} + +echo "work dir is: $(pwd)" +if [[ -f "${TUPLEX_LAMBDA_ZIP}" ]]; then + echo "Found lambda runner ${TUPLEX_LAMBDA_ZIP}, adding to package" + mkdir -p tuplex/other + cp "${TUPLEX_LAMBDA_ZIP}" tuplex/other/tplxlam.zip +fi + +# add to environment, e.g. TUPLEX_BUILD_TYPE=tsan to force a tsan build. Release is the default mode +export CIBW_ENVIRONMENT="TUPLEX_LAMBDA_ZIP='./tuplex/other/tplxlam.zip' CMAKE_ARGS='-DBUILD_WITH_AWS=ON -DBUILD_WITH_ORC=ON' LD_LIBRARY_PATH=/usr/local/lib:/opt/lib" + +# Use the following line to build only python3.8-3.11 wheels +export CIBW_BUILD="cp3{8,9,10,11}-*" +export CIBW_ARCHS_LINUX="x86_64" + +# do not build musllinux yet +export CIBW_SKIP="*-musllinux_*" + +# to test the others from 3.7-3.9, use these two lines: +#export CIBW_BUILD="cp3{7,8,9}-*" +#export CIBW_SKIP="cp3{5,6,7,8}-macosx* pp*" + +export CIBW_BUILD_VERBOSITY=3 +export CIBW_PROJECT_REQUIRES_PYTHON=">=3.8" + +# verbose cibuildwheel output is already enabled above; for quieter output use instead: +# export CIBW_BUILD_VERBOSITY=1 + +export CIBW_TEST_REQUIRES="pytest pytest-timeout numpy nbformat jupyter" +export CIBW_TEST_COMMAND="cd {project} && pytest tuplex/python/tests --timeout_method thread --timeout 300 -l -v" + +cibuildwheel --platform linux . + +popd > /dev/null + +echo "Done!"
diff --git a/scripts/build_macos_wheels.sh b/scripts/build_macos_wheels.sh index c701f6d8d..63246f974 100755 --- a/scripts/build_macos_wheels.sh +++ b/scripts/build_macos_wheels.sh @@ -59,9 +59,9 @@ fi echo "-- Building wheels for ${CIBW_BUILD}" # if macOS is 10.x -> use this as minimum MINIMUM_TARGET="10.13" MACOS_VERSION=$(sw_vers -productVersion) echo "-- Processing on MacOS ${MACOS_VERSION}" function version { echo "$@" | awk -F. '{ printf("%d%03d%03d%03d\n", $1,$2,$3,$4); }'; } @@ -82,13 +82,13 @@ cd .. # Note: protobuf 3.20 - 3.21.2 is broken for MacOS, do not use those versions export CIBW_BEFORE_BUILD_MACOS="brew install protobuf coreutils zstd zlib libmagic llvm@16 aws-sdk-cpp pcre2 antlr4-cpp-runtime googletest gflags yaml-cpp celero wget boost ninja snappy" -export CIBW_ENVIRONMENT_MACOS="MACOSX_DEPLOYMENT_TARGET=${MINIMUM_TARGET} CMAKE_ARGS='-DBUILD_WITH_AWS=ON -DBUILD_WITH_ORC=ON' " +export CIBW_ENVIRONMENT_MACOS="MACOSX_DEPLOYMENT_TARGET=${MINIMUM_TARGET} CMAKE_ARGS='-DBUILD_WITH_AWS=ON -DBUILD_WITH_ORC=ON -DCMAKE_BUILD_TYPE=Release' CMAKE_BUILD_TYPE=Release" export CIBW_BUILD="${CIBW_BUILD}" export CIBW_PROJECT_REQUIRES_PYTHON=">=3.8" # uncomment to increase verbosity of cibuildwheel -# export CIBW_BUILD_VERBOSITY=3 +export CIBW_BUILD_VERBOSITY=3 cibuildwheel --platform macos diff --git a/scripts/build_macos_wheels_with_test.sh b/scripts/build_macos_wheels_with_test.sh new file mode 100755 index 000000000..6ced7cfcb --- /dev/null +++ b/scripts/build_macos_wheels_with_test.sh @@ -0,0 +1,108 @@ +#!/usr/bin/env bash +# (c) 2017-2023 Tuplex team +# builds x86_64 (and arm64) wheels + +# add -x option for verbose output +set -euo pipefail + +function fail { + printf '%s\n' "$1" >&2 + exit "${2-1}" +} + +function detect_instruction_set() { + arch="$(uname -m)" # -i is only linux, -m is linux and apple + if [[ "$arch" = x86_64* ]]; then + if [[ "$(uname -a)" = *ARM64* ]]; then + echo 'arm64' + else + echo 'x86_64' + fi + elif [[ "$arch" = i*86 ]]; then +
echo 'x86_32' + elif [[ "$arch" = arm* ]]; then + echo "$arch" + elif test "$arch" = aarch64; then + echo 'arm64' + else + exit 1 + fi +} + +# check from where script is invoked +CWD="$(cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd -P)" +echo " || Tuplex macOS wheel builder || " +echo "-- Executing buildwheel script located in $CWD" + +# check platform is darwin +if [ ! "$(uname -s)" = "Darwin" ]; then + fail "Error: Need to run script under macOS" +fi + +# check which tags are supported +arch=$(detect_instruction_set) +echo "-- Detected arch ${arch}" + +# try to extract version of compiler first via command-line tools or xcode +# either needs to be installed. +xcode_version_str=$(pkgutil --pkg-info=com.apple.pkg.CLTools_Executables 2>/dev/null | grep version || pkgutil --pkg-info=com.apple.pkg.Xcode | grep version) +echo "-- Detected Xcode ${xcode_version_str}" + +# if no param is given, use defaults to build all +if [ "${arch}" = "arm64" ]; then + # build Python 3.9 - 3.11 + CIBW_BUILD=${CIBW_BUILD-"cp3{9,10,11}-macosx_arm64"} +else + # build Python 3.8 - 3.11 + CIBW_BUILD=${CIBW_BUILD-"cp3{8,9,10,11}-macosx_x86_64"} +fi + +echo "-- Building wheels for ${CIBW_BUILD}" + +MACOS_VERSION=$(sw_vers -productVersion) +echo "-- Processing on MacOS ${MACOS_VERSION}" +function version { echo "$@" | awk -F. '{ printf("%d%03d%03d%03d\n", $1,$2,$3,$4); }'; } + +MACOS_VERSION_MAJOR=`echo $MACOS_VERSION | cut -d . -f1` + +if [ "$MACOS_VERSION_MAJOR" -ge 11 ]; then + echo "-- Newer MacOS detected (>=11.0), using more recent base target." + echo "-- Using minimum target ${MACOS_VERSION_MAJOR}.0" + MINIMUM_TARGET="${MACOS_VERSION_MAJOR}.0" +else + MINIMUM_TARGET="10.13" + echo "-- Defaulting build to use as minimum target ${MINIMUM_TARGET}" +fi + +pushd "$CWD" > /dev/null +cd ..
+ +# note: the host-derived target computed above is overridden below +#MINIMUM_TARGET=11.0 + +# Note: 3.8 only supports tags up to 10.16 +MINIMUM_TARGET=10.13 + +# Note: protobuf 3.20 - 3.21.2 is broken for MacOS, do not use those versions +export CIBW_BEFORE_BUILD_MACOS="brew install protobuf coreutils zstd zlib libmagic llvm@16 aws-sdk-cpp pcre2 antlr4-cpp-runtime googletest gflags yaml-cpp celero wget boost ninja snappy libdwarf libelf" + + +# Note: orc build breaks wheel right now... +export CIBW_ENVIRONMENT_MACOS="MACOSX_DEPLOYMENT_TARGET=${MINIMUM_TARGET} CMAKE_ARGS='-DBUILD_WITH_AWS=ON -DBUILD_WITH_ORC=ON' TUPLEX_BUILD_TYPE=RelWithDebInfo" +#export CIBW_ENVIRONMENT_MACOS="MACOSX_DEPLOYMENT_TARGET=${MINIMUM_TARGET} CMAKE_ARGS='-DBUILD_WITH_AWS=ON -DBUILD_WITH_ORC=ON' TUPLEX_BUILD_TYPE=Debug" + +export CIBW_BUILD="${CIBW_BUILD}" +export CIBW_PROJECT_REQUIRES_PYTHON=">=3.8" + +# increase verbosity of cibuildwheel (comment out for quieter output) +export CIBW_BUILD_VERBOSITY=3 + +# uncomment and set to specific identifier +#export CIBW_BUILD="cp39-macosx_x86_64" + +export CIBW_TEST_REQUIRES="pytest pytest-timeout numpy nbformat jupyter" +export CIBW_TEST_COMMAND="cd {project} && pytest tuplex/python/tests --timeout_method thread --timeout 300 -l -v" + +cibuildwheel --platform macos + +popd diff --git a/scripts/macos/brew_dependencies.sh b/scripts/macos/brew_dependencies.sh index f4c58fb95..c2882af16 100755 --- a/scripts/macos/brew_dependencies.sh +++ b/scripts/macos/brew_dependencies.sh @@ -2,4 +2,9 @@ # This script installs all required dependencies via brew # for instructions on how to install brew, visit https://brew.sh/ -brew install openjdk@11 cmake coreutils protobuf zstd zlib libmagic llvm@16 pcre2 gflags yaml-cpp celero wget boost googletest + +# brew doesn't provide llvm@16 bottle anymore for big sur, but python3.8 only works with big sur tags.
use llvm@15 instead +brew install openjdk@11 cmake coreutils protobuf zstd zlib libmagic llvm@15 pcre2 gflags yaml-cpp celero wget boost googletest libdwarf libelf + +# link (when e.g. used from restoring cache) +brew link --overwrite cmake coreutils protobuf zstd zlib libmagic llvm@15 pcre2 gflags yaml-cpp celero wget boost googletest libdwarf libelf abseil diff --git a/scripts/macos/install_antlr4_cpp_runtime.sh b/scripts/macos/install_antlr4_cpp_runtime.sh index f76629047..94b7c835f 100644 --- a/scripts/macos/install_antlr4_cpp_runtime.sh +++ b/scripts/macos/install_antlr4_cpp_runtime.sh @@ -7,6 +7,11 @@ PREFIX=/usr/local # if antlr4 exists already, skip [ -d "antlr4" ] && exit 0 +if [ -d "${PREFIX}/include/antlr4-runtime" ]; then + echo "skip antlr4 runtime install, directory already exists" + exit 0 +fi + # if macOS is 10.x -> use this as minimum MINIMUM_TARGET="-DCMAKE_OSX_DEPLOYMENT_TARGET=10.13" diff --git a/scripts/macos/install_aws-sdk-cpp.sh b/scripts/macos/install_aws-sdk-cpp.sh index 94510d55c..ff6ff2411 100755 --- a/scripts/macos/install_aws-sdk-cpp.sh +++ b/scripts/macos/install_aws-sdk-cpp.sh @@ -1,5 +1,11 @@ #!/usr/bin/env bash +# check if dir exists (i.e.
restored from cache, then skip) +if [ -d "/usr/local/include/aws" ]; then + echo ">> Skip aws-sdk-cpp compile from source, already exists" + exit 0 +fi + echo ">> installing AWS SDK from source" CPU_CORES=$(sysctl -n hw.physicalcpu) diff --git a/setup.py b/setup.py index f619ccd81..c7e414936 100644 --- a/setup.py +++ b/setup.py @@ -374,6 +374,13 @@ def find_pkg_path(lines): except: logging.error('Could not detect macos version, defaulting to macos 10.13 as build target') + # special case: earlier, widely deployed Python 3.8 versions only support platform suffix 10_13 (or up to 10.16), so use that as target + if sys.version_info.major == 3 and sys.version_info.minor == 8: + if macos_build_target not in ("10.13", "10.16"): + logging.warning(f"Building Tuplex with Python {sys.version_info}, however earlier versions of Python 3.8 can only comprehend tag 10_13, using therefore deployment target 10.13") + macos_build_target = "10.13" + + logging.info(f"Building with macOS platform tag {macos_build_target}") # get mac OS version cmake_args.append('-DCMAKE_OSX_DEPLOYMENT_TARGET={}'.format(macos_build_target)) diff --git a/tuplex/CMakeLists.txt b/tuplex/CMakeLists.txt index f00dcc5a0..480b38736 100755 --- a/tuplex/CMakeLists.txt +++ b/tuplex/CMakeLists.txt @@ -1,6 +1,15 @@ -# (c) 2017 Leonhard Spiegelberg +# (c) 2017-2023 Leonhard Spiegelberg cmake_minimum_required(VERSION 3.16 FATAL_ERROR) +# top-level language specification +# enable c++17 +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD_REQUIRED ON) +message(STATUS "Using language version: C++${CMAKE_CXX_STANDARD}") + +# add cmake modules from cmake folder +list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/") + # Tuplex build options: # ===================== @@ -36,6 +45,16 @@ if(CMAKE_GENERATOR STREQUAL "Ninja") message(STATUS "Using ninja generator, if fails use -w dupbuild=err") endif() +# The -fvisibility=hidden option only works for static builds.
+if (NOT BUILD_SHARED_LIBS) + set(CMAKE_CXX_VISIBILITY_PRESET hidden) +else() + if (CMAKE_CXX_VISIBILITY_PRESET STREQUAL "hidden") + message(FATAL_ERROR "CMAKE_CXX_VISIBILITY_PRESET=hidden is incompatible \ + with BUILD_SHARED_LIBS.") + endif() +endif() + # detect MacOS Version because at least 10.13 is required when building with AWS SDK if(APPLE) execute_process(COMMAND bash -c "sw_vers | grep -Eo '([0-9]{1,}\\.)+[0-9]{1,}' | head -1" OUTPUT_VARIABLE MACOSX_VERSION_STRING OUTPUT_STRIP_TRAILING_WHITESPACE) @@ -73,14 +92,6 @@ endif() # uncomment to get verbose cmake output # set(CMAKE_VERBOSE_MAKEFILE ON) -# top-level language specification -# enable c++17 -set(CMAKE_CXX_STANDARD 17) -set(CMAKE_CXX_STANDARD_REQUIRED ON) -message(STATUS "Using language version: C++${CMAKE_CXX_STANDARD}") - -# add cmake modules from cmake folder -list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/") message(STATUS "additional cmake module path is ${CMAKE_MODULE_PATH}") include("${CMAKE_SOURCE_DIR}/cmake/ucm.cmake") #handy package to manipulate compiler flags include("${CMAKE_SOURCE_DIR}/cmake/CPM.cmake") # package manager from https://github.com/cpm-cmake/CPM.cmake @@ -192,64 +203,39 @@ if(BUILD_FOR_CI) add_definitions(-DBUILD_FOR_CI) endif() -# build with AWS support +## Protobuf if(BUILD_WITH_AWS) - # requires at least High Sierra (10.13) - if(APPLE) - - # mac os version detection here - execute_process(COMMAND bash -c "sw_vers | grep -Eo '([0-9]{1,}\\.)+[0-9]{1,}' | head -1" OUTPUT_VARIABLE MACOSX_VERSION_STRING OUTPUT_STRIP_TRAILING_WHITESPACE) - if(NOT CMAKE_OSX_DEPLOYMENT_TARGET OR "${CMAKE_OSX_DEPLOYMENT_TARGET}" STREQUAL "") - - # check what the major OS X version is, if 10 -> build for 10.13 (lowest supported) - string(REPLACE ".
";" VERSION_LIST ${MACOSX_VERSION_STRING}) - list(GET VERSION_LIST 0 MACOSX_VERSION_MAJOR) - if(MACOSX_VERSION_MAJOR LESS_EQUAL 10) - # use high sierra target per default - set(CMAKE_OSX_DEPLOYMENT_TARGET 10.13) - else() - # use maj.0 as default - set(CMAKE_OSX_DEPLOYMENT_TARGET ${MACOSX_VERSION_MAJOR}.0) - endif() - endif() - - message(STATUS "Using macOS target ${CMAKE_OSX_DEPLOYMENT_TARGET} to build with AWS SDK component") - if("${CMAKE_OSX_DEPLOYMENT_TARGET}" VERSION_LESS "10.13") - message(FATAL_ERROR "Building Tuplex with AWS SDK support on Darwin requires at least macOS 10.13 (High Sierra)") - endif() - endif() - - # special case: if using mac os and a brew installed aws-sdk-cpp, can't use static libs => need to force to shared_libs - if(APPLE AND BREW_FOUND) - # check if brewed aws-sdk-cpp -> force shared libs. - # i.e. check brew list | grep aws-sdk-cpp - execute_process(COMMAND bash "-c" "brew list | grep aws-sdk-cpp" OUTPUT_VARIABLE BREWED_AWSSDK RESULT_VARIABLE BREW_RET OUTPUT_STRIP_TRAILING_WHITESPACE) - if(NOT BREWED_AWSSDK STREQUAL "") - message(STATUS "Found brewed AWS SDK C++ installed, forcing build to use shared libs.") - SET(BUILD_SHARED_LIBS ON FORCE) - else() - message(STATUS "Found custom installed AWS SDK C++ installed, if cmake fails with AWS SDK files not found consider setting BUILD_SHARED_LIBS=ON/OFF depending on your AWS SDK C++ installation") - endif() - endif() - find_package(AWSSDK REQUIRED COMPONENTS s3 core lambda transfer) - message(STATUS "AWS libs: ${AWSSDK_LINK_LIBRARIES}") - message(STATUS "AWS include dirs: ${AWSSDK_INCLUDE_DIR}") - if(AWSSDK_FOUND) - add_definitions(-DBUILD_WITH_AWS) - else() - message(FATAL_ERROR "option build with AWSSDK specified, but AWS SDK was not found.") - endif () - # building with AWS backend support? # communication with AWS Lambda happens via protobuf, i.e.
make sure protobuf compiler # is installed - #set(Protobuf_USE_STATIC_LIBS ON) + set(PROTOBUF_REQUIRED True) +endif() + +if(BUILD_WITH_ORC) + # ORC requires protobuf for schema + set(PROTOBUF_REQUIRED True) +endif() + +# if protobuf is required, add as lib here. +if(PROTOBUF_REQUIRED) + message(STATUS "Build requires Protobuf") + set(Protobuf_USE_STATIC_LIBS ON) # https://github.com/protocolbuffers/protobuf/issues/12637 find_package(Protobuf CONFIG) if(NOT Protobuf_FOUND) find_package(Protobuf REQUIRED) endif() + if(NOT Protobuf_LIBRARY) + # CONFIG-mode packages only export targets, so derive the library path from the imported target + get_target_property(Protobuf_LIBRARY protobuf::libprotobuf LOCATION) + endif() + get_filename_component(Protobuf_LIBRARY_DIR "${Protobuf_LIBRARY}" DIRECTORY) # cmake_path() would need CMake >= 3.20 + get_filename_component(Protobuf_HOME "${Protobuf_LIBRARY_DIR}" DIRECTORY) + + message(STATUS "Protobuf home is ${Protobuf_HOME}") + assert_var(Protobuf_HOME) + # newer protobuf has abseil dependency, amend protobuf libs accordingly because protobuf is shipped in # a non-fixed state (see https://github.com/protocolbuffers/protobuf/issues/12637) # there's a bug in cmake for cmake < 3.27 where version is detected wrongly as 4.x -> fix @@ -292,11 +278,62 @@ if(BUILD_WITH_AWS) absl::utility absl::variant utf8_range::utf8_validity - ) + ) list(APPEND Protobuf_LIBRARIES ${protobuf_ABSL_USED_TARGETS}) endif() endif() + + +# build with AWS support +if(BUILD_WITH_AWS) + # requires at least High Sierra (10.13) + if(APPLE) + + # mac os version detection here + execute_process(COMMAND bash -c "sw_vers | grep -Eo '([0-9]{1,}\\.)+[0-9]{1,}' | head -1" OUTPUT_VARIABLE MACOSX_VERSION_STRING OUTPUT_STRIP_TRAILING_WHITESPACE) + if(NOT CMAKE_OSX_DEPLOYMENT_TARGET OR "${CMAKE_OSX_DEPLOYMENT_TARGET}" STREQUAL "") + + # check what the major OS X version is, if 10 -> build for 10.13 (lowest supported) + string(REPLACE ".
";" VERSION_LIST ${MACOSX_VERSION_STRING}) + list(GET VERSION_LIST 0 MACOSX_VERSION_MAJOR) + if(MACOSX_VERSION_MAJOR LESS_EQUAL 10) + # use high sierra target per default + set(CMAKE_OSX_DEPLOYMENT_TARGET 10.13) + else() + # use maj.0 as default + set(CMAKE_OSX_DEPLOYMENT_TARGET ${MACOSX_VERSION_MAJOR}.0) + endif() + endif() + + message(STATUS "Using macOS target ${CMAKE_OSX_DEPLOYMENT_TARGET} to build with AWS SDK component") + if("${CMAKE_OSX_DEPLOYMENT_TARGET}" VERSION_LESS "10.13") + message(FATAL_ERROR "Building Tuplex with AWS SDK support on Darwin requires at least macOS 10.13 (High Sierra)") + endif() + endif() + + # special case: if using mac os and a brew installed aws-sdk-cpp, can't use static libs => need to force to shared_libs + if(APPLE AND BREW_FOUND) + # check if brewed aws-sdk-cpp -> force shared libs (NOTE(review): this runs after the hidden-visibility preset was chosen for static builds) + # i.e. check brew list | grep aws-sdk-cpp + execute_process(COMMAND bash "-c" "brew list | grep aws-sdk-cpp" OUTPUT_VARIABLE BREWED_AWSSDK RESULT_VARIABLE BREW_RET OUTPUT_STRIP_TRAILING_WHITESPACE) + if(NOT BREWED_AWSSDK STREQUAL "") + message(STATUS "Found brewed AWS SDK C++ installed, forcing build to use shared libs.") + set(BUILD_SHARED_LIBS ON) # FORCE without CACHE would set the list "ON;FORCE", which add_library() does not treat as ON + else() + message(STATUS "Found custom installed AWS SDK C++ installed, if cmake fails with AWS SDK files not found consider setting BUILD_SHARED_LIBS=ON/OFF depending on your AWS SDK C++ installation") + endif() + endif() + find_package(AWSSDK REQUIRED COMPONENTS s3 core lambda transfer) + message(STATUS "AWS libs: ${AWSSDK_LINK_LIBRARIES}") + message(STATUS "AWS include dirs: ${AWSSDK_INCLUDE_DIR}") + if(AWSSDK_FOUND) + add_definitions(-DBUILD_WITH_AWS) + else() + message(FATAL_ERROR "option build with AWSSDK specified, but AWS SDK was not found.") + endif () +endif() + if(GENERATE_PDFS) message(STATUS "Tuplex configured to emit PDF files for various AST stages") 
add_definitions(-DGENERATE_PDFS) @@ -811,7 +848,7 @@ set(EXTERNAL_INSTALL_LOCATION ${CMAKE_BINARY_DIR}/third_party) # external libs
to build / download set(ZLIB_VERSION "1.2.11") # which zlib version to use -set(ZSTD_VERSION "1.5.0") # which zstd version to use +set(ZSTD_VERSION "1.5.5") # which zstd version to use set(BUILD_AND_DOWNLOAD_ZLIB True) set(BUILD_AND_DOWNLOAD_ZSTD True) @@ -876,17 +913,26 @@ endif() # zstd has no cmake standard module, so manually search for it find_package(zstd "${ZSTD_VERSION}") if(zstd_FOUND) - # check if zstd is defined as target - if(TARGET zstd::libzstd_static) - set(ZSTD_LIBRARIES "zstd::libzstd_static") # could also be libzstd_shared - endif() - # if not, use variables directly - if(ZSTD_LIBRARY) - set(ZSTD_LIBRARIES "${ZSTD_LIBRARY}") - elseif(ZSTD_STATIC_LIB) - set(ZSTD_LIBRARIES "${ZSTD_STATIC_LIB}") + # only accept the found zstd if it is at least the required ZSTD_VERSION + if(zstd_VERSION VERSION_GREATER_EQUAL ${ZSTD_VERSION}) + # check if zstd is defined as target + if(TARGET zstd::libzstd_static) + set(ZSTD_LIBRARIES "zstd::libzstd_static") # could also be libzstd_shared + endif() + # if not, use variables directly + if(ZSTD_LIBRARY) + set(ZSTD_LIBRARIES "${ZSTD_LIBRARY}") + elseif(ZSTD_STATIC_LIB) + set(ZSTD_LIBRARIES "${ZSTD_STATIC_LIB}") + endif() + else() + message(STATUS "Found locally installed zstd ${zstd_VERSION}, but required is at least ${ZSTD_VERSION}.
Building suitable zstd library ${ZSTD_VERSION} from source.") + unset(zstd_FOUND) endif() -else() +endif() + +# if zstd is not found (or its version is too old, see zstd_FOUND reset above) +if(NOT zstd_FOUND) # check if brewed by chance, if not fetch if(APPLE AND BREW_FOUND) @@ -941,7 +987,7 @@ else() BUILD_BYPRODUCTS "${ZSTD_STATIC_LIB}" DOWNLOAD_EXTRACT_TIMESTAMP TRUE) - set(ZSTD_LIBRARIES ${ZSTD_STATIC_LIB}) + set(ZSTD_LIBRARIES "zstd") add_library(zstd INTERFACE) target_link_libraries(zstd INTERFACE ${ZSTD_STATIC_LIB}) @@ -986,21 +1032,43 @@ endif() # ncurses/curses lib for terminal manipulation find_package(Curses REQUIRED) +# For debug tracing, actually link & include symbols (for macos right now only) +if(APPLE) + # Use explicit stacktrace (backward-cpp) to produce errors + include(FetchContent) + + # for this also requires libdwarf + find_package(LibElf REQUIRED) + find_package(LibDwarf REQUIRED) + + # Also requires one of: libbfd (gnu binutils), libdwarf, libdw (elfutils) + FetchContent_Declare(backward + GIT_REPOSITORY https://github.com/bombela/backward-cpp + GIT_TAG master # NOTE(review): tracks an unpinned branch; pin a release tag (e.g. v1.6) or commit for reproducible builds + SYSTEM # optional, the Backward include directory will be treated as system directory; NOTE(review): confirm the CMake versions in use accept SYSTEM here, cmake_minimum_required is only 3.16 + ) + FetchContent_MakeAvailable(backward) + # Add Backward to your target (either Backward::Interface, Backward::Object, or Backward::Backward) + #target_link_libraries(mytarget PUBLIC Backward::Interface) +endif() + # add subdirs here...
add_subdirectory(io) # <-- make sure to call this first, because it changes parent scope with io dependencies add_subdirectory(utils) -add_subdirectory(test) add_subdirectory(codegen) add_subdirectory(core) add_subdirectory(python) add_subdirectory(runtime) add_subdirectory(adapters) - # can only build aws lambda on linux platform if(LINUX AND BUILD_WITH_AWS) # removed AWS lambda implementation, can be found on separate branch - add_subdirectory(awslambda) + add_subdirectory(awslambda) endif() +# call test dir last to get vars from before +add_subdirectory(test) + + ########################################################################### # (7) Additional flags diff --git a/tuplex/adapters/cpython/src/PythonSerializer.cc b/tuplex/adapters/cpython/src/PythonSerializer.cc index 0f3e6b156..7b3257c04 100644 --- a/tuplex/adapters/cpython/src/PythonSerializer.cc +++ b/tuplex/adapters/cpython/src/PythonSerializer.cc @@ -132,10 +132,19 @@ namespace tuplex { PyObject *elem_to_insert = nullptr; if (current_type.isOptionType() && current_type.getReturnType().isTupleType()) { // createPyTupleFromMemory requires a ptr to start of the actual tuple data, so need to decode and add offset here - uint64_t offset = *((uint64_t *)(ptr + current_buffer_index)); - assert(current_buffer_index + offset <= capacity); - elem_to_insert = createPyObjectFromMemory(ptr + current_buffer_index + offset, current_type, - capacity, bitmap, bitmap_index); + // check first whether element is NULL, if so return None.
+ // else, read tuple ptr + assert(bitmap); + bool is_null = bitmap[bitmap_index/64] & (1ULL << (bitmap_index % 64)); // 1ULL: the shift amount can reach 63, so the operand must be 64-bit (unsigned long is 32-bit on LLP64) + if(is_null) { + Py_XINCREF(Py_None); + elem_to_insert = Py_None; + } else { + uint64_t offset = *((uint64_t *)(ptr + current_buffer_index)) & 0xFFFFFFFF; // lower 32 bits hold the offset (upper bits presumably the size - TODO confirm) + assert(current_buffer_index + offset <= capacity); + elem_to_insert = createPyObjectFromMemory(ptr + current_buffer_index + offset, current_type, + capacity, bitmap, bitmap_index); + } } else { // otherwise, simply pass ptr to the current field elem_to_insert = createPyObjectFromMemory(ptr + current_buffer_index, current_type, capacity, @@ -232,13 +241,13 @@ namespace tuplex { } else if (elementType == python::Type::STRING) { char *string_errors = nullptr; // get offset for string - auto currOffset = *reinterpret_cast(ptr); + auto currOffset = *reinterpret_cast(ptr) & 0xFFFFFFFF; assert(currOffset <= capacity); auto currStr = reinterpret_cast(&ptr[currOffset]); element = PyUnicode_DecodeUTF8(currStr, (long)(strlen(currStr)), string_errors); ptr += sizeof(int64_t); } else if(elementType.isTupleType()) { - auto currOffset = *(uint64_t *)ptr; + auto currOffset = *(uint64_t *)ptr & 0xFFFFFFFF; assert(currOffset <= capacity); element = createPyTupleFromMemory(ptr + currOffset, elementType, capacity); ptr += sizeof(int64_t); @@ -281,7 +290,7 @@ namespace tuplex { } else { char *string_errors = nullptr; // get offset for string - auto currOffset = *reinterpret_cast(ptr); + auto currOffset = *reinterpret_cast(ptr) & 0xFFFFFFFF; assert(currOffset <= capacity); auto currStr = reinterpret_cast(&ptr[currOffset]); element = PyUnicode_DecodeUTF8(currStr, (long)(strlen(currStr)), string_errors); @@ -300,7 +309,7 @@ namespace tuplex { Py_XINCREF(Py_None); element = Py_None; } else { - uint64_t currOffset = *((uint64_t *)(ptr)); + uint64_t 
currOffset = *((uint64_t *)(ptr)) & 0xFFFFFFFF; assert(currOffset <= capacity); element = createPyTupleFromMemory(ptr + currOffset, underlyingType, capacity); ptr +=
sizeof(int64_t); diff --git a/tuplex/cmake/FindLibDwarf.cmake b/tuplex/cmake/FindLibDwarf.cmake new file mode 100644 index 000000000..19827ab99 --- /dev/null +++ b/tuplex/cmake/FindLibDwarf.cmake @@ -0,0 +1,129 @@ +# Copyright (c) 2004-present, Facebook, Inc. +# All rights reserved. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the hphp/hsl/ subdirectory of this source tree. + +# - Try to find libdwarf +# Once done this will define +# +# LIBDWARF_FOUND - system has libdwarf +# LIBDWARF_INCLUDE_DIR - the libdwarf include directory +# LIBDWARF_LIBRARIES - Link these to use libdwarf +# LIBDWARF_DEFINITIONS - Compiler switches required for using libdwarf +# + +# Locate libelf library at first +if (NOT LIBELF_FOUND) + find_package (LibElf) +endif (NOT LIBELF_FOUND) + +if (LIBDWARF_LIBRARIES AND LIBDWARF_INCLUDE_DIR) + set (LibDwarf_FIND_QUIETLY TRUE) +endif (LIBDWARF_LIBRARIES AND LIBDWARF_INCLUDE_DIR) + +find_package(PkgConfig) +pkg_check_modules(PkgConfig_LibDwarf QUIET libdwarf) + +find_path (DWARF_INCLUDE_DIR + NAMES + libdwarf.h dwarf.h + PATHS + ${PkgConfig_LibDwarf_INCLUDE_DIRS} + /usr/include + /usr/include/libdwarf + /usr/local/include + /usr/local/include/libdwarf + /opt/local/include + /sw/include + ENV CPATH) # PATH and INCLUDE will also work + +if (DWARF_INCLUDE_DIR) + set (LIBDWARF_INCLUDE_DIR ${DWARF_INCLUDE_DIR}) +endif () + +find_library (LIBDWARF_LIBRARIES + NAMES + dwarf libdwarf + PATHS + /usr/lib + /usr/local/lib + /opt/local/lib + /sw/lib + ${PkgConfig_LibDwarf_LIBRARY_DIRS} + ENV LIBRARY_PATH # PATH and LIB will also work + ENV LD_LIBRARY_PATH) +include (FindPackageHandleStandardArgs) + + +# handle the QUIETLY and REQUIRED arguments and set LIBDWARF_FOUND to TRUE +# if all listed variables are TRUE +FIND_PACKAGE_HANDLE_STANDARD_ARGS(LibDwarf DEFAULT_MSG + LIBELF_FOUND + LIBDWARF_LIBRARIES + LIBDWARF_INCLUDE_DIR) + +if (LIBDWARF_LIBRARIES AND LIBDWARF_INCLUDE_DIR) + set(CMAKE_REQUIRED_INCLUDES
${LIBDWARF_INCLUDE_DIR}) + set(CMAKE_REQUIRED_LIBRARIES ${LIBDWARF_LIBRARIES} ${LIBELF_LIBRARIES}) + + # libdwarf makes breaking changes occasionally and doesn't provide an easy + # way to test for them. The following checks should detect the changes and + # pass that information on accordingly. + INCLUDE(CheckCXXSourceCompiles) + INCLUDE(CheckFunctionExists) + + MACRO(CHECK_LIBDWARF_INIT init params var) + # Check for the existence of this particular init function. + unset(INIT_EXISTS CACHE) + CHECK_FUNCTION_EXISTS(${init} INIT_EXISTS) + if (INIT_EXISTS) + set(LIBDWARF_USE_INIT_C ${var}) + + # Check to see if we can use a const name. + unset(DW_CONST CACHE) + + if (NOT "${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang") + # -std=c++11 is already set in HPHPCompiler.cmake, don't + # add -std=c++0x on top of that or clang will give errors + set(CMAKE_REQUIRED_FLAGS "-std=c++0x") + endif() + + CHECK_CXX_SOURCE_COMPILES(" + #include + #include + int dwarfCallback(const char * a, int b, Dwarf_Unsigned c, + Dwarf_Unsigned d, Dwarf_Unsigned e, Dwarf_Unsigned f, + Dwarf_Unsigned * g, Dwarf_Ptr h, int * i) {} + int main() { ${init}(${params}); return 0; }" DW_CONST) + if (DW_CONST) + set(LIBDWARF_CONST_NAME 1) + else() + set(LIBDWARF_CONST_NAME 0) + endif() + endif() + ENDMACRO(CHECK_LIBDWARF_INIT) + + # Order is important, last one is used. 
+ CHECK_LIBDWARF_INIT("dwarf_producer_init" + "0, dwarfCallback, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr" 0) + CHECK_LIBDWARF_INIT("dwarf_producer_init_c" "0, dwarfCallback, nullptr, nullptr, nullptr, nullptr" 1) + + set(CMAKE_REQUIRED_INCLUDES) + set(CMAKE_REQUIRED_LIBRARIES) +endif() + +if(LIBDWARF_CONST_NAME) + message(STATUS "libdwarf uses const char* type") +else() + message(STATUS "libdwarf uses char* type") +endif() +if(LIBDWARF_USE_INIT_C) + message(STATUS "libdwarf has dwarf_producer_init_c") +else() + message(STATUS "libdwarf does not have dwarf_producer_init_c, using dwarf_producer_init") +endif() + +mark_as_advanced(LIBDW_INCLUDE_DIR DWARF_INCLUDE_DIR) +mark_as_advanced(LIBDWARF_INCLUDE_DIR LIBDWARF_LIBRARIES) +mark_as_advanced(LIBDWARF_CONST_NAME LIBDWARF_USE_INIT_C) \ No newline at end of file diff --git a/tuplex/cmake/FindLibElf.cmake b/tuplex/cmake/FindLibElf.cmake new file mode 100644 index 000000000..acb0ce219 --- /dev/null +++ b/tuplex/cmake/FindLibElf.cmake @@ -0,0 +1,70 @@ +# - Try to find libelf +# Once done this will define +# +# LIBELF_FOUND - system has libelf +# LIBELF_INCLUDE_DIR - the libelf include directory +# LIBELF_LIBRARIES - Link these to use libelf +# LIBELF_DEFINITIONS - Compiler switches required for using libelf +# +# Copyright (c) 2008 Bernhard Walle +# +# Redistribution and use is allowed according to the terms of the New +# BSD license. +# For details see the accompanying COPYING-CMAKE-SCRIPTS file. 
+# + + +if (LIBELF_LIBRARIES AND LIBELF_INCLUDE_DIR) + set (LibElf_FIND_QUIETLY TRUE) +endif (LIBELF_LIBRARIES AND LIBELF_INCLUDE_DIR) + +find_package(PkgConfig) +pkg_check_modules(PkgConfig_LibElf QUIET libelf) + +find_path (LIBELF_INCLUDE_DIR + NAMES + libelf.h + PATHS + ${PkgConfig_LibElf_INCLUDE_DIRS} + /usr/include + /usr/include/libelf + /usr/local/include + /usr/local/include/libelf + /opt/local/include + /opt/local/include/libelf + /sw/include + /sw/include/libelf + ENV CPATH) + +find_library (LIBELF_LIBRARIES + NAMES + elf + PATHS + /usr/lib + /usr/local/lib + /opt/local/lib + /sw/lib + ${PkgConfig_LibElf_LIBRARY_DIRS} + ENV LIBRARY_PATH + ENV LD_LIBRARY_PATH) + +include (FindPackageHandleStandardArgs) + + +# handle the QUIETLY and REQUIRED arguments and set LIBELF_FOUND to TRUE if all listed variables are TRUE +FIND_PACKAGE_HANDLE_STANDARD_ARGS(LibElf DEFAULT_MSG + LIBELF_LIBRARIES + LIBELF_INCLUDE_DIR) + +SET(CMAKE_REQUIRED_LIBRARIES elf) +INCLUDE(CheckCXXSourceCompiles) +CHECK_CXX_SOURCE_COMPILES("#include +int main() { + Elf *e = (Elf*)0; + size_t sz; + elf_getshdrstrndx(e, &sz); + return 0; +}" ELF_GETSHDRSTRNDX) +SET(CMAKE_REQUIRED_LIBRARIES) + +mark_as_advanced(LIBELF_INCLUDE_DIR LIBELF_LIBRARIES ELF_GETSHDRSTRNDX) \ No newline at end of file diff --git a/tuplex/cmake/Findzstd.cmake b/tuplex/cmake/Findzstd.cmake index a860ccdf2..0ad33f395 100644 --- a/tuplex/cmake/Findzstd.cmake +++ b/tuplex/cmake/Findzstd.cmake @@ -58,8 +58,20 @@ if(zstd_FOUND) INTERFACE_INCLUDE_DIRECTORIES "${zstd_INCLUDE_DIR}" IMPORTED_LOCATION "${zstd_STATIC_LIBRARY}") endif() + + # Find a ZSTD version + if(zstd_INCLUDE_DIR AND EXISTS "${zstd_INCLUDE_DIR}/zstd.h") + file(READ "${zstd_INCLUDE_DIR}/zstd.h" CONTENT) + string(REGEX MATCH ".*define ZSTD_VERSION_MAJOR *([0-9]+).*define ZSTD_VERSION_MINOR *([0-9]+).*define ZSTD_VERSION_RELEASE *([0-9]+)" VERSION_REGEX "${CONTENT}") + set(zstd_VERSION_MAJOR ${CMAKE_MATCH_1}) + set(zstd_VERSION_MINOR ${CMAKE_MATCH_2}) + 
set(zstd_VERSION_RELEASE ${CMAKE_MATCH_3}) + set(zstd_VERSION "${zstd_VERSION_MAJOR}.${zstd_VERSION_MINOR}.${zstd_VERSION_RELEASE}") + endif() + endif() unset(zstd_STATIC_LIBRARY_SUFFIX) -mark_as_advanced(zstd_INCLUDE_DIR zstd_LIBRARY zstd_STATIC_LIBRARY) \ No newline at end of file +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(zstd REQUIRED_VARS zstd_LIBRARY zstd_INCLUDE_DIR zstd_VERSION) \ No newline at end of file diff --git a/tuplex/codegen/include/CodegenHelper.h b/tuplex/codegen/include/CodegenHelper.h index 8b15e338e..63af95f38 100644 --- a/tuplex/codegen/include/CodegenHelper.h +++ b/tuplex/codegen/include/CodegenHelper.h @@ -405,25 +405,6 @@ namespace tuplex { return get_or_throw().CreateConstGEP2_64(Ty, Ptr, Idx0, Idx1, Name); } - inline llvm::Value *CreateConstInBoundsGEP2_64(llvm::Value *Ptr, uint64_t Idx0, - uint64_t Idx1, const std::string &Name = "") const { - using namespace llvm; - - // cf. https://github.com/llvm/llvm-project/commit/544fa425c98d60042214bd78ee90abf0a46fa2ff - assert(Ptr->getType()); - llvm::Type *Ty = nullptr; - - // print types - auto ptrType = cast(Ptr->getType()->getScalarType()); - Ty = ptrType->getPointerElementType(); - -#if LLVM_VERSION_MAJOR >= 13 - // match - assert(cast(Ptr->getType()->getScalarType())->isOpaqueOrPointeeTypeMatches(Ty)); -#endif - return CreateConstInBoundsGEP2_64(Ptr, Ty, Idx0, Idx1, Name); - } - inline llvm::Value *CreatePtrToInt(llvm::Value *V, llvm::Type *DestTy, const std::string &Name = "") { return get_or_throw().CreatePtrToInt(V, DestTy, Name); } @@ -433,7 +414,7 @@ namespace tuplex { inline llvm::CallInst *CreateCall(llvm::FunctionType *FTy, llvm::Value *Callee, -#if (LLVM_VERSION_MAJOR >= 10) +#if (LLVM_VERSION_MAJOR >= 16) llvm::ArrayRef Args = std::nullopt, #else llvm::ArrayRef Args = {}, @@ -445,7 +426,7 @@ namespace tuplex { } inline llvm::CallInst* CreateCall(llvm::Value* func_value, -#if (LLVM_VERSION_MAJOR >= 10) +#if (LLVM_VERSION_MAJOR >= 16) llvm::ArrayRef 
Args = std::nullopt, #else llvm::ArrayRef Args = {}, @@ -459,7 +440,7 @@ namespace tuplex { } inline llvm::CallInst* CreateCall(llvm::Function* func, -#if (LLVM_VERSION_MAJOR >= 10) +#if (LLVM_VERSION_MAJOR >= 16) llvm::ArrayRef Args = std::nullopt, #else llvm::ArrayRef Args = {}, @@ -470,7 +451,7 @@ namespace tuplex { } inline llvm::CallInst *CreateCall(llvm::FunctionCallee Callee, -#if (LLVM_VERSION_MAJOR >= 10) +#if (LLVM_VERSION_MAJOR >= 16) llvm::ArrayRef Args = std::nullopt, #else llvm::ArrayRef Args = {}, @@ -508,19 +489,6 @@ namespace tuplex { #endif } - inline llvm::LoadInst *CreateLoad(llvm::Value *Ptr, const std::string& Name ="") const { - throw std::runtime_error("need to replace this call with typed call."); - assert(Ptr->getType()->getPointerElementType()); - return CreateLoad(Ptr->getType()->getPointerElementType(), Ptr, Name); - } - - inline llvm::Value *CreateGEP(llvm::Value *Ptr, llvm::ArrayRef IdxList, - const std::string &Name = "") const { - assert(Ptr->getType()->getScalarType()->getPointerElementType()); - // this is deprecated - return CreateGEP(Ptr->getType()->getScalarType()->getPointerElementType(), - Ptr, IdxList, Name); - } inline llvm::Value* CreateInBoundsGEP(llvm::Value* Ptr, llvm::Type* pointee_type, llvm::Value* Idx) { return get_or_throw().CreateInBoundsGEP(pointee_type, Ptr, {Idx}); @@ -647,15 +615,6 @@ namespace tuplex { #endif } - inline llvm::Value *CreatePtrDiff(llvm::Value *LHS, llvm::Value *RHS, - const std::string &Name = "") const { - assert(LHS->getType() == RHS->getType() && LHS->getType()->isPointerTy()); - llvm::Type *ElemTy = LHS->getType()->getPointerElementType(); - assert(ElemTy); - return CreatePtrDiff(ElemTy, LHS, RHS, Name); - } - - llvm::Value *CreateRetVoid() const { return get_or_throw().CreateRetVoid(); } @@ -804,14 +763,7 @@ namespace tuplex { llvm::Instruction& inst = *firstBlock.getFirstInsertionPt(); ctorBuilder.SetInsertPoint(&inst); } -// disable here clang/gcc warning just for this - it's a 
limitation of how ctorbuilder is architected. -#pragma clang diagnostic push -#pragma clang diagnostic ignored "-Wreturn-local-addr" -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wreturn-local-addr" return std::move(ctorBuilder); -#pragma GCC diagnostic pop -#pragma clang diagnostic pop } // in order to serialize/deserialize data properly and deal with diff --git a/tuplex/codegen/include/IteratorContextProxy.h b/tuplex/codegen/include/IteratorContextProxy.h index af44102a3..685a0695c 100644 --- a/tuplex/codegen/include/IteratorContextProxy.h +++ b/tuplex/codegen/include/IteratorContextProxy.h @@ -127,70 +127,6 @@ namespace tuplex { llvm::Value *iterator, const std::shared_ptr &iteratorInfo); - /*! - * Update index for a zip iterator in preparing for the getIteratorNextElement call by calling updateIteratorIndex on each argument. - * If any argument is exhausted, return true and stop calling updateIteratorIndex on rest of the arguments. - * Only return false if none of the argument iterators is exhausted. - * @param builder - * @param iterator - * @param iteratorInfo - * @return true if iterator is exhausted (getIteratorNextElement should not get called later), otherwise false - */ - llvm::Value *updateZipIndex(const codegen::IRBuilder& builder, - llvm::Value *iterator, - const std::shared_ptr &iteratorInfo); - - /*! - * Generate the next element of a zip iterator. - * Only to be called after calling updateIteratorIndex. - * @param builder - * @param yieldType - * @param iterator - * @param iteratorInfo - * @return tuple element of yieldType - */ - SerializableValue getZipNextElement(const codegen::IRBuilder& builder, - const python::Type &yieldType, - llvm::Value *iterator, - const std::shared_ptr &iteratorInfo); - - /*! - * Generate the next element of a enumerate iterator. - * Only to be called after calling updateIteratorIndex. 
- * @param builder - * @param iterator - * @param iteratorInfo - * @return true if iterator is exhausted (getIteratorNextElement should not get called later), otherwise false - */ - llvm::Value *updateEnumerateIndex(const codegen::IRBuilder& builder, - llvm::Value *iterator, - const std::shared_ptr &iteratorInfo); - - /*! - * Generate the next element of a enumerate iterator. - * Only to be called after calling updateIteratorIndex. - * @param builder - * @param yieldType - * @param iterator - * @param iteratorInfo - * @return tuple element of yieldType - */ - SerializableValue getEnumerateNextElement(const codegen::IRBuilder& builder, - const python::Type &yieldType, - llvm::Value *iterator, - const std::shared_ptr &iteratorInfo); - - /*! - * Increment index field of a list/string/tuple iterator by offset. - * Increment index field of a range iterator by step * offset. - * Decrement index field of any reverseiterator by offset. - * For zip and enumerate, will use recursive calls on their arguments until a list/string/tuple iterator or a reverseiterator is reached. - * @param builder - * @param iterator - * @param iteratorInfo - * @param offset can be negative - */ - void incrementIteratorIndex(const codegen::IRBuilder& builder, llvm::Value *iterator, const std::shared_ptr &iteratorInfo, int offset); }; /*! 
@@ -200,6 +136,11 @@ namespace tuplex { * @return corresponding llvm::Type */ extern llvm::Type* createIteratorContextTypeFromIteratorInfo(LLVMEnvironment& env, const IteratorInfo& iteratorInfo); + + extern void increment_iterator_index(LLVMEnvironment& env, const codegen::IRBuilder& builder, + llvm::Value *iterator, + const std::shared_ptr &iteratorInfo, + int32_t offset); } namespace codegen { diff --git a/tuplex/codegen/include/LLVMEnvironment.h b/tuplex/codegen/include/LLVMEnvironment.h index ac671a2e9..b64dd1507 100644 --- a/tuplex/codegen/include/LLVMEnvironment.h +++ b/tuplex/codegen/include/LLVMEnvironment.h @@ -76,7 +76,7 @@ namespace llvm { CallInst *CI = CallInst::Create(Callee, Ops, Name); if (FMFSource) CI->copyFastMathFlags(FMFSource); -#if (LLVM_VERSION_MAJOR <= 14) +#if (LLVM_VERSION_MAJOR <= 15) builder.GetInsertBlock()->getInstList().insert(builder.GetInsertPoint(), CI); #else CI->insertInto(builder.GetInsertBlock(), builder.GetInsertBlock()->begin()); @@ -1013,6 +1013,8 @@ namespace tuplex { llvm::BlockAddress *createOrGetUpdateIteratorIndexFunctionDefaultBlockAddress(const codegen::IRBuilder &builder, const python::Type &iterableType, bool reverse=false); + + llvm::Value *cbool_const(bool b); }; // i.e. there should be a function diff --git a/tuplex/codegen/src/BlockGeneratorVisitor.cc b/tuplex/codegen/src/BlockGeneratorVisitor.cc index 9eef5cd45..25657011f 100644 --- a/tuplex/codegen/src/BlockGeneratorVisitor.cc +++ b/tuplex/codegen/src/BlockGeneratorVisitor.cc @@ -3657,56 +3657,58 @@ namespace tuplex { return ft.getLoad(builder, {idx}); } else { - // THIS HERE IS BACKUP CODE, usable if the AST tree isn't reduced completely. 
- _logger.warn( - "backup code used for [] operator: Make sure the AST tree is properly reduced in its literal expressions."); - - // ast tree is not completely reduced here, so generate expressions - assert(isStaticValue(index_node, true)); - - FlattenedTuple ft = FlattenedTuple::fromLLVMStructVal(_env, - builder, - value.val, - value_node->getInferredType()); - auto tupleNumElements = value_node->getInferredType().parameters().size(); - - // create temp struct type & use GEP method to retrieve the element. - // go over all the first level elements that are contained - std::vector elements; - std::vector elementTypes; - for (int i = 0; i < tupleNumElements; ++i) { - auto load = ft.getLoad(builder, {i}); - elements.push_back(load); - elementTypes.push_back(load.val->getType()); - } - - // create new struct type to get the i-th element via getelementptr - auto structType = llvm::StructType::create(_env->getContext(), elementTypes, "indextuple"); - // load the values into this struct type - auto alloc = builder.CreateAlloca(structType, 0, nullptr); - for (int i = 0; i < tupleNumElements; ++i) - builder.CreateStore(elements[i].val, builder.CreateGEP(alloc, {i32Const(0), i32Const(i)})); - - - // fetch element - auto lookupPtr = builder.CreateGEP(alloc, {i32Const(0), builder.CreateTrunc(index.val, - Type::getInt32Ty( - _env->getContext()))}); - - // also need to lookup size... 
- auto salloc = builder.CreateAlloca(llvm::ArrayType::get(_env->i64Type(), tupleNumElements), 0, - nullptr); - // insert elements - for (int i = 0; i < tupleNumElements; ++i) - builder.CreateStore(elements[i].size, - builder.CreateGEP(salloc, {i32Const(0), i32Const(i)})); - - auto retSize = builder.CreateLoad(builder.CreateGEP(salloc, {i32Const(0), - builder.CreateTrunc(index.val, - Type::getInt32Ty( - _env->getContext()))})); - auto retVal = builder.CreateLoad(lookupPtr); - return SerializableValue(retVal, retSize); + throw std::runtime_error("indexing via [] for non homogenous tuple not supported for LLVM17+"); +// // THIS HERE IS BACKUP CODE, usable if the AST tree isn't reduced completely. +// _logger.warn( +// "backup code used for [] operator: Make sure the AST tree is properly reduced in its literal expressions."); +// +// // ast tree is not completely reduced here, so generate expressions +// assert(isStaticValue(index_node, true)); +// +// FlattenedTuple ft = FlattenedTuple::fromLLVMStructVal(_env, +// builder, +// value.val, +// value_node->getInferredType()); +// auto tupleNumElements = value_node->getInferredType().parameters().size(); +// +// // create temp struct type & use GEP method to retrieve the element. 
+// // go over all the first level elements that are contained +// std::vector elements; +// std::vector elementTypes; +// for (int i = 0; i < tupleNumElements; ++i) { +// auto load = ft.getLoad(builder, {i}); +// elements.push_back(load); +// elementTypes.push_back(load.val->getType()); +// } +// +// // create new struct type to get the i-th element via getelementptr +// auto structType = llvm::StructType::create(_env->getContext(), elementTypes, "indextuple"); +// // load the values into this struct type +// auto alloc = builder.CreateAlloca(structType, 0, nullptr); +// for (int i = 0; i < tupleNumElements; ++i) +// builder.CreateStore(elements[i].val, builder.CreateStructGEP(alloc, structType, i)); +// +// +// // fetch element +// auto lookupPtr = builder.CreateGEP(structType, alloc, {i32Const(0), builder.CreateTrunc(index.val, +// Type::getInt32Ty( +// _env->getContext()))}); +// +// // also need to lookup size... +// auto llvm_array_type = llvm::ArrayType::get(_env->i64Type(), tupleNumElements); +// auto salloc = builder.CreateAlloca(llvm_array_type, 0, +// nullptr); +// // insert elements +// for (int i = 0; i < tupleNumElements; ++i) +// builder.CreateStore(elements[i].size, +// builder.CreateGEP(llvm_array_type, salloc, {i32Const(0), i32Const(i)})); +// +// auto retSize = builder.CreateLoad(builder.getInt64Ty(), builder.CreateGEP(llvm_array_type, salloc, {i32Const(0), +// builder.CreateTrunc(index.val, +// Type::getInt32Ty( +// _env->getContext()))})); +// auto retVal = builder.CreateLoad(lookupPtr); +// return SerializableValue(retVal, retSize); } } @@ -5302,10 +5304,11 @@ namespace tuplex { // empty iterator is always exhausted loopCond = _env->i1Const(false); } else { + auto iterator = exprAlloc.val; // increment iterator index by 1 and check if it is exhausted - auto iteratorExhausted = _iteratorContextProxy->updateIteratorIndex(builder, exprAlloc.val, iteratorInfo); + auto iteratorExhausted = _iteratorContextProxy->updateIteratorIndex(builder, 
iterator, iteratorInfo); // decrement iterator index by 1 - _iteratorContextProxy->incrementIteratorIndex(builder, exprAlloc.val, iteratorInfo, -1); + increment_iterator_index(*_env, builder, iterator, iteratorInfo, -1); // loopCond = !iteratorExhausted i.e. if iterator exhausted, ends the loop loopCond = builder.CreateICmpEQ(iteratorExhausted, _env->i1Const(false)); } @@ -5334,8 +5337,9 @@ namespace tuplex { // first iteration is guaranteed to exist, or an exception would have been raised earlier _logger.debug("first iteration of for loop unrolled to allow type-stability during loop"); if(exprType.isIteratorType()) { + auto iterator = exprAlloc.val; // increment iterator index by 1 - _iteratorContextProxy->incrementIteratorIndex(builder, exprAlloc.val, iteratorInfo, 1); + increment_iterator_index(*_env, builder, iterator, iteratorInfo, 1); } else { builder.CreateStore(builder.CreateAdd(start, step), currPtr); } diff --git a/tuplex/codegen/src/CodegenHelper.cc b/tuplex/codegen/src/CodegenHelper.cc index 5c4679692..f4ef2b7b3 100644 --- a/tuplex/codegen/src/CodegenHelper.cc +++ b/tuplex/codegen/src/CodegenHelper.cc @@ -28,7 +28,9 @@ #include #include #include +#if LLVM_VERSION_MAJOR < 17 #include +#endif #include // to iterate over predecessors/successors easily #include #include diff --git a/tuplex/codegen/src/FlattenedTuple.cc b/tuplex/codegen/src/FlattenedTuple.cc index 77f266450..598dcc8c4 100644 --- a/tuplex/codegen/src/FlattenedTuple.cc +++ b/tuplex/codegen/src/FlattenedTuple.cc @@ -83,7 +83,6 @@ namespace tuplex { auto field_type = _tree.fieldType(index); if(field_type.isTupleType() && field_type != python::Type::EMPTYTUPLE) { // need to assign a subtree - assert(value->getType()->isStructTy() || value->getType()->getPointerElementType()->isStructTy()); // struct or struct* auto subtree = _tree.subTree(index); auto subtree_type = subtree.tupleType(); @@ -1039,10 +1038,10 @@ namespace tuplex { // here it is just a load // ==> an empty tuple can't have a bitmap! 
if(isEmptyTuple()) { - throw std::runtime_error("need to figure this out..."); // what needs to be stored here anyways?? + throw std::runtime_error("need to figure this out..."); // what needs to be stored here anyways? assert(1 == numElements()); // store size for packed empty tuple - builder.CreateStore(_tree.get(0).size, builder.CreateGEP(ptr, {_env->i32Const(0), _env->i32Const(numElements())})); + builder.CreateStore(_tree.get(0).size, builder.CreateStructGEP(ptr, llvmType, numElements())); return; } diff --git a/tuplex/codegen/src/IteratorContextProxy.cc b/tuplex/codegen/src/IteratorContextProxy.cc index cb372a2ae..e09401a6d 100644 --- a/tuplex/codegen/src/IteratorContextProxy.cc +++ b/tuplex/codegen/src/IteratorContextProxy.cc @@ -465,194 +465,6 @@ namespace tuplex { return next_from_iterator(*_env, builder, yieldType, iterator, iteratorInfo); } - llvm::Value *IteratorContextProxy::updateZipIndex(const codegen::IRBuilder& builder, - llvm::Value *iterator, - const std::shared_ptr &iteratorInfo) { - using namespace llvm; - - auto argsType = iteratorInfo->argsType; - auto argsIteratorInfo = iteratorInfo->argsIteratorInfo; - - int zipSize = argsType.parameters().size(); - if(zipSize == 0) { - return _env->i1Const(true); - } - - BasicBlock *currBB = builder.GetInsertBlock(); - BasicBlock *exhaustedBB = BasicBlock::Create(_env->getContext(), "exhaustedBB", currBB->getParent()); - BasicBlock *endBB = BasicBlock::Create(_env->getContext(), "endBB", currBB->getParent()); - - builder.SetInsertPoint(exhaustedBB); - builder.CreateBr(endBB); - - builder.SetInsertPoint(endBB); - // zipExhausted indicates whether the given zip iterator is exhausted - auto zipExhausted = builder.CreatePHI(_env->i1Type(), 2); - zipExhausted->addIncoming(_env->i1Const(true), exhaustedBB); - - std::vector zipElementEntryBB; - std::vector zipElementCondBB; - for (int i = 0; i < zipSize; ++i) { - BasicBlock *currElementEntryBB = BasicBlock::Create(_env->getContext(), "zipElementBB" + 
std::to_string(i), currBB->getParent()); - BasicBlock *currElementCondBB = BasicBlock::Create(_env->getContext(), "currCondBB" + std::to_string(i), currBB->getParent()); - zipElementEntryBB.push_back(currElementEntryBB); - zipElementCondBB.push_back(currElementCondBB); - } - zipExhausted->addIncoming(_env->i1Const(false), zipElementCondBB[zipSize - 1]); - - builder.SetInsertPoint(currBB); - builder.CreateBr(zipElementEntryBB[0]); - // iterate over all arg iterators - // if the current arg iterator is exhausted, jump directly to exhaustedBB and zipExhausted will be set to true - for (int i = 0; i < zipSize; ++i) { - builder.SetInsertPoint(zipElementEntryBB[i]); - auto currIteratorPtr = builder.CreateGEP(iterator, {_env->i32Const(0), _env->i32Const(i)}); - auto currIterator = builder.CreateLoad(currIteratorPtr); - auto currIteratorInfo = argsIteratorInfo[i]; - assert(currIteratorInfo); - auto exhausted = updateIteratorIndex(builder, currIterator, currIteratorInfo); - builder.CreateBr(zipElementCondBB[i]); - builder.SetInsertPoint(zipElementCondBB[i]); - if(i == zipSize - 1) { - builder.CreateCondBr(exhausted, exhaustedBB, endBB); - } else { - builder.CreateCondBr(exhausted, exhaustedBB, zipElementEntryBB[i+1]); - } - } - builder.SetInsertPoint(endBB); - - return zipExhausted; - } - - SerializableValue IteratorContextProxy::getZipNextElement(const codegen::IRBuilder& builder, - const python::Type &yieldType, - llvm::Value *iterator, - const std::shared_ptr &iteratorInfo) { - using namespace llvm; - auto argsType = iteratorInfo->argsType; - auto argsIteratorInfo = iteratorInfo->argsIteratorInfo; - - FlattenedTuple ft(_env); - ft.init(yieldType); - - // previously UpdateIteratorIndexFunction was called on each arg iterator which increments index of each arg iterator by 1 - // restore index for all arg iterators - incrementIteratorIndex(builder, iterator, iteratorInfo, -1); - for (int i = 0; i < argsType.parameters().size(); ++i) { - auto currIteratorInfo = 
argsIteratorInfo[i]; - auto llvm_curr_iterator_type = createIteratorContextTypeFromIteratorInfo(*_env, *currIteratorInfo.get()); - auto currIteratorPtr = builder.CreateGEP(iterator, {_env->i32Const(0), _env->i32Const(i)}); - auto currIterator = builder.CreateLoad(llvm_curr_iterator_type->getPointerTo(), currIteratorPtr); - - // update current arg iterator index before fetching value - incrementIteratorIndex(builder, currIterator, currIteratorInfo, 1); - auto currIteratorNextVal = getIteratorNextElement(builder, yieldType.parameters()[i], currIterator, currIteratorInfo); - ft.setElement(builder, i, currIteratorNextVal.val, currIteratorNextVal.size, currIteratorNextVal.is_null); - } - auto retVal = ft.getLoad(builder); - auto retSize = ft.getSize(builder); - return SerializableValue(retVal, retSize); - } - - llvm::Value *IteratorContextProxy::updateEnumerateIndex(const codegen::IRBuilder& builder, - llvm::Value *iterator, - const std::shared_ptr &iteratorInfo) { - using namespace llvm; - - auto argIteratorInfo = iteratorInfo->argsIteratorInfo.front(); - auto argIteratorPtr = builder.CreateGEP(iterator, {_env->i32Const(0), _env->i32Const(1)}); - auto argIterator = builder.CreateLoad(argIteratorPtr); - auto enumerateExhausted = updateIteratorIndex(builder, argIterator, argIteratorInfo); - - return enumerateExhausted; - } - - SerializableValue IteratorContextProxy::getEnumerateNextElement(const codegen::IRBuilder& builder, - const python::Type &yieldType, - llvm::Value *iterator, - const std::shared_ptr &iteratorInfo) { - using namespace llvm; - - auto argIteratorInfo = iteratorInfo->argsIteratorInfo.front(); - - FlattenedTuple ft(_env); - ft.init(yieldType); - auto startValPtr = builder.CreateGEP(iterator, {_env->i32Const(0), _env->i32Const(0)}); - auto startVal = builder.CreateLoad(startValPtr); - auto start = SerializableValue(startVal, _env->i64Const(8)); - auto argIteratorPtr = builder.CreateGEP(iterator, {_env->i32Const(0), _env->i32Const(1)}); - auto argIterator 
= builder.CreateLoad(argIteratorPtr); - auto val = getIteratorNextElement(builder, yieldType.parameters()[1], argIterator, argIteratorInfo); - ft.setElement(builder, 0, start.val, start.size, start.is_null); - ft.setElement(builder, 1, val.val, val.size, val.is_null); - auto retVal = ft.getLoad(builder); - auto retSize = ft.getSize(builder); - // increment start index value - auto newStartVal = builder.CreateAdd(startVal, _env->i64Const(1)); - builder.CreateStore(newStartVal, startValPtr); - - return SerializableValue(retVal, retSize); - } - - void IteratorContextProxy::incrementIteratorIndex(const codegen::IRBuilder& builder, - llvm::Value *iterator, - const std::shared_ptr &iteratorInfo, - int offset) { - using namespace llvm; - - auto iteratorName = iteratorInfo->iteratorName; - auto argsIteratorInfo = iteratorInfo->argsIteratorInfo; - - if(iteratorName == "zip") { - for (int i = 0; i < argsIteratorInfo.size(); ++i) { - auto currIteratorPtr = builder.CreateGEP(iterator, {_env->i32Const(0), _env->i32Const(i)}); - - // get iterator type - auto llvm_iterator_type = createIteratorContextTypeFromIteratorInfo(*_env, *argsIteratorInfo[i]); - - auto currIterator = builder.CreateLoad(llvm_iterator_type->getPointerTo(), currIteratorPtr); - incrementIteratorIndex(builder, currIterator, argsIteratorInfo[i], offset); - } - return; - } - - if(iteratorName == "enumerate") { - auto currIteratorPtr = builder.CreateGEP(iterator, {_env->i32Const(0), _env->i32Const(1)}); - auto currIterator = builder.CreateLoad(currIteratorPtr); - incrementIteratorIndex(builder, currIterator, argsIteratorInfo.front(), offset); - return; - } - - auto iterablesType = iteratorInfo->argsType; - if(iteratorName == "iter") { - if(iterablesType.isIteratorType()) { - // iter() call on an iterator, ignore the outer iter and call again - assert(argsIteratorInfo.front()); - incrementIteratorIndex(builder, iterator, argsIteratorInfo.front(), offset); - return; - } - } else if(iteratorName == "reversed") { - // 
for reverseiterator, need to decrement index by offset - offset = -offset; - } else { - throw std::runtime_error("unsupported iterator" + iteratorName); - } - - // change index field - auto indexPtr = builder.CreateGEP(iterator, {_env->i32Const(0), _env->i32Const(1)}); - auto currIndex = builder.CreateLoad(builder.getInt32Ty(), indexPtr); - if(iterablesType == python::Type::RANGE) { - // index will change by offset * step - auto rangePtr = builder.CreateGEP(iterator, {_env->i32Const(0), _env->i32Const(2)}); - auto range = builder.CreateLoad(rangePtr); - auto stepPtr = builder.CreateGEP(_env->getRangeObjectType(), range, {_env->i32Const(0), _env->i32Const(2)}); - auto step = builder.CreateLoad(stepPtr); - builder.CreateStore(builder.CreateAdd(currIndex, builder.CreateMul(_env->i64Const(offset), step)), indexPtr); - } else { - builder.CreateStore(builder.CreateAdd(currIndex, _env->i32Const(offset)), indexPtr); - } - } - // helper to retrieve iteratorcontexttype from iteratorInfo llvm::Type* createIteratorContextTypeFromIteratorInfo(LLVMEnvironment& env, const IteratorInfo& iteratorInfo) { // coupled with FunctionRegistry @@ -922,8 +734,6 @@ namespace tuplex { logger.debug("ft type: " + _env.getLLVMTypeName(ft)); logger.debug("iterator type: " + _env.getLLVMTypeName(iterator->getType())); - // ok, update is something crazy fancy here: mod.getOrInsertFunction(name, FT).getCallee()->getType()->getPointerElementType()->isFunctionTy() - auto nextFunc_value = llvm::getOrInsertCallable(*_env.getModule(), funcName, ft); llvm::FunctionCallee nextFunc_callee(ft, nextFunc_value); auto exhausted = builder.CreateCall(nextFunc_callee, iterator); @@ -965,7 +775,7 @@ namespace tuplex { const std::shared_ptr &iteratorInfo) { using namespace llvm; - llvm::Type *iteratorContextType = createIteratorContextTypeFromIteratorInfo(_env, *iteratorInfo); //iterator->getType()->getPointerElementType(); + llvm::Type *iteratorContextType = createIteratorContextTypeFromIteratorInfo(_env, 
*iteratorInfo); std::string funcName; auto iteratorName = iteratorInfo->iteratorName; @@ -999,8 +809,6 @@ namespace tuplex { logger.debug("ft type: " + _env.getLLVMTypeName(ft)); logger.debug("iterator type: " + _env.getLLVMTypeName(iterator->getType())); - // ok, update is something crazy fancy here: mod.getOrInsertFunction(name, FT).getCallee()->getType()->getPointerElementType()->isFunctionTy() - auto nextFunc_value = llvm::getOrInsertCallable(*_env.getModule(), funcName, ft); llvm::FunctionCallee nextFunc_callee(ft, nextFunc_value); auto exhausted = builder.CreateCall(nextFunc_callee, iterator); diff --git a/tuplex/codegen/src/LLVMEnvironment.cc b/tuplex/codegen/src/LLVMEnvironment.cc index e0d9fcfe1..79f73a359 100644 --- a/tuplex/codegen/src/LLVMEnvironment.cc +++ b/tuplex/codegen/src/LLVMEnvironment.cc @@ -1103,8 +1103,12 @@ namespace tuplex { #if (LLVM_VERSION_MAJOR > 14) if(stype->isOpaquePointerTy()) return "ptr"; -#endif +#elif (LLVM_VERSION_MAJOR >= 17) + return "ptr" +#else stype = stype->getPointerElementType(); +#endif + pointer_stars += "*"; } @@ -1166,9 +1170,12 @@ namespace tuplex { #if (LLVM_VERSION_MAJOR > 14) if(t->isOpaquePointerTy()) return "ptr"; -#endif +#elif (LLVM_VERSION_MAJOR >= 17) + return "ptr"; +#else // recurse: return getLLVMTypeName(t->getPointerElementType()) + "*"; +#endif } if (t->isArrayTy()) { @@ -1662,7 +1669,7 @@ namespace tuplex { auto str_size = CreateFirstBlockAlloca(builder, i64Type()); auto str = builder.CreateCall(func, {value, str_size}); - return SerializableValue(str, builder.CreateLoad(str_size)); + return SerializableValue(str, builder.CreateLoad(builder.getInt64Ty(), str_size)); } @@ -1949,7 +1956,7 @@ namespace tuplex { auto cbool_type = codegen::ctypeToLLVM(builder.getContext()); Value* bool_val = env.CreateFirstBlockAlloca(builder, cbool_type); - builder.CreateStore(env.boolConst(false), bool_val); + builder.CreateStore(env.cbool_const(false), bool_val); // all the basicblocks BasicBlock* bbParse = 
BasicBlock::Create(ctx, "parse_bool_value", func); @@ -2242,6 +2249,14 @@ namespace tuplex { return retAddr; } + llvm::Value *LLVMEnvironment::cbool_const(bool b) { + auto cbool_type = codegen::ctypeToLLVM(getContext()); + assert(cbool_type->isIntegerTy()); + auto num_bits = cbool_type->getIntegerBitWidth(); + return llvm::Constant::getIntegerValue(llvm::Type::getIntNTy(getContext(), num_bits), + llvm::APInt(num_bits, b)); + } + SerializableValue list_get_element(LLVMEnvironment& env, const codegen::IRBuilder& builder, const python::Type& list_type, llvm::Value* list_ptr, llvm::Value* index) { diff --git a/tuplex/core/CMakeLists.txt b/tuplex/core/CMakeLists.txt index e5f323112..031cb9fe7 100755 --- a/tuplex/core/CMakeLists.txt +++ b/tuplex/core/CMakeLists.txt @@ -12,20 +12,12 @@ find_package(YAMLCPP REQUIRED) if(BUILD_WITH_AWS) # locate aws sdk & include lambda component find_package(AWSSDK REQUIRED COMPONENTS core s3 lambda) - MESSAGE(STATUS "building with AWS Lambda backend") - - # communication with AWS Lambda happens via protobuf, i.e. 
make sure protobuf compiler - # is installed - # set(Protobuf_USE_STATIC_LIBS ON) - # https://github.com/protocolbuffers/protobuf/issues/12637 - find_package(Protobuf CONFIG) - if(NOT Protobuf_FOUND) - find_package(Protobuf REQUIRED) - endif() - include_directories(Protobuf_INCLUDE_DIRS) + MESSAGE(STATUS "Building with AWS Lambda backend") - add_library(proto-objects OBJECT "${CMAKE_CURRENT_LIST_DIR}/proto/Lambda.proto") + # make sure protobuf was discovered in parent dir + assert_var(Protobuf_FOUND) + add_library(proto-objects OBJECT "${CMAKE_CURRENT_LIST_DIR}/proto/Lambda.proto") target_link_libraries(proto-objects PUBLIC protobuf::libprotobuf) set(PROTO_BINARY_DIR "${CMAKE_CURRENT_BINARY_DIR}/managed") diff --git a/tuplex/core/include/llvm13/JITCompiler_llvm13.h b/tuplex/core/include/llvm13/JITCompiler_llvm13.h index c02996dd1..bb6f33e7a 100644 --- a/tuplex/core/include/llvm13/JITCompiler_llvm13.h +++ b/tuplex/core/include/llvm13/JITCompiler_llvm13.h @@ -77,6 +77,7 @@ namespace tuplex { // custom symbols std::unordered_map _customSymbols; + void defineCustomSymbols(llvm::orc::JITDylib &jitlib); }; } #endif diff --git a/tuplex/core/include/physical/CSVParseRowGenerator.h b/tuplex/core/include/physical/CSVParseRowGenerator.h index 87460a1e0..e44c530f9 100644 --- a/tuplex/core/include/physical/CSVParseRowGenerator.h +++ b/tuplex/core/include/physical/CSVParseRowGenerator.h @@ -289,13 +289,13 @@ namespace tuplex { builder.CreateICmpEQ(cellEnd, _endPtr)); - auto beforeCellBegin = clampWithStartPtr(builder, builder.CreateGEP(cellBegin, _env->i32Const(-1))); + auto beforeCellBegin = clampWithStartPtr(builder, builder.MovePtrByBytes(cellBegin, -1)); // note that cellEnd is excl. 
Hence at cellEnd there is the character after the cell end - auto afterCellEnd = clampWithEndPtr(builder, builder.CreateGEP(cellEnd, _env->i32Const(0))); + auto afterCellEnd = clampWithEndPtr(builder, builder.MovePtrByBytes(cellEnd, (int64_t)0)); - auto beforeIsQuote = builder.CreateICmpEQ(builder.CreateLoad(beforeCellBegin), + auto beforeIsQuote = builder.CreateICmpEQ(builder.CreateLoad(builder.getInt8Ty(), beforeCellBegin), _env->i8Const(_quotechar)); - auto afterIsQuote = builder.CreateICmpEQ(builder.CreateLoad(afterCellEnd), _env->i8Const(_quotechar)); + auto afterIsQuote = builder.CreateICmpEQ(builder.CreateLoad(builder.getInt8Ty(), afterCellEnd), _env->i8Const(_quotechar)); auto beforeAndAfterAreQuotes = builder.CreateAnd(beforeIsQuote, afterIsQuote); return builder.CreateSelect(cellAtBoundaries, _env->i1Const(false), beforeAndAfterAreQuotes); diff --git a/tuplex/core/include/physical/HashProbeTask.h b/tuplex/core/include/physical/HashProbeTask.h index fef2faf06..85022f60a 100644 --- a/tuplex/core/include/physical/HashProbeTask.h +++ b/tuplex/core/include/physical/HashProbeTask.h @@ -44,6 +44,8 @@ namespace tuplex { void execute() override; TaskType type() const override { return TaskType::HASHPROBE; } + + void releaseAllLocks() override; }; } diff --git a/tuplex/core/include/physical/IExceptionableTaskGenerator.h b/tuplex/core/include/physical/IExceptionableTaskGenerator.h index 2a885f934..3ba318e85 100644 --- a/tuplex/core/include/physical/IExceptionableTaskGenerator.h +++ b/tuplex/core/include/physical/IExceptionableTaskGenerator.h @@ -159,7 +159,7 @@ namespace tuplex { std::map _parameters; // helper functions to use variables via alloc/store in code - std::map _variables; + std::map> _variables; void addVariable(IRBuilder& builder, const std::string name, llvm::Type* type, llvm::Value* initialValue=nullptr); llvm::Value* getVariable(IRBuilder& builder, const std::string name); llvm::Value* getPointerToVariable(IRBuilder& builder, const std::string name); 
diff --git a/tuplex/core/include/physical/IExecutorTask.h b/tuplex/core/include/physical/IExecutorTask.h index cb330327a..dfe64fdaf 100644 --- a/tuplex/core/include/physical/IExecutorTask.h +++ b/tuplex/core/include/physical/IExecutorTask.h @@ -52,6 +52,11 @@ namespace tuplex { virtual double wallTime() const { return 0.0; } virtual TaskType type() const = 0; + + /*! + * used when an exception is thrown to release all pending locks. -> else, deadlock. + */ + virtual void releaseAllLocks() = 0; }; diff --git a/tuplex/core/include/physical/ResolveTask.h b/tuplex/core/include/physical/ResolveTask.h index 2a5cf15eb..1faf86c01 100644 --- a/tuplex/core/include/physical/ResolveTask.h +++ b/tuplex/core/include/physical/ResolveTask.h @@ -228,6 +228,8 @@ namespace tuplex { double wallTime() const override { return _wallTime; } size_t getNumInputRows() const override { return _numInputRowsRead; } + void releaseAllLocks() override; + private: int64_t _stageID; /// to which stage does this task belong to. 
std::vector _partitions; diff --git a/tuplex/core/include/physical/SimpleFileWriteTask.h b/tuplex/core/include/physical/SimpleFileWriteTask.h index 578ac0212..388c6304d 100644 --- a/tuplex/core/include/physical/SimpleFileWriteTask.h +++ b/tuplex/core/include/physical/SimpleFileWriteTask.h @@ -79,6 +79,10 @@ class SimpleFileWriteTask : public IExecutorTask { TaskType type() const override { return TaskType::SIMPLEFILEWRITE; } std::vector getOutputPartitions() const override { return std::vector{}; } + void releaseAllLocks() override { + for(auto p : _partitions) + p->unlock(); + } private: URI _uri; std::vector _partitions; diff --git a/tuplex/core/include/physical/SimpleOrcWriteTask.h b/tuplex/core/include/physical/SimpleOrcWriteTask.h index ccd52729f..50c93fcff 100644 --- a/tuplex/core/include/physical/SimpleOrcWriteTask.h +++ b/tuplex/core/include/physical/SimpleOrcWriteTask.h @@ -123,6 +123,10 @@ class SimpleOrcWriteTask : public IExecutorTask { TaskType type() const override { return TaskType::SIMPLEFILEWRITE; } std::vector getOutputPartitions() const override { return std::vector{}; } + void releaseAllLocks() override { + for(auto p : _partitions) + p->unlock(); + } private: URI _uri; std::vector _partitions; diff --git a/tuplex/core/include/physical/TransformTask.h b/tuplex/core/include/physical/TransformTask.h index 3e5d27623..5b97b9a81 100644 --- a/tuplex/core/include/physical/TransformTask.h +++ b/tuplex/core/include/physical/TransformTask.h @@ -259,6 +259,8 @@ namespace tuplex { size_t output_rows_written() const { return _numOutputRowsWritten; } size_t output_limit() const { return _outLimit; } + + void releaseAllLocks() override; private: void resetSinks(); void resetSources(); diff --git a/tuplex/core/src/ContextOptions.cc b/tuplex/core/src/ContextOptions.cc index 5823a4bd2..c04c5f0c1 100644 --- a/tuplex/core/src/ContextOptions.cc +++ b/tuplex/core/src/ContextOptions.cc @@ -577,7 +577,7 @@ namespace tuplex { // check first with pathParent, then PATH 
std::vector failedPaths; for(auto c : candidates) { - URI p = URI(pathParent + "/" + c); + URI p = !pathParent.empty() ? URI(pathParent + "/" + c) : URI(c); if(p.exists() && p.isFile()) return p; else diff --git a/tuplex/core/src/DataSet.cc b/tuplex/core/src/DataSet.cc index b6be96c0f..6e7204adb 100644 --- a/tuplex/core/src/DataSet.cc +++ b/tuplex/core/src/DataSet.cc @@ -744,7 +744,7 @@ namespace tuplex { } // there could be different number of columns. -> pick max! - int numColumns = rows[0].getNumColumns(); + auto numColumns = rows[0].getNumColumns(); for(unsigned i = 1; i < rows.size(); ++i) numColumns = std::max(numColumns, rows[i].getNumColumns()); diff --git a/tuplex/core/src/Executor.cc b/tuplex/core/src/Executor.cc index 845b78e6a..5078d4bc2 100644 --- a/tuplex/core/src/Executor.cc +++ b/tuplex/core/src/Executor.cc @@ -90,9 +90,19 @@ namespace tuplex { //executor.logger().info("started task..."); // process task - task->execute(); // save which thread executed this task task->setID(std::this_thread::get_id()); + try { + task->execute(); + } catch(const std::exception& e) { + task->releaseAllLocks(); + executor.error(std::string("Task failed with exception: ") + e.what()); + } catch(...) { + task->releaseAllLocks(); + executor.error("Task failed with unknown exception."); + } + // save which thread executed this task + task->setID(std::this_thread::get_id()); _numPendingTasks.fetch_add(-1, std::memory_order_release); @@ -115,7 +125,15 @@ namespace tuplex { task->setThreadNumber(executor.threadNumber()); // redundant? // process task - task->execute(); + try { + task->execute(); + } catch(const std::exception& e) { + task->releaseAllLocks(); + executor.error(std::string("Task failed with exception ") + e.what()); + } catch(...) 
{ + task->releaseAllLocks(); + executor.error("Task failed with unknown exception."); + } // save which thread executed this task task->setID(std::this_thread::get_id()); diff --git a/tuplex/core/src/Partition.cc b/tuplex/core/src/Partition.cc index c5b9c4bfc..8aa3b95ab 100644 --- a/tuplex/core/src/Partition.cc +++ b/tuplex/core/src/Partition.cc @@ -85,28 +85,8 @@ namespace tuplex { } bool Partition::saveToFile(const URI& partitionURI) { -// auto uuid = uuidToString(_uuid); -// auto vfs = VirtualFileSystem::fromURI(partitionURI); -// -// // create file & write partition contents to it -// std::unique_ptr file = vfs.open_file(partitionURI, VFS_WRITE | VFS_OVERWRITE); -// if(!file) { -// std::stringstream ss; -// ss<<"Could not save partition "<logger().error(ss.str()); -// return false; -// } -// -// auto status = file.get()->write(_arena, (uint64_t)_size); -// -// if(status != VirtualFileSystemStatus::VFS_OK) { -// assert(file); -// _owner->logger().error("Could not save partition " + uuid + " to path " + file.get()->getURI().toPath()); -// -// return false; -// } - auto path = partitionURI.toString().substr(7); + auto path = partitionURI.toString().substr(partitionURI.prefix().length()); // does file exist already? 
// => fail @@ -114,9 +94,23 @@ namespace tuplex { throw std::runtime_error("partition file under " + path + " already exists."); } + // create parent path if not exists + auto parent_uri = partitionURI.parent(); + auto parent_path = parent_uri.toString().substr(parent_uri.prefix().length()); + if(!dirExists(parent_path)) { + boost::system::error_code ec; + boost::filesystem::create_directories(parent_path, ec); + if(ec) { + std::stringstream ss; + ss<<"failed to create not yet existing parent dir "< -#include "llvm/Support/DynamicLibrary.h" +#include +#include #include static bool _loaded = false; @@ -32,6 +33,19 @@ namespace tuplex { bool loaded() { return _loaded; } + + static void* findAddrOfSymbol(const char* name) { + auto addr_ptr = llvm::sys::DynamicLibrary::SearchForAddressOfSymbol(name); + + if(!addr_ptr) { + // try mangled version by prepending "_" + auto mangled_name = std::string("_") + name; + addr_ptr = llvm::sys::DynamicLibrary::SearchForAddressOfSymbol(mangled_name.c_str()); + } + + return addr_ptr; + } + bool init(const std::string& path) { if(path.length() == 0) @@ -68,13 +82,13 @@ namespace tuplex { rtfree_all = nullptr; rtmalloc=nullptr; rtfree=nullptr; - setRunTimeMemory = reinterpret_cast(llvm::sys::DynamicLibrary::SearchForAddressOfSymbol("setRunTimeMemory")); - freeRunTimeMemory = reinterpret_cast(llvm::sys::DynamicLibrary::SearchForAddressOfSymbol("freeRunTimeMemory")); - releaseRunTimeMemory = reinterpret_cast(llvm::sys::DynamicLibrary::SearchForAddressOfSymbol("releaseRunTimeMemory")); - rtfree_all = reinterpret_cast(llvm::sys::DynamicLibrary::SearchForAddressOfSymbol("rtfree_all")); - rtmalloc = reinterpret_cast(llvm::sys::DynamicLibrary::SearchForAddressOfSymbol("rtmalloc")); - rtfree = reinterpret_cast(llvm::sys::DynamicLibrary::SearchForAddressOfSymbol("rtfree")); - runTimeMemorySize = reinterpret_cast(llvm::sys::DynamicLibrary::SearchForAddressOfSymbol("getRunTimeMemorySize")); + setRunTimeMemory = 
reinterpret_cast(findAddrOfSymbol("setRunTimeMemory")); + freeRunTimeMemory = reinterpret_cast(findAddrOfSymbol("freeRunTimeMemory")); + releaseRunTimeMemory = reinterpret_cast(findAddrOfSymbol("releaseRunTimeMemory")); + rtfree_all = reinterpret_cast(findAddrOfSymbol("rtfree_all")); + rtmalloc = reinterpret_cast(findAddrOfSymbol("rtmalloc")); + rtfree = reinterpret_cast(findAddrOfSymbol("rtfree")); + runTimeMemorySize = reinterpret_cast(findAddrOfSymbol("getRunTimeMemorySize")); cJSON_Hooks tmp = {rtmalloc, rtfree}; cJSON_InitHooks(&tmp); @@ -82,7 +96,7 @@ namespace tuplex { srand(time(0)); if(!setRunTimeMemory || !freeRunTimeMemory) { - Logger::instance().defaultLogger().error("Could not find required runtime symbols in shared library."); + Logger::instance().defaultLogger().error("Could not find required runtime symbols setRunTimeMemory or freeRunTimeMemory in shared library " + path + "."); return false; } diff --git a/tuplex/core/src/Signals.cc b/tuplex/core/src/Signals.cc index 1df313817..33b5c2ddb 100644 --- a/tuplex/core/src/Signals.cc +++ b/tuplex/core/src/Signals.cc @@ -27,8 +27,18 @@ namespace tuplex { sig_received = signum; shutdown_requested = true; #ifndef NDEBUG - const char str[] = "\n => received signal SIGINT in tplx_signal_handler, aborting.\n"; - write(STDERR_FILENO, str, sizeof(str) - 1); // write is signal safe, the others not. + if(SIGINT == signum) { + const char str[] = "\n => received signal SIGINT in tplx_signal_handler, aborting.\n"; + write(STDERR_FILENO, str, sizeof(str) - 1); // write is signal safe, the others not. + } + if(SIGALRM == signum) { + const char str[] = "\n => received signal SIGALRM in tplx_signal_handler, aborting.\n"; + write(STDERR_FILENO, str, sizeof(str) - 1); // write is signal safe, the others not. + } + if(SIGTERM == signum) { + const char str[] = "\n => received signal SIGTERM in tplx_signal_handler, aborting.\n"; + write(STDERR_FILENO, str, sizeof(str) - 1); // write is signal safe, the others not. 
+ } #endif } @@ -43,17 +53,21 @@ namespace tuplex { action.sa_handler = tplx_signal_handler; sigemptyset(&action.sa_mask); - // for now only install on sigint, this effectively disables - // all other python handlers. That's ok though... + // install handler on following signals: + // SIGINT, SIGALRM, SIGTERM + std::vector signals_to_catch({SIGINT, SIGALRM, SIGTERM}); - if(0 == sigaction(SIGINT, &action, NULL)) - return true; - else { - // errno has description - Logger::instance().defaultLogger().error("Failed to install custom signal handlers, details: " + - std::string(strerror(errno))); - return false; + // install on above signals, this effectively disables + // all other python handlers. That's ok though... + for(auto sigtype : signals_to_catch) { + if(0 != sigaction(sigtype, &action, NULL)) { + // errno has description + Logger::instance().defaultLogger().error("Failed to install custom signal handlers for signal type " + std::to_string(sigtype) + " , details: " + + std::string(strerror(errno))); + return false; + } } + return true; } bool check_interrupted() { diff --git a/tuplex/core/src/llvm13/JITCompiler_llvm13.cc b/tuplex/core/src/llvm13/JITCompiler_llvm13.cc index feca7dabd..07c8ce4bd 100644 --- a/tuplex/core/src/llvm13/JITCompiler_llvm13.cc +++ b/tuplex/core/src/llvm13/JITCompiler_llvm13.cc @@ -109,6 +109,7 @@ namespace tuplex { tmb.setCPU(CPUStr); tmb.setRelocationModel(Reloc::Model::PIC_); tmb.addFeatures(getFeatureList()); + //tmb.addFeatures(codegen::getLLVMFeatureStr()); //<-- should add here probably SSE4.2.?? 
// build on top of this: @@ -237,6 +238,15 @@ namespace tuplex { // create for this module own jitlib auto& ES = _lljit->getExecutionSession(); + + // if lib with name already exists, remove + llvm::orc::JITDylib *jitlib_ptr = nullptr; + if((jitlib_ptr = ES.getJITDylibByName(module_name))) { + auto err = ES.removeJITDylib(*jitlib_ptr); + if(err) + throw std::runtime_error("failed to remove JITDylib " + module_name + " from execution session."); + jitlib_ptr = nullptr; + } auto& jitlib = ES.createJITDylib(module_name).get(); const auto& DL = _lljit->getDataLayout(); MangleAndInterner Mangle(ES, DL); @@ -252,8 +262,7 @@ namespace tuplex { jitlib.addGenerator(std::move(*ProcessSymbolsGenerator)); // define symbols from custom symbols for this jitlib - for(auto keyval: _customSymbols) - auto rc = jitlib.define(absoluteSymbols({{Mangle(keyval.first), keyval.second}})); + defineCustomSymbols(jitlib); _dylibs.push_back(&jitlib); // save reference for search auto err = _lljit->addIRModule(jitlib, std::move(tsm.get())); @@ -277,6 +286,26 @@ namespace tuplex { return true; } + void JITCompiler::defineCustomSymbols(llvm::orc::JITDylib &jitlib) { + auto& ES = _lljit->getExecutionSession(); + const auto& DL = _lljit->getDataLayout(); + llvm::orc::MangleAndInterner Mangle(ES, DL); + + for(auto keyval: _customSymbols) { +#if LLVM_VERSION_MAJOR <= 16 + auto rc = jitlib.define(llvm::orc::absoluteSymbols({{Mangle(keyval.first), keyval.second}})); +#else + // LLVM17 introduces new llvm::orc::ExecutorSymbolDef class + // convert JITEvaluatedSymbol from map to this new class. 
+ auto rc = jitlib.define(llvm::orc::absoluteSymbols(llvm::orc::SymbolMap({ + { Mangle(keyval.first), + { llvm::orc::ExecutorAddr(keyval.second.getAddress()), + keyval.second.getFlags()} } + }))); +#endif + } + } + bool JITCompiler::compile(std::unique_ptr mod) { llvm::Expected tsm = llvm::orc::ThreadSafeModule(std::move(mod), std::make_unique()); if(!tsm) { @@ -301,6 +330,16 @@ namespace tuplex { // create for this module own jitlib auto& ES = _lljit->getExecutionSession(); + + // if lib with name already exists, remove + llvm::orc::JITDylib *jitlib_ptr = nullptr; + if((jitlib_ptr = ES.getJITDylibByName(module_name.str()))) { + auto err = ES.removeJITDylib(*jitlib_ptr); + if(err) + throw std::runtime_error("failed to remove JITDylib " + module_name.str() + " from execution session."); + jitlib_ptr = nullptr; + } + auto& jitlib = ES.createJITDylib(module_name.str()).get(); const auto& DL = _lljit->getDataLayout(); llvm::orc::MangleAndInterner Mangle(ES, DL); @@ -320,9 +359,7 @@ namespace tuplex { jitlib.addGenerator(std::move(*ProcessSymbolsGenerator)); // define symbols from custom symbols for this jitlib - for(auto keyval: _customSymbols) - auto rc = jitlib.define(llvm::orc::absoluteSymbols({{Mangle(keyval.first), keyval.second}})); - + defineCustomSymbols(jitlib); _dylibs.push_back(&jitlib); // save reference for search assert(tsm); diff --git a/tuplex/core/src/physical/CSVParserGenerator.cc b/tuplex/core/src/physical/CSVParserGenerator.cc index b06db5710..2f61a3270 100644 --- a/tuplex/core/src/physical/CSVParserGenerator.cc +++ b/tuplex/core/src/physical/CSVParserGenerator.cc @@ -43,7 +43,7 @@ namespace tuplex { // create some preliminary things - auto endPtr = oldBuilder.CreateGEP(getInputPtrArg(), getInputSizeArg()); + auto endPtr = oldBuilder.MovePtrByBytes(getInputPtrArg(), getInputSizeArg()); oldBuilder.CreateBr(bBody); @@ -59,11 +59,12 @@ namespace tuplex { // if skipHeader is true, skip first row // !!! there is no header validation/order etc. here. 
if(_skipHeader) { - auto parseCode = builder.CreateCall(parseRowF, {_resStructVar, builder.CreateLoad(_currentPtrVar), _endPtr}); - auto numParsedBytes = builder.CreateLoad(builder.CreateGEP(_resStructVar, {_env->i32Const(0), _env->i32Const(0)})); + auto parseCode = builder.CreateCall(parseRowF, {_resStructVar, builder.CreateLoad(_env->i8ptrType(), _currentPtrVar), _endPtr}); + auto llvm_ret_type = _rowGenerator.resultType(); + auto numParsedBytes = builder.CreateLoad(builder.getInt64Ty(), builder.CreateStructGEP(_resStructVar, llvm_ret_type, 0)); // inc ptr & go to loop cond - builder.CreateStore(builder.CreateGEP(builder.CreateLoad(_currentPtrVar), numParsedBytes), _currentPtrVar); + builder.CreateStore(builder.MovePtrByBytes(builder.CreateLoad(_env->i8ptrType(), _currentPtrVar), numParsedBytes), _currentPtrVar); } builder.CreateBr(bLoopCond); @@ -71,7 +72,7 @@ namespace tuplex { // loop condition, i.e. p < endp builder.SetInsertPoint(bLoopCond); - auto cond = builder.CreateICmpULT(builder.CreatePtrToInt(builder.CreateLoad(_currentPtrVar), _env->i64Type()), + auto cond = builder.CreateICmpULT(builder.CreatePtrToInt(builder.CreateLoad(_env->i8ptrType(), _currentPtrVar), _env->i64Type()), builder.CreatePtrToInt(_endPtr, _env->i64Type())); builder.CreateCondBr(cond, bLoopBody, bLoopDone); @@ -80,12 +81,13 @@ namespace tuplex { builder.SetInsertPoint(bLoopBody); //call func and advance ptr - auto parseCode = builder.CreateCall(parseRowF, {_resStructVar, builder.CreateLoad(_currentPtrVar), _endPtr}); + auto parseCode = builder.CreateCall(parseRowF, {_resStructVar, builder.CreateLoad(_env->i8ptrType(), _currentPtrVar), _endPtr}); _env->debugPrint(builder, "parseCode is ", parseCode); - auto numParsedBytes = builder.CreateLoad(builder.CreateGEP(_resStructVar, {_env->i32Const(0), _env->i32Const(0)})); + auto llvm_ret_type = _rowGenerator.resultType(); + auto numParsedBytes = builder.CreateLoad(builder.getInt64Ty(), builder.CreateStructGEP(_resStructVar, llvm_ret_type, 
0)); // inc ptr & go to loop cond with next blocks - builder.CreateStore(builder.CreateGEP(builder.CreateLoad(_currentPtrVar), numParsedBytes), _currentPtrVar); + builder.CreateStore(builder.MovePtrByBytes(builder.CreateLoad(_env->i8ptrType(), _currentPtrVar), numParsedBytes), _currentPtrVar); // ignore empty results at end // maybe add assert that lineEnd is >= endPtr @@ -95,8 +97,8 @@ namespace tuplex { builder.SetInsertPoint(bNonEmpty); // can only stuff if bytes were parsed! - auto lineStart = builder.CreateLoad(builder.CreateGEP(_resStructVar, {_env->i32Const(0), _env->i32Const(1)})); - auto lineEnd = builder.CreateLoad(builder.CreateGEP(_resStructVar, {_env->i32Const(0), _env->i32Const(2)})); + auto lineStart = builder.CreateLoad(_env->i8ptrType(), builder.CreateStructGEP(_resStructVar, llvm_ret_type, 1)); + auto lineEnd = builder.CreateLoad(_env->i8ptrType(), builder.CreateStructGEP(_resStructVar, llvm_ret_type, 2)); // check result code, if zero all ok. Else, go into exception handling BasicBlock *bNoException = BasicBlock::Create(context, "no_exception", getFunction()); @@ -160,8 +162,14 @@ namespace tuplex { #warning "this here is outdated... should not be used. 
Remove code" for(const auto& t : stype.parameters()) { - Value* val = builder.CreateLoad(builder.CreateGEP(resStructVal, {_env->i32Const(0), _env->i32Const(3 + 2 * pos)})); - Value* size = builder.CreateLoad(builder.CreateGEP(resStructVal, {_env->i32Const(0), _env->i32Const(3 + 2 * pos + 1)})); + auto llvm_ret_type = _rowGenerator.resultType(); + auto val_position = 3 + 2 * pos; + auto size_position = 3 + 2 * pos + 1; + auto val_ptr = builder.CreateStructGEP(resStructVal, llvm_ret_type, val_position); + auto size_ptr = builder.CreateStructGEP(resStructVal, llvm_ret_type, size_position); + Value* val = builder.CreateLoad(llvm_ret_type->getStructElementType(val_position), val_ptr); + assert(llvm_ret_type->getStructElementType(size_position) == builder.getInt64Ty()); + Value* size = builder.CreateLoad(builder.getInt64Ty(), size_ptr); // !!! zero terminated string if(python::Type::STRING == t) diff --git a/tuplex/core/src/physical/ExceptionSourceTaskBuilder.cc b/tuplex/core/src/physical/ExceptionSourceTaskBuilder.cc index c5f8b575b..1185685cd 100644 --- a/tuplex/core/src/physical/ExceptionSourceTaskBuilder.cc +++ b/tuplex/core/src/physical/ExceptionSourceTaskBuilder.cc @@ -37,7 +37,7 @@ namespace tuplex { callProcessFuncWithHandler(builder, userData, tuple, normalRowCountVar, badRowCountVar, rowNumberVar, inputRowPtr, inputRowSize, terminateEarlyOnLimitCode, processRowFunc); } else { - Value *normalRowCount = builder.CreateLoad(normalRowCountVar, "normalRowCount"); + Value *normalRowCount = builder.CreateLoad(builder.getInt64Ty(), normalRowCountVar, "normalRowCount"); builder.CreateStore(builder.CreateAdd(normalRowCount, env().i64Const(1)), normalRowCountVar); } } diff --git a/tuplex/core/src/physical/HashJoinStage.cc b/tuplex/core/src/physical/HashJoinStage.cc index 0119fac71..eb5e61ec9 100644 --- a/tuplex/core/src/physical/HashJoinStage.cc +++ b/tuplex/core/src/physical/HashJoinStage.cc @@ -69,10 +69,10 @@ namespace tuplex { builder.CreateStore(env->i8nullptr(), 
hashed_value); // read num rows - Value *numRows = builder.CreateLoad(builder.CreatePointerCast(builder.CreateLoad(curPtrVar), env->i64ptrType()), + Value *numRows = builder.CreateLoad(builder.getInt64Ty(), builder.CreatePointerCast(builder.CreateLoad(env->i8ptrType(), curPtrVar), env->i64ptrType()), "numInputRows"); // move ptr by int64_t - builder.CreateStore(builder.CreateGEP(builder.CreateLoad(curPtrVar), env->i64Const(sizeof(int64_t))), + builder.CreateStore(builder.MovePtrByBytes(builder.CreateLoad(env->i8ptrType(), curPtrVar), sizeof(int64_t)), curPtrVar); // set up @@ -84,7 +84,7 @@ namespace tuplex { // loop cond counter < numRows builder.SetInsertPoint(bbLoopCondition); - auto cond = builder.CreateICmpSLT(builder.CreateLoad(rowCounterVar), numRows); + auto cond = builder.CreateICmpSLT(builder.CreateLoad(builder.getInt64Ty(), rowCounterVar), numRows); builder.CreateCondBr(cond, bbLoopBody, bbLoopExit); @@ -94,9 +94,9 @@ namespace tuplex { generateProbingCode(env, builder, argMap["userData"], argMap["hmap"], curPtrVar, hashed_value, rightType(), rightKeyIndex(), leftType(), leftKeyIndex(), _joinType); - auto row_number = builder.CreateLoad(rowCounterVar); + auto row_number = builder.CreateLoad(builder.getInt64Ty(), rowCounterVar); //env->debugPrint(builder, "row number: ", row_number); - builder.CreateStore(builder.CreateAdd(env->i64Const(1), builder.CreateLoad(rowCounterVar)), rowCounterVar); + builder.CreateStore(builder.CreateAdd(env->i64Const(1), builder.CreateLoad(builder.getInt64Ty(), rowCounterVar)), rowCounterVar); builder.CreateBr(bbLoopCondition); // loop body done @@ -136,7 +136,7 @@ namespace tuplex { // deserialize tuple codegen::FlattenedTuple ftIn(env.get()); ftIn.init(probeType); - auto curPtr = builder.CreateLoad(ptrVar); + auto curPtr = builder.CreateLoad(env->i8ptrType(), ptrVar); ftIn.deserializationCode(builder, curPtr); @@ -197,7 +197,7 @@ namespace tuplex { builder.SetInsertPoint(bbMatchFound); // call join code - writeJoinResult(env, 
builder, userData, builder.CreateLoad(hashedValueVar), buildType, buildKeyIndex, ftIn, + writeJoinResult(env, builder, userData, builder.CreateLoad(env->i8ptrType(), hashedValueVar), buildType, buildKeyIndex, ftIn, probeKeyIndex); builder.CreateBr(bbNext); @@ -208,7 +208,7 @@ namespace tuplex { auto serializedSize = ftIn.getSize(builder); // should be 341 for the first row! //env->debugPrint(builder, "serialized size:", serializedSize); - builder.CreateStore(builder.CreateGEP(curPtr, serializedSize), ptrVar); + builder.CreateStore(builder.MovePtrByBytes(curPtr, serializedSize), ptrVar); } llvm::Value *HashJoinStage::makeKey(std::shared_ptr &env, codegen::IRBuilder &builder, @@ -244,7 +244,7 @@ namespace tuplex { builder.SetInsertPoint(bbNotNull); builder.CreateStore(env->i8Const('_'), skey_ptr); - builder.CreateMemCpy(builder.CreateGEP(skey_ptr, env->i64Const(1)), 0, key.val, 0, key.size); + builder.CreateMemCpy(builder.MovePtrByBytes(skey_ptr, 1), 0, key.val, 0, key.size); builder.CreateBr(bbNext); builder.SetInsertPoint(bbNext); // update builder var! 
@@ -267,7 +267,7 @@ namespace tuplex { auto func = builder.GetInsertBlock()->getParent(); //env->debugPrint(builder, "joining records with all from bucket :P"); - auto numRows = builder.CreateLoad(builder.CreatePointerCast(bucketPtr, env->i64ptrType())); + auto numRows = builder.CreateLoad(builder.getInt64Ty(), builder.CreatePointerCast(bucketPtr, env->i64ptrType())); // env->debugPrint(builder, "bucket contains #rows: ", numRows); @@ -275,7 +275,7 @@ namespace tuplex { // uint8_t* row_data = rightPtr + sizeof(int64_t); // rightPtr += sizeof(int64_t) + row_length; - bucketPtr = builder.CreateGEP(bucketPtr, env->i64Const(sizeof(int64_t))); + bucketPtr = builder.MovePtrByBytes(bucketPtr, sizeof(int64_t)); // TODO: put bucketPtr Var in constructor auto bucketPtrVar = env->CreateFirstBlockAlloca(builder, @@ -295,14 +295,14 @@ namespace tuplex { builder.CreateBr(bbLoopCond); builder.SetInsertPoint(bbLoopCond); - auto cond = builder.CreateICmpSLT(builder.CreateLoad(loopVar), numRows); + auto cond = builder.CreateICmpSLT(builder.CreateLoad(builder.getInt64Ty(), loopVar), numRows); builder.CreateCondBr(cond, bbLoopBody, bbLoopDone); builder.SetInsertPoint(bbLoopBody); - bucketPtr = builder.CreateLoad(bucketPtrVar); - auto rowLength = builder.CreateLoad(builder.CreatePointerCast(bucketPtr, env->i64ptrType())); - bucketPtr = builder.CreateGEP(bucketPtr, env->i64Const(sizeof(int64_t))); + bucketPtr = builder.CreateLoad(env->i8ptrType(), bucketPtrVar); + auto rowLength = builder.CreateLoad(builder.getInt64Ty(), builder.CreatePointerCast(bucketPtr, env->i64ptrType())); + bucketPtr = builder.MovePtrByBytes(bucketPtr, sizeof(int64_t)); // actual data is now in bucketPtr // ==> deserialize! 
@@ -371,12 +371,12 @@ namespace tuplex { // logic here // move bucketPtr - builder.CreateStore(builder.CreateGEP(builder.CreateLoad(bucketPtrVar), + builder.CreateStore(builder.MovePtrByBytes(builder.CreateLoad(env->i8ptrType(), bucketPtrVar), builder.CreateAdd(env->i64Const(sizeof(int64_t)), rowLength)), bucketPtrVar); - builder.CreateStore(builder.CreateAdd(builder.CreateLoad(loopVar), env->i64Const(1)), loopVar); + builder.CreateStore(builder.CreateAdd(builder.CreateLoad(builder.getInt64Ty(), loopVar), env->i64Const(1)), loopVar); builder.CreateBr(bbLoopCond); builder.SetInsertPoint(bbLoopDone); diff --git a/tuplex/core/src/physical/HashProbeTask.cc b/tuplex/core/src/physical/HashProbeTask.cc index f46756480..a28785e7c 100644 --- a/tuplex/core/src/physical/HashProbeTask.cc +++ b/tuplex/core/src/physical/HashProbeTask.cc @@ -64,4 +64,8 @@ namespace tuplex { // TODO: history server notification! } + void HashProbeTask::releaseAllLocks() { + _output.unlock(); + _inputPartition->unlock(); + } } \ No newline at end of file diff --git a/tuplex/core/src/physical/IExceptionableTaskGenerator.cc b/tuplex/core/src/physical/IExceptionableTaskGenerator.cc index 7ff24b7f2..bf785be61 100644 --- a/tuplex/core/src/physical/IExceptionableTaskGenerator.cc +++ b/tuplex/core/src/physical/IExceptionableTaskGenerator.cc @@ -121,8 +121,8 @@ namespace tuplex { // adjust inputptr (has been already updated) to previous row uwsing inputlength - auto inputptr = builder.CreateGEP(getVariable(builder, "currentInputPtr"), - builder.CreateNeg(inputlength));//builder.CreateLoad(_currentInputPtrVar); + auto inputptr = builder.MovePtrByBytes(getVariable(builder, "currentInputPtr"), + builder.CreateNeg(inputlength)); std::vector eh_parameters{_parameters["userData"], ehcode, ehopid, row, inputptr, inputlength}; builder.CreateCall(eh_func, eh_parameters); @@ -142,26 +142,27 @@ namespace tuplex { void IExceptionableTaskGenerator::addVariable(IRBuilder &builder, const std::string name, llvm::Type 
*type, llvm::Value *initialValue) { - _variables[name] = builder.CreateAlloca(type, 0, nullptr, name); + _variables[name] = std::make_pair(type, builder.CreateAlloca(type, 0, nullptr, name)); if(initialValue) - builder.CreateStore(initialValue, _variables[name]); + assignToVariable(builder, name, initialValue); } llvm::Value* IExceptionableTaskGenerator::getVariable(IRBuilder &builder, const std::string name) { assert(_variables.find(name) != _variables.end()); - return builder.CreateLoad(_variables[name]); + return builder.CreateLoad(_variables[name].first, _variables[name].second); } llvm::Value* IExceptionableTaskGenerator::getPointerToVariable(IRBuilder &builder, const std::string name) { assert(_variables.find(name) != _variables.end()); - return _variables[name]; + return _variables[name].second; } void IExceptionableTaskGenerator::assignToVariable(IRBuilder &builder, const std::string name, llvm::Value *newValue) { assert(_variables.find(name) != _variables.end()); - builder.CreateStore(newValue, _variables[name]); + assert(newValue->getType() == _variables[name].first); + builder.CreateStore(newValue, _variables[name].second); } void IExceptionableTaskGenerator::linkBlocks() { @@ -235,7 +236,7 @@ namespace tuplex { // store back some variables in then block & make sure to mark as last block! 
// add to variable how much was serialized - auto newoutput = builder.CreateGEP(output, serializedRowSize); + auto newoutput = builder.MovePtrByBytes(output, serializedRowSize); assignToVariable(builder, "outputPtr", newoutput); auto newcapacity = builder.CreateSub(capacity, serializedRowSize); assignToVariable(builder, "outputCapacityLeft", newcapacity); @@ -244,7 +245,7 @@ namespace tuplex { // inc how many rows are written numRowsPtr = getVariable(builder, "outputBasePtr"); - auto curRows = builder.CreateLoad(numRowsPtr); + auto curRows = builder.CreateLoad(builder.getInt64Ty(), numRowsPtr); builder.CreateStore(builder.CreateAdd(curRows, _env->i64Const(1)), numRowsPtr); // inc how many writtes are written @@ -282,7 +283,7 @@ namespace tuplex { auto output_ptr = builder.CreateCall(func, parameters, "output_ptr"); // first save back to variables the memory request incl. 8 byte offset for number of rows! assignToVariable(builder, "outputBasePtr", builder.CreatePointerCast(output_ptr, _env->i64Type()->getPointerTo(0))); - assignToVariable(builder, "outputPtr", builder.CreateGEP(output_ptr, _env->i32Const(sizeof(int64_t)))); + assignToVariable(builder, "outputPtr", builder.MovePtrByBytes(output_ptr, sizeof(int64_t))); // check for null. If so (i.e. no memory returned), if so exit task function immediately // --> also if capacity returned is less than minRequested. 
diff --git a/tuplex/core/src/physical/LLVMOptimizer.cc b/tuplex/core/src/physical/LLVMOptimizer.cc index ee63adfa0..915d1f180 100644 --- a/tuplex/core/src/physical/LLVMOptimizer.cc +++ b/tuplex/core/src/physical/LLVMOptimizer.cc @@ -10,54 +10,57 @@ #include -#include "llvm/ADT/Triple.h" -#include "llvm/Analysis/CallGraph.h" -#include "llvm/Analysis/CallGraphSCCPass.h" -#include "llvm/Analysis/LoopPass.h" -#include "llvm/Analysis/RegionPass.h" -#include "llvm/Analysis/TargetLibraryInfo.h" -#include "llvm/Analysis/TargetTransformInfo.h" -#include "llvm/Bitcode/BitcodeWriterPass.h" -#include "llvm/CodeGen/TargetPassConfig.h" -#include "llvm/IR/DataLayout.h" -#include "llvm/IR/DebugInfo.h" -#include "llvm/IR/IRPrintingPasses.h" -#include "llvm/IR/LLVMContext.h" -#include "llvm/IR/LegacyPassManager.h" -#include "llvm/IR/LegacyPassNameParser.h" -#include "llvm/IR/Module.h" -#include "llvm/IR/Verifier.h" -#include "llvm/IRReader/IRReader.h" -#include "llvm/InitializePasses.h" -#include "llvm/LinkAllIR.h" -#include "llvm/LinkAllPasses.h" -#include "llvm/MC/SubtargetFeature.h" -#include "llvm/Support/Debug.h" -#include "llvm/Support/FileSystem.h" -#include "llvm/Support/Host.h" -#include "llvm/Support/ManagedStatic.h" -#include "llvm/Support/PluginLoader.h" -#include "llvm/Support/PrettyStackTrace.h" -#include "llvm/Support/Signals.h" -#include "llvm/Support/SourceMgr.h" -#include "llvm/Support/SystemUtils.h" - -#if LLVM_VERSION_MAJOR < 14 -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#if (LLVM_VERSION_MAJOR < 17) +#include +#include +#include #else +#include +#include +#endif +#if (LLVM_VERSION_MAJOR < 14) +#include +#else #include - #endif -#include "llvm/Support/TargetSelect.h" -#include "llvm/Support/ToolOutputFile.h" 
-#include "llvm/Support/YAMLTraits.h" -#include "llvm/Target/TargetMachine.h" -// #include "llvm/Transforms/Coroutines.h" -#include "llvm/Transforms/IPO/AlwaysInliner.h" -#include "llvm/Transforms/IPO/PassManagerBuilder.h" -#include "llvm/Transforms/Utils/Cloning.h" +#include +#include +#include +#include +#include +#include #include #include #include diff --git a/tuplex/core/src/physical/PipelineBuilder.cc b/tuplex/core/src/physical/PipelineBuilder.cc index c9fee174f..c49b9933e 100644 --- a/tuplex/core/src/physical/PipelineBuilder.cc +++ b/tuplex/core/src/physical/PipelineBuilder.cc @@ -1418,7 +1418,7 @@ namespace tuplex { auto func = quoteForCSV_prototype(env.getContext(), env.getModule().get()); val = builder.CreateCall(func, {val, size, quotedSize, env.i8Const(','), env.i8Const('"')}); fmtString += "%s"; - fmtSize = builder.CreateAdd(fmtSize, builder.CreateLoad(quotedSize)); + fmtSize = builder.CreateAdd(fmtSize, builder.CreateLoad(env.i64Type(), quotedSize)); } else if(type.isOptionType()) { // check element type & call string conversion function with convert @@ -1510,7 +1510,7 @@ namespace tuplex { auto snprintf_func = snprintf_prototype(env.getContext(), env.getModule().get()); //{csvRow, fmtSize, env().strConst(builder, fmtString), ...} - args[0] = builder.CreateLoad(bufVar); args[1] = fmtSize; args[2] = env.strConst(builder, fmtString); + args[0] = builder.CreateLoad(env.i8ptrType(), bufVar); args[1] = fmtSize; args[2] = env.strConst(builder, fmtString); auto charsRequired = builder.CreateCall(snprintf_func, args); auto sizeWritten = builder.CreateAdd(builder.CreateZExt(charsRequired, env.i64Type()), env.i64Const(1)); @@ -1525,7 +1525,7 @@ namespace tuplex { // realloc with sizeWritten // store new malloc in bufVar builder.CreateStore(env.malloc(builder, sizeWritten), bufVar); - args[0] = builder.CreateLoad(bufVar); + args[0] = builder.CreateLoad(env.i8ptrType(), bufVar); args[1] = sizeWritten; builder.CreateCall(snprintf_func, args); @@ -1539,7 +1539,7 @@ 
namespace tuplex { // then, call writeRow - auto buf = builder.CreateLoad(bufVar); + auto buf = builder.CreateLoad(env.i8ptrType(), bufVar); // use string length instead of size, because else writer will copy '\0' too! auto length = builder.CreateSub(sizeWritten, env.i64Const(1)); diff --git a/tuplex/core/src/physical/ResolveTask.cc b/tuplex/core/src/physical/ResolveTask.cc index fa9ab3312..24cf2b8dd 100644 --- a/tuplex/core/src/physical/ResolveTask.cc +++ b/tuplex/core/src/physical/ResolveTask.cc @@ -1424,4 +1424,18 @@ namespace tuplex { // -> 3 functions in python: a.) init aggregate, b.) update aggregate c.) later: combine aggregates (this will be done last) // @TODO. } + + void ResolveTask::releaseAllLocks() { + for(auto p : _partitions) + p->unlock(); + for(auto p : _exceptionPartitions) + p->unlock(); + for(auto p : _generalPartitions) + p->unlock(); + for(auto p : _fallbackPartitions) + p->unlock(); + this->_mergedRowsSink.unlock(); + this->_fallbackSink.unlock(); + this->_generalCaseSink.unlock(); + } } \ No newline at end of file diff --git a/tuplex/core/src/physical/ResultSet.cc b/tuplex/core/src/physical/ResultSet.cc index 5086a1e58..2ac3302a7 100644 --- a/tuplex/core/src/physical/ResultSet.cc +++ b/tuplex/core/src/physical/ResultSet.cc @@ -295,23 +295,33 @@ namespace tuplex { Row ResultSet::getNextRow() { if (_currentNormalPartitions.empty() && _currentFallbackPartitions.empty() && _currentGeneralPartitions.empty()) { + // all partitions are exhausted return empty row as default value - if (_partitionGroups.empty()) + if (_partitionGroups.empty()) { return Row(); + } + _normalRowCounter = 0; _generalRowCounter = 0; _fallbackRowCounter = 0; + //logger.info("get first partition group"); auto group = _partitionGroups.front(); _partitionGroups.pop_front(); for (int i = group.normalPartitionStartIndex; i < group.normalPartitionStartIndex + group.numNormalPartitions; ++i) { + if(_remainingNormalPartitions.empty()) + break; // TODO: need to fix for take (?) 
_currentNormalPartitions.push_back(_remainingNormalPartitions.front()); _remainingNormalPartitions.pop_front(); } for (int i = group.generalPartitionStartIndex; i < group.generalPartitionStartIndex + group.numGeneralPartitions; ++i) { + if(_remainingGeneralPartitions.empty()) + break; // TODO: need to fix for take (?) _currentGeneralPartitions.push_back(_remainingGeneralPartitions.front()); _remainingGeneralPartitions.pop_front(); } for (int i = group.fallbackPartitionStartIndex; i < group.fallbackPartitionStartIndex + group.numFallbackPartitions; ++i) { + if(_remainingFallbackPartitions.empty()) + break; // TODO: need to fix for take (?) _currentFallbackPartitions.push_back(_remainingFallbackPartitions.front()); _remainingFallbackPartitions.pop_front(); } @@ -342,7 +352,8 @@ namespace tuplex { return getNextFallbackRow(); } } else { - // all three cases remain, three way row comparison + + // all three cases remain, three-way row comparison auto generalRowInd = currentGeneralRowInd(); auto fallbackRowInd = currentFallbackRowInd(); if (_normalRowCounter + _generalRowCounter < generalRowInd && _normalRowCounter + _generalRowCounter + _fallbackRowCounter < fallbackRowInd) { diff --git a/tuplex/core/src/physical/TransformTask.cc b/tuplex/core/src/physical/TransformTask.cc index 8dae4952e..3379dfcea 100644 --- a/tuplex/core/src/physical/TransformTask.cc +++ b/tuplex/core/src/physical/TransformTask.cc @@ -1056,4 +1056,11 @@ namespace tuplex { } assert(_htable->hm); } + + void TransformTask::releaseAllLocks() { + // need to unlock all input partitions & output partitions (sinks) + for(auto p : _inputPartitions) + p->unlock(); + unlockAllMemorySinks(); + } } diff --git a/tuplex/core/src/physical/TuplexSourceTaskBuilder.cc b/tuplex/core/src/physical/TuplexSourceTaskBuilder.cc index bde539d30..b84e7773e 100644 --- a/tuplex/core/src/physical/TuplexSourceTaskBuilder.cc +++ b/tuplex/core/src/physical/TuplexSourceTaskBuilder.cc @@ -37,7 +37,7 @@ namespace tuplex { 
callProcessFuncWithHandler(builder, userData, tuple, normalRowCountVar, rowNumberVar, inputRowPtr, inputRowSize, terminateEarlyOnLimitCode, processRowFunc); } else { - Value *normalRowCount = builder.CreateLoad(normalRowCountVar, "normalRowCount"); + Value *normalRowCount = builder.CreateLoad(builder.getInt64Ty(), normalRowCountVar, "normalRowCount"); builder.CreateStore(builder.CreateAdd(normalRowCount, env().i64Const(1)), normalRowCountVar); } } diff --git a/tuplex/io/CMakeLists.txt b/tuplex/io/CMakeLists.txt index 19cc26d32..de1af5b52 100644 --- a/tuplex/io/CMakeLists.txt +++ b/tuplex/io/CMakeLists.txt @@ -20,14 +20,9 @@ include_directories(${Boost_INCLUDE_DIR}) # Install and build ORC C++ APIs when BUILD_WITH_ORC is active if(BUILD_WITH_ORC) message(STATUS "Building Tuplex with ORC support") + message(STATUS "Protobuf_HOME is ${Protobuf_HOME}") - # https://github.com/protocolbuffers/protobuf/issues/12637 - find_package(Protobuf CONFIG) - if(NOT Protobuf_NOTFOUND) - find_package(Protobuf REQUIRED) - endif() - get_filename_component(Protobuf_HOME "${Protobuf_INCLUDE_DIRS}" DIRECTORY) - + ASSERT_VAR(Protobuf_HOME) # For MacOS, check whether certain 3rd party libs are already installed via brew if(BREW_FOUND) if(APPLE) @@ -199,9 +194,14 @@ if(BUILD_WITH_ORC) ExternalProject_Add(orc GIT_REPOSITORY https://github.com/apache/orc.git - GIT_TAG rel/release-1.9.1 + GIT_TAG rel/release-1.9.2 TIMEOUT 5 - CMAKE_ARGS -DBUILD_LIBHDFSPP=OFF -DSNAPPY_HOME=${SNAPPY_HOME} -DLZ4_HOME=${LZ4_HOME} -DZSTD_HOME=${ZSTD_HOME} -DZLIB_HOME=${ZLIB_HOME} -DOPENSSL_ROOT_DIR=${OPENSSL_ROOT_DIR} -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER} -DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS} -DCMAKE_INSTALL_PREFIX=${EXTERNAL_INSTALL_LOCATION} -DSTOP_BUILD_ON_WARNING=OFF -DBUILD_JAVA=OFF -DBUILD_TOOLS=OFF -DBUILD_CPP_TESTS=OFF -DBUILD_POSITION_INDEPENDENT_LIB=ON -DPROTOBUF_HOME=${Protobuf_HOME} + CMAKE_ARGS -DBUILD_LIBHDFSPP=OFF -DSNAPPY_HOME=${SNAPPY_HOME} + -DLZ4_HOME=${LZ4_HOME} -DZSTD_HOME=${ZSTD_HOME} 
-DZLIB_HOME=${ZLIB_HOME} + -DOPENSSL_ROOT_DIR=${OPENSSL_ROOT_DIR} -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER} + -DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS} -DCMAKE_INSTALL_PREFIX=${EXTERNAL_INSTALL_LOCATION} + -DSTOP_BUILD_ON_WARNING=OFF -DBUILD_JAVA=OFF -DBUILD_TOOLS=OFF -DBUILD_CPP_TESTS=OFF + -DBUILD_POSITION_INDEPENDENT_LIB=ON -DPROTOBUF_HOME=${Protobuf_HOME} PREFIX "${EXTERNAL_INSTALL_LOCATION}" UPDATE_COMMAND "" # Disable update step: clones the project only once BUILD_BYPRODUCTS ${EXTERNAL_INSTALL_LOCATION}/lib/liborc.a ${ORC_THIRD_PARTY_LIBS} diff --git a/tuplex/python/CMakeLists.txt b/tuplex/python/CMakeLists.txt index 1bfacc167..e7ed0c139 100644 --- a/tuplex/python/CMakeLists.txt +++ b/tuplex/python/CMakeLists.txt @@ -33,7 +33,6 @@ file(GLOB_RECURSE SOURCES src/*.cc) message(STATUS "libs: ${Python3_LIBRARIES}") message(STATUS "includes: ${Python3_INCLUDE_DIRS}") - ## use e.g. cpm https://github.com/cpm-cmake/CPM.cmake ## fetch pybind11 (external project) #CPMAddPackage( @@ -63,7 +62,9 @@ add_dependencies(${MODULE_NAME} libcore libcodegen) target_include_directories(${MODULE_NAME} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include ${CMAKE_CURRENT_BINARY_DIR}) -target_link_libraries(${MODULE_NAME} PRIVATE +if(APPLE) +target_link_libraries(${MODULE_NAME} PUBLIC Backward::Interface + PRIVATE libcodegen libcore libcpythonadapter @@ -73,6 +74,19 @@ target_link_libraries(${MODULE_NAME} PRIVATE Boost::thread Boost::system Boost::filesystem) +else() + target_link_libraries(${MODULE_NAME} + PRIVATE + libcodegen + libcore + libcpythonadapter + libutils + libio + Boost::iostreams + Boost::thread + Boost::system + Boost::filesystem) +endif() #check if single generator or multiple # copy setup.py/MANIFEST.in files and the directory tuplex diff --git a/tuplex/python/include/PythonCommon.h b/tuplex/python/include/PythonCommon.h index f6b34d63d..f34a4b832 100644 --- a/tuplex/python/include/PythonCommon.h +++ b/tuplex/python/include/PythonCommon.h @@ -1,3 +1,4 @@ + 
//--------------------------------------------------------------------------------------------------------------------// // // // Tuplex: Blazing Fast Python Data Science // @@ -145,6 +146,14 @@ namespace tuplex { using no_gil_python3_sink_st = nogil_python3_sink; extern py::object registerPythonLoggingCallback(py::object callback_functor); + + inline py::list pybind_list_from_obj(PyObject* listObj) { + assert(listObj); + assert(PyList_Check(listObj)); + assert(listObj->ob_refcnt > 0); + + return py::cast(listObj); + } } #endif //TUPLEX_PYTHONCOMMON_H diff --git a/tuplex/python/src/PythonCommon.cc b/tuplex/python/src/PythonCommon.cc index affc009cf..6e064ff9f 100644 --- a/tuplex/python/src/PythonCommon.cc +++ b/tuplex/python/src/PythonCommon.cc @@ -10,6 +10,15 @@ #include + +// include backward lib +#ifdef __APPLE__ +// init backtrace +#define BACKWARD_HAS_DWARF 1 +#include +backward::SignalHandling sh; +#endif + namespace tuplex { py::object registerPythonLoggingCallback(py::object callback_functor) { python::registerWithInterpreter(); diff --git a/tuplex/python/src/PythonDataSet.cc b/tuplex/python/src/PythonDataSet.cc index a066297c6..90c2b47b4 100644 --- a/tuplex/python/src/PythonDataSet.cc +++ b/tuplex/python/src/PythonDataSet.cc @@ -74,11 +74,17 @@ namespace tuplex { Logger::instance().flushToPython(); auto listObj = PyList_New(1); PyList_SetItem(listObj, 0, python::PyString_FromString(err_message.c_str())); - return py::reinterpret_borrow(listObj); + return pybind_list_from_obj(listObj); } + + Logger::instance().flushToPython(); + // collect results & transfer them back to python // new version, directly interact with the interpreter Timer timer; + + Logger::instance().logger("python").info("Converting result-set to CPython objects"); + // build python list object from resultset auto listObj = resultSetToCPython(rs.get(), std::numeric_limits::max()); @@ -98,7 +104,7 @@ namespace tuplex { Logger::instance().logger("python").info("Data transfer back to Python 
took " + std::to_string(timer.time()) + " seconds"); - auto list = py::reinterpret_borrow(listObj); + auto list = pybind_list_from_obj(listObj); // Logger::instance().flushAll(); Logger::instance().flushToPython(); @@ -161,7 +167,7 @@ namespace tuplex { Logger::instance().flushToPython(); auto listObj = PyList_New(1); PyList_SetItem(listObj, 0, python::PyString_FromString(err_message.c_str())); - return py::reinterpret_borrow(listObj); + return pybind_list_from_obj(listObj); } // collect results & transfer them back to python @@ -178,7 +184,7 @@ namespace tuplex { if (ss.str().length() > 0) PySys_FormatStdout("%s", ss.str().c_str()); - return py::reinterpret_borrow(listObj); + return pybind_list_from_obj(listObj); } } @@ -885,25 +891,64 @@ namespace tuplex { PyObject* PythonDataSet::anyToCPythonWithPyObjects(ResultSet* rs, size_t maxRowCount) { assert(rs); + auto& logger = Logger::instance().logger("python"); + // simply call the getnext row function from resultset PyObject * emptyListObj = PyList_New(0); size_t rowCount = std::min(rs->rowCount(), maxRowCount); + logger.debug("Found " + std::to_string(rowCount) + " rows to convert to python objects."); PyObject * listObj = PyList_New(rowCount); if (PyErr_Occurred()) { + PyErr_Print(); PyErr_Clear(); return emptyListObj; } - for(int i = 0; i < rowCount; ++i) { + // avoid locking to often, so retrieve rows in batches +#ifndef NDEBUG + static const size_t ROW_BATCH_SIZE = 1024; +#else + static const size_t ROW_BATCH_SIZE = 2048 * 8; +#endif + for(int i = 0; i < rowCount; i += ROW_BATCH_SIZE) { + // convert to vector of rows, then lock GIL and convert each to python + std::vector v; v.reserve(ROW_BATCH_SIZE); + int max_j = std::min((int)rowCount - i, (int)ROW_BATCH_SIZE); assert(i >= 0); python::unlockGIL(); - auto row = rs->getNextRow(); + logger.debug("Converting batch of rows " + std::to_string(i) + " - " + std::to_string(i + max_j) + " from result set to rows."); + for(int j = 0; j < max_j; ++j) { + 
v.emplace_back(rs->getNextRow()); + } + + python::lockGIL(); - auto py_row = python::rowToPython(row, true); - assert(py_row); - PyList_SET_ITEM(listObj, i, py_row); + // perfom signal check after each batch to make sure interrupts are handled correctly + check_and_forward_signals(true); + + // conversion to python objects + for(int j = 0; j < max_j; ++j) { + auto py_row = python::rowToPython(v[j], true); + assert(py_row); + PyList_SET_ITEM(listObj, i + j, py_row); + } + logger.debug("Wrote batch of rows " + std::to_string(i) + " - " + std::to_string(i + max_j) + " from result set to Python list."); + + // check & forward signals again + check_and_forward_signals(true); } + // // old, batch-less version + // for(int i = 0; i < rowCount; ++i) { + // python::unlockGIL(); + // auto row = rs->getNextRow(); + // python::lockGIL(); + // auto py_row = python::rowToPython(row, true); + // assert(py_row); + // PyList_SET_ITEM(listObj, i, py_row); + // } + + logger.debug("Python object conversion done, writing list output object"); return listObj; } @@ -1354,8 +1399,10 @@ namespace tuplex { // b.c. merging of arbitrary python objects is not implemented yet, whenever they're present, use general // version // @TODO: this could be optimized! 
- if(rs->fallbackRowCount() != 0) + if(rs->fallbackRowCount() != 0) { + Logger::instance().defaultLogger().info("Using slow anyToCPythonWithPyObjects conversion function, because fallback row count is not 0."); return anyToCPythonWithPyObjects(rs, maxRowCount); + } auto type = rs->schema().getRowType(); // if single type, reset by one @@ -1665,7 +1712,7 @@ namespace tuplex { auto typeobj = python::encodePythonSchema(row_type.parameters()[i]); PyList_SetItem(listObj, i, typeobj); } - return py::reinterpret_borrow(listObj); + return pybind_list_from_obj(listObj); } py::object PythonDataSet::exception_counts() { diff --git a/tuplex/python/tests/notebook_utils.py b/tuplex/python/tests/notebook_utils.py index e8bbaeaf0..06fb2b55a 100644 --- a/tuplex/python/tests/notebook_utils.py +++ b/tuplex/python/tests/notebook_utils.py @@ -14,6 +14,8 @@ import tempfile import subprocess import json +import tempfile +import logging def get_jupyter_version(): """helper to get version of jupyter as tuple""" @@ -87,12 +89,12 @@ def get_jupyter_function_code(func_name, code): Returns: result of get_source run in jupyter notebook """ - fname = 'testnb.ipynb' - - # create notebook - if os.path.exists(fname): - raise Exception('File {} already exists. 
Aborting testing.'.format(fname)) + # create temp name + fname = None + with tempfile.NamedTemporaryFile() as tmp: + fname = tmp.name + '.ipynb' + logging.debug(f'Writing data to temp file {fname}') try: create_function_notebook(func_name, code, fname) diff --git a/tuplex/python/tests/test_exceptions.py b/tuplex/python/tests/test_exceptions.py index b8af4c44e..81ec43222 100644 --- a/tuplex/python/tests/test_exceptions.py +++ b/tuplex/python/tests/test_exceptions.py @@ -10,19 +10,27 @@ #----------------------------------------------------------------------------------------------------------------------# import unittest +import pytest from tuplex import Context from random import randint, sample, shuffle from math import floor from helper import options_for_pytest -class TestExceptions(unittest.TestCase): - def setUp(self): +class TestExceptions: + + def setup_method(self, method): self.conf = options_for_pytest() self.conf.update({"tuplex.webui.enable": False, "executorCount": 8, "executorMemory": "256MB", "driverMemory": "256MB", "partitionSize": "256KB", "tuplex.optimizer.mergeExceptionsInOrder": False}) self.conf_in_order = options_for_pytest() self.conf_in_order.update({"tuplex.webui.enable": False, "executorCount": 8, "executorMemory": "256MB", "driverMemory": "256MB", "partitionSize": "256KB", "tuplex.optimizer.mergeExceptionsInOrder": True}) + def assertEqual(self, lhs, rhs): + assert lhs == rhs + + def assertTrue(self, ans): + assert ans + def test_merge_with_filter(self): c = Context(self.conf_in_order) @@ -36,8 +44,11 @@ def test_merge_with_filter(self): output = c.parallelize([-1.1, 1, 2, -2.2, 4, 5, -6.6]).filter(lambda x: x < 0 or x > 3).collect() self.compare_in_order([-1.1, -2.2, 4, 5, -6.6], output) - input = list(range(1, 100001)) - sampled = sample(input, 40000) + @pytest.mark.parametrize("n", [1000, 2500]) + def test_merge_with_filter(self, n): + c = Context(self.conf_in_order) + input = list(range(1, n + 1)) + sampled = sample(input, int(0.4 * 
n)) for i in sampled: ind = randint(0, 1) if ind == 0: @@ -47,7 +58,7 @@ def test_merge_with_filter(self): output = c.parallelize(input).filter(lambda x: x != 0).collect() self.compare_in_order(list(filter(lambda x: x != 0, input)), output) - + def process(self, input_size, num_filtered, num_schema, num_resolved, num_unresolved): inds = list(range(input_size)) shuffle(inds) @@ -86,16 +97,20 @@ def resolve_udf(x): else: return x - c = Context(self.conf_in_order) + # for larger partitions, there's a multi-threading issue for this. + # need to fix. + conf = self.conf_in_order + # use this line to force single-threaded + # conf['executorCount'] = 0 + c = Context(conf) output = c.parallelize(input).filter(filter_udf).map(map_udf).resolve(ZeroDivisionError, resolve_udf).collect() self.assertEqual(list(filter(lambda x: x != -3 and x != -1, input)), output) - def test_everything(self): - self.process(100, 0.25, 0.25, 0.25, 0.25) - self.process(1000, 0.25, 0.25, 0.25, 0.25) - self.process(10000, 0.25, 0.25, 0.25, 0.25) - self.process(100000, 0.25, 0.25, 0.25, 0.25) + # test tends to be slow on Github actions, do not test for 100k + @pytest.mark.parametrize("n", [100, 1000, 10000]) + def test_everything(self, n): + self.process(n, 0.25, 0.25, 0.25, 0.25) def test_merge_with_filter_on_exps(self): c = Context(self.conf_in_order) @@ -103,17 +118,18 @@ def test_merge_with_filter_on_exps(self): output = c.parallelize([0, 1.1, 2.2, 1, 3.3, 4, 5]).filter(lambda x: x != 0 and x != 1.1).collect() self.compare_in_order([2.2, 1, 3.3, 4, 5], output) - def test_merge_runtime_only(self): + @pytest.mark.parametrize("n", [10000]) + def test_merge_runtime_only(self, n): c = Context(self.conf_in_order) output = c.parallelize([1, 0, 0, 4]).map(lambda x: 1 // x).resolve(ZeroDivisionError, lambda x: -1).collect() self.compare_in_order([1, -1, -1, 0], output) - output = c.parallelize([0 for i in range(100000)]).map(lambda x: 1 // x).resolve(ZeroDivisionError, lambda x: -1).collect() - 
self.compare_in_order([-1 for i in range(100000)], output) + output = c.parallelize([0 for i in range(n)]).map(lambda x: 1 // x).resolve(ZeroDivisionError, lambda x: -1).collect() + self.compare_in_order([-1 for i in range(n)], output) input = [] - for i in range(100000): + for i in range(n): if i % 100 == 0: input.append(0) else: @@ -122,7 +138,7 @@ def test_merge_runtime_only(self): output = c.parallelize(input).map(lambda x: 1 // x).resolve(ZeroDivisionError, lambda x: -1).collect() expectedOutput = [] - for i in range(100000): + for i in range(n): if i % 100 == 0: expectedOutput.append(-1) else: @@ -140,7 +156,8 @@ def test_merge_some_fail(self): .collect() self.compare_in_order([1, 2, -1, 5, 6, 7, 10, 11, 12, -3, 15], output) - def test_merge_both_but_no_resolve(self): + @pytest.mark.parametrize("n", [10000]) + def test_merge_both_but_no_resolve(self, n): c = Context(self.conf_in_order) input = [1, 2, -1, "a", 5, 6, 7, -2, "b", 10, 11, 12, -3, "c", 15] @@ -150,8 +167,8 @@ def test_merge_both_but_no_resolve(self): .collect() self.compare_in_order([1, 2, -1, "a", 5, 6, 7, "b", 10, 11, 12, -3, "c", 15], output) - input = list(range(1, 100001)) - sampled = sample(input, 40000) + input = list(range(1, n + 1)) + sampled = sample(input, int(0.4 * n)) for i in sampled: ind = randint(0, 2) if ind == 0: @@ -165,7 +182,8 @@ def test_merge_both_but_no_resolve(self): output = c.parallelize(input).map(lambda x: 1 // (x - x) if x == -1 or x == 0 else x).resolve(ZeroDivisionError, lambda x: 1 // x if x == 0 else x).collect() self.compare_in_order(expectedOutput, output) - def test_merge_both(self): + @pytest.mark.parametrize("n", [10000]) + def test_merge_both(self, n): c = Context(self.conf_in_order) input = [1, 2, 0, "a", 5, 6, 7, 0, "b", 10, 11, 12, 0, "c", 15] @@ -176,8 +194,8 @@ def test_merge_both(self): output = c.parallelize(input).map(lambda x: 1 // x if x == 0 else x).resolve(ZeroDivisionError, lambda x: -1).collect() self.compare_in_order([1, 2, "a", -1, 5, 6, 7, 
"b", -1, 10, 11, 12, "c", -1, 15], output) - input = list(range(1, 100001)) - sampled = sample(input, 40000) + input = list(range(1, n + 1)) + sampled = sample(input, int(0.4 * n)) for i in sampled: if randint(0, 1) == 0: input[i - 1] = str(input[i - 1]) @@ -187,7 +205,9 @@ def test_merge_both(self): output = c.parallelize(input).map(lambda x: 1 // x if x == 0 else x).resolve(ZeroDivisionError, lambda x: x).collect() self.compare_in_order(input, output) - def test_merge_input_only(self): + # 40k too slow under macOS, need to investigate + @pytest.mark.parametrize("n", [10000]) + def test_merge_input_only(self, n): c = Context(self.conf_in_order) input = [1, 2, "a", 4, 5, "b", 6, 7, 8, 9, 10, "d"] @@ -195,7 +215,7 @@ def test_merge_input_only(self): self.compare_in_order(input, output) input = [] - for i in range(40000): + for i in range(n): if i % 100 == 0: input.append(str(i)) else: @@ -253,7 +273,8 @@ def test_no_merge_some_fail(self): .collect() self.compare([1, 2, -1, 5, 6, 7, 10, 11, 12, -3, 15], output) - def test_no_merge_both_but_no_resolve(self): + @pytest.mark.parametrize("n", [10000]) + def test_no_merge_both_but_no_resolve(self, n): c = Context(self.conf) input = [1, 2, -1, "a", 5, 6, 7, -2, "b", 10, 11, 12, -3, "c", 15] @@ -263,8 +284,8 @@ def test_no_merge_both_but_no_resolve(self): .collect() self.compare([1, 2, -1, "a", 5, 6, 7, "b", 10, 11, 12, -3, "c", 15], output) - input = list(range(1, 100001)) - sampled = sample(input, 40000) + input = list(range(1, n + 1)) + sampled = sample(input, int(0.4 * n)) for i in sampled: ind = randint(0, 2) if ind == 0: @@ -278,7 +299,8 @@ def test_no_merge_both_but_no_resolve(self): output = c.parallelize(input).map(lambda x: 1 // (x - x) if x == -1 or x == 0 else x).resolve(ZeroDivisionError, lambda x: 1 // x if x == 0 else x).collect() self.compare(expectedOutput, output) - def test_no_merge_both(self): + @pytest.mark.parametrize("n", [10000]) + def test_no_merge_both(self, n): c = Context(self.conf) input = [1, 
2, 0, "a", 5, 6, 7, 0, "b", 10, 11, 12, 0, "c", 15] @@ -289,8 +311,8 @@ def test_no_merge_both(self): output = c.parallelize(input).map(lambda x: 1 // x if x == 0 else x).resolve(ZeroDivisionError, lambda x: -1).collect() self.compare([1, 2, "a", -1, 5, 6, 7, "b", -1, 10, 11, 12, "c", -1, 15], output) - input = list(range(1, 100001)) - sampled = sample(input, 40000) + input = list(range(1, n + 1)) + sampled = sample(input, int(0.4 * n)) for i in sampled: if randint(0, 1) == 0: input[i - 1] = str(input[i - 1]) @@ -300,7 +322,9 @@ def test_no_merge_both(self): output = c.parallelize(input).map(lambda x: 1 // x if x == 0 else x).resolve(ZeroDivisionError, lambda x: x).collect() self.compare(input, output) - def test_no_merge_input_only(self): + # 40k too slow under macOS, need to investigate. + @pytest.mark.parametrize("n", [10000]) + def test_no_merge_input_only(self, n): c = Context(self.conf) input = [1, 2, "a", 4, 5, "b", 6, 7, 8, 9, 10, "d"] @@ -308,7 +332,7 @@ def test_no_merge_input_only(self): self.compare(input, output) input = [] - for i in range(40000): + for i in range(n): if i % 100 == 0: input.append(str(i)) else: @@ -317,14 +341,15 @@ def test_no_merge_input_only(self): output = c.parallelize(input).map(lambda x: x).collect() self.compare(input, output) - def test_no_merge_runtime_only(self): + @pytest.mark.parametrize("n", [10000]) + def test_no_merge_runtime_only(self, n): c = Context(self.conf) output = c.parallelize([1, 0, 0, 4]).map(lambda x: 1 // x).resolve(ZeroDivisionError, lambda x: -1).collect() self.compare([1, -1, -1, 0], output) input = [] - for i in range(100000): + for i in range(n): if i % 100 == 0: input.append(0) else: @@ -333,7 +358,7 @@ def test_no_merge_runtime_only(self): output = c.parallelize(input).map(lambda x: 1 // x).resolve(ZeroDivisionError, lambda x: -1).collect() expectedOutput = [] - for i in range(100000): + for i in range(n): if i % 100 == 0: expectedOutput.append(-1) else: @@ -341,7 +366,9 @@ def 
test_no_merge_runtime_only(self): self.compare(expectedOutput, output) - def test_parallelize_exceptions_no_merge(self): + # 50k too slow under macOS, need to investigate + @pytest.mark.parametrize("n", [10000]) + def test_parallelize_exceptions_no_merge(self, n): c = Context(self.conf) output = c.parallelize([1, 2, 3, 4, None]).map(lambda x: x).collect() @@ -362,7 +389,7 @@ def test_parallelize_exceptions_no_merge(self): l1 = [] l2 = [] input = [] - for i in range(50000): + for i in range(n): if i % 100 == 0: l2.append(str(i)) input.append(str(i)) @@ -388,7 +415,7 @@ def compare_in_order(self, expectedOutput, output): def test_withColumn(self): c = Context(self.conf_in_order) - ds = c.parallelize([(1, "a", True), (0, "b", False), (3, "c", True)])\ + ds = c.parallelize([(1, "a", True), (0, "b", False), (3, "c", True)]) \ .withColumn("new", lambda x, y, z: str(1 // x) + y) output = ds.collect() ecounts = ds.exception_counts diff --git a/tuplex/python/tests/test_tuples.py b/tuplex/python/tests/test_tuples.py index 85b08b212..c1e795064 100644 --- a/tuplex/python/tests/test_tuples.py +++ b/tuplex/python/tests/test_tuples.py @@ -17,7 +17,8 @@ class TestTuples(unittest.TestCase): def setUp(self): self.conf = options_for_pytest() - self.conf.update({"webui.enable" : False, "driverMemory" : "16MB", "partitionSize" : "256KB"}) + self.conf.update({"webui.enable" : False, "driverMemory" : "32MB", "executorCount": 4, + "executorMemory": "32MB", "partitionSize" : "64KB"}) def testEmptyTupleI(self): c = Context(self.conf) diff --git a/tuplex/python/tuplex/context.py b/tuplex/python/tuplex/context.py index 70ff3d8e6..f92a5ddee 100644 --- a/tuplex/python/tuplex/context.py +++ b/tuplex/python/tuplex/context.py @@ -190,6 +190,7 @@ def __init__(self, conf=None, name="", **kwargs): if options['tuplex.webui.enable']: ensure_webui(options) + # last arg are the options as json string serialized b.c. 
of boost python problems # because webui=False/True is convenient, pass it as well to tuplex options if 'tuplex.webui' in options.keys(): options['tuplex.webui.enable'] = options['tuplex.webui'] diff --git a/tuplex/runtime/include/Runtime.h b/tuplex/runtime/include/Runtime.h index b23f0060c..0f5ce8ac1 100644 --- a/tuplex/runtime/include/Runtime.h +++ b/tuplex/runtime/include/Runtime.h @@ -14,6 +14,8 @@ // this file defines external C functions accesible from within the Python/UDF Compiler. Functions should be prefixed // with rt (no namespaces in C :/ ) +#define EXPORT_SYMBOL __attribute__((visibility("default"))) + #ifdef __cplusplus extern "C" { #endif @@ -28,46 +30,46 @@ extern "C" { * controls how much memory the compiled codepath should use for malloc/free * @param size if 0, dynamic autogrowth is assumed */ -extern void setRunTimeMemory(const size_t size, size_t blockSize) noexcept; -extern size_t getRunTimeMemorySize() noexcept; +EXPORT_SYMBOL extern void setRunTimeMemory(const size_t size, size_t blockSize) noexcept; +EXPORT_SYMBOL extern size_t getRunTimeMemorySize() noexcept; /*! * needs to be called in order to free all memory as used by UDFs. */ -extern void freeRunTimeMemory() noexcept; +EXPORT_SYMBOL extern void freeRunTimeMemory() noexcept; /*! * delete heap. */ -extern void releaseRunTimeMemory() noexcept; +EXPORT_SYMBOL extern void releaseRunTimeMemory() noexcept; /*! * returns address for memory block with given size * @param size * @return */ -extern void* rtmalloc(const size_t size) noexcept; // !!! do not change name without changing LLVMEnvironment.h malloc +EXPORT_SYMBOL extern void* rtmalloc(const size_t size) noexcept; // !!! do not change name without changing LLVMEnvironment.h malloc /*! * frees memory block * @param ptr */ -extern void rtfree(void* ptr) noexcept; +EXPORT_SYMBOL extern void rtfree(void* ptr) noexcept; /*! * frees all memory allocated by malloc at this point, i.e. garbage collection. 
* However, the C memory management is not invoked. (this is faster than always calling malloc/free) */ -extern void rtfree_all() noexcept; // !!! do not change without changing LLVMEnvironment.h freeAll +EXPORT_SYMBOL extern void rtfree_all() noexcept; // !!! do not change without changing LLVMEnvironment.h freeAll /*********** * fast conversion functions * @Todo: Maybe later add llvm versions of them, i.e. by linking the module to further optimize the code */ -extern int32_t fast_atoi64(const char *start, const char *end, int64_t* out); -extern int32_t fast_atod(const char *start, const char *end, double* out); -extern int32_t fast_atob(const char *start, const char *end, unsigned char *out); -extern int32_t fast_dequote(const char *start, const char *end, char **out, int64_t* size); +EXPORT_SYMBOL extern int32_t fast_atoi64(const char *start, const char *end, int64_t* out); +EXPORT_SYMBOL extern int32_t fast_atod(const char *start, const char *end, double* out); +EXPORT_SYMBOL extern int32_t fast_atob(const char *start, const char *end, unsigned char *out); +EXPORT_SYMBOL extern int32_t fast_dequote(const char *start, const char *end, char **out, int64_t* size); /*! * if necessary, return runtime allocated CSV quoted string, if not return string itself @@ -75,41 +77,41 @@ extern int32_t fast_dequote(const char *start, const char *end, char **out, int6 * @param size * @return */ -extern char* quoteForCSV(const char *str, int64_t size, int64_t* new_size, char separator, char quotechar); +EXPORT_SYMBOL extern char* quoteForCSV(const char *str, int64_t size, int64_t* new_size, char separator, char quotechar); -extern char* csvNormalize(const char quotechar, const char* start, const char* end, int64_t* ret_size); +EXPORT_SYMBOL extern char* csvNormalize(const char quotechar, const char* start, const char* end, int64_t* ret_size); // python3 compatible float to str function // i.e. 0.0 is outputted to 0.0 instead of 0 // --> bug or feature in python3?? 
-extern char* floatToStr(const double d, int64_t* res_size); +EXPORT_SYMBOL extern char* floatToStr(const double d, int64_t* res_size); /****** * String functions */ -extern char* strCenter(const char* s, int64_t s_size, int64_t width, int64_t* res_size, const char fillchar); -extern char* strLower(const char* s, int64_t size); -extern const char* strLowerSIMD(const char *s, int64_t size); -extern char* strUpper(const char* s, int64_t size); -extern char* strSwapcase(const char* s, int64_t size); -extern char* strFormat(const char* fmt, int64_t* res_size, const char* argtypes, ...); -extern int64_t strRfind(const char* s, const char* needle); -extern char* strReplace(const char* str, const char* from, const char* to, int64_t* res_size); - -extern char* strRStrip(const char* str, const char* chars, int64_t* res_size); -extern char* strLStrip(const char* str, const char* chars, int64_t* res_size); -extern char* strStrip(const char* str, const char* chars, int64_t* res_size); -extern int64_t strCount(const char* str, const char* sub, int64_t strSize, int64_t subSize); -extern int8_t strIsDecimal(const char* str); -extern int8_t strIsDigit(const char* str); -extern int8_t strIsAlpha(const char* str); -extern int8_t strIsAlNum(const char* str); - -extern char* strJoin(const char *base_str, int64_t base_str_size, int64_t num_words, const char** str_array, const int64_t* len_array, int64_t* res_size); -extern int64_t strSplit(const char *base_str, int64_t base_str_length, const char *delim, int64_t delim_length, char*** res_str_array, int64_t** res_len_array, int64_t *res_list_size); +EXPORT_SYMBOL extern char* strCenter(const char* s, int64_t s_size, int64_t width, int64_t* res_size, const char fillchar); +EXPORT_SYMBOL extern char* strLower(const char* s, int64_t size); +EXPORT_SYMBOL extern const char* strLowerSIMD(const char *s, int64_t size); +EXPORT_SYMBOL extern char* strUpper(const char* s, int64_t size); +EXPORT_SYMBOL extern char* strSwapcase(const char* s, 
int64_t size); +EXPORT_SYMBOL extern char* strFormat(const char* fmt, int64_t* res_size, const char* argtypes, ...); +EXPORT_SYMBOL extern int64_t strRfind(const char* s, const char* needle); +EXPORT_SYMBOL extern char* strReplace(const char* str, const char* from, const char* to, int64_t* res_size); + +EXPORT_SYMBOL extern char* strRStrip(const char* str, const char* chars, int64_t* res_size); +EXPORT_SYMBOL extern char* strLStrip(const char* str, const char* chars, int64_t* res_size); +EXPORT_SYMBOL extern char* strStrip(const char* str, const char* chars, int64_t* res_size); +EXPORT_SYMBOL extern int64_t strCount(const char* str, const char* sub, int64_t strSize, int64_t subSize); +EXPORT_SYMBOL extern int8_t strIsDecimal(const char* str); +EXPORT_SYMBOL extern int8_t strIsDigit(const char* str); +EXPORT_SYMBOL extern int8_t strIsAlpha(const char* str); +EXPORT_SYMBOL extern int8_t strIsAlNum(const char* str); + +EXPORT_SYMBOL extern char* strJoin(const char *base_str, int64_t base_str_size, int64_t num_words, const char** str_array, const int64_t* len_array, int64_t* res_size); +EXPORT_SYMBOL extern int64_t strSplit(const char *base_str, int64_t base_str_length, const char *delim, int64_t delim_length, char*** res_str_array, int64_t** res_len_array, int64_t *res_list_size); // string.capwords -extern char* stringCapwords(const char* str, int64_t size, int64_t *res_size); +EXPORT_SYMBOL extern char* stringCapwords(const char* str, int64_t size, int64_t *res_size); // @TODO: str.title @@ -125,31 +127,31 @@ struct matchObject { char *subject; size_t subject_len; }; -extern matchObject* wrapPCRE2MatchObject(pcre2_match_data *match_data, char* subject, size_t subject_len); +EXPORT_SYMBOL extern matchObject* wrapPCRE2MatchObject(pcre2_match_data *match_data, char* subject, size_t subject_len); // expose functions -extern pcre2_general_context* pcre2GetLocalGeneralContext(); -extern void* pcre2GetGlobalGeneralContext(); -extern void* pcre2GetGlobalMatchContext(); 
-extern void* pcre2GetGlobalCompileContext(); +EXPORT_SYMBOL extern pcre2_general_context* pcre2GetLocalGeneralContext(); +EXPORT_SYMBOL extern void* pcre2GetGlobalGeneralContext(); +EXPORT_SYMBOL extern void* pcre2GetGlobalMatchContext(); +EXPORT_SYMBOL extern void* pcre2GetGlobalCompileContext(); // could get rid of these functions, it's a direct free call... -extern void pcre2ReleaseGlobalGeneralContext(void* gcontext); -extern void pcre2ReleaseGlobalMatchContext(void* mcontext); -extern void pcre2ReleaseGlobalCompileContext(void* ccontext); +EXPORT_SYMBOL extern void pcre2ReleaseGlobalGeneralContext(void* gcontext); +EXPORT_SYMBOL extern void pcre2ReleaseGlobalMatchContext(void* mcontext); +EXPORT_SYMBOL extern void pcre2ReleaseGlobalCompileContext(void* ccontext); // return a uniformly random integer on [start, end) -extern int64_t uniform_int(int64_t start, int64_t end); +EXPORT_SYMBOL extern int64_t uniform_int(int64_t start, int64_t end); // what about overflow? -extern int64_t pow_i64(int64_t base, int64_t exp); -extern double pow_f64(double base, int64_t exp); +EXPORT_SYMBOL extern int64_t pow_i64(int64_t base, int64_t exp); +EXPORT_SYMBOL extern double pow_f64(double base, int64_t exp); // python compatible python func for float -extern double rt_py_pow(double base, double exponent, int64_t* ecCode); +EXPORT_SYMBOL extern double rt_py_pow(double base, double exponent, int64_t* ecCode); // spanner function for CSV parsing -int fallback_spanner(const char* ptr, const char c1, const char c2, const char c3, const char c4); +EXPORT_SYMBOL int fallback_spanner(const char* ptr, const char c1, const char c2, const char c3, const char c4); #ifdef __cplusplus } diff --git a/tuplex/runtime/src/Runtime.cc b/tuplex/runtime/src/Runtime.cc index 044c6ff0f..426af001a 100644 --- a/tuplex/runtime/src/Runtime.cc +++ b/tuplex/runtime/src/Runtime.cc @@ -532,7 +532,6 @@ extern "C" char* strReplace(const char* str, const char* from, const char* to, i return ret; } - /*! 
* strFormat function with variable number of arguments. Supports formatting for bool, int, float, str. * No support for tuples or other objects yet. diff --git a/tuplex/test/codegen/CMakeLists.txt b/tuplex/test/codegen/CMakeLists.txt index 764e38f6a..10fe5cd7d 100755 --- a/tuplex/test/codegen/CMakeLists.txt +++ b/tuplex/test/codegen/CMakeLists.txt @@ -10,13 +10,16 @@ include(GoogleTest) ADD_EXECUTABLE(testcodegen ${SRCS}) ASSERT_VAR(CURSES_LIBRARIES) +ASSERT_VAR(ZSTD_LIBRARIES) TARGET_LINK_LIBRARIES(testcodegen libcodegen + libutils ${GTest_LIBRARIES} ${ZSTD_LIBRARIES} ${ZLIB_LIBRARIES} ${CURSES_LIBRARIES} + ${LLVM_LIBRARIES} runtime ) diff --git a/tuplex/test/core/SerializerTest.cc b/tuplex/test/core/SerializerTest.cc index 91aa6156d..f188e9f78 100644 --- a/tuplex/test/core/SerializerTest.cc +++ b/tuplex/test/core/SerializerTest.cc @@ -157,6 +157,13 @@ TEST(Serializer, ListOfTuples) { python::Type::I64}); EXPECT_TRUE(schema.getRowType() == et); + { + // check length + Deserializer d(schema); + auto inferred_len = d.inferLength(buffer); + EXPECT_EQ(len, inferred_len); + } + Deserializer d(schema); d.deserialize(buffer, 2048); free(buffer); @@ -216,6 +223,32 @@ TEST(Serializer, ListOfLists) { EXPECT_EQ(lst22.desc(), "['####','QWERT']"); } +TEST(Serializer, ListOfOptionalList) { + // "[[5,6],None]" + Serializer s; + auto *buffer = (uint8_t*)malloc(2048); + + // test with [[5,6], None] + auto len = s.append(List(List(5, 6), option::none)) + .serialize(buffer, 2048); + + + + Schema schema = s.getSchema(); + auto et = python::Type::makeTupleType({python::Type::makeListType(python::Type::makeOptionType(python::Type::makeListType(python::Type::I64)))}); + EXPECT_TRUE(schema.getRowType() == et); + + Deserializer d(schema); + d.deserialize(buffer, 2048); + free(buffer); + + auto row = d.getTuple(); + EXPECT_EQ(row.numElements(), 1); + auto lst1 = *(List *)row.getField(0).getPtr(); + EXPECT_EQ(lst1.numElements(), 2); + EXPECT_EQ(lst1.desc(), "[[5,6],None]"); +} + 
TEST(Serializer, OptionalTuple) { Serializer s; auto *buffer = (uint8_t*)malloc(2048); @@ -241,4 +274,26 @@ TEST(Serializer, OptionalTuple) { EXPECT_EQ(tuple.numElements(), 2); EXPECT_EQ(tuple.desc(), "(1234,9876)"); EXPECT_EQ(std::string((char *)(row.getField(2).getPtr())), "$$$$tuple$$$$"); -} \ No newline at end of file +} + +// [("a", [("b", [1, 2]), ...] +TEST(Serializer, NestedListTuple) { + Serializer s; + auto *buffer = (uint8_t*)malloc(2048); + + // (str, List[Tuple[str, List[i64]]]) + auto len = s.append("a").append(List::from_vector({Field(Tuple(Field("b"), List::from_vector({Field((int64_t)1), Field((int64_t)2)})))})) + .serialize(buffer, 2048); + + Schema schema = s.getSchema(); + auto et = python::Type::makeTupleType({python::Type::STRING, python::Type::makeListType(python::Type::makeTupleType({python::Type::STRING, python::Type::makeListType(python::Type::I64)}))}); + EXPECT_EQ(schema.getRowType().desc(), et.desc()); + EXPECT_GT(len, 0); + EXPECT_TRUE(schema.getRowType() == et); + Deserializer d(schema); + d.deserialize(buffer, 2048); + free(buffer); + + auto row = d.getTuple(); + EXPECT_EQ("('a',[('b',[1,2])])", row.desc()); +} diff --git a/tuplex/test/core/SignalTest.cc b/tuplex/test/core/SignalTest.cc index 437f90d0b..4af926de4 100644 --- a/tuplex/test/core/SignalTest.cc +++ b/tuplex/test/core/SignalTest.cc @@ -49,4 +49,37 @@ TEST_F(SigTest, FlightInterrupt) { }); auto ref = pipelineAsStrs(ds); t.join(); +} + +// sigalarm is used e.g., by pytest-timeout. Make sure it also works. +TEST_F(SigTest, FlightInterruptSigAlarm) { + // test pipeline over several context configurations + using namespace tuplex; + using namespace std; + std::string bts_path="../resources/pipelines/flights/flights_on_time_performance_2019_01.10k-sample.csv"; + std::string carrier_path="../resources/pipelines/flights/L_CARRIER_HISTORY.csv"; + std::string airport_path="../resources/pipelines/flights/GlobalAirportDatabase.txt"; + + // for reference deactivate all options! 
+ auto opt_ref = testOptions(); + opt_ref.set("tuplex.runTimeMemory", "128MB"); // join might require a lot of runtime memory!!! + opt_ref.set("tuplex.executorCount", "0"); // single-threaded + opt_ref.set("tuplex.useLLVMOptimizer", "false"); // deactivate + opt_ref.set("tuplex.optimizer.nullValueOptimization", "false"); + opt_ref.set("tuplex.csv.selectionPushdown", "false"); + opt_ref.set("tuplex.optimizer.generateParser", "false"); + opt_ref.set("tuplex.optimizer.mergeExceptionsInOrder", "false"); + + // Tuplex thread + Context c_ref(opt_ref); + auto& ds = flightPipeline(c_ref, bts_path, carrier_path, airport_path, false); + + // Note: Could run this twice: Once to detect the delay time & then with a signal interrupt. + // launch thread to issue signal in 250ms + std::thread t([]() { + std::this_thread::sleep_for(std::chrono::milliseconds(500)); // use 500ms per default. + std::raise(SIGALRM); + }); + auto ref = pipelineAsStrs(ds); + t.join(); } \ No newline at end of file diff --git a/tuplex/test/core/UseCaseFunctionsTest.cc b/tuplex/test/core/UseCaseFunctionsTest.cc index 18201718a..f1a816d29 100644 --- a/tuplex/test/core/UseCaseFunctionsTest.cc +++ b/tuplex/test/core/UseCaseFunctionsTest.cc @@ -15,7 +15,9 @@ #include #include #include +#if LLVM_VERSION_MAJOR < 17 #include "llvm/Transforms/IPO/PassManagerBuilder.h" +#endif class UseCaseFunctionsTest : public PyTest { protected: diff --git a/tuplex/test/runtime/CMakeLists.txt b/tuplex/test/runtime/CMakeLists.txt index 73d28c517..cff9af37d 100755 --- a/tuplex/test/runtime/CMakeLists.txt +++ b/tuplex/test/runtime/CMakeLists.txt @@ -9,7 +9,7 @@ file(GLOB SRCS *.cc) include(GoogleTest) -ADD_EXECUTABLE(testruntime ${SRCS}) +ADD_EXECUTABLE(testruntime ${SRCS} ../../runtime/src/Runtime.cc ../../runtime/src/StringFunctions.cc) TARGET_LINK_LIBRARIES(testruntime libio diff --git a/tuplex/test/wrappers/CMakeLists.txt b/tuplex/test/wrappers/CMakeLists.txt index 3afa1d0d0..84b753641 100644 --- 
a/tuplex/test/wrappers/CMakeLists.txt +++ b/tuplex/test/wrappers/CMakeLists.txt @@ -17,16 +17,33 @@ ADD_EXECUTABLE(testwrappers ${SRCS} ${PYSRCS}) target_include_directories(testwrappers PRIVATE "../../python/include/" ${Boost_INCLUDE_DIR}) -TARGET_LINK_LIBRARIES(testwrappers - libcore - libcodegen - libutils - libio - ${GTest_LIBRARIES} - libcpythonadapter - ${Boost_LIBRARIES} - ${CURSES_LIBRARY} - pybind11::embed - ) + +# use backward for problematic macos builds +if(APPLE) + TARGET_LINK_LIBRARIES(testwrappers + libcore + libcodegen + libutils + libio + ${GTest_LIBRARIES} + libcpythonadapter + ${Boost_LIBRARIES} + ${CURSES_LIBRARY} + pybind11::embed + Backward::Interface + ) +else() + TARGET_LINK_LIBRARIES(testwrappers + libcore + libcodegen + libutils + libio + ${GTest_LIBRARIES} + libcpythonadapter + ${Boost_LIBRARIES} + ${CURSES_LIBRARY} + pybind11::embed + ) +endif() gtest_add_tests(TARGET testwrappers TEST_PREFIX "") diff --git a/tuplex/test/wrappers/WrapperTest.cc b/tuplex/test/wrappers/WrapperTest.cc index fea33dd16..b33a2944c 100644 --- a/tuplex/test/wrappers/WrapperTest.cc +++ b/tuplex/test/wrappers/WrapperTest.cc @@ -21,6 +21,9 @@ #include +#include +#include + // need for these tests a running python interpreter, so spin it up class WrapperTest : public TuplexTest { void SetUp() override { @@ -3019,111 +3022,287 @@ TEST_F(WrapperTest, NonConformingResolve) { } -//// debug any python module... -///** Takes a path and adds it to sys.paths by calling PyRun_SimpleString. -// * This does rather laborious C string concatenation so that it will work in -// * a primitive C environment. -// * -// * Returns 0 on success, non-zero on failure. -// */ -//int add_path_to_sys_module(const char *path) { -// int ret = 0; -// const char *prefix = "import sys\nsys.path.append(\""; -// const char *suffix = "\")\n"; -// char *command = (char*)malloc(strlen(prefix) -// + strlen(path) -// + strlen(suffix) -// + 1); -// if (! 
command) { -// return -1; -// } -// strcpy(command, prefix); -// strcat(command, path); -// strcat(command, suffix); -// ret = PyRun_SimpleString(command); -//#ifdef DEBUG -// printf("Calling PyRun_SimpleString() with:\n"); -// printf("%s", command); -// printf("PyRun_SimpleString() returned: %d\n", ret); -// fflush(stdout); -//#endif -// free(command); -// return ret; -//} -// -///** This imports a Python module and calls a specific function in it. -// * It's arguments are similar to main(): -// * argc - Number of strings in argv -// * argv - Expected to be 4 strings: -// * - Name of the executable. -// * - Path to the directory that the Python module is in. -// * - Name of the Python module. -// * - Name of the function in the module. -// * -// * The Python interpreter will be initialised and the path to the Python module -// * will be added to sys.paths then the module will be imported. -// * The function will be called with no arguments and its return value will be -// * ignored. -// * -// * This returns 0 on success, non-zero on failure. -// */ -//int import_call_execute(int argc, const char *argv[]) { -// int return_value = 0; -// PyObject *pModule = NULL; -// PyObject *pFunc = NULL; -// PyObject *pResult = NULL; -// -// if (argc != 4) { -// fprintf(stderr, -// "Wrong arguments!" -// " Usage: %s package_path module function\n", argv[0]); -// return_value = -1; -// goto except; -// } -// Py_SetProgramName((wchar_t*)argv[0]); -// Py_Initialize(); -// if (add_path_to_sys_module(argv[1])) { -// return_value = -2; -// goto except; -// } -// pModule = PyImport_ImportModule(argv[2]); -// if (! pModule) { -// fprintf(stderr, -// "%s: Failed to load module \"%s\"\n", argv[0], argv[2]); -// return_value = -3; -// goto except; -// } -// pFunc = PyObject_GetAttrString(pModule, argv[3]); -// if (! pFunc) { -// fprintf(stderr, -// "%s: Can not find function \"%s\"\n", argv[0], argv[3]); -// return_value = -4; -// goto except; -// } -// if (! 
PyCallable_Check(pFunc)) { -// fprintf(stderr, -// "%s: Function \"%s\" is not callable\n", argv[0], argv[3]); -// return_value = -5; -// goto except; -// } -// pResult = PyObject_CallObject(pFunc, NULL); -// if (! pResult) { -// fprintf(stderr, "%s: Function call failed\n", argv[0]); -// return_value = -6; -// goto except; -// } -//#ifdef DEBUG -// printf("%s: PyObject_CallObject() succeeded\n", argv[0]); -//#endif -// assert(! PyErr_Occurred()); -// goto finally; -// except: -// assert(PyErr_Occurred()); -// PyErr_Print(); -// finally: -// Py_XDECREF(pFunc); -// Py_XDECREF(pModule); -// Py_XDECREF(pResult); -// Py_Finalize(); -// return return_value; -//} +TEST_F(WrapperTest, CombinedExceptionHandling) { + // this is based on test_exceptions.py + //def process(self, input_size, num_filtered, num_schema, num_resolved, num_unresolved): + // inds = list(range(input_size)) + // shuffle(inds) + // inds = iter(inds) + // + // input = list(range(1, input_size + 1)) + // + // for _ in range(floor(num_filtered * input_size)): + // ind = next(inds) + // input[ind] = -1 + // + // for _ in range(floor(num_schema * input_size)): + // ind = next(inds) + // input[ind] = "E" + // + // for _ in range(floor(num_resolved * input_size)): + // ind = next(inds) + // input[ind] = -2 + // + // for _ in range(floor(num_unresolved * input_size)): + // ind = next(inds) + // input[ind] = -3 + // + // def filter_udf(x): + // return x != -1 + // + // def map_udf(x): + // if x == -2 or x == -3: + // return 1 // (x - x) + // else: + // return x + // + // def resolve_udf(x): + // if x == -3: + // return 1 // (x - x) + // else: + // return x + // + // # for larger partitions, there's a multi-threading issue for this. + // # need to fix. 
+ // conf = self.conf_in_order + // # use this line to force single-threaded + // # conf['executorCount'] = 0 + // c = Context(conf) + // output = c.parallelize(input).filter(filter_udf).map(map_udf).resolve(ZeroDivisionError, resolve_udf).collect() + // + // self.assertEqual(list(filter(lambda x: x != -3 and x != -1, input)), output) + // + // @pytest.mark.parametrize("n", [100, 1000, 10000, 100000]) + // def test_everything(self, n): + // self.process(n, 0.25, 0.25, 0.25, 0.25) + + using namespace tuplex; + + // use here a resolve operator that doesn't trigger + + auto ctx_opts = "{\"webui.enable\": false," + " \"driverMemory\": \"256MB\"," + " \"partitionSize\": \"256KB\"," + "\"executorCount\": 8," + "\"tuplex.optimizer.mergeExceptionsInOrder\": true," + "\"tuplex.scratchDir\": \"file://" + scratchDir + "\"," + "\"resolveWithInterpreterOnly\": true}"; + + std::string udf_filter = "def filter_udf(x):\n" + " return x != -1"; + + std::string udf_map = "def map_udf(x):\n" + " if x == -2 or x == -3:\n" + " return 1 // (x - x)\n" + " else:\n" + " return x"; + std::string udf_resolve = "def resolve_udf(x):\n" + " if x == -3:\n" + " return 1 // (x - x)\n" + " else:\n" + " return x"; + + auto initial_pickled = python::pickleObject(python::getMainModule(), PyLong_FromLong(0)); + + std::cout<<"starting to generate data..."< v(N, nullptr); + int pos = 0; + auto num_filtered = 0.25; + auto num_schema = 0.25; + auto num_resolved = 0.25; + auto num_unresolved = 0.25; + for(pos = 0; pos <= floor(num_filtered * N); pos++) + v[pos] = PyLong_FromLong(-1); + auto count = pos; + for(; pos <= count + floor(num_schema * N); pos++) + v[pos] = python::PyString_FromString("E"); + count = pos; + for(; pos <= count + floor(num_resolved * N); pos++) + v[pos] = PyLong_FromLong(-2); + count = pos; + for(; pos <= count + floor(num_unresolved * N) && pos < N; pos++) + v[pos] = PyLong_FromLong(-3); + count = pos; + for(; pos < N; pos++) + v[pos] = PyLong_FromLong(-1); + + // shuffle vector + 
auto rng = std::default_random_engine {}; + std::shuffle(std::begin(v), std::end(v), rng); + + // now assign to list + for(unsigned i = 0; i < N; ++i) { + PyList_SetItem(list, i, v[i]); + v[i] = nullptr; + } + + std::cout<<"data gen done"<(list); + PythonContext ctx("", "", ctx_opts); + { + + // output = c.parallelize(input).filter(filter_udf).map(map_udf).resolve(ZeroDivisionError, resolve_udf).collect() + // self.assertEqual(list(filter(lambda x: x != -3 and x != -1, input)), output) + auto ds = ctx.parallelize(data_list) + .filter(udf_filter, "").map(udf_map, "").resolve(ecToI64(ExceptionCode::ZERODIVISIONERROR), udf_resolve, ""); + + //ds.show(); + python::runGC(); + + + // check + auto res = ds.collect(); + auto res_obj = res.ptr(); + ASSERT_TRUE(res_obj); + ASSERT_TRUE(PyList_Check(res_obj)); + // EXPECT_EQ(PyList_Size(res_obj), N); + + python::runGC(); + + std::cout<(list); + auto columns_list = py::reinterpret_borrow(cols); + PythonContext ctx("", "", ctx_opts); + { + // .withColumn("str", lambda x, y, z: str(1 // x) + y) + auto ds = ctx.parallelize(data_list, columns_list) + .withColumn("str", "lambda x, y, z: str(1 // x) + y", ""); + + auto result_before_resolve = ds.collect(); + auto result_before_resolve_obj = result_before_resolve.ptr(); + + ASSERT_TRUE(result_before_resolve_obj); + ASSERT_TRUE(PyList_Check(result_before_resolve_obj)); + EXPECT_EQ(PyList_Size(result_before_resolve_obj), 2); + + //ds.show(); + python::runGC(); + + // check + auto res = ds.resolve(ecToI64(ExceptionCode::ZERODIVISIONERROR), "lambda x, y, z: \"NULL\"", "").collect(); + auto res_obj = res.ptr(); + ASSERT_TRUE(res_obj); + ASSERT_TRUE(PyList_Check(res_obj)); + EXPECT_EQ(PyList_Size(res_obj), 3); + + python::runGC(); + + std::cout<(list); + PythonContext ctx("", "", ctx_opts); + { + auto ds = ctx.parallelize(data_list); + + auto result_before_resolve = ds.collect(); + auto result_before_resolve_obj = result_before_resolve.ptr(); + + ASSERT_TRUE(result_before_resolve_obj); + 
ASSERT_TRUE(PyList_Check(result_before_resolve_obj)); + EXPECT_EQ(PyList_Size(result_before_resolve_obj), 2); + + ds.show(); + python::runGC(); + + std::cout<(list); + PythonContext ctx("", "", ctx_opts); + { + auto ds = ctx.parallelize(data_list); + + auto result_before_resolve = ds.collect(); + auto result_before_resolve_obj = result_before_resolve.ptr(); + + ASSERT_TRUE(result_before_resolve_obj); + ASSERT_TRUE(PyList_Check(result_before_resolve_obj)); + EXPECT_EQ(PyList_Size(result_before_resolve_obj), 1); + + ds.show(); + python::runGC(); + + std::cout<& elements); public: - List() : _elements(nullptr), _numElements(0) {} + List() : _elements(nullptr), _numElements(0), _listType(python::Type::EMPTYLIST) {} + List(List&& other) : _numElements(other._numElements), _elements(other._elements), _listType(other._listType) { + other._numElements = 0; + other._elements = nullptr; + } + + List(const python::Type& elementType) : _elements(nullptr), _numElements(0), _listType(python::Type::makeListType(elementType)) {} + ~List(); // new variadic template param ctor @@ -61,6 +69,11 @@ namespace tuplex { l.init_from_vector(elements); return l; } + + List* allocate_deep_copy() const; + + size_t serialized_length() const; + size_t serialize_to(uint8_t* ptr) const; }; diff --git a/tuplex/utils/include/Row.h b/tuplex/utils/include/Row.h index 27b169c5b..04aa8a91a 100644 --- a/tuplex/utils/include/Row.h +++ b/tuplex/utils/include/Row.h @@ -38,6 +38,21 @@ namespace tuplex { public: Row() : _serializedLength(0) {} + Row(const Row& other) : _schema(other._schema), _values(other._values), _serializedLength(other._serializedLength) {} + Row& operator = (const Row& other) { + _schema = other._schema; + _values = other._values; + _serializedLength = other._serializedLength; + return *this; + } + + Row(Row&& other) : _schema(other._schema), _serializedLength(other._serializedLength), _values(std::move(other._values)) { + other._values = {}; + other._serializedLength = 0; + 
other._schema = Schema::UNKNOWN; + } + + // new constructor using variadic templates template Row(Targs... Fargs) { vec_build(_values, Fargs...); @@ -45,13 +60,33 @@ namespace tuplex { _serializedLength = getSerializedLength(); } - int getNumColumns() const { return _values.size(); } + inline size_t getNumColumns() const { return _values.size(); } inline Field get(const int col) const { assert(!_values.empty()); assert(0 <= col && col < _values.size()); return _values[col]; } + inline void set(const unsigned col, const Field& f) { +#ifndef NDEBUG + if(col >= _values.size()) + throw std::runtime_error("invalid column index in get specified"); +#endif + _values[col] = f; + + // need to update type of row! + auto old_type = _schema.getRowType(); + auto types = old_type.parameters(); + if(types[col] != f.getType()) { + types[col] = f.getType(); + _schema = Schema(_schema.getMemoryLayout(), python::Type::makeTupleType(types)); + } + + // update length, may change! + _serializedLength = getSerializedLength(); + } + + bool getBoolean(const int col) const; int64_t getInt(const int col) const; double getDouble(const int col) const; diff --git a/tuplex/utils/include/Serializer.h b/tuplex/utils/include/Serializer.h index 47bf131c4..c966e53b0 100644 --- a/tuplex/utils/include/Serializer.h +++ b/tuplex/utils/include/Serializer.h @@ -36,24 +36,50 @@ namespace tuplex { size_t _bufferSize; size_t _bufferCapacity; public: - Buffer(const size_t growthConstant) : _growthConstant(growthConstant), _buffer(nullptr), _bufferSize(0), _bufferCapacity(0) { + Buffer(const size_t growthConstant) : _growthConstant(growthConstant), _buffer(nullptr), _bufferSize(0), + _bufferCapacity(0) { assert(_growthConstant > 0); } + Buffer() : Buffer::Buffer(1024) {} + + // movable + Buffer(Buffer &&other) : _growthConstant(other._growthConstant), _buffer(other._buffer), + _bufferSize(other._bufferSize), _bufferCapacity(other._bufferCapacity) { + other._bufferSize = 0; + other._bufferCapacity = 0; + 
other._buffer = nullptr; + } + + // make non-copyable + Buffer(const Buffer& other) = delete; + Buffer& operator = (const Buffer& other) = delete; + ~Buffer() { - if(_buffer) - free(_buffer); + free_and_reset(); } + void provideSpace(const size_t numBytes); - void* buffer() { return _buffer; } + void* buffer() { assert(_buffer); return _buffer; } void* ptr() const { static_assert(sizeof(unsigned char) == 1, "byte type must be 1 byte wide"); assert(_buffer); return (unsigned char*)_buffer + _bufferSize; } void movePtr(const size_t numBytes) { _bufferSize += numBytes; } size_t size() const { return _bufferSize; } size_t capacity() const { return _bufferCapacity; } void reset() { _bufferSize = 0; } + + /*! + * reset buffer by actually releasing the memory. + */ + inline void free_and_reset() { + if(_buffer) + free(_buffer); + _buffer = nullptr; + _bufferSize = 0; + _bufferCapacity = 0; + } }; /*! @@ -99,7 +125,7 @@ namespace tuplex { Serializer& appendWithoutInference(const option &tuple, const python::Type &tupleType); Serializer& appendWithoutInference(const uint8_t* buf, size_t bufSize); - Serializer& appendWithoutInference(const Field f); + Serializer& appendWithoutInference(const Field& f); inline bool hasSchemaVarLenFields() const { // from _isVarLenField, if any element is set to true return true @@ -120,6 +146,25 @@ namespace tuplex { _fixedLenFields(_bufferGrowthConstant), _varLenFields(_bufferGrowthConstant), _col(0) {} + ~Serializer() { + + } + + // move constructor + Serializer(Serializer&& other) : _autoSchema(other._autoSchema), + _schema(other._schema), + _types(std::move(other._types)), _col(other._col), + _fixedLenFields(std::move(other._fixedLenFields)), + _varLenFields(std::move(other._varLenFields)), + _isVarField(std::move(other._isVarField)), + _varLenFieldOffsets(std::move(other._varLenFieldOffsets)), + _requiresBitmap(std::move(other._requiresBitmap)), + _isNull(std::move(other._isNull)) {} + + // make non-copyable + Serializer(const 
Serializer& other) = delete; + Serializer& operator = (const Serializer& other) = delete; + Serializer& reset(); // general case: options! @@ -153,11 +198,13 @@ namespace tuplex { Serializer& appendObject(const uint8_t* buf, size_t bufSize); + Serializer& appendField(const Field& f); + Serializer& appendNull(); // only define append for long when long and int64_t are not the same to avoid overload error template - typename std::enable_if::value, Serializer&>::type append(const T l) { return append(static_cast(l)); } + typename std::enable_if::value && !std::is_same::value, Serializer&>::type append(const T l) { return append(static_cast(l)); } Schema getSchema() { fixSchema(); return _schema; } @@ -300,6 +347,15 @@ namespace tuplex { return Schema(Schema::MemoryLayout::UNKNOWN, python::TypeFactory::instance().createOrGetTupleType(v)); } + /*! + * get size of list to serialize + * @param l + * @return + */ + extern size_t serialized_list_size(const List& l); + + size_t serialize_list_to_ptr(const List& l, uint8_t* ptr, size_t capacity_left); + } #endif //TUPLEX_SERIALIZER_H \ No newline at end of file diff --git a/tuplex/utils/include/Tuple.h b/tuplex/utils/include/Tuple.h index 6e7baa91e..5ccf56e07 100644 --- a/tuplex/utils/include/Tuple.h +++ b/tuplex/utils/include/Tuple.h @@ -32,6 +32,12 @@ namespace tuplex { public: Tuple() : _elements(nullptr), _numElements(0) {} + + Tuple(Tuple&& other) : _numElements(other._numElements), _elements(other._elements) { + other._numElements = 0; + other._elements = nullptr; + } + ~Tuple(); // new variadic template param ctor @@ -59,6 +65,11 @@ namespace tuplex { t.init_from_vector(elements); return t; } + + Tuple* allocate_deep_copy() const; + + size_t serialized_length() const; + size_t serialize_to(uint8_t* ptr) const; }; diff --git a/tuplex/utils/src/Field.cc b/tuplex/utils/src/Field.cc index af0983990..3848d50ec 100644 --- a/tuplex/utils/src/Field.cc +++ b/tuplex/utils/src/Field.cc @@ -86,24 +86,6 @@ namespace tuplex { return 
f; } - Field::Field(const Field &other) { - _type = other._type; - _size = other._size; - _isNull = other._isNull; - - // special handling: - // ptr type? - if(other.hasPtrData()) { - assert(other._ptrValue); - // memcpy - _ptrValue = new uint8_t[_size]; - std::memcpy(_ptrValue, other._ptrValue, _size); - } else { - // primitive val copy (doesn't matter which) - _iValue = other._iValue; - } - } - Field::Field(const Tuple &t) { // allocate size and then transfer tuple to ptr _size = sizeof(Tuple); @@ -133,6 +115,39 @@ namespace tuplex { _ptrValue = reinterpret_cast(new Tuple(t)); } + void Field::deep_copy_from_other(const Field &other) { + if(other.hasPtrData()) { + assert(_ptrValue == nullptr); + + // special data structs have to perform individual deep copies + if(other._type.isTupleType()) { + auto tuple_ptr = reinterpret_cast(other._ptrValue); + _ptrValue = reinterpret_cast(tuple_ptr->allocate_deep_copy()); + _size = sizeof(Tuple); + } else if(other._type.isListType()) { + auto list_ptr = reinterpret_cast(other._ptrValue); + _ptrValue = reinterpret_cast(list_ptr->allocate_deep_copy()); + _size = sizeof(List); + } else { + // dict is currently stored as string... + + // memcpy --> is this correct for Tuple e.g.? + _size = other._size; + + // special case option type + if(_size != 0) { + _ptrValue = new uint8_t[_size]; + assert(other._ptrValue); + std::memcpy(_ptrValue, other._ptrValue, _size); + } else { + _ptrValue = nullptr; + } + } + } else { + _iValue = other._iValue; + } + } + Field& Field::operator = (const Field &other) { _size = other._size; @@ -141,13 +156,14 @@ namespace tuplex { // special handling: // ptr type? 
if(other.hasPtrData()) { - assert(other._ptrValue); - releaseMemory(); - // memcpy - _ptrValue = new uint8_t[_size]; - assert(_ptrValue); - std::memcpy(_ptrValue, other._ptrValue, _size); + _ptrValue = nullptr; + + // only invoke deepcopy if size != 0 + if(other._size != 0) { + assert(other._ptrValue); + deep_copy_from_other(other); + } } else { // primitive val copy (doesn't matter which) _iValue = other._iValue; @@ -166,9 +182,9 @@ namespace tuplex { else delete [] _ptrValue; } - - _ptrValue = nullptr; } + _ptrValue = nullptr; + _size = 0; } Field::~Field() { @@ -347,8 +363,8 @@ namespace tuplex { // emptylist to any list if(f._type == python::Type::EMPTYLIST && targetType.isListType()) { - // upcast to list - throw std::runtime_error("not yet implemented, pls add"); + // upcast to empty list with set list type + return Field(List(targetType.elementType())); } // emptydict to any dict @@ -373,6 +389,7 @@ namespace tuplex { Field c = upcastTo_unsafe(tmp, targetType.elementType()); c._type = targetType; c._isNull = f._isNull; + return c; } if(t == python::Type::BOOLEAN) { diff --git a/tuplex/utils/src/List.cc b/tuplex/utils/src/List.cc index dc2a3e671..2103c9b16 100644 --- a/tuplex/utils/src/List.cc +++ b/tuplex/utils/src/List.cc @@ -11,6 +11,7 @@ #include #include #include +#include namespace tuplex { @@ -18,19 +19,52 @@ namespace tuplex { if(elements.empty()) { _numElements = 0; _elements = nullptr; + _listType = python::Type::EMPTYLIST; } else { _numElements = elements.size(); _elements = new Field[_numElements]; - for(int i = 0; i < _numElements; ++i) { - if(elements[i].getType() != elements[0].getType()) throw std::runtime_error("List::init_from_vector called with elements of nonuniform type."); - _elements[i] = elements[i]; + + // two-way approach: First, check if homogenous + assert(!elements.empty()); + auto el_type = elements[0].getType(); + auto uni_type = el_type; + bool is_homogeneous = true; + for(unsigned i = 1; i < elements.size(); ++i) { + uni_type 
= unifyTypes(uni_type, elements[i].getType()); + if(elements[i].getType() != el_type) + is_homogeneous = false; + } + + if(is_homogeneous) { + for(int i = 0; i < _numElements; ++i) { + if(elements[i].getType() != elements[0].getType()) + throw std::runtime_error("List::init_from_vector called with elements" + " of nonuniform type, tried to set list element with field of type " + + elements[i].getType().desc() + " but list has assumed type of " + + elements[0].getType().desc()); + _elements[i] = elements[i]; + } + _listType = python::Type::makeListType(uni_type); + } else if(python::Type::UNKNOWN != uni_type) { + _listType = python::Type::makeListType(uni_type); + // cast each element up + for(unsigned i = 0; i < _numElements; ++i) + _elements[i] = Field::upcastTo_unsafe(elements[i], uni_type); + } else { + // heterogeneous list... + _listType = python::Type::makeListType(python::Type::PYOBJECT); + for(unsigned i = 0; i < _numElements; ++i) + _elements[i] = elements[i]; } } + assert(_numElements != 0 && _listType != python::Type::EMPTYLIST); + assert(!_listType.isIllDefined()); } List::List(const List &other) { // deep copy needed _numElements = other._numElements; + _listType = other._listType; if(_numElements > 0) { _elements = new Field[_numElements]; @@ -50,6 +84,7 @@ namespace tuplex { // deep copy needed _numElements = other._numElements; + _listType = other._listType; if(_numElements > 0) { _elements = new Field[_numElements]; @@ -88,7 +123,7 @@ namespace tuplex { python::Type List::getType() const { if(_numElements > 0) - return python::Type::makeListType(_elements[0].getType()); + return _listType; else return python::Type::EMPTYLIST; } @@ -120,4 +155,25 @@ namespace tuplex { } return num; } + + List* List::allocate_deep_copy() const { + List *L = new List(); + assert(L->_elements == nullptr); + L->_numElements = _numElements; + L->_elements = new Field[L->_numElements]; + L->_listType = _listType; + for(unsigned i = 0; i < _numElements; ++i) { + 
L->_elements[i] = _elements[i]; + } + return L; + } + + size_t List::serialized_length() const { + return serialized_list_size(*this); + } + + size_t List::serialize_to(uint8_t *ptr) const { + auto len = serialized_list_size(*this); + return serialize_list_to_ptr(*this, ptr, len); + } } \ No newline at end of file diff --git a/tuplex/utils/src/Row.cc b/tuplex/utils/src/Row.cc index 2dfcb24e5..688f327c6 100644 --- a/tuplex/utils/src/Row.cc +++ b/tuplex/utils/src/Row.cc @@ -89,8 +89,7 @@ namespace tuplex { std::string Row::toPythonString() const { std::string s = "("; for(int i = 0; i < getNumColumns(); ++i) { - s += _values[i].desc(); - + s += _values[i].toPythonString(); if(i != getNumColumns() - 1) s += ","; } @@ -111,6 +110,7 @@ namespace tuplex { // get types of rows & return then tuple type std::vector types; + types.reserve(_values.size()); for(const auto& el: _values) types.push_back(el.getType()); @@ -197,7 +197,7 @@ namespace tuplex { } } } - return serializer; + return std::move(serializer); } bool operator == (const Row& lhs, const Row& rhs) { @@ -206,8 +206,10 @@ namespace tuplex { return false; // special case: empty rows - if(lhs._values.size() == 0) + if(lhs._values.size() == 0) { + assert(rhs._values.size() == 0); return true; + } // check whether type matches if(lhs.getRowType() != rhs.getRowType()) diff --git a/tuplex/utils/src/Serializer.cc b/tuplex/utils/src/Serializer.cc index 8477fa370..9b89a96f9 100644 --- a/tuplex/utils/src/Serializer.cc +++ b/tuplex/utils/src/Serializer.cc @@ -465,7 +465,7 @@ namespace tuplex { return appendWithoutInference(f); } - Serializer &Serializer::appendWithoutInference(const Field f) { + Serializer &Serializer::appendWithoutInference(const Field& f) { if (python::Type::BOOLEAN == f.getType()) return appendWithoutInference(static_cast(f.getInt())); else if (python::Type::I64 == f.getType()) @@ -714,17 +714,87 @@ namespace tuplex { return *this; } - Serializer &Serializer::appendWithoutInferenceHelper(const List &l) { 
+ size_t serialized_list_size(const List& l) { + // need always 8 bytes to store size + auto size = sizeof(uint64_t); + + if(l.getType() == python::Type::EMPTYLIST) + return size; + + auto elementType = l.getType().elementType(); + if(elementType.isSingleValued()) + return size; // done, sufficient to store size only. + + // need bitmap field for elements? + std::vector bitmapV; + void *bitmapAddr = nullptr; + size_t bitmapSize = 0; + if(elementType.isOptionType()) { + auto numBitmapFields = core::ceilToMultiple(l.numElements(), 64ul)/64; + bitmapSize = numBitmapFields * sizeof(uint64_t); + size += bitmapSize; + elementType = elementType.getReturnType(); + } + + if(elementType == python::Type::STRING || elementType == python::Type::PYOBJECT) { // strings are serialized differently + // offset numbers + size_t current_offset = sizeof(uint64_t) * l.numElements(); + for (size_t i = 0; i < l.numElements(); i++) { + size += sizeof(uint64_t); + size += l.getField(i).getPtrSize(); + } + } else if(elementType.isTupleType()) { + // skip #elements * 8 bytes as placeholder for offsets + size += l.numElements() * sizeof(uint64_t); + for (size_t listIndex = 0; listIndex < l.numElements(); ++listIndex) { + + // skip None entries + if(bitmapSize != 0 && l.getField(listIndex).isNull()) + continue; + + auto currTuple = *(Tuple *)(l.getField(listIndex).getPtr()); + auto tuple_serialized_length = currTuple.serialized_length(); + size += tuple_serialized_length; + } + } else if (elementType.isListType()) { + // skip #elements * 8 bytes as placeholder for offsets + size += l.numElements() * sizeof(uint64_t); + + // same logic as for tuple here + for (size_t listIndex = 0; listIndex < l.numElements(); ++listIndex) { + // skip None entries + if(bitmapSize != 0 && l.getField(listIndex).isNull()) + continue; + + auto currList = *(List *)(l.getField(listIndex).getPtr()); + auto list_serialized_length = currList.serialized_length(); + size += list_serialized_length; + } + } else 
if(elementType == python::Type::I64 || + elementType == python::Type::BOOLEAN || + elementType == python::Type::F64) { + size += l.numElements() * sizeof(int64_t); // 8 bytes each + } else { + throw std::runtime_error( + "invalid list type: " + l.getType().desc() + " encountered, can't serialize."); + } + return size; + } + + size_t serialize_list_to_ptr(const List& l, uint8_t* ptr, size_t capacity_left) { + assert(ptr && capacity_left >= serialized_list_size(l)); + auto original_ptr = ptr; + + *((uint64_t *)ptr) = l.numElements(); + ptr += sizeof(uint64_t); - // add number of elements - _varLenFields.provideSpace(sizeof(uint64_t)); - *((uint64_t *)_varLenFields.ptr()) = l.numElements(); - _varLenFields.movePtr(sizeof(uint64_t)); + if(l.getType() == python::Type::EMPTYLIST) + return ptr - original_ptr; auto elementType = l.getType().elementType(); if(elementType.isSingleValued()) { // done. List can be retrieved from numElements and listType - return *this; + return ptr - original_ptr; } // need bitmap field for elements? 
@@ -734,18 +804,16 @@ namespace tuplex { if(elementType.isOptionType()) { auto numBitmapFields = core::ceilToMultiple(l.numElements(), 64ul)/64; bitmapSize = numBitmapFields * sizeof(uint64_t); - _varLenFields.provideSpace(bitmapSize); - bitmapAddr = _varLenFields.ptr(); - _varLenFields.movePtr(bitmapSize); + bitmapAddr = ptr; + ptr += bitmapSize; } if(elementType == python::Type::STRING) { // strings are serialized differently // offset numbers size_t current_offset = sizeof(uint64_t) * l.numElements(); for (size_t i = 0; i < l.numElements(); i++) { - _varLenFields.provideSpace(sizeof(uint64_t)); - *((uint64_t *) _varLenFields.ptr()) = current_offset; - _varLenFields.movePtr(sizeof(uint64_t)); + *((uint64_t *)ptr) = current_offset; + ptr += sizeof(uint64_t); // update for next field: move forward one uint64_t, then add on the string current_offset -= sizeof(uint64_t); current_offset += strlen((char *) l.getField(i).getPtr()) + 1; @@ -753,157 +821,166 @@ namespace tuplex { // string data for (size_t i = 0; i < l.numElements(); i++) { size_t slen = strlen((char*)l.getField(i).getPtr()); - _varLenFields.provideSpace(slen + 1); - std::memcpy(_varLenFields.ptr(), l.getField(i).getPtr(), slen); - *((uint8_t *) _varLenFields.ptr() + slen) = 0; - _varLenFields.movePtr(slen + 1); + std::memcpy(ptr, l.getField(i).getPtr(), slen); + *((uint8_t *) ptr + slen) = 0; + ptr += slen + 1; } } else if(elementType.isTupleType()) { - void *varLenOffsetAddr = _varLenFields.ptr(); + uint8_t *varLenOffsetAddr = ptr; // skip #elements * 8 bytes as placeholder for offsets auto offsetBytes = l.numElements() * sizeof(uint64_t); - _varLenFields.provideSpace(offsetBytes); - _varLenFields.movePtr(offsetBytes); + ptr += offsetBytes; + for (size_t listIndex = 0; listIndex < l.numElements(); ++listIndex) { // write offset to placeholder - uint64_t currOffset = (uintptr_t)_varLenFields.ptr() - (uintptr_t)varLenOffsetAddr; + uint64_t currOffset = (uintptr_t)ptr - (uintptr_t)varLenOffsetAddr; 
*(uint64_t *)varLenOffsetAddr = currOffset; + // increment varLenOffsetAddr by 8 - varLenOffsetAddr = (void *)((uint64_t *)varLenOffsetAddr + 1); + varLenOffsetAddr += sizeof(uint64_t); + + // skip None entries + if(bitmapSize != 0 && l.getField(listIndex).isNull()) + continue; + // append tuple auto currTuple = *(Tuple *)(l.getField(listIndex).getPtr()); - appendWithoutInferenceHelper(currTuple); + auto tuple_serialized_length = currTuple.serialized_length(); + assert(ptr - original_ptr + tuple_serialized_length <= capacity_left); + auto size = currTuple.serialize_to(ptr); + assert(size == tuple_serialized_length); + ptr += tuple_serialized_length; } } else if (elementType.isListType()) { - void *varLenOffsetAddr = _varLenFields.ptr(); + uint8_t *varLenOffsetAddr = ptr; // skip #elements * 8 bytes as placeholder for offsets auto offsetBytes = l.numElements() * sizeof(uint64_t); - _varLenFields.provideSpace(offsetBytes); - _varLenFields.movePtr(offsetBytes); + ptr += offsetBytes; + + // same logic as for tuple here for (size_t listIndex = 0; listIndex < l.numElements(); ++listIndex) { // write offset to placeholder - uint64_t currOffset = (uintptr_t)_varLenFields.ptr() - (uintptr_t)varLenOffsetAddr; - *(uint64_t *)varLenOffsetAddr = currOffset; + uint64_t currOffset = (uintptr_t)ptr - (uintptr_t)varLenOffsetAddr; + + *(uint64_t *)varLenOffsetAddr = currOffset; // <-- this is problematic (!) 
+ // increment varLenOffsetAddr by 8 - varLenOffsetAddr = (void *)((uint64_t *)varLenOffsetAddr + 1); - // append list + varLenOffsetAddr += sizeof(uint64_t); + + // skip None entries + if(bitmapSize != 0 && l.getField(listIndex).isNull()) + continue; + + // append tuple auto currList = *(List *)(l.getField(listIndex).getPtr()); - appendWithoutInferenceHelper(currList); + auto list_serialized_length = currList.serialized_length(); + currList.serialize_to(ptr); + ptr += list_serialized_length; } } else if(elementType == python::Type::I64 || elementType == python::Type::BOOLEAN) { for(size_t i = 0; i < l.numElements(); i++) { - _varLenFields.provideSpace(sizeof(uint64_t)); - *((uint64_t*)_varLenFields.ptr()) = l.getField(i).getInt(); - _varLenFields.movePtr(sizeof(uint64_t)); + *((uint64_t*)ptr) = l.getField(i).getInt(); + ptr += sizeof(uint64_t); } } else if(elementType == python::Type::F64) { for(size_t i = 0; i < l.numElements(); i++) { - _varLenFields.provideSpace(sizeof(uint64_t)); - *((double*)_varLenFields.ptr()) = l.getField(i).getDouble(); - _varLenFields.movePtr(sizeof(uint64_t)); + *((double*)ptr) = l.getField(i).getDouble(); + ptr += sizeof(uint64_t); } } else if(elementType.isOptionType()) { auto underlyingElementType = elementType.getReturnType(); - size_t numNonNullElements = l.numNonNullElements(); if(underlyingElementType == python::Type::STRING) { // offset numbers - size_t currentOffset = sizeof(uint64_t) * numNonNullElements; + size_t currentOffset = sizeof(uint64_t) * l.numElements(); for(size_t i = 0; i < l.numElements(); i++) { if(l.getField(i).isNull()) { bitmapV.push_back(true); } else { bitmapV.push_back(false); // write offset - _varLenFields.provideSpace(sizeof(uint64_t)); - *((uint64_t *) _varLenFields.ptr()) = currentOffset; - _varLenFields.movePtr(sizeof(uint64_t)); + *((uint64_t *)ptr) = currentOffset; // update for next field: move forward one uint64_t, then add on the string - currentOffset -= sizeof(uint64_t); currentOffset += 
strlen((char *) l.getField(i).getPtr()) + 1; } + ptr += sizeof(uint64_t); } // string data for (size_t i = 0; i < l.numElements(); i++) { if(!l.getField(i).isNull()) { size_t slen = strlen((char*)l.getField(i).getPtr()); - _varLenFields.provideSpace(slen + 1); - std::memcpy(_varLenFields.ptr(), l.getField(i).getPtr(), slen); - *((uint8_t *) _varLenFields.ptr() + slen) = 0; - _varLenFields.movePtr(slen + 1); + std::memcpy(ptr, l.getField(i).getPtr(), slen); + *((uint8_t *) ptr + slen) = 0; + ptr += slen + 1; } } } else if(underlyingElementType.isTupleType()) { - void *varLenOffsetAddr = _varLenFields.ptr(); + void *varLenOffsetAddr = ptr; // skip #elements * 8 bytes as placeholder for offsets - auto offsetBytes = numNonNullElements * sizeof(uint64_t); - _varLenFields.provideSpace(offsetBytes); - _varLenFields.movePtr(offsetBytes); + auto offsetBytes = l.numElements() * sizeof(uint64_t); + ptr += offsetBytes; for (size_t listIndex = 0; listIndex < l.numElements(); ++listIndex) { if(l.getField(listIndex).isNull()) { bitmapV.push_back(true); } else { bitmapV.push_back(false); // write offset to placeholder - uint64_t currOffset = (uintptr_t)_varLenFields.ptr() - (uintptr_t)varLenOffsetAddr; + uint64_t currOffset = (uintptr_t)ptr - (uintptr_t)varLenOffsetAddr; *(uint64_t *)varLenOffsetAddr = currOffset; // increment varLenOffsetAddr by 8 varLenOffsetAddr = (void *)((uint64_t *)varLenOffsetAddr + 1); // append tuple auto currTuple = *(Tuple *)(l.getField(listIndex).getPtr()); - appendWithoutInferenceHelper(currTuple); + ptr += currTuple.serialize_to(ptr); } } } else if(underlyingElementType.isListType()) { - void *varLenOffsetAddr = _varLenFields.ptr(); + uint8_t *varLenOffsetAddr = ptr; // skip #elements * 8 bytes as placeholder for offsets - auto offsetBytes = l.numNonNullElements() * sizeof(uint64_t); - _varLenFields.provideSpace(offsetBytes); - _varLenFields.movePtr(offsetBytes); + auto offsetBytes = l.numElements() * sizeof(uint64_t); + ptr += offsetBytes; for 
(size_t listIndex = 0; listIndex < l.numElements(); ++listIndex) { if(l.getField(listIndex).isNull()) { bitmapV.push_back(true); } else { bitmapV.push_back(false); // write offset to placeholder - uint64_t currOffset = (uintptr_t)_varLenFields.ptr() - (uintptr_t)varLenOffsetAddr; + uint64_t currOffset = (uintptr_t)ptr - (uintptr_t)varLenOffsetAddr; *(uint64_t *)varLenOffsetAddr = currOffset; - // increment varLenOffsetAddr by 8 - varLenOffsetAddr = (void *)((uint64_t *)varLenOffsetAddr + 1); // append list auto currList = *(List *)(l.getField(listIndex).getPtr()); - appendWithoutInferenceHelper(currList); + ptr += currList.serialize_to(ptr); } + // increment varLenOffsetAddr always by 8 + varLenOffsetAddr += sizeof(uint64_t); } } else if(underlyingElementType == python::Type::I64 || underlyingElementType == python::Type::BOOLEAN) { for(size_t i = 0; i < l.numElements(); i++) { if(l.getField(i).isNull()) { bitmapV.push_back(true); + *((uint64_t*)ptr) = 0; } else { bitmapV.push_back(false); - _varLenFields.provideSpace(sizeof(uint64_t)); - *((uint64_t*)_varLenFields.ptr()) = l.getField(i).getInt(); - _varLenFields.movePtr(sizeof(uint64_t)); + *((uint64_t*)ptr) = l.getField(i).getInt(); } + ptr += sizeof(uint64_t); } } else if(underlyingElementType == python::Type::F64) { for(size_t i = 0; i < l.numElements(); i++) { if(l.getField(i).isNull()) { bitmapV.push_back(true); + *((uint64_t*)ptr) = 0; } else { bitmapV.push_back(false); - _varLenFields.provideSpace(sizeof(uint64_t)); - *((double*)_varLenFields.ptr()) = l.getField(i).getDouble(); - _varLenFields.movePtr(sizeof(uint64_t)); + *((double*)ptr) = l.getField(i).getDouble(); } + ptr += sizeof(uint64_t); } } else { - // throw std::runtime_error("serializing invalid list type!: " + l.getType().desc()); - Logger::instance().logger("serializer").error( + throw std::runtime_error( "invalid list type: " + l.getType().desc() + " encountered, can't serialize."); } } else { - // throw std::runtime_error("serializing invalid 
list type!: " + l.getType().desc()); - Logger::instance().logger("serializer").error( + throw std::runtime_error( "invalid list type: " + l.getType().desc() + " encountered, can't serialize."); } @@ -923,6 +1000,16 @@ namespace tuplex { std::memcpy(bitmapAddr, bitmap, bitmapSize); } + return ptr - original_ptr; + } + + Serializer &Serializer::appendWithoutInferenceHelper(const List &l) { + auto size = serialized_list_size(l); + _varLenFields.provideSpace(size); + auto ret = serialize_list_to_ptr(l, (uint8_t*)_varLenFields.ptr(), size); + assert(ret == size); + _varLenFields.movePtr(size); + return *this; } @@ -973,7 +1060,8 @@ namespace tuplex { std::memcpy(ptr, bitmap, bitmapSize); } - std::memcpy((uint8_t *) ptr + bitmapSize, _fixedLenFields.buffer(), _fixedLenFields.size()); + if(_fixedLenFields.size() > 0) // do not serialize fields like EMPTYTUPLE etc. E.g., a field like empty tuple will serialize to 0 bytes. + std::memcpy((uint8_t *) ptr + bitmapSize, _fixedLenFields.buffer(), _fixedLenFields.size()); // always write this addr if varlen fields are present if(hasSchemaVarLenFields()) @@ -981,9 +1069,10 @@ namespace tuplex { *((int64_t *) ((uint8_t *) ptr + bitmapSize + _fixedLenFields.size())) = _varLenFields.size(); if (_varLenFields.size() > 0) { + assert(capacityLeft >= bitmapSize + _fixedLenFields.size() + sizeof(int64_t) + _varLenFields.size()); // copy varlenfields over - std::memcpy((uint8_t *) ptr + bitmapSize + _fixedLenFields.size() + sizeof(int64_t), + std::memcpy(((uint8_t *) ptr) + bitmapSize + _fixedLenFields.size() + sizeof(int64_t), _varLenFields.buffer(), _varLenFields.size()); // set correct offsets in buffer @@ -1044,6 +1133,46 @@ namespace tuplex { calcBitmapSize(_requiresBitmap); } + Serializer &Serializer::appendField(const Field &f) { + // dispatch according to field type + if(f.getType() == python::Type::NULLVALUE) + return appendNull(); + if(f.getType() == python::Type::BOOLEAN) + return append((bool)f.getInt()); + if(f.getType() == 
python::Type::I64) + return append(f.getInt()); + if(f.getType() == python::Type::F64) + return append(f.getDouble()); + if(f.getType() == python::Type::STRING) + return append(std::string((const char*)f.getPtr())); + + if(f.getType().isListType()) + return append(*(List*)f.getPtr()); + + if(f.getType().isTupleType()) + return append(*(Tuple*)f.getPtr()); + + if(f.getType().isOptionType()) { + auto et = f.getType().getReturnType(); + if(et == python::Type::BOOLEAN) + return append(f.isNull() ? option::none : option((bool)f.getInt())); + if(et == python::Type::I64) + return append(f.isNull() ? option::none : option(f.getInt())); + if(et == python::Type::F64) + return append(f.isNull() ? option::none : option(f.getDouble())); + if(et == python::Type::STRING) + return append(f.isNull() ? option::none : option(std::string((const char*)f.getPtr()))); + + if(et.isListType()) + return append(f.isNull() ? option::none : option(*(List*)f.getPtr()), et); + + if(et.isTupleType()) + return append(f.isNull() ? option::none : option(*(Tuple*)f.getPtr()), et); + } + + throw std::runtime_error("Unknown field type " + f.getType().desc() + " to append found."); + } + Deserializer::Deserializer(const Schema &schema) : _schema(schema), _buffer(nullptr), _numSerializedFields(0) { // get flattened type representation @@ -1084,6 +1213,8 @@ namespace tuplex { _isVarLenField.push_back(true); } else { Logger::instance().logger("core").error("non deserializable type '" + el.desc() + "' detected"); + // treat as none... + _isVarLenField.push_back(false); } } } @@ -1359,6 +1490,8 @@ namespace tuplex { assert(phys_col < (inferLength(_buffer) - sizeof(int64_t)) / sizeof(int64_t)); // sharper bound because of varlen // get offset: offset is in the lower 32bit, the upper are the size of the var entry int64_t offset = *((int64_t *) ((uint8_t *) _buffer + sizeof(int64_t) * phys_col + calcBitmapSize(_requiresBitmap))); + + // @TODO: better list handling & testing. 
int64_t len = ((offset & (0xFFFFFFFFl << 32)) >> 32); // shortcut, warn about empty list: @@ -1411,6 +1544,7 @@ namespace tuplex { ptr += sizeof(uint64_t); } else if(currFieldType.isListType()) { auto listOffset = *(int64_t *)ptr; + listOffset &= 0xFFFFFFFF; // offset is lower 4 bytes. f = Field(getListHelper(currFieldType, ptr + listOffset)); ptr += sizeof(uint64_t); } else if(currFieldType == python::Type::NULLVALUE) { @@ -1422,7 +1556,7 @@ namespace tuplex { } else if(currFieldType == python::Type::EMPTYDICT) { f = Field::empty_dict(); } else if(currFieldType.isOptionType()) { - // need to check bitmapV + // need to check bitmap auto underlyingType = currFieldType.getReturnType(); if(underlyingType == python::Type::BOOLEAN) { if(bitmapV[bitmapIndex]) { @@ -1464,6 +1598,7 @@ namespace tuplex { f = Field::null(currFieldType); } else { auto listOffset = *(int64_t *)ptr; + listOffset &= 0xFFFFFFFF; f = Field(option(getListHelper(underlyingType, ptr + listOffset))); ptr += sizeof(uint64_t); } @@ -1473,6 +1608,7 @@ namespace tuplex { f = Field::null(currFieldType); } else { auto tupleOffset = *(int64_t *)ptr; + tupleOffset &= 0xFFFFFFFF; f = Field(option(getTupleHelper(underlyingType, ptr + tupleOffset))); ptr += sizeof(uint64_t); } diff --git a/tuplex/utils/src/Tuple.cc b/tuplex/utils/src/Tuple.cc index a2c9a4367..a67db7af3 100644 --- a/tuplex/utils/src/Tuple.cc +++ b/tuplex/utils/src/Tuple.cc @@ -11,6 +11,7 @@ #include #include #include +#include namespace tuplex { @@ -62,9 +63,9 @@ namespace tuplex { if(_elements) { assert(_numElements > 0); delete [] _elements; - _elements = nullptr; - _numElements = 0; } + _elements = nullptr; + _numElements = 0; } @@ -114,4 +115,39 @@ namespace tuplex { } return true; } -} \ No newline at end of file + + Tuple* Tuple::allocate_deep_copy() const { + Tuple *t = new Tuple(); + assert(t->_elements == nullptr); + t->_numElements = _numElements; + t->_elements = new Field[t->_numElements]; + for(unsigned i = 0; i < _numElements; ++i) 
{ + t->_elements[i] = _elements[i]; + } + return t; + } + + size_t Tuple::serialized_length() const { + if(_numElements == 0) + return 0; + + // use Serializer to check length + Serializer s; + for(unsigned i = 0; i < _numElements; ++i) + s.appendField(_elements[i]); + return s.length(); + } + + size_t Tuple::serialize_to(uint8_t* ptr) const { + if(_numElements == 0) + return 0; + + // use Serializer to check length + Serializer s; + for(unsigned i = 0; i < _numElements; ++i) + s.appendField(_elements[i]); + auto length = s.length(); + return s.serialize(ptr, length); + } +} + diff --git a/tuplex/utils/src/TypeSystem.cc b/tuplex/utils/src/TypeSystem.cc index 56b5df013..49889f505 100644 --- a/tuplex/utils/src/TypeSystem.cc +++ b/tuplex/utils/src/TypeSystem.cc @@ -994,6 +994,14 @@ namespace python { bUnderlyingType = b.getReturnType(); } + // if makeOption -> recursive call + if(makeOption) { + auto ans = unifyTypes(aUnderlyingType, bUnderlyingType); + if(python::Type::UNKNOWN == ans) + return ans; + return python::Type::makeOptionType(ans); + } + // same underlying types? make option if (aUnderlyingType == bUnderlyingType) { return python::Type::makeOptionType(aUnderlyingType); @@ -1033,6 +1041,18 @@ namespace python { return python::Type::makeListType(newElementType); } + // any list is compatible with empty list + if(aUnderlyingType.isListType() && bUnderlyingType == python::Type::EMPTYLIST) + return aUnderlyingType; + if(aUnderlyingType == python::Type::EMPTYLIST && bUnderlyingType.isListType()) + return bUnderlyingType; + + // any dict is compatible with empty dict + if(aUnderlyingType.isDictionaryType() && bUnderlyingType == python::Type::EMPTYDICT) + return aUnderlyingType; + if(aUnderlyingType == python::Type::EMPTYDICT && bUnderlyingType.isDictionaryType()) + return bUnderlyingType; + // tuple type? 
check if every parameter type compatible if(aUnderlyingType.isTupleType() && bUnderlyingType.isTupleType()) { if (aUnderlyingType.parameters().size() != bUnderlyingType.parameters().size()) {