Late Arriving Data

This document discusses the causes of late arriving data, including technical glitches, delays in data processing, data integration issues, data validation and quality checks, and human error. It also provides examples of real-life causes of late arriving data and options for dealing with it, such as storing it in a staging area, using ingestion time-based partitioning, implementing incremental processing, adjusting data processing schedules, and implementing a “catch-up” process on a separate schedule. The document also includes example scenarios and dbt models for dealing with late arriving data, including an IoT fleet and a manufacturing company.

Causes of Late Arriving Data

Late arriving data can be caused by various factors, some of which are:

  1. Technical glitches: Technical issues such as system crashes, network outages, or hardware failures can cause data to arrive late.
  2. Delays in data processing: Data processing can be delayed due to various reasons such as high system loads, insufficient resources, or software issues.
  3. Data integration issues: When data is collected from multiple sources, integration issues can arise, leading to delays in data arrival.
  4. Data validation and quality checks: In some cases, data may be delayed due to the need for extensive validation and quality checks to ensure its accuracy and completeness.
  5. Human error: Data entry errors or incorrect data formatting can also cause data to arrive late.

It is important to identify the causes of late arriving data to address them effectively and minimise their impact on data analysis and decision-making processes.

Here are some examples of real-life causes of late arriving data:

  1. A network outage at a hospital may cause lab test results to be delayed in reaching a patient’s electronic health record.
  2. A manufacturing company may experience delays in receiving data from its satellite-linked machines (e.g. remote water quality sensors) due to poor signal or bad weather conditions.
  3. A financial institution may experience delays in processing credit card transactions due to a software glitch.
  4. A marketing agency may receive data late from a social media platform due to integration issues between their systems (e.g. authentication failures).
  5. A government agency may experience delays in receiving data due to extensive quality checks required to ensure that the data is accurate and complete.
  6. A company’s customer data (new records and updates) may arrive a day late because it originates in on-premise systems and has to be uploaded manually to the cloud.

An ingestion example

From an ingestion perspective, consider the following data:

| event_timestamp | received_timestamp | event_type | measurement_id | water_temperature | water_quality_level | iron_content |
| --- | --- | --- | --- | --- | --- | --- |
| 2023-01-01 18:00 | 2023-01-03 09:00 | scheduled | some_uuid | 7.0 | 0.67 | 5ppm |
| 2023-01-01 18:01 | 2023-01-01 18:05 | scheduled | some_uuid | 6.0 | 0.5 | 7ppm |
| 2023-01-01 18:05 | 2023-01-02 09:00 | scheduled | some_uuid | 5.0 | 0.1 | 100ppm |

Let’s imagine you are running an analytical output on this data that calculates the average water_quality_level per five-minute period, e.g.:

SELECT five_minute_period, AVG(water_quality_level) AS avg_quality_level
FROM some_table
GROUP BY five_minute_period
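
The query assumes a five_minute_period column already exists. As a sketch of how it could be derived from event_timestamp (BigQuery-style timestamp functions assumed; other warehouses have equivalents):

-- bucket event_timestamp into five-minute periods (300 seconds) before averaging
SELECT
  TIMESTAMP_SECONDS(DIV(UNIX_SECONDS(event_timestamp), 300) * 300) AS five_minute_period,
  AVG(water_quality_level) AS avg_quality_level
FROM some_table
GROUP BY five_minute_period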

If you ran this at the following timestamps, you’d get different results for the same five minute period:

| run_time | five_minute_period | avg_quality_level |
| --- | --- | --- |
| 2023-01-01 18:10 | 2023-01-01 18:00 | 0.67 |
| 2023-01-02 12:00 | 2023-01-01 18:00 | 0.385 |
| 2023-01-03 12:00 | 2023-01-01 18:00 | 0.42 |

The later runs are more accurate because they use more of the data from the actual event period.

Before you decide how to deal with late arriving data, it’s a good idea to define the following things:

  • What defines “late” in the context of this data? Is there a service objective?
  • How late can the data be at maximum? (e.g. a day, a month, a year?)
  • What volume of the data is late? (e.g. 1%, 10%, 50%)
  • Is there a defined point at which data is “too” late to be considered?
  • What are the accuracy needs of the output? Are there regulatory or legal constraints that mean the output has to reach a certain level of accuracy, and therefore the event-to-ingestion differential has to be resolved by parties outside of your control?

Options for dealing with late arriving data:

  • Storing late arriving data in a staging area: Late arriving data can be stored in a separate staging area until it can be processed and integrated with the rest of the data. This allows data analysts to access the most recent data as soon as possible, while minimising the impact of late arriving data on the rest of the data warehouse. This would theoretically deal with the scenario above by making the result consistent, although the result would technically be inaccurate. (A sketch of this split appears after this list.)

When to use it: You may choose this option if the late arriving data is low in volume, and won’t make a significant difference to your analytical output or decision making process.

  • Using ingestion or arrival time-based partitioning: Data can be partitioned based on time, such as by hour, day, or week. This allows for more efficient processing of late arriving data, as it can be added to the appropriate ingestion partition rather than having to reprocess the entire dataset.

When to use it: Use this approach if you have a large data stream that you cannot reprocess effectively or cost-efficiently, if a window for considering late arriving data is hard to define, or if consistency in your output is more important than the accuracy the late arriving data would provide. To use this method, be sure to engage with your data engineering colleagues, as there are idempotency considerations.

  • Implementing incremental processing: Rather than processing the entire dataset every time new data arrives, incremental processing can be used to update only the relevant portions of the data warehouse. This can also help to minimise the impact of late arriving data on the rest of the warehouse and preserves the history of the data.

When to use it: If the relevant data ingestion process is using the Truncate and Load strategy and is either becoming inefficient from a cost perspective, or you need the arrival/ingestion time metadata to support your transformation processes.

  • Adjusting data processing schedules: Depending on the nature of the data and the impact of late arriving data on data analysis and decision-making processes, data processing schedules can be adjusted to accommodate late arriving data. This may involve running more frequent data processing jobs, or adjusting the timing of existing jobs to ensure that the most recent data is available when needed.

When to use it: If you have data which arrives more frequently than your ingestion or transformation processes run, data can “miss the boat” and arrive too late to be considered in your analytical output. To account for this, you may choose to run your ingestion schedule more often (e.g. change to a streaming rather than a batch strategy), or you may run your transformation schedule on an increased cadence so that data is reprocessed more often for higher accuracy. Of course, each of these comes with its own cost tradeoffs.

  • Implementing a “catch-up” process on a separate schedule: Have a separate process once per appropriate period to fill in the late arriving data in the main source. This means that the past only changes on a predictable schedule.

When to use it: If the data is valuable but predictable consistency is a higher priority than immediate accuracy.
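
As a minimal sketch of the staging-area option above (the table names raw_events, events and late_events_staging, and the one-hour lateness threshold, are assumptions; interval syntax varies by warehouse):

-- route on-time rows to the main table...
INSERT INTO events
SELECT *
FROM raw_events
WHERE received_timestamp <= event_timestamp + INTERVAL '1 hour';

-- ...and park anything later than the threshold in a staging table for later processing
INSERT INTO late_events_staging
SELECT *
FROM raw_events
WHERE received_timestamp > event_timestamp + INTERVAL '1 hour';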

Example scenario 1:

A company has a fleet of IoT devices that send data to a data warehouse for analysis. The data from these devices is crucial for the company’s analytical output. However, due to various factors, some messages may not be received for up to a day after they are sent. To ensure that the most recent data is available for analysis, the company needs to reprocess the data warehouse to include any late arriving messages. The data is too large to reprocess the entire stream cost-effectively.

In this case, an ingestion time-based processing approach seems easiest.

dbt Model

The following dbt model demonstrates how you could reprocess data based on the ingestion_time column for the last 2 days as of the runtime:

-- model: my_table
{{
  config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by="date_trunc('day', ingestion_time)"
  )
}}

WITH late_arriving AS (
  SELECT *
  FROM {{ ref('stg_late_events') }}
  {% if is_incremental() %}
	WHERE 
		ingestion_time >= (
			SELECT DATE_SUB(MAX(ingestion_time), INTERVAL 2 DAY) FROM {{ this }}
	)
	{% endif %}

),
on_time AS (
  SELECT *
  FROM {{ ref('stg_on_time_events') }}
  {% if is_incremental() %}
	WHERE 
		ingestion_time >= (
			SELECT DATE_SUB(MAX(ingestion_time), INTERVAL 2 DAY) FROM {{ this }}
	)
	{% endif %}
),
merged AS (
  SELECT *
  FROM on_time
  UNION ALL
  SELECT *
  FROM late_arriving
)
SELECT *
FROM merged

So this replaces the last few days each time - won’t that be confusing to the consumer?

Yes - it can be. Whether this is a problem or not depends on your stakeholder and whether together you can resolve the incidence of late arriving data.
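
One way to soften that confusion (a sketch, not part of the model above; last_processed_at is a hypothetical column name) is to stamp each row with the time it was last written, so consumers can see when the past changed:

-- replace the final SELECT of the model above with something like this
SELECT
  *,
  CURRENT_TIMESTAMP() AS last_processed_at  -- hypothetical audit column recording when the row was (re)written
FROM merged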

Our current ingestion doesn’t record the ingestion/arrival metadata and our source is partitioned on the actual time of the event. Is there another way around this?

-- model: my_table
{{
  config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by="date_trunc('day', event_time)"
  )
}}

with import_events as (
  select *
  from {{ ref('stg_events') }}
)

select *
from import_events
{% if is_incremental() %}
  where event_time >= dateadd('day', -7, current_date)
{% endif %}

So this is a bit shorter, because the data is all in the same table. However, if data arrives later than the 7-day window used here, it will never make it into your analytical output without a full refresh. In addition, you will have to scan the partitions for the whole late arriving window, rather than just the ingestion_time partitions you haven’t processed yet.
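
If rows can occasionally arrive outside that window, one option is to pair this model with an occasional scheduled full refresh (a standard dbt command; my_table is the model name used above):

dbt run --select my_table --full-refresh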

Example scenario 2:

A manufacturing company receives messages from its machines regarding the operating temperature, efficiency, and other attributes. The machines occasionally lose their network connection, which means some data arrives late. To ensure that the most recent data is available for analysis, the company needs to reprocess the data warehouse to include any late arriving messages. The data is too large to reprocess the entire stream cost-effectively.

dbt Model:

The following dbt model incrementally loads data into a table and, during incremental runs on Mondays, reprocesses the last week to catch up on late arriving data from the stg_late_events model:

-- model: manufacturing_data
{{
  config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by="date_trunc('day', ingestion_time)"
  )
}}

-- run_started_at is a dbt context variable (a datetime); it is used here to detect Monday runs
{% set monday_catch_up = is_incremental() and run_started_at.strftime('%A') == 'Monday' %}

WITH on_time_data AS (
  SELECT *
  FROM {{ ref('stg_events') }}
),
late_arriving_data AS (
  SELECT *
  FROM {{ ref('stg_late_events') }}
),
merged_data AS (
  SELECT *
  FROM on_time_data
{% if monday_catch_up %}
  UNION ALL
  SELECT *
  FROM late_arriving_data
{% endif %}
)
SELECT *
FROM merged_data
{% if monday_catch_up %}
WHERE ingestion_time >= DATEADD('day', -7, CURRENT_DATE())
{% elif is_incremental() %}
WHERE ingestion_time >= (SELECT MAX(ingestion_time) FROM {{ this }})
{% endif %}

This model separates the on-time data and late-arriving data, and then merges them for processing. The model is partitioned based on the ingestion_time column, and the late-arriving data is processed if the incremental run is on a Monday.

Summary of approaches:

  • Ingestion time partitioning
    • Pros
      • Can (usually) scan smaller numbers of partitions each day
      • Can account for any “lateness” of data because it doesn’t care when the event actually happened
      • Means that you can rebuild the picture as it was on a given day if necessary
    • Cons
      • Has some idempotency issues (e.g. if the data needs re-ingesting, the original ingestion_time may need to be preserved)
      • Means that if data arrives significantly late, the past can change which can be confusing to users
  • Event time partitioning
    • Pros
      • Can be searched by when the event actually occurred
      • Code to process can be shorter in theory
    • Cons
      • The past can change without a specific history marker, so once the data is there it may seem it has “always” been there, making it hard to understand changes
      • You may need to scan more partitions based on your late arriving data window

Summary of recommended approach when dealing with late arriving data

  1. If you have late arriving data, ask your data engineering colleagues to store ingestion/arrival time information
  2. Review the proportionality and lateness of the data, along with the requirements from the stakeholder
  3. Decide on a handling strategy with this information (discard, store separately, reprocess incrementally, adjust schedule to match, catchup process)
  4. Inform your stakeholders of the impact on analytics outputs changing
  5. Implement!
  6. If possible within your tech stack, add tests for the proportionality/lateness of your late data so that if the ratios shift (e.g. much more data starts being late, or the average lateness moves), you can inform your stakeholders. See the sketch below.
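
As a minimal sketch of such a test, here is a dbt singular test (Snowflake-style syntax to match the models above; the model name, the event_time column and the 10% / 1-day thresholds are assumptions) that fails if it returns any rows:

-- tests/assert_late_data_share_within_threshold.sql
WITH recent AS (
  SELECT
    COUNT(*) AS total_rows,
    -- rows that arrived more than a day after the event are treated as "late" here
    COUNT(CASE WHEN ingestion_time > DATEADD('day', 1, event_time) THEN 1 END) AS late_rows
  FROM {{ ref('manufacturing_data') }}
  WHERE ingestion_time >= DATEADD('day', -7, CURRENT_DATE())
)
SELECT *
FROM recent
WHERE late_rows > 0.1 * total_rows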