Data Engineer’s Lunch #44: Prefect

In Data Engineer’s Lunch #44: Prefect, we discussed Prefect for workflow management and how to get started. The live recording of the Data Engineer’s Lunch, which includes a more in-depth discussion, is also embedded below in case you were not able to attend live. If you would like to attend a Data Engineer’s Lunch live, it is hosted every Monday at noon EST. Register here now!

Prefect

Prefect is a workflow management system written in Python. It lets users incorporate logging, retries, dynamic mapping, caching, failure notifications, and more into their data pipelines. Prefect takes your existing code and turns it into a distributed pipeline: you supply the functions that process data or orchestrate external systems, and Prefect applies the rules about when they should run.
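
To make the retry feature concrete, here is a minimal plain-Python sketch of retry-with-delay behavior. It does not use Prefect and is not how Prefect implements retries; `with_retries` and `flaky_read` are hypothetical names invented for this illustration.

```python
import time
from functools import wraps


def with_retries(max_retries, retry_delay_seconds):
    """Hypothetical decorator: re-run a failing function up to max_retries extra times."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            failures = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception:
                    failures += 1
                    if failures > max_retries:
                        raise  # out of retries: surface the last error
                    time.sleep(retry_delay_seconds)
        return wrapper
    return decorator


calls = {"count": 0}


@with_retries(max_retries=5, retry_delay_seconds=0.01)
def flaky_read():
    """Hypothetical step that fails twice before succeeding."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise IOError("source not ready yet")
    return [1, 2, 3]


result = flaky_read()
print(result, "after", calls["count"], "calls")
```

The point of a workflow manager is that this kind of wrapper, along with logging and failure notification, is configured per step rather than hand-written around every function.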

In Prefect, a user defines and calls functions as in an ordinary script, while Prefect determines the workflow structure.
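
One way to picture "workflow structure" is as a dependency graph: each step depends on the steps whose output it consumes, and any valid run order is a topological order of that graph. The sketch below uses only the standard library's `graphlib` (Python 3.9+) and the three task names from the ETL example later in this post. It illustrates the idea and makes no claim about Prefect's internals.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each key is a step; its value is the set of steps whose output it consumes.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

# A topological order guarantees every step runs after the steps it depends on.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
```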

Diagram of Prefect's hybrid execution model.
Image from: https://docs.prefect.io/orchestration/

Getting Started with Prefect for Workflow Management

The most basic unit of work in Prefect is a task. A task can be any function, such as one that extracts, transforms, or loads data, and it is written in ordinary Python. To turn a function into a Prefect task, decorate it with task, which is imported from prefect. The following functions are an example of a very simple ETL flow. The first function extracts data from a local file and is configured to retry up to five times, five seconds apart, if it fails. The second function transforms that data by adding 1 to each value. The third function then writes the transformed data to a file.

import csv
import datetime

from prefect import task, Flow, Parameter
from prefect.schedules import IntervalSchedule

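# Retry up to 5 times, waiting 5 seconds between attempts, if the read fails.
@task(max_retries=5, retry_delay=datetime.timedelta(seconds=5))
def extract(path):
    # Read the first line of the file and parse it as comma-separated integers.
    with open(path, "r") as f:
        text = f.readline().strip()
    data = [int(i) for i in text.split(",")]
    print(data)
    return data

@task
def transform(data):
    # Add 1 to every value.
    transformedData = [i + 1 for i in data]
    return transformedData

@task
def load(data, path):
    # newline="" keeps the csv module from writing blank rows on Windows.
    with open(path, "w", newline="") as f:
        csv_writer = csv.writer(f)
        csv_writer.writerow(data)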
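
To see what the three steps do to the data independent of Prefect, the same logic can be run as plain functions against a temporary file. This is a minimal sketch without the task decorator; the file contents `1,2,3` are assumed sample input, since the post does not show what values.csv contains.

```python
import csv
import os
import tempfile


def extract(path):
    # Read the first line and parse it as comma-separated integers.
    with open(path, "r") as f:
        text = f.readline().strip()
    return [int(i) for i in text.split(",")]


def transform(data):
    # Add 1 to every value.
    return [i + 1 for i in data]


def load(data, path):
    # Write the values back out as a single CSV row.
    with open(path, "w", newline="") as f:
        csv.writer(f).writerow(data)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "values.csv")
    with open(path, "w") as f:
        f.write("1,2,3\n")  # assumed sample input

    before = extract(path)
    load(transform(before), path)
    after = extract(path)

print(before, "->", after)
```

Because the result is written back to the same path it was read from, every run increments the stored values again.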

The next snippet builds the flow from the previously defined tasks:

def build_flow(schedule=None):
    with Flow("my_etl", schedule=schedule) as flow:
        # "path" can be overridden at run time; it defaults to values.csv.
        path = Parameter(name="path", default="values.csv", required=False)
        data = extract(path)
        transformedData = transform(data)
        # Write the transformed values to a separate file and back to the input path.
        load(transformedData, "transformedValues.csv")
        load(transformedData, path)

    return flow

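# Start one second from now and repeat every two minutes.
schedule = IntervalSchedule(
    start_date=datetime.datetime.now() + datetime.timedelta(seconds=1),
    interval=datetime.timedelta(minutes=2)
)

flow = build_flow(schedule)
# Parameter values are passed as a dictionary keyed by parameter name.
flow.run(parameters={"path": "values.csv"})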

A few parts of this code need further explanation. This basic ETL job uses a parameter. Often we want to supply different values to a task from one run to the next, and Prefect supports this through Parameter, which is imported from prefect. In the example above, the path parameter is passed in as a dictionary when running the flow. The flow is also set to run on a schedule of every two minutes.
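
The two ideas in this paragraph, run-time values overriding a parameter's default and a fixed-interval schedule, can be sketched in plain Python. This is an illustration only, not Prefect's implementation: `resolve_parameters` and `interval_run_times` are hypothetical helpers, and `other.csv` and the fixed start date are made-up values.

```python
import datetime


def resolve_parameters(defaults, overrides=None):
    """Hypothetical helper: values passed at run time win over defaults."""
    resolved = dict(defaults)
    resolved.update(overrides or {})
    return resolved


def interval_run_times(start_date, interval, count):
    """Hypothetical helper: the first `count` run times of a fixed-interval schedule."""
    return [start_date + n * interval for n in range(count)]


# The default path is used unless the run supplies its own value.
params = resolve_parameters({"path": "values.csv"}, {"path": "other.csv"})

# Arbitrary fixed start so the example is reproducible.
start = datetime.datetime(2021, 1, 1, 12, 0, 0)
runs = interval_run_times(start, datetime.timedelta(minutes=2), 3)

print(params)
for run in runs:
    print(run.isoformat())
```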

For a more in-depth explanation of what is going on in this code and to see it run, check out the embedded YouTube video below. In the video, we also show what Prefect Cloud’s dashboard looks like and explain how to connect our Prefect workflows to Prefect Cloud. All of our live events can be rewatched on our YouTube channel, so be sure to subscribe and turn on your notifications!
