这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
displayName: 'Install required packages'
- script: sudo apt-get install python3-setuptools && sudo apt-get remove python-pexpect python3-pexpect && sudo python3.7 -m pip install --upgrade pip && sudo python3.7 -m pip uninstall -y pygments && sudo python3.7 -m pip install pygments>=2.4.1 pexpect setuptools astor PyYAML jupyter nbformat && jupyter --version
displayName: 'Install python dependencies'
- script: cd tuplex && mkdir build && cd build && cmake -DLLVM_ROOT_DIR=/usr/lib/llvm-9 -DCMAKE_BUILD_TYPE=Release -DBUILD_FOR_CI=ON .. && make -j$(nproc)
- script: cd tuplex && mkdir build && cd build && cmake -DBUILD_WITH_ORC=ON -DLLVM_ROOT_DIR=/usr/lib/llvm-9 -DCMAKE_BUILD_TYPE=Release -DBUILD_FOR_CI=ON .. && make -j$(nproc)
displayName: 'Build Tuplex'
- script: cd tuplex && cd build && ctest --timeout 180 --output-on-failure
displayName: 'C++ tests'
Expand Down
21 changes: 12 additions & 9 deletions tuplex/core/include/physical/OrcReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ namespace tuplex {
* @param inputFilePath
*/
void read(const URI& inputFilePath) override {
Timer timer;

using namespace ::orc;
auto inStream = std::make_unique<orc::VirtualInputStream>(inputFilePath);
ReaderOptions options;
Expand Down Expand Up @@ -87,8 +89,8 @@ namespace tuplex {

try {
for (auto partition : pw.getOutputPartitions()) {
int64_t size = partition->size();
const uint8_t *ptr = partition->lockRaw();
int64_t size = partition->size();
_functor(_task, ptr, size, &numNormalRows, &numBadRows, false);
partition->unlock();
partition->invalidate();
Expand All @@ -103,13 +105,12 @@ namespace tuplex {
for (auto el : columns) {
delete el;
}
batch.reset();
rowReader.reset();
reader.reset();
inStream.reset();

auto& logger = Logger::instance().defaultLogger();
logger.info("Read " + std::to_string(numNormalRows) + " from file");
std::stringstream ss;
ss<<"[Task Finished] read from Orc file in "
<<std::to_string(timer.time())<<"s (";
ss<<pluralize(numNormalRows, "row")<<")";
Logger::instance().defaultLogger().info(ss.str());
}

private:
Expand All @@ -125,16 +126,18 @@ namespace tuplex {
size_t _rangeLen;

void writeBatchToPartition(PartitionWriter &pw, ::orc::ColumnVectorBatch *batch, std::vector<tuplex::orc::OrcBatch *> &columns) {
Serializer serializer(false);
serializer.setSchema(_schema);
for (uint64_t r = 0; r < batch->numElements; ++r) {
Serializer serializer(false);
serializer.setSchema(_schema);
for (auto col : columns) {
col->getField(serializer, r);
}
auto len = serializer.length();
const uint8_t *ptr = new uint8_t[len];
serializer.serialize((void *) ptr, len);
pw.writeData(ptr, len);

serializer.reset();
}
_numRowsRead += batch->numElements;
}
Expand Down
12 changes: 4 additions & 8 deletions tuplex/core/include/physical/SimpleOrcWriteTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,15 @@ class SimpleOrcWriteTask : public IExecutorTask {
}

using namespace ::orc;
auto outStream = new orc::VirtualOutputStream(_uri);
ORC_UNIQUE_PTR<Type> schema(orc::tuplexRowTypeToOrcType(_schema.getRowType(), _columns));
if (!schema) {
abort("error creating Orc schema.");
}

orc::VirtualOutputStream outStream(_uri);

WriterOptions options;
ORC_UNIQUE_PTR<Writer> writer = createWriter(*schema, outStream, options);
ORC_UNIQUE_PTR<Writer> writer = createWriter(*schema, &outStream, options);
if (!writer) {
abort("error creating Orc writer.");
}
Expand Down Expand Up @@ -102,8 +103,6 @@ class SimpleOrcWriteTask : public IExecutorTask {

writer->add(*batch);

batch.reset();

for (auto el : orcColumns) {
delete el;
}
Expand All @@ -113,12 +112,9 @@ class SimpleOrcWriteTask : public IExecutorTask {
}

writer->close();
delete outStream;
schema.reset();
writer.reset();

std::stringstream ss;
ss<<"[Task Finished] write to file in "
ss<<"[Task Finished] write to Orc file in "
<<std::to_string(timer.time())<<"s (";
ss<<pluralize(totalRows, "row")<<", "<<sizeToMemString(totalBytes)<<")";
owner()->info(ss.str());
Expand Down
221 changes: 203 additions & 18 deletions tuplex/io/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,43 @@ include_directories(${Boost_INCLUDE_DIR})
if(BUILD_WITH_ORC)
message(STATUS "Building Tuplex with ORC support")

# Orc provides builds for the following libraries:
# - liblz4, libsnappy, libz, and libzstd
find_package(Protobuf REQUIRED)
get_filename_component(Protobuf_HOME "${Protobuf_INCLUDE_DIRS}" DIRECTORY)

include(ExternalProject)
set(EXTERNAL_INSTALL_LOCATION ${CMAKE_BINARY_DIR}/third_party)

# For MacOS, check whether certain 3rd party libs are already installed via brew
# check if snappy is already installed under MacOS
if(BREW_FOUND)
if(APPLE)
EXECUTE_PROCESS(COMMAND brew --prefix snappy OUTPUT_VARIABLE BREW_SNAPPY_DIR ERROR_VARIABLE BREW_SNAPPY_NOTFOUND OUTPUT_STRIP_TRAILING_WHITESPACE)
set(THIRDPARTY_CONFIGURE_COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}")

# Snappy
EXECUTE_PROCESS(COMMAND brew list snappy OUTPUT_VARIABLE BREW_SNAPPY_LIST ERROR_VARIABLE BREW_SNAPPY_NOTFOUND OUTPUT_STRIP_TRAILING_WHITESPACE)
if(BREW_SNAPPY_NOTFOUND)
set(SNAPPY_LIBRARIES "${EXTERNAL_INSTALL_LOCATION}/lib/libsnappy.a")
message(STATUS "Could not find locally installed snappy, building third party")
set(SNAPPY_VERSION "1.1.7")
set(SNAPPY_HOME "${EXTERNAL_INSTALL_LOCATION}")
set(SNAPPY_INCLUDE_DIR "${SNAPPY_HOME}/include")
set(SNAPPY_STATIC_LIB "${SNAPPY_HOME}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}snappy${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(SNAPPY_CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${SNAPPY_HOME}
-DBUILD_SHARED_LIBS=OFF -DCMAKE_INSTALL_LIBDIR=lib -DSNAPPY_BUILD_TESTS=OFF -DCMAKE_POSITION_INDEPENDENT_CODE=ON)
ExternalProject_Add (snappy_ep
URL "https://github.com/google/snappy/archive/${SNAPPY_VERSION}.tar.gz"
CMAKE_ARGS ${SNAPPY_CMAKE_ARGS}
BUILD_BYPRODUCTS "${SNAPPY_STATIC_LIB}")

set(SNAPPY_LIBRARIES ${SNAPPY_STATIC_LIB})

add_library(snappy INTERFACE)
target_link_libraries(snappy INTERFACE ${SNAPPY_STATIC_LIB})
target_include_directories(snappy SYSTEM INTERFACE ${SNAPPY_INCLUDE_DIR})

add_dependencies(snappy snappy_ep)
install(FILES "${SNAPPY_STATIC_LIB}" DESTINATION "lib")
set(SNAPPY_DEPENDS "snappy_ep")
else()
EXECUTE_PROCESS(COMMAND brew --prefix snappy OUTPUT_VARIABLE BREW_SNAPPY_DIR OUTPUT_STRIP_TRAILING_WHITESPACE)
set(ENV{SNAPPY_HOME} ${BREW_SNAPPY_DIR})
set(SNAPPY_HOME ${BREW_SNAPPY_DIR})
message(STATUS "Found locally installed snappy under $ENV{SNAPPY_HOME}")
Expand All @@ -58,29 +79,192 @@ if(BUILD_WITH_ORC)
endif()
message(STATUS "Snappy libraries: ${SNAPPY_LIBRARIES}")
endif()

# Lz4
EXECUTE_PROCESS(COMMAND brew list lz4 OUTPUT_VARIABLE BREW_LZ4_LIST ERROR_VARIABLE BREW_LZ4_NOTFOUND OUTPUT_STRIP_TRAILING_WHITESPACE)
if(BREW_LZ4_NOTFOUND)
message(STATUS "Could not find locally installed lz4, building third party")
set(LZ4_VERSION "1.7.5")
set(LZ4_HOME "${EXTERNAL_INSTALL_LOCATION}")
set(LZ4_INCLUDE_DIR "${LZ4_HOME}/include")
set(LZ4_STATIC_LIB "${LZ4_HOME}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}lz4${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(LZ4_CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${LZ4_HOME}
-DBUILD_SHARED_LIBS=OFF -DCMAKE_INSTALL_LIBDIR=lib -DLZ4_BUILD_TESTS=OFF -DCMAKE_POSITION_INDEPENDENT_CODE=ON)

if (CMAKE_VERSION VERSION_GREATER "3.7")
set(LZ4_CONFIGURE SOURCE_SUBDIR "contrib/cmake_unofficial" CMAKE_ARGS ${LZ4_CMAKE_ARGS})
else()
set(LZ4_CONFIGURE CONFIGURE_COMMAND "${THIRDPARTY_CONFIGURE_COMMAND}" ${LZ4_CMAKE_ARGS}
"${CMAKE_CURRENT_BINARY_DIR}/lz4_ep-prefix/src/lz4_ep/contrib/cmake_unofficial")
endif()

ExternalProject_Add (lz4_ep
URL "https://github.com/lz4/lz4/archive/v${LZ4_VERSION}.tar.gz"
${LZ4_CONFIGURE}
BUILD_BYPRODUCTS "${LZ4_STATIC_LIB}")

set(LZ4_LIBRARIES ${LZ4_STATIC_LIB})

add_library(lz4 INTERFACE)
target_link_libraries(lz4 INTERFACE ${LZ4_STATIC_LIB})
target_include_directories(lz4 SYSTEM INTERFACE ${LZ4_INCLUDE_DIR})

add_dependencies(lz4 lz4_ep)
install(FILES "${LZ4_STATIC_LIB}" DESTINATION "lib")
set(LZ4_DEPENDS "lz4_ep")
else()
EXECUTE_PROCESS(COMMAND brew --prefix lz4 OUTPUT_VARIABLE BREW_LZ4_DIR OUTPUT_STRIP_TRAILING_WHITESPACE)
set(ENV{LZ4_HOME} ${BREW_LZ4_DIR})
set(LZ4_HOME ${BREW_LZ4_DIR})
message(STATUS "Found locally installed lz4 under $ENV{LZ4_HOME}")
# set variables
file (TO_CMAKE_PATH "${LZ4_HOME}" _lz4_path)
find_library (LZ4_LIBRARY NAMES lz4 HINTS
${_lz4_path}
PATH_SUFFIXES "lib" "lib64")
if(LZ4_LIBRARY)
message(STATUS "lz4 lib: ${LZ4_LIBRARY}")
endif()
find_library (LZ4_STATIC_LIB NAMES ${CMAKE_STATIC_LIBRARY_PREFIX}${LZ4_LIB_NAME}${CMAKE_STATIC_LIBRARY_SUFFIX} HINTS
${_lz4_path}
PATH_SUFFIXES "lib" "lib64")
if(LZ4_LIBRARY)
set(LZ4_LIBRARIES "${LZ4_LIBRARY}")
elseif(LZ4_STATIC_LIB)
set(LZ4_LIBRARIES "${LZ4_STATIC_LIB}")
endif()
message(STATUS "Lz4 libraries: ${LZ4_LIBRARIES}")
endif()

# Zstd
EXECUTE_PROCESS(COMMAND brew list zstd OUTPUT_VARIABLE BREW_ZSTD_LIST ERROR_VARIABLE BREW_ZSTD_NOTFOUND OUTPUT_STRIP_TRAILING_WHITESPACE)
if(BREW_ZSTD_NOTFOUND)
message(STATUS "Could not find locally installed zstd, building third party")
set(ZSTD_VERSION "1.5.0")
set(ZSTD_HOME "${EXTERNAL_INSTALL_LOCATION}")
set(ZSTD_INCLUDE_DIR "${ZSTD_HOME}/include")
set(ZSTD_STATIC_LIB "${ZSTD_HOME}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}zstd${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(ZSTD_CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${ZSTD_HOME}
-DBUILD_SHARED_LIBS=OFF -DCMAKE_INSTALL_LIBDIR=lib -DZSTD_BUILD_TESTS=OFF -DCMAKE_POSITION_INDEPENDENT_CODE=ON)

if (CMAKE_VERSION VERSION_GREATER "3.7")
set(ZSTD_CONFIGURE SOURCE_SUBDIR "build/cmake" CMAKE_ARGS ${ZSTD_CMAKE_ARGS})
else()
set(ZSTD_CONFIGURE CONFIGURE_COMMAND "${THIRDPARTY_CONFIGURE_COMMAND}" ${ZSTD_CMAKE_ARGS}
"${CMAKE_CURRENT_BINARY_DIR}/zstd_ep-prefix/src/zstd_ep/build/cmake")
endif()

ExternalProject_Add (zstd_ep
URL "https://github.com/facebook/zstd/archive/v${ZSTD_VERSION}.tar.gz"
${ZSTD_CONFIGURE}
BUILD_BYPRODUCTS "${ZSTD_STATIC_LIB}")

set(ZSTD_LIBRARIES ${ZSTD_STATIC_LIB})

add_library(zstd INTERFACE)
target_link_libraries(zstd INTERFACE ${ZSTD_STATIC_LIB})
target_include_directories(zstd SYSTEM INTERFACE ${ZSTD_INCLUDE_DIR})

add_dependencies(zstd zstd_ep)
install(FILES "${ZSTD_STATIC_LIB}" DESTINATION "lib")
set(ZSTD_DEPENDS "zstd_ep")
else()
EXECUTE_PROCESS(COMMAND brew --prefix zstd OUTPUT_VARIABLE BREW_ZSTD_DIR OUTPUT_STRIP_TRAILING_WHITESPACE)
set(ENV{ZSTD_HOME} ${BREW_ZSTD_DIR})
set(ZSTD_HOME ${BREW_ZSTD_DIR})
message(STATUS "Found locally installed zstd under $ENV{ZSTD_HOME}")
# set variables
file (TO_CMAKE_PATH "${ZSTD_HOME}" _zstd_path)
find_library (ZSTD_LIBRARY NAMES zstd HINTS
${_zstd_path}
PATH_SUFFIXES "lib" "lib64")
if(ZSTD_LIBRARY)
message(STATUS "zstd lib: ${ZSTD_LIBRARY}")
endif()
find_library (ZSTD_STATIC_LIB NAMES ${CMAKE_STATIC_LIBRARY_PREFIX}${ZSTD_LIB_NAME}${CMAKE_STATIC_LIBRARY_SUFFIX} HINTS
${_zstd_path}
PATH_SUFFIXES "lib" "lib64")
if(ZSTD_LIBRARY)
set(ZSTD_LIBRARIES "${ZSTD_LIBRARY}")
elseif(ZSTD_STATIC_LIB)
set(ZSTD_LIBRARIES "${ZSTD_STATIC_LIB}")
endif()
message(STATUS "Zstd libraries: ${ZSTD_LIBRARIES}")
endif()

# Zlib
EXECUTE_PROCESS(COMMAND brew list zlib OUTPUT_VARIABLE BREW_ZLIB_LIST ERROR_VARIABLE BREW_ZLIB_NOTFOUND OUTPUT_STRIP_TRAILING_WHITESPACE)
if(BREW_ZLIB_NOTFOUND)
message(STATUS "Could not find locally installed zlib, building third party")
set(ZLIB_VERSION "1.2.11")
set(ZLIB_HOME "${EXTERNAL_INSTALL_LOCATION}")
set(ZLIB_INCLUDE_DIR "${ZLIB_HOME}/include")
set(ZLIB_STATIC_LIB "${ZLIB_HOME}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}z${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(ZLIB_CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${ZLIB_HOME}
-DBUILD_SHARED_LIBS=OFF -DCMAKE_INSTALL_LIBDIR=lib -DZLIB_BUILD_TESTS=OFF -DCMAKE_POSITION_INDEPENDENT_CODE=ON)
ExternalProject_Add (zlib_ep
URL "http://zlib.net/fossils/zlib-${ZLIB_VERSION}.tar.gz"
CMAKE_ARGS ${ZLIB_CMAKE_ARGS}
BUILD_BYPRODUCTS "${ZLIB_STATIC_LIB}")

set(ZLIB_LIBRARIES ${ZLIB_STATIC_LIB})

add_library(zlib INTERFACE)
target_link_libraries(zlib INTERFACE ${ZLIB_STATIC_LIB})
target_include_directories(zlib SYSTEM INTERFACE ${ZLIB_INCLUDE_DIR})

add_dependencies(zlib zlib_ep)
install(FILES "${ZLIB_STATIC_LIB}" DESTINATION "lib")
set(ZLIB_DEPENDS "zlib_ep")
else()
EXECUTE_PROCESS(COMMAND brew --prefix zlib OUTPUT_VARIABLE BREW_ZLIB_DIR OUTPUT_STRIP_TRAILING_WHITESPACE)
set(ENV{ZLIB_HOME} ${BREW_ZLIB_DIR})
set(ZLIB_HOME ${BREW_ZLIB_DIR})
message(STATUS "Found locally installed zlib under $ENV{ZLIB_HOME}")
# set variables
file (TO_CMAKE_PATH "${ZLIB_HOME}" _zlib_path)
find_library (ZLIB_LIBRARY NAMES z HINTS
${_zlib_path}
PATH_SUFFIXES "lib" "lib64")
if(ZLIB_LIBRARY)
message(STATUS "zlib lib: ${ZLIB_LIBRARY}")
endif()
find_library (ZLIB_STATIC_LIB NAMES ${CMAKE_STATIC_LIBRARY_PREFIX}z${CMAKE_STATIC_LIBRARY_SUFFIX} HINTS
${_zlib_path}
PATH_SUFFIXES "lib" "lib64")
if(ZLIB_LIBRARY)
set(ZLIB_LIBRARIES "${ZLIB_LIBRARY}")
elseif(ZLIB_STATIC_LIB)
set(ZLIB_LIBRARIES "${ZLIB_STATIC_LIB}")
endif()
message(STATUS "Zlib libraries: ${ZLIB_LIBRARIES}")
endif()
endif()
endif()

# set to third-party build
if(NOT SNAPPY_LIBRARIES)
set(SNAPPY_HOME "")
if (NOT APPLE)
message(STATUS "Adding byproducts to external project")
set(SNAPPY_LIBRARIES ${EXTERNAL_INSTALL_LOCATION}/lib/libsnappy.a)
set(ZSTD_LIBRARIES ${EXTERNAL_INSTALL_LOCATION}/lib/libzstd.a)
set(ZLIB_LIBRARIES ${EXTERNAL_INSTALL_LOCATION}/lib/libz.a)
set(LZ4_LIBRARIES ${EXTERNAL_INSTALL_LOCATION}/lib/liblz4.a)
set(ORC_THIRD_PARTY_LIBS
${SNAPPY_LIBRARIES}
${ZSTD_LIBRARIES}
${ZLIB_LIBRARIES}
${LZ4_LIBRARIES})
endif()

ExternalProject_Add(orc
GIT_REPOSITORY https://github.com/apache/orc.git
GIT_TAG rel/release-1.7.0
TIMEOUT 5
CMAKE_ARGS -DBUILD_LIBHDFSPP=OFF -DSNAPPY_HOME=${SNAPPY_HOME} -DOPENSSL_ROOT_DIR=${OPENSSL_ROOT_DIR} -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER} -DCMAKE_CXX_FLAGS="-Wno-poison-system-directories" -DCMAKE_INSTALL_PREFIX=${EXTERNAL_INSTALL_LOCATION} -DBUILD_JAVA=OFF -DBUILD_TOOLS=OFF -DBUILD_CPP_TESTS=OFF -DBUILD_POSITION_INDEPENDENT_LIB=ON -DPROTOBUF_HOME=${Protobuf_HOME}
CMAKE_ARGS -DBUILD_LIBHDFSPP=OFF -DSNAPPY_HOME=${SNAPPY_HOME} -DLZ4_HOME=${LZ4_HOME} -DZSTD_HOME=${ZSTD_HOME} -DZLIB_HOME=${ZLIB_HOME} -DOPENSSL_ROOT_DIR=${OPENSSL_ROOT_DIR} -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER} -DCMAKE_CXX_FLAGS="-Wno-poison-system-directories" -DCMAKE_INSTALL_PREFIX=${EXTERNAL_INSTALL_LOCATION} -DBUILD_JAVA=OFF -DBUILD_TOOLS=OFF -DBUILD_CPP_TESTS=OFF -DBUILD_POSITION_INDEPENDENT_LIB=ON -DPROTOBUF_HOME=${Protobuf_HOME}
PREFIX "${EXTERNAL_INSTALL_LOCATION}"
UPDATE_COMMAND "" # Disable update step: clones the project only once
BUILD_BYPRODUCTS
${EXTERNAL_INSTALL_LOCATION}/lib/liborc.a
${EXTERNAL_INSTALL_LOCATION}/lib/liblz4.a
${SNAPPY_LIBRARIES}
${EXTERNAL_INSTALL_LOCATION}/lib/libz.a
${EXTERNAL_INSTALL_LOCATION}/lib/libzstd.a
BUILD_BYPRODUCTS ${EXTERNAL_INSTALL_LOCATION}/lib/liborc.a ${ORC_THIRD_PARTY_LIBS}
)
ExternalProject_Add_StepDependencies(orc build ${SNAPPY_DEPENDS} ${LZ4_DEPENDS} ${ZSTD_DEPENDS} ${ZLIB_DEPENDS})
set(orc_INCLUDE_DIR ${EXTERNAL_INSTALL_LOCATION}/include)
ExternalProject_Get_Property(orc binary_dir)
set(orc_LIBRARY ${EXTERNAL_INSTALL_LOCATION}/lib/liborc.a)
Expand All @@ -91,10 +275,11 @@ if(BUILD_WITH_ORC)
add_dependencies(liborc orc)
include_directories(${orc_INCLUDE_DIR})

set(ORC_LIBRARIES ${EXTERNAL_INSTALL_LOCATION}/lib/liblz4.a
set(ORC_LIBRARIES
${SNAPPY_LIBRARIES}
${EXTERNAL_INSTALL_LOCATION}/lib/libz.a
${EXTERNAL_INSTALL_LOCATION}/lib/libzstd.a
${ZSTD_LIBRARIES}
${ZLIB_LIBRARIES}
${LZ4_LIBRARIES}
liborc)
endif()

Expand Down
Loading