-
Notifications
You must be signed in to change notification settings - Fork 2k
fix(sampler): Respect randomizedSample flag at 100% percentage sampling #26966
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 16 commits
7054e63
389974e
b665d8a
34a6c8d
f746c76
6197729
68118b1
0180946
8324863
fb8c05a
cf253ac
53df3cb
bfd09e8
692f27b
e31a335
3909d3c
a084392
61b0678
7636fe3
3dcfe85
06f70ea
8667db8
b3bd457
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -3,6 +3,24 @@ SET json = JSON_REMOVE(json, '$.sourceConfig.config.computeMetrics') | |||||||||||||||||||||||||||||||||||||||||
| WHERE JSON_EXTRACT(json, '$.sourceConfig.config.computeMetrics') IS NOT NULL | ||||||||||||||||||||||||||||||||||||||||||
| AND pipelineType = 'profiler'; | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| -- Set randomizedSample to false where it was true (old default behavior) | ||||||||||||||||||||||||||||||||||||||||||
| UPDATE ingestion_pipeline_entity | ||||||||||||||||||||||||||||||||||||||||||
| SET json = JSON_SET(json, '$.sourceConfig.config.randomizedSample', false) | ||||||||||||||||||||||||||||||||||||||||||
| WHERE JSON_EXTRACT(json, '$.sourceConfig.config.randomizedSample') = true | ||||||||||||||||||||||||||||||||||||||||||
| AND pipelineType = 'profiler'; | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| UPDATE table_entity | ||||||||||||||||||||||||||||||||||||||||||
| SET json = JSON_SET(json, '$.tableProfilerConfig.randomizedSample', false) | ||||||||||||||||||||||||||||||||||||||||||
| WHERE JSON_EXTRACT(json, '$.tableProfilerConfig.randomizedSample') = true; | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| UPDATE database_entity | ||||||||||||||||||||||||||||||||||||||||||
| SET json = JSON_SET(json, '$.databaseProfilerConfig.randomizedSample', false) | ||||||||||||||||||||||||||||||||||||||||||
| WHERE JSON_EXTRACT(json, '$.databaseProfilerConfig.randomizedSample') = true; | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| UPDATE database_schema_entity | ||||||||||||||||||||||||||||||||||||||||||
| SET json = JSON_SET(json, '$.databaseSchemaProfilerConfig.randomizedSample', false) | ||||||||||||||||||||||||||||||||||||||||||
| WHERE JSON_EXTRACT(json, '$.databaseSchemaProfilerConfig.randomizedSample') = true; | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+6
to
+23
|
||||||||||||||||||||||||||||||||||||||||||
| -- Set randomizedSample to false where it was true (old default behavior) | |
| UPDATE ingestion_pipeline_entity | |
| SET json = JSON_SET(json, '$.sourceConfig.config.randomizedSample', false) | |
| WHERE JSON_EXTRACT(json, '$.sourceConfig.config.randomizedSample') = true | |
| AND pipelineType = 'profiler'; | |
| UPDATE table_entity | |
| SET json = JSON_SET(json, '$.tableProfilerConfig.randomizedSample', false) | |
| WHERE JSON_EXTRACT(json, '$.tableProfilerConfig.randomizedSample') = true; | |
| UPDATE database_entity | |
| SET json = JSON_SET(json, '$.databaseProfilerConfig.randomizedSample', false) | |
| WHERE JSON_EXTRACT(json, '$.databaseProfilerConfig.randomizedSample') = true; | |
| UPDATE database_schema_entity | |
| SET json = JSON_SET(json, '$.databaseSchemaProfilerConfig.randomizedSample', false) | |
| WHERE JSON_EXTRACT(json, '$.databaseSchemaProfilerConfig.randomizedSample') = true; | |
| -- Preserve existing randomizedSample values during migration. | |
| -- Any new default for randomizedSample must be applied only to missing/null | |
| -- fields in application logic or a separate, explicitly documented migration. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,7 +21,12 @@ | |
| from sqlalchemy.orm import DeclarativeBase | ||
|
|
||
| from metadata.generated.schema.entity.data.table import Column as EntityColumn | ||
| from metadata.generated.schema.entity.data.table import ColumnName, DataType, Table | ||
| from metadata.generated.schema.entity.data.table import ( | ||
| ColumnName, | ||
| DataType, | ||
| ProfileSampleType, | ||
| Table, | ||
| ) | ||
| from metadata.generated.schema.entity.services.connections.database.sqliteConnection import ( | ||
| SQLiteConnection, | ||
| SQLiteScheme, | ||
|
|
@@ -361,6 +366,114 @@ def test_sample_from_user_query(self, sampler_mock): | |
| names = [col.root for col in sample_data.columns] | ||
| assert names == ["id", "name"] | ||
|
|
||
| def test_full_percentage_randomized_uses_sample_query(self, sampler_mock): | ||
| """100% PERCENTAGE + randomizedSample=True should go through | ||
| get_sample_query so fetch_sample_data can ORDER BY the random column.""" | ||
| with patch.object(SQASampler, "build_table_orm", return_value=User): | ||
| sampler = SQASampler( | ||
| service_connection_config=self.sqlite_conn, | ||
| ometa_client=None, | ||
| entity=None, | ||
| sample_config=SampleConfig( | ||
| profileSampleType=ProfileSampleType.PERCENTAGE, | ||
| profileSample=100, | ||
| randomizedSample=True, | ||
| ), | ||
| sample_data_count=5, | ||
| ) | ||
|
|
||
| with patch.object( | ||
| sampler, "get_sample_query", wraps=sampler.get_sample_query | ||
| ) as mock_gsq: | ||
| sampler.fetch_sample_data() | ||
| assert mock_gsq.called | ||
|
Comment on lines
+369
to
+389
|
||
|
|
||
| def test_full_percentage_not_randomized_skips_sample_query(self, sampler_mock): | ||
| """100% PERCENTAGE + randomizedSample=False should short-circuit | ||
| to raw dataset and NOT call get_sample_query.""" | ||
| with patch.object(SQASampler, "build_table_orm", return_value=User): | ||
| sampler = SQASampler( | ||
| service_connection_config=self.sqlite_conn, | ||
| ometa_client=None, | ||
| entity=None, | ||
| sample_config=SampleConfig( | ||
| profileSampleType=ProfileSampleType.PERCENTAGE, | ||
| profileSample=100, | ||
| randomizedSample=False, | ||
| ), | ||
| sample_data_count=5, | ||
| ) | ||
|
|
||
| with patch.object( | ||
| sampler, "get_sample_query", wraps=sampler.get_sample_query | ||
| ) as mock_gsq: | ||
| sampler.fetch_sample_data() | ||
| assert not mock_gsq.called | ||
|
|
||
| def test_full_percentage_none_randomized_skips_sample_query(self, sampler_mock): | ||
| """100% PERCENTAGE + randomizedSample=None should short-circuit | ||
| (only explicit True enables randomization).""" | ||
| with patch.object(SQASampler, "build_table_orm", return_value=User): | ||
| sampler = SQASampler( | ||
| service_connection_config=self.sqlite_conn, | ||
| ometa_client=None, | ||
| entity=None, | ||
| sample_config=SampleConfig( | ||
| profileSampleType=ProfileSampleType.PERCENTAGE, | ||
| profileSample=100, | ||
| randomizedSample=None, | ||
| ), | ||
|
Comment on lines
+413
to
+425
|
||
| sample_data_count=5, | ||
| ) | ||
|
Comment on lines
+413
to
+427
|
||
|
|
||
| with patch.object( | ||
| sampler, "get_sample_query", wraps=sampler.get_sample_query | ||
| ) as mock_gsq: | ||
| sampler.fetch_sample_data() | ||
| assert not mock_gsq.called | ||
|
Comment on lines
+413
to
+433
|
||
|
|
||
| def test_randomized_true_produces_non_deterministic_rows(self, sampler_mock): | ||
| """With randomizedSample=True at 100% PERCENTAGE, multiple | ||
| fetch_sample_data calls should return rows in different orders.""" | ||
| with patch.object(SQASampler, "build_table_orm", return_value=User): | ||
| sampler = SQASampler( | ||
| service_connection_config=self.sqlite_conn, | ||
| ometa_client=None, | ||
| entity=None, | ||
| sample_config=SampleConfig( | ||
| profileSampleType=ProfileSampleType.PERCENTAGE, | ||
| profileSample=100, | ||
| randomizedSample=True, | ||
| ), | ||
| sample_data_count=5, | ||
| ) | ||
|
|
||
| results = [sampler.fetch_sample_data().rows for _ in range(10)] | ||
| assert any( | ||
| results[i] != results[0] for i in range(1, len(results)) | ||
| ), "Expected non-deterministic row ordering with randomizedSample=True" | ||
|
|
||
|
RajdeepKushwaha5 marked this conversation as resolved.
|
||
| def test_randomized_false_produces_deterministic_rows(self, sampler_mock): | ||
| """With randomizedSample=False at 100% PERCENTAGE, multiple | ||
| fetch_sample_data calls should return rows in the same order.""" | ||
| with patch.object(SQASampler, "build_table_orm", return_value=User): | ||
| sampler = SQASampler( | ||
| service_connection_config=self.sqlite_conn, | ||
| ometa_client=None, | ||
| entity=None, | ||
| sample_config=SampleConfig( | ||
| profileSampleType=ProfileSampleType.PERCENTAGE, | ||
| profileSample=100, | ||
| randomizedSample=False, | ||
| ), | ||
| sample_data_count=5, | ||
| ) | ||
|
|
||
| results = [sampler.fetch_sample_data().rows for _ in range(5)] | ||
| assert all( | ||
| results[i] == results[0] for i in range(1, len(results)) | ||
| ), "Expected deterministic row ordering with randomizedSample=False" | ||
|
|
||
| @classmethod | ||
| def tearDownClass(cls) -> None: | ||
| os.remove(cls.db_path) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,120 @@ | ||
| # Copyright 2025 Collate | ||
| # Licensed under the Collate Community License, Version 1.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| """ | ||
| Tests for 100% PERCENTAGE sampling edge case (#21304). | ||
|
|
||
| Verifies that the get_dataset() short-circuit at 100% correctly | ||
| respects the randomizedSample flag. Only an explicit True enables | ||
| randomization; None and False both skip randomization. | ||
| """ | ||
|
Comment on lines
+11
to
+17
|
||
| from unittest.mock import MagicMock, patch | ||
|
|
||
| from metadata.generated.schema.entity.data.table import ProfileSampleType | ||
| from metadata.sampler.models import SampleConfig | ||
|
|
||
|
|
||
| class TestSQASampler100Pct: | ||
| """Test SQASampler.get_dataset() at 100% PERCENTAGE sampling.""" | ||
|
|
||
| def _make_sampler(self, randomized_sample): | ||
| """Create a SQASampler mock with the given randomizedSample value.""" | ||
| with patch( | ||
| "metadata.sampler.sqlalchemy.sampler.SQASampler.__init__", | ||
| return_value=None, | ||
| ): | ||
| from metadata.sampler.sqlalchemy.sampler import SQASampler | ||
|
|
||
| sampler = SQASampler() | ||
| sampler.sample_config = SampleConfig( | ||
| profileSample=100, | ||
| profileSampleType=ProfileSampleType.PERCENTAGE, | ||
| randomizedSample=randomized_sample, | ||
| ) | ||
| sampler.sample_query = None | ||
| sampler.partition_details = None | ||
| sampler._table = MagicMock(name="raw_table") | ||
| sampler.get_sample_query = MagicMock( | ||
| name="get_sample_query", return_value=MagicMock(name="sample_cte") | ||
| ) | ||
| return sampler | ||
|
|
||
| def test_100_pct_randomized_true_delegates_to_sample_query(self): | ||
| """100% + randomizedSample=True should NOT short-circuit.""" | ||
| sampler = self._make_sampler(randomized_sample=True) | ||
| result = sampler.get_dataset() | ||
| sampler.get_sample_query.assert_called_once() | ||
| assert result == sampler.get_sample_query.return_value | ||
|
|
||
| def test_100_pct_randomized_false_returns_raw_dataset(self): | ||
| """100% + randomizedSample=False should short-circuit to raw dataset.""" | ||
| sampler = self._make_sampler(randomized_sample=False) | ||
| result = sampler.get_dataset() | ||
| sampler.get_sample_query.assert_not_called() | ||
| assert result == sampler._table | ||
|
|
||
| def test_100_pct_randomized_none_returns_raw_dataset(self): | ||
| """100% + randomizedSample=None should short-circuit (only explicit True randomizes).""" | ||
| sampler = self._make_sampler(randomized_sample=None) | ||
| result = sampler.get_dataset() | ||
| sampler.get_sample_query.assert_not_called() | ||
| assert result == sampler._table | ||
|
Comment on lines
+63
to
+68
|
||
|
|
||
|
|
||
| class TestDatalakeSampler100Pct: | ||
| """Test DatalakeSampler.get_dataset() at 100% PERCENTAGE sampling.""" | ||
|
|
||
| def _make_sampler(self, randomized_sample): | ||
| """Create a DatalakeSampler mock with the given randomizedSample value.""" | ||
| with patch( | ||
| "metadata.sampler.pandas.sampler.DatalakeSampler.__init__", | ||
| return_value=None, | ||
| ): | ||
| from metadata.sampler.pandas.sampler import DatalakeSampler | ||
|
|
||
| sampler = DatalakeSampler() | ||
| sampler.sample_config = SampleConfig( | ||
| profileSample=100, | ||
| profileSampleType=ProfileSampleType.PERCENTAGE, | ||
| randomizedSample=randomized_sample, | ||
| ) | ||
| sampler.sample_query = None | ||
| sampler.partition_details = None | ||
| table_mock = MagicMock(name="table_wrapper") | ||
| table_mock.dataframes = MagicMock(name="raw_dataframes") | ||
| sampler._table = table_mock | ||
| sampler.get_sampled_dataframe = MagicMock( | ||
| name="get_sampled_dataframe", | ||
| return_value=MagicMock(name="sampled_df"), | ||
| ) | ||
| sampler.service_connection_config = MagicMock() | ||
| sampler.connection = MagicMock() | ||
| return sampler | ||
|
|
||
| def test_100_pct_randomized_true_delegates_to_sampled_dataframe(self): | ||
| """100% + randomizedSample=True should NOT short-circuit.""" | ||
| sampler = self._make_sampler(randomized_sample=True) | ||
| result = sampler.get_dataset() | ||
| sampler.get_sampled_dataframe.assert_called_once() | ||
| assert result == sampler.get_sampled_dataframe.return_value | ||
|
|
||
|
Comment on lines
+101
to
+107
|
||
| def test_100_pct_randomized_false_returns_raw_dataset(self): | ||
| """100% + randomizedSample=False should short-circuit to raw dataset.""" | ||
| sampler = self._make_sampler(randomized_sample=False) | ||
| result = sampler.get_dataset() | ||
| sampler.get_sampled_dataframe.assert_not_called() | ||
| assert result == sampler._table.dataframes | ||
|
|
||
| def test_100_pct_randomized_none_returns_raw_dataset(self): | ||
| """100% + randomizedSample=None should short-circuit (only explicit True randomizes).""" | ||
| sampler = self._make_sampler(randomized_sample=None) | ||
| result = sampler.get_dataset() | ||
| sampler.get_sampled_dataframe.assert_not_called() | ||
| assert result == sampler._table.dataframes | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -175,7 +175,7 @@ | |
| "randomizedSample": { | ||
| "description": "Whether to randomize the sample data or not.", | ||
| "type": "boolean", | ||
| "default": true | ||
| "default": false | ||
|
TeddyCr marked this conversation as resolved.
|
||
| } | ||
|
Comment on lines
175
to
179
|
||
| } | ||
| }, | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.