-
Notifications
You must be signed in to change notification settings - Fork 2k
Expand file tree
/
Copy pathdb_utils.py
More file actions
168 lines (150 loc) · 5.94 KB
/
db_utils.py
File metadata and controls
168 lines (150 loc) · 5.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Helpers module for db sources
"""
import time
import traceback
from typing import Iterable, List, Union
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.parserconfig.queryParserConfig import (
QueryParserType,
)
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.ingestion.api.models import Either
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import (
get_lineage_by_query,
get_lineage_via_table_entity,
)
from metadata.ingestion.models.custom_pydantic import _strip_hostport_scheme
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.models import TableView
from metadata.utils import fqn
from metadata.utils.execution_time_tracker import calculate_execution_time_generator
from metadata.utils.logger import utils_logger
# Module-level logger obtained from the shared ``utils_logger`` factory.
logger = utils_logger()
# Schema name that ``get_view_lineage`` substitutes for Postgres views.
PUBLIC_SCHEMA = "public"
def clean_host_port(host_port: str) -> str:
    """
    Normalise a user-supplied hostPort value by removing any URL scheme.

    Users sometimes type a full URL (e.g. 'http://localhost:3306') where only
    'localhost:3306' is expected; leaving the scheme in place would make the
    later host/port split raise ValueError.

    Surrounding whitespace is always stripped. Values without '://' only have
    trailing slashes removed. Values that do contain a scheme are handed to
    ``_strip_hostport_scheme`` (the stdlib-only helper colocated with
    ``BaseModel``), so this function stays in lockstep with Pydantic's
    ``model_post_init`` hook.
    """
    candidate = host_port.strip()
    if "://" in candidate:
        # Scheme present: share the exact logic used by the Pydantic hook.
        return _strip_hostport_scheme(candidate)
    # No scheme: only trailing slashes need trimming.
    return candidate.rstrip("/")
def get_host_from_host_port(uri: str) -> str:
    """
    Return the host part of a ``host:port`` string.

    Examples: "localhost:9000" -> "localhost"; "[::1]:5432" -> "[::1]".
    The input is first normalised with ``clean_host_port``, so a leading URL
    scheme is removed before splitting.
    """
    cleaned = clean_host_port(uri)
    if not cleaned.startswith("["):
        # Plain host: everything before the first colon (or the whole value).
        host, _, _ = cleaned.partition(":")
        return host
    # Bracketed IPv6 literal: keep the brackets, drop the trailing port.
    bracket_body, _, _ = cleaned.partition("]")
    return f"{bracket_body}]"
# pylint: disable=too-many-locals
@calculate_execution_time_generator()
def get_view_lineage(
    view: TableView,
    metadata: OpenMetadata,
    service_names: Union[str, List[str]],
    connection_type: str,
    timeout_seconds: int,
    parser_type: QueryParserType,
) -> Iterable[Either[AddLineageRequest]]:
    """
    Generate lineage for a single view from its SQL definition.

    Supports cross-database lineage: ``service_names`` may be a single service
    name or a list of them. The first service is used to locate the view's own
    Table entity; the full list is forwarded to the lineage helpers.

    Args:
        view: the view to process (name, schema, database and SQL definition).
        metadata: OpenMetadata client used for entity lookups.
        service_names: a service name, or list of service names, to search.
        connection_type: connection type, mapped to a SQL dialect via
            ``ConnectionTypeDialectMapper``.
        timeout_seconds: parser timeout forwarded to ``LineageParser`` and to
            the lineage helpers.
        parser_type: which query parser implementation to use.

    Yields:
        ``Either[AddLineageRequest]`` items from ``get_lineage_by_query`` when
        the parser finds both source and target tables, otherwise from
        ``get_lineage_via_table_entity``. Nothing is yielded when no service
        name is given, the view has no definition, or its Table entity cannot
        be found. Errors raised while parsing or building lineage are logged,
        not re-raised.
    """
    if isinstance(service_names, str):
        service_names = [service_names]
    if not service_names:
        logger.warning(
            f"No service names provided for view {view.table_name}; skipping view lineage"
        )
        return

    table_name = view.table_name
    schema_name = view.schema_name
    db_name = view.db_name
    view_definition = view.view_definition
    schema_fallback = False

    table_fqn = fqn.build(
        metadata,
        entity_type=Table,
        service_name=service_names[0],  # the view itself lives in the first service
        database_name=db_name,
        schema_name=schema_name,
        table_name=table_name,
    )

    # Check for SQL before calling the API: without a definition there is no lineage.
    if not view_definition:
        logger.warning(f"View definition for view {table_fqn} not available")
        return

    table_entity = metadata.get_by_name(entity=Table, fqn=table_fqn)
    if table_entity is None:
        # Without this guard the missing entity surfaced as an AttributeError on
        # ``serviceType`` and was misreported as a query-parsing failure.
        logger.warning(
            f"Table entity for view {table_fqn} not found; skipping view lineage"
        )
        return

    try:
        connection_type = str(connection_type)
        dialect = ConnectionTypeDialectMapper.dialect_of(connection_type)
        start_time = time.time()
        logger.debug(f"Processing view lineage for: {table_fqn}")
        lineage_parser = LineageParser(
            view_definition,
            dialect,
            timeout_seconds=timeout_seconds,
            parser_type=parser_type,
        )
        query_hash = lineage_parser.query_hash
        end_time = time.time()
        logger.debug(
            f"[{query_hash}] Time taken to parse view lineage for: {table_fqn} is {end_time - start_time} seconds"
        )

        if table_entity.serviceType == DatabaseServiceType.Postgres:
            # NOTE(review): this overrides the schema with ``public`` for every
            # Postgres view, not only when the view's schema is missing. Presumably
            # unqualified table names in a Postgres view definition resolve via the
            # search_path (``public`` by default). ``schema_fallback`` is forwarded
            # downstream; confirm its exact semantics in ``sql_lineage``.
            schema_name = PUBLIC_SCHEMA
            schema_fallback = True

        # Both lineage helpers take the same keyword arguments, except table_entity.
        lineage_kwargs = {
            "query": view_definition,
            "service_names": service_names,
            "database_name": db_name,
            "schema_name": schema_name,
            "dialect": dialect,
            "timeout_seconds": timeout_seconds,
            "lineage_source": LineageSource.ViewLineage,
            "lineage_parser": lineage_parser,
            "schema_fallback": schema_fallback,
        }

        if lineage_parser.source_tables and lineage_parser.target_tables:
            # ``or []`` guards against a helper returning None instead of an iterable.
            yield from get_lineage_by_query(metadata, **lineage_kwargs) or []
        else:
            yield from get_lineage_via_table_entity(
                metadata, table_entity=table_entity, **lineage_kwargs
            ) or []
    except Exception as exc:  # pylint: disable=broad-except
        # Best effort: one bad view must not abort lineage for the whole run.
        logger.debug(traceback.format_exc())
        logger.warning(
            f"Could not parse query [{view_definition}] ingesting lineage failed: {exc}"
        )