Adding Stateful Ingestion to a Source

Currently, DataHub supports the Stale Metadata Removal and the Redundant Run Elimination use-cases on top of the more generic stateful ingestion capability available to sources. This document describes how to add support for these two use-cases to new sources.

Adding Stale Metadata Removal to a Source

Adding the stale metadata removal use-case to a new source involves:

  1. Defining the new checkpoint state that stores the list of entities emitted from a specific ingestion run.
  2. Modifying the SourceConfig associated with the source to use a custom stateful_ingestion config param.
  3. Modifying the SourceReport associated with the source to include soft-deleted entities in the report.
  4. Modifying the Source to
    1. Instantiate the StaleEntityRemovalHandler object
    2. Add entities from the current run to the state object
    3. Emit stale metadata removal workunits

The datahub.ingestion.source.state.stale_entity_removal_handler module provides the supporting infrastructure for all the steps described above and substantially simplifies the implementation on the source side. Below is a detailed explanation of each of these steps along with examples.

1. Defining the checkpoint state for the source.

The checkpoint state class is responsible for tracking the entities emitted from each ingestion run. If none of the existing states meet the needs of the new source, a new checkpoint state must be created. The state must inherit from the StaleEntityCheckpointStateBase abstract class shown below, and implement each of the abstract methods.

class StaleEntityCheckpointStateBase(CheckpointStateBase, ABC, Generic[Derived]):
    """
    Defines the abstract interface for the checkpoint states that are used for stale entity removal.
    Examples include the sql_common state for tracking table & view urns,
    the dbt state tracking node & assertion urns, and the kafka state tracking topic urns.
    """

    @classmethod
    @abstractmethod
    def get_supported_types(cls) -> List[str]:
        pass

    @abstractmethod
    def add_checkpoint_urn(self, type: str, urn: str) -> None:
        """
        Adds an urn into the list used for tracking the type.
        :param type: The type of the urn such as a 'table', 'view',
         'node', 'topic', 'assertion' that the concrete sub-class understands.
        :param urn: The urn string
        :return: None.
        """
        pass

    @abstractmethod
    def get_urns_not_in(
        self, type: str, other_checkpoint_state: Derived
    ) -> Iterable[str]:
        """
        Gets the urns present in this checkpoint but not in the other_checkpoint_state for the given type.
        :param type: The type of the urn such as a 'table', 'view',
         'node', 'topic', 'assertion' that the concrete sub-class understands.
        :param other_checkpoint_state: the checkpoint state to compute the urn set difference against.
        :return: an iterable to the set of urns present in this checkpoint state but not in the other_checkpoint_state.
        """
        pass
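
If the new source tracks only one type of entity, the concrete state can stay very small. Below is a minimal sketch for a hypothetical source that tracks only topic urns; the class name ExampleCheckpointState and the field name encoded_topic_urns are illustrative, not part of the framework:

from typing import Iterable, List

import pydantic

from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StaleEntityCheckpointStateBase,
)

class ExampleCheckpointState(StaleEntityCheckpointStateBase["ExampleCheckpointState"]):
    # Hypothetical state: tracks the topic urns emitted by an ingestion run.
    encoded_topic_urns: List[str] = pydantic.Field(default_factory=list)

    @classmethod
    def get_supported_types(cls) -> List[str]:
        return ["topic"]

    def add_checkpoint_urn(self, type: str, urn: str) -> None:
        assert type in self.get_supported_types()
        self.encoded_topic_urns.append(urn)

    def get_urns_not_in(
        self, type: str, other_checkpoint_state: "ExampleCheckpointState"
    ) -> Iterable[str]:
        assert type in self.get_supported_types()
        # Set difference: urns in this run's state but absent from the other state.
        yield from set(self.encoded_topic_urns) - set(
            other_checkpoint_state.encoded_topic_urns
        )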

Examples:

  1. KafkaCheckpointState.
  2. DbtCheckpointState
  3. BaseSQLAlchemyCheckpointState

2. Modifying the SourceConfig

The source's config must inherit from StatefulIngestionConfigBase, and should declare a field named stateful_ingestion of type Optional[StatefulStaleMetadataRemovalConfig].

Examples:

  1. The KafkaSourceConfig
from typing import List, Optional

import pydantic

from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionConfigBase,
)

class KafkaSourceConfig(StatefulIngestionConfigBase):
    # ...<other config params>...

    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
  2. The DBTStatefulIngestionConfig and the DBTConfig.
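
With the stateful_ingestion field in place, the use-case is driven entirely by the user's recipe. The sketch below shows a programmatic recipe for the Kafka source; the connection details, server address, and pipeline_name are placeholders, and the stateful_ingestion field names reflect StatefulStaleMetadataRemovalConfig at the time of writing:

from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        # A stable pipeline_name is required for stateful ingestion.
        "pipeline_name": "my_kafka_pipeline",
        "source": {
            "type": "kafka",
            "config": {
                "connection": {"bootstrap": "localhost:9092"},
                "stateful_ingestion": {
                    "enabled": True,
                    "remove_stale_metadata": True,
                },
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()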

3. Modifying the SourceReport

The report class of the source should inherit from StaleEntityRemovalSourceReport whose definition is shown below.

from dataclasses import dataclass, field
from typing import List

from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionReport,
)

@dataclass
class StaleEntityRemovalSourceReport(StatefulIngestionReport):
    soft_deleted_stale_entities: List[str] = field(default_factory=list)

    def report_stale_entity_soft_deleted(self, urn: str) -> None:
        self.soft_deleted_stale_entities.append(urn)

Examples:

  1. The KafkaSourceReport
from dataclasses import dataclass

from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StaleEntityRemovalSourceReport,
)

@dataclass
class KafkaSourceReport(StaleEntityRemovalSourceReport):
    # <rest of the Kafka source report specific impl>
    ...
  2. DBTSourceReport

4. Modifying the Source

The source must inherit from StatefulIngestionSourceBase.

4.1 Instantiate StaleEntityRemovalHandler in the __init__ method of the source.

Examples:

  1. The KafkaSource
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionSourceBase,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StaleEntityRemovalHandler,
)

class KafkaSource(StatefulIngestionSourceBase):
    def __init__(self, config: KafkaSourceConfig, ctx: PipelineContext):
        # <Rest of KafkaSource initialization>
        # Create and register the stateful ingestion stale entity removal handler.
        self.stale_entity_removal_handler = StaleEntityRemovalHandler(
            source=self,
            config=self.source_config,
            state_type_class=KafkaCheckpointState,
            pipeline_name=self.ctx.pipeline_name,
            run_id=self.ctx.run_id,
        )

4.2 Adding entities from current run to the state object.

Use the add_entity_to_state method of the StaleEntityRemovalHandler.

Examples:

# Kafka
self.stale_entity_removal_handler.add_entity_to_state(
    type="topic",
    urn=topic_urn,
)

# DBT
self.stale_entity_removal_handler.add_entity_to_state(
    type="dataset",
    urn=node_datahub_urn,
)
self.stale_entity_removal_handler.add_entity_to_state(
    type="assertion",
    urn=node_datahub_urn,
)

4.3 Emitting soft-delete workunits associated with the stale entities.

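The handler compares the state accumulated during this run against the last committed checkpoint, and generates workunits that soft-delete any urns that are no longer present. Yield these at the end of get_workunits:
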
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
    # Emit the rest of the workunits for the source.
    # NOTE: Populating the current state happens during the execution of this code.
    # ...

    # Clean up stale entities at the end.
    yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()

Adding Redundant Run Elimination to a Source

This use-case applies to sources that drive ingestion by querying logs over a duration specified via the config (such as Snowflake usage, BigQuery usage, etc.), which typically involves expensive and long-running queries. To add redundant run elimination to a new source and prevent expensive reruns over the same time range (potentially due to a user error or a scheduler malfunction), the following steps are required.

  1. Update the SourceConfig
  2. Update the SourceReport
  3. Modify the Source to
    1. Instantiate the RedundantRunSkipHandler object.
    2. Check if the current run should be skipped.
    3. Update the state for the current run(start & end times).

The datahub.ingestion.source.state.redundant_run_skip_handler module provides the supporting infrastructure required for all the steps described above.

NOTE: The handler currently uses a simple state, the BaseUsageCheckpointState, across all sources it supports (unlike the StaleEntityRemovalHandler).
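
For reference, that state simply records the time window covered by a run; its shape is roughly as follows (a sketch; see the state modules for the authoritative definition):

import pydantic

from datahub.ingestion.source.state.checkpoint import CheckpointStateBase

class BaseUsageCheckpointState(CheckpointStateBase):
    # Start and end of the time window covered by the last run, in milliseconds.
    begin_timestamp_millis: pydantic.PositiveInt
    end_timestamp_millis: pydantic.PositiveInt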

1. Modifying the SourceConfig

The SourceConfig must inherit from the StatefulRedundantRunSkipConfig class.

Examples:

  1. Snowflake Usage
from datahub.ingestion.source.state.redundant_run_skip_handler import (
    StatefulRedundantRunSkipConfig,
)

class SnowflakeStatefulIngestionConfig(StatefulRedundantRunSkipConfig):
    pass
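
The top-level source config then declares this class as its stateful_ingestion field. A sketch of the pattern is below; the real SnowflakeUsageConfig mixes in additional base classes that are elided here:

from typing import Optional

from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionConfigBase,
)

class SnowflakeUsageConfig(StatefulIngestionConfigBase):
    # ...<other config params, e.g. the start_time & end_time of the query window>...

    stateful_ingestion: Optional[SnowflakeStatefulIngestionConfig] = None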

2. Modifying the SourceReport

The SourceReport must inherit from the StatefulIngestionReport class.

Examples:

  1. Snowflake Usage
@dataclass
class SnowflakeUsageReport(BaseSnowflakeReport, StatefulIngestionReport):
    # <members specific to snowflake usage report>
    ...

3. Modifying the Source

The source must inherit from StatefulIngestionSourceBase.

3.1 Instantiate RedundantRunSkipHandler in the __init__ method of the source.

The source should instantiate an instance of the RedundantRunSkipHandler in its __init__ method.

Examples:

  1. Snowflake Usage

from datahub.ingestion.source.state.redundant_run_skip_handler import (
    RedundantRunSkipHandler,
)

class SnowflakeUsageSource(StatefulIngestionSourceBase):
    def __init__(self, config: SnowflakeUsageConfig, ctx: PipelineContext):
        super(SnowflakeUsageSource, self).__init__(config, ctx)
        self.config: SnowflakeUsageConfig = config
        self.report: SnowflakeUsageReport = SnowflakeUsageReport()
        # Create and register the stateful ingestion use-case handlers.
        self.redundant_run_skip_handler = RedundantRunSkipHandler(
            source=self,
            config=self.config,
            pipeline_name=self.ctx.pipeline_name,
            run_id=self.ctx.run_id,
        )

3.2 Checking if the current run should be skipped.

Sources can check whether the current run should be skipped using the should_skip_this_run method of the RedundantRunSkipHandler. This should be done from the get_workunits method, before doing any other work.

Example code:

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
    # Skip a redundant run.
    if self.redundant_run_skip_handler.should_skip_this_run(
        cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)
    ):
        return
    # Generate the workunits.

3.3 Updating the state for the current run.

If the run has not been skipped, the source should use the update_state method of the RedundantRunSkipHandler to record the current run's state (its start and end times). This is typically done in get_workunits, after all the workunits have been generated.

Example code:

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
    # Skip a redundant run.
    if self.redundant_run_skip_handler.should_skip_this_run(
        cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)
    ):
        return

    # Generate the workunits.
    # <code for generating the workunits>

    # Update the checkpoint state for this run.
    self.redundant_run_skip_handler.update_state(
        start_time_millis=datetime_to_ts_millis(self.config.start_time),
        end_time_millis=datetime_to_ts_millis(self.config.end_time),
    )