Skip to content

Commit 04c3f77

Browse files
committed
Merge branch 'main' into dependabot/npm_and_yarn/openmetadata-ui-core-components/src/main/resources/ui/vite-7.3.2
2 parents e621320 + 65effdb commit 04c3f77

File tree

21 files changed

+1154
-100
lines changed

21 files changed

+1154
-100
lines changed

ingestion/src/metadata/ingestion/source/database/bigquery/incremental_table_processor.py

Lines changed: 184 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -9,80 +9,231 @@
99
# See the License for the specific language governing permissions and
1010
# limitations under the License.
1111
"""
12-
Bigquery Incremental Table processing logic
12+
Bigquery Incremental Table processing logic.
13+
14+
Uses Cloud Logging API (entries.list) to detect table changes since last run.
15+
Optimized around the hard 60 requests/min quota per project:
16+
- Batches datasets into groups using the indexed field resource.labels.dataset_id
17+
- Bounded timestamp window [start_date, end_date) for deterministic results
18+
- Retries with linear backoff on ResourceExhausted (429)
19+
20+
Memory-optimized:
21+
- Processes entries page-by-page, releasing each page before fetching the next
22+
- Stores only (table_name -> is_deleted) per schema, no Pydantic models or timestamps
1323
"""
14-
from datetime import datetime
15-
from typing import List
24+
import time
25+
from datetime import datetime, timezone
26+
from typing import Dict, Iterable, List, Optional
1627

1728
import google.cloud.logging
29+
from google.api_core.exceptions import ResourceExhausted
1830
from google.cloud.logging_v2.entries import LogEntry
1931

2032
from metadata.ingestion.source.database.bigquery.models import (
21-
BigQueryTable,
2233
BigQueryTableMap,
2334
SchemaName,
2435
TableName,
2536
)
2637
from metadata.ingestion.source.database.bigquery.queries import (
2738
BIGQUERY_GET_CHANGED_TABLES_FROM_CLOUD_LOGGING,
2839
)
40+
from metadata.utils.logger import ingestion_logger
41+
42+
logger = ingestion_logger()
43+
44+
MAX_RETRIES = 3
45+
RETRY_BASE_WAIT = 60 # Cloud Logging quota resets per minute
46+
PAGE_SIZE = 10000
47+
DATASET_BATCH_SIZE = 50
48+
49+
50+
def _batch(items: List[str], batch_size: int) -> Iterable[List[str]]:
51+
"""Yield successive batches from a list."""
52+
for i in range(0, len(items), batch_size):
53+
yield items[i : i + batch_size]
54+
55+
56+
def _build_dataset_filter(datasets: List[str]) -> str:
57+
"""Build a Cloud Logging filter clause for a batch of dataset IDs.
58+
59+
Uses the indexed field resource.labels.dataset_id for efficient
60+
server-side filtering.
61+
"""
62+
if len(datasets) == 1:
63+
return f'AND resource.labels.dataset_id = "{datasets[0]}"'
64+
or_clause = " OR ".join(f'resource.labels.dataset_id = "{ds}"' for ds in datasets)
65+
return f"AND ({or_clause})"
2966

3067

