Dagster
I was wondering where Dagster belongs in my library: under Python? ML? ETL?
I think it deserves its own place, as a data orchestration tool for machine learning, analytics, and ETL.
pip install dagster dagit
## Solids and Pipelines
### Single Solid Pipeline

Solids are individual units of computation that are wired together to form a pipeline.
Let's start with a first example that loads a customer DataFrame.
from dagster import solid
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame


@solid
def get_customer_list(_) -> DataFrame:
    """Load the raw customer list from the data lake.

    Reads the ORC files under ``/datalake/test/customer`` with the
    ``mergeSchema`` option enabled, and keeps only the columns the downstream
    merge needs.

    Args:
        _: The Dagster solid context (unused).

    Returns:
        A Spark DataFrame with the columns ``id``, ``date_part``, ``name``,
        ``surname`` and ``lastmodified``.
    """
    spark = SparkSession.builder.getOrCreate()
    return (
        spark.read
        .option("mergeSchema", True)
        .orc("/datalake/test/customer")
        .select("id", "date_part", "name", "surname", "lastmodified")
    )
To execute a `@solid`, we wire it into a `@pipeline`:
from dagster import pipeline, execute_pipeline
from datasets.customers import get_customer_list
@pipeline
def nightly():
    """Nightly job: load the raw customer list via ``get_customer_list``."""
    get_customer_list()
if __name__ == "__main__":
    # Run the nightly pipeline in-process with an empty run configuration.
    execute_pipeline(nightly, run_config={})
### Multiple Solids in a Pipeline

Now we extend our single-solid pipeline (`get_customer_list`) with two more solids. Together they merge only the new or updated customers into the current customer list, without reprocessing every partition in the data lake.
from datetime import datetime

from dagster import solid, Output, OutputDefinition
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Location of the Delta table holding the current (merged) customer list.
# NOTE: this must not be the raw ORC source directory read by
# get_customer_list ("/datalake/test/customer"). Initialising a Delta table
# there would write Parquet files into the ORC source. The VACUUM run after
# each merge also deletes files in the table directory that the Delta log does
# not track (once past the retention threshold), i.e. the raw ORC data.
# TODO(review): confirm the intended location of the current table.
PATH = "/datalake/test/customer_current"
@solid
def get_current_customer(_) -> DataFrame:
    """Load the current customer Delta table at ``PATH``, creating it if needed.

    If Spark cannot read a Delta table at ``PATH`` (it raises
    ``AnalysisException``, e.g. on the very first run), or the table has no
    columns, an empty table is written to ``PATH`` and then read back. The
    empty table uses the customer schema (``id``, ``date_part``, ``name``,
    ``surname``, ``lastmodified``, all ``StringType``).

    Only ``AnalysisException`` triggers initialisation. Any other error
    propagates, so a transient failure cannot cause the table to be
    overwritten with an empty one.

    Args:
        _: The Dagster solid context (unused).

    Returns:
        A Spark DataFrame over the Delta table at ``PATH``.
    """
    from pyspark.sql.utils import AnalysisException

    spark = SparkSession.builder.getOrCreate()
    try:
        df = spark.read.format("delta").load(PATH)
    except AnalysisException:
        # Nothing readable as a Delta table at PATH yet.
        df = None

    if df is None or len(df.columns) == 0:
        # First run: create the table with the full customer schema, so the
        # downstream MERGE has a target to write into.
        schema = StructType([
            StructField("id", StringType(), True),
            StructField("date_part", StringType(), True),
            StructField("name", StringType(), True),
            StructField("surname", StringType(), True),
            StructField("lastmodified", StringType(), True),
        ])
        (
            spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
            .write.format("delta")
            .mode("overwrite")
            .save(PATH)
        )
        df = spark.read.format("delta").load(PATH)

    return df
