From 7113c1a9d4c0c640e15454a195db084f38f13f35 Mon Sep 17 00:00:00 2001
From: Leonhard Spiegelberg
Date: Tue, 3 Aug 2021 22:20:39 -0400
Subject: [PATCH 1/2] Rahul's fix + custom exception

---
Review notes (everything between the three-dash separator and the first
file diff is ignored by `git am`):

  * Purpose: a failed UDF source extraction no longer aborts the pipeline
    call. Only UDFCodeExtractionError is caught, a warning is logged, and
    execution continues with whatever value `code` already held.
  * logging.warn() is a deprecated alias (since Python 3.3); every handler
    now calls logging.warning().
  * tocsv(): the handler formatted `ftor`, which is not a name in that
    method, so the handler itself would raise NameError. It now formats
    `part_name_generator`.
  * aggregateByKey(): both handlers formatted `ftor`; they now format
    `combine` and `aggregate` respectively.
  * tuplex/utils/framework.py now ends with a trailing newline.
  * Every fix is a one-for-one line replacement, so hunk headers and the
    diffstat are unchanged. The blob ids on the `index` lines are those of
    the originally submitted patch and do not describe the edited contents.

 tuplex/python/CMakeLists.txt            |  1 +
 tuplex/python/tuplex/dataset.py         | 49 ++++++++++++-------------
 tuplex/python/tuplex/utils/framework.py | 19 ++++++++++
 3 files changed, 44 insertions(+), 25 deletions(-)
 create mode 100644 tuplex/python/tuplex/utils/framework.py

