Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,110 @@ public interface BigtableChangeStreamToBigQueryOptions
String getDlqDirectory();

void setDlqDirectory(String value);

/**
 * Returns the comma-separated list of per-column value transforms, each entry of the form
 * {@code column_family:column_qualifier:TRANSFORM_TYPE}. Empty (the default) means no
 * transforms are applied.
 */
@TemplateParameter.Text(
    order = 11,
    optional = true,
    description = "Column value transforms",
    helpText =
        "A comma-separated list of column value transforms. Each entry has the format "
            + "column_family:column_qualifier:TRANSFORM_TYPE. Supported TRANSFORM_TYPE values: "
            + "BIG_ENDIAN_TIMESTAMP (interprets 8-byte big-endian values as Unix epoch millis), "
            + "PROTO_DECODE(package.MessageName) (decodes protobuf-encoded values to JSON; "
            + "requires protoSchemaPath). For complex transformations, consider using a "
            + "JavaScript UDF. Note that column qualifiers containing commas are not supported "
            + "since comma is used as the entry delimiter.")
@Default.String("")
String getColumnTransforms();

void setColumnTransforms(String value);

/**
 * Returns the Cloud Storage path of the self-contained proto descriptor set (e.g.
 * {@code gs://path/to/my/file.pb}). Required when any PROTO_DECODE() transform is configured;
 * empty (the default) otherwise.
 */
@TemplateParameter.GcsReadFile(
    order = 12,
    optional = true,
    description = "Cloud Storage path to the proto schema file",
    helpText =
        "The Cloud Storage location of the self-contained proto schema file. "
            + "For example, gs://path/to/my/file.pb. This file can be generated with the "
            + "--descriptor_set_out flag of the protoc command. The --include_imports flag "
            + "guarantees that the file is self-contained. Required when using "
            + "PROTO_DECODE() in columnTransforms. For legacy compatibility, when set along "
            + "with fullProtoMessageName, protoColumnFamily, and protoColumn, the pipeline "
            + "automatically generates a columnTransforms entry. Prefer using "
            + "columnTransforms with PROTO_DECODE() directly for multi-column proto decoding.")
@Default.String("")
String getProtoSchemaPath();

void setProtoSchemaPath(String value);

/**
 * Returns the fully qualified proto message name (e.g. {@code package.name.MessageName},
 * using the proto {@code package} statement, not {@code java_package}). Part of the legacy
 * proto options; empty (the default) when unused.
 */
@TemplateParameter.Text(
    order = 13,
    optional = true,
    // Fixed regex: the previous pattern began with "^.+", which accepted arbitrary junk
    // prefixes (e.g. "###Abc") and forced every identifier segment to be >= 2 characters.
    // This pattern matches zero or more dotted identifier segments followed by a final
    // identifier, mirroring the protobuf full-name grammar.
    regexes = {"^([a-zA-Z_][a-zA-Z0-9_]*\\.)*[a-zA-Z_][a-zA-Z0-9_]*$"},
    description = "Full proto message name",
    helpText =
        "The full proto message name. For example, package.name.MessageName, "
            + "where package.name is the value provided for the package statement "
            + "and not the java_package statement. Used with the legacy proto options. "
            + "Prefer using columnTransforms with PROTO_DECODE() for new configurations.")
@Default.String("")
String getFullProtoMessageName();

void setFullProtoMessageName(String value);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for being consistent with src/main/java/com/google/cloud/teleport/v2/templates/PubsubProtoToBigQuery.java


/**
 * Returns the Bigtable column family holding protobuf-encoded values for the legacy
 * single-column proto options. Empty (the default) when unused.
 */
@TemplateParameter.Text(
    order = 14,
    optional = true,
    description = "Column family containing proto values",
    helpText =
        "The Bigtable column family containing protobuf-encoded values to decode. "
            + "Used with the legacy proto options. Together with protoColumn, this defines "
            + "the proto-mapped column. Prefer using columnTransforms with PROTO_DECODE() "
            + "for multi-column proto decoding.")
@Default.String("")
String getProtoColumnFamily();

