Description
If the feature is related to a specific library below, please raise an issue in
the respective repo directly:
- TensorFlow Data Validation Repo
- TensorFlow Model Analysis Repo
System information
- TFX Version (you are using): 1.15
- Environment in which you plan to use the feature (e.g., Local (Linux/MacOS/Windows), Interactive Notebook, Google Cloud, etc.): Local, GCP
- Are you willing to contribute it (Yes/No): Yes
Describe the feature and the current behavior/state.
There are times when data needs to be transformed in some way before the training process begins. Examples include adding a UUID to each example or adding a data quality indicator. These features may not be needed at inference time, but can be useful during training. Another example is image processing, where you can apply other image models, OpenCV functions, etc., to add features. We should provide a PreTransform component that allows arbitrary Python-based transformations to be applied to the data at scale using Apache Beam.
Will this change the current API? How?
It will add a new component named PreTransform. This component will take in examples, a schema, and a UDF that takes in a dictionary and outputs a dictionary. The component will produce examples, and it should execute in a distributed fashion using Apache Beam. A hypothetical wiring sketch follows; PreTransform and its parameter names are proposals for this feature request, not an existing TFX API.
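# Hypothetical pipeline wiring; PreTransform is the proposed component,
# and the parameter names are illustrative only.
pre_transform = PreTransform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    transform_fn=transform_dict)  # user-supplied dict -> dict UDF

# Downstream components would consume the transformed examples:
transformed_examples = pre_transform.outputs['examples']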
Who will benefit with this feature?
Users who want to perform arbitrary transformations on data at scale using Apache Beam/Dataflow without having to deal with the underlying complexity.
Do you have a workaround or are completely blocked by this?
We have written a version that runs on DirectRunner. We can likely port this to GCP using custom containers, etc., but it would be nice to have this within TFX itself.
Name of your Organization (Optional)
Intuitive.cloud - GCP Partner
Any Other info.
The way this works is it takes in examples and a schema and processes them using a ParDo. Within the ParDo, it parses each example using the feature_spec obtained from the schema, converts the parsed example (a dictionary of tensors) to a native Python representation, applies the transformation function, converts the resulting dictionary back to a tf.train.Example, and serializes it so that it can be stored as TFRecords on disk. A minimal sketch of that ParDo follows; it assumes a FixedLenFeature-only feature_spec and TF2 eager execution, and the names ApplyUdfDoFn and _to_tf_example are illustrative, not part of any existing API.
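import apache_beam as beam
import tensorflow as tf
from tensorflow_transform.tf_metadata import schema_utils


def _to_tf_example(element):
    # Rebuild a tf.train.Example from a dict of native Python values.
    feature = {}
    for key, value in element.items():
        values = value if isinstance(value, list) else [value]
        if all(isinstance(v, int) for v in values):
            feature[key] = tf.train.Feature(int64_list=tf.train.Int64List(value=values))
        elif all(isinstance(v, float) for v in values):
            feature[key] = tf.train.Feature(float_list=tf.train.FloatList(value=values))
        else:
            feature[key] = tf.train.Feature(bytes_list=tf.train.BytesList(
                value=[v if isinstance(v, bytes) else str(v).encode('utf-8')
                       for v in values]))
    return tf.train.Example(features=tf.train.Features(feature=feature))


class ApplyUdfDoFn(beam.DoFn):
    # Parse -> native Python -> UDF -> tf.train.Example -> serialized bytes.
    def __init__(self, schema, udf):
        self._feature_spec = schema_utils.schema_as_feature_spec(schema).feature_spec
        self._udf = udf

    def process(self, serialized_example):
        parsed = tf.io.parse_single_example(serialized_example, self._feature_spec)
        element = {}
        for key, tensor in parsed.items():
            value = tensor.numpy()
            # bytes stays as-is; numpy scalars/arrays become native Python values.
            element[key] = value if isinstance(value, bytes) else value.tolist()
        yield _to_tf_example(self._udf(element)).SerializeToString()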
Example UDF:
import uuid
import hashlib

from shapely.geometry import Point, Polygon


def transform_dict(element):
    # Copy existing features and build a deterministic string over sorted keys.
    new_element = {}
    total_string = ""
    for k in sorted(element.keys()):
        total_string += str(element[k])
        new_element[k] = element[k]

    # Derive a stable UUID from a SHA-1 hash of the example's contents.
    encoded_string = total_string.encode('utf-8')
    example_uuid = uuid.UUID(bytes=hashlib.sha1(encoded_string).digest()[:16]).hex
    new_element['uuid'] = example_uuid

    # Rough polygon around Manhattan in (longitude, latitude) order.
    manhattan_polygon = Polygon([
        (-74.025, 40.697),  # Bottom left
        (-73.97, 40.710),   # Bottom right
        (-73.97, 40.74),    # Mid right
        (-73.899, 40.818),  # Top right
        (-73.95, 40.84),    # Top left
        (-74.015, 40.750),  # Mid left
    ])
    pickup_longitude = element['pickup_longitude']
    pickup_latitude = element['pickup_latitude']
    pickup_point = Point(pickup_longitude, pickup_latitude)
    dropoff_longitude = element['dropoff_longitude']
    dropoff_latitude = element['dropoff_latitude']
    dropoff_point = Point(dropoff_longitude, dropoff_latitude)

    # Flag trips that both start and end within Manhattan.
    if pickup_point.within(manhattan_polygon) and dropoff_point.within(manhattan_polygon):
        new_element['trip_within_manhattan'] = 1
    else:
        new_element['trip_within_manhattan'] = 0
    return new_element
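For reference, wiring this UDF into a standalone Beam pipeline with the DoFn sketched above might look like the following; the schema path and GCS paths are placeholders.

import apache_beam as beam
from google.protobuf import text_format
from tensorflow_metadata.proto.v0 import schema_pb2

# Load the schema produced by SchemaGen (path is a placeholder).
with open('schema.pbtxt') as f:
    schema = text_format.Parse(f.read(), schema_pb2.Schema())

with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | 'ReadExamples' >> beam.io.ReadFromTFRecord('gs://bucket/examples*')
        | 'ApplyUdf' >> beam.ParDo(ApplyUdfDoFn(schema, transform_dict))
        | 'WriteExamples' >> beam.io.WriteToTFRecord('gs://bucket/pre_transformed/examples'))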