Data Quality Module
The data quality module lives at ingestion/src/metadata/data_quality/. It runs test cases against database tables (or DataFrames) and publishes results back to the OpenMetadata server.
This guide walks through the architecture top-down: from how a workflow is triggered, through the execution pipeline, down to individual validators.
Directory Layout
data_quality/
├── api/
│ └── models.py # DTOs: TableAndTests, TestCaseResults, TestCaseResultResponse
├── builders/
│ └── validator_builder.py # Factory that creates validator instances
├── interface/ # Database abstraction layer
│ ├── test_suite_interface.py # Abstract base
│ ├── sqlalchemy/ # SQL databases (+ Snowflake, Databricks overrides)
│ └── pandas/ # Datalake / DataFrame-based
├── processor/
│ └── test_case_runner.py # Workflow processor — orchestrates test execution
├── runner/
│ ├── core.py # DataTestsRunner — runs a single test via interface
│ └── base_test_suite_source.py # Creates the interface + sampler for a table
├── source/
│ └── test_suite.py # Fetches tables and test cases from OM API
└── validations/ # All validation logic
├── base_test_handler.py # Abstract BaseTestValidator (template method)
├── models.py # Runtime parameter models
├── utils.py # Helpers
├── impact_score.py # Dimensional result scoring
├── checkers/ # Condition checkers (between bounds, etc.)
├── mixins/
│ ├── protocols.py # HasValidatorContext protocol
│ ├── sqa_validator_mixin.py # SQLAlchemy query helpers
│ └── pandas_validator_mixin.py
├── runtime_param_setter/ # Resolve dynamic parameters at execution time
├── column/ # Column-level validators
│ ├── base/ # Abstract implementations
│ ├── sqlalchemy/ # SQL implementations
│ └── pandas/ # DataFrame implementations
└── table/ # Table-level validators (same structure)
Workflow Pipeline
Data quality follows the standard Source → Processor → Sink pattern defined in workflow/data_quality.py:
┌──────────────────────────────────────────────────────────┐
│ TestSuiteSource │
│ source/test_suite.py │
│ │
│ Fetches from OM API: │
│ • Table entity │
│ • Database service connection │
│ • Test cases linked to the test suite │
│ │
│ Yields: TableAndTests │
└──────────────────────┬───────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ TestCaseRunner │
│ processor/test_case_runner.py │
│ │
│ For each TableAndTests: │
│ 1. Filter to OM platform tests only │
│ 2. Remove disabled and incompatible tests │
│ 3. Run each test case → collect results │
│ │
│ Yields: TestCaseResults │
└──────────────────────┬───────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ MetadataRestSink │
│ ingestion/sink/metadata_rest.py │
│ │
│ For each TestCaseResultResponse: │
│ POST /api/v1/dataQuality/testCases/{id}/testCaseResult│
└──────────────────────────────────────────────────────────┘
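The three stages above form a chain of generators. The sketch below shows that shape with minimal stand-ins; `TableAndTests` and `TestCaseResults` are real DTO names from api/models.py, but their fields here (and the table/test names) are illustrative only.

```python
from dataclasses import dataclass
from typing import Iterator, List

# Hypothetical stand-ins for the DTOs in api/models.py.
@dataclass
class TableAndTests:
    table: str
    test_cases: List[str]

@dataclass
class TestCaseResults:
    table: str
    results: List[str]

def source() -> Iterator[TableAndTests]:
    # TestSuiteSource: fetches the table and its linked test cases from the OM API.
    yield TableAndTests("sales.orders", ["columnValuesToBeNotNull"])

def processor(batches: Iterator[TableAndTests]) -> Iterator[TestCaseResults]:
    # TestCaseRunner: runs each test case and collects results.
    for batch in batches:
        yield TestCaseResults(batch.table, [f"{t}: Success" for t in batch.test_cases])

def sink(results: Iterator[TestCaseResults]) -> List[TestCaseResults]:
    # MetadataRestSink: would POST each result back to the server.
    return list(results)

published = sink(processor(source()))
```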
Test Execution Call Chain
This is the most important sequence to understand. When TestCaseRunner processes a test case, here is exactly what happens:
# 1. PROCESSOR calls the runner
TestCaseRunner._run_test_case(test_case)
└── DataTestsRunner.run_and_handle(test_case) # runner/core.py
# 2. RUNNER delegates to the interface
└── TestSuiteInterface.run_test_case(test_case) # interface/test_suite_interface.py
# 3. INTERFACE resolves runtime params and builds the validator
├── RuntimeParameterSetterFactory.get_runtime_param_setters(test_def_fqn)
│ # Adds dynamic params (e.g., table references for tableDiff)
│
├── _get_validator_builder(test_case, entity_type)
│ ├── Fetch TestDefinition from OM API
│ ├── Read validatorClass field (e.g., "columnValuesToBeNotNull")
│ └── return ValidatorBuilder(runner, test_case, test_def)
│
├── validator_builder.set_runtime_params(param_setters)
│
└── validator = validator_builder.validator
# Dynamically imports the correct validator class
# 4. VALIDATOR executes the test
└── validator.run_validation() # validations/base_test_handler.py
├── _run_validation() # Subclass: query + evaluate
├── _run_dimensional_validation() # Optional: GROUP BY analysis
└── return TestCaseResult
Data Model
Three entities form the core model. All are defined as JSON Schemas in openmetadata-spec/ and auto-generated into Python models:
TestDefinition
├── name: str # e.g., "columnValuesToBeNotNull"
├── entityType: COLUMN | TABLE
├── validatorClass: str # Maps to Python class name
├── parameters: List[ParamDef] # Parameter schema
└── supportedDataTypes: List # Column types this test applies to
TestCase
├── testDefinition: FK # Points to a TestDefinition
├── entityLink: str # Table or column being tested
├── parameterValues: List # Actual parameter values
├── dimensionColumns: List # Optional: columns for GROUP BY analysis
└── topDimensions: int # Max dimension groups to report
TestCaseResult
├── testCaseStatus: Success | Failed | Aborted
├── result: str # Human-readable message
├── testResultValue: List # Metric name → value pairs
├── passedRows / failedRows
└── dimensionResults: List # Per-group results (optional)
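To see how the three records link together, here are illustrative instances as plain dicts. Field names follow the schemas above; all values, including the entityLink string, are made up for this example.

```python
# A TestDefinition describes a reusable test type.
test_definition = {
    "name": "columnValuesToBeNotNull",
    "entityType": "COLUMN",
    "validatorClass": "columnValuesToBeNotNull",
    "parameters": [],
}

# A TestCase binds a definition to a concrete column with concrete parameters.
test_case = {
    "testDefinition": test_definition["name"],  # FK by name
    "entityLink": "<#E::table::svc.db.schema.orders::columns::id>",  # made-up link
    "parameterValues": [],
}

# A TestCaseResult records one execution of that test case.
test_case_result = {
    "testCaseStatus": "Success",
    "result": "Found 0 null value(s)",
    "testResultValue": [{"name": "nullCount", "value": "0"}],
    "passedRows": 1000,
    "failedRows": 0,
}
```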
Validator Architecture
Validators use a template method pattern with mixins for database-specific logic.
Class Hierarchy
BaseTestValidator # validations/base_test_handler.py
│ run_validation() # Template method (calls hooks below)
│ _run_validation() # Abstract — implement test logic
│ _evaluate_test_condition() # Abstract — pass/fail check
│ _format_result_message() # Abstract — human-readable output
│ _run_dimensional_validation() # Dimensional GROUP BY logic
│ _execute_dimensional_validation() # Abstract — build dimension query
│
├── Base implementations # validations/column/base/ or table/base/
│ e.g., BaseColumnValuesToBeNotNullValidator
│ Implements _run_validation, _evaluate_test_condition, _format_result_message
│ Leaves metric computation abstract
│
├── SQL implementations # validations/column/sqlalchemy/
│ e.g., ColumnValuesToBeNotNullValidator(Base + SQAValidatorMixin)
│ SQAValidatorMixin provides: get_column(), run_query_results()
│
└── Pandas implementations # validations/column/pandas/
e.g., PandasColumnValuesToBeNotNullValidator(Base + PandasValidatorMixin)
PandasValidatorMixin provides: get_column(), run_dataframe_results()
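The hierarchy above can be reduced to a runnable sketch. The hook names match the guide; the data-access mixin (`NullCountMixin`), the in-memory rows, and the result shape are stand-ins for illustration, not the real SQAValidatorMixin or PandasValidatorMixin.

```python
from abc import ABC, abstractmethod

class BaseTestValidator(ABC):
    """Minimal sketch of the template-method shape: run_validation() owns
    the shared flow and delegates to abstract hooks."""

    def run_validation(self) -> dict:
        value = self._run_validation()
        passed = self._evaluate_test_condition(value)
        return {
            "testCaseStatus": "Success" if passed else "Failed",
            "result": self._format_result_message(value),
        }

    @abstractmethod
    def _run_validation(self): ...

    @abstractmethod
    def _evaluate_test_condition(self, value) -> bool: ...

    @abstractmethod
    def _format_result_message(self, value) -> str: ...

class NullCountMixin:
    # Stand-in for SQAValidatorMixin / PandasValidatorMixin: supplies the
    # data-access half of the validator (here, over a plain list).
    def count_nulls(self, rows):
        return sum(1 for r in rows if r is None)

class ColumnValuesToBeNotNullValidator(NullCountMixin, BaseTestValidator):
    def __init__(self, rows):
        self.rows = rows

    def _run_validation(self):
        return self.count_nulls(self.rows)

    def _evaluate_test_condition(self, null_count) -> bool:
        return null_count == 0

    def _format_result_message(self, null_count) -> str:
        return f"Found {null_count} null value(s)"
```

Swapping the mixin (SQL vs. DataFrame) changes how the metric is computed without touching the pass/fail or formatting logic, which is exactly the split between the base and per-backend directories.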
How Validators Are Discovered
Validators are dynamically imported at runtime based on the validatorClass field from the TestDefinition:
# ValidatorBuilder calls import_test_case_class() which resolves:
# entity_type = "column" or "table"
# runner_type = "sqlalchemy" or "pandas"
# validator_class = from TestDefinition.validatorClass
module_path = f"metadata.data_quality.validations.{entity_type}.{runner_type}.{validator_class}"
# Example: metadata.data_quality.validations.column.sqlalchemy.columnValuesToBeNotNull
This means adding a new test is purely additive — no registry to update, no imports to add. Just place the file in the right directory.
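A hypothetical re-creation of that lookup is below. The module-path convention comes from the guide; deriving the class name by capitalizing the first letter and appending "Validator" is an assumption for illustration (check import_test_case_class() for the real rule).

```python
import importlib

def resolve_validator(entity_type: str, runner_type: str, validator_class: str):
    """Build the module path and class name from the naming convention."""
    module_path = (
        "metadata.data_quality.validations."
        f"{entity_type}.{runner_type}.{validator_class}"
    )
    # Assumed convention: "columnValuesToBeNotNull" -> "ColumnValuesToBeNotNullValidator"
    class_name = validator_class[0].upper() + validator_class[1:] + "Validator"
    return module_path, class_name

def load_validator(entity_type: str, runner_type: str, validator_class: str):
    # Dynamic import: no registry needed, the file path is the registry.
    module_path, class_name = resolve_validator(entity_type, runner_type, validator_class)
    return getattr(importlib.import_module(module_path), class_name)
```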
Dimensional Analysis
When a test case has dimensionColumns configured, the validator runs the test per group using GROUP BY:
For dimensionColumns = ["region", "product"]:
1. Run GROUP BY region → compute metrics per region value
2. Run GROUP BY product → compute metrics per product value
3. For each group:
├── Evaluate pass/fail
├── Calculate impact score
└── Create DimensionResult
4. Keep top N groups by impact score, aggregate rest as "Others"
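The steps above can be sketched over in-memory rows. This simplification ranks groups by failure rate (the real code ranks by impact score) and uses made-up row data; the "Others" aggregation matches step 4.

```python
from collections import defaultdict

def dimensional_results(rows, dimension, is_failure, top_n=2):
    """Group rows by one dimension column, compute failures per group,
    keep the top N groups and fold the rest into "Others"."""
    groups = defaultdict(lambda: [0, 0])  # value -> [failed, total]
    for row in rows:
        stats = groups[row[dimension]]
        stats[1] += 1
        if is_failure(row):
            stats[0] += 1
    # Simplification: rank by failure rate instead of impact score.
    ranked = sorted(groups.items(), key=lambda kv: kv[1][0] / kv[1][1], reverse=True)
    top, rest = ranked[:top_n], ranked[top_n:]
    results = [{"dimension": v, "failedRows": f, "totalRows": t} for v, (f, t) in top]
    if rest:
        results.append({
            "dimension": "Others",
            "failedRows": sum(f for _, (f, _t) in rest),
            "totalRows": sum(t for _, (_f, t) in rest),
        })
    return results
```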
Impact score (validations/impact_score.py) ranks which groups matter most:
impact = (failure_rate² × volume_factor × sample_weight) / 1.5
failure_rate² — emphasizes high failure percentages
volume_factor — tiered by row count (0.25 for <10 rows → 1.5 for ≥100k)
sample_weight — credibility discount for small samples
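In code, the formula looks like the sketch below. Only the formula itself and the two tier endpoints (0.25 for fewer than 10 rows, 1.5 for 100k or more) come from the text above; the intermediate tiers and the default sample_weight are placeholders, so consult impact_score.py for the actual values.

```python
def volume_factor(row_count: int) -> float:
    """Tiered weight by row count. Middle tiers are illustrative placeholders."""
    if row_count < 10:
        return 0.25
    if row_count < 1_000:
        return 0.75   # placeholder tier
    if row_count < 100_000:
        return 1.0    # placeholder tier
    return 1.5

def impact_score(failure_rate: float, row_count: int, sample_weight: float = 1.0) -> float:
    """impact = (failure_rate^2 * volume_factor * sample_weight) / 1.5"""
    return (failure_rate ** 2) * volume_factor(row_count) * sample_weight / 1.5
```

Note how squaring the failure rate means a group failing 100% of the time scores four times higher than one failing 50% of the time, before volume is considered.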
Runtime Parameter Setters
Some tests need parameters that can only be resolved at execution time. The RuntimeParameterSetterFactory (validations/runtime_param_setter/param_setter_factory.py) maps test definition names to setter classes:
| Test | Setter | What It Does |
|---|---|---|
| tableDiff | TableDiffParamsSetter | Fetches the comparison table, resolves key columns from constraints |
| tableCustomSQLQuery | TableCustomSQLQueryParamsSetter | Compiles SQL with runtime table references |
| *RuleLibrarySqlExpression | RuleLibrarySqlExpressionParamsSetter | Builds SQL from the rule library |
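The factory's dispatch can be pictured as a plain name-to-class map. The setter behavior below is a hypothetical stand-in (the real TableDiffParamsSetter calls the OM API); only the mapping shape is the point.

```python
class TableDiffParamsSetter:
    # Hypothetical stand-in: the real setter fetches the comparison table
    # and resolves key columns from constraints.
    def set_runtime_params(self, test_case: dict) -> dict:
        test_case.setdefault("params", {})["table2"] = "resolved-comparison-table"
        return test_case

class RuntimeParameterSetterFactory:
    """Maps test definition names to setter classes, as described above."""

    _setters = {
        "tableDiff": TableDiffParamsSetter,
    }

    def get_runtime_param_setters(self, test_definition_name: str) -> list:
        setter_cls = self._setters.get(test_definition_name)
        return [setter_cls()] if setter_cls else []
```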
Adding a New Test
Create the base validator
Create validations/column/base/myNewTest.py (or table/base/ for table-level tests). Extend BaseTestValidator and implement:
_run_validation() — core test logic
_evaluate_test_condition() — return pass/fail
_format_result_message() — human-readable result string
Create the SQLAlchemy implementation
Create validations/column/sqlalchemy/myNewTest.py. Inherit from your base class and SQAValidatorMixin. The mixin gives you get_column(), run_query_results(), and _execute_dimensional_validation().
Create the Pandas implementation
Create validations/column/pandas/myNewTest.py. Inherit from your base class and PandasValidatorMixin.
Register the TestDefinition
Create a TestDefinition in OpenMetadata with:
validatorClass matching your file name (e.g., "myNewTestValidator")
entityType set to COLUMN or TABLE
Parameter definitions for any inputs your test needs
No imports or registries need to be updated. The ValidatorBuilder discovers your validator automatically from the file path convention.
Key Design Patterns
| Pattern | Where | Why |
|---|---|---|
| Template Method | BaseTestValidator.run_validation() | Shared flow (run → evaluate → format → dimensional) with subclass hooks |
| Strategy via Mixins | SQAValidatorMixin, PandasValidatorMixin | Same test logic, different data access (SQL vs DataFrame) |
| Factory | ValidatorBuilder, RuntimeParameterSetterFactory | Decouple creation from usage |
| Dynamic Import | import_test_case_class() | Zero-config validator discovery |
| Protocol | HasValidatorContext | Structural typing for validator dependencies |
Key Files Quick Reference
| What you want to do | Start here |
|---|---|
| Understand the workflow pipeline | workflow/data_quality.py |
| See how test cases are fetched | source/test_suite.py |
| See how tests are filtered and dispatched | processor/test_case_runner.py |
| Understand validator creation | builders/validator_builder.py |
| Read the base validator logic | validations/base_test_handler.py |
| See a concrete SQL validator | validations/column/sqlalchemy/columnValuesToBeNotNull.py |
| See a concrete Pandas validator | validations/column/pandas/columnValuesToBeNotNull.py |
| Understand dimensional analysis | validations/base_test_handler.py → _run_dimensional_validation() |
| Add runtime parameter resolution | validations/runtime_param_setter/param_setter_factory.py |
| See how results are published | ingestion/sink/metadata_rest.py → write_test_case_result_list() |
All paths above are relative to ingestion/src/metadata/. For example, source/test_suite.py means ingestion/src/metadata/data_quality/source/test_suite.py.