Data Quality Module

The data quality module lives at ingestion/src/metadata/data_quality/. It runs test cases against database tables (or DataFrames) and publishes results back to the OpenMetadata server. This guide walks through the architecture top-down: from how a workflow is triggered, through the execution pipeline, down to individual validators.

Directory Layout

data_quality/
├── api/
│   └── models.py                  # DTOs: TableAndTests, TestCaseResults, TestCaseResultResponse
├── builders/
│   └── validator_builder.py       # Factory that creates validator instances
├── interface/                     # Database abstraction layer
│   ├── test_suite_interface.py    # Abstract base
│   ├── sqlalchemy/                # SQL databases (+ Snowflake, Databricks overrides)
│   └── pandas/                    # Datalake / DataFrame-based
├── processor/
│   └── test_case_runner.py        # Workflow processor — orchestrates test execution
├── runner/
│   ├── core.py                    # DataTestsRunner — runs a single test via interface
│   └── base_test_suite_source.py  # Creates the interface + sampler for a table
├── source/
│   └── test_suite.py              # Fetches tables and test cases from OM API
└── validations/                   # All validation logic
    ├── base_test_handler.py       # Abstract BaseTestValidator (template method)
    ├── models.py                  # Runtime parameter models
    ├── utils.py                   # Helpers
    ├── impact_score.py            # Dimensional result scoring
    ├── checkers/                  # Condition checkers (between bounds, etc.)
    ├── mixins/
    │   ├── protocols.py           # HasValidatorContext protocol
    │   ├── sqa_validator_mixin.py # SQLAlchemy query helpers
    │   └── pandas_validator_mixin.py
    ├── runtime_param_setter/      # Resolve dynamic parameters at execution time
    ├── column/                    # Column-level validators
    │   ├── base/                  #   Abstract implementations
    │   ├── sqlalchemy/            #   SQL implementations
    │   └── pandas/                #   DataFrame implementations
    └── table/                     # Table-level validators (same structure)

Workflow Pipeline

Data quality follows the standard Source → Processor → Sink pattern defined in workflow/data_quality.py:
┌──────────────────────────────────────────────────────────┐
│  TestSuiteSource                                         │
│  source/test_suite.py                                    │
│                                                          │
│  Fetches from OM API:                                    │
│  • Table entity                                          │
│  • Database service connection                           │
│  • Test cases linked to the test suite                   │
│                                                          │
│  Yields: TableAndTests                                   │
└──────────────────────┬───────────────────────────────────┘
                       │
                       ▼
┌──────────────────────────────────────────────────────────┐
│  TestCaseRunner                                          │
│  processor/test_case_runner.py                           │
│                                                          │
│  For each TableAndTests:                                 │
│  1. Filter to OM platform tests only                     │
│  2. Remove disabled and incompatible tests               │
│  3. Run each test case → collect results                 │
│                                                          │
│  Yields: TestCaseResults                                 │
└──────────────────────┬───────────────────────────────────┘
                       │
                       ▼
┌──────────────────────────────────────────────────────────┐
│  MetadataRestSink                                        │
│  ingestion/sink/metadata_rest.py                         │
│                                                          │
│  For each TestCaseResultResponse:                        │
│    POST /api/v1/dataQuality/testCases/{id}/testCaseResult│
└──────────────────────────────────────────────────────────┘
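The three stages above can be sketched with simplified stand-ins. This is a hedged illustration of the Source → Processor → Sink wiring, not the real classes: `TableAndTests` and `TestCaseResults` here are toy dataclasses (the real ones are generated models in `api/models.py`), and the runner trivially marks everything as passing.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterator, List


@dataclass
class TableAndTests:          # toy stand-in for api/models.py
    table: str
    test_cases: List[str]


@dataclass
class TestCaseResults:        # toy stand-in for api/models.py
    results: List[str] = field(default_factory=list)


class TestSuiteSource:
    """Yields one TableAndTests per table in the suite."""
    def __init__(self, tables: Dict[str, List[str]]):
        self.tables = tables

    def run(self) -> Iterator[TableAndTests]:
        for table, tests in self.tables.items():
            yield TableAndTests(table=table, test_cases=tests)


class TestCaseRunner:
    """Runs each test case against the table and collects results."""
    def process(self, record: TableAndTests) -> TestCaseResults:
        return TestCaseResults(
            results=[f"{record.table}.{t}: Success" for t in record.test_cases]
        )


class MetadataRestSink:
    """The real sink POSTs each result to the OpenMetadata server;
    this stand-in just accumulates them."""
    def __init__(self):
        self.published: List[str] = []

    def write(self, results: TestCaseResults) -> None:
        self.published.extend(results.results)


# Wire the stages together, as workflow/data_quality.py does with the real classes
source = TestSuiteSource({"orders": ["columnValuesToBeNotNull"]})
runner, sink = TestCaseRunner(), MetadataRestSink()
for record in source.run():
    sink.write(runner.process(record))
print(sink.published)
# → ['orders.columnValuesToBeNotNull: Success']
```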

Test Execution Call Chain

This is the most important sequence to understand. When TestCaseRunner processes a test case, here is exactly what happens:
# 1. PROCESSOR calls the runner
TestCaseRunner._run_test_case(test_case)
    └── DataTestsRunner.run_and_handle(test_case)        # runner/core.py

# 2. RUNNER delegates to the interface
        └── TestSuiteInterface.run_test_case(test_case)   # interface/test_suite_interface.py

# 3. INTERFACE resolves runtime params and builds the validator
            ├── RuntimeParameterSetterFactory.get_runtime_param_setters(test_def_fqn)
            │     # Adds dynamic params (e.g., table references for tableDiff)

            ├── _get_validator_builder(test_case, entity_type)
            │     ├── Fetch TestDefinition from OM API
            │     ├── Read validatorClass field (e.g., "columnValuesToBeNotNull")
            │     └── return ValidatorBuilder(runner, test_case, test_def)

            ├── validator_builder.set_runtime_params(param_setters)

            └── validator = validator_builder.validator
                  # Dynamically imports the correct validator class

# 4. VALIDATOR executes the test
                  └── validator.run_validation()           # validations/base_test_handler.py
                        ├── _run_validation()              # Subclass: query + evaluate
                        ├── _run_dimensional_validation()  # Optional: GROUP BY analysis
                        └── return TestCaseResult

Data Model

Three entities form the core model. All are defined as JSON Schemas in openmetadata-spec/ and auto-generated into Python models:
TestDefinition
  ├── name: str                    # e.g., "columnValuesToBeNotNull"
  ├── entityType: COLUMN | TABLE
  ├── validatorClass: str          # Maps to Python class name
  ├── parameters: List[ParamDef]   # Parameter schema
  └── supportedDataTypes: List     # Column types this test applies to

TestCase
  ├── testDefinition: FK           # Points to a TestDefinition
  ├── entityLink: str              # Table or column being tested
  ├── parameterValues: List        # Actual parameter values
  ├── dimensionColumns: List       # Optional: columns for GROUP BY analysis
  └── topDimensions: int           # Max dimension groups to report

TestCaseResult
  ├── testCaseStatus: Success | Failed | Aborted
  ├── result: str                  # Human-readable message
  ├── testResultValue: List        # Metric name → value pairs
  ├── passedRows / failedRows
  └── dimensionResults: List       # Per-group results (optional)
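The relationship between the three entities can be shown with illustrative dataclasses. These are a hand-written subset for clarity, not the pydantic models generated from `openmetadata-spec/`; field names follow the schemas above, but the `entityLink` value and types are simplified.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List, Tuple


class TestCaseStatus(Enum):
    Success = "Success"
    Failed = "Failed"
    Aborted = "Aborted"


@dataclass
class TestCase:
    """Illustrative subset: a TestCase points at a TestDefinition by name
    and binds concrete parameter values to a table or column."""
    testDefinition: str                     # FK, e.g. "columnValuesToBeNotNull"
    entityLink: str                         # the table/column under test
    parameterValues: List[Tuple[str, str]]  # (name, value) pairs


@dataclass
class TestCaseResult:
    """Illustrative subset of one execution's outcome."""
    testCaseStatus: TestCaseStatus
    result: str                             # human-readable message
    testResultValue: List[Tuple[str, str]]  # (metric name, value) pairs


case = TestCase(
    testDefinition="columnValuesToBeNotNull",
    entityLink="<#E::table::db.schema.orders::columns::id>",
    parameterValues=[],
)
outcome = TestCaseResult(
    testCaseStatus=TestCaseStatus.Success,
    result="Found 0 null values",
    testResultValue=[("nullCount", "0")],
)
print(outcome.testCaseStatus.value)
```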

Validator Architecture

Validators use a template method pattern with mixins for database-specific logic.

Class Hierarchy

BaseTestValidator                          # validations/base_test_handler.py
│   run_validation()                       #   Template method (calls hooks below)
│   _run_validation()                      #   Abstract — implement test logic
│   _evaluate_test_condition()             #   Abstract — pass/fail check
│   _format_result_message()               #   Abstract — human-readable output
│   _run_dimensional_validation()          #   Dimensional GROUP BY logic
│   _execute_dimensional_validation()      #   Abstract — build dimension query

├── Base implementations                   # validations/column/base/ or table/base/
│   e.g., BaseColumnValuesToBeNotNullValidator
│         Implements _run_validation, _evaluate_test_condition, _format_result_message
│         Leaves metric computation abstract

├── SQL implementations                    # validations/column/sqlalchemy/
│   e.g., ColumnValuesToBeNotNullValidator(Base + SQAValidatorMixin)
│         SQAValidatorMixin provides: get_column(), run_query_results()

└── Pandas implementations                 # validations/column/pandas/
    e.g., PandasColumnValuesToBeNotNullValidator(Base + PandasValidatorMixin)
          PandasValidatorMixin provides: get_column(), run_dataframe_results()
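The template method and mixin split can be sketched in miniature. This is a toy, not the real hierarchy: the mixin here "queries" a plain Python list instead of SQLAlchemy or pandas, and the dimensional hooks are omitted, but the division of labor mirrors the classes above: the base class owns the flow, the concrete validator owns the pass/fail logic, and the mixin owns data access.

```python
from abc import ABC, abstractmethod


class BaseTestValidator(ABC):
    """Template method: run_validation() drives the subclass hooks."""
    def run_validation(self) -> dict:
        metric = self._run_validation()
        passed = self._evaluate_test_condition(metric)
        return {
            "status": "Success" if passed else "Failed",
            "result": self._format_result_message(metric),
        }

    @abstractmethod
    def _run_validation(self): ...

    @abstractmethod
    def _evaluate_test_condition(self, metric) -> bool: ...

    @abstractmethod
    def _format_result_message(self, metric) -> str: ...


class ListValidatorMixin:
    """Toy stand-in for SQAValidatorMixin/PandasValidatorMixin:
    same test logic, different data access."""
    def __init__(self, values):
        self.values = values

    def null_count(self) -> int:
        return sum(1 for v in self.values if v is None)


class ColumnValuesToBeNotNullValidator(ListValidatorMixin, BaseTestValidator):
    def _run_validation(self) -> int:
        return self.null_count()

    def _evaluate_test_condition(self, metric: int) -> bool:
        return metric == 0

    def _format_result_message(self, metric: int) -> str:
        return f"Found {metric} null value(s)"


result = ColumnValuesToBeNotNullValidator([1, None, 3]).run_validation()
print(result)
# → {'status': 'Failed', 'result': 'Found 1 null value(s)'}
```

Swapping `ListValidatorMixin` for a SQL- or DataFrame-backed mixin changes how `null_count` is computed without touching the validation flow.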

How Validators Are Discovered

Validators are dynamically imported at runtime based on the validatorClass field from the TestDefinition:
# ValidatorBuilder calls import_test_case_class() which resolves:
#   entity_type = "column" or "table"
#   runner_type = "sqlalchemy" or "pandas"
#   validator_class = from TestDefinition.validatorClass

module_path = f"metadata.data_quality.validations.{entity_type}.{runner_type}.{validator_class}"
# Example: metadata.data_quality.validations.column.sqlalchemy.columnValuesToBeNotNull
This means adding a new test is purely additive — no registry to update, no imports to add. Just place the file in the right directory.
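A minimal sketch of the convention (the path template is from the snippet above; how the class name is derived from the module and pulled out via `getattr` is handled by the real helper and not shown here):

```python
import importlib


def resolve_module_path(entity_type: str, runner_type: str,
                        validator_class: str) -> str:
    """Build the module path purely from the three coordinates — dropping a
    new file in the right directory is all the 'registration' a validator
    needs."""
    return ("metadata.data_quality.validations."
            f"{entity_type}.{runner_type}.{validator_class}")


path = resolve_module_path("column", "sqlalchemy", "columnValuesToBeNotNull")
print(path)
# → metadata.data_quality.validations.column.sqlalchemy.columnValuesToBeNotNull

# The real helper then loads the module, roughly:
#   module = importlib.import_module(path)
#   validator_cls = getattr(module, derived_class_name)
```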

Dimensional Analysis

When a test case has dimensionColumns configured, the validator runs the test per group using GROUP BY:
For dimensionColumns = ["region", "product"]:

1. Run GROUP BY region  → compute metrics per region value
2. Run GROUP BY product → compute metrics per product value
3. For each group:
   ├── Evaluate pass/fail
   ├── Calculate impact score
   └── Create DimensionResult
4. Keep top N groups by impact score, aggregate rest as "Others"
Impact score (validations/impact_score.py) ranks which groups matter most:
impact = (failure_rate² × volume_factor × sample_weight) / 1.5
  • failure_rate² — emphasizes high failure percentages
  • volume_factor — tiered by row count (0.25 for <10 rows → 1.5 for ≥100k)
  • sample_weight — credibility discount for small samples
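The formula can be sketched as a function. Caveat: only the `<10 → 0.25` and `≥100k → 1.5` volume tiers come from the text above; the intermediate tiers and the sample-weight curve below are illustrative assumptions, not the exact values in `impact_score.py`.

```python
def impact_score(failed_rows: int, total_rows: int) -> float:
    """Sketch of impact = (failure_rate² × volume_factor × sample_weight) / 1.5"""
    if total_rows == 0:
        return 0.0
    failure_rate = failed_rows / total_rows

    # Tiered volume factor: tiny groups discounted, huge groups boosted.
    # Boundary tiers from the text; middle tiers are assumed for illustration.
    if total_rows < 10:
        volume_factor = 0.25
    elif total_rows < 1_000:
        volume_factor = 0.75        # assumed intermediate tier
    elif total_rows < 100_000:
        volume_factor = 1.0         # assumed intermediate tier
    else:
        volume_factor = 1.5

    # Credibility discount: assumed linear ramp over the first 100 rows
    sample_weight = min(1.0, total_rows / 100)

    return (failure_rate ** 2 * volume_factor * sample_weight) / 1.5


# A large group with a 25% failure rate outranks a tiny group that fails entirely
print(impact_score(50_000, 200_000) > impact_score(5, 5))
# → True
```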

Runtime Parameter Setters

Some tests need parameters that can only be resolved at execution time. The RuntimeParameterSetterFactory (validations/runtime_param_setter/param_setter_factory.py) maps test definition names to setter classes:
| Test | Setter | What It Does |
|------|--------|--------------|
| tableDiff | TableDiffParamsSetter | Fetches the comparison table, resolves key columns from constraints |
| tableCustomSQLQuery | TableCustomSQLQueryParamsSetter | Compiles SQL with runtime table references |
| *RuleLibrarySqlExpression | RuleLibrarySqlExpressionParamsSetter | Builds SQL from the rule library |
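The factory's shape can be sketched as a plain mapping from test-definition name to setter class, resolved at execution time. This is a simplified illustration: the setter name matches the real ones, but the constructor and `set_runtime_params` signatures here (a plain dict, a hard-coded resolved table) are assumptions for demonstration.

```python
from typing import Dict, List, Type


class TableDiffParamsSetter:
    """Toy setter: the real one fetches the comparison table from the OM API
    and resolves key columns from table constraints."""
    def set_runtime_params(self, test_case: dict) -> dict:
        test_case["params"]["table2"] = "resolved.comparison.table"  # illustrative
        return test_case


class RuntimeParameterSetterFactory:
    _setters: Dict[str, Type] = {
        "tableDiff": TableDiffParamsSetter,
        # "tableCustomSQLQuery": TableCustomSQLQueryParamsSetter, ...
    }

    def get_runtime_param_setters(self, test_definition_name: str) -> List:
        setter_cls = self._setters.get(test_definition_name)
        return [setter_cls()] if setter_cls else []


factory = RuntimeParameterSetterFactory()
case = {"params": {}}
for setter in factory.get_runtime_param_setters("tableDiff"):
    case = setter.set_runtime_params(case)
print(case["params"])
# → {'table2': 'resolved.comparison.table'}
```

Tests without an entry in the mapping get no setters, so static parameters pass through untouched.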

Adding a New Test

Step 1: Create the base validator

Create validations/column/base/myNewTest.py (or table/base/ for table-level tests). Extend BaseTestValidator and implement:
  • _run_validation() — core test logic
  • _evaluate_test_condition() — return pass/fail
  • _format_result_message() — human-readable result string
Step 2: Create the SQLAlchemy implementation

Create validations/column/sqlalchemy/myNewTest.py. Inherit from your base class and SQAValidatorMixin. The mixin gives you get_column(), run_query_results(), and _execute_dimensional_validation().
Step 3: Create the Pandas implementation

Create validations/column/pandas/myNewTest.py. Inherit from your base class and PandasValidatorMixin.
Step 4: Register the TestDefinition

Create a TestDefinition in OpenMetadata with:
  • validatorClass matching your file name (e.g., "myNewTestValidator")
  • entityType set to COLUMN or TABLE
  • Parameter definitions for any inputs your test needs
No imports or registries need to be updated. The ValidatorBuilder discovers your validator automatically from the file path convention.

Key Design Patterns

| Pattern | Where | Why |
|---------|-------|-----|
| Template Method | BaseTestValidator.run_validation() | Shared flow (run → evaluate → format → dimensional) with subclass hooks |
| Strategy via Mixins | SQAValidatorMixin, PandasValidatorMixin | Same test logic, different data access (SQL vs DataFrame) |
| Factory | ValidatorBuilder, RuntimeParameterSetterFactory | Decouple creation from usage |
| Dynamic Import | import_test_case_class() | Zero-config validator discovery |
| Protocol | HasValidatorContext | Structural typing for validator dependencies |

Key Files Quick Reference

| What you want to do | Start here |
|---------------------|------------|
| Understand the workflow pipeline | workflow/data_quality.py |
| See how test cases are fetched | source/test_suite.py |
| See how tests are filtered and dispatched | processor/test_case_runner.py |
| Understand validator creation | builders/validator_builder.py |
| Read the base validator logic | validations/base_test_handler.py |
| See a concrete SQL validator | validations/column/sqlalchemy/columnValuesToBeNotNull.py |
| See a concrete Pandas validator | validations/column/pandas/columnValuesToBeNotNull.py |
| Understand dimensional analysis | validations/base_test_handler.py, _run_dimensional_validation() |
| Add runtime parameter resolution | validations/runtime_param_setter/param_setter_factory.py |
| See how results are published | ingestion/sink/metadata_rest.py, write_test_case_result_list() |
Most paths above are relative to ingestion/src/metadata/data_quality/; for example, source/test_suite.py means ingestion/src/metadata/data_quality/source/test_suite.py. The workflow/ and ingestion/sink/ entries are relative to ingestion/src/metadata/.