Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
30896bc
Adding Serverless Provisioner
sahusanket Mar 17, 2025
5d20905
ClassLoading issue for DataprocRuntimeEnvironment
sahusanket Mar 18, 2025
29b0fcf
ClassLoading issue for DataprocRuntimeEnvironment
sahusanket Mar 19, 2025
5a544c4
Issue : When running with master 'yarn' either HADOOP_CONF_DIR or YA…
sahusanket Mar 19, 2025
5e462a8
Issue : URI has a fragment component
sahusanket Mar 20, 2025
78cff2e
Issue : ElementTrackingStore cannot be cast to class org.apache.spa…
sahusanket Mar 24, 2025
f1f6208
Issue : jar exists and does not match contents of spark
sahusanket Mar 25, 2025
61764ea
Issue : DataprocMetricsListener is not a subclass of org.apache.spa…
sahusanket Mar 25, 2025
3facac0
Issue : NoSuchFileException HydratorSpark.config
sahusanket Mar 25, 2025
09d5bfa
Clean up1 : Pom dependency mistake
sahusanket Mar 26, 2025
6f73250
Adding ServerlessDataprocSubmitter
sahusanket Mar 26, 2025
5a77bd4
Adding network field in JSON
sahusanket Mar 27, 2025
73e0288
Removing prefix for Getting Job Details
sahusanket Mar 27, 2025
e61cf14
Adding support for Stopping or Killing job which is CANCEL for batches.
sahusanket May 8, 2025
a0a7a3e
Kill serverless job if it is still running during the Deprovision state
sahusanket May 8, 2025
d66df04
Testing skip delete of file.
sahusanket May 9, 2025
a6f546f
Reverting kill job on deprovision
sahusanket May 9, 2025
5aaa59e
Removing GET ...makes it sync and wait
sahusanket May 14, 2025
a0d753c
Removing master add in spark submit
sahusanket May 14, 2025
85720d5
fixing LOG_DIR
sahusanket May 14, 2025
5f0c7d9
Adding verbose option in java driver n executor
sahusanket May 15, 2025
75ab81a
trying adding Worker entry point for rewrite and intercept
sahusanket May 15, 2025
5662c1e
Revert "trying adding Worker entry point for rewrite and intercept"
sahusanket May 15, 2025
993e665
Adding logs for debugging the archive artifact
sahusanket May 15, 2025
78488d4
Manually renaming artifacts archive
sahusanket May 15, 2025
13015f1
Manually adding --jars with artifacts
sahusanket May 16, 2025
c7f1c97
adding user arg
sahusanket May 16, 2025
ef58dda
adding all jars in --jars
sahusanket May 19, 2025
a955d4a
commenting out archives to test
sahusanket May 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import io.cdap.cdap.etl.spark.function.JoinOnFunction;
import io.cdap.cdap.etl.spark.function.PluginFunctionContext;
import io.cdap.cdap.internal.io.SchemaTypeAdapter;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
Expand All @@ -74,6 +75,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -203,7 +205,8 @@ public void run(DatasetContext context) throws Exception {
BatchPhaseSpec phaseSpec = GSON.fromJson(sec.getSpecification().getProperty(Constants.PIPELINEID),
BatchPhaseSpec.class);

Path configFile = sec.getLocalizationContext().getLocalFile("HydratorSpark.config").toPath();
//Issue : NoSuchFileException HydratorSpark.config
Path configFile = Paths.get(SparkFiles.get("HydratorSpark.config"));
try (BufferedReader reader = Files.newBufferedReader(configFile, StandardCharsets.UTF_8)) {
String object = reader.readLine();
SparkBatchSourceSinkFactoryInfo sourceSinkInfo = GSON.fromJson(object, SparkBatchSourceSinkFactoryInfo.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ public List<Finisher> prepare(PhaseSpec phaseSpec)
throws TransactionFailureException, InstantiationException, IOException {
stageOperations = new HashMap<>();
stagePartitions = new HashMap<>();

File configFile = File.createTempFile("HydratorSpark", ".config");
// Issue : NoSuchFileException HydratorSpark.config
// File configFile = File.createTempFile("HydratorSpark", ".config");
File configFile = new File("/tmp/HydratorSpark.config");
if (!configFile.getParentFile().exists()) {
configFile.getParentFile().mkdirs();
}
Expand Down
13 changes: 13 additions & 0 deletions cdap-common/src/main/resources/cdap-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6593,4 +6593,17 @@
</description>
</property>

<!-- FOR TESTING ONLY: remove this property before release and move these values into the CLH configuration. -->
<property>
<name>app.program.runtime.monitor.type.gcp-serverless-dataproc</name>
<value>url</value>
<description>
provisioner.system.properties.gcp-serverless-dataproc.bucket: gs://df-5174705604629642991-qceyg67cd4i67jwoaizbbqaaaa
provisioner.system.properties.gcp-serverless-dataproc.labels: goog-datafusion-edition=developer,goog-datafusion-instance=dev-serverless,goog-datafusion-project=cdf-test-317207,goog-datafusion-version=latest
provisioner.system.properties.gcp-serverless-dataproc.projectId: cdf-test-317207
provisioner.system.properties.gcp-serverless-dataproc.runtime.job.manager: "true"
provisioner.system.properties.gcp-serverless-dataproc.token.endpoint: https://cdap-dev-serverless-task-worker:11020/v3Internal/worker/token
provisioner.system.properties.gcp-serverless-dataproc.troubleshootingDocsURL: https://cloud.google.com/dataproc/docs/troubleshooting
</description>
</property>
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.runtime.spi.provisioner.dataproc;

import com.google.common.base.Strings;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.runtime.spi.RuntimeMonitorType;
import io.cdap.cdap.runtime.spi.common.DataprocImageVersion;
import io.cdap.cdap.runtime.spi.common.DataprocUtils;
import io.cdap.cdap.runtime.spi.provisioner.Cluster;
import io.cdap.cdap.runtime.spi.provisioner.ClusterStatus;
import io.cdap.cdap.runtime.spi.provisioner.PollingStrategies;
import io.cdap.cdap.runtime.spi.provisioner.PollingStrategy;
import io.cdap.cdap.runtime.spi.provisioner.ProvisionerContext;
import io.cdap.cdap.runtime.spi.provisioner.ProvisionerSpecification;
import io.cdap.cdap.runtime.spi.runtimejob.DataprocClusterInfo;
import io.cdap.cdap.runtime.spi.runtimejob.DataprocRuntimeJobManager;
import io.cdap.cdap.runtime.spi.runtimejob.RuntimeJobDetail;
import io.cdap.cdap.runtime.spi.runtimejob.RuntimeJobManager;
import io.cdap.cdap.runtime.spi.runtimejob.ServerlessDataprocRuntimeJobManager;
import io.cdap.cdap.runtime.spi.ssh.SSHKeyPair;
import io.cdap.cdap.runtime.spi.ssh.SSHPublicKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

/**
 * Provisioner that runs pipeline jobs on Dataproc Serverless (Batches) instead of a
 * dedicated Dataproc cluster. Since no cluster is ever created, the cluster lifecycle
 * methods operate on a synthetic placeholder {@link Cluster} and deletion is a no-op;
 * the real work is performed by the {@link RuntimeJobManager} returned from
 * {@link #getRuntimeJobManager(ProvisionerContext)}.
 */
public class ServerlessDataprocProvisioner extends AbstractDataprocProvisioner {

  private static final Logger LOG = LoggerFactory.getLogger(ServerlessDataprocProvisioner.class);

  private static final ProvisionerSpecification SPEC = new ProvisionerSpecification(
      "gcp-serverless-dataproc", "Serverless Dataproc",
      "Connect and Execute jobs on Serverless Dataproc (Batches).");

  // NOTE(review): this constant is used both as the property key looked up in
  // getClusterName() and as the name of the placeholder cluster returned by
  // createCluster() — confirm that double duty is intentional.
  private static final String CLUSTER_NAME = "SERVERLESS_DATAPROC";
  private static final DataprocClientFactory CLIENT_FACTORY = new DefaultDataprocClientFactory();

  public ServerlessDataprocProvisioner() {
    super(SPEC);
  }

  @Override
  public void validateProperties(Map<String, String> properties) {
    // DataprocConf.create() performs the validation and throws on invalid input;
    // the created instance itself is not needed here.
    DataprocConf.create(properties);
  }

  @Override
  protected String getClusterName(ProvisionerContext context) {
    return context.getProperties().get(CLUSTER_NAME);
  }

  /**
   * Returns a placeholder RUNNING cluster immediately: Dataproc Serverless has no
   * cluster to provision. The batch job itself is submitted later via the runtime
   * job manager.
   */
  @Override
  public Cluster createCluster(ProvisionerContext context) throws Exception {
    // TODO: ensure the common Dataproc labels (getCommonDataprocLabels) are attached
    //       when submitting the batch job.
    // TODO: ensure the Spark runtime version (image) is compatible when submitting the job.
    Map<String, String> contextProperties = createContextProperties(context);
    // Validates the effective configuration; throws on invalid properties.
    DataprocConf.create(contextProperties);

    return new Cluster(
        CLUSTER_NAME,
        ClusterStatus.RUNNING,
        Collections.emptyList(), Collections.emptyMap());
  }

  @Override
  protected void doDeleteCluster(ProvisionerContext context, Cluster cluster, DataprocConf conf) {
    // no-op: there is no cluster to delete for serverless batches
  }

  @Override
  public ClusterStatus getClusterStatus(ProvisionerContext context, Cluster cluster) {
    ClusterStatus status = cluster.getStatus();
    // A placeholder cluster in DELETING state is considered gone right away.
    return status == ClusterStatus.DELETING ? ClusterStatus.NOT_EXISTS : status;
  }

  @Override
  public Cluster getClusterDetail(ProvisionerContext context, Cluster cluster) {
    return new Cluster(cluster, getClusterStatus(context, cluster));
  }

  @Override
  public PollingStrategy getPollingStrategy(ProvisionerContext context, Cluster cluster) {
    if (cluster.getStatus() == ClusterStatus.CREATING) {
      // "Creation" of the placeholder cluster is instantaneous, so poll without delay.
      return PollingStrategies.fixedInterval(0, TimeUnit.SECONDS);
    }
    DataprocConf conf = DataprocConf.create(createContextProperties(context));
    return PollingStrategies.fixedInterval(conf.getPollInterval(), TimeUnit.SECONDS);
  }

  /**
   * Provides the {@link RuntimeJobManager} implementation that submits and monitors
   * Dataproc Serverless batches.
   *
   * @throws RuntimeException if Dataproc credentials cannot be obtained
   */
  @Override
  public Optional<RuntimeJobManager> getRuntimeJobManager(ProvisionerContext context) {
    Map<String, String> properties = createContextProperties(context);
    DataprocConf conf = DataprocConf.create(properties);

    // NOTE(review): unlike the cluster-based provisioner, the
    // conf.isRuntimeJobManagerEnabled() guard is deliberately skipped here because
    // serverless has no SSH fallback for job launch — confirm before release.
    try {
      String clusterName = getClusterName(context);
      String projectId = conf.getProjectId();
      String region = conf.getRegion();
      // Fall back to the system-property bucket when none is configured explicitly.
      String bucket =
          conf.getGcsBucket() != null ? conf.getGcsBucket() : properties.get(DataprocUtils.BUCKET);
      return Optional.of(
          new ServerlessDataprocRuntimeJobManager(
              new DataprocClusterInfo(context, clusterName, conf.getDataprocCredentials(),
                  getRootUrl(conf), projectId,
                  region, bucket, getCommonDataprocLabels(context)),
              Collections.unmodifiableMap(properties), context.getCDAPVersionInfo(),
              getImageVersion(conf)));
    } catch (Exception e) {
      throw new RuntimeException("Error while getting credentials for dataproc. ", e);
    }
  }

  /**
   * Returns the configured Dataproc Serverless runtime (image) version, defaulting
   * to {@code "1.1"} when none is configured.
   */
  String getImageVersion(DataprocConf conf) {
    String imageVersion = conf.getImageVersion();
    if (imageVersion == null) {
      imageVersion = "1.1";
    }
    LOG.debug("Using Dataproc Serverless runtime version {}", imageVersion);
    return imageVersion;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class DataprocJobMain {
* @throws Exception any exception while running the job
*/
public static void main(String[] args) throws Exception {

LOG.info("SANKET start");
Map<String, Collection<String>> arguments = fromPosixArray(args);

if (!arguments.containsKey(RUNTIME_JOB_CLASS)) {
Expand Down Expand Up @@ -99,7 +101,13 @@ public static void main(String[] args) throws Exception {
String sparkCompat = arguments.get(SPARK_COMPAT).iterator().next();
String applicationJarLocalizedName = arguments.get(Constants.Files.APPLICATION_JAR).iterator()
.next();
String launchMode = arguments.get(LAUNCH_MODE).iterator().next();
//TODO from serverless job manager
String launchMode = "CLIENT"; //arguments.get(LAUNCH_MODE).iterator().next();

ClassLoader cl = DataprocJobMain.class.getClassLoader();
if (!(cl instanceof URLClassLoader)) {
throw new RuntimeException("Classloader is expected to be an instance of URLClassLoader");
}

// create classpath from resources, application and twill jars
URL[] urls = getClasspath(Arrays.asList(Constants.Files.RESOURCES_JAR,
Expand All @@ -115,13 +123,15 @@ public static void main(String[] args) throws Exception {
CompletableFuture<?> completion = new CompletableFuture<>();
try {
Thread.currentThread().setContextClassLoader(newCl);

LOG.warn("SANKET 2");
// load environment class and create instance of it
String dataprocEnvClassName = DataprocRuntimeEnvironment.class.getName();
LOG.warn("SANKET 3");
Class<?> dataprocEnvClass = newCl.loadClass(dataprocEnvClassName);
Object newDataprocEnvInstance = dataprocEnvClass.newInstance();

try {
LOG.warn("SANKET 4");
// call initialize() method on dataprocEnvClass
Method initializeMethod = dataprocEnvClass.getMethod("initialize", String.class,
String.class);
Expand Down Expand Up @@ -186,6 +196,13 @@ private static URL[] getClasspath(List<String> jarFiles) throws IOException {
urls.addAll(createClassPathUrls(jarDir));
}

ClassLoader cl = DataprocJobMain.class.getClassLoader();

if (cl instanceof URLClassLoader && cl != ClassLoader.getSystemClassLoader()) {
urls.addAll(Arrays.asList(((URLClassLoader) cl).getURLs()));
}


// Add the system class path to the URL list
for (String path : System.getProperty("java.class.path").split(File.pathSeparator)) {
try {
Expand Down Expand Up @@ -288,7 +305,8 @@ private static Map<String, Collection<String>> fromPosixArray(String[] args) {
private static ClassLoader createContainerClassLoader(URL[] classpath) {
String containerClassLoaderName = System.getProperty(Constants.TWILL_CONTAINER_CLASSLOADER);
URLClassLoader classLoader = new URLClassLoader(classpath,
DataprocJobMain.class.getClassLoader().getParent());
// DataprocJobMain.class.getClassLoader().getParent());
ClassLoader.getSystemClassLoader().getParent());
if (containerClassLoaderName == null) {
return classLoader;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ public void destroy() {
if (zkServer != null) {
zkServer.stopAndWait();
}
if (locationFactory != null) {
// TODO : skipping to test.
// if (locationFactory != null) {
if (false) {
Location location = locationFactory.create("/");
try {
location.delete(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public class DataprocRuntimeJobManager implements RuntimeJobManager {
private volatile JobControllerClient jobControllerClient;
private volatile ClusterControllerClient clusterControllerClient;
// CDAP specific artifacts which will be cached in GCS.
private static final List<String> artifactsCacheablePerCDAPVersion = new ArrayList<>(
static final List<String> artifactsCacheablePerCDAPVersion = new ArrayList<>(
Arrays.asList(Constants.Files.TWILL_JAR, Constants.Files.LAUNCHER_JAR)
);
private static final int SNAPSHOT_EXPIRE_DAYS = 7;
Expand Down Expand Up @@ -470,8 +470,8 @@ public void close() {
/**
* Returns list of runtime local files with twill.jar and launcher.jar added to it.
*/
private List<LocalFile> getRuntimeLocalFiles(Collection<? extends LocalFile> runtimeLocalFiles,
File tempDir) throws Exception {
List<LocalFile> getRuntimeLocalFiles(Collection<? extends LocalFile> runtimeLocalFiles,
File tempDir) throws Exception {
LocationFactory locationFactory = new LocalLocationFactory(tempDir);
List<LocalFile> localFiles = new ArrayList<>(runtimeLocalFiles);
localFiles.add(getTwillJar(locationFactory));
Expand Down Expand Up @@ -505,7 +505,7 @@ private LocalFile getLauncherJar(LocationFactory locationFactory) throws IOExcep
*
* @return true if delete lifecycle with days since custom time is set on the bucket.
*/
private boolean validateDeleteLifecycle(String bucketName, String run) {
boolean validateDeleteLifecycle(String bucketName, String run) {
Storage storage = getStorageClient();
Bucket bucket = storage.get(bucketName);
for (BucketInfo.LifecycleRule rule : bucket.getLifecycleRules()) {
Expand Down Expand Up @@ -547,8 +547,8 @@ private boolean validateDeleteLifecycle(String bucketName, String run) {
* Upload cacheable files uploads the file to GCS if the file does not exists. Once uploaded, it
* also sets custom time on the object.
*/
private LocalFile uploadCacheableFile(String bucket, String targetFilePath,
LocalFile localFile)
LocalFile uploadCacheableFile(String bucket, String targetFilePath,
LocalFile localFile)
throws IOException, StorageException {
Storage storage = getStorageClient();
BlobId blobId = BlobId.of(bucket, targetFilePath);
Expand Down Expand Up @@ -929,7 +929,7 @@ private void stopJob(String jobId) throws Exception {
}
}

private String getPath(String... pathSubComponents) {
String getPath(String... pathSubComponents) {
return Joiner.on("/").join(pathSubComponents);
}

Expand Down
Loading