Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,85 @@ public interface BigtableChangeStreamToBigQueryOptions
String getDlqDirectory();

void setDlqDirectory(String value);

/**
 * Optional Cloud Storage path (gs://...) to a self-contained proto descriptor-set file, as
 * produced by {@code protoc --descriptor_set_out} with {@code --include_imports}. The empty
 * string (the default) leaves proto decoding disabled.
 */
@TemplateParameter.GcsReadFile(
order = 11,
optional = true,
description = "Cloud Storage path to the proto schema file",
helpText =
"The Cloud Storage location of the self-contained proto schema file. "
+ "For example, gs://path/to/my/file.pb. This file can be generated with the "
+ "--descriptor_set_out flag of the protoc command. The --include_imports flag "
+ "guarantees that the file is self-contained. When set along with "
+ "fullProtoMessageName, the pipeline decodes matching cell values as protobuf "
+ "messages and writes them as JSON to BigQuery.")
@Default.String("")
String getProtoSchemaPath();

/** Sets the Cloud Storage path of the proto descriptor-set file. */
void setProtoSchemaPath(String value);

/**
 * Optional fully-qualified proto message name (for example {@code package.name.MessageName},
 * using the proto {@code package} statement, not {@code java_package}). Empty string (the
 * default) leaves proto decoding disabled.
 */
@TemplateParameter.Text(
order = 12,
optional = true,
regexes = {"^.+([a-zA-Z][a-zA-Z0-9_]+\\.?)+[a-zA-Z0-9_]$"},
description = "Full proto message name",
helpText =
"The full proto message name. For example, package.name.MessageName, "
+ "where package.name is the value provided for the package statement "
+ "and not the java_package statement.")
@Default.String("")
String getFullProtoMessageName();

/** Sets the fully-qualified proto message name used for decoding. */
void setFullProtoMessageName(String value);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for being consistent with src/main/java/com/google/cloud/teleport/v2/templates/PubsubProtoToBigQuery.java


/**
 * Optional Bigtable column family whose cells hold protobuf-encoded values. Required when
 * {@code protoSchemaPath} is set; empty string (the default) otherwise.
 */
@TemplateParameter.Text(
order = 13,
optional = true,
description = "Column family containing proto values",
helpText =
"The Bigtable column family containing protobuf-encoded values to decode. "
+ "Required when protoSchemaPath is set.")
@Default.String("")
String getProtoColumnFamily();

/** Sets the column family that holds protobuf-encoded values. */
void setProtoColumnFamily(String value);

@TemplateParameter.Text(
order = 14,
optional = true,
description = "Column qualifier containing proto values",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Grouping together multiple data into single proto column is a best practice, but it's not always the case, what if you have second proto in other column family or another column.

helpText =
"The Bigtable column qualifier containing protobuf-encoded values to decode. "
+ "Required when protoSchemaPath is set.")
@Default.String("")
String getProtoColumn();

void setProtoColumn(String value);

/**
 * When {@code true}, the JSON produced from decoded proto messages keeps the original
 * snake_case proto field names; when {@code false} (the default), lowerCamelCase is used.
 */
@TemplateParameter.Boolean(
order = 15,
optional = true,
description = "Preserve proto field names in JSON output",
helpText =
"When set to true, preserves original proto field names (snake_case) in the "
+ "JSON output. When set to false, uses lowerCamelCase. Defaults to false.")
@Default.Boolean(false)
Boolean getPreserveProtoFieldNames();

/** Sets whether decoded-proto JSON keeps snake_case proto field names. */
void setPreserveProtoFieldNames(Boolean value);

@TemplateParameter.Text(
order = 16,
optional = true,
description = "Column value transforms",
helpText =
"A comma-separated list of column value transformations in the format "
+ "column_family:column:TRANSFORM_TYPE. Matched cell values are transformed "
+ "before writing to BigQuery. Supported types: BIG_ENDIAN_UINT64_TIMESTAMP_MS "
+ "(8-byte big-endian unsigned 64-bit integer as Unix epoch milliseconds, "
+ "converted to a timestamp string).")
@Default.String("")
String getColumnTransforms();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is writeNumericTimestamps that converts int64 to timestamp for limited amount of columns. In your change, you are not only converting to some format (String in this case), but you also change endianness. Why not to timestamp type? why not to little endian uint? This looks like UDF to me with very limited scope that would require releasing a template whenever you want to add new transform.
Would JS UDF be good alternative?


