An opinionated framework for big MultiProcessing Extract, Transform, Load jobs. Python 3 native. There's a Python 2 backport available in a separate branch if you want it; it may lag behind the master branch.
©2015 Jorge Herskovic <first initial + last name @ gee mail dot com>
We write lots of large ETL jobs. Most of them start by slicing input into little pieces, then processing each of those pieces, then writing the results somewhere else. We tend to use Python's multiprocessing module for this, and many of our problems are embarrassingly parallel. But that use of multiprocessing leads to a lot of boilerplate code that interrupts the flow of our programs and makes them ugly and less readable.
Hence, MPETL: write the boilerplate once, and let the actual processing code take center stage.
MPETL works on pipelines and is meant for embarrassingly parallel ETL jobs. A pipeline starts at an origin and ends in destinations. It has tasks that get applied to data along the way. It can consist of any number of intermediate tasks. Origins are responsible for yielding chunks of data that can be ingested by tasks, which are responsible for producing chunks of data that can be ingested by destinations.
The only thing MPETL guarantees here is that the data will make it all the way through, and that the pipeline will be parallel.
Parameters MUST be picklable, since they're going into Queues under the covers.
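If you're unsure whether an argument will survive the trip through a Queue, you can check it with the standard pickle module. This is a stdlib sketch, independent of MPETL; the helper name is illustrative:

```python
import pickle

def is_picklable(obj):
    """Return True if obj can round-trip through pickle (and thus a Queue)."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False

print(is_picklable({"rows": [1, 2, 3]}))  # plain data structures are fine
print(is_picklable(lambda x: x + 1))      # lambdas are not picklable
```

Plain data structures, module-level functions, and most built-in types pickle fine; lambdas, open file handles, and database connections do not.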
The following is an example of a single-origin, single-destination, single-task pipeline that capitalizes a nursery rhyme in an unnecessarily convoluted way.
import mpetl
EXAMPLE_TEXT = """Mary had a little lamb,
His fleece was white as snow,
And everywhere that Mary went,
The lamb was sure to go.
"""
def origin_function(text):
    for line in text.split('\n'):
        yield line

def process_line(one_line):
    return one_line.upper()

def destination_function(one_line):
    print(one_line)
if __name__ == "__main__":
    pipeline = mpetl.Pipeline()
    pipeline.add_origin(origin_function)
    pipeline.add_task(process_line)
    pipeline.add_destination(destination_function)
    pipeline.start()
    pipeline.feed(EXAMPLE_TEXT)
    pipeline.join()

IPC overhead can dominate a pipeline if the processing itself is relatively cheap. You can therefore specify a chunk_size when calling add_task or its siblings; results will then be gathered into chunk_size blocks before being passed around. Note that this reduces multiprocessing granularity, i.e., the ability to distribute work evenly across processes, so it should be reserved for very large numbers of fast tasks.
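The batching that chunk_size performs is conceptually similar to the stdlib sketch below. The names here are illustrative, not MPETL internals:

```python
from itertools import islice

def chunked(iterable, chunk_size):
    """Yield lists of up to chunk_size items, mirroring how results are
    gathered into blocks before crossing a Queue."""
    it = iter(iterable)
    while True:
        block = list(islice(it, chunk_size))
        if not block:
            return
        yield block

# Ten cheap results travel as three messages instead of ten:
print(list(chunked(range(10), 4)))  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

Fewer, larger messages mean less pickling and Queue traffic, at the cost of coarser work distribution.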
Database access is expensive, and creating connections over and over incurs a lot of overhead. mpetl therefore lets you specify a callable that is invoked once per process; its return value is then passed to your task function as the process_persistent parameter. Give the add_task/add_origin/add_destination call a setup parameter: a callable that returns a single value. For obvious reasons, this callable should be self-contained. You can also specify a teardown, which will receive the same object and can... well... tear it down.
pipeline.add_task(save_stuff_to_db, setup=create_connection)
If you do this, save_stuff_to_db needs to take a process_persistent parameter:
def save_stuff_to_db(stuff_to_be_saved, process_persistent):
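Conceptually, each worker process runs something like the loop below. This is a simplified, single-process sketch of the setup/teardown lifecycle, not MPETL's actual implementation, and the names are illustrative:

```python
def run_worker(items, task, setup=None, teardown=None):
    """Run task over items, creating the process-persistent object once."""
    process_persistent = setup() if setup else None
    results = []
    try:
        for item in items:
            if process_persistent is not None:
                results.append(task(item, process_persistent))
            else:
                results.append(task(item))
    finally:
        # teardown sees the same object setup produced, exactly once
        if teardown and process_persistent is not None:
            teardown(process_persistent)
    return results

# A fake "connection" created once, reused for every item, then torn down:
log = []
results = run_worker(
    ["a", "b"],
    task=lambda item, conn: f"{conn}:{item}",
    setup=lambda: (log.append("open"), "conn")[1],
    teardown=lambda conn: log.append("close"),
)
print(results)  # ['conn:a', 'conn:b']
print(log)      # ['open', 'close']
```

The point is that setup and teardown bracket the whole lifetime of the worker, not each individual task invocation.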
add_origin and add_destination behave like add_task. All origins execute in the order in which they were added, and so do all destinations. Origins always execute before tasks, which always execute before destinations.
In other words, the following order of calls:
pipeline.add_destination(function_1)
pipeline.add_origin(function_2)
pipeline.add_task(function_3)
pipeline.add_task(function_4)
pipeline.add_destination(function_5)
pipeline.add_task(function_6)
pipeline.add_origin(function_7)
pipeline.add_task(function_8)

will result in the following actual pipeline:
function_2 -> function_7 -> function_3 -> function_4 -> function_6 -> function_8 -> function_1 -> function_5
Don't say I didn't warn you. The idea is that you should write your pipelines in the order in which they execute.
The easiest way to get your results back, if your last task/destination yields them, is to iterate through the as_completed() iterator of the pipeline. You can choose to do this after processing is completed, by join()ing the pipeline before calling as_completed(), or to do it "live," by calling as_completed() without joining.
Please be aware that due to implementation limitations calling as_completed without join()ing will call join() for you in a background thread. In turn, this means that you MUST have fed the pipeline all the data it will consume (the reason for this is that there's no other way of knowing when all of the data is processed, and hence no other way of stopping the iteration!). In short, this is only useful for very long-running final steps.
WEIRDNESS WILL HAPPEN IF YOU IGNORE THIS WARNING.
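MPETL's as_completed mirrors the concurrent.futures iterator of the same name. The stdlib version below shows the "live" consumption pattern the warning above is about; this is an analogy, not MPETL's implementation:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_upper(line):
    return line.upper()

lines = ["mary had", "a little", "lamb"]
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(slow_upper, line) for line in lines]
    # "Live" iteration: results arrive as workers finish, in completion
    # order, so we sort for a deterministic result.
    results = sorted(f.result() for f in as_completed(futures))

print(results)  # ['A LITTLE', 'LAMB', 'MARY HAD']
```

Just as here the executor must have been given all its work before the iteration can end, MPETL's live as_completed only terminates if the pipeline has already been fed everything it will consume.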
You can send the output of a pipeline to another pipeline. To do this, you MUST give the destination pipeline a unique name, which must be a hashable value. Please use a string.
from mpetl import Pipeline
def send_directly_to_second(something):
    Pipeline.send("some name", something)

def uppercase_it(something):
    yield something.upper()
first = Pipeline()
first.add_task(send_directly_to_second)

second = Pipeline(name="some name")
second.add_task(uppercase_it)
first.start()
second.start()
first.feed("Hello world")
first.join()
second.join()
# Should print "HELLO WORLD"
for result in second.as_completed():
    print(result)

MPETL uses nose for its tests. Run nosetests in its root directory to execute all tests. The test suite, quite on purpose, creates hundreds of processes. This can make the OS go over its file handle limit, especially on OS X out of the box. The symptom is something like
OSError: [Errno 24] Too many open files
In this case, run
$ ulimit -n 2048
to increase the OS file handle limit before running the tests.
This library might run on Windows, but due to the high process creation overhead on that OS, it will "crawl" more than run. I don't recommend it. MPETL depends on cheap process creation to be worthwhile.