diff --git a/v2/googlecloud-to-neo4j/pom.xml b/v2/googlecloud-to-neo4j/pom.xml
index 814b8f03b5..d2284fa101 100644
--- a/v2/googlecloud-to-neo4j/pom.xml
+++ b/v2/googlecloud-to-neo4j/pom.xml
@@ -50,7 +50,7 @@
org.neo4j.importer
import-spec
- 1.0.0-rc11
+ 1.0.0-rc13
com.google.cloud.teleport.v2
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/ActionPreloadFactory.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/ActionPreloadFactory.java
deleted file mode 100644
index b6c5fa7908..0000000000
--- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/ActionPreloadFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (C) 2021 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.teleport.v2.neo4j.actions;
-
-import com.google.cloud.teleport.v2.neo4j.actions.preload.PreloadAction;
-import com.google.cloud.teleport.v2.neo4j.actions.preload.PreloadBigQueryAction;
-import com.google.cloud.teleport.v2.neo4j.actions.preload.PreloadCypherAction;
-import com.google.cloud.teleport.v2.neo4j.actions.preload.PreloadHttpAction;
-import com.google.cloud.teleport.v2.neo4j.model.job.ActionContext;
-import org.neo4j.importer.v1.actions.Action;
-
-/** Factory providing indirection to action handler. */
-public class ActionPreloadFactory {
-
- public static PreloadAction of(Action action, ActionContext context) {
- var actionType = action.getType();
- switch (actionType) {
- case "cypher":
- PreloadCypherAction cypher = new PreloadCypherAction();
- cypher.configure(action, context);
- return cypher;
- case "http":
- PreloadHttpAction http = new PreloadHttpAction();
- http.configure(action, context);
- return http;
- case "bigquery":
- PreloadBigQueryAction bigQuery = new PreloadBigQueryAction();
- bigQuery.configure(action, context);
- return bigQuery;
- }
- throw new RuntimeException("Unsupported preload action type: " + actionType);
- }
-}
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/preload/PreloadBigQueryAction.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/preload/PreloadBigQueryAction.java
deleted file mode 100644
index 8e44377ff9..0000000000
--- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/preload/PreloadBigQueryAction.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright (C) 2021 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.teleport.v2.neo4j.actions.preload;
-
-import com.google.cloud.bigquery.BigQuery;
-import com.google.cloud.bigquery.BigQueryOptions;
-import com.google.cloud.bigquery.QueryJobConfiguration;
-import com.google.cloud.bigquery.TableResult;
-import com.google.cloud.teleport.v2.neo4j.actions.BigQueryAction;
-import com.google.cloud.teleport.v2.neo4j.model.job.ActionContext;
-import java.util.ArrayList;
-import java.util.List;
-import org.neo4j.importer.v1.actions.Action;
-
-/** Query action handler. */
-public class PreloadBigQueryAction implements PreloadAction {
-
- private BigQueryAction action;
-
- @Override
- public void configure(Action action, ActionContext context) {
- this.action = (BigQueryAction) action;
- }
-
- @Override
- public List execute() {
- List msgs = new ArrayList<>();
- String sql = action.sql();
-
- try {
- BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
- QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(sql).build();
- msgs.add("Query: " + sql);
- TableResult queryResult = bigquery.query(queryConfig);
- msgs.add("Result rows: " + queryResult.getTotalRows());
-
- } catch (Exception e) {
- throw new RuntimeException(
- String.format("Exception running sql %s: %s", sql, e.getMessage()), e);
- }
-
- return msgs;
- }
-}
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/preload/PreloadCypherAction.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/preload/PreloadCypherAction.java
deleted file mode 100644
index 574b1b5c18..0000000000
--- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/preload/PreloadCypherAction.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright (C) 2021 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.teleport.v2.neo4j.actions.preload;
-
-import com.google.cloud.teleport.v2.neo4j.database.Neo4jConnection;
-import com.google.cloud.teleport.v2.neo4j.model.connection.ConnectionParams;
-import com.google.cloud.teleport.v2.neo4j.model.job.ActionContext;
-import com.google.cloud.teleport.v2.neo4j.telemetry.Neo4jTelemetry;
-import com.google.common.annotations.VisibleForTesting;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import org.apache.beam.sdk.transforms.SerializableBiFunction;
-import org.apache.commons.lang3.StringUtils;
-import org.neo4j.driver.TransactionConfig;
-import org.neo4j.importer.v1.actions.Action;
-import org.neo4j.importer.v1.actions.plugin.CypherAction;
-import org.neo4j.importer.v1.actions.plugin.CypherExecutionMode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Cypher runner action handler. */
-public class PreloadCypherAction implements PreloadAction {
-
- private static final Logger LOG = LoggerFactory.getLogger(PreloadCypherAction.class);
- private final SerializableBiFunction
- connectionProvider;
-
- private String cypher;
- private ActionContext context;
- private CypherExecutionMode executionMode;
-
- public PreloadCypherAction() {
- this(Neo4jConnection::new);
- }
-
- @VisibleForTesting
- PreloadCypherAction(
- SerializableBiFunction connectionProvider) {
- this.connectionProvider = connectionProvider;
- }
-
- @Override
- public void configure(Action action, ActionContext context) {
- var cypherAction = (CypherAction) action;
- String cypher = cypherAction.getQuery();
- if (StringUtils.isEmpty(cypher)) {
- throw new RuntimeException("Cypher query not provided for preload cypher action.");
- }
- this.context = context;
- this.cypher = cypher;
- this.executionMode = cypherAction.getExecutionMode();
- }
-
- @Override
- public List execute() {
- try (Neo4jConnection connection =
- connectionProvider.apply(
- this.context.getNeo4jConnectionParams(), this.context.getTemplateVersion())) {
- LOG.info("Executing cypher: {}", cypher);
- try {
- run(connection);
- } catch (Exception e) {
- throw new RuntimeException(
- String.format("Exception running cypher, %s: %s", cypher, e.getMessage()), e);
- }
- return List.of();
- }
- }
-
- private void run(Neo4jConnection connection) {
- TransactionConfig txConfig =
- TransactionConfig.builder()
- .withMetadata(
- Neo4jTelemetry.transactionMetadata(
- Map.of(
- "sink", "neo4j",
- "step", "cypher-preload-action",
- "execution", executionMode.name().toLowerCase(Locale.ROOT))))
- .build();
- switch (executionMode) {
- case TRANSACTION:
- connection.writeTransaction(tx -> tx.run(cypher).consume(), txConfig);
- break;
- case AUTOCOMMIT:
- connection.runAutocommit(cypher, txConfig);
- break;
- }
- }
-}
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/preload/PreloadHttpAction.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/preload/PreloadHttpAction.java
deleted file mode 100644
index 136fbc9c9d..0000000000
--- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/preload/PreloadHttpAction.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright (C) 2021 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.teleport.v2.neo4j.actions.preload;
-
-import com.google.cloud.teleport.v2.neo4j.actions.HttpAction;
-import com.google.cloud.teleport.v2.neo4j.model.job.ActionContext;
-import com.google.cloud.teleport.v2.neo4j.utils.HttpUtils;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.neo4j.importer.v1.actions.Action;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Http action handler. */
-public class PreloadHttpAction implements PreloadAction {
-
- private static final Logger LOG = LoggerFactory.getLogger(PreloadHttpAction.class);
-
- private HttpAction action;
-
- @Override
- public void configure(Action action, ActionContext context) {
- this.action = (HttpAction) action;
- }
-
- @Override
- public List execute() {
- List msgs = new ArrayList<>();
- String uri = action.url();
- try (CloseableHttpResponse response =
- HttpUtils.getHttpResponse(action.method(), uri, action.headers())) {
- LOG.info("Request returned: {}", HttpUtils.getResponseContent(response));
-
- } catch (Exception e) {
- throw new RuntimeException(
- String.format("Exception making http post request: %s", e.getMessage()), e);
- }
-
- return msgs;
- }
-}
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/preload/package-info.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/preload/package-info.java
deleted file mode 100644
index bad0675c74..0000000000
--- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/preload/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Copyright (C) 2021 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * This module supports preload actions which are run as the pipeline is being scaffolded.
- *
- * @author drumcircle
- * @version 1.0
- * @since 1.0
- */
-package com.google.cloud.teleport.v2.neo4j.actions.preload;
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/database/CypherPatterns.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/database/CypherPatterns.java
index 3a1d35a9e6..9a5839a6c9 100644
--- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/database/CypherPatterns.java
+++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/database/CypherPatterns.java
@@ -112,7 +112,7 @@ private static String optionsAsList(Collection> value) {
}
public static String propertyType(PropertyType propertyType) {
- return switch (propertyType) {
+ return switch (propertyType.getName()) {
case BOOLEAN -> "BOOLEAN";
case BOOLEAN_ARRAY -> "LIST";
case DATE -> "DATE";
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/database/Neo4jConnection.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/database/Neo4jConnection.java
index 6c17f5475d..98e019e5d7 100644
--- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/database/Neo4jConnection.java
+++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/database/Neo4jConnection.java
@@ -113,10 +113,10 @@ public void resetDatabase() {
var capabilities = capabilities();
if (capabilities.hasCreateOrReplaceDatabase()) {
- recreateDatabase(capabilities);
+ recreateDatabase();
} else {
deleteData();
- dropSchema(capabilities);
+ dropSchema();
}
} catch (Exception exception) {
LOG.error(
@@ -128,7 +128,7 @@ public void resetDatabase() {
}
}
- private void recreateDatabase(Neo4jCapabilities capabilities) {
+ private void recreateDatabase() {
try {
String database = !StringUtils.isEmpty(this.database) ? this.database : "neo4j";
String cypher = "CREATE OR REPLACE DATABASE $db WAIT 60 SECONDS";
@@ -140,7 +140,7 @@ private void recreateDatabase(Neo4jCapabilities capabilities) {
cypher, Map.of("db", database), databaseResetMetadata("create-replace-database"));
} catch (Exception ex) {
deleteData();
- dropSchema(capabilities);
+ dropSchema();
}
}
@@ -150,7 +150,7 @@ private void deleteData() {
runAutocommit(ddeCypher, databaseResetMetadata("cit-detach-delete"));
}
- private void dropSchema(Neo4jCapabilities capabilities) {
+ private void dropSchema() {
try (var session = getSession()) {
LOG.info("Dropping constraints");
var constraints =
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/ActionMapper.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/ActionMapper.java
index e5e7de929c..8e747b1718 100644
--- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/ActionMapper.java
+++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/ActionMapper.java
@@ -18,7 +18,7 @@
import com.google.cloud.teleport.v2.neo4j.actions.BigQueryAction;
import com.google.cloud.teleport.v2.neo4j.actions.HttpAction;
import com.google.cloud.teleport.v2.neo4j.actions.HttpMethod;
-import com.google.cloud.teleport.v2.neo4j.model.job.OptionsParams;
+import com.google.cloud.teleport.v2.neo4j.model.job.OverlayTokens;
import com.google.cloud.teleport.v2.neo4j.utils.ModelUtils;
import java.util.ArrayList;
import java.util.HashMap;
@@ -47,7 +47,7 @@ public static void index(JSONArray json, JobSpecIndex index) {
}
}
- public static List parse(JSONArray json, OptionsParams options) {
+ public static List parse(JSONArray json, OverlayTokens options) {
List actions = new ArrayList<>(json.length());
for (int i = 0; i < json.length(); i++) {
actions.add(parse(json.getJSONObject(i), options));
@@ -55,7 +55,7 @@ public static List parse(JSONArray json, OptionsParams options) {
return actions;
}
- private static Action parse(JSONObject json, OptionsParams templateOptions) {
+ private static Action parse(JSONObject json, OverlayTokens templateOptions) {
var type = json.getString("type").toLowerCase(Locale.ROOT);
var active = JsonObjects.getBooleanOrDefault(json, "active", true);
var name = json.getString("name");
@@ -65,29 +65,26 @@ private static Action parse(JSONObject json, OptionsParams templateOptions) {
active,
name,
mapStage(json.opt("execute_after"), json.opt("execute_after_name")),
- ModelUtils.replaceVariableTokens(
- (String) options.get("sql"), templateOptions.getTokenMap()));
+ ModelUtils.replaceVariableTokens((String) options.get("sql"), templateOptions.tokens()));
case "cypher" -> new CypherAction(
active,
name,
mapStage(json.opt("execute_after"), json.opt("execute_after_name")),
ModelUtils.replaceVariableTokens(
- (String) options.get("cypher"), templateOptions.getTokenMap()),
+ (String) options.get("cypher"), templateOptions.tokens()),
CypherExecutionMode.AUTOCOMMIT);
case "http_get" -> new HttpAction(
active,
name,
mapStage(json.opt("execute_after"), json.opt("execute_after_name")),
- ModelUtils.replaceVariableTokens(
- (String) options.get("url"), templateOptions.getTokenMap()),
+ ModelUtils.replaceVariableTokens((String) options.get("url"), templateOptions.tokens()),
HttpMethod.GET,
processValues(flattenObjectList(json, "headers"), templateOptions));
case "http_post" -> new HttpAction(
active,
name,
mapStage(json.opt("execute_after"), json.opt("execute_after_name")),
- ModelUtils.replaceVariableTokens(
- (String) options.get("url"), templateOptions.getTokenMap()),
+ ModelUtils.replaceVariableTokens((String) options.get("url"), templateOptions.tokens()),
HttpMethod.POST,
processValues(flattenObjectList(json, "headers"), templateOptions));
default -> throw new IllegalArgumentException(
@@ -104,14 +101,14 @@ private static ActionStage mapStage(Object executeAfter, Object executeAfterName
return ActionStage.START;
}
- private static Map processValues(Map map, OptionsParams options) {
+ private static Map processValues(Map map, OverlayTokens options) {
if (map == null) {
return null;
}
Map result = new HashMap<>(map.size());
for (String key : map.keySet()) {
String value = (String) map.get(key);
- result.put(key, ModelUtils.replaceVariableTokens(value, options.getTokenMap()));
+ result.put(key, ModelUtils.replaceVariableTokens(value, options.tokens()));
}
return result;
}
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/JobSpecMapper.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/JobSpecMapper.java
index ea169180c8..83e6dcd99e 100644
--- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/JobSpecMapper.java
+++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/JobSpecMapper.java
@@ -15,7 +15,7 @@
*/
package com.google.cloud.teleport.v2.neo4j.model.helpers;
-import com.google.cloud.teleport.v2.neo4j.model.job.OptionsParams;
+import com.google.cloud.teleport.v2.neo4j.model.job.OverlayTokens;
import com.google.cloud.teleport.v2.neo4j.utils.FileSystemUtils;
import java.io.StringReader;
import java.util.Collections;
@@ -42,7 +42,7 @@
public class JobSpecMapper {
private static final Logger LOG = LoggerFactory.getLogger(JobSpecMapper.class);
- public static ImportSpecification parse(String jobSpecUri, OptionsParams options) {
+ public static ImportSpecification parse(String jobSpecUri, OverlayTokens options) {
String content = fetchContent(jobSpecUri);
JSONObject spec = getJsonObject(content);
@@ -88,7 +88,7 @@ private static String fetchContent(String jobSpecUri) {
}
@Deprecated
- private static ImportSpecification parseLegacyJobSpec(OptionsParams options, JSONObject spec) {
+ private static ImportSpecification parseLegacyJobSpec(OverlayTokens options, JSONObject spec) {
LOG.debug("Converting legacy JSON job spec to new import specification format");
var configuration = parseConfig(spec);
var targets = extractTargets(spec);
@@ -128,7 +128,7 @@ private static Map parseConfig(JSONObject json) {
return json.has("config") ? json.getJSONObject("config").toMap() : Collections.emptyMap();
}
- private static List parseSources(JSONObject json, OptionsParams options) {
+ private static List parseSources(JSONObject json, OverlayTokens options) {
if (json.has("source")) {
return List.of(SourceMapper.parse(json.getJSONObject("source"), options));
}
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/OptionsParamsMapper.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/OptionsParamsMapper.java
deleted file mode 100644
index e8188d8eb9..0000000000
--- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/OptionsParamsMapper.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (C) 2022 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.teleport.v2.neo4j.model.helpers;
-
-import com.google.cloud.teleport.v2.neo4j.model.job.OptionsParams;
-import com.google.cloud.teleport.v2.neo4j.options.Neo4jFlexTemplateOptions;
-import org.apache.commons.lang3.StringUtils;
-
-/** Helper class for parsing json into OptionsParams model object. */
-public class OptionsParamsMapper {
-
- public static OptionsParams fromPipelineOptions(Neo4jFlexTemplateOptions pipelineOptions) {
- OptionsParams optionsParams = new OptionsParams();
- try {
- if (StringUtils.isNotEmpty(pipelineOptions.getOptionsJson())) {
- optionsParams.overlayTokens(pipelineOptions.getOptionsJson());
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return optionsParams;
- }
-}
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/OverlayTokenParser.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/OverlayTokenParser.java
new file mode 100644
index 0000000000..6cf2281598
--- /dev/null
+++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/OverlayTokenParser.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.neo4j.model.helpers;
+
+import com.google.cloud.teleport.v2.neo4j.model.job.OverlayTokens;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Parses the JSON document of overlay tokens into an {@link OverlayTokens} instance.
+ *
+ * <p>Each top-level JSON entry becomes one token; its value is stored in the form produced by
+ * {@link String#valueOf(Object)}.
+ */
+public class OverlayTokenParser {
+
+  private static final Logger LOG = LoggerFactory.getLogger(OverlayTokenParser.class);
+
+  /**
+   * Converts the given JSON object string into overlay tokens.
+   *
+   * @param optionsJson JSON object whose top-level entries are the tokens; a {@code null} or empty
+   *     string produces an instance built from an empty map
+   * @return the overlay tokens built from the parsed entries
+   * @throws RuntimeException if parsing fails; the original failure is preserved as the cause
+   */
+  public static OverlayTokens parse(String optionsJson) {
+    try {
+      return new OverlayTokens(doParse(optionsJson));
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to parse overlay tokens: " + e.getMessage(), e);
+    }
+  }
+
+  /** Flattens the top-level entries of {@code jsonTokens} into a key to string-value map. */
+  private static Map<String, String> doParse(String jsonTokens) {
+    if (StringUtils.isEmpty(jsonTokens)) {
+      return Map.of();
+    }
+    LOG.debug("Parsing overlay tokens: {}", jsonTokens);
+    var optionsJson = new JSONObject(jsonTokens);
+    var optionsKeys = optionsJson.keys();
+    var result = new HashMap<String, String>();
+    while (optionsKeys.hasNext()) {
+      var key = optionsKeys.next();
+      // Values are kept in their String.valueOf form, whatever their JSON type.
+      var value = String.valueOf(optionsJson.opt(key));
+      result.put(key, value);
+      LOG.debug("{}: {}", key, value);
+    }
+    return result;
+  }
+}
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/SourceMapper.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/SourceMapper.java
index e15c6957a1..79d5f1539b 100644
--- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/SourceMapper.java
+++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/SourceMapper.java
@@ -18,7 +18,7 @@
import static com.google.cloud.teleport.v2.neo4j.model.helpers.JsonObjects.getStringOrDefault;
import static com.google.cloud.teleport.v2.neo4j.model.helpers.JsonObjects.getStringOrNull;
-import com.google.cloud.teleport.v2.neo4j.model.job.OptionsParams;
+import com.google.cloud.teleport.v2.neo4j.model.job.OverlayTokens;
import com.google.cloud.teleport.v2.neo4j.model.sources.BigQuerySource;
import com.google.cloud.teleport.v2.neo4j.model.sources.ExternalTextSource;
import com.google.cloud.teleport.v2.neo4j.model.sources.InlineTextSource;
@@ -47,7 +47,7 @@ public class SourceMapper {
static final String DEFAULT_SOURCE_NAME = "";
static final Pattern NEWLINE_PATTERN = Pattern.compile("\\R");
- public static List parse(JSONArray rawSources, OptionsParams options) {
+ public static List parse(JSONArray rawSources, OverlayTokens options) {
List sources = new ArrayList<>(rawSources.length());
for (int i = 0; i < rawSources.length(); i++) {
sources.add(parse(rawSources.getJSONObject(i), options));
@@ -55,7 +55,7 @@ public static List parse(JSONArray rawSources, OptionsParams options) {
return sources;
}
- public static Source parse(JSONObject rawSource, OptionsParams options) {
+ public static Source parse(JSONObject rawSource, OverlayTokens options) {
var sourceType = getStringOrDefault(rawSource, "type", "text").toLowerCase(Locale.ROOT);
switch (sourceType) {
case "bigquery":
@@ -67,13 +67,13 @@ public static Source parse(JSONObject rawSource, OptionsParams options) {
}
}
- private static BigQuerySource parseBigQuerySource(JSONObject rawSource, OptionsParams options) {
+ private static BigQuerySource parseBigQuerySource(JSONObject rawSource, OverlayTokens options) {
var sourceName = getStringOrDefault(rawSource, "name", DEFAULT_SOURCE_NAME);
- var sql = ModelUtils.replaceVariableTokens(rawSource.getString("query"), options.getTokenMap());
+ var sql = ModelUtils.replaceVariableTokens(rawSource.getString("query"), options.tokens());
return new BigQuerySource(sourceName, sql);
}
- private static TextSource parseTextSource(JSONObject rawSource, OptionsParams options) {
+ private static TextSource parseTextSource(JSONObject rawSource, OverlayTokens options) {
var sourceName = getStringOrDefault(rawSource, "name", DEFAULT_SOURCE_NAME);
var header =
Arrays.asList(StringUtils.stripAll(rawSource.getString("ordered_field_names").split(",")));
@@ -84,7 +84,7 @@ private static TextSource parseTextSource(JSONObject rawSource, OptionsParams op
var separator = getStringOrNull(rawSource, "separator");
if (rawSource.has("uri") || rawSource.has("url")) {
var url = rawSource.has("uri") ? rawSource.getString("uri") : rawSource.getString("url");
- url = ModelUtils.replaceVariableTokens(url, options.getTokenMap());
+ url = ModelUtils.replaceVariableTokens(url, options.tokens());
return new ExternalTextSource(sourceName, List.of(url), header, format, delimiter, separator);
}
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/TargetSequence.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/StepSequence.java
similarity index 95%
rename from v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/TargetSequence.java
rename to v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/StepSequence.java
index ce469768d0..68999f11ca 100644
--- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/TargetSequence.java
+++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/StepSequence.java
@@ -21,7 +21,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.neo4j.importer.v1.targets.Target;
-public class TargetSequence implements Serializable {
+public class StepSequence implements Serializable {
private final Map targetSequences = new HashMap<>();
private final AtomicInteger nextNumber = new AtomicInteger(0);
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/TargetMapper.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/TargetMapper.java
index c71daaca74..7bcdd99ec2 100644
--- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/TargetMapper.java
+++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/helpers/TargetMapper.java
@@ -28,7 +28,7 @@
import static com.google.cloud.teleport.v2.neo4j.model.helpers.SourceMapper.DEFAULT_SOURCE_NAME;
import com.google.cloud.teleport.v2.neo4j.model.enums.ArtifactType;
-import com.google.cloud.teleport.v2.neo4j.model.job.OptionsParams;
+import com.google.cloud.teleport.v2.neo4j.model.job.OverlayTokens;
import com.google.cloud.teleport.v2.neo4j.transforms.Aggregation;
import com.google.cloud.teleport.v2.neo4j.transforms.Order;
import com.google.cloud.teleport.v2.neo4j.transforms.OrderBy;
@@ -91,7 +91,7 @@ public static void index(JSONArray json, JobSpecIndex index) {
}
public static Targets parse(
- JSONArray json, OptionsParams options, JobSpecIndex jobIndex, boolean indexAllProperties) {
+ JSONArray json, OverlayTokens options, JobSpecIndex jobIndex, boolean indexAllProperties) {
List nodes = new ArrayList<>();
List relationshipTargets = new ArrayList<>();
List queryTargets = new ArrayList<>();
@@ -205,9 +205,8 @@ private static RelationshipTarget parseEdge(
}
private static CustomQueryTarget parseCustomQuery(
- int index, JSONObject query, JobSpecIndex jobIndex, OptionsParams options) {
- String cypher =
- ModelUtils.replaceVariableTokens(query.getString("query"), options.getTokenMap());
+ int index, JSONObject query, JobSpecIndex jobIndex, OverlayTokens options) {
+ String cypher = ModelUtils.replaceVariableTokens(query.getString("query"), options.tokens());
String targetName = normalizeName(index, query.getString("name"), ArtifactType.custom_query);
return new CustomQueryTarget(
getBooleanOrDefault(query, "active", true),
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/job/OptionsParams.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/job/OptionsParams.java
deleted file mode 100644
index 0e992943c8..0000000000
--- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/job/OptionsParams.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright (C) 2021 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.teleport.v2.neo4j.model.job;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import org.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Runtime options object that coalesces arbitrary options. */
-public class OptionsParams implements Serializable {
-
- private static final Logger LOG = LoggerFactory.getLogger(OptionsParams.class);
-
- private final Map tokenMap = new HashMap<>();
-
- @JsonIgnore
- public void overlayTokens(String optionsJsonStr) {
- LOG.debug("Parsing pipeline options: {}", optionsJsonStr);
- JSONObject optionsJson = new JSONObject(optionsJsonStr);
- Iterator optionsKeys = optionsJson.keys();
- while (optionsKeys.hasNext()) {
- String optionsKey = optionsKeys.next();
- this.tokenMap.put(optionsKey, String.valueOf(optionsJson.opt(optionsKey)));
- LOG.debug("{}: {}", optionsKey, optionsJson.opt(optionsKey));
- }
- }
-
- public Map getTokenMap() {
- return tokenMap;
- }
-}
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/preload/PreloadAction.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/job/OverlayTokens.java
similarity index 58%
rename from v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/preload/PreloadAction.java
rename to v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/job/OverlayTokens.java
index cf7a2267ba..86b48dd530 100644
--- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/preload/PreloadAction.java
+++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/model/job/OverlayTokens.java
@@ -13,18 +13,16 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
-package com.google.cloud.teleport.v2.neo4j.actions.preload;
+package com.google.cloud.teleport.v2.neo4j.model.job;
-import com.google.cloud.teleport.v2.neo4j.model.job.ActionContext;
-import java.util.List;
-import org.neo4j.importer.v1.actions.Action;
+import java.io.Serializable;
+import java.util.Map;
-/**
- * Interface for running preload Actions. Before the pipeline loads, PCollections are not available.
- */
-public interface PreloadAction {
-
- void configure(Action action, ActionContext context);
+/** Runtime options object that coalesces arbitrary options. */
+public record OverlayTokens(Map tokens) implements Serializable {
- List execute();
+ @Override
+ public Map tokens() {
+ return Map.copyOf(tokens);
+ }
}
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/Provider.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/SourceProvider.java
similarity index 86%
rename from v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/Provider.java
rename to v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/SourceProvider.java
index 0b94b93d36..60eecc857e 100644
--- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/Provider.java
+++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/SourceProvider.java
@@ -16,7 +16,7 @@
package com.google.cloud.teleport.v2.neo4j.providers;
import com.google.cloud.teleport.v2.neo4j.model.helpers.TargetQuerySpec;
-import com.google.cloud.teleport.v2.neo4j.model.job.OptionsParams;
+import com.google.cloud.teleport.v2.neo4j.model.job.OverlayTokens;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
@@ -24,9 +24,9 @@
import org.apache.beam.sdk.values.Row;
/** Provider interface, implemented for every source. */
-public interface Provider {
+public interface SourceProvider {
- void configure(OptionsParams optionsParams);
+ void configure(OverlayTokens overlayTokens);
/**
* Push down capability determine whether groupings and aggregations are executed as SQL queries.
@@ -40,14 +40,14 @@ public interface Provider {
* does not support SQL push-down. For a SQL source with target transformations, this source query
* will not be made.
*/
- PTransform> querySourceBeamRows(Schema schema);
+ PTransform> querySourceRows(Schema schema);
/**
* Queries the source for a particular target. The TargetQuerySpec includes the source query so
* that sources that do not support push-down, additional transforms can be done in this
* transform.
*/
- PTransform> queryTargetBeamRows(TargetQuerySpec targetQuerySpec);
+ PTransform> querySourceRowsForTarget(TargetQuerySpec targetQuerySpec);
/**
* Queries the source to extract metadata. This transform returns zero rows and a valid schema
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/ProviderFactory.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/SourceProviderFactory.java
similarity index 74%
rename from v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/ProviderFactory.java
rename to v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/SourceProviderFactory.java
index 964c588bf5..5beae537b0 100644
--- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/ProviderFactory.java
+++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/SourceProviderFactory.java
@@ -15,25 +15,25 @@
*/
package com.google.cloud.teleport.v2.neo4j.providers;
-import com.google.cloud.teleport.v2.neo4j.model.helpers.TargetSequence;
+import com.google.cloud.teleport.v2.neo4j.model.helpers.StepSequence;
import com.google.cloud.teleport.v2.neo4j.model.sources.BigQuerySource;
import com.google.cloud.teleport.v2.neo4j.model.sources.TextSource;
-import com.google.cloud.teleport.v2.neo4j.providers.bigquery.BigQueryImpl;
-import com.google.cloud.teleport.v2.neo4j.providers.text.TextImpl;
+import com.google.cloud.teleport.v2.neo4j.providers.bigquery.BigQueryProvider;
+import com.google.cloud.teleport.v2.neo4j.providers.text.TextProvider;
import org.neo4j.importer.v1.sources.Source;
/**
* Factory for binding implementation adapters into framework. Currently, supports two providers:
* bigquery and text
*/
-public class ProviderFactory {
+public class SourceProviderFactory {
- public static Provider of(Source source, TargetSequence targetSequence) {
+ public static SourceProvider of(Source source, StepSequence targetSequence) {
switch (source.getType()) {
case "bigquery":
- return new BigQueryImpl((BigQuerySource) source, targetSequence);
+ return new BigQueryProvider((BigQuerySource) source, targetSequence);
case "text":
- return new TextImpl((TextSource) source, targetSequence);
+ return new TextProvider((TextSource) source, targetSequence);
default:
throw new RuntimeException("Unsupported source type: " + source);
}
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BigQueryImpl.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BigQueryProvider.java
similarity index 85%
rename from v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BigQueryImpl.java
rename to v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BigQueryProvider.java
index 2c5dd64db2..0136c18f7c 100644
--- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BigQueryImpl.java
+++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/bigquery/BigQueryProvider.java
@@ -17,11 +17,11 @@
import com.google.cloud.teleport.v2.neo4j.model.helpers.BigQuerySpec;
import com.google.cloud.teleport.v2.neo4j.model.helpers.BigQuerySpec.BigQuerySpecBuilder;
+import com.google.cloud.teleport.v2.neo4j.model.helpers.StepSequence;
import com.google.cloud.teleport.v2.neo4j.model.helpers.TargetQuerySpec;
-import com.google.cloud.teleport.v2.neo4j.model.helpers.TargetSequence;
-import com.google.cloud.teleport.v2.neo4j.model.job.OptionsParams;
+import com.google.cloud.teleport.v2.neo4j.model.job.OverlayTokens;
import com.google.cloud.teleport.v2.neo4j.model.sources.BigQuerySource;
-import com.google.cloud.teleport.v2.neo4j.providers.Provider;
+import com.google.cloud.teleport.v2.neo4j.providers.SourceProvider;
import com.google.cloud.teleport.v2.neo4j.utils.ModelUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
@@ -32,20 +32,20 @@
import org.slf4j.LoggerFactory;
/** Provider implementation for reading and writing BigQuery. */
-public class BigQueryImpl implements Provider {
- private static final Logger LOG = LoggerFactory.getLogger(BigQueryImpl.class);
+public class BigQueryProvider implements SourceProvider {
+ private static final Logger LOG = LoggerFactory.getLogger(BigQueryProvider.class);
private final BigQuerySource source;
- private final TargetSequence targetSequence;
- private OptionsParams optionsParams;
+ private final StepSequence targetSequence;
+ private OverlayTokens overlayTokens;
- public BigQueryImpl(BigQuerySource source, TargetSequence targetSequence) {
+ public BigQueryProvider(BigQuerySource source, StepSequence targetSequence) {
this.source = source;
this.targetSequence = targetSequence;
}
@Override
- public void configure(OptionsParams optionsParams) {
- this.optionsParams = optionsParams;
+ public void configure(OverlayTokens overlayTokens) {
+ this.overlayTokens = overlayTokens;
}
@Override
@@ -54,12 +54,13 @@ public boolean supportsSqlPushDown() {
}
@Override
- public PTransform> querySourceBeamRows(Schema schema) {
+ public PTransform> querySourceRows(Schema schema) {
return new BqQueryToRow(getSourceQueryBeamSpec());
}
@Override
- public PTransform> queryTargetBeamRows(TargetQuerySpec targetQuerySpec) {
+ public PTransform> querySourceRowsForTarget(
+ TargetQuerySpec targetQuerySpec) {
return new BqQueryToRow(getTargetQueryBeamSpec(targetQuerySpec));
}
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/text/TextImpl.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/text/TextProvider.java
similarity index 66%
rename from v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/text/TextImpl.java
rename to v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/text/TextProvider.java
index 2a7a5323d6..6a0326700c 100644
--- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/text/TextImpl.java
+++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/text/TextProvider.java
@@ -15,11 +15,11 @@
*/
package com.google.cloud.teleport.v2.neo4j.providers.text;
+import com.google.cloud.teleport.v2.neo4j.model.helpers.StepSequence;
import com.google.cloud.teleport.v2.neo4j.model.helpers.TargetQuerySpec;
-import com.google.cloud.teleport.v2.neo4j.model.helpers.TargetSequence;
-import com.google.cloud.teleport.v2.neo4j.model.job.OptionsParams;
+import com.google.cloud.teleport.v2.neo4j.model.job.OverlayTokens;
import com.google.cloud.teleport.v2.neo4j.model.sources.TextSource;
-import com.google.cloud.teleport.v2.neo4j.providers.Provider;
+import com.google.cloud.teleport.v2.neo4j.providers.SourceProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
@@ -27,20 +27,20 @@
import org.apache.beam.sdk.values.Row;
/** Provider implementation for reading and writing Text files. */
-public class TextImpl implements Provider {
+public class TextProvider implements SourceProvider {
private final TextSource source;
- private final TargetSequence targetSequence;
- private OptionsParams optionsParams;
+ private final StepSequence targetSequence;
+ private OverlayTokens overlayTokens;
- public TextImpl(TextSource source, TargetSequence targetSequence) {
+ public TextProvider(TextSource source, StepSequence targetSequence) {
this.source = source;
this.targetSequence = targetSequence;
}
@Override
- public void configure(OptionsParams optionsParams) {
- this.optionsParams = optionsParams;
+ public void configure(OverlayTokens overlayTokens) {
+ this.overlayTokens = overlayTokens;
}
@Override
@@ -49,13 +49,14 @@ public boolean supportsSqlPushDown() {
}
@Override
- public PTransform> querySourceBeamRows(Schema schema) {
+ public PTransform> querySourceRows(Schema schema) {
return new TextSourceFileToRow(source, schema);
}
@Override
- public PTransform> queryTargetBeamRows(TargetQuerySpec targetQuerySpec) {
- return new TextTargetToRow(optionsParams, targetSequence, targetQuerySpec);
+ public PTransform> querySourceRowsForTarget(
+ TargetQuerySpec targetQuerySpec) {
+ return new TextTargetToRow(overlayTokens, targetSequence, targetQuerySpec);
}
@Override
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/text/TextTargetToRow.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/text/TextTargetToRow.java
index 8cd61a473f..2e9daf0c4a 100644
--- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/text/TextTargetToRow.java
+++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/providers/text/TextTargetToRow.java
@@ -15,9 +15,9 @@
*/
package com.google.cloud.teleport.v2.neo4j.providers.text;
+import com.google.cloud.teleport.v2.neo4j.model.helpers.StepSequence;
import com.google.cloud.teleport.v2.neo4j.model.helpers.TargetQuerySpec;
-import com.google.cloud.teleport.v2.neo4j.model.helpers.TargetSequence;
-import com.google.cloud.teleport.v2.neo4j.model.job.OptionsParams;
+import com.google.cloud.teleport.v2.neo4j.model.job.OverlayTokens;
import com.google.cloud.teleport.v2.neo4j.transforms.CastExpandTargetRowFn;
import com.google.cloud.teleport.v2.neo4j.utils.BeamUtils;
import com.google.cloud.teleport.v2.neo4j.utils.ModelUtils;
@@ -41,13 +41,13 @@
public class TextTargetToRow extends PTransform> {
private static final Logger LOG = LoggerFactory.getLogger(TextTargetToRow.class);
- private final TargetSequence targetSequence;
+ private final StepSequence targetSequence;
private final TargetQuerySpec targetQuerySpec;
- private final OptionsParams optionsParams;
+ private final OverlayTokens overlayTokens;
public TextTargetToRow(
- OptionsParams optionsParams, TargetSequence targetSequence, TargetQuerySpec targetQuerySpec) {
- this.optionsParams = optionsParams;
+ OverlayTokens overlayTokens, StepSequence targetSequence, TargetQuerySpec targetQuerySpec) {
+ this.overlayTokens = overlayTokens;
this.targetSequence = targetSequence;
this.targetQuerySpec = targetQuerySpec;
}
@@ -94,6 +94,6 @@ public PCollection expand(PBegin input) {
}
private String getRewritten(String sql) {
- return ModelUtils.replaceVariableTokens(sql, optionsParams.getTokenMap());
+ return ModelUtils.replaceVariableTokens(sql, overlayTokens.tokens());
}
}
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java
index 26a82897f6..990d964cad 100644
--- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java
+++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java
@@ -15,72 +15,23 @@
*/
package com.google.cloud.teleport.v2.neo4j.templates;
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.mapping;
-import static java.util.stream.Collectors.toList;
-
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.Template.AdditionalDocumentationBlock;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
-import com.google.cloud.teleport.v2.neo4j.actions.ActionDoFnFactory;
-import com.google.cloud.teleport.v2.neo4j.actions.ActionPreloadFactory;
-import com.google.cloud.teleport.v2.neo4j.actions.preload.PreloadAction;
-import com.google.cloud.teleport.v2.neo4j.database.Neo4jConnection;
import com.google.cloud.teleport.v2.neo4j.model.InputValidator;
import com.google.cloud.teleport.v2.neo4j.model.Json;
import com.google.cloud.teleport.v2.neo4j.model.Json.ParsingResult;
import com.google.cloud.teleport.v2.neo4j.model.connection.ConnectionParams;
-import com.google.cloud.teleport.v2.neo4j.model.enums.ArtifactType;
import com.google.cloud.teleport.v2.neo4j.model.helpers.JobSpecMapper;
-import com.google.cloud.teleport.v2.neo4j.model.helpers.OptionsParamsMapper;
-import com.google.cloud.teleport.v2.neo4j.model.helpers.TargetQuerySpec;
-import com.google.cloud.teleport.v2.neo4j.model.helpers.TargetQuerySpec.TargetQuerySpecBuilder;
-import com.google.cloud.teleport.v2.neo4j.model.helpers.TargetSequence;
-import com.google.cloud.teleport.v2.neo4j.model.job.ActionContext;
-import com.google.cloud.teleport.v2.neo4j.model.job.OptionsParams;
+import com.google.cloud.teleport.v2.neo4j.model.helpers.OverlayTokenParser;
import com.google.cloud.teleport.v2.neo4j.options.Neo4jFlexTemplateOptions;
-import com.google.cloud.teleport.v2.neo4j.providers.Provider;
-import com.google.cloud.teleport.v2.neo4j.providers.ProviderFactory;
-import com.google.cloud.teleport.v2.neo4j.transforms.Neo4jRowWriterTransform;
-import com.google.cloud.teleport.v2.neo4j.utils.BeamBlock;
import com.google.cloud.teleport.v2.neo4j.utils.FileSystemUtils;
-import com.google.cloud.teleport.v2.neo4j.utils.ModelUtils;
-import com.google.cloud.teleport.v2.neo4j.utils.ProcessingCoder;
import com.google.cloud.teleport.v2.utils.SecretManagerUtils;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Wait;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.commons.lang3.StringUtils;
-import org.jetbrains.annotations.NotNull;
-import org.neo4j.importer.v1.Configuration;
-import org.neo4j.importer.v1.ImportSpecification;
-import org.neo4j.importer.v1.actions.Action;
-import org.neo4j.importer.v1.actions.ActionStage;
-import org.neo4j.importer.v1.sources.Source;
-import org.neo4j.importer.v1.targets.CustomQueryTarget;
-import org.neo4j.importer.v1.targets.NodeTarget;
-import org.neo4j.importer.v1.targets.RelationshipTarget;
-import org.neo4j.importer.v1.targets.Target;
-import org.neo4j.importer.v1.targets.TargetType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -129,62 +80,39 @@ public class GoogleCloudToNeo4j {
private static final Logger LOG = LoggerFactory.getLogger(GoogleCloudToNeo4j.class);
- private final OptionsParams optionsParams;
- private final ConnectionParams neo4jConnection;
- private final ImportSpecification importSpecification;
- private final Configuration globalSettings;
- private final Pipeline pipeline;
- private final String templateVersion;
- private final TargetSequence targetSequence = new TargetSequence();
-
/**
- * Main class for template. Initializes job using run-time on pipelineOptions.
+ * Runs a pipeline which reads data from various sources and writes it to Neo4j.
*
- * @param pipelineOptions framework supplied arguments
+ * @param args arguments to the pipeline
*/
- public GoogleCloudToNeo4j(Neo4jFlexTemplateOptions pipelineOptions) {
-
- ////////////////////////////
- // Job name gets a date on it when running within the container, but not with DirectRunner
- // final String jobName = pipelineOptions.getJobName() + "-" + System.currentTimeMillis();
- // pipelineOptions.setJobName(jobName);
-
- // Set pipeline options
- this.pipeline = Pipeline.create(pipelineOptions);
- FileSystems.setDefaultPipelineOptions(pipelineOptions);
- this.optionsParams = OptionsParamsMapper.fromPipelineOptions(pipelineOptions);
+ public static void main(String[] args) {
+ UncaughtExceptionLogger.register();
- // Validate pipeline
- processValidations(
+ Neo4jFlexTemplateOptions options =
+ PipelineOptionsFactory.fromArgs(args).withValidation().as(Neo4jFlexTemplateOptions.class);
+ ValidationErrors.processValidations(
"Errors found validating pipeline options: ",
- InputValidator.validateNeo4jPipelineOptions(pipelineOptions));
-
- this.templateVersion = readTemplateVersion(pipelineOptions);
-
- String neo4jConnectionJson = readConnectionSettings(pipelineOptions);
- ParsingResult parsingResult = InputValidator.validateNeo4jConnection(neo4jConnectionJson);
- if (!parsingResult.isSuccessful()) {
- processValidations(
- "Errors found validating Neo4j connection: ",
- parsingResult.formatErrors("Could not validate connection JSON"));
- }
- this.neo4jConnection = Json.map(parsingResult, ConnectionParams.class);
-
- this.importSpecification = JobSpecMapper.parse(pipelineOptions.getJobSpecUri(), optionsParams);
- globalSettings = importSpecification.getConfiguration();
-
- ///////////////////////////////////
+ InputValidator.validateNeo4jPipelineOptions(options));
- // Source specific validations
- for (Source source : importSpecification.getSources()) {
- // get provider implementation for source
- Provider providerImpl = ProviderFactory.of(source, targetSequence);
- providerImpl.configure(optionsParams);
+ if (StringUtils.isBlank(options.getDisabledAlgorithms())) {
+ options.setDisabledAlgorithms(
+ "SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon,"
+ + " NULL");
}
+ LOG.info("Job: {}", options.getJobSpecUri());
+ FileSystems.setDefaultPipelineOptions(options);
+ var overlayTokens = OverlayTokenParser.parse(options.getOptionsJson());
+ var templateVersion = readTemplateVersion(options);
+ var neo4jConnectionConfig = readConnectionConfiguration(options);
+ var importSpecification = JobSpecMapper.parse(options.getJobSpecUri(), overlayTokens);
+ var importPipeline =
+ new Neo4jImportPipeline(
+ options, templateVersion, neo4jConnectionConfig, overlayTokens, importSpecification);
+ importPipeline.run();
}
private static String readTemplateVersion(Neo4jFlexTemplateOptions options) {
- Map labels = options.as(DataflowPipelineOptions.class).getLabels();
+ var labels = options.as(DataflowPipelineOptions.class).getLabels();
String defaultVersion = "UNKNOWN";
if (labels == null) {
return defaultVersion;
@@ -192,6 +120,18 @@ private static String readTemplateVersion(Neo4jFlexTemplateOptions options) {
return labels.getOrDefault("goog-dataflow-provided-template-version", defaultVersion);
}
+ private static ConnectionParams readConnectionConfiguration(
+ Neo4jFlexTemplateOptions pipelineOptions) {
+ String neo4jConnectionJson = readConnectionSettings(pipelineOptions);
+ ParsingResult parsingResult = InputValidator.validateNeo4jConnection(neo4jConnectionJson);
+ if (!parsingResult.isSuccessful()) {
+ ValidationErrors.processValidations(
+ "Errors found validating Neo4j connection: ",
+ parsingResult.formatErrors("Could not validate connection JSON"));
+ }
+ return Json.map(parsingResult, ConnectionParams.class);
+ }
+
private static String readConnectionSettings(Neo4jFlexTemplateOptions options) {
String secretId = options.getNeo4jConnectionSecretId();
if (StringUtils.isNotEmpty(secretId)) {
@@ -205,393 +145,4 @@ private static String readConnectionSettings(Neo4jFlexTemplateOptions options) {
String.format("Unable to read Neo4j configuration at URI %s: ", uri), e);
}
}
-
- /**
- * Runs a pipeline which reads data from various sources and writes it to Neo4j.
- *
- * @param args arguments to the pipeline
- */
- public static void main(String[] args) {
- UncaughtExceptionLogger.register();
-
- Neo4jFlexTemplateOptions options =
- PipelineOptionsFactory.fromArgs(args).withValidation().as(Neo4jFlexTemplateOptions.class);
-
- // Allow users to supply their own list of disabled algorithms if necessary
- if (StringUtils.isBlank(options.getDisabledAlgorithms())) {
- options.setDisabledAlgorithms(
- "SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon,"
- + " NULL");
- }
-
- LOG.info("Job: {}", options.getJobSpecUri());
- GoogleCloudToNeo4j template = new GoogleCloudToNeo4j(options);
- template.run();
- }
-
- /** Raises RuntimeExceptions for validation errors. */
- private void processValidations(String description, List validationMessages) {
- StringBuilder sb = new StringBuilder();
- if (!validationMessages.isEmpty()) {
- for (String msg : validationMessages) {
- sb.append(msg);
- sb.append(System.lineSeparator());
- }
- throw new RuntimeException(description + " " + sb);
- }
- }
-
- public void run() {
-
- try (Neo4jConnection directConnect =
- new Neo4jConnection(this.neo4jConnection, this.templateVersion)) {
- boolean resetDb = globalSettings.get(Boolean.class, "reset_db").orElse(false);
- if (!resetDb) {
- directConnect.verifyConnectivity();
- } else {
- directConnect.resetDatabase();
- }
- }
-
- ////////////////////////////
- // If an action transformation has no upstream PCollection, it will use this default context
- PCollection defaultActionContext =
- pipeline.apply(
- "Default Context",
- Create.empty(TypeDescriptor.of(Row.class)).withCoder(ProcessingCoder.of()));
-
- var processingQueue = new BeamBlock(defaultActionContext);
-
- runPreloadActions(findActionsByStage(ActionStage.START).collect(toList()));
-
- Map>> preActionRows =
- findActionsByStages(
- Set.of(
- ActionStage.PRE_NODES, ActionStage.PRE_RELATIONSHIPS, ActionStage.PRE_QUERIES))
- .map(action -> Map.entry(action.getStage(), runAction(action, defaultActionContext)))
- .collect(
- groupingBy(
- Entry::getKey, mapping(Entry::getValue, Collectors.>toList())));
- var sourceRows = new ArrayList>(importSpecification.getSources().size());
- var targetRows = new HashMap>>(targetCount());
- var allActiveTargets = importSpecification.getTargets().getAllActive().stream().toList();
- var allActiveNodeTargets =
- importSpecification.getTargets().getNodes().stream()
- .filter(Target::isActive)
- .collect(toList());
-
- ////////////////////////////
- // Process sources
- for (var source : importSpecification.getSources()) {
- String sourceName = source.getName();
- var activeSourceTargets =
- allActiveTargets.stream()
- .filter(target -> target.getSource().equals(sourceName))
- .collect(toList());
- if (activeSourceTargets.isEmpty()) {
- return;
- }
-
- // get provider implementation for source
- Provider provider = ProviderFactory.of(source, targetSequence);
- provider.configure(optionsParams);
- PCollection sourceMetadata =
- pipeline.apply(
- String.format("Metadata for source %s", sourceName), provider.queryMetadata());
- sourceRows.add(sourceMetadata);
- Schema sourceBeamSchema = sourceMetadata.getSchema();
- processingQueue.addToQueue(ArtifactType.source, sourceName, defaultActionContext);
-
- ////////////////////////////
- // Optimization: if some of the current source's targets either
- // - do not alter the source query (i.e. define no transformations)
- // - or the source provider does not support SQL pushdown
- // then the source PCollection can be defined here and reused across all the relevant targets
- PCollection nullableSourceBeamRows = null;
- if (!provider.supportsSqlPushDown()
- || activeSourceTargets.stream()
- .anyMatch(target -> !ModelUtils.targetHasTransforms(target))) {
- nullableSourceBeamRows =
- pipeline
- .apply("Query " + sourceName, provider.querySourceBeamRows(sourceBeamSchema))
- .setRowSchema(sourceBeamSchema);
- }
-
- List nodeTargets = getTargetsByType(activeSourceTargets, TargetType.NODE);
- for (NodeTarget target : nodeTargets) {
- TargetQuerySpec targetQuerySpec =
- new TargetQuerySpecBuilder()
- .sourceBeamSchema(sourceBeamSchema)
- .nullableSourceRows(nullableSourceBeamRows)
- .target(target)
- .build();
- String nodeStepDescription =
- targetSequence.getSequenceNumber(target)
- + ": "
- + sourceName
- + "->"
- + target.getName()
- + " nodes";
- PCollection preInsertBeamRows =
- pipeline.apply(
- "Query " + nodeStepDescription, provider.queryTargetBeamRows(targetQuerySpec));
-
- List> dependencies =
- new ArrayList<>(preActionRows.getOrDefault(ActionStage.PRE_NODES, List.of()));
- dependencies.add(
- processingQueue.resolveOutputs(target.getDependencies(), nodeStepDescription));
-
- PCollection blockingReturn =
- preInsertBeamRows
- .apply(
- "** Unblocking "
- + nodeStepDescription
- + "(after "
- + String.join(", ", target.getDependencies())
- + " and pre-nodes actions)",
- Wait.on(dependencies))
- .setCoder(preInsertBeamRows.getCoder())
- .apply(
- "Writing " + nodeStepDescription,
- new Neo4jRowWriterTransform(
- importSpecification,
- neo4jConnection,
- templateVersion,
- targetSequence,
- target))
- .setCoder(preInsertBeamRows.getCoder());
-
- targetRows
- .computeIfAbsent(TargetType.NODE, (type) -> new ArrayList<>(nodeTargets.size()))
- .add(blockingReturn);
-
- processingQueue.addToQueue(ArtifactType.node, target.getName(), blockingReturn);
- }
-
- ////////////////////////////
- // Write relationship targets
- List relationshipTargets =
- getTargetsByType(activeSourceTargets, TargetType.RELATIONSHIP);
- for (var target : relationshipTargets) {
- var targetQuerySpec =
- new TargetQuerySpecBuilder()
- .nullableSourceRows(nullableSourceBeamRows)
- .sourceBeamSchema(sourceBeamSchema)
- .target(target)
- .startNodeTarget(
- findNodeTargetByName(
- allActiveNodeTargets, target.getStartNodeReference().getName()))
- .endNodeTarget(
- findNodeTargetByName(
- allActiveNodeTargets, target.getEndNodeReference().getName()))
- .build();
- String relationshipStepDescription =
- targetSequence.getSequenceNumber(target)
- + ": "
- + sourceName
- + "->"
- + target.getName()
- + " edges";
- PCollection preInsertBeamRows;
- if (ModelUtils.targetHasTransforms(target)) {
- preInsertBeamRows =
- pipeline.apply(
- "Query " + relationshipStepDescription,
- provider.queryTargetBeamRows(targetQuerySpec));
- } else {
- preInsertBeamRows = nullableSourceBeamRows;
- }
-
- List> dependencies =
- new ArrayList<>(preActionRows.getOrDefault(ActionStage.PRE_RELATIONSHIPS, List.of()));
- Set dependencyNames = new LinkedHashSet<>(target.getDependencies());
- dependencyNames.add(target.getStartNodeReference().getName());
- dependencyNames.add(target.getEndNodeReference().getName());
- dependencies.add(
- processingQueue.resolveOutputs(dependencyNames, relationshipStepDescription));
-
- PCollection blockingReturn =
- preInsertBeamRows
- .apply(
- "** Unblocking "
- + relationshipStepDescription
- + "(after "
- + String.join(", ", dependencyNames)
- + " and pre-relationships actions)",
- Wait.on(dependencies))
- .setCoder(preInsertBeamRows.getCoder())
- .apply(
- "Writing " + relationshipStepDescription,
- new Neo4jRowWriterTransform(
- importSpecification,
- neo4jConnection,
- templateVersion,
- targetSequence,
- target))
- .setCoder(preInsertBeamRows.getCoder());
-
- targetRows
- .computeIfAbsent(
- TargetType.RELATIONSHIP, (type) -> new ArrayList<>(relationshipTargets.size()))
- .add(blockingReturn);
- // serialize relationships
- processingQueue.addToQueue(ArtifactType.edge, target.getName(), blockingReturn);
- }
- ////////////////////////////
- // Custom query targets
- List customQueryTargets =
- getTargetsByType(activeSourceTargets, TargetType.QUERY);
- for (Target target : customQueryTargets) {
- String customQueryStepDescription =
- targetSequence.getSequenceNumber(target)
- + ": "
- + sourceName
- + "->"
- + target.getName()
- + " (custom query)";
-
- List> dependencies =
- new ArrayList<>(preActionRows.getOrDefault(ActionStage.PRE_QUERIES, List.of()));
- dependencies.add(
- processingQueue.resolveOutputs(target.getDependencies(), customQueryStepDescription));
-
- // note: nullableSourceBeamRows is guaranteed to be non-null here since custom query targets
- // cannot define source transformations
- PCollection blockingReturn =
- nullableSourceBeamRows
- .apply(
- "** Unblocking "
- + customQueryStepDescription
- + "(after "
- + String.join(", ", target.getDependencies())
- + ")",
- Wait.on(dependencies))
- .setCoder(nullableSourceBeamRows.getCoder())
- .apply(
- "Writing " + customQueryStepDescription,
- new Neo4jRowWriterTransform(
- importSpecification,
- neo4jConnection,
- templateVersion,
- targetSequence,
- target))
- .setCoder(nullableSourceBeamRows.getCoder());
-
- targetRows
- .computeIfAbsent(TargetType.QUERY, (type) -> new ArrayList<>(customQueryTargets.size()))
- .add(blockingReturn);
- processingQueue.addToQueue(ArtifactType.custom_query, target.getName(), blockingReturn);
- }
- }
-
- // Process POST-* actions, gather outputs and run END actions
- List> endActionDependencies =
- findActionsByStage(ActionStage.POST_SOURCES)
- .map(action -> runAction(action, defaultActionContext, sourceRows))
- .collect(Collectors.toCollection(ArrayList::new));
- endActionDependencies.addAll(
- findActionsByStage(ActionStage.POST_NODES)
- .map(
- action ->
- runAction(
- action,
- defaultActionContext,
- targetRows.getOrDefault(TargetType.NODE, List.of())))
- .collect(toList()));
- endActionDependencies.addAll(
- findActionsByStage(ActionStage.POST_RELATIONSHIPS)
- .map(
- action ->
- runAction(
- action,
- defaultActionContext,
- targetRows.getOrDefault(TargetType.RELATIONSHIP, List.of())))
- .collect(toList()));
- endActionDependencies.addAll(
- findActionsByStage(ActionStage.POST_QUERIES)
- .map(
- action ->
- runAction(
- action,
- defaultActionContext,
- targetRows.getOrDefault(TargetType.QUERY, List.of())))
- .collect(toList()));
- findActionsByStage(ActionStage.END)
- .map(action -> runAction(action, defaultActionContext, endActionDependencies))
- .forEach(GoogleCloudToNeo4j::noOp);
-
- // For a Dataflow Flex Template, do NOT waitUntilFinish().
- pipeline.run();
- }
-
- private PCollection runAction(Action action, PCollection defaultActionContext) {
- return runAction(action, defaultActionContext, List.of());
- }
-
- private PCollection runAction(
- Action action, PCollection defaultActionContext, List> dependencies) {
- var actionName = action.getName();
- return pipeline
- .apply(String.format("** Setup %s", actionName), Create.of(1))
- .apply(
- String.format("** Wait on %s dependencies", action.getStage()), Wait.on(dependencies))
- .setCoder(VarIntCoder.of())
- .apply(
- String.format("Running action %s", actionName),
- ParDo.of(ActionDoFnFactory.of(newActionContext(action))))
- .setCoder(defaultActionContext.getCoder());
- }
-
- private Stream findActionsByStage(ActionStage stage) {
- return findActionsByStages(Set.of(stage));
- }
-
- private Stream findActionsByStages(Set stages) {
- return importSpecification.getActions().stream()
- .filter(action -> stages.contains(action.getStage()));
- }
-
- private void runPreloadActions(List actions) {
- for (Action action : actions) {
- LOG.debug("Executing START action: {}", action.getName());
- // Get targeted execution context
- ActionContext context = new ActionContext(action, neo4jConnection, templateVersion);
- PreloadAction actionImpl = ActionPreloadFactory.of(action, context);
- List msgs = actionImpl.execute();
- for (String msg : msgs) {
- LOG.info("START action {} output: {}", action.getName(), msg);
- }
- }
- }
-
- @NotNull
- private ActionContext newActionContext(Action action) {
- return new ActionContext(action, this.neo4jConnection, this.templateVersion);
- }
-
- private static NodeTarget findNodeTargetByName(List nodes, String reference) {
- return nodes.stream()
- .filter(target -> reference.equals(target.getName()))
- .findFirst()
- .orElseThrow(
- () -> new IllegalArgumentException("Could not find active node target: " + reference));
- }
-
- @SuppressWarnings("unchecked")
- private List getTargetsByType(
- List activeSourceTargets, TargetType targetType) {
- return activeSourceTargets.stream()
- .filter(target -> target.getTargetType() == targetType)
- .map(target -> (T) target)
- .collect(toList());
- }
-
- private static void noOp(T item) {}
-
- private int targetCount() {
- var targets = this.importSpecification.getTargets();
- return targets.getNodes().size()
- + targets.getRelationships().size()
- + targets.getCustomQueries().size();
- }
}
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/Neo4jImportPipeline.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/Neo4jImportPipeline.java
new file mode 100644
index 0000000000..f3db120a5e
--- /dev/null
+++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/Neo4jImportPipeline.java
@@ -0,0 +1,361 @@
+/*
+ * Copyright (C) 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.neo4j.templates;
+
+import static org.neo4j.importer.v1.targets.TargetType.QUERY;
+
+import com.google.cloud.teleport.v2.neo4j.actions.ActionDoFnFactory;
+import com.google.cloud.teleport.v2.neo4j.model.connection.ConnectionParams;
+import com.google.cloud.teleport.v2.neo4j.model.helpers.StepSequence;
+import com.google.cloud.teleport.v2.neo4j.model.helpers.TargetQuerySpec;
+import com.google.cloud.teleport.v2.neo4j.model.helpers.TargetQuerySpec.TargetQuerySpecBuilder;
+import com.google.cloud.teleport.v2.neo4j.model.job.ActionContext;
+import com.google.cloud.teleport.v2.neo4j.model.job.OverlayTokens;
+import com.google.cloud.teleport.v2.neo4j.providers.SourceProvider;
+import com.google.cloud.teleport.v2.neo4j.providers.SourceProviderFactory;
+import com.google.cloud.teleport.v2.neo4j.transforms.Neo4jRowWriterTransform;
+import com.google.cloud.teleport.v2.neo4j.transforms.VerifyOrResetDatabaseFn;
+import com.google.cloud.teleport.v2.neo4j.utils.ProcessingCoder;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Wait;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.neo4j.importer.v1.ImportSpecification;
+import org.neo4j.importer.v1.actions.Action;
+import org.neo4j.importer.v1.pipeline.ActionStep;
+import org.neo4j.importer.v1.pipeline.ImportPipeline;
+import org.neo4j.importer.v1.pipeline.ImportStep;
+import org.neo4j.importer.v1.pipeline.SourceStep;
+import org.neo4j.importer.v1.pipeline.TargetStep;
+import org.neo4j.importer.v1.targets.NodeTarget;
+import org.neo4j.importer.v1.targets.RelationshipTarget;
+import org.neo4j.importer.v1.targets.Target;
+
+public class Neo4jImportPipeline {
+
+ /**
+ * Registry key under which the output of the connectivity check / database reset step is
+ * registered; sources, targets and actions all wait on the collection stored under this key.
+ */
+ private static final String STARTUP_STEP = "__VERIFY_OR_RESET__";
+
+ /** Template version forwarded to the row writer, the action context and the startup check. */
+ private final String templateVersion;
+
+ /** Neo4j connection settings forwarded to the row writer, action context and startup check. */
+ private final ConnectionParams neo4jConnectionConfig;
+
+ /** Tokens handed to every source provider through {@code configure}. */
+ private final OverlayTokens overlayTokens;
+
+ /** Import specification used to look up targets and passed to the row writer. */
+ private final ImportSpecification importSpecification;
+
+ /** Step-by-step view of the specification, built with {@code ImportPipeline.of}. */
+ private final ImportPipeline pipelineDescription;
+
+ /** Holds source contexts and dependency collections, keyed by name. */
+ private final PipelineRegistry pipelineRegistry;
+
+ /** Options used to create the Beam pipeline in {@code run()}. */
+ private final PipelineOptions options;
+
+ /** Sequence shared with source providers and the row writer. */
+ private final StepSequence stepSequence;
+
+  /**
+   * Creates a pipeline builder for the given import specification.
+   *
+   * <p>Besides storing the supplied arguments, this derives the step description with {@code
+   * ImportPipeline.of(importSpecification)} and creates an empty registry and step sequence.
+   *
+   * @param options options used to create the Beam pipeline
+   * @param templateVersion template version forwarded to writers, actions and the startup check
+   * @param neo4jConnectionConfig Neo4j connection settings
+   * @param overlayTokens tokens passed to source providers
+   * @param importSpecification the import specification to execute
+   */
+  public Neo4jImportPipeline(
+      PipelineOptions options,
+      String templateVersion,
+      ConnectionParams neo4jConnectionConfig,
+      OverlayTokens overlayTokens,
+      ImportSpecification importSpecification) {
+
+    // Caller-supplied collaborators.
+    this.importSpecification = importSpecification;
+    this.overlayTokens = overlayTokens;
+    this.neo4jConnectionConfig = neo4jConnectionConfig;
+    this.templateVersion = templateVersion;
+    this.options = options;
+
+    // State derived or owned by this instance.
+    this.pipelineDescription = ImportPipeline.of(importSpecification);
+    this.pipelineRegistry = new PipelineRegistry();
+    this.stepSequence = new StepSequence();
+  }
+
+  /**
+   * Assembles the Beam pipeline from the pipeline description and launches it.
+   *
+   * <p>The connectivity check (or database reset) is registered first under {@code STARTUP_STEP}.
+   * Each step of the description is then dispatched to the matching handler; steps that are
+   * neither source, target nor action steps are ignored. The result of {@code pipeline.run()} is
+   * not awaited here.
+   */
+  public void run() {
+    var pipeline = Pipeline.create(options);
+
+    var startup = checkConnectionOrResetDb(pipeline);
+    pipelineRegistry.registerDependency(STARTUP_STEP, startup);
+
+    pipelineDescription.forEach(
+        step -> {
+          if (step instanceof SourceStep sourceStep) {
+            handleSource(pipeline, sourceStep);
+          } else if (step instanceof TargetStep targetStep) {
+            handleTarget(pipeline, targetStep);
+          } else if (step instanceof ActionStep actionStep) {
+            handleAction(pipeline, actionStep);
+          }
+        });
+
+    pipeline.run();
+  }
+
+  /**
+   * Registers a source step: configures its provider, reads the source metadata and publishes both
+   * a {@link SourceContext} and a dependency signal for later steps.
+   *
+   * <p>The metadata collection is gated on the startup step and is what gets registered as this
+   * step's dependency. The source context is keyed by {@code source.getName()}, whereas the
+   * dependency is keyed by {@code step.name()}.
+   *
+   * @param pipeline the Beam pipeline under construction
+   * @param step the source step to register
+   */
+  void handleSource(Pipeline pipeline, SourceStep step) {
+    var name = step.name();
+    var source = step.source();
+    var startup = pipelineRegistry.findDependency(STARTUP_STEP);
+
+    var provider = SourceProviderFactory.of(source, stepSequence);
+    provider.configure(overlayTokens);
+
+    var rawMetadata = pipeline.apply("Metadata for source " + name, provider.queryMetadata());
+    // The coder of the ungated collection is re-applied explicitly after the wait step.
+    var metadata =
+        rawMetadata
+            .apply("Wait for source " + name + " startup dependency", Wait.on(List.of(startup)))
+            .setCoder(rawMetadata.getCoder());
+
+    // The schema is taken from the gated metadata collection.
+    var context = new SourceContext(name, provider, metadata.getSchema(), startup);
+    pipelineRegistry.registerSourceContext(source.getName(), context);
+    pipelineRegistry.registerDependency(name, metadata);
+  }
+
+  /**
+   * Wires a target step into the pipeline: queries the rows for the target, waits for the step's
+   * dependencies, writes the rows to Neo4j and registers a completion signal under the target
+   * name.
+   *
+   * <p>The completion signal is the global count of the writer's output, so later steps can wait
+   * on it through the registry.
+   *
+   * @param pipeline the Beam pipeline under construction
+   * @param step the target step to wire in
+   * @throws IllegalArgumentException if no active target matches {@code step.name()}
+   * @throws NullPointerException if the step's source has not been registered beforehand
+   */
+  void handleTarget(Pipeline pipeline, TargetStep step) {
+    var target = findTarget(step.name());
+    var targetName = target.getName();
+    // Fail fast with a readable message instead of an anonymous NPE further down the call chain.
+    var source =
+        Objects.requireNonNull(
+            pipelineRegistry.findSource(step.sourceName()),
+            () ->
+                String.format(
+                    "No source context registered under \"%s\" for target \"%s\"",
+                    step.sourceName(), targetName));
+    var sourceRows = querySourceRows(pipeline, source, target);
+
+    PCollection targetWrite =
+        sourceRows
+            .apply("Wait for " + targetName + " dependencies", Wait.on(resolveDependencies(step)))
+            .setCoder(sourceRows.getCoder())
+            .apply(
+                "Write " + targetName,
+                new Neo4jRowWriterTransform(
+                    importSpecification,
+                    neo4jConnectionConfig,
+                    templateVersion,
+                    stepSequence,
+                    target))
+            .apply("Completion " + targetName, Count.globally());
+
+    pipelineRegistry.registerDependency(targetName, targetWrite);
+  }
+
+  /**
+   * Returns the rows a target should be written from.
+   *
+   * <p>Targets of type {@code QUERY} read the shared source rows directly. Every other target
+   * gets a dedicated query built by the source provider, which applies the target's transforms
+   * either by pushing them down to SQL (if the source supports SQL pushdown) or by applying them
+   * to the reused source rows.
+   *
+   * @param pipeline the Beam pipeline under construction
+   * @param source context of the source feeding the target
+   * @param target the target to read rows for
+   * @return the collection of rows to write for {@code target}
+   */
+  PCollection querySourceRows(Pipeline pipeline, SourceContext source, Target target) {
+    if (target.getTargetType() != QUERY) {
+      var sourceProvider = source.provider();
+      var targetQuerySpec = buildTargetQuerySpec(pipeline, source, target);
+      var stepName = "Query " + target.getName();
+      var targetQuery = sourceProvider.querySourceRowsForTarget(targetQuerySpec);
+      return pipeline.apply(stepName, targetQuery);
+    }
+    return source.getOrCreateRows(pipeline);
+  }
+
+  /**
+   * Builds the query specification used to read rows for a target.
+   *
+   * <p>When the source provider cannot push SQL down, the shared source rows are attached to the
+   * specification so they can be reused; otherwise the rows are left {@code null}. For
+   * relationship targets, the active start and end node targets are resolved and attached too.
+   *
+   * @param pipeline the Beam pipeline under construction
+   * @param source context of the source feeding the target
+   * @param target the target the specification is built for
+   * @return the assembled specification
+   * @throws IllegalArgumentException if a referenced start or end node target is not active
+   */
+  TargetQuerySpec buildTargetQuerySpec(Pipeline pipeline, SourceContext source, Target target) {
+    // Re-use source rows only when the source query cannot be modified by pushdown.
+    PCollection baseRows =
+        source.provider().supportsSqlPushDown() ? null : source.getOrCreateRows(pipeline);
+
+    var specBuilder =
+        new TargetQuerySpecBuilder()
+            .sourceBeamSchema(source.schema())
+            .nullableSourceRows(baseRows)
+            .target(target);
+
+    if (target instanceof RelationshipTarget relationshipTarget) {
+      var startNodeName = relationshipTarget.getStartNodeReference().getName();
+      specBuilder.startNodeTarget(findNodeTarget(startNodeName));
+      var endNodeName = relationshipTarget.getEndNodeReference().getName();
+      specBuilder.endNodeTarget(findNodeTarget(endNodeName));
+    }
+    return specBuilder.build();
+  }
+
+  /**
+   * Wires an action step into the pipeline and registers its output under the action name.
+   *
+   * <p>A single-element collection triggers the action once the step's dependencies have
+   * completed.
+   *
+   * @param pipeline the Beam pipeline under construction
+   * @param step the action step to wire in
+   */
+  void handleAction(Pipeline pipeline, ActionStep step) {
+    var action = step.action();
+    var actionName = action.getName();
+
+    var trigger = pipeline.apply(String.format("** Setup %s", actionName), Create.of(1));
+
+    // NOTE(review): this step is named after the action's stage, not its name, so two actions
+    // sharing a stage produce identical transform names - confirm that is acceptable.
+    var gatedTrigger =
+        trigger
+            .apply(
+                String.format("** Wait on %s dependencies", action.getStage()),
+                Wait.on(resolveDependencies(step)))
+            .setCoder(VarIntCoder.of());
+
+    var actionRows =
+        gatedTrigger
+            .apply(
+                String.format("Running action %s", actionName),
+                ParDo.of(ActionDoFnFactory.of(newActionContext(action))))
+            .setCoder(ProcessingCoder.of());
+
+    pipelineRegistry.registerDependency(actionName, actionRows);
+  }
+
+  /**
+   * Collects the collections a step must wait on: the startup step first, followed by one entry
+   * per declared dependency of the step, in declaration order.
+   *
+   * <p>Each entry is looked up in the registry by name. A missing entry is rejected immediately
+   * with a message naming the step and the dependency, rather than being passed on as {@code
+   * null} to the wait transform.
+   *
+   * @param step the step whose dependencies are resolved
+   * @return the dependency collections, startup step first
+   * @throws NullPointerException if the startup step or a declared dependency is not registered
+   */
+  List> resolveDependencies(ImportStep step) {
+    var dependencies = new ArrayList>();
+    dependencies.add(
+        Objects.requireNonNull(
+            pipelineRegistry.findDependency(STARTUP_STEP),
+            () -> String.format("Startup step \"%s\" has not been registered", STARTUP_STEP)));
+    for (ImportStep dependency : step.dependencies()) {
+      var dependencyName = dependency.name();
+      dependencies.add(
+          Objects.requireNonNull(
+              pipelineRegistry.findDependency(dependencyName),
+              () ->
+                  String.format(
+                      "Step \"%s\" depends on \"%s\", which has not been registered",
+                      step.name(), dependencyName)));
+    }
+    return dependencies;
+  }
+
+  /**
+   * Stores {@code dependency} in the pipeline registry under {@code name}.
+   *
+   * <p>Package-private pass-through to the registry; presumably exposed for tests.
+   */
+  void registerDependency(String name, PCollection> dependency) {
+    this.pipelineRegistry.registerDependency(name, dependency);
+  }
+
+  /**
+   * Stores {@code context} in the pipeline registry under {@code name}.
+   *
+   * <p>Package-private pass-through to the registry; presumably exposed for tests.
+   */
+  void registerSourceContext(String name, SourceContext context) {
+    this.pipelineRegistry.registerSourceContext(name, context);
+  }
+
+  /**
+   * Looks up the dependency collection registered under {@code name}.
+   *
+   * @return the registered collection, or {@code null} when nothing is registered under that name
+   */
+  PCollection> findDependency(String name) {
+    return this.pipelineRegistry.findDependency(name);
+  }
+
+  /**
+   * Looks up the source context registered under {@code name}.
+   *
+   * @return the registered context, or {@code null} when nothing is registered under that name
+   */
+  SourceContext findSourceContext(String name) {
+    return this.pipelineRegistry.findSource(name);
+  }
+
+  /**
+   * Adds the startup step to the pipeline: a database reset when the {@code reset_db}
+   * configuration flag is set, a connectivity check otherwise (the flag defaults to {@code
+   * false}).
+   *
+   * @param pipeline the Beam pipeline under construction
+   * @return the output of the startup step
+   */
+  private PCollection checkConnectionOrResetDb(Pipeline pipeline) {
+    var resetDb = pipelineDescription.configuration().get(Boolean.class, "reset_db").orElse(false);
+
+    String description;
+    if (resetDb) {
+      description = "Database reset";
+    } else {
+      description = "Connectivity check";
+    }
+
+    var seed = pipeline.apply("Start " + description, Create.of(1L));
+    var startupFn = new VerifyOrResetDatabaseFn(neo4jConnectionConfig, templateVersion, resetDb);
+    return seed.apply(description, ParDo.of(startupFn));
+  }
+
+  /** Builds the execution context for {@code action} from this pipeline's connection settings. */
+  private ActionContext newActionContext(Action action) {
+    return new ActionContext(action, this.neo4jConnectionConfig, this.templateVersion);
+  }
+
+  /**
+   * Returns the first active target whose name equals {@code targetName}.
+   *
+   * @throws IllegalArgumentException if no active target has that name
+   */
+  private Target findTarget(String targetName) {
+    for (Target candidate : importSpecification.getTargets().getAllActive()) {
+      if (candidate.getName().equals(targetName)) {
+        return candidate;
+      }
+    }
+    throw new IllegalArgumentException("Could not find active target: " + targetName);
+  }
+
+  /**
+   * Returns the first active node target whose name equals {@code nodeTargetName}.
+   *
+   * @throws IllegalArgumentException if no active node target has that name
+   */
+  private NodeTarget findNodeTarget(String nodeTargetName) {
+    for (NodeTarget candidate : importSpecification.getTargets().getNodes()) {
+      if (candidate.isActive() && candidate.getName().equals(nodeTargetName)) {
+        return candidate;
+      }
+    }
+    throw new IllegalArgumentException("Could not find active node target: " + nodeTargetName);
+  }
+
+  /**
+   * In-memory lookup tables built up while the pipeline is assembled: source contexts and
+   * dependency collections, each keyed by name. Lookups return {@code null} for unknown names.
+   */
+  private static class PipelineRegistry {
+
+    /** Source contexts by source name. */
+    private final Map sources = new HashMap<>();
+
+    /** Dependency collections by step name. */
+    private final Map> dependencies = new HashMap<>();
+
+    /** Stores {@code context} under {@code sourceName}, replacing any previous entry. */
+    public void registerSourceContext(String sourceName, SourceContext context) {
+      this.sources.put(sourceName, context);
+    }
+
+    /** Returns the context stored under {@code name}, or {@code null} if there is none. */
+    public SourceContext findSource(String name) {
+      return this.sources.get(name);
+    }
+
+    /** Stores {@code completion} under {@code targetName}, replacing any previous entry. */
+    public void registerDependency(String targetName, PCollection> completion) {
+      this.dependencies.put(targetName, completion);
+    }
+
+    /** Returns the collection stored under {@code name}, or {@code null} if there is none. */
+    public PCollection> findDependency(String name) {
+      return this.dependencies.get(name);
+    }
+  }
+
+  /**
+   * Per-source state shared by the targets reading from that source: the provider, the source
+   * schema, the startup dependency and the lazily created source rows.
+   *
+   * <p>Equality, hash code and string form only consider the name, provider and schema.
+   *
+   * <p>NOTE(review): the class is declared {@code Serializable} although it holds collection
+   * references - confirm instances are never actually serialized.
+   */
+  static final class SourceContext implements Serializable {
+
+    private final String name;
+    private final SourceProvider provider;
+    private final Schema schema;
+    private final PCollection> startupDependency;
+
+    /** Source rows, created on the first call to {@link #getOrCreateRows(Pipeline)}. */
+    private PCollection rows;
+
+    SourceContext(
+        String name, SourceProvider provider, Schema schema, PCollection> startupDependency) {
+      this.name = name;
+      this.provider = provider;
+      this.schema = schema;
+      this.startupDependency = startupDependency;
+    }
+
+    /**
+     * Returns the source rows, adding the source query to {@code pipeline} on first use and
+     * handing back the same collection on every later call.
+     *
+     * <p>The rows are gated on the startup dependency and carry the source schema.
+     *
+     * <p>NOTE(review): {@code handleSource} in the enclosing class applies a step with the same
+     * "Wait for source ... startup dependency" name to the metadata collection - confirm that
+     * duplicate transform names are acceptable.
+     */
+    public PCollection getOrCreateRows(Pipeline pipeline) {
+      if (rows != null) {
+        return rows;
+      }
+      var queried =
+          pipeline.apply(
+              String.format("Query for source %s", name), provider.querySourceRows(schema));
+      rows =
+          queried
+              .apply(
+                  String.format("Wait for source %s startup dependency", name),
+                  Wait.on(List.of(startupDependency)))
+              .setRowSchema(schema);
+      return rows;
+    }
+
+    /** Returns the provider used to query this source. */
+    public SourceProvider provider() {
+      return provider;
+    }
+
+    /** Returns the schema of this source. */
+    public Schema schema() {
+      return schema;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof SourceContext other) {
+        return Objects.equals(name, other.name)
+            && Objects.equals(provider, other.provider)
+            && Objects.equals(schema, other.schema);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(name, provider, schema);
+    }
+
+    @Override
+    public String toString() {
+      return "SourceContext{name='" + name + "', provider=" + provider + ", schema=" + schema + "}";
+    }
+  }
+}
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/ValidationErrors.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/ValidationErrors.java
new file mode 100644
index 0000000000..f64edd4dd0
--- /dev/null
+++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/ValidationErrors.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (C) 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.neo4j.templates;
+
+import java.util.List;
+
+/** Helper turning collected validation messages into a single failure. */
+class ValidationErrors {
+
+  /**
+   * Throws a {@link RuntimeException} when at least one validation message is present; does
+   * nothing for an empty list.
+   *
+   * <p>The exception message is the description, a space, then every validation message followed
+   * by the platform line separator (including after the last one).
+   *
+   * @param description prefix of the exception message
+   * @param validationMessages the messages to report; must not be {@code null}
+   * @throws RuntimeException if {@code validationMessages} is not empty
+   */
+  static void processValidations(String description, List validationMessages) {
+    if (validationMessages.isEmpty()) {
+      return;
+    }
+    String separator = System.lineSeparator();
+    // Joining and appending one more separator terminates every message, the last one included.
+    String details = String.join(separator, validationMessages) + separator;
+    throw new RuntimeException(description + " " + details);
+  }
+}
diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/transforms/Neo4jBlockingUnwindFn.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/transforms/Neo4jBlockingUnwindFn.java
index 2975a948db..0197692eef 100644
--- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/transforms/Neo4jBlockingUnwindFn.java
+++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/transforms/Neo4jBlockingUnwindFn.java
@@ -15,6 +15,7 @@
*/
package com.google.cloud.teleport.v2.neo4j.transforms;
+import com.google.cloud.teleport.v2.neo4j.database.CypherGenerator;
import com.google.cloud.teleport.v2.neo4j.database.Neo4jConnection;
import com.google.cloud.teleport.v2.neo4j.telemetry.Neo4jTelemetry;
import com.google.cloud.teleport.v2.neo4j.telemetry.ReportedSourceType;
@@ -30,6 +31,10 @@
import org.apache.beam.sdk.values.Row;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.summary.ResultSummary;
+import org.neo4j.importer.v1.ImportSpecification;
+import org.neo4j.importer.v1.targets.CustomQueryTarget;
+import org.neo4j.importer.v1.targets.EntityTarget;
+import org.neo4j.importer.v1.targets.Target;
import org.neo4j.importer.v1.targets.TargetType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +43,9 @@
public class Neo4jBlockingUnwindFn extends DoFn>, Row> {
private static final Logger LOG = LoggerFactory.getLogger(Neo4jBlockingUnwindFn.class);
- private final String cypher;
+ private final ImportSpecification importSpecification;
+ private final Target target;
+ private final String staticCypher;
private final SerializableFunction> parametersFunction;
private final boolean logCypher;
private final String unwindMapName;
@@ -46,8 +53,30 @@ public class Neo4jBlockingUnwindFn extends DoFn>, Row>
private final List