3168
class BigQueryIncrementalTableProcessor:
    """Tracks BigQuery tables changed since the last ingestion run.

    Reads table change events from the Cloud Logging API (entries.list) and
    stores a per-schema map of table name -> is_deleted. Any query failure is
    recorded in ``query_failed`` so callers can fall back to full extraction.
    """

    def __init__(self, client: google.cloud.logging.Client):
        self._client = client
        # Per-schema map of table_name -> is_deleted; deduplication happens
        # inside BigQueryTableMap.update()
        self._changed_tables_map = BigQueryTableMap()
        # Set when Cloud Logging could not be queried; read via `query_failed`
        self._query_failed = False

    @classmethod
    def from_project(cls, project: str) -> "BigQueryIncrementalTableProcessor":
        """Alternate constructor: build a Cloud Logging client for *project*."""
        client = google.cloud.logging.Client(project=project)
        return cls(client)

    @staticmethod
    def _is_table_deleted(entry: LogEntry) -> bool:
        """True if the audit entry records a table deletion.

        The "metadata" key may be missing or None in the payload — guard
        both instead of calling .keys() on a possible None.
        """
        metadata = entry.payload.get("metadata") or {}
        return "tableDeletion" in metadata

    def _process_entry(self, entry: LogEntry):
        """Extract dataset/table from a single Cloud Logging entry."""
        payload = entry.payload
        # Payloads can be protobuf/text as well as dict; skip anything else
        if not isinstance(payload, dict):
            logger.debug("Skipping non-dict Cloud Logging entry payload: %s", payload)
            return
        # Expected shape: projects/<p>/datasets/<schema>/tables/<table>
        resource_name = payload.get("resourceName", "")
        parts = resource_name.split("/")
        if len(parts) < 6:
            return

        self._changed_tables_map.update(
            schema_name=parts[3],
            table_name=parts[5],
            deleted=self._is_table_deleted(entry),
        )

    def _fetch_batch(
        self,
        project: str,
        start_date: datetime,
        end_date: datetime,
        dataset_filter: str,
    ):
        """Fetch Cloud Logging entries for a batch of datasets with retry logic.

        Iterates entries one-by-one from the Cloud Logging generator and
        feeds each to _process_entry. On ResourceExhausted (429), retries
        up to MAX_RETRIES times with linear backoff. On retry,
        already-processed entries are deduplicated by BigQueryTableMap.update().

        Args:
            project: GCP project ID to scope the log query.
            start_date: Inclusive lower bound of the timestamp window.
            end_date: Exclusive upper bound of the timestamp window.
            dataset_filter: Pre-built ``AND ...`` clause, or "" for no filter.
        """
        resource_names = [f"projects/{project}"]
        filters = BIGQUERY_GET_CHANGED_TABLES_FROM_CLOUD_LOGGING.format(
            project=project,
            start_date=start_date.strftime("%Y-%m-%dT%H:%M:%SZ"),
            end_date=end_date.strftime("%Y-%m-%dT%H:%M:%SZ"),
            dataset_filter=dataset_filter,
        )

        for attempt in range(MAX_RETRIES):
            try:
                entries = self._client.list_entries(
                    resource_names=resource_names,
                    filter_=filters,
                    order_by=google.cloud.logging.DESCENDING,
                    page_size=PAGE_SIZE,
                )
                total = 0
                for entry in entries:
                    total += 1
                    self._process_entry(entry)
                    if total % 10000 == 0:
                        logger.info("Processed %d Cloud Logging entries so far", total)
                if total > 0:
                    logger.info("Finished processing %d Cloud Logging entries", total)
                return
            except ResourceExhausted:
                if attempt < MAX_RETRIES - 1:
                    # Linear backoff: the Cloud Logging quota resets per minute
                    wait = RETRY_BASE_WAIT * (attempt + 1)
                    logger.warning(
                        "Cloud Logging quota exceeded, retrying in %ds "
                        "(attempt %d/%d)",
                        wait,
                        attempt + 1,
                        MAX_RETRIES,
                    )
                    time.sleep(wait)
                else:
                    logger.error(
                        "Cloud Logging quota exceeded after %d retries. "
                        "Falling back to full extraction.",
                        MAX_RETRIES,
                    )
                    self._query_failed = True
            except Exception as exc:
                # Any other API error: record the failure and stop this batch
                logger.error("Failed to query Cloud Logging: %s", exc)
                self._query_failed = True
                return

    def set_tables_map(
        self,
        project: str,
        start_date: datetime,
        datasets: Optional[List[str]] = None,
    ):
        """Fetch changed tables from Cloud Logging, batching datasets for efficiency.

        Batches datasets into groups of DATASET_BATCH_SIZE and queries each batch
        separately. This keeps the indexed field resource.labels.dataset_id in the
        filter while reducing total API calls from N to ceil(N / DATASET_BATCH_SIZE).

        Uses a bounded timestamp window [start_date, end_date) to ensure
        deterministic results and prevent data gaps between runs.

        Args:
            project: GCP project ID
            start_date: Only fetch changes after this timestamp
            datasets: List of dataset IDs to query. If None, queries all datasets
                in the project (no dataset_id filter).
        """
        end_date = datetime.now(timezone.utc)
        num_datasets = len(datasets) if datasets else 0
        num_batches = (
            (num_datasets + DATASET_BATCH_SIZE - 1) // DATASET_BATCH_SIZE
            if num_datasets
            else 1
        )

        logger.info(
            "Querying Cloud Logging for project '%s': %d datasets in %d batch(es), "
            "window [%s, %s)",
            project,
            num_datasets,
            num_batches,
            start_date.strftime("%Y-%m-%dT%H:%M:%SZ"),
            end_date.strftime("%Y-%m-%dT%H:%M:%SZ"),
        )

        if datasets is None:
            logger.debug("No dataset filter — querying all datasets in project")
            self._fetch_batch(project, start_date, end_date, dataset_filter="")
        elif datasets:
            for batch_idx, dataset_batch in enumerate(
                _batch(datasets, DATASET_BATCH_SIZE), start=1
            ):
                # Once a batch has failed we will fall back to full
                # extraction anyway, so further API calls are wasted quota
                if self._query_failed:
                    logger.warning(
                        "Skipping remaining %d batch(es) due to prior failure",
                        num_batches - batch_idx + 1,
                    )
                    return
                logger.debug(
                    "Fetching batch %d/%d (%d datasets)",
                    batch_idx,
                    num_batches,
                    len(dataset_batch),
                )
                dataset_filter = _build_dataset_filter(dataset_batch)
                self._fetch_batch(project, start_date, end_date, dataset_filter)
        else:
            logger.info(
                "No datasets to query after filtering for project '%s'", project
            )

    def get_deleted(self, schema_name: SchemaName) -> List[TableName]:
        """Return names of tables recorded as deleted in *schema_name*."""
        return self._changed_tables_map.get_deleted(schema_name)

    def get_not_deleted(self, schema_name: SchemaName) -> List[TableName]:
        """Return names of tables changed but not deleted in *schema_name*."""
        return self._changed_tables_map.get_not_deleted(schema_name)

    def get_all_deleted(self) -> Dict[SchemaName, List[TableName]]:
        """Return the deleted table names for every schema seen so far."""
        return self._changed_tables_map.get_all_deleted()

    @property
    def query_failed(self) -> bool:
        """True when any Cloud Logging query failed; callers should fall back to full extraction."""
        return self._query_failed

ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py