void setColumnTransforms(String value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.Mod;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.ModType;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.BigQueryUtils;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.ProtoDecoder;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.ValueTransformerRegistry;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.utils.BigtableSource;
import java.util.ArrayList;
Expand Down Expand Up @@ -182,6 +184,38 @@ public static PipelineResult run(BigtableChangeStreamToBigQueryOptions options)

BigQueryUtils bigQuery = new BigQueryUtils(sourceInfo, destinationInfo);

ProtoDecoder protoDecoder = null;
boolean hasProtoSchema = !StringUtils.isBlank(options.getProtoSchemaPath());
boolean hasProtoMessage = !StringUtils.isBlank(options.getFullProtoMessageName());
boolean hasProtoColumnFamily = !StringUtils.isBlank(options.getProtoColumnFamily());
boolean hasProtoColumn = !StringUtils.isBlank(options.getProtoColumn());

if (hasProtoSchema || hasProtoMessage || hasProtoColumnFamily || hasProtoColumn) {
if (!hasProtoSchema || !hasProtoMessage || !hasProtoColumnFamily || !hasProtoColumn) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

difficult to read.. maybe:

boolean anyProtoSet = hasProtoSchema || hasProtoMessage || hasProtoColumnFamily || hasProtoColumn;
boolean allProtoSet = hasProtoSchema && hasProtoMessage && hasProtoColumnFamily && hasProtoColumn;

if (anyProtoSet && !allProtoSet) { throw new IllegalArgumentException( [..]

throw new IllegalArgumentException(
"When using protobuf decoding, all of protoSchemaPath, fullProtoMessageName, "
+ "protoColumnFamily, and protoColumn must be specified.");
}
protoDecoder =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProtoDecoder protoDecoder = null;
    if (allProtoSet) { protoDecoder = new ProtoDecoder( [..]

new ProtoDecoder(
options.getProtoSchemaPath(),
options.getFullProtoMessageName(),
options.getProtoColumnFamily(),
options.getProtoColumn(),
options.getPreserveProtoFieldNames());
LOG.info(
"Proto decoding enabled for {}.{} using message type {}",
options.getProtoColumnFamily(),
options.getProtoColumn(),
options.getFullProtoMessageName());
}

ValueTransformerRegistry transformerRegistry =
ValueTransformerRegistry.parse(options.getColumnTransforms());
if (transformerRegistry != null) {
LOG.info("Column transforms enabled: {}", options.getColumnTransforms());
}

Pipeline pipeline = Pipeline.create(options);
DeadLetterQueueManager dlqManager = buildDlqManager(options);

Expand Down Expand Up @@ -213,7 +247,9 @@ public static PipelineResult run(BigtableChangeStreamToBigQueryOptions options)
PCollection<TableRow> changeStreamMutationToTableRow =
dataChangeRecord.apply(
"ChangeStreamMutation To TableRow",
ParDo.of(new ChangeStreamMutationToTableRowFn(sourceInfo, bigQuery)));
ParDo.of(
new ChangeStreamMutationToTableRowFn(
sourceInfo, bigQuery, protoDecoder, transformerRegistry)));

Write<TableRow> bigQueryWrite =
BigQueryIO.<TableRow>write()
Expand Down Expand Up @@ -324,10 +360,18 @@ private static String getBigQueryProjectId(BigtableChangeStreamToBigQueryOptions
static class ChangeStreamMutationToTableRowFn extends DoFn<ChangeStreamMutation, TableRow> {
private final BigtableSource sourceInfo;
private final BigQueryUtils bigQuery;

ChangeStreamMutationToTableRowFn(BigtableSource source, BigQueryUtils bigQuery) {
private final ProtoDecoder protoDecoder;
private final ValueTransformerRegistry transformerRegistry;

/**
 * @param source Bigtable source metadata used when building change records.
 * @param bigQuery helper that maps change records to BigQuery rows.
 * @param protoDecoder optional protobuf decoder; null when proto decoding is not configured.
 * @param transformerRegistry optional per-column value transforms; may be null.
 */
ChangeStreamMutationToTableRowFn(
BigtableSource source,
BigQueryUtils bigQuery,
ProtoDecoder protoDecoder,
ValueTransformerRegistry transformerRegistry) {
this.sourceInfo = source;
this.bigQuery = bigQuery;
this.protoDecoder = protoDecoder;
this.transformerRegistry = transformerRegistry;
}

@ProcessElement
Expand All @@ -339,7 +383,7 @@ public void process(@Element ChangeStreamMutation input, OutputReceiver<TableRow
Mod mod = null;
switch (modType) {
case SET_CELL:
mod = new Mod(sourceInfo, input, (SetCell) entry);
mod = new Mod(sourceInfo, input, (SetCell) entry, protoDecoder, transformerRegistry);
break;
case DELETE_CELLS:
mod = new Mod(sourceInfo, input, (DeleteCells) entry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.google.cloud.bigtable.data.v2.models.DeleteFamily;
import com.google.cloud.bigtable.data.v2.models.Range.BoundType;
import com.google.cloud.bigtable.data.v2.models.SetCell;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.ProtoDecoder;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.ValueTransformerRegistry;
import com.google.cloud.teleport.v2.utils.BigtableSource;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
Expand All @@ -49,6 +51,8 @@ public final class Mod implements Serializable {

private static final String PATTERN_FORMAT = "yyyy-MM-dd HH:mm:ss.SSSSSS";

public static final String TRANSFORMED_VALUE = "TRANSFORMED_VALUE";

private static final ThreadLocal<ObjectMapper> OBJECT_MAPPER =
ThreadLocal.withInitial(ObjectMapper::new);

Expand Down Expand Up @@ -79,6 +83,38 @@ public Mod(BigtableSource source, ChangeStreamMutation mutation, SetCell setCell
this.changeJson = convertPropertiesToJson(propertiesMap);
}

/**
 * Creates a SET_CELL Mod, optionally decoding the cell value as a protobuf message and/or
 * applying a registered per-column value transform. When either succeeds, the resulting
 * string is stored under {@link #TRANSFORMED_VALUE} in the change JSON; proto decoding takes
 * precedence, and the transformer registry is consulted only if decoding produced nothing.
 *
 * @param source Bigtable source metadata.
 * @param mutation the change-stream mutation being converted.
 * @param setCell the SET_CELL entry within the mutation.
 * @param protoDecoder optional proto decoder; may be null.
 * @param transformerRegistry optional value-transform registry; may be null.
 */
public Mod(
BigtableSource source,
ChangeStreamMutation mutation,
SetCell setCell,
ProtoDecoder protoDecoder,
ValueTransformerRegistry transformerRegistry) {
this(mutation.getCommitTimestamp(), ModType.SET_CELL);

Map<String, Object> properties = Maps.newHashMap();
setCommonProperties(properties, source, mutation);
setSpecificProperties(properties, setCell);

// Skip qualifier/value extraction entirely when neither feature is configured.
boolean anyDecodingConfigured = protoDecoder != null || transformerRegistry != null;
if (anyDecodingConfigured) {
String qualifier = setCell.getQualifier().toStringUtf8();
byte[] rawValue = setCell.getValue().toByteArray();

String result = null;
if (protoDecoder != null && protoDecoder.matches(setCell.getFamilyName(), qualifier)) {
result = protoDecoder.decode(rawValue);
}
// Fall back to the registry only when proto decoding was skipped or yielded null.
if (result == null && transformerRegistry != null) {
result = transformerRegistry.transform(setCell.getFamilyName(), qualifier, rawValue);
}
if (result != null) {
properties.put(TRANSFORMED_VALUE, result);
}
}

this.changeJson = convertPropertiesToJson(properties);
}

public Mod(BigtableSource source, ChangeStreamMutation mutation, DeleteCells deleteCells) {
this.commitTimestampNanos = mutation.getCommitTimestamp().getNano();
this.commitTimestampSeconds = mutation.getCommitTimestamp().getEpochSecond();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (C) 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Transforms an 8-byte big-endian 64-bit integer representing Unix epoch milliseconds into a
* BigQuery-compatible timestamp string.
*/
public class BigEndianTimestampTransformer implements ValueTransformer {

private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(BigEndianTimestampTransformer.class);
private static final DateTimeFormatter FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS").withZone(ZoneId.of("UTC"));

@Override
public String transform(byte[] bytes) {
if (bytes == null || bytes.length != 8) {
LOG.warn(
"Expected 8 bytes for big-endian uint64 timestamp, got {}",
bytes == null ? "null" : bytes.length);
return null;
}
try {
long millis = ByteBuffer.wrap(bytes).getLong();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add .order(ByteOrder.BIG_ENDIAN) to make it explicit and make code self documenting.

return FORMATTER.format(Instant.ofEpochMilli(millis));
} catch (Exception e) {
LOG.warn("Failed to decode big-endian timestamp: {}", e.getMessage());
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,12 @@ public class BigQueryUtils implements Serializable {
FORMATTERS.put(
ChangelogColumn.VALUE_STRING,
(bq, chg) -> {
if (chg.has(Mod.TRANSFORMED_VALUE)) {
return chg.getString(Mod.TRANSFORMED_VALUE);
}
if (!chg.has(ChangelogColumn.VALUE_BYTES.name())) {
return null;
}

String valueEncoded = chg.getString(ChangelogColumn.VALUE_BYTES.name());
return bq.convertBase64ToString(valueEncoded);
});
Expand Down
Loading
Loading