-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat(bigtable): add protobuf decoding to Bigtable Change Streams to BigQuery #3572
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
dc90c38
024da4e
2a093db
d375c53
446fb3a
ed946bc
d2ae237
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -149,4 +149,85 @@ public interface BigtableChangeStreamToBigQueryOptions | |
| String getDlqDirectory(); | ||
|
|
||
| void setDlqDirectory(String value); | ||
|
|
||
| @TemplateParameter.GcsReadFile( | ||
| order = 11, | ||
| optional = true, | ||
| description = "Cloud Storage path to the proto schema file", | ||
| helpText = | ||
| "The Cloud Storage location of the self-contained proto schema file. " | ||
| + "For example, gs://path/to/my/file.pb. This file can be generated with the " | ||
| + "--descriptor_set_out flag of the protoc command. The --include_imports flag " | ||
| + "guarantees that the file is self-contained. When set along with " | ||
| + "fullProtoMessageName, the pipeline decodes matching cell values as protobuf " | ||
| + "messages and writes them as JSON to BigQuery.") | ||
| @Default.String("") | ||
| String getProtoSchemaPath(); | ||
|
|
||
| void setProtoSchemaPath(String value); | ||
|
|
||
| @TemplateParameter.Text( | ||
| order = 12, | ||
| optional = true, | ||
| regexes = {"^.+([a-zA-Z][a-zA-Z0-9_]+\\.?)+[a-zA-Z0-9_]$"}, | ||
| description = "Full proto message name", | ||
| helpText = | ||
| "The full proto message name. For example, package.name.MessageName, " | ||
| + "where package.name is the value provided for the package statement " | ||
| + "and not the java_package statement.") | ||
| @Default.String("") | ||
| String getFullProtoMessageName(); | ||
|
|
||
| void setFullProtoMessageName(String value); | ||
|
|
||
| @TemplateParameter.Text( | ||
| order = 13, | ||
| optional = true, | ||
| description = "Column family containing proto values", | ||
| helpText = | ||
| "The Bigtable column family containing protobuf-encoded values to decode. " | ||
| + "Required when protoSchemaPath is set.") | ||
| @Default.String("") | ||
| String getProtoColumnFamily(); | ||
|
|
||
| void setProtoColumnFamily(String value); | ||
|
|
||
| @TemplateParameter.Text( | ||
| order = 14, | ||
| optional = true, | ||
| description = "Column qualifier containing proto values", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Grouping multiple pieces of data into a single proto column is a best practice, but that is not always the case — what if you have a second proto in another column family, or in another column? |
||
| helpText = | ||
| "The Bigtable column qualifier containing protobuf-encoded values to decode. " | ||
| + "Required when protoSchemaPath is set.") | ||
| @Default.String("") | ||
| String getProtoColumn(); | ||
|
|
||
| void setProtoColumn(String value); | ||
|
|
||
| @TemplateParameter.Boolean( | ||
| order = 15, | ||
| optional = true, | ||
| description = "Preserve proto field names in JSON output", | ||
| helpText = | ||
| "When set to true, preserves original proto field names (snake_case) in the " | ||
| + "JSON output. When set to false, uses lowerCamelCase. Defaults to false.") | ||
| @Default.Boolean(false) | ||
| Boolean getPreserveProtoFieldNames(); | ||
|
|
||
| void setPreserveProtoFieldNames(Boolean value); | ||
|
|
||
| @TemplateParameter.Text( | ||
| order = 16, | ||
| optional = true, | ||
| description = "Column value transforms", | ||
| helpText = | ||
| "A comma-separated list of column value transformations in the format " | ||
| + "column_family:column:TRANSFORM_TYPE. Matched cell values are transformed " | ||
| + "before writing to BigQuery. Supported types: BIG_ENDIAN_UINT64_TIMESTAMP_MS " | ||
| + "(8-byte big-endian unsigned 64-bit integer as Unix epoch milliseconds, " | ||
| + "converted to a timestamp string).") | ||
| @Default.String("") | ||
| String getColumnTransforms(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is |
||
|
|
||
| void setColumnTransforms(String value); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,6 +36,8 @@ | |
| import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.Mod; | ||
| import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.ModType; | ||
| import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.BigQueryUtils; | ||
| import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.ProtoDecoder; | ||
| import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.ValueTransformerRegistry; | ||
| import com.google.cloud.teleport.v2.transforms.DLQWriteTransform; | ||
| import com.google.cloud.teleport.v2.utils.BigtableSource; | ||
| import java.util.ArrayList; | ||
|
|
@@ -182,6 +184,38 @@ public static PipelineResult run(BigtableChangeStreamToBigQueryOptions options) | |
|
|
||
| BigQueryUtils bigQuery = new BigQueryUtils(sourceInfo, destinationInfo); | ||
|
|
||
| ProtoDecoder protoDecoder = null; | ||
| boolean hasProtoSchema = !StringUtils.isBlank(options.getProtoSchemaPath()); | ||
| boolean hasProtoMessage = !StringUtils.isBlank(options.getFullProtoMessageName()); | ||
| boolean hasProtoColumnFamily = !StringUtils.isBlank(options.getProtoColumnFamily()); | ||
| boolean hasProtoColumn = !StringUtils.isBlank(options.getProtoColumn()); | ||
|
|
||
| if (hasProtoSchema || hasProtoMessage || hasProtoColumnFamily || hasProtoColumn) { | ||
| if (!hasProtoSchema || !hasProtoMessage || !hasProtoColumnFamily || !hasProtoColumn) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. difficult to read.. maybe: |
||
| throw new IllegalArgumentException( | ||
| "When using protobuf decoding, all of protoSchemaPath, fullProtoMessageName, " | ||
| + "protoColumnFamily, and protoColumn must be specified."); | ||
| } | ||
| protoDecoder = | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| new ProtoDecoder( | ||
| options.getProtoSchemaPath(), | ||
| options.getFullProtoMessageName(), | ||
| options.getProtoColumnFamily(), | ||
| options.getProtoColumn(), | ||
| options.getPreserveProtoFieldNames()); | ||
| LOG.info( | ||
| "Proto decoding enabled for {}.{} using message type {}", | ||
| options.getProtoColumnFamily(), | ||
| options.getProtoColumn(), | ||
| options.getFullProtoMessageName()); | ||
| } | ||
|
|
||
| ValueTransformerRegistry transformerRegistry = | ||
| ValueTransformerRegistry.parse(options.getColumnTransforms()); | ||
| if (transformerRegistry != null) { | ||
| LOG.info("Column transforms enabled: {}", options.getColumnTransforms()); | ||
| } | ||
|
|
||
| Pipeline pipeline = Pipeline.create(options); | ||
| DeadLetterQueueManager dlqManager = buildDlqManager(options); | ||
|
|
||
|
|
@@ -213,7 +247,9 @@ public static PipelineResult run(BigtableChangeStreamToBigQueryOptions options) | |
| PCollection<TableRow> changeStreamMutationToTableRow = | ||
| dataChangeRecord.apply( | ||
| "ChangeStreamMutation To TableRow", | ||
| ParDo.of(new ChangeStreamMutationToTableRowFn(sourceInfo, bigQuery))); | ||
| ParDo.of( | ||
| new ChangeStreamMutationToTableRowFn( | ||
| sourceInfo, bigQuery, protoDecoder, transformerRegistry))); | ||
|
|
||
| Write<TableRow> bigQueryWrite = | ||
| BigQueryIO.<TableRow>write() | ||
|
|
@@ -324,10 +360,18 @@ private static String getBigQueryProjectId(BigtableChangeStreamToBigQueryOptions | |
| static class ChangeStreamMutationToTableRowFn extends DoFn<ChangeStreamMutation, TableRow> { | ||
| private final BigtableSource sourceInfo; | ||
| private final BigQueryUtils bigQuery; | ||
|
|
||
| ChangeStreamMutationToTableRowFn(BigtableSource source, BigQueryUtils bigQuery) { | ||
| private final ProtoDecoder protoDecoder; | ||
| private final ValueTransformerRegistry transformerRegistry; | ||
|
|
||
| ChangeStreamMutationToTableRowFn( | ||
| BigtableSource source, | ||
| BigQueryUtils bigQuery, | ||
| ProtoDecoder protoDecoder, | ||
| ValueTransformerRegistry transformerRegistry) { | ||
| this.sourceInfo = source; | ||
| this.bigQuery = bigQuery; | ||
| this.protoDecoder = protoDecoder; | ||
| this.transformerRegistry = transformerRegistry; | ||
| } | ||
|
|
||
| @ProcessElement | ||
|
|
@@ -339,7 +383,7 @@ public void process(@Element ChangeStreamMutation input, OutputReceiver<TableRow | |
| Mod mod = null; | ||
| switch (modType) { | ||
| case SET_CELL: | ||
| mod = new Mod(sourceInfo, input, (SetCell) entry); | ||
| mod = new Mod(sourceInfo, input, (SetCell) entry, protoDecoder, transformerRegistry); | ||
| break; | ||
| case DELETE_CELLS: | ||
| mod = new Mod(sourceInfo, input, (DeleteCells) entry); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Transforms an 8-byte big-endian 64-bit integer representing Unix epoch milliseconds into a
 * BigQuery-compatible timestamp string.
 */
public class BigEndianTimestampTransformer implements ValueTransformer {

  private static final long serialVersionUID = 1L;
  private static final Logger LOG = LoggerFactory.getLogger(BigEndianTimestampTransformer.class);
  // DateTimeFormatter is immutable and thread-safe, so it is safe to cache as a constant.
  // UTC is fixed so output does not depend on the worker's default time zone.
  private static final DateTimeFormatter FORMATTER =
      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS").withZone(ZoneId.of("UTC"));

  /**
   * Decodes {@code bytes} as a big-endian 64-bit integer of Unix epoch milliseconds and formats it
   * as a UTC timestamp string suitable for a BigQuery TIMESTAMP column.
   *
   * @param bytes the raw cell value; must be exactly 8 bytes in big-endian byte order
   * @return the formatted UTC timestamp, or {@code null} if the input is {@code null}, has the
   *     wrong length, or cannot be decoded (a warning is logged in each case)
   */
  @Override
  public String transform(byte[] bytes) {
    if (bytes == null || bytes.length != 8) {
      LOG.warn(
          "Expected 8 bytes for big-endian uint64 timestamp, got {}",
          bytes == null ? "null" : bytes.length);
      return null;
    }
    try {
      // ByteBuffer already defaults to big-endian, but set it explicitly (per review feedback)
      // so the expected wire format is self-documenting.
      long millis = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getLong();
      return FORMATTER.format(Instant.ofEpochMilli(millis));
    } catch (Exception e) {
      LOG.warn("Failed to decode big-endian timestamp: {}", e.getMessage());
      return null;
    }
  }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for being consistent with src/main/java/com/google/cloud/teleport/v2/templates/PubsubProtoToBigQuery.java