Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,16 @@ class ColumnTypeParser:
except ImportError:
pass

try:
# pylint: disable=import-outside-toplevel
from metadata.ingestion.source.database.pinotdb.custom_types import (
PinotJSONType,
)

_COLUMN_TYPE_MAPPING[PinotJSONType] = "JSON"
except ImportError:
pass

@staticmethod
def get_column_type(column_type: Any) -> str:
for func in [
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Custom Pinot SQLAlchemy types."""
import json
from typing import Any

from sqlalchemy import types

from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class PinotJSONType(types.JSON):
"""
Replace SQLAlchemy's default JSON result processor to normalize Pinot's
engine-dependent payloads.

The single-stage engine can return already-deserialized Python containers,
while the multistage engine returns raw JSON strings. This custom type
accepts both shapes while preserving JSON semantics for OpenMetadata.
"""

def result_processor(self, dialect, coltype):
def process(value: Any) -> Any:
    """Normalize one raw Pinot JSON cell into a Python value.

    Text and byte payloads are decoded with ``json.loads``. Every other
    input (``None``, dicts, lists, or any other object) is handed back
    unchanged. A payload that cannot be decoded is logged as a warning
    and returned as-is.
    """
    # Only str/bytes/bytearray payloads still need decoding; everything
    # else is passed through untouched.
    if not isinstance(value, (str, bytes, bytearray)):
        return value

    try:
        decoded = json.loads(value)
    except (TypeError, ValueError) as exc:
        logger.warning(
            "Failed to deserialize Pinot JSON value. Returning raw value instead: %s",
            exc,
        )
        return value
    return decoded
Comment on lines +33 to +47
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Edge Case: result_processor passes through numeric/bool JSON scalars unparsed

In PinotJSONType.result_processor, a raw JSON numeric or boolean scalar (e.g., the string "42" or "true" representing a JSON column value) would fall through to the isinstance(value, str) branch and be parsed correctly via json.loads. However, if Pinot's single-stage engine returns a Python int, float, or bool (which are valid JSON scalars), these fall through to the final return value without being covered by the isinstance(value, (dict, list)) short-circuit. This is functionally correct (they're already deserialized), but the docstring and the test coverage only describe containers. Consider adding a test case for scalar JSON values (int/float/bool) to document this behavior explicitly.

Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion


return process
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.pinotdb.custom_types import PinotJSONType


def get_type_custom(data_type, field_size):
Expand All @@ -36,7 +37,7 @@ def get_type_custom(data_type, field_size):
"boolean": types.Boolean,
"timestamp": types.TIMESTAMP,
"string": types.String,
"json": types.JSON,
"json": PinotJSONType,
"bytes": types.LargeBinary,
"big_decimal": types.DECIMAL,
# Complex types
Expand Down
53 changes: 49 additions & 4 deletions ingestion/tests/unit/topology/database/test_pinotdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@
Unit tests for PinotDB column type mapping.

Verifies that Pinot scalar types resolve to the correct
OpenMetadata DataType string via get_type_custom + ColumnTypeParser.
Complex types (struct, map, array) are excluded: ARRAY requires a
constructor argument and their BLOB/ARRAY mappings are covered by
the generic column_type_parser tests.
OpenMetadata DataType string via get_type_custom + ColumnTypeParser, and
that Pinot JSON values are normalized consistently regardless of whether the
driver returns already-deserialized containers or raw JSON strings.
"""
import logging

import pytest

from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.ingestion.source.database.pinotdb.custom_types import PinotJSONType
from metadata.ingestion.source.database.pinotdb.metadata import get_type_custom


Expand Down Expand Up @@ -54,3 +56,46 @@ def test_double_not_mapped_to_int():
result = _resolve("double")
assert result != "INT", "Pinot DOUBLE is incorrectly mapped to INT"
assert result == "DOUBLE"


def test_json_type_uses_pinot_custom_type():
    """Pinot ``json`` resolves to PinotJSONType, which the parser reports as JSON."""
    resolved_type = get_type_custom("json", None)
    assert resolved_type is PinotJSONType

    parsed_name = ColumnTypeParser.get_column_type(PinotJSONType())
    assert parsed_name == "JSON"


@pytest.mark.parametrize(
    "raw_value, expected",
    [
        pytest.param(
            [{"name": "alpha"}, {"name": "beta"}],
            [{"name": "alpha"}, {"name": "beta"}],
            id="single-stage-list",
        ),
        pytest.param(
            {"name": "alpha"},
            {"name": "alpha"},
            id="single-stage-dict",
        ),
        pytest.param(
            '[{"name": "alpha"}, {"name": "beta"}]',
            [{"name": "alpha"}, {"name": "beta"}],
            id="multistage-string",
        ),
        pytest.param(
            b'{"name": "alpha"}',
            {"name": "alpha"},
            id="bytes-payload",
        ),
        pytest.param(
            bytearray(b'{"name": "alpha"}'),
            {"name": "alpha"},
            id="bytearray-payload",
        ),
        pytest.param(None, None, id="null"),
        # Already-deserialized JSON scalars are neither containers nor text,
        # so the processor must hand them back untouched.
        pytest.param(42, 42, id="scalar-int"),
        pytest.param(1.5, 1.5, id="scalar-float"),
        pytest.param(True, True, id="scalar-bool"),
        # JSON scalars that arrive as raw text are decoded like any other
        # string payload.
        pytest.param("42", 42, id="string-encoded-int"),
        pytest.param("true", True, id="string-encoded-bool"),
    ],
)
def test_pinot_json_result_processor_normalizes_values(raw_value, expected):
    """Check that the result processor normalizes each payload shape.

    Covers containers that are already deserialized, raw JSON text and
    bytes, ``None``, and JSON scalars both as Python values and as
    string-encoded text.
    """
    processor = PinotJSONType().result_processor(dialect=None, coltype=None)
    assert processor is not None

    actual = processor(raw_value)
    assert actual == expected
    # ``True == 1`` in Python, so also pin the exact type to keep bool and
    # int results from being conflated.
    assert type(actual) is type(expected)


def test_pinot_json_result_processor_falls_back_for_malformed_json(caplog):
    """Malformed JSON text is returned unchanged and a warning is logged."""
    malformed = "{not-json"
    process = PinotJSONType().result_processor(dialect=None, coltype=None)
    assert process is not None

    with caplog.at_level(logging.WARNING):
        outcome = process(malformed)
        assert outcome == malformed

    expected_fragment = "Failed to deserialize Pinot JSON value"
    assert expected_fragment in caplog.text
Loading