Lines changed: 70 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -400,13 +400,21 @@ def query_table_names_and_types(
400400
):
401401
continue
402402

403-
if self.incremental.enabled:
403+
if (
404+
self.incremental.enabled
405+
and not self.incremental_table_processor.query_failed
406+
):
404407
if (
405408
table.table_id
406409
not in self.incremental_table_processor.get_not_deleted(
407410
schema_name
408411
)
409412
):
413+
logger.debug(
414+
"Skipping unchanged table '%s.%s'",
415+
schema_name,
416+
table.table_id,
417+
)
410418
continue
411419

412420
yield TableNameAndType(
@@ -649,16 +657,29 @@ def get_schema_description(self, schema_name: str) -> Optional[str]:
649657
return ""
650658

651659
def _prepare_schema_incremental_data(self, schema_name: str):
652-
"""Prepares the data for Incremental Extraction.
660+
"""Adds deleted tables for this schema to the global context.
653661
654-
1. Queries Cloud Logging for the changes
655-
2. Sets the table map with the changes within the BigQueryIncrementalTableProcessor
656-
3. Adds the Deleted Tables to the context
662+
Cloud Logging is already queried in get_database_names() for all
663+
datasets at once. This method just reads from the populated map.
657664
"""
658-
self.incremental_table_processor.set_changed_tables_map(
659-
project=self.context.get().database,
660-
dataset=schema_name,
661-
start_date=self.incremental.start_datetime_utc,
665+
if self.incremental_table_processor.query_failed:
666+
logger.debug(
667+
"Skipping incremental data for schema '%s' — "
668+
"Cloud Logging query failed, using full extraction",
669+
schema_name,
670+
)
671+
return
672+
673+
deleted_tables = self.incremental_table_processor.get_deleted(schema_name)
674+
not_deleted_tables = self.incremental_table_processor.get_not_deleted(
675+
schema_name
676+
)
677+
logger.info(
678+
"Incremental extraction for schema '%s': "
679+
"%d changed table(s), %d deleted table(s)",
680+
schema_name,
681+
len(not_deleted_tables),
682+
len(deleted_tables),
662683
)
663684

664685
self.context.get_global().deleted_tables.extend(
@@ -671,9 +692,7 @@ def _prepare_schema_incremental_data(self, schema_name: str):
671692
schema_name=schema_name,
672693
table_name=table_name,
673694
)
674-
for table_name in self.incremental_table_processor.get_deleted(
675-
schema_name
676-
)
695+
for table_name in deleted_tables
677696
]
678697
)
679698

@@ -686,6 +705,27 @@ def get_raw_database_schema_names(self) -> Iterable[str]:
686705
for dataset in datasets:
687706
yield dataset.dataset_id
688707

708+
def _get_filtered_datasets(self, project_id: str) -> List[str]:
709+
"""Return dataset IDs that pass the schema filter pattern."""
710+
return [
711+
schema_name
712+
for schema_name in self.get_raw_database_schema_names()
713+
if not filter_by_schema(
714+
self.source_config.schemaFilterPattern,
715+
(
716+
fqn.build(
717+
self.metadata,
718+
entity_type=DatabaseSchema,
719+
service_name=self.context.get().database_service,
720+
database_name=project_id,
721+
schema_name=schema_name,
722+
)
723+
if self.source_config.useFqnForFiltering
724+
else schema_name
725+
),
726+
)
727+
]
728+
689729
def _get_filtered_schema_names(
690730
self, return_fqn: bool = False, add_to_status: bool = True
691731
) -> Iterable[str]:
@@ -890,6 +930,24 @@ def get_database_names(self) -> Iterable[str]:
890930
self.incremental_table_processor = (
891931
BigQueryIncrementalTableProcessor.from_project(project_id)
892932
)
933+
filtered_datasets = self._get_filtered_datasets(project_id)
934+
logger.info(
935+
"Starting incremental extraction for project '%s' "
936+
"with %d datasets",
937+
project_id,
938+
len(filtered_datasets),
939+
)
940+
self.incremental_table_processor.set_tables_map(
941+
project=project_id,
942+
start_date=self.incremental.start_datetime_utc,
943+
datasets=filtered_datasets,
944+
)
945+
if self.incremental_table_processor.query_failed:
946+
logger.warning(
947+
"Cloud Logging query failed for project '%s'. "
948+
"Falling back to full extraction.",
949+
project_id,
950+
)
893951
yield project_id
894952
except Exception as exc:
895953
logger.debug(traceback.format_exc())

0 commit comments

Comments
 (0)