void setProtoColumnFamily(String value);

/**
 * Returns the Bigtable column qualifier holding protobuf-encoded values for the legacy
 * single-column proto options. Empty (the default) when unused.
 */
@TemplateParameter.Text(
    order = 15,
    optional = true,
    description = "Column qualifier containing proto values",
    helpText =
        "The Bigtable column qualifier containing protobuf-encoded values to decode. "
            + "Used with the legacy proto options. Together with protoColumnFamily, this "
            + "defines the proto-mapped column. Prefer using columnTransforms with "
            + "PROTO_DECODE() for multi-column proto decoding.")
@Default.String("")
String getProtoColumn();

void setProtoColumn(String value);

/**
 * Returns whether decoded proto JSON keeps original snake_case field names ({@code true})
 * or uses lowerCamelCase ({@code false}, the default).
 */
@TemplateParameter.Boolean(
    order = 16,
    optional = true,
    description = "Preserve proto field names in JSON output",
    helpText =
        "When set to true, preserves original proto field names (snake_case) in the "
            + "JSON output. When set to false, uses lowerCamelCase. Defaults to false.")
@Default.Boolean(false)
Boolean getPreserveProtoFieldNames();

void setPreserveProtoFieldNames(Boolean value);

/**
 * Returns the maximum allowed size, in bytes, of a value after a columnTransforms decoder
 * runs. Larger decoded values are sent to the dead-letter queue as metadata-only records
 * rather than written to BigQuery. Defaults to 10,000,000 (10 MB), matching the BigQuery
 * Storage Write API row-size limit.
 */
@TemplateParameter.Long(
    order = 17,
    optional = true,
    description = "Maximum decoded value size in bytes",
    helpText =
        "Maximum allowed size, in bytes, of a value after applying a columnTransforms "
            + "decoder (e.g. PROTO_DECODE). Values whose decoded output would exceed this "
            + "threshold are routed to the dead-letter queue with metadata only, and are "
            + "not written to BigQuery. The default of 10000000 (10 MB) matches BigQuery "
            + "Storage Write API's maximum row size, ensuring a single predictable error "
            + "funnel. Increase this if you write to a sink that accepts larger rows.")
@Default.Long(10_000_000L)
Long getMaxDecodedValueBytes();

void setMaxDecodedValueBytes(Long value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.BigQueryDestination;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.Mod;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.ModType;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.OversizedValue;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.BigQueryUtils;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.OversizedValueDlqSanitizer;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.TransformResult;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.ValueTransformerRegistry;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.utils.BigtableSource;
import java.util.ArrayList;
Expand All @@ -51,12 +55,17 @@
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.ExistingPipelineOptions;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand Down Expand Up @@ -182,6 +191,51 @@ public static PipelineResult run(BigtableChangeStreamToBigQueryOptions options)

BigQueryUtils bigQuery = new BigQueryUtils(sourceInfo, destinationInfo);

// Validate legacy proto options: either all must be set or none.
boolean anyProtoSet =
!StringUtils.isBlank(options.getProtoSchemaPath())
|| !StringUtils.isBlank(options.getFullProtoMessageName())
|| !StringUtils.isBlank(options.getProtoColumnFamily())
|| !StringUtils.isBlank(options.getProtoColumn());
boolean allProtoSet =
!StringUtils.isBlank(options.getProtoSchemaPath())
&& !StringUtils.isBlank(options.getFullProtoMessageName())
&& !StringUtils.isBlank(options.getProtoColumnFamily())
&& !StringUtils.isBlank(options.getProtoColumn());

if (anyProtoSet && !allProtoSet) {
throw new IllegalArgumentException(
"When using protobuf decoding, all of protoSchemaPath, fullProtoMessageName, "
+ "protoColumnFamily, and protoColumn must be specified.");
}

// Build columnTransforms config, prepending a synthetic entry for legacy proto options.
String columnTransforms = options.getColumnTransforms();
String protoSchemaPath = options.getProtoSchemaPath();
boolean preserveProtoFieldNames = options.getPreserveProtoFieldNames();

if (allProtoSet) {
String syntheticTransform =
options.getProtoColumnFamily()
+ ":"
+ options.getProtoColumn()
+ ":PROTO_DECODE("
+ options.getFullProtoMessageName()
+ ")";
columnTransforms =
StringUtils.isBlank(columnTransforms)
? syntheticTransform
: syntheticTransform + "," + columnTransforms;
LOG.info(
"Proto decoding enabled for {}.{} using message type {}",
options.getProtoColumnFamily(),
options.getProtoColumn(),
options.getFullProtoMessageName());
}

ValueTransformerRegistry transformerRegistry =
ValueTransformerRegistry.parse(columnTransforms, protoSchemaPath, preserveProtoFieldNames);

Pipeline pipeline = Pipeline.create(options);
DeadLetterQueueManager dlqManager = buildDlqManager(options);

Expand Down Expand Up @@ -210,10 +264,32 @@ public static PipelineResult run(BigtableChangeStreamToBigQueryOptions options)
.apply("Read from Cloud Bigtable Change Streams", readChangeStream)
.apply(Values.create());

PCollection<TableRow> changeStreamMutationToTableRow =
long maxDecodedValueBytes = options.getMaxDecodedValueBytes();
PCollectionTuple processed =
dataChangeRecord.apply(
"ChangeStreamMutation To TableRow",
ParDo.of(new ChangeStreamMutationToTableRowFn(sourceInfo, bigQuery)));
ParDo.of(
new ChangeStreamMutationToTableRowFn(
sourceInfo, bigQuery, transformerRegistry, maxDecodedValueBytes))
.withOutputTags(
ChangeStreamMutationToTableRowFn.MAIN_OUT,
TupleTagList.of(ChangeStreamMutationToTableRowFn.OVERSIZED_DLQ_OUT)));

