How to find changes in Tamr mastering outputs with Databricks Delta Lake

When you export published clusters from Tamr, the entire dataset is exported; often, though, you only want to know what has changed since the last time you exported that dataset.

This is one of the strong features of Delta Lake, an open-source storage layer developed by Databricks that adds versioning to datasets in a data lake.

The following code reads your most recent export of published clusters, saves it as a new version of a Delta Lake file in Databricks, and compares the two latest versions.

{Databricks Notebook}
# Read the csv file using pyspark
file_name = "<my_published_clusters_with_data>.csv"
df = spark.read.option("header", True).csv(f"<my_azure_container_path>/{file_name}")

# Save the data as an additional version of your Delta Lake File
df.write.format("delta").mode("overwrite").save(
    f"<my_azure_container_delta_file_path>/{file_name}/"
)

# Load the Delta Lake Table
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(
    spark, f"<my_azure_container_delta_file_path>/{file_name}/"
)

# Display the version history of the Delta Lake Table
hist = deltaTable.history()
display(hist)

# Display the changes in cluster name between the last two exports
latest_version = hist.groupBy().max("version").collect()[0][0]
prev_version = latest_version - 1

# Dots are not valid in a table name, so drop the ".csv" extension
table_name = file_name.replace(".csv", "")

display(spark.sql(f"""
  SELECT * FROM <my_databricks_table_name>.{table_name}@v{latest_version}
  WHERE persistentid_source IN (
      SELECT DISTINCT CURR.persistentid_source
      FROM <my_databricks_table_name>.{table_name}@v{latest_version} CURR
      LEFT ANTI JOIN <my_databricks_table_name>.{table_name}@v{prev_version} OLD
      ON CURR.cluster_name = OLD.cluster_name
    )
"""))
{/Databricks Notebook}
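To make the logic of the anti-join concrete, here is the same comparison expressed in plain Python over two hypothetical exports (the column names mirror the query above; the records are made up for illustration):

```python
# Two hypothetical exports of published clusters: (persistentid_source, cluster_name)
prev_export = [
    ("rec-1", "Acme Corp"),
    ("rec-2", "Globex"),
    ("rec-3", "Initech"),
]
latest_export = [
    ("rec-1", "Acme Corporation"),  # cluster name changed
    ("rec-2", "Globex"),            # unchanged
    ("rec-4", "Hooli"),             # new record
]

# LEFT ANTI JOIN on cluster_name: keep latest rows whose cluster
# name does not appear anywhere in the previous export
prev_names = {name for _, name in prev_export}
changed_ids = {rec_id for rec_id, name in latest_export if name not in prev_names}

# The outer SELECT then returns every latest row for those ids
changed_rows = [row for row in latest_export if row[0] in changed_ids]
print(changed_rows)  # [('rec-1', 'Acme Corporation'), ('rec-4', 'Hooli')]
```

Unchanged records drop out, while records whose cluster name is new or renamed are returned.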

If you have never saved your published clusters as a Databricks table, you can create it as follows:

{Databricks Notebook}
# Read the csv file using pyspark and save it as a Delta Lake File
file_name = "<my_published_clusters_with_data>.csv"
df = spark.read.option("header", True).csv(f"<my_azure_container_path>/{file_name}")
df.write.format("delta").mode("overwrite").save(
    f"<my_azure_container_delta_file_path>/{file_name}/"
)

# Create the database if it does not exist yet
spark.sql("CREATE DATABASE IF NOT EXISTS <my_databricks_table_name>")

# Register the Delta Lake File as a Databricks table
# (dots are not valid in a table name, so drop the ".csv" extension)
table_name = file_name.replace(".csv", "")
spark.sql(f"""
  CREATE TABLE <my_databricks_table_name>.{table_name}
  USING DELTA LOCATION '<my_azure_container_delta_file_path>/{file_name}/'
""")
{/Databricks Notebook}
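Since table identifiers may only contain alphanumerics and underscores, export file names with spaces or dashes need more than stripping the extension. A small helper like the following (hypothetical, not part of the notebook above) can derive a safe table name from any csv file name:

```python
import re

def to_table_name(file_name: str) -> str:
    """Derive a Databricks-safe table name from a csv file name."""
    base = file_name.rsplit(".", 1)[0]          # drop the extension
    safe = re.sub(r"[^0-9A-Za-z_]", "_", base)  # replace invalid characters
    return safe.lower()

print(to_table_name("My Published Clusters-2023.csv"))  # my_published_clusters_2023
```

You would then use `to_table_name(file_name)` wherever the snippets above build a table identifier.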