diff --git a/tuplex/python/CMakeLists.txt b/tuplex/python/CMakeLists.txt
index 47b44149c..22fb8a563 100644
--- a/tuplex/python/CMakeLists.txt
+++ b/tuplex/python/CMakeLists.txt
@@ -106,6 +106,7 @@ FILE(COPY ${CMAKE_CURRENT_SOURCE_DIR}/tuplex/utils/__init__.py
         ${CMAKE_CURRENT_SOURCE_DIR}/tuplex/utils/jupyter.py
         ${CMAKE_CURRENT_SOURCE_DIR}/tuplex/utils/reflection.py
         ${CMAKE_CURRENT_SOURCE_DIR}/tuplex/utils/source_vault.py
+        ${CMAKE_CURRENT_SOURCE_DIR}/tuplex/utils/framework.py
         ${CMAKE_CURRENT_SOURCE_DIR}/tuplex/utils/tracebacks.py
         ${CMAKE_CURRENT_SOURCE_DIR}/tuplex/utils/version.py
         ${CMAKE_CURRENT_SOURCE_DIR}/tuplex/utils/globs.py
diff --git a/tuplex/python/tuplex/dataset.py b/tuplex/python/tuplex/dataset.py
index 6f7cc030e..7d41d4186 100644
--- a/tuplex/python/tuplex/dataset.py
+++ b/tuplex/python/tuplex/dataset.py
@@ -11,10 +11,12 @@
 
 import cloudpickle
 import sys
+import logging
 
 from .libexec.tuplex import _Context, _DataSet
 from tuplex.utils.reflection import get_source as get_udf_source
 from tuplex.utils.reflection import get_globals
+from tuplex.utils.framework import UDFCodeExtractionError
 from tuplex.utils.source_vault import SourceVault
 from .exceptions import classToExceptionCode
 
@@ -60,8 +62,8 @@ def map(self, ftor):
         try:
             # convert code object to str representation
             code = get_udf_source(ftor)
-        except Exception as e:
-            raise Exception('Could not extract code for {}. Details:\n{}'.format(ftor, e)) from None
+        except UDFCodeExtractionError as e:
+            logging.warning('Could not extract code for {}. Details:\n{}'.format(ftor, e))
 
         g = get_globals(ftor)
 
@@ -89,8 +91,8 @@ def filter(self, ftor):
         try:
             # convert code object to str representation
             code = get_udf_source(ftor)
-        except Exception as e:
-            raise Exception('Could not extract code for {}.Details:\n{}'.format(ftor, e))
+        except UDFCodeExtractionError as e:
+            logging.warning('Could not extract code for {}. Details:\n{}'.format(ftor, e))
 
         g = get_globals(ftor)
         ds = DataSet()
@@ -167,8 +169,8 @@ def resolve(self, eclass, ftor):
         try:
             # convert code object to str representation
             code = get_udf_source(ftor)
-        except Exception as e:
-            raise Exception('Could not extract code for {}.Details:\n{}'.format(ftor, e))
+        except UDFCodeExtractionError as e:
+            logging.warning('Could not extract code for {}. Details:\n{}'.format(ftor, e))
 
         g = get_globals(ftor)
         ds = DataSet()
@@ -196,8 +198,8 @@ def withColumn(self, column, ftor):
         try:
             # convert code object to str representation
             code = get_udf_source(ftor)
-        except Exception as e:
-            raise Exception('Could not extract code for {}.Details:\n{}'.format(ftor, e))
+        except UDFCodeExtractionError as e:
+            logging.warning('Could not extract code for {}. Details:\n{}'.format(ftor, e))
         g = get_globals(ftor)
         ds = DataSet()
         ds._dataSet = self._dataSet.withColumn(column, code, cloudpickle.dumps(ftor), g)
@@ -225,8 +227,8 @@ def mapColumn(self, column, ftor):
         try:
             # convert code object to str representation
             code = get_udf_source(ftor)
-        except Exception as e:
-            raise Exception('Could not extract code for {}.Details:\n{}'.format(ftor, e)) from None
+        except UDFCodeExtractionError as e:
+            logging.warning('Could not extract code for {}. Details:\n{}'.format(ftor, e))
         g = get_globals(ftor)
         ds = DataSet()
         ds._dataSet = self._dataSet.mapColumn(column, code, cloudpickle.dumps(ftor), g)
@@ -441,8 +443,8 @@ def tocsv(self, path, part_size=0, num_rows=max_rows, num_parts=0, part_name_gen
             try:
                 # convert code object to str representation
                 code = get_udf_source(part_name_generator)
-            except Exception as e:
-                raise Exception('Could not extract code for {}.Details:\n{}'.format(part_name_generator, e))
+            except UDFCodeExtractionError as e:
+                logging.warning('Could not extract code for {}. Details:\n{}'.format(part_name_generator, e))
 
         # clamp max rows
         if num_rows > max_rows:
@@ -470,14 +472,14 @@ def aggregate(self, combine, aggregate, initial_value):
         try:
             # convert code object to str representation
             comb_code = get_udf_source(combine)
-        except Exception as e:
-            raise Exception('Could not extract code for {}.Details:\n{}'.format(combine, e))
+        except UDFCodeExtractionError as e:
+            logging.warning('Could not extract code for combine UDF {}. Details:\n{}'.format(combine, e))
 
         try:
             # convert code object to str representation
             agg_code = get_udf_source(aggregate)
-        except Exception as e:
-            raise Exception('Could not extract code for {}.Details:\n{}'.format(aggregate, e))
+        except UDFCodeExtractionError as e:
+            logging.warning('Could not extract code for aggregate UDF {}. Details:\n{}'.format(aggregate, e))
 
         g_comb = get_globals(combine)
         g_agg = get_globals(aggregate)
@@ -502,20 +504,17 @@ def aggregateByKey(self, combine, aggregate, initial_value, key_columns):
         agg_code, agg_code_pickled = '', ''
         try:
             # convert code object to str representation
-            comb_code = get_lambda_source(combine)
+            comb_code = get_udf_source(combine)
             comb_code_pickled = cloudpickle.dumps(combine)
-        except:
-            print('{} is not a lambda function or its code could not be extracted'.format(combine))
+        except UDFCodeExtractionError as e:
+            logging.warning('Could not extract code for combine UDF {}. Details:\n{}'.format(combine, e))
 
         try:
             # convert code object to str representation
-            agg_code = get_lambda_source(aggregate)
+            agg_code = get_udf_source(aggregate)
             agg_code_pickled = cloudpickle.dumps(aggregate)
-        except:
-            print('{} is not a lambda function or its code could not be extracted'.format(aggregate))
-
-        print(comb_code)
-        print(agg_code)
+        except UDFCodeExtractionError as e:
+            logging.warning('Could not extract code for aggregate UDF {}. Details:\n{}'.format(aggregate, e))
 
         ds = DataSet()
         ds._dataSet = self._dataSet.aggregateByKey(comb_code, comb_code_pickled,
diff --git a/tuplex/python/tuplex/utils/framework.py b/tuplex/python/tuplex/utils/framework.py
new file mode 100644
index 000000000..d5d36d225
--- /dev/null
+++ b/tuplex/python/tuplex/utils/framework.py
@@ -0,0 +1,19 @@
+#!/usr/bin/env python3
+#----------------------------------------------------------------------------------------------------------------------#
+#                                                                                                                      #
+#                                       Tuplex: Blazing Fast Python Data Science                                       #
+#                                                                                                                      #
+#                                                                                                                      #
+#  (c) 2017 - 2021, Tuplex team                                                                                        #
+#  Created by Leonhard Spiegelberg first on 8/3/2021                                                                   #
+#  License: Apache 2.0                                                                                                 #
+#----------------------------------------------------------------------------------------------------------------------#
+
+# this file contains Framework specific exceptions
+class TuplexException(Exception):
+    """Base Exception class on which all Tuplex Framework specific exceptions are based"""
+    pass
+
+class UDFCodeExtractionError(TuplexException):
+    """thrown when UDF code extraction/reflection failed"""
+    pass

From fa749038378ed760dd1906a3f051a02b9174fa71 Mon Sep 17 00:00:00 2001
From: Leonhard Spiegelberg
Date: Tue, 3 Aug 2021 22:38:44 -0400
Subject: [PATCH 2/2] adding aggregate test

---
 tuplex/python/CMakeLists.txt           |  1 +
 tuplex/python/tests/test_aggregates.py | 55 ++++++++++++++++++++++++++
 2 files changed, 56 insertions(+)
 create mode 100644 tuplex/python/tests/test_aggregates.py

diff --git a/tuplex/python/CMakeLists.txt b/tuplex/python/CMakeLists.txt
index 22fb8a563..9349e0c72 100644
--- a/tuplex/python/CMakeLists.txt
+++ b/tuplex/python/CMakeLists.txt
@@ -136,6 +136,7 @@ FILE(COPY ${CMAKE_CURRENT_SOURCE_DIR}/tests/test_tuples.py
         ${CMAKE_CURRENT_SOURCE_DIR}/tests/test_closure.py
         ${CMAKE_CURRENT_SOURCE_DIR}/tests/test_import.py
         ${CMAKE_CURRENT_SOURCE_DIR}/tests/test_math.py
+        ${CMAKE_CURRENT_SOURCE_DIR}/tests/test_aggregates.py
         ${CMAKE_CURRENT_SOURCE_DIR}/tests/helper.py
         DESTINATION ${PYTHON_DIST_DIR}/tests)
 FILE(COPY ${CMAKE_CURRENT_SOURCE_DIR}/tuplex/libexec/__init__.py DESTINATION ${PYTHON_DIST_DIR}/tuplex/libexec)
diff --git a/tuplex/python/tests/test_aggregates.py b/tuplex/python/tests/test_aggregates.py
new file mode 100644
index 000000000..aef5f56fb
--- /dev/null
+++ b/tuplex/python/tests/test_aggregates.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env python3
+#----------------------------------------------------------------------------------------------------------------------#
+#                                                                                                                      #
+#                                       Tuplex: Blazing Fast Python Data Science                                       #
+#                                                                                                                      #
+#                                                                                                                      #
+#  (c) 2017 - 2021, Tuplex team                                                                                        #
+#  Created by Leonhard Spiegelberg first on 1/1/2021                                                                   #
+#  License: Apache 2.0                                                                                                 #
+#----------------------------------------------------------------------------------------------------------------------#
+
+import unittest
+import functools
+import random
+import numpy as np
+from tuplex import *
+
+class TestAggregates(unittest.TestCase):
+    def setUp(self):
+        self.conf = {"webui.enable": False, "driverMemory": "8MB", "partitionSize": "256KB"}
+
+    def test_simple_count(self):
+        c = Context(self.conf)
+
+        data = [1, 2, 3, 4, 5, 6]
+        res = c.parallelize(data).aggregate(lambda a, b: a + b, lambda a, x: a + 1, 0).collect()
+        self.assertEqual(res[0], len(data))
+
+    def test_simple_sum(self):
+        c = Context(self.conf)
+
+        data = [1, 2, 3, 4, 5, 6]
+        res = c.parallelize(data).aggregate(lambda a, b: a + b, lambda a, x: a + x, 0).collect()
+        self.assertEqual(res[0], sum(data))
+
+    def test_sum_by_key(self):
+        c = Context(self.conf)
+
+        data = [(0, 10.0), (1, 20.0), (0, -4.5)]
+
+        res = c.parallelize(data, columns=['id', 'volume']).aggregateByKey(lambda a, b: a + b,
+                                                                           lambda a, x: a + x['volume'],
+                                                                           0.0,
+                                                                           ['id']).collect()
+
+        self.assertEqual(len(res), 2)
+
+        # sort result for comparison (in the future Tuplex should provide a function for this!)
+        res = sorted(res, key=lambda t: t[0])
+
+        self.assertEqual(res[0][0], 0)
+        self.assertEqual(res[1][0], 1)
+
+        self.assertAlmostEqual(res[0][1], 10.0 - 4.5)
+        self.assertAlmostEqual(res[1][1], 20.0)