PCollection<TableRow> changeStreamMutationToTableRow =
processed.get(ChangeStreamMutationToTableRowFn.MAIN_OUT);

// Route oversized decoded values to the severe DLQ with metadata only; the cell is not
// retried against BigQuery because it would also fail the Storage Write API's row-size limit.
processed
.get(ChangeStreamMutationToTableRowFn.OVERSIZED_DLQ_OUT)
.apply("Sanitize Oversized Values", MapElements.via(new OversizedValueDlqSanitizer()))
.apply(
"Write Oversized Values To DLQ",
DLQWriteTransform.WriteDLQ.newBuilder()
.withDlqDirectory(dlqManager.getSevereDlqDirectory() + "YYYY/MM/dd/HH/mm/")
.withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
.setIncludePaneInfo(true)
.build());

Write<TableRow> bigQueryWrite =
BigQueryIO.<TableRow>write()
Expand Down Expand Up @@ -319,27 +395,62 @@ private static String getBigQueryProjectId(BigtableChangeStreamToBigQueryOptions

/**
* DoFn that converts a {@link ChangeStreamMutation} to multiple {@link Mod} in serialized JSON
* format.
* format. SET_CELL mutations whose decoded value exceeds the configured {@code
* maxDecodedValueBytes} are emitted to {@link #OVERSIZED_DLQ_OUT} as a metadata-only record
* instead of the main {@link TableRow} output.
*/
static class ChangeStreamMutationToTableRowFn extends DoFn<ChangeStreamMutation, TableRow> {

/** Main output: successfully built {@link TableRow}s to write to BigQuery. */
public static final TupleTag<TableRow> MAIN_OUT = new TupleTag<TableRow>() {};

/** Side output: metadata describing oversized decoded values, routed to DLQ. */
public static final TupleTag<OversizedValue> OVERSIZED_DLQ_OUT =
new TupleTag<OversizedValue>() {};

// Source metadata (instance/table ids) stamped onto emitted Mods and DLQ records.
private final BigtableSource sourceInfo;
// Maps a Mod onto BigQuery TableRow fields (see setTableRowFields in process()).
private final BigQueryUtils bigQuery;
// Parsed columnTransforms configuration applied to SET_CELL values.
private final ValueTransformerRegistry transformerRegistry;
// Decoded values larger than this (bytes) are routed to OVERSIZED_DLQ_OUT.
private final long maxDecodedValueBytes;

// Counts SET_CELL entries whose decoded value exceeded maxDecodedValueBytes.
// NOTE(review): Counter is fully qualified here while Distribution is imported;
// consider importing org.apache.beam.sdk.metrics.Counter for consistency.
private final org.apache.beam.sdk.metrics.Counter oversizedDecodesCounter =
    Metrics.counter(ChangeStreamMutationToTableRowFn.class, "oversizedDecodes");
// Records the decoded size (bytes) of successfully transformed SET_CELL values.
private final Distribution decodedValueBytesDistribution =
    Metrics.distribution(ChangeStreamMutationToTableRowFn.class, "decodedValueBytes");

/**
 * Creates the conversion DoFn.
 *
 * @param source Bigtable source metadata attached to emitted records
 * @param bigQuery helper that maps Mods onto BigQuery table rows
 * @param transformerRegistry parsed columnTransforms applied to SET_CELL values
 * @param maxDecodedValueBytes size cap in bytes for decoded values; larger ones go to the DLQ
 */
ChangeStreamMutationToTableRowFn(
    BigtableSource source,
    BigQueryUtils bigQuery,
    ValueTransformerRegistry transformerRegistry,
    long maxDecodedValueBytes) {
  this.maxDecodedValueBytes = maxDecodedValueBytes;
  this.transformerRegistry = transformerRegistry;
  this.bigQuery = bigQuery;
  this.sourceInfo = source;
}

@ProcessElement
public void process(@Element ChangeStreamMutation input, OutputReceiver<TableRow> receiver)
public void process(@Element ChangeStreamMutation input, MultiOutputReceiver receiver)
throws Exception {
for (Entry entry : input.getEntries()) {
ModType modType = getModType(entry);

Mod mod = null;
Mod mod;
switch (modType) {
case SET_CELL:
mod = new Mod(sourceInfo, input, (SetCell) entry);
SetCell setCell = (SetCell) entry;
Mod.SetCellBuildResult buildResult =
Mod.buildSetCell(
sourceInfo, input, setCell, transformerRegistry, maxDecodedValueBytes);
TransformResult result = buildResult.transformResult();
if (result.status() == TransformResult.Status.SUCCESS) {
decodedValueBytesDistribution.update(result.decodedBytes());
} else if (result.status() == TransformResult.Status.OVERSIZED) {
oversizedDecodesCounter.inc();
receiver.get(OVERSIZED_DLQ_OUT).output(toOversizedValue(input, setCell, result));
continue;
}
mod = buildResult.mod();
break;
case DELETE_CELLS:
mod = new Mod(sourceInfo, input, (DeleteCells) entry);
Expand All @@ -358,11 +469,26 @@ public void process(@Element ChangeStreamMutation input, OutputReceiver<TableRow

TableRow tableRow = new TableRow();
if (bigQuery.setTableRowFields(mod, tableRow)) {
receiver.output(tableRow);
receiver.get(MAIN_OUT).output(tableRow);
}
}
}

/**
 * Builds a metadata-only DLQ record for a SET_CELL whose decoded value exceeded
 * {@code maxDecodedValueBytes}. The record carries row/cell coordinates and sizes only;
 * the raw cell value itself is not included.
 */
private OversizedValue toOversizedValue(
    ChangeStreamMutation mutation, SetCell setCell, TransformResult result) {
  String rowKey = mutation.getRowKey().toStringUtf8();
  String columnQualifier = setCell.getQualifier().toStringUtf8();
  return new OversizedValue(
      rowKey,
      mutation.getCommitTimestamp(),
      setCell.getFamilyName(),
      columnQualifier,
      result.rawBytes(),
      result.decodedBytes(),
      maxDecodedValueBytes,
      sourceInfo.getInstanceId(),
      mutation.getSourceClusterId(),
      sourceInfo.getTableId());
}

private ModType getModType(Entry entry) {
if (entry instanceof SetCell) {
return ModType.SET_CELL;
Expand Down
Loading