Building a Parameterized Full & Incremental Load Pipeline in Azure Data Factory

In today’s dynamic data landscape, building reusable, metadata-driven ETL pipelines is essential. In this blog post, we will build a parameterized Azure Data Factory (ADF) pipeline that supports both full and incremental loads using a metadata-driven approach. The solution leverages dynamic SQL, a watermark control table, and stored procedures to seamlessly manage multiple source–destination pairs.


1. Why a Metadata-Driven, Parameterized Pipeline?

Managing full loads and incremental loads for multiple tables can become complex quickly. By designing a parameterized pipeline, you can:

  • Dynamically drive table mappings: Store source/destination details, primary keys, and watermark column names in a metadata table.

  • Handle full and incremental loads: Use a pipeline parameter (e.g., LoadType) to determine whether to load all data or just new/changed data.

  • Simplify maintenance: A single, generic pipeline can serve multiple data sources by reading configuration details from your metadata table.

  • Enhance error handling: Centralize error logging and alerts so you’re notified immediately when something goes wrong.


2. Database Setup

Before configuring ADF, set up the necessary SQL objects: the metadata table, the watermark control table, sample source and destination tables, a stored procedure for updating watermark values, and a stored procedure for error logging.

2.1 Metadata Table (MetadataTable_FI)

The metadata table (MetadataTable_FI) stores table mappings and watermark configuration details.

-- Drop existing version if needed (for testing)
IF OBJECT_ID('dbo.MetadataTable_FI', 'U') IS NOT NULL
    DROP TABLE dbo.MetadataTable_FI;
GO

CREATE TABLE dbo.MetadataTable_FI (
    ID INT IDENTITY(1,1) PRIMARY KEY,
    s_schema NVARCHAR(50) NOT NULL,
    s_table NVARCHAR(50) NOT NULL,
    s_pk NVARCHAR(50) NOT NULL,
    t_schema NVARCHAR(50) NOT NULL,
    t_table NVARCHAR(50) NOT NULL,
    t_pk NVARCHAR(50) NOT NULL,
    s_watermark_column NVARCHAR(50) NOT NULL,   -- Name of the watermark column in the source table
    s_watermark_dataType NVARCHAR(20) NOT NULL  -- Data type of the watermark column (e.g., DATETIME2)
);
GO
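
To drive the pipeline, the metadata table needs one row per source–destination pair. A hypothetical seed for the sample tables created in section 2.3 might look like this (the sales.OrderSource_fi key and destination names are assumptions, as that table is not defined in this post):

```sql
-- Example seed rows (illustrative; adjust to match your own source/destination tables)
INSERT INTO dbo.MetadataTable_FI
    (s_schema, s_table, s_pk, t_schema, t_table, t_pk, s_watermark_column, s_watermark_dataType)
VALUES
    ('dbo',   'CustomerSource_fi', 'CustomerID', 'dbo',   'CustomerDest_fi', 'CustomerID', 'LastUpdated', 'DATETIME2'),
    ('sales', 'OrderSource_fi',    'OrderID',    'sales', 'OrderDest_fi',    'OrderID',    'LastUpdated', 'DATETIME2');
GO
```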

2.2 Watermark Control Table (WatermarkControl_FI)

This table (WatermarkControl_FI) tracks the last loaded watermark value for each source table.

-- Drop existing version if needed
IF OBJECT_ID('dbo.WatermarkControl_FI', 'U') IS NOT NULL
    DROP TABLE dbo.WatermarkControl_FI;
GO

CREATE TABLE dbo.WatermarkControl_FI (
    s_schema NVARCHAR(50) NOT NULL,
    s_table NVARCHAR(50) NOT NULL,
    LastWatermarkValue DATETIME2 NOT NULL,  -- Last watermark value loaded from the source
    PRIMARY KEY (s_schema, s_table)
);
GO

-- Insert initial watermark values (a very early date so that first run loads everything)
INSERT INTO dbo.WatermarkControl_FI (s_schema, s_table, LastWatermarkValue)
VALUES
  ('dbo', 'CustomerSource_fi', '1900-01-01'),
  ('sales', 'OrderSource_fi', '1900-01-01');
GO

2.3 Sample Source & Destination Tables

I created sample source and destination tables for customers, plus an error log table, all using a naming convention ending in _FI (denoting Full/Incremental).

Customer Tables (dbo)

CREATE TABLE dbo.CustomerSource_fi  (
    CustomerID INT PRIMARY KEY,
    CustomerName NVARCHAR(100),
    CustomerAddress NVARCHAR(200),
    LastUpdated DATETIME2  -- Watermark column for incremental loads
);



INSERT INTO dbo.CustomerSource_fi  (CustomerID, CustomerName, CustomerAddress, LastUpdated)
VALUES
    (1, 'John Doe', '123 Main St', GETDATE()),
    (2, 'Jane Smith', '456 Elm St', GETDATE());

CREATE TABLE dbo.CustomerDest_fi  (
    CustomerID INT PRIMARY KEY,
    CustomerName NVARCHAR(100),
    CustomerAddress NVARCHAR(200),
    LastUpdated DATETIME2  -- You can use this column to store the last update time as well
);

Error log table (dbo)

-- Drop the ErrorLog_FI table if it exists (for testing purposes)
IF OBJECT_ID('dbo.ErrorLog_FI', 'U') IS NOT NULL
    DROP TABLE dbo.ErrorLog_FI;
GO

CREATE TABLE dbo.ErrorLog_FI (
    ErrorID INT IDENTITY(1,1) PRIMARY KEY,
    ActivityName NVARCHAR(100) NOT NULL,
    ErrorMessage NVARCHAR(4000) NOT NULL,
    ErrorTime DATETIME NOT NULL DEFAULT(GETDATE())
);
GO

3. Stored Procedures for Watermark Updates and Error Logging

3.1 Stored Procedure to Log Errors (usp_LogError_FI)

This procedure inserts error details into an error log table for later troubleshooting.

CREATE PROCEDURE dbo.usp_LogError_FI
    @ActivityName NVARCHAR(100),
    @ErrorMessage NVARCHAR(4000)
AS
BEGIN
    SET NOCOUNT ON;

    INSERT INTO dbo.ErrorLog_FI (ActivityName, ErrorMessage)
    VALUES (@ActivityName, @ErrorMessage);
END
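
A quick sanity check of the procedure, using hypothetical values:

```sql
-- Log a sample error, then inspect the most recent entry
EXEC dbo.usp_LogError_FI
     @ActivityName = 'CopyData_FI',
     @ErrorMessage = 'Sample error message for testing';

SELECT TOP 1 * FROM dbo.ErrorLog_FI ORDER BY ErrorID DESC;
```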

3.2 Stored Procedure to Update the Watermark (usp_UpdateWatermarkControl_FI)

This procedure dynamically retrieves the new maximum watermark from the source and updates the control table. In its error handler, it logs errors via usp_LogError_FI.

CREATE PROCEDURE dbo.usp_UpdateWatermarkControl_FI
    @s_schema NVARCHAR(50),
    @s_table NVARCHAR(50),
    @s_watermark_column  NVARCHAR(50)
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @sql NVARCHAR(MAX);
    DECLARE @NewWatermark DATETIME2;

    BEGIN TRY
        BEGIN TRANSACTION;

        -- Build dynamic SQL to get the new maximum watermark value from the source table.
        SET @sql = N'SELECT @NewWatermarkOUT = MAX(' + QUOTENAME(@s_watermark_column) + N') ' +
                   N'FROM ' + QUOTENAME(@s_schema) + N'.' + QUOTENAME(@s_table);

        -- Execute the dynamic SQL and capture the output in @NewWatermark.
        EXEC sp_executesql
             @sql,
             N'@NewWatermarkOUT DATETIME2 OUTPUT',
             @NewWatermarkOUT = @NewWatermark OUTPUT;

        -- Update the watermark control table (WatermarkControl_FI) with the new watermark value.
        IF EXISTS (
            SELECT 1 
            FROM dbo.WatermarkControl_FI
            WHERE s_schema = @s_schema AND s_table = @s_table
        )
        BEGIN
            UPDATE dbo.WatermarkControl_FI
            SET LastWatermarkValue = @NewWatermark
            WHERE s_schema = @s_schema AND s_table = @s_table;
        END
        ELSE
        BEGIN
            INSERT INTO dbo.WatermarkControl_FI (s_schema, s_table, LastWatermarkValue)
            VALUES (@s_schema, @s_table, @NewWatermark);
        END

        COMMIT TRANSACTION;
    END TRY
    BEGIN CATCH
        IF @@TRANCOUNT > 0
            ROLLBACK TRANSACTION;

        DECLARE @ErrorMessage NVARCHAR(4000) = ERROR_MESSAGE();
        DECLARE @ErrorSeverity INT = ERROR_SEVERITY();
        DECLARE @ErrorState INT = ERROR_STATE();

        -- Log the error details using the error logging stored procedure.
        EXEC dbo.usp_LogError_FI
             @ActivityName = 'usp_UpdateWatermarkControl_FI',
             @ErrorMessage = @ErrorMessage;

        -- Re-throw the error to propagate it.
        RAISERROR(@ErrorMessage, @ErrorSeverity, @ErrorState);
    END CATCH
END
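
After an incremental run, the procedure can be exercised directly to advance and verify the watermark for a given source (values here match the sample customer table):

```sql
-- Advance the watermark for the customer source table
EXEC dbo.usp_UpdateWatermarkControl_FI
     @s_schema = 'dbo',
     @s_table = 'CustomerSource_fi',
     @s_watermark_column = 'LastUpdated';

-- Verify the stored watermark
SELECT LastWatermarkValue
FROM dbo.WatermarkControl_FI
WHERE s_schema = 'dbo' AND s_table = 'CustomerSource_fi';
```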

4. Building the ADF Pipeline

The ADF pipeline is parameterized and driven by metadata. Here’s an overview of the key components:

4.1 Parameterized Datasets

  • Source Dataset:
    Create ParameterizedSourceDataset with parameters for SchemaName and TableName.

  • Sink Dataset:
    Create ParameterizedSinkDataset with parameters for SchemaName and TableName.

4.2 Pipeline Activities

a. Lookup Activity ("LookupMetadata_FI")

  • Reads the metadata from MetadataTable_FI.

b. ForEach Activity ("ForEachMetadataTable")

  • Iterates through each row of metadata.

In the ForEach activity's Items property, reference the Lookup output:

@activity('LookupMetadata_FI').output.value

Inside the ForEach loop, add:

i. Lookup Watermark Activity ("LookupWatermark_FI")
  • Retrieves the last watermark value from WatermarkControl_FI for the current source.

Query:

@concat(
    'SELECT CAST(LastWatermarkValue AS DATETIME2) AS LastWatermarkValue FROM dbo.WatermarkControl_FI WHERE s_schema = ''',
    item().s_schema,
    ''' AND s_table = ''',
    item().s_table,
    ''''
)
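
For example, for the dbo.CustomerSource_fi metadata row, this expression resolves to a query along these lines:

```sql
SELECT CAST(LastWatermarkValue AS DATETIME2) AS LastWatermarkValue
FROM dbo.WatermarkControl_FI
WHERE s_schema = 'dbo' AND s_table = 'CustomerSource_fi'
```
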
ii. Copy Activity ("CopyData_FI")
  • Uses a dynamic source query to perform a full load or incremental load based on the pipeline parameter LoadType.

Query:

@if(
   equals(pipeline().parameters.LoadType, 'Full'),
   concat('SELECT * FROM ', item().s_schema, '.', item().s_table),
   concat(
       'SELECT * FROM ', item().s_schema, '.', item().s_table,
       ' WHERE ', item().s_watermark_column, ' > CAST(''',
       activity('LookupWatermark_FI').output.firstRow.LastWatermarkValue, ''' AS DATETIME2)',
       ' AND ', item().s_watermark_column, ' <= (SELECT MAX(', item().s_watermark_column, ') FROM ', item().s_schema, '.', item().s_table, ')'
   )
)

Pre-copy script:

@{if(equals(pipeline().parameters.LoadType, 'Full'), concat('TRUNCATE TABLE ', item().t_schema, '.', item().t_table), '')}

Explanation:

  • For a full load (pipeline parameter LoadType set to Full), the query returns all rows from the source table, and the pre-copy script truncates the destination first.

  • For an incremental load (LoadType set to Incremental), the query filters for rows whose watermark column value is greater than the stored watermark and less than or equal to the current maximum value in the source table.
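
For instance, an incremental run against dbo.CustomerSource_fi with a stored watermark of '2024-01-01' (an illustrative value) would issue a source query like:

```sql
SELECT *
FROM dbo.CustomerSource_fi
WHERE LastUpdated > CAST('2024-01-01' AS DATETIME2)
  AND LastUpdated <= (SELECT MAX(LastUpdated) FROM dbo.CustomerSource_fi)
```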

iii. Execute Stored Procedure ("UpdateWatermark_FI")
  • Calls usp_UpdateWatermarkControl_FI to update the watermark after an incremental load.

4.3 Pipeline Run Results

4.4 Future Enhancements: Robust Global Error Handling

  • Activity-Level Retry Policies: Set retries (e.g., 3 retries, 30-second intervals) on critical activities.

  • On-Failure Branches: Connect failure outputs to an Execute Stored Procedure activity that calls usp_LogError_FI to log errors.

  • Azure Monitor Alerts:

    • Enable ADF diagnostic logs and integrate them with a Log Analytics workspace.

    • Create alert rules in Azure Monitor to trigger email or SMS notifications when pipeline failures occur.


5. Conclusion

By implementing a parameterized, metadata-driven ADF pipeline, you can easily handle both full and incremental loads across multiple source–destination pairs. This approach not only streamlines your data integration process but also improves maintainability.