From 97ae216a089de05f9a80dcfd062ccee35d386062 Mon Sep 17 00:00:00 2001 From: Leonhard Spiegelberg Date: Sun, 3 Jul 2022 14:36:36 -0400 Subject: [PATCH 01/32] [HOTFIX] Fix for key with optional tuplex. prefixing #69 --- tuplex/core/src/ContextOptions.cc | 24 +++++++++++++++++++-- tuplex/core/src/physical/PipelineBuilder.cc | 2 ++ tuplex/core/src/physical/TransformStage.cc | 5 +++++ tuplex/test/core/DataSetCollect.cc | 12 +++++++++++ 4 files changed, 41 insertions(+), 2 deletions(-) diff --git a/tuplex/core/src/ContextOptions.cc b/tuplex/core/src/ContextOptions.cc index 49b498969..7dbb5136c 100644 --- a/tuplex/core/src/ContextOptions.cc +++ b/tuplex/core/src/ContextOptions.cc @@ -677,10 +677,30 @@ namespace tuplex { // check that key exists, else issue warning! auto it = _store.find(key); + auto lookup_key = key; + + // if end, also check tuplex. version of it! + if(it == _store.end()) + it = _store.find("tuplex." + key); + + if(it != _store.end()) + lookup_key = "tuplex." + key; + + // still not found? check lowercase versions + if(it == _store.end()) { + for(it = _store.begin(); it != _store.end(); ++it) { + auto normalized_ref = tolower(it->first); + if(normalized_ref == tolower(key) || normalized_ref == tolower("tuplex." + key)) { + lookup_key = it->first; + break; // found valid iterator! + } + } + } + if(it == _store.end()) - Logger::instance().defaultLogger().error("could not find key '" + key + "'"); + Logger::instance().defaultLogger().error("could not find key '" + key + "', ignoring set attempt."); else - _store[key] = value; + _store[lookup_key] = value; } size_t ContextOptions::DRIVER_MEMORY() const { diff --git a/tuplex/core/src/physical/PipelineBuilder.cc b/tuplex/core/src/physical/PipelineBuilder.cc index 00e7feaff..192bce673 100644 --- a/tuplex/core/src/physical/PipelineBuilder.cc +++ b/tuplex/core/src/physical/PipelineBuilder.cc @@ -1012,6 +1012,8 @@ namespace tuplex { using namespace llvm; using namespace std; +#error "fix this here for hashing arbitrary types. I.e. serializing should do the trick" + // check colKey and whether it works. auto lastType = _lastRowResult.getTupleType(); python::Type keyType; diff --git a/tuplex/core/src/physical/TransformStage.cc b/tuplex/core/src/physical/TransformStage.cc index f753e6e41..b7db17cdd 100644 --- a/tuplex/core/src/physical/TransformStage.cc +++ b/tuplex/core/src/physical/TransformStage.cc @@ -240,6 +240,7 @@ namespace tuplex { if(!result.hash_map && !result.null_bucket) return std::vector(); + Deserializer ds(Schema(Schema::MemoryLayout::ROW, keyRowType)); Partition* partition = nullptr; PartitionWriter pw(driver, outputSchema, outputDataSetID, context.id(), context.getOptions().PARTITION_SIZE()); @@ -334,6 +335,10 @@ namespace tuplex { r = Row(s); r = r.upcastedRow(out_row_type); } else { + + // decode Row from memory + auto row = Row::fromMemory(ds, key, keylen); + throw std::runtime_error("decoding of other types not yet supported..."); // // this is how it potentially should look like... 
diff --git a/tuplex/test/core/DataSetCollect.cc b/tuplex/test/core/DataSetCollect.cc index 0df3f85ed..dfee3e19a 100644 --- a/tuplex/test/core/DataSetCollect.cc +++ b/tuplex/test/core/DataSetCollect.cc @@ -650,4 +650,16 @@ TEST_F(DataSetTest, OutputValidation) { // invalid output folder EXPECT_THROW(c.parallelize({Row(Field::empty_list()), Row(Field::empty_list())}).map(UDF("lambda x: len(x)")).tocsv("."), std::runtime_error); +} + + +TEST_F(DataSetTest, MapColumnTypingBug) { + // reported as https://github.com/tuplex/tuplex/issues/96 + using namespace tuplex; + + Context c(microTestOptions()); + // c.csv("zillow_data.csv").mapColumn('title', lambda x: x + "-hi").filter(lambda a: False) + auto v = c.csv("../resources/zillow_data.csv").mapColumn("title", UDF("lambda x: x + '-hi'")).unique().collectAsVector();//.filter(UDF("lambda a: False")).collectAsVector(); + + EXPECT_EQ(v.size(), 0); } \ No newline at end of file From 4bef8f55af4aff75d5b9c60c713e9da5c4fbda86 Mon Sep 17 00:00:00 2001 From: Leonhard Spiegelberg Date: Sun, 3 Jul 2022 17:14:13 -0400 Subject: [PATCH 02/32] fix hashing for rows with more complex types --- tuplex/codegen/include/HashHelper.h | 47 +++++++++++++++++++++ tuplex/codegen/src/HashHelper.cc | 15 +++++++ tuplex/core/src/physical/PipelineBuilder.cc | 6 ++- tuplex/core/src/physical/StageBuilder.cc | 6 +++ tuplex/core/src/physical/TransformStage.cc | 5 +-- tuplex/test/core/DataSetCollect.cc | 2 +- tuplex/test/core/FullPipelines.cc | 2 +- tuplex/utils/include/StringUtils.h | 12 ++++++ tuplex/utils/src/StringUtils.cc | 28 ++++++------ 9 files changed, 102 insertions(+), 21 deletions(-) create mode 100644 tuplex/codegen/include/HashHelper.h create mode 100644 tuplex/codegen/src/HashHelper.cc diff --git a/tuplex/codegen/include/HashHelper.h b/tuplex/codegen/include/HashHelper.h new file mode 100644 index 000000000..d5bd28685 --- /dev/null +++ b/tuplex/codegen/include/HashHelper.h @@ -0,0 +1,47 @@ +//--------------------------------------------------------------------------------------------------------------------// +// // +// Tuplex: Blazing Fast Python Data Science // +// // +// // +// (c) 2017 - 2021, Tuplex team // +// Created by Leonhard Spiegelberg first on 7/4/2022 // +// License: Apache 2.0 // +//--------------------------------------------------------------------------------------------------------------------// +#ifndef HASHHELPER_HEADER_ +#define HASHHELPER_HEADER_ + +#include "LLVMEnvironment.h" +#include "CodegenHelper.h" + +namespace tuplex { + namespace codegen { + + // this is a simple hashmap proxy structure + class HashProxy { + public: + /*! + * create a new hashmap (codegen) + * @param builder + * @param global whether to create the hashmap as global variable or not. + */ + HashProxy(llvm::IRBuilder<>& builder, bool global=false); + + /*! + * put a value into the hashmap + * @param builder + * @param key + * @param value if default serializable value, then no + */ + void put(llvm::IRBuilder<>& builder, + const SerializableValue& key, + const SerializableValue& value=SerializableValue()); + + }; + + // these are helper functions to deal with generating code to hash different keys etc. 
+ extern void hashmap_put(llvm::IRBuilder<>& builder, + const SerializableValue& key, + const SerializableValue& value); + } +} +#endif \ No newline at end of file diff --git a/tuplex/codegen/src/HashHelper.cc b/tuplex/codegen/src/HashHelper.cc new file mode 100644 index 000000000..5990699b9 --- /dev/null +++ b/tuplex/codegen/src/HashHelper.cc @@ -0,0 +1,15 @@ +// +// Created by Leonhard Spiegelberg on 7/3/22. +// +#include + +namespace tuplex { + namespace codegen { + // these are helper functions to deal with generating code to hash different keys etc. + extern void hashmap_put(llvm::IRBuilder<>& builder, + const SerializableValue& key, + const SerializableValue& value) { + // check what type key is => this determines the structure + } + } +} \ No newline at end of file diff --git a/tuplex/core/src/physical/PipelineBuilder.cc b/tuplex/core/src/physical/PipelineBuilder.cc index 192bce673..1287b9008 100644 --- a/tuplex/core/src/physical/PipelineBuilder.cc +++ b/tuplex/core/src/physical/PipelineBuilder.cc @@ -1006,14 +1006,14 @@ namespace tuplex { const int hashtableWidth, bool bucketize, bool isAggregateByKey) { + auto& logger = Logger::instance().logger("codegen"); + assert(!bucketize || (bucketize && !keyCols.empty() && !isAggregateByKey)); assert((isAggregateByKey && !bucketize) || !isAggregateByKey); assert(hashtableWidth == 8 || hashtableWidth == 0); using namespace llvm; using namespace std; -#error "fix this here for hashing arbitrary types. I.e. serializing should do the trick" - // check colKey and whether it works. auto lastType = _lastRowResult.getTupleType(); python::Type keyType; @@ -1051,6 +1051,8 @@ namespace tuplex { keyType = python::Type::STRING; // otherwise, we need to serialize the full row and use a string hashmap } + logger.debug("generate hashing using keytype=" + keyType.desc()); + // perform type conversions: general types become strings (serialize to hash) if(keyType.isTupleType()) { if(keyType.isOptionType()) throw std::runtime_error("This shouldn't happen."); diff --git a/tuplex/core/src/physical/StageBuilder.cc b/tuplex/core/src/physical/StageBuilder.cc index 8844e27b0..11ca64ac7 100644 --- a/tuplex/core/src/physical/StageBuilder.cc +++ b/tuplex/core/src/physical/StageBuilder.cc @@ -855,6 +855,12 @@ namespace tuplex { } else if(aop->aggType() == AggregateType::AGG_UNIQUE) { // nothing to do... // => here aggregate is directly written to output table! + // do not bucketize but simply use all hash keys! + std::vector v; + auto num_columns = aop->getOutputSchema().getRowType().parameters().size(); + for(size_t i = 0; i < num_columns; ++i) + v.push_back(i); + _hashColKeys = v; } else { throw std::runtime_error("unsupported aggregate type"); } diff --git a/tuplex/core/src/physical/TransformStage.cc b/tuplex/core/src/physical/TransformStage.cc index b7db17cdd..1d4258c5f 100644 --- a/tuplex/core/src/physical/TransformStage.cc +++ b/tuplex/core/src/physical/TransformStage.cc @@ -224,7 +224,7 @@ namespace tuplex { // -> if not, error and return empty vector! // hashKeyType is the type in which the key is stored. (NOT INCLUDING OPT!) - python::Type hashKeyType = result.keyType.withoutOptions(); // remove option b.c. of null-bucket design. @TODO: this is not 100% correct, because inner options will also get sacrificed by this... + python::Type hashKeyType = result.keyType; // remove option b.c. of null-bucket design. @TODO: this is not 100% correct, because inner options will also get sacrificed by this... 
python::Type keyRowType = python::Type::propagateToTupleType(hashKeyType); bool requiresUpcast = false; @@ -338,8 +338,7 @@ namespace tuplex { // decode Row from memory auto row = Row::fromMemory(ds, key, keylen); - - throw std::runtime_error("decoding of other types not yet supported..."); + r = row.upcastedRow(out_row_type); // // this is how it potentially should look like... // // decode key into Row, upcast, serialize diff --git a/tuplex/test/core/DataSetCollect.cc b/tuplex/test/core/DataSetCollect.cc index dfee3e19a..1c5f85fe7 100644 --- a/tuplex/test/core/DataSetCollect.cc +++ b/tuplex/test/core/DataSetCollect.cc @@ -661,5 +661,5 @@ TEST_F(DataSetTest, MapColumnTypingBug) { // c.csv("zillow_data.csv").mapColumn('title', lambda x: x + "-hi").filter(lambda a: False) auto v = c.csv("../resources/zillow_data.csv").mapColumn("title", UDF("lambda x: x + '-hi'")).unique().collectAsVector();//.filter(UDF("lambda a: False")).collectAsVector(); - EXPECT_EQ(v.size(), 0); + EXPECT_GT(v.size(), 0); } \ No newline at end of file diff --git a/tuplex/test/core/FullPipelines.cc b/tuplex/test/core/FullPipelines.cc index 1bb49ef5f..aca4309fb 100644 --- a/tuplex/test/core/FullPipelines.cc +++ b/tuplex/test/core/FullPipelines.cc @@ -39,7 +39,7 @@ namespace tuplex { tuplex::splitString(col, '_', [&](const std::string &p) { s += tuplex::char2str(toupper(p[0])); for (int j = 1; j < p.length(); ++j) - s += tuplex::char2str(tolower(p[j])); + s += tuplex::char2str(std::tolower(p[j])); }); return s; diff --git a/tuplex/utils/include/StringUtils.h b/tuplex/utils/include/StringUtils.h index 00d21d67d..b199b39a1 100644 --- a/tuplex/utils/include/StringUtils.h +++ b/tuplex/utils/include/StringUtils.h @@ -358,6 +358,18 @@ namespace tuplex { return v; } + + /*! + * converts a string to lowercase (copy) + * @param s + * @return new version of string with lowercased characters + */ + inline std::string tolower(const std::string& s) { + std::string copy = s; + for(auto& c : copy) + c = std::tolower(c); + return copy; + } } diff --git a/tuplex/utils/src/StringUtils.cc b/tuplex/utils/src/StringUtils.cc index 09d7fc4a6..546222d56 100644 --- a/tuplex/utils/src/StringUtils.cc +++ b/tuplex/utils/src/StringUtils.cc @@ -203,8 +203,8 @@ namespace tuplex { return static_cast(ExceptionCode::BOOLPARSE_ERROR); case 2: // convert char - buffer[0] = (char) tolower(*start); - buffer[1] = (char) tolower(*(start + 1)); + buffer[0] = (char) std::tolower(*start); + buffer[1] = (char) std::tolower(*(start + 1)); buffer[2] = 0; if (strcmp(buffer, "no") == 0) { @@ -214,9 +214,9 @@ namespace tuplex { return static_cast(ExceptionCode::BOOLPARSE_ERROR); case 3: // convert char - buffer[0] = (char) tolower(*start); - buffer[1] = (char) tolower(*(start + 1)); - buffer[2] = (char) tolower(*(start + 2)); + buffer[0] = (char) std::tolower(*start); + buffer[1] = (char) std::tolower(*(start + 1)); + buffer[2] = (char) std::tolower(*(start + 2)); buffer[3] = 0; if (strcmp(buffer, "yes") == 0) { @@ -226,10 +226,10 @@ namespace tuplex { return static_cast(ExceptionCode::BOOLPARSE_ERROR); case 4: // convert char - buffer[0] = (char) tolower(*start); - buffer[1] = (char) tolower(*(start + 1)); - buffer[2] = (char) tolower(*(start + 2)); - buffer[3] = (char) tolower(*(start + 3)); + buffer[0] = (char) std::tolower(*start); + buffer[1] = (char) std::tolower(*(start + 1)); + buffer[2] = (char) std::tolower(*(start + 2)); + buffer[3] = (char) std::tolower(*(start + 3)); buffer[4] = 0; if (strcmp(buffer, "true") == 0) { @@ -239,11 +239,11 @@ namespace tuplex { 
return static_cast(ExceptionCode::BOOLPARSE_ERROR); case 5: // convert char - buffer[0] = (char) tolower(*start); - buffer[1] = (char) tolower(*(start + 1)); - buffer[2] = (char) tolower(*(start + 2)); - buffer[3] = (char) tolower(*(start + 3)); - buffer[4] = (char) tolower(*(start + 4)); + buffer[0] = (char) std::tolower(*start); + buffer[1] = (char) std::tolower(*(start + 1)); + buffer[2] = (char) std::tolower(*(start + 2)); + buffer[3] = (char) std::tolower(*(start + 3)); + buffer[4] = (char) std::tolower(*(start + 4)); buffer[5] = 0; if (strcmp(buffer, "false") == 0) { From f56043a0925227596d5d271d5c1fb0cc93bb8010 Mon Sep 17 00:00:00 2001 From: Leonhard Spiegelberg Date: Sun, 3 Jul 2022 18:53:38 -0400 Subject: [PATCH 03/32] test fix --- tuplex/test/core/AssertAndRaise.cc | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/tuplex/test/core/AssertAndRaise.cc b/tuplex/test/core/AssertAndRaise.cc index 9bfcb9780..ecdebf945 100644 --- a/tuplex/test/core/AssertAndRaise.cc +++ b/tuplex/test/core/AssertAndRaise.cc @@ -18,7 +18,9 @@ using namespace std; // Note: only this test here fails... TEST_F(AssertAndRaiseTest, Assert) { - Context c(microTestOptions()); + auto opt = microTestOptions(); + opt.set("optimizer.mergeExceptionsInOrder", "true"); + Context c(opt); auto code = "def f(x):\n" "\tassert x % 2 == 1, 'only odd numbers'\n" @@ -35,15 +37,17 @@ TEST_F(AssertAndRaiseTest, Assert) { // with resolver auto v1 = ds.resolve(ExceptionCode::ASSERTIONERROR, UDF("lambda x: (x-1) * (x-1)")).collectAsVector(); ASSERT_EQ(v1.size(), 5); - EXPECT_EQ(v1[0].getInt(0), 1); - EXPECT_EQ(v1[1].getInt(0), 1); - EXPECT_EQ(v1[2].getInt(0), 9); - EXPECT_EQ(v1[3].getInt(0), 9); - EXPECT_EQ(v1[4].getInt(0), 25); + EXPECT_EQ(v1[0].getInt(0), 1); // -> 1 * 1 = 1 + EXPECT_EQ(v1[1].getInt(0), 1); // -> (2-1) * (2-1) = 1 + EXPECT_EQ(v1[2].getInt(0), 9); // -> 3 * 3 = 9 + EXPECT_EQ(v1[3].getInt(0), 9); // -> (4 - 1) * (4 -1 ) = 9 + EXPECT_EQ(v1[4].getInt(0), 25); // -> 5 * 5 = 25 } TEST_F(AssertAndRaiseTest, Raise) { - Context c(microTestOptions()); + auto opt = microTestOptions(); + opt.set("optimizer.mergeExceptionsInOrder", "true"); + Context c(opt); auto code = "def f(x):\n" "\tif x % 2 == 1:\n" @@ -73,7 +77,9 @@ TEST_F(AssertAndRaiseTest, Raise) { // ignore test for hierarchy? TEST_F(AssertAndRaiseTest, Ignore) { - Context c(microTestOptions()); + auto opt = microTestOptions(); + opt.set("optimizer.mergeExceptionsInOrder", "true"); + Context c(opt); auto code = "def f(x):\n" "\tif x % 2 == 0:\n" From 81ecd8f957f9f8bf56471a299a277d95a6d497b3 Mon Sep 17 00:00:00 2001 From: Leonhard Spiegelberg Date: Sun, 3 Jul 2022 18:58:11 -0400 Subject: [PATCH 04/32] another fix --- tuplex/test/core/DataSetCollect.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tuplex/test/core/DataSetCollect.cc b/tuplex/test/core/DataSetCollect.cc index 1c5f85fe7..7a7b2fbff 100644 --- a/tuplex/test/core/DataSetCollect.cc +++ b/tuplex/test/core/DataSetCollect.cc @@ -467,7 +467,9 @@ TEST_F(DataSetTest, BuiltinPowerInt) { using namespace tuplex; using namespace std; - Context c(microTestOptions()); + auto opt = microTestOptions(); + opt.set("optimizer.mergeExceptionsInOrder", "true"); + Context c(opt); // simple integer case, everything well defined... 
// --> majority positive From 3b87b985b752a2ace84e597eebd04c45cd048007 Mon Sep 17 00:00:00 2001 From: Leonhard Spiegelberg Date: Tue, 5 Jul 2022 14:38:08 -0400 Subject: [PATCH 05/32] wip fixing aggregateByKey (except hybrid) --- tuplex/core/include/ee/local/LocalBackend.h | 10 +- tuplex/core/src/ee/local/LocalBackend.cc | 163 +++++++++++++++++++- tuplex/python/tuplex/dataset.py | 8 +- tuplex/test/core/AggregateTest.cc | 2 + 4 files changed, 173 insertions(+), 10 deletions(-) diff --git a/tuplex/core/include/ee/local/LocalBackend.h b/tuplex/core/include/ee/local/LocalBackend.h index 63ea60c4b..58ae5a96c 100644 --- a/tuplex/core/include/ee/local/LocalBackend.h +++ b/tuplex/core/include/ee/local/LocalBackend.h @@ -77,7 +77,11 @@ namespace tuplex { * @param combine whether this is an aggregate (e.g. if we should call the aggregate combiner, rather than simply merging the hashtables) * @return the final hashtable sink */ - HashTableSink createFinalHashmap(const std::vector& tasks, int hashtableKeyByteWidth, bool combine); + HashTableSink createFinalHashmap(const std::vector& tasks, + int hashtableKeyByteWidth, + bool combine, + codegen::agg_init_f init_aggregate, + codegen::agg_combine_f combine_aggregate); // hash join stage void executeHashJoinStage(HashJoinStage* hstage); @@ -172,7 +176,9 @@ namespace tuplex { std::vector resolveViaSlowPath(std::vector& tasks, bool merge_rows_in_order, codegen::resolve_f functor, - TransformStage* tstage, bool combineHashmaps); + TransformStage* tstage, bool combineHashmaps, + codegen::agg_init_f init_aggregate, + codegen::agg_combine_f combine_aggregate); }; /*! diff --git a/tuplex/core/src/ee/local/LocalBackend.cc b/tuplex/core/src/ee/local/LocalBackend.cc index 56e5de9a0..2980efa98 100644 --- a/tuplex/core/src/ee/local/LocalBackend.cc +++ b/tuplex/core/src/ee/local/LocalBackend.cc @@ -997,7 +997,13 @@ namespace tuplex { auto resolveFunctor = _options.RESOLVE_WITH_INTERPRETER_ONLY() ? nullptr : syms->resolveFunctor; // cout<<"*** num tasks before resolution: "<aggInitFunctor, + syms->aggCombineFunctor); // cout<<"*** num tasks after resolution: "<setHashResult(nullptr, nullptr); } else { - auto hsink = createFinalHashmap({completedTasks.cbegin(), completedTasks.cend()}, tstage->hashtableKeyByteWidth(), combineOutputHashmaps); + auto hsink = createFinalHashmap({completedTasks.cbegin(), completedTasks.cend()}, + tstage->hashtableKeyByteWidth(), + combineOutputHashmaps, + syms->aggInitFunctor, + syms->aggCombineFunctor); tstage->setHashResult(hsink.hm, hsink.null_bucket); } break; @@ -1193,7 +1203,11 @@ namespace tuplex { std::vector LocalBackend::resolveViaSlowPath( std::vector &tasks, bool merge_rows_in_order, - codegen::resolve_f functor, tuplex::TransformStage *tstage, bool combineHashmaps) { + codegen::resolve_f functor, + tuplex::TransformStage *tstage, + bool combineHashmaps, + codegen::agg_init_f init_aggregate, + codegen::agg_combine_f combine_aggregate) { using namespace std; assert(tstage); @@ -1208,7 +1222,11 @@ namespace tuplex { // special case: create a global hash output result and put it into the FIRST resolve task. 
Timer timer; - hsink = createFinalHashmap({tasks.cbegin(), tasks.cend()}, tstage->hashtableKeyByteWidth(), combineHashmaps); + hsink = createFinalHashmap({tasks.cbegin(), tasks.cend()}, + tstage->hashtableKeyByteWidth(), + combineHashmaps, + init_aggregate, + combine_aggregate); logger().info("created combined normal-case result in " + std::to_string(timer.time()) + "s"); hasNormalHashSink = true; } @@ -1815,7 +1833,136 @@ namespace tuplex { } } - HashTableSink LocalBackend::createFinalHashmap(const std::vector& tasks, int hashtableKeyByteWidth, bool combine) { + struct apply_context { + map_t hm; + codegen::agg_init_f init_aggregate; + codegen::agg_combine_f combine_aggregate; + }; + static int apply_to_bucket(const apply_context* ctx, hashmap_element* entry) { + assert(ctx->hm); + auto key = entry->key; + auto keylen = entry->keylen; + auto data = (uint8_t*)entry->data; + + // update data + // initialize + uint8_t* init_val = nullptr; + int64_t init_size = 0; + ctx->init_aggregate(&init_val, &init_size); + + // call combine over null-bucket + // decode first bucket values! + // --> for aggregate by key a single value is stored there + uint64_t bucket_size = *(uint64_t*)data; + uint8_t* bucket_val = data + 8; + auto new_val = init_val; + auto new_size = init_size; + auto rc = ctx->combine_aggregate(&new_val, &new_size, bucket_val, bucket_size); + + // rc no 0? -> resolve! + if(rc != 0) { + std::cerr<<"combine function failed"<(malloc(new_size + 8)); + *(int64_t*)data = new_size; + memcpy(data + 8, new_val, new_size); + // assign to hashmap + entry->data = data; + free(old_ptr); + runtime::rtfree_all(); // combine aggregate allocates via runtime + return MAP_OK; + } + + void applyCombinePerGroup(HashTableSink& sink, int hashtableKeyByteWidth, codegen::agg_init_f init_aggregate, + codegen::agg_combine_f combine_aggregate) { + + // combineBuckets code: + // auto sizeA = *(int64_t*)bucketA; + // auto valA = static_cast(malloc(sizeA)); + // // TODO: when we convert everything to thread locals, we should change agg_combine_functor to match the size | value format of agg_aggregate_functor so that we can roll aggregate into aggregateByKey and just using the nullbucket + // memcpy(valA, bucketA + 8, sizeA); + // + // auto sizeB = *(uint64_t*)bucketB; + // auto valB = bucketB + 8; + // + // agg_combine_functor(&valA, &sizeA, valB, sizeB); + // + // // allocate the output buffer (should be avoided by the above TODO eventually) + // auto ret = static_cast(malloc(sizeA + 8)); + // *(int64_t*)ret = sizeA; + // memcpy(ret + 8, valA, sizeA); + // free(valA); free(bucketA); + // return ret; + + // apply to null bucket if it exists + if(sink.null_bucket) { + // initialize + uint8_t* init_val = nullptr; + int64_t init_size = 0; + init_aggregate(&init_val, &init_size); + + // call combine over null-bucket + // decode first bucket values! + // --> for aggregate by key a single value is stored there + uint64_t bucket_size = *(uint64_t*)sink.null_bucket; + uint8_t* bucket_val = sink.null_bucket + 8; + auto new_val = init_val; + auto new_size = init_size; + auto rc = combine_aggregate(&new_val, &new_size, bucket_val, bucket_size); + + // rc no 0? -> resolve! + if(rc != 0) { + std::cerr<<"combine function failed"<(malloc(new_size + 8)); + *(int64_t*)sink.null_bucket = new_size; + memcpy(sink.null_bucket + 8, new_val, new_size); + free(new_val); + free(old_ptr); + } + + if(hashtableKeyByteWidth == 8) { + // this dispatch is not great, should get refactored... 
+ apply_context ctx; + ctx.hm = sink.hm; + ctx.init_aggregate = init_aggregate; + ctx.combine_aggregate = combine_aggregate; + int64_hashmap_iterate(sink.hm, reinterpret_cast(apply_to_bucket), &ctx); + } else { + // the regular, bytes based hashmap + apply_context ctx; + ctx.hm = sink.hm; + ctx.init_aggregate = init_aggregate; + ctx.combine_aggregate = combine_aggregate; + hashmap_iterate(sink.hm, reinterpret_cast(apply_to_bucket), &ctx); + } + + // TODO: missing is, need to apply UDFs to hybrid hashmap as well in case... + // --> should be a trivial function (?) + // @TODO. + } + + HashTableSink LocalBackend::createFinalHashmap(const std::vector& tasks, + int hashtableKeyByteWidth, + bool combine, + codegen::agg_init_f init_aggregate, + codegen::agg_combine_f combine_aggregate) { + assert(init_aggregate && combine_aggregate); + + // note: in order to preserve semantics on each group at least once the combine function has to be run. + // this can be achieved by running combine with the initial value + if(tasks.empty()) { HashTableSink sink; if(hashtableKeyByteWidth == 8) sink.hm = int64_hashmap_new(); @@ -1826,7 +1973,9 @@ namespace tuplex { // no merge necessary, just directly return result // fetch hash table from task assert(tasks.front()->type() == TaskType::UDFTRAFOTASK || tasks.front()->type() == TaskType::RESOLVE); - return getHashSink(tasks.front()); + auto sink = getHashSink(tasks.front()); + applyCombinePerGroup(sink, hashtableKeyByteWidth, init_aggregate, combine_aggregate); + return sink; } else { // @TODO: getHashSink should be updated to also work with hybrids. Yet, the merging of normal hashtables @@ -1865,6 +2014,8 @@ namespace tuplex { else hashmap_free(task_sink.hm); // remove hashmap (keys and buckets already handled) } + + applyCombinePerGroup(sink, hashtableKeyByteWidth, init_aggregate, combine_aggregate); return sink; } } diff --git a/tuplex/python/tuplex/dataset.py b/tuplex/python/tuplex/dataset.py index a2b8c0b33..53c85ff98 100644 --- a/tuplex/python/tuplex/dataset.py +++ b/tuplex/python/tuplex/dataset.py @@ -490,6 +490,7 @@ def toorc(self, path, part_size=0, num_rows=max_rows, num_parts=0, part_name_gen def aggregate(self, combine, aggregate, initial_value): """ + cf. aggregateByKey for details Args: combine: a UDF to combine two aggregates (results of the aggregate function or the initial_value) aggregate: a UDF which produces a result @@ -524,9 +525,12 @@ def aggregate(self, combine, aggregate, initial_value): def aggregateByKey(self, combine, aggregate, initial_value, key_columns): """ + An experimental aggregateByKey function similar to aggregate. There are several scenarios that do not work with this function yet and its performance hasn't been properly + optimized either. Data is grouped by the supplied key_columns. Then, for each group a new aggregate is initialized using the initial_value, which can be thought of as a neutral value. + The aggregate function is then called for each element and the current aggregate structure. It is guaranteed that the combine function is called at least once per group by applying the initial_value to the aggregate. Args: - combine: a UDF to combine two aggregates (results of the aggregate function or the initial_value) - aggregate: a UDF which produces a result + combine: a UDF to combine two aggregates (results of the aggregate function or the initial_value). E.g., cobmine = lambda agg1, agg2: agg1 + agg2. The initial value should be the neutral element. 
+ aggregate: a UDF which produces a result by combining a value with the aggregate initialized by initial_value. E.g., aggregate = lambda agg, value: agg + value sums up values. initial_value: a neutral initial value. key_columns: the columns to group the aggregate by Returns: diff --git a/tuplex/test/core/AggregateTest.cc b/tuplex/test/core/AggregateTest.cc index 68887b26b..cd8e8554d 100644 --- a/tuplex/test/core/AggregateTest.cc +++ b/tuplex/test/core/AggregateTest.cc @@ -342,6 +342,8 @@ TEST_F(AggregateTest, LargeAggregateByKey) { EXPECT_EQ(columns2[2], ""); // have the combiner drop some data + // --> this will work when semantics guarantee that combiner is executed at least ONCE per group. + auto combine3 = UDF("lambda a, b: (1112, a[1] + b[1])"); auto agg3 = UDF("lambda a, x: (a[0] + x[0], a[1] + x[2])"); auto& ds3 = c.csv("../resources/aggbykey_large_test.csv").aggregateByKey(combine3, agg3, Row(0, 0), {"col1"}); From c04c63fa1c535ac0972cd2599c3f1d9f275c6b7a Mon Sep 17 00:00:00 2001 From: Leonhard Spiegelberg Date: Tue, 5 Jul 2022 15:51:12 -0400 Subject: [PATCH 06/32] memcpy fix --- tuplex/core/src/ee/local/LocalBackend.cc | 21 ++++++++++++++------- tuplex/core/src/physical/TransformStage.cc | 7 ++++++- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/tuplex/core/src/ee/local/LocalBackend.cc b/tuplex/core/src/ee/local/LocalBackend.cc index 2980efa98..ba0de02e7 100644 --- a/tuplex/core/src/ee/local/LocalBackend.cc +++ b/tuplex/core/src/ee/local/LocalBackend.cc @@ -1853,7 +1853,7 @@ namespace tuplex { // call combine over null-bucket // decode first bucket values! // --> for aggregate by key a single value is stored there - uint64_t bucket_size = *(uint64_t*)data; + int64_t bucket_size = *(int64_t*)data; uint8_t* bucket_val = data + 8; auto new_val = init_val; auto new_size = init_size; @@ -1864,17 +1864,24 @@ namespace tuplex { std::cerr<<"combine function failed"<(malloc(new_size + 8)); + *(int64_t*)new_data = new_size; + memcpy(new_data + 8, new_val, new_size); + + // free original aggregate (must come after data copy!) 
free(init_val); auto old_ptr = data; - // create new combined pointer - data = static_cast(malloc(new_size + 8)); - *(int64_t*)data = new_size; - memcpy(data + 8, new_val, new_size); + // assign to hashmap - entry->data = data; + entry->data = new_data; free(old_ptr); runtime::rtfree_all(); // combine aggregate allocates via runtime + + // // check + // uint8_t* bucket = nullptr; + //hashmap_get(ctx->hm, key, keylen, (void **) (&bucket)); + return MAP_OK; } diff --git a/tuplex/core/src/physical/TransformStage.cc b/tuplex/core/src/physical/TransformStage.cc index 1d4258c5f..906f8c203 100644 --- a/tuplex/core/src/physical/TransformStage.cc +++ b/tuplex/core/src/physical/TransformStage.cc @@ -411,7 +411,12 @@ namespace tuplex { return final_length; } - static size_t appendBucketAsPartition(std::vector> &rows, const uint8_t *buffer, uint64_t keylen, const char *key, const python::Type &keyType, const python::Type &aggType) { + static size_t appendBucketAsPartition(std::vector> &rows, + const uint8_t *buffer, + uint64_t keylen, + const char *key, + const python::Type &keyType, + const python::Type &aggType) { Serializer s; // get the key From d395e0929104ea054252825b8f4b68208ec2a604 Mon Sep 17 00:00:00 2001 From: Leonhard Spiegelberg Date: Tue, 5 Jul 2022 15:58:22 -0400 Subject: [PATCH 07/32] another test getting fixed --- tuplex/core/src/ContextOptions.cc | 8 ++++---- tuplex/test/core/MetricsTest.cc | 5 ++++- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/tuplex/core/src/ContextOptions.cc b/tuplex/core/src/ContextOptions.cc index 7dbb5136c..5823a4bd2 100644 --- a/tuplex/core/src/ContextOptions.cc +++ b/tuplex/core/src/ContextOptions.cc @@ -680,11 +680,11 @@ namespace tuplex { auto lookup_key = key; // if end, also check tuplex. version of it! - if(it == _store.end()) + if(it == _store.end()) { it = _store.find("tuplex." + key); - - if(it != _store.end()) - lookup_key = "tuplex." + key; + if(it != _store.end()) // success, so set key to tuplex. + key! + lookup_key = "tuplex." + key; + } // still not found? check lowercase versions if(it == _store.end()) { diff --git a/tuplex/test/core/MetricsTest.cc b/tuplex/test/core/MetricsTest.cc index 24d9564a7..f0f3e9c36 100644 --- a/tuplex/test/core/MetricsTest.cc +++ b/tuplex/test/core/MetricsTest.cc @@ -19,7 +19,10 @@ TEST_F(MetricsTest, BasicTest) { using namespace tuplex; using namespace std; - Context c(testOptions()); + auto co = microTestOptions(); + co.set("tuplex.useLLVMOptimizer", "true"); + EXPECT_TRUE(co.USE_LLVM_OPTIMIZER()); + Context c(co); vector ref; vector data; size_t N = 1000; From c63c4e996a0b2ea030ad570e4f4ac5d1fb95ee4f Mon Sep 17 00:00:00 2001 From: Leonhard Spiegelberg Date: Tue, 5 Jul 2022 16:47:02 -0400 Subject: [PATCH 08/32] key fix --- tuplex/core/src/ee/local/LocalBackend.cc | 13 ++++++++++--- tuplex/core/src/physical/TransformStage.cc | 8 +++++++- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/tuplex/core/src/ee/local/LocalBackend.cc b/tuplex/core/src/ee/local/LocalBackend.cc index ba0de02e7..54816d7fb 100644 --- a/tuplex/core/src/ee/local/LocalBackend.cc +++ b/tuplex/core/src/ee/local/LocalBackend.cc @@ -1965,7 +1965,6 @@ namespace tuplex { bool combine, codegen::agg_init_f init_aggregate, codegen::agg_combine_f combine_aggregate) { - assert(init_aggregate && combine_aggregate); // note: in order to preserve semantics on each group at least once the combine function has to be run. 
// this can be achieved by running combine with the initial value @@ -1981,7 +1980,12 @@ namespace tuplex { // fetch hash table from task assert(tasks.front()->type() == TaskType::UDFTRAFOTASK || tasks.front()->type() == TaskType::RESOLVE); auto sink = getHashSink(tasks.front()); - applyCombinePerGroup(sink, hashtableKeyByteWidth, init_aggregate, combine_aggregate); + + // aggByKey or aggregate? + if(init_aggregate && combine_aggregate) { + applyCombinePerGroup(sink, hashtableKeyByteWidth, init_aggregate, combine_aggregate); + } + return sink; } else { @@ -2022,7 +2026,10 @@ namespace tuplex { hashmap_free(task_sink.hm); // remove hashmap (keys and buckets already handled) } - applyCombinePerGroup(sink, hashtableKeyByteWidth, init_aggregate, combine_aggregate); + // aggByKey or aggregate? + if(init_aggregate && combine_aggregate) { + applyCombinePerGroup(sink, hashtableKeyByteWidth, init_aggregate, combine_aggregate); + } return sink; } } diff --git a/tuplex/core/src/physical/TransformStage.cc b/tuplex/core/src/physical/TransformStage.cc index 906f8c203..ff99bf4a7 100644 --- a/tuplex/core/src/physical/TransformStage.cc +++ b/tuplex/core/src/physical/TransformStage.cc @@ -225,6 +225,12 @@ namespace tuplex { // hashKeyType is the type in which the key is stored. (NOT INCLUDING OPT!) python::Type hashKeyType = result.keyType; // remove option b.c. of null-bucket design. @TODO: this is not 100% correct, because inner options will also get sacrificed by this... + + // special case: If keyRowType is option or tuple with single content -> nullbucket is used! + if(hashKeyType.isOptionType()) + hashKeyType = hashKeyType.getReturnType(); + if(hashKeyType.isTupleType() && hashKeyType.parameters().size() == 1 && hashKeyType.parameters().front().isOptionType()) + hashKeyType = hashKeyType.parameters().front().getReturnType(); python::Type keyRowType = python::Type::propagateToTupleType(hashKeyType); bool requiresUpcast = false; @@ -329,7 +335,7 @@ namespace tuplex { while((key = hashmap_get_next_key(hashtable, &iterator, &keylen)) != nullptr) { Row r; - if(hashKeyType == python::Type::propagateToTupleType(python::Type::STRING)) { + if(hashKeyType == python::Type::STRING || hashKeyType == python::Type::propagateToTupleType(python::Type::STRING)) { // use directly key as str... 
std::string s(key); r = Row(s); From 42f4947618b0df92b8c2a25fde38032f94f9cdaa Mon Sep 17 00:00:00 2001 From: Leonhard Spiegelberg Date: Tue, 5 Jul 2022 16:58:31 -0400 Subject: [PATCH 09/32] adding agg test --- tuplex/python/tests/test_aggregates.py | 50 ++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/tuplex/python/tests/test_aggregates.py b/tuplex/python/tests/test_aggregates.py index aef5f56fb..382db14dd 100644 --- a/tuplex/python/tests/test_aggregates.py +++ b/tuplex/python/tests/test_aggregates.py @@ -14,6 +14,8 @@ import random import numpy as np from tuplex import * +import typing +import os class TestAggregates(unittest.TestCase): def setUp(self): @@ -53,3 +55,51 @@ def test_sum_by_key(self): self.assertAlmostEqual(res[0][1], 10.0 - 4.5) self.assertAlmostEqual(res[1][1], 20.0) + + def test_311(self): + input_path = 'test_311_testfile.csv' + output_path = 'test_311_testfile.out.csv' + with open(input_path) as fp: + data = '''UniqueKey,CreatedDate,Agency,ComplaintType,Descriptor,IncidentZip,StreetName +46688741,06/30/2020 07:24:41 PM,NYPD,Noise - Residential,Loud Music/Party,10037.0,MADISON AVENUE +53493739,02/28/2022 07:30:31 PM,NYPD,Illegal Parking,Double Parked Blocking Traffic,11203.0,EAST 56 STREET +48262955,11/27/2020 12:00:00 PM,DSNY,Derelict Vehicles,Derelict Vehicles,11203.0,CLARKSON AVENUE +48262956,11/27/2020 12:00:00 PM,DSNY,Derelict Vehicles,Derelict Vehicles,11208.0,SHEPHERD AVENUE +48262957,11/27/2020 12:00:00 PM,DSNY,Derelict Vehicles,Derelict Vehicles,11238.0,BERGEN STREET +46688747,06/30/2020 02:51:45 PM,NYPD,Noise - Vehicle,Engine Idling,10009.0,EAST 12 STREET +46688748,06/30/2020 09:26:45 AM,NYPD,Non-Emergency Police Matter,Face Covering Violation,11204.0,20 AVENUE +48262973,11/27/2020 03:46:00 PM,DEP,Water Quality,unknown odor/taste in drinking water (QA6),10021.0,EAST 70 STREET +53493766,02/28/2022 05:28:38 AM,NYPD,Noise - Vehicle,Car/Truck Horn,11366.0,PARSONS BOULEVARD''' + fp.write(data) + + def fix_zip_codes(zips): + if not zips: + return None + # Truncate everything to length 5 + s = zips[:5] + + # Set 00000 zip codes to nan + if s == "00000": + return None + else: + return s + + ctx = Context(self.conf) + df = ctx.csv( + ",".join(input_path), + null_values=["Unspecified", "NO CLUE", "NA", "N/A", "0", ""], + type_hints={0: typing.Optional[str], + 1: typing.Optional[str], + 2: typing.Optional[str], + 3: typing.Optional[str], + 4: typing.Optional[str], + 5: typing.Optional[str], + }, + ) + # Do the pipeline + df = df.mapColumn("IncidentZip", fix_zip_codes).unique() + + # Output to csv + df.tocsv(output_path) + + self.assertTrue(os.path.isfile(output_path)) \ No newline at end of file From 77f5cfff0fd7302b9dfda7e1945fcc1d68999f78 Mon Sep 17 00:00:00 2001 From: Leonhard Spiegelberg Date: Tue, 5 Jul 2022 17:21:47 -0400 Subject: [PATCH 10/32] multi-user test fix --- tuplex/test/core/TestUtils.h | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tuplex/test/core/TestUtils.h b/tuplex/test/core/TestUtils.h index b59ed2607..34150e899 100644 --- a/tuplex/test/core/TestUtils.h +++ b/tuplex/test/core/TestUtils.h @@ -67,7 +67,12 @@ class TuplexTest : public ::testing::Test { void SetUp() override { testName = std::string(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()) + std::string(::testing::UnitTest::GetInstance()->current_test_info()->name()); - scratchDir = "/tmp/" + testName; + auto user = getUserName(); + if(user.empty()) { + std::cerr<<"could not retrieve user name, setting to user"< Date: Tue, 
5 Jul 2022 17:22:48 -0400 Subject: [PATCH 11/32] compile fix --- tuplex/test/core/TestUtils.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tuplex/test/core/TestUtils.h b/tuplex/test/core/TestUtils.h index 34150e899..bbe23c52c 100644 --- a/tuplex/test/core/TestUtils.h +++ b/tuplex/test/core/TestUtils.h @@ -67,7 +67,7 @@ class TuplexTest : public ::testing::Test { void SetUp() override { testName = std::string(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()) + std::string(::testing::UnitTest::GetInstance()->current_test_info()->name()); - auto user = getUserName(); + auto user = tuplex::getUserName(); if(user.empty()) { std::cerr<<"could not retrieve user name, setting to user"< Date: Tue, 5 Jul 2022 21:44:10 -0400 Subject: [PATCH 12/32] more python finding magic --- tuplex/CMakeLists.txt | 33 +++++- tuplex/cmake/FindPythonInterpreter.cmake | 131 +++++++++++++++++++++++ 2 files changed, 159 insertions(+), 5 deletions(-) create mode 100644 tuplex/cmake/FindPythonInterpreter.cmake diff --git a/tuplex/CMakeLists.txt b/tuplex/CMakeLists.txt index 512b74276..9c077d313 100755 --- a/tuplex/CMakeLists.txt +++ b/tuplex/CMakeLists.txt @@ -565,12 +565,35 @@ if(PYTHON3_VERSION STREQUAL "") message(STATUS "Found full python3-dev installation") set(Python3_Embed_FOUND TRUE) else() - find_package(Python3 COMPONENTS Interpreter REQUIRED) - # python3 -c 'import distutils.sysconfig; print(distutils.sysconfig.get_python_lib(plat_specific=False,standard_lib=True))' - # try to get get module libs at least + find_package(Python3 COMPONENTS Interpreter QUIET) + if(Python3_FOUND) + # python3 -c 'import distutils.sysconfig; print(distutils.sysconfig.get_python_lib(plat_specific=False,standard_lib=True))' + # try to get get module libs at least - # mark embed lib as not found - unset(Python3_Embed_FOUND) + # mark embed lib as not found + unset(Python3_Embed_FOUND) + else() + # use interpreter find script to detect python interpreter + include(FindPythonInterpreter) + find_python_interpreter(VERSION 3 + INTERPRETER_OUT_VAR PYTHON_INTERPRETER + VERSION_OUT_VAR PYTHON_VERSION + REQUIRED + ) + message(STATUS "Found interpreter ${PYTHON_INTERPRETER} version ${PYTHON_VERSION}") + set(Python3_EXECUTABLE ${PYTHON_INTERPRETER}) + set(PYTHON3_VERSION ${PYTHON_VERSION}) + unset(Python3_Embed_FOUND) + set(Python3_FOUND TRUE) + + # check if embed artifacts are there... + set(Python3_FIND_VIRTUALENV "FIRST") + find_package(Python3 COMPONENTS Interpreter Development QUIET) + if(Python3_FOUND) + message(STATUS "Found full python3-dev installation") + set(Python3_Embed_FOUND TRUE) + endif() + endif() endif() else() set(Python3_FIND_VIRTUALENV "FIRST") diff --git a/tuplex/cmake/FindPythonInterpreter.cmake b/tuplex/cmake/FindPythonInterpreter.cmake new file mode 100644 index 000000000..370f92fc0 --- /dev/null +++ b/tuplex/cmake/FindPythonInterpreter.cmake @@ -0,0 +1,131 @@ +# Copyright 2021, Robert Adam. All rights reserved. +# Use of this source code is governed by a BSD-style license +# that can be found in the LICENSE file at the root of the +# source tree. 
+# from https://github.com/Krzmbrzl/FindPythonInterpreter/blob/main/FindPythonInterpreter.cmake + +cmake_minimum_required(VERSION 3.5) + +function(find_python_interpreter) + set(options REQUIRED EXACT) + set(oneValueArgs VERSION INTERPRETER_OUT_VAR VERSION_OUT_VAR) + set(multiValueArgs HINTS) + cmake_parse_arguments(FIND_PYTHON_INTERPRETER "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN}) + + + # Error handling + if (FIND_PYTHON_INTERPRETER_UNPARSED_ARGUMENTS) + message(FATAL_ERROR "Unrecognized arguments to find_python_interpreter: \"${FIND_PYTHON_INTERPRETER_UNPARSED_ARGUMENTS}\"") + endif() + if (NOT FIND_PYTHON_INTERPRETER_INTERPRETER_OUT_VAR) + message(FATAL_ERROR "Called find_python_interpreter without the INTERPRETER_OUT_VAR parameter!") + endif() + if (FIND_PYTHON_INTERPRETER_EXACT AND NOT FIND_PYTHON_INTERPRETER_VERSION) + message(FATAL_ERROR "Specified EXACT but did not specify VERSION!") + endif() + + + # Defaults + if (NOT FIND_PYTHON_INTERPRETER_VERSION) + set(FIND_PYTHON_INTERPRETER_VERSION "0.0.0") + endif() + if (NOT FIND_PYTHON_INTERPRETER_HINTS) + set(FIND_PYTHON_INTERPRETER_HINTS "") + endif() + + + # Validate + if (NOT FIND_PYTHON_INTERPRETER_VERSION MATCHES "^[0-9]+(\.[0-9]+(\.[0-9]+)?)?$") + message(FATAL_ERROR "Invalid VERSION \"FIND_PYTHON_INTERPRETER_VERSION\" - must follow RegEx \"^[0-9]+(\.[0-9]+(\.[0-9]+)?)?$\"") + endif() + + + # "parse" version (first append 0.0.0 in case only a part of the version scheme was set by the user) + string(CONCAT VERSION_HELPER "${FIND_PYTHON_INTERPRETER_VERSION}" ".0.0.0") + string(REPLACE "." ";" VERSION_LIST "${VERSION_HELPER}") + list(GET VERSION_LIST 0 FIND_PYTHON_INTERPRETER_VERSION_MAJOR) + list(GET VERSION_LIST 1 FIND_PYTHON_INTERPRETER_VERSION_MINOR) + list(GET VERSION_LIST 1 FIND_PYTHON_INTERPRETER_VERSION_PATCH) + + + # Create names for the interpreter to search for + set(INTERPRETER_NAMES "") + if (FIND_PYTHON_INTERPRETER_VERSION_MAJOR STREQUAL "0") + # Search for either Python 2 or 3 + list(APPEND INTERPRETER_NAMES "python3") + list(APPEND INTERPRETER_NAMES "python") + list(APPEND INTERPRETER_NAMES "python2") + else() + # Search for specified version + list(APPEND INTERPRETER_NAMES "python${FIND_PYTHON_INTERPRETER_VERSION_MAJOR}") + list(APPEND INTERPRETER_NAMES "python") + + if (NOT FIND_PYTHON_INTERPRETER_VERSION_MINOR EQUAL 0) + list(PREPEND INTERPRETER_NAMES "python${FIND_PYTHON_INTERPRETER_VERSION_MAJOR}.${FIND_PYTHON_INTERPRETER_VERSION_MINOR}") + + if (NOT FIND_PYTHON_INTERPRETER_VERSION_PATCH EQUAL 0) + list(PREPEND INTERPRETER_NAMES + "python${FIND_PYTHON_INTERPRETER_VERSION_MAJOR}.${FIND_PYTHON_INTERPRETER_VERSION_MINOR}.${FIND_PYTHON_INTERPRETER_VERSION_PATCH}") + endif() + endif() + endif() + + + # Start by trying to search for a python executable in PATH and HINTS + find_program(PYTHON_INTERPRETER NAMES ${INTERPRETER_NAMES} HINTS ${FIND_PYTHON_INTERPRETER_HINTS}) + + + if (NOT PYTHON_INTERPRETER) + # Fall back to find_package + message(VERBOSE "Can't find Python interpreter in PATH -> Falling back to find_package") + if (FIND_PYTHON_INTERPRETER_VERSION_MAJOR EQUAL 0) + # Search arbitrary version + find_package(Python COMPONENTS Interpreter QUIET) + set(PYTHON_INTERPRETER "${Python_EXECUTABLE}") + else() + # Search specific version (Python 2 or 3) + find_package(Python${FIND_PYTHON_INTERPRETER_VERSION_MAJOR} COMPONENTS Interpreter QUIET) + set(PYTHON_INTERPRETER "${Python${FIND_PYTHON_INTERPRETER_VERSION_MAJOR}_EXECUTABLE}") + endif() + endif() + + + if (PYTHON_INTERPRETER) + # Verify that the 
version found is the one that is wanted + execute_process( + COMMAND ${PYTHON_INTERPRETER} "--version" + OUTPUT_VARIABLE PYTHON_INTERPRETER_VERSION + ERROR_VARIABLE PYTHON_INTERPRETER_VERSION # Python 2 reports the version on stderr + ) + + # Remove leading "Python " from version information + string(REPLACE "Python " "" PYTHON_INTERPRETER_VERSION "${PYTHON_INTERPRETER_VERSION}") + string(STRIP "${PYTHON_INTERPRETER_VERSION}" PYTHON_INTERPRETER_VERSION) + + + if (PYTHON_INTERPRETER_VERSION VERSION_LESS FIND_PYTHON_INTERPRETER_VERSION) + message(STATUS "Found Python version ${PYTHON_INTERPRETER_VERSION} but required at least ${FIND_PYTHON_INTERPRETER_VERSION}") + set(PYTHON_INTERPRETER "NOTFOUND") + set(PYTHON_INTERPRETER_VERSION "NOTFOUND") + elseif(PYTHON_INTERPRETER_VERSION VERSION_GREATER FIND_PYTHON_INTERPRETER_VERSION AND FIND_PYTHON_INTERPRETER_EXACT) + message(STATUS "Found Python interpreter version ${PYTHON_INTERPRETER_VERSION} but required exactly ${FIND_PYTHON_INTERPRETER_VERSION}") + set(PYTHON_INTERPRETER "NOTFOUND") + set(PYTHON_INTERPRETER_VERSION "NOTFOUND") + else() + message(STATUS "Found Python interpreter version ${PYTHON_INTERPRETER_VERSION}") + endif() + else() + set(PYTHON_INTERPRETER_VERSION "NOTFOUND") + endif() + + + # Set "return" values + set(${FIND_PYTHON_INTERPRETER_INTERPRETER_OUT_VAR} "${PYTHON_INTERPRETER}" PARENT_SCOPE) + if (FIND_PYTHON_INTERPRETER_VERSION_OUT_VAR) + set(${FIND_PYTHON_INTERPRETER_VERSION_OUT_VAR} "${PYTHON_INTERPRETER_VERSION}" PARENT_SCOPE) + endif() + + if (NOT PYTHON_INTERPRETER AND FIND_PYTHON_INTERPRETER_REQUIRED) + message(FATAL_ERROR "Did NOT find Python interpreter") + endif() +endfunction() From ceba0a45d53bdeee0710b32a647a49fb208e8809 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 5 Jul 2022 22:00:53 -0400 Subject: [PATCH 13/32] remove unused function from test --- tuplex/test/adapters/cpython/PythonHelperTest.cc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tuplex/test/adapters/cpython/PythonHelperTest.cc b/tuplex/test/adapters/cpython/PythonHelperTest.cc index d5874f1b7..68f78fb30 100644 --- a/tuplex/test/adapters/cpython/PythonHelperTest.cc +++ b/tuplex/test/adapters/cpython/PythonHelperTest.cc @@ -462,11 +462,14 @@ TEST_F(PythonHelperTest, SchemaEncoding) { EXPECT_EQ(python::PyString_AsString(w0), "typing.Callable[[int], str]"); } + TEST_F(PythonHelperTest, FindStdlib) { EXPECT_NE(python::python_version(), ""); - auto loc = python::find_stdlib_location(); - std::cout<<"Found python stdlib location to be in: "< Date: Tue, 5 Jul 2022 22:02:11 -0400 Subject: [PATCH 14/32] remove unused function --- .../adapters/cpython/include/PythonHelpers.h | 9 ----- tuplex/adapters/cpython/src/PythonHelpers.cc | 38 ------------------- .../test/adapters/cpython/PythonHelperTest.cc | 9 +---- 3 files changed, 1 insertion(+), 55 deletions(-) diff --git a/tuplex/adapters/cpython/include/PythonHelpers.h b/tuplex/adapters/cpython/include/PythonHelpers.h index e332f5f83..efb24cbea 100644 --- a/tuplex/adapters/cpython/include/PythonHelpers.h +++ b/tuplex/adapters/cpython/include/PythonHelpers.h @@ -99,15 +99,6 @@ namespace python { return version; } - /*! - * find python standardlib location. - * @param version python version string. E.g., "3" or "3.7". Per default search for version build against. - * @param prefix_list list of paths where to search for /lib/python pattern. 
- * @return string for first match found, empty string else - */ - std::string find_stdlib_location(const std::string& version=python_version(true, false), - const std::vector& prefix_list=std::vector{"/usr/local"}); - /*! * retrieves main module and loads cloudpickle module. Exits program if cloudpickle is not found. * @return module object holding the main module diff --git a/tuplex/adapters/cpython/src/PythonHelpers.cc b/tuplex/adapters/cpython/src/PythonHelpers.cc index 388d0ff7a..d84765889 100644 --- a/tuplex/adapters/cpython/src/PythonHelpers.cc +++ b/tuplex/adapters/cpython/src/PythonHelpers.cc @@ -52,44 +52,6 @@ namespace python { Py_SetPythonHome(&vec[0]); } - std::string find_stdlib_location(const std::string& version, - const std::vector& prefix_list) { - // check whether folder /lib exists - for(const auto& prefix : prefix_list) { - if(tuplex::dirExists(prefix + "/lib")) { - // list all entries under dir with python - auto paths = tuplex::glob(prefix + "/lib/python*"); - for(auto path : paths) { - auto original_path = path; - // only check for folders (glob appends /!) - if(!path.empty() && path.back() == '/') { - path = path.substr(0, path.length() - 1); - // find / to get python name - auto idx = path.rfind('/'); - if(idx < 0) - continue; - auto folder_name = path.substr(idx + 1); - - // starts with python? => should because of globbing! - auto py_len = std::string("python").length(); - if(folder_name.substr(0, py_len) == "python") { - // extract version - auto this_version = folder_name.substr(py_len); - // check if version starts with version string - if(version.empty()) - return original_path; - else if(this_version.substr(0, version.length()) == version) { - return original_path; - } - } - } - } - } - } - - return ""; - } - void handle_and_throw_py_error() { if(PyErr_Occurred()) { PyObject *ptype = NULL, *pvalue = NULL, *ptraceback = NULL; diff --git a/tuplex/test/adapters/cpython/PythonHelperTest.cc b/tuplex/test/adapters/cpython/PythonHelperTest.cc index 68f78fb30..9e2a50cf7 100644 --- a/tuplex/test/adapters/cpython/PythonHelperTest.cc +++ b/tuplex/test/adapters/cpython/PythonHelperTest.cc @@ -463,13 +463,6 @@ TEST_F(PythonHelperTest, SchemaEncoding) { } -TEST_F(PythonHelperTest, FindStdlib) { - +TEST_F(PythonHelperTest, VersionCheck) { EXPECT_NE(python::python_version(), ""); - - // deprecated, the function below is never really used and does not support - // a variety of common python interpreter installations (pyenv, virtualenv, conda, ...)... 
- // auto loc = python::find_stdlib_location(); - // std::cout<<"Found python stdlib location to be in: "< Date: Tue, 5 Jul 2022 22:18:11 -0400 Subject: [PATCH 15/32] fix for missing module, or import failure --- tuplex/core/src/UDF.cc | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/tuplex/core/src/UDF.cc b/tuplex/core/src/UDF.cc index f29bca0e5..a3b6234e1 100644 --- a/tuplex/core/src/UDF.cc +++ b/tuplex/core/src/UDF.cc @@ -462,18 +462,26 @@ namespace tuplex { // for C-API ref check https://docs.python.org/3/library/functions.html#__import__ // modules first - for(auto m : ce.modules()) { + for(const auto& m : ce.modules()) { + std::stringstream ss; + ss<<"failed to import module " + m.original_identifier; + if(m.identifier != m.original_identifier) + ss<<" as "< Date: Tue, 5 Jul 2022 22:18:31 -0400 Subject: [PATCH 16/32] typo --- tuplex/core/src/UDF.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tuplex/core/src/UDF.cc b/tuplex/core/src/UDF.cc index a3b6234e1..9443930cc 100644 --- a/tuplex/core/src/UDF.cc +++ b/tuplex/core/src/UDF.cc @@ -477,7 +477,7 @@ namespace tuplex { PyErr_Print(); PyErr_Clear(); python::unlockGIL(); - throw std::runtime_error(ss.str()r); + throw std::runtime_error(ss.str()); } } // then functions From 62b3effeb71fe9a0e4e83210a95080cd5bfdc5a5 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 5 Jul 2022 22:56:11 -0400 Subject: [PATCH 17/32] another mod import fallback --- tuplex/core/src/UDF.cc | 5 +++++ tuplex/test/core/ClosureTest.cc | 1 - 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/tuplex/core/src/UDF.cc b/tuplex/core/src/UDF.cc index 9443930cc..da30d0e57 100644 --- a/tuplex/core/src/UDF.cc +++ b/tuplex/core/src/UDF.cc @@ -468,7 +468,12 @@ namespace tuplex { if(m.identifier != m.original_identifier) ss<<" as "< Date: Tue, 5 Jul 2022 23:11:27 -0400 Subject: [PATCH 18/32] get rid off printing unless wrong access --- tuplex/core/src/Partition.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tuplex/core/src/Partition.cc b/tuplex/core/src/Partition.cc index ea08a65f1..c5b9c4bfc 100644 --- a/tuplex/core/src/Partition.cc +++ b/tuplex/core/src/Partition.cc @@ -55,7 +55,7 @@ namespace tuplex { uint8_t* Partition::lockWriteRaw(bool allowForeignOwnerAccess) { // must be the thread who allocated this - if(!allowForeignOwnerAccess) { + if(!allowForeignOwnerAccess && _owner->getThreadID() != std::this_thread::get_id()) { _owner->error("non-owner thread accessing partition"); assert(_owner->getThreadID() == std::this_thread::get_id()); } From 8f8dc99ca43179055459917fdc10c90ca8a1cf41 Mon Sep 17 00:00:00 2001 From: Leonhard Spiegelberg Date: Tue, 5 Jul 2022 23:44:08 -0400 Subject: [PATCH 19/32] adding a dedicated test for bug99 --- tuplex/test/wrappers/WrapperTest.cc | 76 +++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/tuplex/test/wrappers/WrapperTest.cc b/tuplex/test/wrappers/WrapperTest.cc index 5b51e1aaa..6e9548a1b 100644 --- a/tuplex/test/wrappers/WrapperTest.cc +++ b/tuplex/test/wrappers/WrapperTest.cc @@ -2610,6 +2610,82 @@ TEST_F(WrapperTest, TracingVisitorError) { } } +// as reported in https://github.com/tuplex/tuplex/issues/99 +TEST_F(WrapperTest, AllRows311) { + using namespace tuplex; + + std::string test_data = "UniqueKey,CreatedDate,Agency,ComplaintType,Descriptor,IncidentZip,StreetName\n" + "46688741,06/30/2020 07:24:41 PM,NYPD,Noise - Residential,Loud Music/Party,10037.0,MADISON AVENUE\n" + "53493739,02/28/2022 07:30:31 PM,NYPD,Illegal 
Parking,Double Parked Blocking Traffic,11203.0,EAST 56 STREET\n" + "48262955,11/27/2020 12:00:00 PM,DSNY,Derelict Vehicles,Derelict Vehicles,11203.0,CLARKSON AVENUE\n" + "48262956,11/27/2020 12:00:00 PM,DSNY,Derelict Vehicles,Derelict Vehicles,11208.0,SHEPHERD AVENUE\n" + "48262957,11/27/2020 12:00:00 PM,DSNY,Derelict Vehicles,Derelict Vehicles,11238.0,BERGEN STREET\n" + "46688747,06/30/2020 02:51:45 PM,NYPD,Noise - Vehicle,Engine Idling,10009.0,EAST 12 STREET\n" + "46688748,06/30/2020 09:26:45 AM,NYPD,Non-Emergency Police Matter,Face Covering Violation,11204.0,20 AVENUE\n" + "48262973,11/27/2020 03:46:00 PM,DEP,Water Quality,unknown odor/taste in drinking water (QA6),10021.0,EAST 70 STREET\n" + "53493766,02/28/2022 05:28:38 AM,NYPD,Noise - Vehicle,Car/Truck Horn,11366.0,PARSONS BOULEVARD\n"; + + // write test data out to test path + std::string input_path = this->testName + "_test_311_testfile.csv"; + std::string output_path = this->testName + "_test_311_output.csv"; + + stringToFile(input_path, test_data); + + auto udf_code = "def fix_zip_codes(zips):\n" + " if not zips:\n" + " return None\n" + " # Truncate everything to length 5\n" + " s = zips[:5]\n" + "\n" + " # Set 00000 zip codes to nan\n" + " if s == \"00000\":\n" + " return None\n" + " else:\n" + " return s"; + + auto ctx_opts = "{\"webui.enable\": false," + " \"driverMemory\": \"8MB\"," + " \"partitionSize\": \"256KB\"," + "\"executorCount\": 0," + "\"tuplex.scratchDir\": \"file://" + scratchDir + "\"," + "\"resolveWithInterpreterOnly\": true}"; + + // null_values=["Unspecified", "NO CLUE", "NA", "N/A", "0", ""], + // type_hints={0: typing.Optional[str], + // 1: typing.Optional[str], + // 2: typing.Optional[str], + // 3: typing.Optional[str], + // 4: typing.Optional[str], + // 5: typing.Optional[str], + // } + + PythonContext ctx("", "", ctx_opts); + { + auto null_values_obj = PyList_New(6); + PyList_SetItem(null_values_obj, 0, python::PyString_FromString("Unspecified")); + PyList_SetItem(null_values_obj, 1, python::PyString_FromString("NO CLUE")); + PyList_SetItem(null_values_obj, 2, python::PyString_FromString("NA")); + PyList_SetItem(null_values_obj, 3, python::PyString_FromString("N/A")); + PyList_SetItem(null_values_obj, 4, python::PyString_FromString("0")); + PyList_SetItem(null_values_obj, 5, python::PyString_FromString("")); + + auto type_hints_obj = PyDict_New(); + for(unsigned i = 0; i <= 5; ++i) + PyDict_SetItem(type_hints_obj, PyLong_FromLong(i), python::runAndGet("import typing; x=typing.Optional[str]", "x")); + + + auto null_values = py::reinterpret_borrow(null_values_obj); + auto type_hints = py::reinterpret_borrow(type_hints_obj); + + auto res = ctx.csv(input_path, py::none(), true, false, "", "\"", null_values, type_hints) + .mapColumn("Incident Zip", udf_code, "") + .unique() + .collect(); + auto resObj = res.ptr(); + ASSERT_TRUE(PyList_Check(resObj)); + EXPECT_GE(PyList_Size(resObj), 1); + } +} //// debug any python module... ///** Takes a path and adds it to sys.paths by calling PyRun_SimpleString. From cd101bd666577b908e7fe5d0ddae903c391999ac Mon Sep 17 00:00:00 2001 From: Leonhard Spiegelberg Date: Tue, 5 Jul 2022 23:57:34 -0400 Subject: [PATCH 20/32] adding typing.Optional[...] 
support and catch errors when extracting type annotations in CSV --- tuplex/adapters/cpython/src/PythonHelpers.cc | 10 ++++++- tuplex/python/src/PythonContext.cc | 30 +++++++++++++++++--- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/tuplex/adapters/cpython/src/PythonHelpers.cc b/tuplex/adapters/cpython/src/PythonHelpers.cc index d84765889..54a47d9b2 100644 --- a/tuplex/adapters/cpython/src/PythonHelpers.cc +++ b/tuplex/adapters/cpython/src/PythonHelpers.cc @@ -1703,8 +1703,16 @@ namespace python { ": only Optional unions are understood right now"); } } else { - throw std::runtime_error("Tuplex can't understand typing module annotation " + typeStr); + + // typing.Optional[...] is a python3.9 feature (before equivalent to Union[None, T] + if(strStartsWith(typeStr, "typing.Optional")) { + python::Type elementType = decodePythonSchema(PyList_GetItem(args, 0)); + return python::Type::makeOptionType(elementType); + } } + + // unknown typing module annotation + throw std::runtime_error("Tuplex can't understand typing module annotation " + typeStr); } return python::Type::UNKNOWN; diff --git a/tuplex/python/src/PythonContext.cc b/tuplex/python/src/PythonContext.cc index 35e387e11..aed9dfa49 100644 --- a/tuplex/python/src/PythonContext.cc +++ b/tuplex/python/src/PythonContext.cc @@ -1157,11 +1157,33 @@ namespace tuplex { assert(PyGILState_Check()); // make sure this thread holds the GIL! // extract columns (if not none) - auto columns = extractFromListOfStrings(cols.ptr(), "columns "); - auto null_value_strs = extractFromListOfStrings(null_values.ptr(), "null_values "); - auto type_idx_hints_c = extractIndexBasedTypeHints(type_hints.ptr(), columns, "type_hints "); - auto type_col_hints_c = extractColumnBasedTypeHints(type_hints.ptr(), columns, "type_hints "); + std::vector columns; + std::vector null_value_strs; + std::unordered_map type_idx_hints_c; + std::unordered_map type_col_hints_c; + try { + columns = extractFromListOfStrings(cols.ptr(), "columns "); + null_value_strs = extractFromListOfStrings(null_values.ptr(), "null_values "); + type_idx_hints_c = extractIndexBasedTypeHints(type_hints.ptr(), columns, "type_hints "); + type_col_hints_c = extractColumnBasedTypeHints(type_hints.ptr(), columns, "type_hints "); + } catch(const std::exception& e) { + err_message = e.what(); + Logger::instance().defaultLogger().error(err_message); + } catch(...) 
{ + err_message = "unknown C++ exception occurred, please change type."; + Logger::instance().defaultLogger().error(err_message); + } + + if!err_message.empty()) { + Logger::instance().flushAll(); + assert(_context); + ds = &_context->makeError(err_message); + pds.wrap(ds); + Logger::instance().flushToPython(); + return pds; + } + // internal Tuplex API python::unlockGIL(); DataSet *ds = nullptr; std::string err_message = ""; From c76c78940d633b0eef6e894e2f49136f2fcf3f95 Mon Sep 17 00:00:00 2001 From: Leonhard Spiegelberg Date: Tue, 5 Jul 2022 23:58:56 -0400 Subject: [PATCH 21/32] compile fix --- tuplex/python/src/PythonContext.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tuplex/python/src/PythonContext.cc b/tuplex/python/src/PythonContext.cc index aed9dfa49..0b8048439 100644 --- a/tuplex/python/src/PythonContext.cc +++ b/tuplex/python/src/PythonContext.cc @@ -1141,6 +1141,8 @@ namespace tuplex { return makeError("job aborted via signal"); PythonDataSet pds; + DataSet *ds = nullptr; + std::string err_message = ""; //#ifndef NDEBUG // using namespace std; @@ -1153,7 +1155,6 @@ namespace tuplex { assert(quotechar.size() == 1); assert(delimiter.size() <= 1); - assert(PyGILState_Check()); // make sure this thread holds the GIL! // extract columns (if not none) @@ -1185,8 +1186,6 @@ namespace tuplex { // internal Tuplex API python::unlockGIL(); - DataSet *ds = nullptr; - std::string err_message = ""; try { ds = &_context->csv(pattern, columns, autodetect_header ? option::none : option(header), delimiter.empty() ? option::none : option(delimiter[0]), From b8af607d0ff7d9d1712b9a0257d6c0a12aa9dbf5 Mon Sep 17 00:00:00 2001 From: Leonhard Spiegelberg Date: Tue, 5 Jul 2022 23:59:53 -0400 Subject: [PATCH 22/32] typo --- tuplex/python/src/PythonContext.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tuplex/python/src/PythonContext.cc b/tuplex/python/src/PythonContext.cc index 0b8048439..f32be094d 100644 --- a/tuplex/python/src/PythonContext.cc +++ b/tuplex/python/src/PythonContext.cc @@ -1175,7 +1175,7 @@ namespace tuplex { Logger::instance().defaultLogger().error(err_message); } - if!err_message.empty()) { + if(!err_message.empty()) { Logger::instance().flushAll(); assert(_context); ds = &_context->makeError(err_message); From e7b58c87d159278a0c4d0ce00607abdebd92db6e Mon Sep 17 00:00:00 2001 From: Leonhard Spiegelberg Date: Wed, 6 Jul 2022 00:05:32 -0400 Subject: [PATCH 23/32] optional fix --- tuplex/adapters/cpython/src/PythonHelpers.cc | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tuplex/adapters/cpython/src/PythonHelpers.cc b/tuplex/adapters/cpython/src/PythonHelpers.cc index 54a47d9b2..22c95bfe0 100644 --- a/tuplex/adapters/cpython/src/PythonHelpers.cc +++ b/tuplex/adapters/cpython/src/PythonHelpers.cc @@ -1679,7 +1679,7 @@ namespace python { } else if(strStartsWith(typeStr, "typing.List")) { python::Type elementType = decodePythonSchema(PyList_GetItem(args, 0)); return python::Type::makeListType(elementType); - } else if(strStartsWith(typeStr, "typing.Union")) { + } else if(strStartsWith(typeStr, "typing.Union") || strStartsWith(typeStr, "typing.Optional")) { if(PyTuple_Size(args) == 2) { auto c1 = PyTuple_GetItem(args, 0); auto c2 = PyTuple_GetItem(args, 1); @@ -1704,11 +1704,9 @@ namespace python { } } else { - // typing.Optional[...] 
is a python3.9 feature (before equivalent to Union[None, T] - if(strStartsWith(typeStr, "typing.Optional")) { - python::Type elementType = decodePythonSchema(PyList_GetItem(args, 0)); - return python::Type::makeOptionType(elementType); - } + // whichever other typing annotations to decode... + + // Add them here... } // unknown typing module annotation From 2db5470152c059e08f0bc1ac4fd21918774ecad4 Mon Sep 17 00:00:00 2001 From: Leonhard Spiegelberg Date: Wed, 6 Jul 2022 00:10:56 -0400 Subject: [PATCH 24/32] test fix --- tuplex/test/wrappers/WrapperTest.cc | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tuplex/test/wrappers/WrapperTest.cc b/tuplex/test/wrappers/WrapperTest.cc index 6e9548a1b..88770d3d1 100644 --- a/tuplex/test/wrappers/WrapperTest.cc +++ b/tuplex/test/wrappers/WrapperTest.cc @@ -2678,12 +2678,16 @@ TEST_F(WrapperTest, AllRows311) { auto type_hints = py::reinterpret_borrow(type_hints_obj); auto res = ctx.csv(input_path, py::none(), true, false, "", "\"", null_values, type_hints) - .mapColumn("Incident Zip", udf_code, "") + .mapColumn("IncidentZip", udf_code, "") .unique() .collect(); auto resObj = res.ptr(); ASSERT_TRUE(PyList_Check(resObj)); - EXPECT_GE(PyList_Size(resObj), 1); + EXPECT_GE(PyList_Size(resObj), 2); + + // print result out + PyObject_Print(resObj, stdout, 0); + std::cout< Date: Wed, 6 Jul 2022 11:23:41 -0400 Subject: [PATCH 25/32] fix test --- tuplex/python/tests/test_aggregates.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tuplex/python/tests/test_aggregates.py b/tuplex/python/tests/test_aggregates.py index 382db14dd..f36fdae02 100644 --- a/tuplex/python/tests/test_aggregates.py +++ b/tuplex/python/tests/test_aggregates.py @@ -59,7 +59,7 @@ def test_sum_by_key(self): def test_311(self): input_path = 'test_311_testfile.csv' output_path = 'test_311_testfile.out.csv' - with open(input_path) as fp: + with open(input_path, 'w') as fp: data = '''UniqueKey,CreatedDate,Agency,ComplaintType,Descriptor,IncidentZip,StreetName 46688741,06/30/2020 07:24:41 PM,NYPD,Noise - Residential,Loud Music/Party,10037.0,MADISON AVENUE 53493739,02/28/2022 07:30:31 PM,NYPD,Illegal Parking,Double Parked Blocking Traffic,11203.0,EAST 56 STREET @@ -85,8 +85,7 @@ def fix_zip_codes(zips): return s ctx = Context(self.conf) - df = ctx.csv( - ",".join(input_path), + df = ctx.csv(input_path, null_values=["Unspecified", "NO CLUE", "NA", "N/A", "0", ""], type_hints={0: typing.Optional[str], 1: typing.Optional[str], @@ -100,6 +99,7 @@ def fix_zip_codes(zips): df = df.mapColumn("IncidentZip", fix_zip_codes).unique() # Output to csv + output_path_part0 = 'test_311_testfile.out.part0.csv' df.tocsv(output_path) - self.assertTrue(os.path.isfile(output_path)) \ No newline at end of file + self.assertTrue(os.path.isfile(output_path_part0)) From 84b64464b9615bcbd4d7a1af326e1bb6f8af6208 Mon Sep 17 00:00:00 2001 From: Leonhard Spiegelberg Date: Wed, 6 Jul 2022 11:49:45 -0400 Subject: [PATCH 26/32] another code extract fallback using new ast.unparse feature --- tuplex/python/tuplex/utils/source_vault.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/tuplex/python/tuplex/utils/source_vault.py b/tuplex/python/tuplex/utils/source_vault.py index ddc46d47f..22168c48c 100644 --- a/tuplex/python/tuplex/utils/source_vault.py +++ b/tuplex/python/tuplex/utils/source_vault.py @@ -12,6 +12,7 @@ import ast import astor import os +import sys from types import LambdaType, CodeType import logging @@ -59,7 +60,19 @@ def 
gen_code_for_lambda(lam): return s.strip()[1:-1] except Exception as e: - logging.debug('gen_code_for_lambda failed with {}'.format(e)) + print('gen_code_for_lambda failed with {}'.format(e)) + logging.debug('gen_code_for_lambda via astor failed with {}'.format(e)) + + # python3.9+ has ast.unparse + if sys.version_info.major >= 3 and sys.version_info.minor >= 9: + import ast + + try: + s = ast.unparse(lam) + return s + except Exception as e: + logging.debug('gen_code_for_lambda via ast (python3.9+) failed with {}'.format(e)) + return '' @@ -275,6 +288,8 @@ def extractAndPutAllLambdas(self, src_info, filename, lineno, colno, globals): else: self.lambdaFileDict[key] = [entry] else: + print('there are {} Lambdas'.format(len(Lams))) + # check that there are no globals when extracting function! if colno is None and len(globals) != 0: raise Exception('Found more than one lambda expression on {}:+{}. Either use ' @@ -315,4 +330,4 @@ def extractAndPutAllLambdas(self, src_info, filename, lineno, colno, globals): if key in self.lambdaFileDict.keys(): self.lambdaFileDict[key].append(entry) else: - self.lambdaFileDict[key] = [entry] \ No newline at end of file + self.lambdaFileDict[key] = [entry] From 92b1d7ce3228b836a8601247e2da6737de88a825 Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 6 Jul 2022 13:39:08 -0400 Subject: [PATCH 27/32] wip, failing test --- tuplex/test/wrappers/WrapperTest.cc | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tuplex/test/wrappers/WrapperTest.cc b/tuplex/test/wrappers/WrapperTest.cc index 5b51e1aaa..6bb35b986 100644 --- a/tuplex/test/wrappers/WrapperTest.cc +++ b/tuplex/test/wrappers/WrapperTest.cc @@ -2610,6 +2610,16 @@ TEST_F(WrapperTest, TracingVisitorError) { } } +// def test_sum_by_key(self): +// c = Context(self.conf) +// +// data = [(0, 10.0), (1, 20.0), (0, -4.5)] +// +// res = c.parallelize(data, columns=['id', 'volume']).aggregateByKey(lambda a, b: a + b, +// lambda a, x: a + x['volume'], +// 0.0, +// ['id']).collect() + //// debug any python module... ///** Takes a path and adds it to sys.paths by calling PyRun_SimpleString. From 7ae4b77be029c8d9d26067c963f95b095e6f83f7 Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 6 Jul 2022 14:11:46 -0400 Subject: [PATCH 28/32] fixes --- tuplex/adapters/cpython/src/PythonHelpers.cc | 2 + tuplex/python/include/PythonDataSet.h | 8 ++- tuplex/python/src/PythonDataSet.cc | 9 ++- tuplex/python/tuplex/dataset.py | 6 +- tuplex/python/tuplex/utils/source_vault.py | 1 - tuplex/test/wrappers/WrapperTest.cc | 59 +++++++++++++++++--- 6 files changed, 71 insertions(+), 14 deletions(-) diff --git a/tuplex/adapters/cpython/src/PythonHelpers.cc b/tuplex/adapters/cpython/src/PythonHelpers.cc index 22c95bfe0..11b22cb51 100644 --- a/tuplex/adapters/cpython/src/PythonHelpers.cc +++ b/tuplex/adapters/cpython/src/PythonHelpers.cc @@ -1740,6 +1740,8 @@ namespace python { #warning "better python error formatting needed!" } + // important to incref! 
+ Py_XINCREF(obj); return obj; } diff --git a/tuplex/python/include/PythonDataSet.h b/tuplex/python/include/PythonDataSet.h index 665d68856..169712493 100644 --- a/tuplex/python/include/PythonDataSet.h +++ b/tuplex/python/include/PythonDataSet.h @@ -103,11 +103,15 @@ namespace tuplex { PythonDataSet aggregate(const std::string& comb, const std::string& comb_pickled, const std::string& agg, const std::string& agg_pickled, - const std::string& initial_value_pickled, const py::object& comb_closure=py::object(), const py::object& agg_closure=py::object()); + const std::string& initial_value_pickled, + const py::object& comb_closure=py::object(), + const py::object& agg_closure=py::object()); PythonDataSet aggregateByKey(const std::string& comb, const std::string& comb_pickled, const std::string& agg, const std::string& agg_pickled, - const std::string& initial_value_pickled, py::list columns); + const std::string& initial_value_pickled, py::list columns, + const py::object& comb_closure=py::object(), + const py::object& agg_closure=py::object()); // returns list of strings or empty list py::list columns(); diff --git a/tuplex/python/src/PythonDataSet.cc b/tuplex/python/src/PythonDataSet.cc index 3dd65d262..96b550431 100644 --- a/tuplex/python/src/PythonDataSet.cc +++ b/tuplex/python/src/PythonDataSet.cc @@ -419,7 +419,9 @@ namespace tuplex { PythonDataSet PythonDataSet::aggregateByKey(const std::string& comb, const std::string& comb_pickled, const std::string& agg, const std::string& agg_pickled, - const std::string& initial_value_pickled, py::list columns) { + const std::string& initial_value_pickled, py::list columns, + const py::object& comb_closure, + const py::object& agg_closure) { using namespace std; // @TODO: warning if udfs are wrongly submitted @@ -431,6 +433,11 @@ namespace tuplex { return pds; } + PyObject* combClosureObject = comb_closure.ptr(); + PyObject* aggClosureObject = agg_closure.ptr(); + auto combCE = closureFromDict(combClosureObject); + auto aggCE = closureFromDict(aggClosureObject); + // parse pickled initial_value if(initial_value_pickled.empty()) { PythonDataSet pds; diff --git a/tuplex/python/tuplex/dataset.py b/tuplex/python/tuplex/dataset.py index 53c85ff98..fecd257ee 100644 --- a/tuplex/python/tuplex/dataset.py +++ b/tuplex/python/tuplex/dataset.py @@ -553,10 +553,14 @@ def aggregateByKey(self, combine, aggregate, initial_value, key_columns): except UDFCodeExtractionError as e: logging.warn('Could not extract code for aggregate UDF {}. 
Details:\n{}'.format(ftor, e)) + g_comb = get_globals(combine) + g_agg = get_globals(aggregate) + ds = DataSet() ds._dataSet = self._dataSet.aggregateByKey(comb_code, comb_code_pickled, agg_code, agg_code_pickled, - cloudpickle.dumps(initial_value), key_columns) + cloudpickle.dumps(initial_value), key_columns, + g_comb, g_agg) return ds @property diff --git a/tuplex/python/tuplex/utils/source_vault.py b/tuplex/python/tuplex/utils/source_vault.py index 22168c48c..1c79f6841 100644 --- a/tuplex/python/tuplex/utils/source_vault.py +++ b/tuplex/python/tuplex/utils/source_vault.py @@ -60,7 +60,6 @@ def gen_code_for_lambda(lam): return s.strip()[1:-1] except Exception as e: - print('gen_code_for_lambda failed with {}'.format(e)) logging.debug('gen_code_for_lambda via astor failed with {}'.format(e)) # python3.9+ has ast.unparse diff --git a/tuplex/test/wrappers/WrapperTest.cc b/tuplex/test/wrappers/WrapperTest.cc index c31de57de..95c89c47b 100644 --- a/tuplex/test/wrappers/WrapperTest.cc +++ b/tuplex/test/wrappers/WrapperTest.cc @@ -2610,16 +2610,57 @@ TEST_F(WrapperTest, TracingVisitorError) { } } +TEST_F(WrapperTest, SumByKey) { + using namespace tuplex; -// def test_sum_by_key(self): -// c = Context(self.conf) -// -// data = [(0, 10.0), (1, 20.0), (0, -4.5)] -// -// res = c.parallelize(data, columns=['id', 'volume']).aggregateByKey(lambda a, b: a + b, -// lambda a, x: a + x['volume'], -// 0.0, -// ['id']).collect() + // def test_sum_by_key(self): + // c = Context(self.conf) + // + // data = [(0, 10.0), (1, 20.0), (0, -4.5)] + // + // res = c.parallelize(data, columns=['id', 'volume']).aggregateByKey(lambda a, b: a + b, + // lambda a, x: a + x['volume'], + // 0.0, + // ['id']).collect() + + auto ctx_opts = "{\"webui.enable\": false," + " \"driverMemory\": \"8MB\"," + " \"partitionSize\": \"256KB\"," + "\"executorCount\": 0," + "\"tuplex.scratchDir\": \"file://" + scratchDir + "\"," + "\"resolveWithInterpreterOnly\": true}"; + + + PythonContext ctx("", "", ctx_opts); + { + auto values_obj = PyList_New(3); + PyList_SetItem(values_obj, 0, python::runAndGet("L = (0, 10.0)", "L")); + PyList_SetItem(values_obj, 1, python::runAndGet("L = (1, 20.0)", "L")); + PyList_SetItem(values_obj, 2, python::runAndGet("L = (0, -4.0)", "L")); + + auto columns_obj = python::runAndGet("L = ['id', 'volume']", "L"); + auto key_columns_obj = python::runAndGet("['id']", "L"); + auto pickled_val = python::pickleObject(python::getMainModule(), PyFloat_FromDouble(0.0)); + + auto values = py::reinterpret_borrow(values_obj); + auto columns = py::reinterpret_borrow(columns_obj); + auto key_columns = py::reinterpret_borrow(key_columns_obj); + + auto res = ctx.parallelize(values, columns) + .aggregateByKey("lambda a, b: a + b", "", + "lambda a, x: a +x['volume']", "", + pickled_val, + key_columns) + .collect(); + auto resObj = res.ptr(); + ASSERT_TRUE(PyList_Check(resObj)); + EXPECT_GE(PyList_Size(resObj), 2); + + // print result out + PyObject_Print(resObj, stdout, 0); + std::cout< Date: Wed, 6 Jul 2022 14:33:30 -0400 Subject: [PATCH 29/32] fixing wrong memory deref --- tuplex/core/src/ee/local/LocalBackend.cc | 57 +++++++++++++++++++++++- 1 file changed, 56 insertions(+), 1 deletion(-) diff --git a/tuplex/core/src/ee/local/LocalBackend.cc b/tuplex/core/src/ee/local/LocalBackend.cc index 54816d7fb..39f4bc8c0 100644 --- a/tuplex/core/src/ee/local/LocalBackend.cc +++ b/tuplex/core/src/ee/local/LocalBackend.cc @@ -1844,6 +1844,61 @@ namespace tuplex { auto keylen = entry->keylen; auto data = (uint8_t*)entry->data; + // if no data 
found, no need to apply func. + if(!data) + return MAP_OK; + + // update data + // initialize + uint8_t* init_val = nullptr; + int64_t init_size = 0; + ctx->init_aggregate(&init_val, &init_size); + + // call combine over null-bucket + // decode first bucket values! + // --> for aggregate by key a single value is stored there + int64_t bucket_size = *(int64_t*)data; + uint8_t* bucket_val = data + 8; + auto new_val = init_val; + auto new_size = init_size; + auto rc = ctx->combine_aggregate(&new_val, &new_size, bucket_val, bucket_size); + + // rc no 0? -> resolve! + if(rc != 0) { + std::cerr<<"combine function failed"<(malloc(new_size + 8)); + *(int64_t*)new_data = new_size; + memcpy(new_data + 8, new_val, new_size); + + // free original aggregate (must come after data copy!) + free(init_val); + auto old_ptr = data; + + // assign to hashmap + entry->data = new_data; + free(old_ptr); + runtime::rtfree_all(); // combine aggregate allocates via runtime + + // // check + // uint8_t* bucket = nullptr; + //hashmap_get(ctx->hm, key, keylen, (void **) (&bucket)); + + return MAP_OK; + } + + // why two versions?? + static int apply_to_bucket_i64(const apply_context* ctx, int64_hashmap_element* entry) { + assert(ctx->hm); + auto key = entry->key; + auto data = (uint8_t*)entry->data; + + // if no data found, no need to apply func. + if(!data) + return MAP_OK; + // update data // initialize uint8_t* init_val = nullptr; @@ -1945,7 +2000,7 @@ namespace tuplex { ctx.hm = sink.hm; ctx.init_aggregate = init_aggregate; ctx.combine_aggregate = combine_aggregate; - int64_hashmap_iterate(sink.hm, reinterpret_cast(apply_to_bucket), &ctx); + int64_hashmap_iterate(sink.hm, reinterpret_cast(apply_to_bucket_i64), &ctx); } else { // the regular, bytes based hashmap apply_context ctx; From 910f06cb74190ffe2fe872086c7333da009331db Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 6 Jul 2022 15:27:48 -0400 Subject: [PATCH 30/32] cleanup --- tuplex/codegen/include/HashHelper.h | 47 ---------------------- tuplex/codegen/src/HashHelper.cc | 15 ------- tuplex/python/tuplex/utils/source_vault.py | 4 -- 3 files changed, 66 deletions(-) delete mode 100644 tuplex/codegen/include/HashHelper.h delete mode 100644 tuplex/codegen/src/HashHelper.cc diff --git a/tuplex/codegen/include/HashHelper.h b/tuplex/codegen/include/HashHelper.h deleted file mode 100644 index d5bd28685..000000000 --- a/tuplex/codegen/include/HashHelper.h +++ /dev/null @@ -1,47 +0,0 @@ -//--------------------------------------------------------------------------------------------------------------------// -// // -// Tuplex: Blazing Fast Python Data Science // -// // -// // -// (c) 2017 - 2021, Tuplex team // -// Created by Leonhard Spiegelberg first on 7/4/2022 // -// License: Apache 2.0 // -//--------------------------------------------------------------------------------------------------------------------// -#ifndef HASHHELPER_HEADER_ -#define HASHHELPER_HEADER_ - -#include "LLVMEnvironment.h" -#include "CodegenHelper.h" - -namespace tuplex { - namespace codegen { - - // this is a simple hashmap proxy structure - class HashProxy { - public: - /*! - * create a new hashmap (codegen) - * @param builder - * @param global whether to create the hashmap as global variable or not. - */ - HashProxy(llvm::IRBuilder<>& builder, bool global=false); - - /*! 
- * put a value into the hashmap - * @param builder - * @param key - * @param value if default serializable value, then no - */ - void put(llvm::IRBuilder<>& builder, - const SerializableValue& key, - const SerializableValue& value=SerializableValue()); - - }; - - // these are helper functions to deal with generating code to hash different keys etc. - extern void hashmap_put(llvm::IRBuilder<>& builder, - const SerializableValue& key, - const SerializableValue& value); - } -} -#endif \ No newline at end of file diff --git a/tuplex/codegen/src/HashHelper.cc b/tuplex/codegen/src/HashHelper.cc deleted file mode 100644 index 5990699b9..000000000 --- a/tuplex/codegen/src/HashHelper.cc +++ /dev/null @@ -1,15 +0,0 @@ -// -// Created by Leonhard Spiegelberg on 7/3/22. -// -#include - -namespace tuplex { - namespace codegen { - // these are helper functions to deal with generating code to hash different keys etc. - extern void hashmap_put(llvm::IRBuilder<>& builder, - const SerializableValue& key, - const SerializableValue& value) { - // check what type key is => this determines the structure - } - } -} \ No newline at end of file diff --git a/tuplex/python/tuplex/utils/source_vault.py b/tuplex/python/tuplex/utils/source_vault.py index 1c79f6841..7a6aabdeb 100644 --- a/tuplex/python/tuplex/utils/source_vault.py +++ b/tuplex/python/tuplex/utils/source_vault.py @@ -171,8 +171,6 @@ def get(self, ftor, filename, lineno, colno, globs): for entry in entries: if entry['code_hash'] == codeobj_hash: return entry['code'] - # # debug: - # print(self.lambdaFileDict) raise KeyError('Multiple lambdas found, but failed to retrieve code for this lambda expression.') else: raise KeyError('could not find lambda function') @@ -287,8 +285,6 @@ def extractAndPutAllLambdas(self, src_info, filename, lineno, colno, globals): else: self.lambdaFileDict[key] = [entry] else: - print('there are {} Lambdas'.format(len(Lams))) - # check that there are no globals when extracting function! if colno is None and len(globals) != 0: raise Exception('Found more than one lambda expression on {}:+{}. 
Either use ' From ab35c61464f75e3f3d7e743a0e4199a74dbc3202 Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 6 Jul 2022 15:57:27 -0400 Subject: [PATCH 31/32] add new test for bug94 --- tuplex/test/wrappers/WrapperTest.cc | 42 +++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/tuplex/test/wrappers/WrapperTest.cc b/tuplex/test/wrappers/WrapperTest.cc index 95c89c47b..bc6906a37 100644 --- a/tuplex/test/wrappers/WrapperTest.cc +++ b/tuplex/test/wrappers/WrapperTest.cc @@ -2744,6 +2744,48 @@ TEST_F(WrapperTest, AllRows311) { } } +// bug 94: https://github.com/tuplex/tuplex/issues/94 +TEST_F(WrapperTest, DoubleCollect) { + using namespace tuplex; + + // ds = c.parallelize([(1, "A"),(2, "a"),(3, 2)]).filter(lambda a, b: a > 1) + // ds.collect() + // ds.collect() + + auto ctx_opts = "{\"webui.enable\": false," + " \"driverMemory\": \"8MB\"," + " \"partitionSize\": \"256KB\"," + "\"executorCount\": 0," + "\"tuplex.scratchDir\": \"file://" + scratchDir + "\"," + "\"resolveWithInterpreterOnly\": true}"; + + + PythonContext ctx("", "", ctx_opts); + { + auto values_obj = python::runAndGet("L = [(1, \"A\"),(2, \"a\"),(3, 2)]", "L"); + + auto values = py::reinterpret_borrow(values_obj); + + auto ds = ctx.parallelize(values) + .filter("lambda a, b: a > 1",""); + + auto res = ds.collect(); + auto resObj = res.ptr(); + ASSERT_TRUE(PyList_Check(resObj)); + EXPECT_EQ(PyList_Size(resObj), 2); + + // print result out + PyObject_Print(resObj, stdout, 0); + std::cout< Date: Wed, 6 Jul 2022 17:35:25 -0400 Subject: [PATCH 32/32] fix bug94 by making fallback partitions immortal as well --- tuplex/core/include/logical/ParallelizeOperator.h | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tuplex/core/include/logical/ParallelizeOperator.h b/tuplex/core/include/logical/ParallelizeOperator.h index 87666950c..816026747 100644 --- a/tuplex/core/include/logical/ParallelizeOperator.h +++ b/tuplex/core/include/logical/ParallelizeOperator.h @@ -48,7 +48,12 @@ namespace tuplex { */ std::vector getNormalPartitions(); - void setFallbackPartitions(const std::vector &fallbackPartitions) { _fallbackPartitions = fallbackPartitions; } + void setFallbackPartitions(const std::vector &fallbackPartitions) { + _fallbackPartitions = fallbackPartitions; + // parallelize does not own the partitions, they must become immortal to allow for multiple calls involving this operator + for(auto p : _fallbackPartitions) + p->makeImmortal(); + } std::vector getFallbackPartitions() { return _fallbackPartitions; } void setPartitionGroups(const std::vector& partitionGroups) { _partitionGroups = partitionGroups; }
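
Note (not part of the patch series): the last few patches are easiest to read against the user-facing Python scenarios they target, which only appear as comments inside the C++ wrapper tests above. The sketch below restates those two scenarios — the sum-by-key aggregate from the SumByKey test (patches 27-29) and the repeated collect() from the DoubleCollect test for issue #94 (patches 31-32) — as plain Python. The dataflow calls and data are taken from the test comments; the `from tuplex import Context` import and the conf dict are illustrative assumptions, mirroring the tests' ctx_opts, and this is a usage sketch rather than anything the patches add.

    # usage sketch only; assumes a local Tuplex installation
    from tuplex import Context

    conf = {"webui.enable": False, "executorCount": 0}   # assumed conf dict, patterned on the tests' ctx_opts
    c = Context(conf)

    # sum-by-key scenario exercised by the SumByKey wrapper test
    data = [(0, 10.0), (1, 20.0), (0, -4.5)]
    res = c.parallelize(data, columns=['id', 'volume']) \
           .aggregateByKey(lambda a, b: a + b,            # combine two partial aggregates
                           lambda a, x: a + x['volume'],  # fold one row into the running aggregate
                           0.0,                           # initial aggregate value
                           ['id']) \
           .collect()

    # double-collect scenario from issue #94: the same dataset is materialized twice,
    # so its fallback partitions must outlive the first job.
    ds = c.parallelize([(1, "A"), (2, "a"), (3, 2)]).filter(lambda a, b: a > 1)
    ds.collect()
    ds.collect()

The second `ds.collect()` is exactly what PATCH 32/32 enables: `setFallbackPartitions` now marks the fallback partitions immortal, since the parallelize operator does not own them but must support repeated jobs over the same input.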