feat(bigtable): add protobuf decoding to Bigtable Change Streams to BigQuery (#3572)
MattiasMTS wants to merge 7 commits into GoogleCloudPlatform:main
Conversation
Summary of Changes (Gemini Code Assist): This pull request enhances the Bigtable Change Streams to BigQuery template by enabling the decoding of protobuf-encoded data directly within the pipeline. By providing a schema descriptor file, users can automatically transform binary Bigtable cell values into readable JSON in BigQuery. The implementation is backwards compatible and includes error handling for decode failures and for columns that do not require transformation.
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request.
Add optional proto decoding support to the Bigtable Change Streams to BigQuery template. When configured, matching cell values are decoded from proto binary to JSON and written as STRING to the BigQuery value column. New parameters: protoSchemaPath, fullProtoMessageName, protoColumnFamily, protoColumn, preserveProtoFieldNames. All optional, backwards compatible. Uses DynamicMessage + JsonFormat.Printer from the protobuf-java library, with lazy synchronized init on workers for the non-serializable Descriptor.
Add a generic columnTransforms parameter for mapping column_family:column pairs to value transformers. Proto decode and column transforms share a unified TRANSFORMED_VALUE output path in BigQuery.

Supported transforms:
- BIG_ENDIAN_UINT64_TIMESTAMP_MS: 8-byte big-endian int64 → timestamp

Architecture: ValueTransformer interface + ValueTransformerRegistry with a two-level family→column map for O(1) lookup without per-element string concatenation. Transform priority: proto decode first, then columnTransforms. Includes proto decode parameters, options, pipeline wiring, BigQueryUtils formatter integration, and tests for both features.
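The two-level family→column lookup described above can be sketched as follows. This is an illustrative stand-in, not the template's actual API: `Function<byte[], String>` substitutes for the PR's `ValueTransformer` interface, and the class name is made up.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of a two-level family -> column -> transformer registry.
public class RegistrySketch {
  private final Map<String, Map<String, Function<byte[], String>>> byFamily = new HashMap<>();

  public void register(String family, String column, Function<byte[], String> transformer) {
    byFamily.computeIfAbsent(family, f -> new HashMap<>()).put(column, transformer);
  }

  /** Two map gets per element; no per-element "family:column" key concatenation. */
  public Function<byte[], String> lookup(String family, String column) {
    Map<String, Function<byte[], String>> columns = byFamily.get(family);
    return columns == null ? null : columns.get(column);
  }
}
```

The design point is that a flat map keyed on `family + ":" + column` would allocate a new string per streamed element, while the nested map does two hash lookups on existing strings.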
Force-pushed from 0bb6d5e to 024da4e
/gemini review
Code Review
This pull request introduces support for decoding protobuf-encoded cell values and applying custom transformations to Bigtable change stream data before it is written to BigQuery. Key additions include a ProtoDecoder for schema-based JSON conversion and a ValueTransformerRegistry for handling specific data types like big-endian uint64 timestamps. The feedback identifies a potential issue in ValueTransformerRegistry.java where parsing configuration strings with split(":") may fail if column qualifiers contain colons, and suggests a more robust parsing approach using first and last index lookups.
```java
String[] parts = trimmed.split(":");
if (parts.length != 3) {
  throw new IllegalArgumentException(
      "Invalid columnTransforms entry '"
          + trimmed
          + "'. Expected format: column_family:column:TRANSFORM_TYPE");
}
String family = parts[0];
String column = parts[1];
String type = parts[2];
```
Using split(":") to parse the configuration string is fragile because Bigtable column qualifiers can contain colons. Since the column_family name is restricted and cannot contain colons, and the TRANSFORM_TYPE is the last segment, it is safer to find the first and last colons to extract the components. This ensures that column qualifiers containing colons are handled correctly.
Suggested change:

```java
int firstColon = trimmed.indexOf(':');
int lastColon = trimmed.lastIndexOf(':');
if (firstColon == -1 || firstColon == lastColon) {
  throw new IllegalArgumentException(
      "Invalid columnTransforms entry '"
          + trimmed
          + "'. Expected format: column_family:column:TRANSFORM_TYPE");
}
String family = trimmed.substring(0, firstColon);
String column = trimmed.substring(firstColon + 1, lastColon);
String type = trimmed.substring(lastColon + 1);
```
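A quick illustration of why the suggestion matters (the entry value and qualifier are made up):

```java
public class ColonParsingDemo {
  public static void main(String[] args) {
    // Hypothetical entry whose column qualifier itself contains a colon.
    String entry = "cf:metrics:v2:BIG_ENDIAN_UINT64_TIMESTAMP_MS";

    // split(":") yields 4 parts, so the parts.length != 3 check rejects it.
    System.out.println(entry.split(":").length); // 4

    // First/last-colon parsing recovers the qualifier intact.
    int first = entry.indexOf(':');
    int last = entry.lastIndexOf(':');
    System.out.println(entry.substring(0, first));        // cf
    System.out.println(entry.substring(first + 1, last)); // metrics:v2
    System.out.println(entry.substring(last + 1));        // BIG_ENDIAN_UINT64_TIMESTAMP_MS
  }
}
```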
Please also add a new test to BigtableChangeStreamsToBigQueryIT.
Codecov Report

❌ Patch coverage is 0.00%. Your patch check has failed because the patch coverage (0.00%) is below the target coverage (80.00%). You can increase the patch coverage or adjust the target coverage.

Additional details and impacted files:

```
@@             Coverage Diff             @@
##               main    #3572     +/-  ##
============================================
- Coverage     52.12%   52.08%   -0.05%
- Complexity     5644     5646       +2
============================================
  Files          1040     1043       +3
  Lines         63118    63181      +63
  Branches       6922     6934      +12
============================================
+ Hits          32903    32905       +2
- Misses        27981    28044      +63
+ Partials       2234     2232       -2
```
stankiewicz left a comment:
Great contribution! A few comments added.
```java
@Default.String("")
String getFullProtoMessageName();

void setFullProtoMessageName(String value);
```
thanks for being consistent with src/main/java/com/google/cloud/teleport/v2/templates/PubsubProtoToBigQuery.java
| + "(8-byte big-endian unsigned 64-bit integer as Unix epoch milliseconds, " | ||
| + "converted to a timestamp string).") | ||
| @Default.String("") | ||
| String getColumnTransforms(); |
There is writeNumericTimestamps, which converts int64 to timestamp for a limited set of columns. In your change, you are not only converting to some format (String in this case), but you are also changing endianness. Why not convert to a timestamp type? Why not little-endian uint? This looks like a UDF to me with very limited scope that would require releasing a new template whenever you want to add a new transform.
Would a JS UDF be a good alternative?
```java
  return null;
}
try {
  long millis = ByteBuffer.wrap(bytes).getLong();
```
Add .order(ByteOrder.BIG_ENDIAN) to make it explicit and make the code self-documenting.
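The decode the reviewer is describing can be sketched end-to-end like this (the class and method names are illustrative; only the ByteBuffer/Instant calls reflect the actual transform):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;

public class TimestampDecodeSketch {
  // Decode 8 bytes as a big-endian epoch-millis long; the explicit order()
  // call documents intent even though BIG_ENDIAN is ByteBuffer's default.
  static String decode(byte[] bytes) {
    long millis = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getLong();
    return Instant.ofEpochMilli(millis).toString();
  }

  public static void main(String[] args) {
    byte[] epoch = ByteBuffer.allocate(8).putLong(0L).array();
    System.out.println(decode(epoch)); // 1970-01-01T00:00:00Z
  }
}
```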
```java
/**
 * Parses a comma-separated transform configuration string.
 *
 * @param config format: "family:column:TYPE,family2:column2:TYPE2"
```
TYPE is confusing; TRANSFORM_TYPE is used in other parts of the javadoc.
```java
}

Map<String, Map<String, ValueTransformer>> transformersByFamily = new HashMap<>();
for (String entry : config.split(",")) {
```
Column names can be any value and can contain commas. Maybe state this limitation somewhere.
```java
private static ValueTransformer createTransformer(String type) {
  switch (type) {
    case "BIG_ENDIAN_UINT64_TIMESTAMP_MS":
      return new BigEndianTimestampTransformer();
    default:
      throw new IllegalArgumentException(
          "Unknown transform type '"
              + type
              + "'. Supported types: BIG_ENDIAN_UINT64_TIMESTAMP_MS");
  }
}
```
While the transformers look small, it's cleaner and more efficient to reuse them. Transformers are stateless and thread-safe (maybe worth writing this somewhere), so you can implement a memoization strategy:

```java
private static final Map<String, ValueTransformer> TRANSFORMER_CACHE = new ConcurrentHashMap<>();

private static ValueTransformer createTransformer(String type) {
  return TRANSFORMER_CACHE.computeIfAbsent(type, t -> {
    switch (t) {
      case "BIG_ENDIAN_UINT64_TIMESTAMP_MS":
        return new BigEndianTimestampTransformer();
      default:
        throw new IllegalArgumentException("Unknown transform type '" + t + "'");
    }
  });
}
```

(A ConcurrentHashMap is used here because a plain HashMap's computeIfAbsent is not safe under concurrent access.)
```java
boolean hasProtoColumn = !StringUtils.isBlank(options.getProtoColumn());

if (hasProtoSchema || hasProtoMessage || hasProtoColumnFamily || hasProtoColumn) {
  if (!hasProtoSchema || !hasProtoMessage || !hasProtoColumnFamily || !hasProtoColumn) {
```
Difficult to read; maybe:

```java
boolean anyProtoSet = hasProtoSchema || hasProtoMessage || hasProtoColumnFamily || hasProtoColumn;
boolean allProtoSet = hasProtoSchema && hasProtoMessage && hasProtoColumnFamily && hasProtoColumn;
if (anyProtoSet && !allProtoSet) { throw new IllegalArgumentException( [..]
```
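A complete, self-contained sketch of the validation being suggested (the class and method names are made up; the exception message mirrors the template's existing wording):

```java
public class ProtoOptionValidationSketch {
  // All four proto options must be set together, or none of them.
  static void validate(
      boolean hasProtoSchema,
      boolean hasProtoMessage,
      boolean hasProtoColumnFamily,
      boolean hasProtoColumn) {
    boolean anyProtoSet =
        hasProtoSchema || hasProtoMessage || hasProtoColumnFamily || hasProtoColumn;
    boolean allProtoSet =
        hasProtoSchema && hasProtoMessage && hasProtoColumnFamily && hasProtoColumn;
    if (anyProtoSet && !allProtoSet) {
      throw new IllegalArgumentException(
          "When using protobuf decoding, all of protoSchemaPath, fullProtoMessageName, "
              + "protoColumnFamily, and protoColumn must be specified.");
    }
  }
}
```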
| "When using protobuf decoding, all of protoSchemaPath, fullProtoMessageName, " | ||
| + "protoColumnFamily, and protoColumn must be specified."); | ||
| } | ||
| protoDecoder = |
```java
ProtoDecoder protoDecoder = null;
if (allProtoSet) { protoDecoder = new ProtoDecoder( [..]
```
```java
@TemplateParameter.Text(
    order = 14,
    optional = true,
    description = "Column qualifier containing proto values",
```
Grouping multiple pieces of data into a single proto column is a best practice, but it's not always the case; what if you have a second proto in another column family or another column?

Pub/Sub Proto to BQ has a single proto, so it's fine that its config covers only a single proto message. In Bigtable there is a limit of 10 schema bundles per table, which suggests that multiple proto types may be used. Eventually you may end up creating another transform_type.
Address review comments:
- Multi-proto support: PROTO_DECODE(package.MessageName) transform type supports multiple proto columns with different message types
- Fix colon parsing with indexOf/lastIndexOf (column qualifiers can contain colons)
- Add transformer memoization for shared instances
- Rename TYPE to TRANSFORM_TYPE consistently
- Explicit ByteOrder.BIG_ENDIAN in timestamp transformer
- Simplify validation with anyProtoSet/allProtoSet pattern
- Legacy proto options auto-translate to columnTransforms entries
- Document comma limitation and JS UDF alternative
… decode
- testBigtableChangeStreamsToBigQueryColumnTransform: verifies BIG_ENDIAN_TIMESTAMP transform converts 8-byte uint64 to timestamp
- testBigtableChangeStreamsToBigQueryProtoDecodeViaTransform: verifies PROTO_DECODE() transform type with programmatically built descriptor
- Delete dead ProtoDecoder.java (replaced by ProtoDecodeTransformer)
- Use double-checked locking in ensureInitialized() to avoid synchronized on every record in the hot path
- Use two-level map (family -> column -> transformer) in ValueTransformerRegistry to avoid string concat per record
- Deduplicate Mod constructors (3-arg delegates to 4-arg)
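The double-checked locking mentioned above can be sketched as follows; the class name and the plain `Object` field are illustrative stand-ins for the non-serializable Descriptor/Printer state built lazily on workers:

```java
public class LazyInitSketch {
  // Stand-in for the non-serializable decoder state; volatile is required
  // so the second (unlocked) read observes a fully constructed object.
  private transient volatile Object decoder;

  Object ensureInitialized() {
    Object local = decoder;
    if (local == null) {          // first check: no lock on the hot path
      synchronized (this) {
        local = decoder;
        if (local == null) {      // second check: only one thread builds
          local = buildDecoder();
          decoder = local;
        }
      }
    }
    return local;
  }

  private Object buildDecoder() {
    return new Object(); // placeholder for the expensive Descriptor/Printer build
  }
}
```

After the first call, every subsequent call takes the lock-free fast path, which is the point of the change.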
Let me know when it's ready.
Yes, absolutely, I'll let you know, boss! It's not ready yet. Sorry, been swamped with other stuff 😿 I'll try to focus on getting this in shortly with high quality.
…ement overflow
Bigtable cell values can reach 100 MB (vs Pub/Sub's 10 MB source cap),
so proto→JSON decoding can inflate a single row past Dataflow Windmill's
80 MB per-element commit limit and freeze the partition. Bound the decoded
output and route overflows to the existing severe DLQ with metadata only.
- Add maxDecodedValueBytes template option (default 10_000_000 bytes to
match BigQuery Storage Write API row-size limit).
- New Utf8BoundedAppendable + OversizedJsonException that track exact UTF-8
byte counts per code point and abort mid-serialization on overflow.
- ProtoDecodeTransformer.transformBounded: cheap raw-size pre-check plus
bounded JsonFormat.appendTo; returns a TransformResult { SUCCESS,
DECODE_ERROR, OVERSIZED, NO_TRANSFORMER } value type. The existing
unbounded transform(byte[]) entry point is preserved.
- ValueTransformerRegistry.transformBounded dispatches to the bounded
proto path and falls back to the unbounded path with a best-effort
size check for non-proto transformers.
- Mod carries a transient TransformResult the pipeline inspects; new
5-arg constructor threads maxDecodedValueBytes through (old 4-arg
constructor delegates with Long.MAX_VALUE for backward compatibility).
- ChangeStreamMutationToTableRowFn now emits MAIN_OUT (TableRow) and
OVERSIZED_DLQ_OUT (String) tagged outputs, increments an
oversizedDecodes counter plus a decodedValueBytes distribution, and
writes a compact per-row metadata record to the existing severe DLQ
sink.
Unit tests cover UTF-8 byte counting (ASCII, 2/3-byte chars, surrogate
pairs, boundary), the cheap-path and mid-decode oversized paths,
DECODE_ERROR for malformed bytes, NO_TRANSFORMER dispatch, and the
non-proto size bound. An IT case exercises the pipeline end-to-end with
a 10 KB cap, asserting the small cell reaches BigQuery and the oversized
cell's metadata reaches the DLQ instead.
Interface:
- Move transformBounded onto ValueTransformer as a default method; override
in ProtoDecodeTransformer.
- Drop instanceof branch from ValueTransformerRegistry.transformBounded;
it now just dispatches via the interface.
- Remove the transient TransformResult field / getter from Mod. A new
Mod.buildSetCell(...) static factory returns SetCellBuildResult
(Mod + TransformResult). Mod's existing 4-arg constructor is preserved
for backward compatibility (still no bound).
Reuse:
- Replace the bespoke oversized-DLQ JSON code with a
DeadLetterQueueSanitizer pattern: new OversizedValue data class as the
side-output element type and new OversizedValueDlqSanitizer that the
pipeline pipes through a MapElements before DLQWriteTransform.
- Use ChangelogColumn.getBqColumnName() for the canonical JSON keys
(row_key, column_family, column, source_instance/cluster/table,
commit_timestamp). Named constants for the non-column fields and the
reason literal.
- Drop all Jackson plumbing from BigtableChangeStreamsToBigQuery;
ThreadLocal<Gson> in the sanitizer matches the neighbouring
BigQueryDeadLetterQueueSanitizer.
Efficiency:
- Only call setCell.getValue().toByteArray() when a transformer is
registered for the column; 100 MB cells aren't copied when nobody
consumes them.
- Utf8BoundedAppendable now uses com.google.common.base.Utf8.encodedLength
per segment instead of per-codepoint arithmetic, preserving
abort-mid-decode semantics.
- Pre-size the StringBuilder to a capped maxBytes (64 .. 1 MiB).
- Simplify commit timestamp formatting to Instant#toString (ISO-8601
UTC, nanosecond precision).
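A minimal stdlib-only sketch of the bounded-Appendable idea described above. The PR itself uses Guava's Utf8.encodedLength per segment; this version substitutes String.getBytes for byte counting, ignores unpaired-surrogate edge cases, and all names are illustrative:

```java
import java.nio.charset.StandardCharsets;

public class BoundedAppendableSketch {
  /** Unchecked signal used to abort serialization mid-stream on overflow. */
  static class OversizedException extends RuntimeException {}

  /** Appendable that tracks UTF-8 output size and throws once a byte budget is exceeded. */
  static class Utf8BoundedAppendable implements Appendable {
    private final StringBuilder sb = new StringBuilder();
    private final long maxBytes;
    private long bytes;

    Utf8BoundedAppendable(long maxBytes) {
      this.maxBytes = maxBytes;
    }

    @Override
    public Appendable append(CharSequence csq) {
      // Count the exact UTF-8 size of this segment before accepting it.
      bytes += csq.toString().getBytes(StandardCharsets.UTF_8).length;
      if (bytes > maxBytes) {
        throw new OversizedException();
      }
      sb.append(csq);
      return this;
    }

    @Override
    public Appendable append(CharSequence csq, int start, int end) {
      return append(csq.subSequence(start, end));
    }

    @Override
    public Appendable append(char c) {
      return append(String.valueOf(c));
    }

    @Override
    public String toString() {
      return sb.toString();
    }
  }
}
```

Handing such an Appendable to JsonFormat's appendTo means serialization stops as soon as the budget is blown, instead of materializing the full oversized JSON first.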
Comments:
- OversizedJsonException#fillInStackTrace short-circuits (unwind-only
control flow).
- Tighten Mod / ProtoDecodeTransformer comments to WHY, drop the Jackson
LinkedHashMap note, drop the dangling transformResult javadoc.
Tests:
- Updated ValueTransformerRegistryTest for the interface change and added
a case exercising the default transformBounded on a non-proto
transformer.
- New OversizedValueDlqSanitizerTest covers the JSON schema and the
null-timestamp case.
- Updated BigtableChangeStreamsToBigQueryIT to parse the sanitizer
envelope ({message, error_message}).
Summary
Adds optional protobuf decoding support to the Bigtable Change Streams to BigQuery template. When configured, matching cell values are decoded from proto binary to JSON and written as a STRING to the BigQuery value column.

This fills a gap: the Pub/Sub Proto to BigQuery template already supports proto decoding, but the Bigtable CDC template does not — despite Bigtable being a common store for protobuf-encoded data (as evidenced by the Query protobuf data docs).
New parameters (all optional, backwards compatible)
- protoSchemaPath — path to a FileDescriptorSet (.pb) file
- fullProtoMessageName — fully qualified message name (package.MessageName)
- protoColumnFamily — column family containing proto values
- protoColumn — column qualifier containing proto values
- preserveProtoFieldNames — keep original proto field names in the JSON output

When all four required params are set, SetCell entries matching the configured column family and qualifier are decoded via DynamicMessage.parseFrom() + JsonFormat.printer(). Non-matching cells and decode failures fall back to the existing raw value behavior.

Reuses existing infrastructure
- SchemaUtils.getProtoDomain() / SchemaUtils.createBigQuerySchema() from v2/common
- DynamicMessage + JsonFormat.Printer (same pattern as PubsubProtoToBigQuery)
- ProtoDecoder is Serializable with lazy init of transient Descriptor/Printer for worker-side initialization

Files changed
- BigtableChangeStreamToBigQueryOptions.java — 5 new pipeline parameters
- BigtableChangeStreamsToBigQuery.java — validation, ProtoDecoder wiring, updated ChangeStreamMutationToTableRowFn
- Mod.java — proto-aware SetCell constructor
- BigQueryUtils.java — VALUE_STRING formatter checks for decoded proto JSON
- ProtoDecoder.java — new class, ~120 lines
- BigQueryUtilTest.java — 5 new tests covering decode, field name preservation, fallback, and backwards compat

Test plan
- mvn test -Dtest=BigQueryUtilTest
- mvn spotless:check passes
- mvn compile succeeds