这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions tuplex/python/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ FILE(COPY ${CMAKE_CURRENT_SOURCE_DIR}/tuplex/utils/__init__.py
${CMAKE_CURRENT_SOURCE_DIR}/tuplex/utils/jupyter.py
${CMAKE_CURRENT_SOURCE_DIR}/tuplex/utils/reflection.py
${CMAKE_CURRENT_SOURCE_DIR}/tuplex/utils/source_vault.py
${CMAKE_CURRENT_SOURCE_DIR}/tuplex/utils/framework.py
${CMAKE_CURRENT_SOURCE_DIR}/tuplex/utils/tracebacks.py
${CMAKE_CURRENT_SOURCE_DIR}/tuplex/utils/version.py
${CMAKE_CURRENT_SOURCE_DIR}/tuplex/utils/globs.py
Expand Down Expand Up @@ -135,6 +136,7 @@ FILE(COPY ${CMAKE_CURRENT_SOURCE_DIR}/tests/test_tuples.py
${CMAKE_CURRENT_SOURCE_DIR}/tests/test_closure.py
${CMAKE_CURRENT_SOURCE_DIR}/tests/test_import.py
${CMAKE_CURRENT_SOURCE_DIR}/tests/test_math.py
${CMAKE_CURRENT_SOURCE_DIR}/tests/test_aggregates.py
${CMAKE_CURRENT_SOURCE_DIR}/tests/helper.py
DESTINATION ${PYTHON_DIST_DIR}/tests)
FILE(COPY ${CMAKE_CURRENT_SOURCE_DIR}/tuplex/libexec/__init__.py DESTINATION ${PYTHON_DIST_DIR}/tuplex/libexec)
Expand Down
55 changes: 55 additions & 0 deletions tuplex/python/tests/test_aggregates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#!/usr/bin/env python3
#----------------------------------------------------------------------------------------------------------------------#
# #
# Tuplex: Blazing Fast Python Data Science #
# #
# #
# (c) 2017 - 2021, Tuplex team #
# Created by Leonhard Spiegelberg first on 1/1/2021 #
# License: Apache 2.0 #
#----------------------------------------------------------------------------------------------------------------------#

import unittest
import functools
import random
import numpy as np
from tuplex import *

class TestAggregates(unittest.TestCase):
    """Exercises Tuplex's ``aggregate`` and ``aggregateByKey`` operators on tiny in-memory datasets."""

    def setUp(self):
        # Shared options for every test: web UI off, deliberately small driver memory and partitions.
        self.conf = {"webui.enable": False, "driverMemory": "8MB", "partitionSize": "256KB"}

    def test_simple_count(self):
        """Counting rows through aggregate() must yield the number of input elements."""
        ctx = Context(self.conf)
        numbers = [1, 2, 3, 4, 5, 6]

        # combine merges two partial counts; the aggregate UDF ignores the row and adds one.
        out = ctx.parallelize(numbers).aggregate(lambda lhs, rhs: lhs + rhs,
                                                 lambda acc, row: acc + 1,
                                                 0).collect()
        self.assertEqual(out[0], len(numbers))

    def test_simple_sum(self):
        """Summing elements through aggregate() must match Python's built-in sum()."""
        ctx = Context(self.conf)
        numbers = [1, 2, 3, 4, 5, 6]

        # combine merges two partial sums; the aggregate UDF folds each element into the running sum.
        out = ctx.parallelize(numbers).aggregate(lambda lhs, rhs: lhs + rhs,
                                                 lambda acc, row: acc + row,
                                                 0).collect()
        self.assertEqual(out[0], sum(numbers))

    def test_sum_by_key(self):
        """Summing the 'volume' column per 'id' must produce one row per distinct id."""
        ctx = Context(self.conf)
        records = [(0, 10.0), (1, 20.0), (0, -4.5)]

        ds = ctx.parallelize(records, columns=['id', 'volume'])
        grouped = ds.aggregateByKey(lambda lhs, rhs: lhs + rhs,
                                    lambda acc, row: acc + row['volume'],
                                    0.0,
                                    ['id']).collect()
        self.assertEqual(len(grouped), 2)

        # Order by key so the comparison does not depend on output order
        # (ideally Tuplex itself would provide a sort operator for this).
        grouped = sorted(grouped, key=lambda kv: kv[0])
        first, second = grouped

        self.assertEqual(first[0], 0)
        self.assertEqual(second[0], 1)

        self.assertAlmostEqual(first[1], 10.0 - 4.5)
        self.assertAlmostEqual(second[1], 20.0)
49 changes: 24 additions & 25 deletions tuplex/python/tuplex/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@

import cloudpickle
import sys
import logging

from .libexec.tuplex import _Context, _DataSet
from tuplex.utils.reflection import get_source as get_udf_source
from tuplex.utils.reflection import get_globals
from tuplex.utils.framework import UDFCodeExtractionError
from tuplex.utils.source_vault import SourceVault
from .exceptions import classToExceptionCode

Expand Down Expand Up @@ -60,8 +62,8 @@ def map(self, ftor):
try:
# convert code object to str representation
code = get_udf_source(ftor)
except Exception as e:
raise Exception('Could not extract code for {}. Details:\n{}'.format(ftor, e)) from None
except UDFCodeExtractionError as e:
logging.warn('Could not extract code for {}. Details:\n{}'.format(ftor, e))

g = get_globals(ftor)

Expand Down Expand Up @@ -89,8 +91,8 @@ def filter(self, ftor):
try:
# convert code object to str representation
code = get_udf_source(ftor)
except Exception as e:
raise Exception('Could not extract code for {}.Details:\n{}'.format(ftor, e))
except UDFCodeExtractionError as e:
logging.warn('Could not extract code for {}. Details:\n{}'.format(ftor, e))

g = get_globals(ftor)
ds = DataSet()
Expand Down Expand Up @@ -167,8 +169,8 @@ def resolve(self, eclass, ftor):
try:
# convert code object to str representation
code = get_udf_source(ftor)
except Exception as e:
raise Exception('Could not extract code for {}.Details:\n{}'.format(ftor, e))
except UDFCodeExtractionError as e:
logging.warn('Could not extract code for {}. Details:\n{}'.format(ftor, e))

g = get_globals(ftor)
ds = DataSet()
Expand Down Expand Up @@ -196,8 +198,8 @@ def withColumn(self, column, ftor):
try:
# convert code object to str representation
code = get_udf_source(ftor)
except Exception as e:
raise Exception('Could not extract code for {}.Details:\n{}'.format(ftor, e))
except UDFCodeExtractionError as e:
logging.warn('Could not extract code for {}. Details:\n{}'.format(ftor, e))
g = get_globals(ftor)
ds = DataSet()
ds._dataSet = self._dataSet.withColumn(column, code, cloudpickle.dumps(ftor), g)
Expand Down Expand Up @@ -225,8 +227,8 @@ def mapColumn(self, column, ftor):
try:
# convert code object to str representation
code = get_udf_source(ftor)
except Exception as e:
raise Exception('Could not extract code for {}.Details:\n{}'.format(ftor, e)) from None
except UDFCodeExtractionError as e:
logging.warn('Could not extract code for {}. Details:\n{}'.format(ftor, e))
g = get_globals(ftor)
ds = DataSet()
ds._dataSet = self._dataSet.mapColumn(column, code, cloudpickle.dumps(ftor), g)
Expand Down Expand Up @@ -441,8 +443,8 @@ def tocsv(self, path, part_size=0, num_rows=max_rows, num_parts=0, part_name_gen
try:
# convert code object to str representation
code = get_udf_source(part_name_generator)
except Exception as e:
raise Exception('Could not extract code for {}.Details:\n{}'.format(part_name_generator, e))
except UDFCodeExtractionError as e:
logging.warn('Could not extract code for {}. Details:\n{}'.format(ftor, e))

# clamp max rows
if num_rows > max_rows:
Expand Down Expand Up @@ -470,14 +472,14 @@ def aggregate(self, combine, aggregate, initial_value):
try:
# convert code object to str representation
comb_code = get_udf_source(combine)
except Exception as e:
raise Exception('Could not extract code for {}.Details:\n{}'.format(combine, e))
except UDFCodeExtractionError as e:
logging.warn('Could not extract code for combine UDF {}. Details:\n{}'.format(combine, e))

try:
# convert code object to str representation
agg_code = get_udf_source(aggregate)
except Exception as e:
raise Exception('Could not extract code for {}.Details:\n{}'.format(aggregate, e))
except UDFCodeExtractionError as e:
logging.warn('Could not extract code for aggregate UDF {}. Details:\n{}'.format(aggregate, e))

g_comb = get_globals(combine)
g_agg = get_globals(aggregate)
Expand All @@ -502,20 +504,17 @@ def aggregateByKey(self, combine, aggregate, initial_value, key_columns):
agg_code, agg_code_pickled = '', ''
try:
# convert code object to str representation
comb_code = get_lambda_source(combine)
comb_code = get_udf_source(combine)
comb_code_pickled = cloudpickle.dumps(combine)
except:
print('{} is not a lambda function or its code could not be extracted'.format(combine))
except UDFCodeExtractionError as e:
logging.warn('Could not extract code for combine UDF {}. Details:\n{}'.format(ftor, e))

try:
# convert code object to str representation
agg_code = get_lambda_source(aggregate)
agg_code = get_udf_source(aggregate)
agg_code_pickled = cloudpickle.dumps(aggregate)
except:
print('{} is not a lambda function or its code could not be extracted'.format(aggregate))

print(comb_code)
print(agg_code)
except UDFCodeExtractionError as e:
logging.warn('Could not extract code for aggregate UDF {}. Details:\n{}'.format(ftor, e))

ds = DataSet()
ds._dataSet = self._dataSet.aggregateByKey(comb_code, comb_code_pickled,
Expand Down
19 changes: 19 additions & 0 deletions tuplex/python/tuplex/utils/framework.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/usr/bin/env python3
#----------------------------------------------------------------------------------------------------------------------#
# #
# Tuplex: Blazing Fast Python Data Science #
# #
# #
# (c) 2017 - 2021, Tuplex team #
# Created by Leonhard Spiegelberg first on 8/3/2021 #
# License: Apache 2.0 #
#----------------------------------------------------------------------------------------------------------------------#

# This module defines the Tuplex framework-specific exception hierarchy.
class TuplexException(Exception):
    """Root of the Tuplex exception hierarchy.

    All Tuplex framework-specific exceptions derive from this class, so callers
    can catch ``TuplexException`` to handle any of them in one place.
    """

class UDFCodeExtractionError(TuplexException):
    """Raised when extracting a UDF's source code via reflection fails."""