@solid
def merge_new_customer_into_current(_, source: DataFrame, current: DataFrame) -> DataFrame:
    """Upsert new or updated customers from ``source`` into the Delta table at ``PATH``.

    A ``source`` row is a candidate only if both of these hold:

    * its ``date_part`` is on or after the latest ``date_part`` in ``current``;
    * its ``lastmodified`` is later than the latest ``lastmodified`` within
      those partitions.

    Rows with a NULL ``id`` are skipped. For each ``id`` only the most recent
    candidate is kept. It is MERGEd into the table: matched ids are updated and
    unmatched ids are inserted. A one-row summary of the increment is printed,
    and the table is VACUUMed afterwards.

    Args:
        _: The Dagster solid context (unused).
        source: The raw customer list (output of ``get_customer_list``).
        current: The current customer table (output of ``get_current_customer``).

    Returns:
        The Delta table at ``PATH``, re-read after the merge so it includes the
        merged rows.
    """
    spark = SparkSession.builder.getOrCreate()

    timestamp_column = "lastmodified"
    key_column = "id"

    source.createOrReplaceTempView("source")
    current.createOrReplaceTempView("current")
    incremental = None
    try:
        incremental = spark.sql(f"""
            WITH latest_date_part_in_current AS (
                SELECT COALESCE(max(date_part), "1970-01-01") FROM current
            ),
            lastest_record_in_current AS (
                SELECT COALESCE(max({timestamp_column}), "1970-01-01T00:00:00.000")
                FROM current
                WHERE date_part >= (SELECT * FROM latest_date_part_in_current)
            ),
            new_records_in_source AS (
                SELECT *
                FROM source
                WHERE ({key_column} IS NOT NULL)
                  AND (date_part >= (SELECT * FROM latest_date_part_in_current))
                  AND ({timestamp_column} > (SELECT * FROM lastest_record_in_current))
            ),
            updates AS (
                SELECT *,
                       ROW_NUMBER() OVER (PARTITION BY {key_column} ORDER BY {timestamp_column} DESC) AS row
                FROM new_records_in_source
            )
            SELECT * FROM updates WHERE row = 1
        """).drop("row")  # helper column; keep it out of the MERGE (schema auto-merge would add it)
        incremental = incremental.persist()
        incremental.createOrReplaceTempView("incremental")

        # Summary of the increment: date_part range, timestamp range, row count.
        spark.sql(f"""
            SELECT MIN(date_part), MAX(date_part),
                   MIN({timestamp_column}), MAX({timestamp_column}),
                   COUNT(*)
            FROM incremental
        """).show(1, False)

        spark.sql(f"""
            MERGE INTO delta.`{PATH}` current
            USING incremental
            ON current.{key_column} = incremental.{key_column}
            WHEN MATCHED THEN UPDATE SET *
            WHEN NOT MATCHED THEN INSERT *
        """)
    finally:
        # Release the cache and temp views even if the query or merge failed.
        if incremental is not None:
            incremental.unpersist()
        spark.catalog.dropTempView("incremental")
        spark.catalog.dropTempView("source")
        spark.catalog.dropTempView("current")

    spark.sql(f"""
        VACUUM delta.`{PATH}`
    """)

    return spark.read.format("delta").load(PATH)
Finally, we wire the solids together in a `@pipeline`, giving a single nightly job:
from dagster import pipeline, execute_pipeline

from datasets.customers import get_customer_list
from datasets.customers_current import get_current_customer, merge_new_customer_into_current


@pipeline
def nightly():
    """Nightly job: merge new or updated customers into the current table.

    The raw list from ``get_customer_list`` and the current table from
    ``get_current_customer`` are both passed to
    ``merge_new_customer_into_current``.
    """
    customer_list = get_customer_list()
    customer_current = get_current_customer()
    merge_new_customer_into_current(customer_list, customer_current)
if __name__ == "__main__":
    # Run the nightly pipeline in-process with an empty run configuration.
    execute_pipeline(nightly, run_config={})
### Testing Solids and Pipelines

We'll use `execute_pipeline()` to test the whole pipeline and `execute_solid()` to test individual solids in isolation.