
# Dagster

I was wondering where Dagster should live in my library: under Python? ML? ETL?

I think it deserves its own place as a data orchestration tool for machine learning, analytics, and ETL.

```bash
pip install dagster dagit
```
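With `dagit` installed you can also start the local Dagster UI, e.g. `dagit -f <path to your pipeline file>`, and trigger runs from the browser instead of the command line.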

## Solids and Pipelines

### Single Solid Pipeline

Solids are individual units of computation that are wired together to form a pipeline.

Let's start with a first example where we load a DataFrame:

```python
from dagster import solid
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame


@solid
def get_customer_list(_) -> DataFrame:
    spark = SparkSession.builder.getOrCreate()
    return (
        spark.read
        .option("mergeSchema", True)
        .orc("/datalake/test/customer")
        .select(
            "id",
            "date_part",
            "name",
            "surname",
            "lastmodified",
        )
    )
```

To execute the @solid, we wire it into a @pipeline:

```python
from dagster import pipeline, execute_pipeline

from datasets.customers import get_customer_list


@pipeline
def nightly():
    customer_list = get_customer_list()


if __name__ == "__main__":
    execute_pipeline(nightly, run_config={})
```
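`execute_pipeline` returns a result object that can be inspected after the run. Below is a minimal sketch, assuming the pipeline above lives in a hypothetical `nightly_pipeline` module; `result_for_solid` and `output_value` are the pre-0.13 (solid-era) result APIs:

```python
from dagster import execute_pipeline

from nightly_pipeline import nightly  # hypothetical module holding the @pipeline above

# Run the pipeline in-process and inspect the result object.
result = execute_pipeline(nightly, run_config={})

assert result.success  # True when every solid finished without errors

# Grab the DataFrame produced by the get_customer_list solid.
customers = result.result_for_solid("get_customer_list").output_value()
customers.show(5)
```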

### Multiple Solids into Pipeline

Now we take the single-solid pipeline's `get_customer_list` and add two more solids, building a program that merges only the updated customers into the current customer list, without scanning all partitions in the data lake.

```python
from datetime import datetime

from dagster import solid
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StructType, StructField, StringType

PATH = "/datalake/test/customer"


@solid
def get_current_customer(_) -> DataFrame:
    spark = SparkSession.builder.getOrCreate()
    try:
        df = spark.read.format("delta").load(PATH)
        if len(df.columns) == 0:
            raise Exception("Empty DataFrame")
    except Exception:
        # On the first run the Delta table does not exist yet:
        # create it with the key and date columns from an empty DataFrame.
        schema = StructType([
            StructField("id", StringType(), True),
            StructField("date_part", StringType(), True),
            StructField("name", StringType(), True),
            StructField("surname", StringType(), True),
            StructField("lastmodified", StringType(), True),
        ])
        (
            spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
            .write.format("delta")
            .mode("overwrite")
            .save(PATH)
        )
        df = spark.read.format("delta").load(PATH)

    return df


@solid
def merge_new_customer_into_current(_, source: DataFrame, current: DataFrame) -> DataFrame:
    spark = SparkSession.builder.getOrCreate()
    now = datetime.utcnow()

    timestamp_column = "lastmodified"
    key_column = "id"

    source.createOrReplaceTempView("source")
    current.createOrReplaceTempView("current")

    # Select only records newer than anything already in the current table,
    # then deduplicate them by key, keeping the latest version per id.
    incremental = spark.sql(f"""
        WITH latest_date_part_in_current AS (
            SELECT COALESCE(max(date_part), "1970-01-01") FROM current
        ),
        latest_record_in_current AS (
            SELECT COALESCE(max({timestamp_column}), "1970-01-01T00:00:00.000") FROM current
            WHERE date_part >= (SELECT * FROM latest_date_part_in_current)
        ),
        new_records_in_source AS (
            SELECT * FROM source
            WHERE ({key_column} IS NOT NULL)
              AND (date_part >= (SELECT * FROM latest_date_part_in_current))
              AND ({timestamp_column} > (SELECT * FROM latest_record_in_current))
        ),
        updates AS (
            SELECT
                *,
                ROW_NUMBER() OVER (PARTITION BY {key_column} ORDER BY {timestamp_column} DESC) AS row
            FROM new_records_in_source
        )
        SELECT *
        FROM updates
        WHERE row = 1
    """)
    # Drop the ROW_NUMBER helper column so the MERGE only sees target columns.
    incremental = incremental.drop("row").persist()
    incremental.createOrReplaceTempView("incremental")

    # Log the range and size of the incremental batch.
    spark.sql(f"""
        SELECT MIN(date_part), MAX(date_part), MIN({timestamp_column}), MAX({timestamp_column}), COUNT(*) FROM incremental
    """).show(1, False)

    # Upsert the incremental batch into the Delta table.
    spark.sql(f"""
        MERGE INTO delta.`{PATH}` current
        USING incremental
        ON current.{key_column} = incremental.{key_column}
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

    incremental.unpersist()
    spark.catalog.dropTempView("incremental")
    spark.catalog.dropTempView("source")
    spark.catalog.dropTempView("current")

    # Clean up files no longer referenced by the Delta table.
    spark.sql(f"""
        VACUUM delta.`{PATH}`
    """)

    return current
```

and run the @solids in a @pipeline as one nightly job:

```python
from dagster import pipeline, execute_pipeline

from datasets.customers import get_customer_list
from datasets.customers_current import get_current_customer, merge_new_customer_into_current


@pipeline
def nightly():
    customer_list = get_customer_list()
    customer_current = get_current_customer()
    merge_new_customer_into_current(customer_list, customer_current)


if __name__ == "__main__":
    execute_pipeline(nightly, run_config={})
```

## Testing Solids and Pipelines

We'll use execute_pipeline() to test our pipeline, as well as execute_solid() to test our solid in isolation.
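A minimal sketch of what such tests could look like, assuming the solids are importable from `datasets.customers` as above and the pipeline from a hypothetical `nightly_pipeline` module (`execute_solid` and `result_for_solid` are the pre-0.13 solid-era testing APIs):

```python
from dagster import execute_pipeline, execute_solid

from datasets.customers import get_customer_list
from nightly_pipeline import nightly  # hypothetical module holding the @pipeline above


def test_get_customer_list_solid():
    # Run a single solid in isolation and inspect the DataFrame it returns.
    result = execute_solid(get_customer_list)
    assert result.success
    assert "id" in result.output_value().columns


def test_nightly_pipeline():
    # Run the whole pipeline; result_for_solid gives access to each step's result.
    result = execute_pipeline(nightly, run_config={})
    assert result.success
    assert result.result_for_solid("merge_new_customer_into_current").success
```

In practice these tests would point the solids at a small test location rather than the production data lake path.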