This document describes the different scenarios encountered when ingesting and transforming data in the warehouse.
Data Ingestion
Data ingestion is the process of collecting data from various sources and loading it into a target system such as a data warehouse, where it can be accessed and analyzed. There are several data ingestion approaches available, and choosing the right approach depends on the use case and specific requirements. In this article, we will discuss three common data ingestion approaches: Full Load with history, Incremental Load, and Full Load without history.
Scenario 1: Full Load with history
The full data from the Source table is ingested to new partitions in the Data Lake table.
Overview: The Full Load with history approach involves loading all of the data from the source system into the target system while retaining the history of changes. This approach is typically used when the source table does not have a column to identify new or updated records, so the ingestion process cannot perform change data capture. With this approach, the Data Lake can track how the data changes over time.
Partition: In this scenario, the Data Lake table can be partitioned based on the ingestion time (_PARTITIONTIME) on a DAY granularity.
Example:
On Day 1, 3 records are fetched from the Source table and ingested into the BigQuery Data Lake table partitioned by ingestion time (_PARTITIONTIME).
On Day 2, a new record is added to the Source table, so all the records from the Source table (in this case, a total of 4 records) are ingested into a new partition of the Data Lake table.
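The sketch below illustrates this setup with a hypothetical raw_customer Data Lake table; the table name and columns are assumptions for illustration, not part of any referenced pipeline.

```sql
-- Sketch of an ingestion-time partitioned Data Lake table for Scenario 1
-- (hypothetical names). Every load appends the full source extract, so each
-- day's partition holds a complete snapshot of the Source table.
CREATE TABLE IF NOT EXISTS data_lake.raw_customer (
  customer_id INT64,
  customer_name STRING,
  update_dt TIMESTAMP
)
PARTITION BY _PARTITIONDATE;  -- ingestion-time partitioning, DAY granularity

-- Read the Day 2 snapshot (all 4 records) from its ingestion-time partition.
SELECT *
FROM data_lake.raw_customer
WHERE DATE(_PARTITIONTIME) = '2023-01-02';
```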
Scenario 2: Incremental Load
The ingestion pipeline performs change data capture to ingest the delta records into new partitions in the Data Lake table.
Overview: The Incremental Load approach involves loading only the new or changed data since the last load. This approach is typically used when the ingestion process can identify and capture the changes made to the data in the source system. With this approach, we eliminate the need to load the entire dataset during every ingestion run.
Partition: In this scenario, the Data Lake table can be partitioned:
- Based on ingestion-time partitioning (_PARTITIONTIME) on a DAY granularity
- Based on time-unit column partitioning, if a suitable column is present (e.g., update_dt). This partition column can be used in the downstream data loading, as it keeps the data organized by its business timestamp and makes backfilling easier.
Example:
On Day 1, 3 records are fetched from the Source table and ingested into the BigQuery Data Lake table partitioned by ingestion time (_PARTITIONTIME).
On Day 2, a new record is added to the Source table, so only the newly added record is ingested into a new partition of the Data Lake table.
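A minimal sketch of this setup is shown below, assuming a hypothetical raw_customer_incremental table partitioned on an update_dt column; the names and the watermark handling are assumptions.

```sql
-- Sketch of a Data Lake table partitioned on a time-unit column for Scenario 2
-- (hypothetical names).
CREATE TABLE IF NOT EXISTS data_lake.raw_customer_incremental (
  customer_id INT64,
  customer_name STRING,
  update_dt TIMESTAMP
)
PARTITION BY DATE(update_dt);  -- time-unit column partitioning, DAY granularity

-- The ingestion pipeline fetches only the delta from the source, for example
-- rows changed since the last successful load (the watermark is assumed to be
-- tracked by the pipeline):
--   SELECT * FROM source_db.customer WHERE update_dt > @last_load_watermark;
```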
Scenario 3: Full Load without history
In case of micro-batches, the ingestion pipeline deletes the partition from the Data Lake table and a full load from the Source table is performed into the same partition of the Data Lake table.
Overview: The Full Load without history approach involves loading all of the data from the source system into the target system, but without retaining the history of changes. This approach is typically used when the source table does not have a column to identify new or updated records, so the ingestion process cannot perform change data capture. With this approach, the Data Lake tables cannot track how the data changes over time.
Partition: In this scenario, the Data Lake table can be partitioned:
- Based on ingestion-time partitioning (_PARTITIONTIME) on a DAY granularity
- Based on time-unit column partitioning. This partition column can be used in the downstream data loading, as it keeps the data organized by its business timestamp and makes backfilling easier.
Example:
On Day 1 (first ingestion), 3 records are fetched from the Source table and ingested into the BigQuery Data Lake table partitioned by ingestion time (_PARTITIONTIME).
On the same day, a new record is added to the Source table. So, the existing partition is deleted and all the records from the Source table are ingested again into the same day partition of the Data Lake table.
The above examples use a partition on ‘DAY’ granularity. The principle is the same if the partition is on ‘HOUR’ or ‘MONTH’ granularity.
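As a rough illustration of the Scenario 3 flow, the sketch below deletes the current day's partition and reloads the full source extract into it; the table names and the staging layout are assumptions.

```sql
-- Sketch of the Scenario 3 micro-batch flow (hypothetical names): delete the
-- current day's ingestion-time partition, then reload the full source extract.
DELETE FROM data_lake.raw_customer_snapshot
WHERE DATE(_PARTITIONTIME) = CURRENT_DATE();

-- Rows inserted now land in the current day's ingestion-time partition,
-- overwriting the snapshot for the same day.
INSERT INTO data_lake.raw_customer_snapshot (customer_id, customer_name, update_dt)
SELECT customer_id, customer_name, update_dt
FROM staging.customer_full_extract;  -- full extract staged by the ingestion pipeline
```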
Data Transformation
This section covers the different scenarios encountered while transforming data using dbt, although the principles apply to any transformation tool.
Scenario 1: Truncate and Load
In this case, the new data always overwrites the existing data. Truncate and load can be used in the following scenarios:
- when cost and processing time are not major concerns
- when you do not want history of previous data
Example: dim_another_example.sql in dbt-elt-template
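Since dim_another_example.sql is not reproduced here, the following is a minimal sketch of a truncate-and-load model; the source reference and column names are assumptions.

```sql
-- Sketch of a truncate-and-load model (hypothetical names, not the actual
-- dim_another_example.sql). materialized='table' drops and recreates the
-- table on every run, so the new data always overwrites the existing data.
{{
    config(
        materialized='table'
    )
}}

select
    customer_id,
    customer_name,
    update_dt
from {{ source('data_lake', 'raw_customer') }}
```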
Scenario 2: Incremental Load - Insert Overwrite
In this case, current as well as historical data is maintained in the table. Insert Overwrite incremental load can be used in the following scenarios:
- when cost and processing time are key considerations
- when you want history of previous data
- when the volume of the table is high
- to reduce the build time as this transforms only new records
Insert Overwrite is a delete-and-insert strategy: it deletes the affected partitions entirely and inserts the new records. This makes it more cost effective than the merge strategy, as it avoids a full table scan. However, insert_overwrite can lead to data loss if not implemented correctly. We have to ensure that the Data Lake tables have the full data of the partition which we are planning to delete as part of the insert_overwrite strategy.
Late-arriving data can be handled by identifying the max partition of the target table and then fetching from the source table the data that is greater than or equal to that max partition.
Example: example.sql in dbt-elt-template.
The load_based_filter_date() and date_variable(filter_name) macros from dbt_elt_template can be used while configuring ‘insert_overwrite’ incremental models to determine the incremental data to be fetched from the source table.
As per the example below, we create an incremental table, “example”, based on the “insert_overwrite” incremental strategy. In this model, we use the load_based_filter_date() and date_variable(filter_name) macros:
- The load_based_filter_date() macro calls date_variable(‘historical’) for the initial load and date_variable(‘incremental’) for all the incremental loads.
- date_variable(filter_name): date_variable(‘historical’) will return ‘1990-01-01’ for the initial load and date_variable(‘incremental’) will return _dbt_max_partition for the incremental loads.
_dbt_max_partition is a BigQuery scripting variable provided by dbt to fetch the maximum partition in the target/destination table.
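The actual example.sql is not reproduced here; the sketch below shows what such an insert_overwrite model could look like. The column names, the source reference, and the exact output of the macros are assumptions based on the behaviour described above.

```sql
-- Sketch of an insert_overwrite incremental model (hypothetical names; not the
-- actual example.sql from dbt-elt-template).
{{
    config(
        materialized='incremental',
        incremental_strategy='insert_overwrite',
        partition_by={'field': 'update_dt', 'data_type': 'timestamp', 'granularity': 'day'}
    )
}}

select
    customer_id,
    customer_name,
    update_dt
from {{ source('data_lake', 'raw_customer_incremental') }}
-- load_based_filter_date() is assumed to resolve to '1990-01-01' on the initial
-- load and to _dbt_max_partition on incremental loads, so only the new
-- partitions of the Data Lake table are scanned.
where update_dt >= {{ load_based_filter_date() }}
```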
Scenario 3: Incremental Load - Merge
Merge statements select rows from one or more sources for update or insertion into a target table. We can define a unique_key as part of the dbt configuration. A unique_key enables updating the existing records instead of just appending the rows to the destination/target table. If an updated record arrives for an existing unique_key, the updated record can replace the current record instead of being appended to the destination/target table.
Merge incremental load can be used in the following scenarios:
- when you don’t want duplicates in your table
- when you do not want history of previous data
- when the volume of the target table is not high, as this strategy does a full table scan
The merge strategy updates existing records and inserts new records in the table. However, it results in a full table scan, which is not cost effective; as the data grows, this leads to poorer performance compared to insert_overwrite.
Late-arriving data can be handled by identifying the max partition of the target table and then fetching from the source table the data that is greater than or equal to that max partition.
Example: dim_example_merge.sql
The max_partition(column_name) macro can be used while configuring ‘merge’ incremental models to determine the incremental data to be fetched from the source table.
As per the example below, we create an incremental table, dim_example_merge, based on the “merge” incremental strategy.
- The max_partition(source_partition_timestamp) macro executes and returns the result of the query “select max(source_partition_timestamp) from dim_example_merge”.
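The actual dim_example_merge.sql is not reproduced here; the sketch below shows what such a merge model could look like. The unique_key, column names, source reference, and the macro's exact signature are assumptions based on the behaviour described in this section.

```sql
-- Sketch of a merge incremental model (hypothetical names; not the actual
-- dim_example_merge.sql from dbt-elt-template).
{% set tgt_max_partition = max_partition('source_partition_timestamp') %}

{{
    config(
        materialized='incremental',
        incremental_strategy='merge',
        unique_key='customer_id'
    )
}}

select
    customer_id,
    customer_name,
    source_partition_timestamp
from {{ source('data_lake', 'raw_customer_incremental') }}

{% if is_incremental() %}
-- tgt_max_partition is assumed to hold the maximum partition value of the
-- target table, so only new and late-arriving records are fetched.
where source_partition_timestamp >= '{{ tgt_max_partition }}'
{% endif %}
```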
Thus, the value of the “tgt_max_partition” variable in the above example is assigned the maximum partition from the target/destination table, “dim_example_merge”. The source data can then be filtered based on the value of the “tgt_max_partition” variable.
Incremental Load Based on Data Ingestion Scenario
This section explains how the tables in the Data warehouse can be incrementally loaded (insert_overwrite), based on different Data Ingestion scenarios.
| Data Ingestion Scenario | Incremental Load - ‘insert_overwrite’ |
|---|---|
| Scenario 1 - Full Load with history: In this scenario of Data Ingestion, the full source data would be available in every partition of the Data Lake table | ‘insert_overwrite’ incremental strategy can be used for the incremental load of Data Warehouse tables. However, instead of the delta, every partition in the Data Lake table would have the full data from the source. Hence, we need to derive the delta by comparing against the previous day’s partition to see what has changed: DATE_SUB(DATE(_dbt_max_partition), INTERVAL 1 DAY) |
| Scenario 2 - Incremental Load: In this scenario of Data Ingestion, the ingestion pipeline performs change data capture and ingests delta records to new partitions in the Data Lake table | ‘insert_overwrite’ incremental strategy can be used for the incremental load of Data Warehouse tables. Every partition in the Data Lake table would have the delta records from the source. Hence, we can filter the delta records from the Data Lake tables based on the _dbt_max_partition variable: IFNULL(DATE(_dbt_max_partition), ‘1990-01-01’) (refer to Scenario 2 in Data Transformation) |
| Scenario 3 - Full Load without history: In this scenario of Data Ingestion, the ingestion pipeline deletes the data and ingests the full source data into the same partition of the Data Lake table | ‘insert_overwrite’ incremental strategy can be used for the incremental load of Data Warehouse tables. However, the partition in the Data Lake tables would always be overwritten with the latest data from the source. Hence, we can calculate the FARM_FINGERPRINT of all columns from the Data Lake table and compare it with the FARM_FINGERPRINT of all columns from the Target table before loading data into the Target table |
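For the Scenario 3 row in the table above, a rough sketch of the fingerprint comparison is shown below. The table and column layout is hypothetical, and it assumes both tables expose the same columns in the same order so that the row hashes are comparable.

```sql
-- Sketch of change detection via FARM_FINGERPRINT (hypothetical names).
-- Hash every row of the Data Lake snapshot and keep only the rows whose hash
-- is not already present in the Data Warehouse target table.
SELECT src.* EXCEPT (row_hash)
FROM (
  SELECT t.*, FARM_FINGERPRINT(TO_JSON_STRING(t)) AS row_hash
  FROM data_lake.raw_customer_snapshot AS t
) AS src
LEFT JOIN (
  SELECT FARM_FINGERPRINT(TO_JSON_STRING(t)) AS row_hash
  FROM data_warehouse.dim_customer AS t
) AS tgt
ON src.row_hash = tgt.row_hash
WHERE tgt.row_hash IS NULL;
```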