Azure Synapse Spark Pool: PySpark Upsert Function for Azure SQL

Rory McManus
3 min readMar 6, 2022

--

An Upsert is an RDBMS feature that allows a DML statement author to automatically either insert a row or if the row already exists, update that existing row instead.

From my experience building multiple Azure Data Platforms I have been able to develop reusable ELT functions that I can use from project to project, one being an Azure SQL upsert function.

Today I’m going to share with you how to create an Azure SQL Upsert function using PySpark. It can be reused across Azure Synapse workflows with minimal effort and flexibility.

Basic Upsert Logic

  1. Two tables are created, one staging table and one target table
  2. Data is loaded into the staging table
  3. The tables are joined on lookup columns and a conditional delta/watermark column to identify the matches
  4. If the record in the staging table exists in the target table, the record is updated in the target table
  5. If the record in the staging table does not exist in the target table, it is inserted into the target table

Azure SQL Upsert PySpark Function

Functionality

  • An input data frame is written to a staging table on Azure SQL
  • The function accepts a parameter for multiple lookup columns and/or an optional Delta column to join the staging and target tables
  • If a delta column is passed to the function, it will update the record in the target table provided the staging table record is newer than the target table record
  • The function will dynamically read the Dataframe columns to form part of the SQL Merge upsert and insert statements
  • The code will be integrated with Azure Key Vault to securely store keys, passwords, and server names etc

Before writing code, it is critical to understand the Spark Azure SQL Database connector. The connector does not support preUpdate or postUpdate statements following writing to a table. For this reason, we need to write the Dataframe to the staging table and subsequently pass the valid SQL merge statements to the PyODBC connector to execute the upsert.

Prerequisite

  • Create an Azure Key Vault service and grant appropriate access for the Azure Synapse Workspace.
  • Create the following KeyVault Entries which are used in the function to secure sensitive information

AzureSQL-ServerName: e.g. datamastery-sql-srv-prod-01

AzureSQL-DatabaseName: e.g. Datawarehouse

AzureSQL-Admin-Username: e.g. SqlAdmin

AzureSQL-Admin-Password: e.g. a very secure password :) — Password123

  • Azure SQL Target and Staging tables to be created with the correct data types and indexes to improve join performance

NOTE: If you want the staging table to be overwritten each time check the code comments to implement.

Input Parameters

  • df: Input Dataframe
  • azureSqlStagingTable: Name of the Azure SQL Target Table
  • azureSqlDWTable: Name of the Azure SQL Target DW Table
  • lookupColumns: Pipe separated columns that uniquely defines a record in input dataframe e.g. CustomerId or CustomerId|FirstName
  • deltaColumn: Name of watermark column in input dataframe

Code

Please see the comments on each block of code for an explanation.

Conclusion

If you would like a copy please drop me a message on LinkedIn.

I hope you have found this helpful and will save your company time and money getting started on your Azure Analytics Journey. Any thoughts, questions, corrections and suggestions are very welcome :)

Please share on LinkedIn if you found this useful #DataEngineering #AzureSynapse #Spark #PySpark #AzureSQL #SQL #ELT

If you liked this article, here are some other articles you may enjoy:

--

--

Rory McManus
Rory McManus

Written by Rory McManus

Director of Engineering & Owner Data Mastery | Official Azure Synapse Influencer & Databricks Master Sports enthusiast, sweet lover and triathlete!

No responses yet