diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSparkPipelineDriver.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSparkPipelineDriver.java index f3cc3e25e797..c831b1f27ff9 100644 --- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSparkPipelineDriver.java +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/BatchSparkPipelineDriver.java @@ -62,6 +62,7 @@ import io.cdap.cdap.etl.spark.function.JoinOnFunction; import io.cdap.cdap.etl.spark.function.PluginFunctionContext; import io.cdap.cdap.internal.io.SchemaTypeAdapter; +import org.apache.spark.SparkFiles; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; @@ -74,6 +75,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -203,7 +205,8 @@ public void run(DatasetContext context) throws Exception { BatchPhaseSpec phaseSpec = GSON.fromJson(sec.getSpecification().getProperty(Constants.PIPELINEID), BatchPhaseSpec.class); - Path configFile = sec.getLocalizationContext().getLocalFile("HydratorSpark.config").toPath(); + //Issue : NoSuchFileException HydratorSpark.config + Path configFile = Paths.get(SparkFiles.get("HydratorSpark.config")); try (BufferedReader reader = Files.newBufferedReader(configFile, StandardCharsets.UTF_8)) { String object = reader.readLine(); SparkBatchSourceSinkFactoryInfo sourceSinkInfo = GSON.fromJson(object, SparkBatchSourceSinkFactoryInfo.class); diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkPreparer.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkPreparer.java index 00266a0c8cbe..fd92a146595c 100644 --- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkPreparer.java +++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SparkPreparer.java @@ -82,8 +82,9 @@ public List prepare(PhaseSpec phaseSpec) throws TransactionFailureException, InstantiationException, IOException { stageOperations = new HashMap<>(); stagePartitions = new HashMap<>(); - - File configFile = File.createTempFile("HydratorSpark", ".config"); + // Issue : NoSuchFileException HydratorSpark.config +// File configFile = File.createTempFile("HydratorSpark", ".config"); + File configFile = new File("/tmp/HydratorSpark.config"); if (!configFile.getParentFile().exists()) { configFile.getParentFile().mkdirs(); } diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml index 85d1872f892f..be0bb5cf3978 100644 --- a/cdap-common/src/main/resources/cdap-default.xml +++ b/cdap-common/src/main/resources/cdap-default.xml @@ -6593,4 +6593,17 @@ + + + app.program.runtime.monitor.type.gcp-serverless-dataproc + url + + provisioner.system.properties.gcp-serverless-dataproc.bucket: gs://df-5174705604629642991-qceyg67cd4i67jwoaizbbqaaaa + provisioner.system.properties.gcp-serverless-dataproc.labels: 
goog-datafusion-edition=developer,goog-datafusion-instance=dev-serverless,goog-datafusion-project=cdf-test-317207,goog-datafusion-version=latest + provisioner.system.properties.gcp-serverless-dataproc.projectId: cdf-test-317207 + provisioner.system.properties.gcp-serverless-dataproc.runtime.job.manager: "true" + provisioner.system.properties.gcp-serverless-dataproc.token.endpoint: https://cdap-dev-serverless-task-worker:11020/v3Internal/worker/token + provisioner.system.properties.gcp-serverless-dataproc.troubleshootingDocsURL: https://cloud.google.com/dataproc/docs/troubleshooting + + diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/ServerlessDataprocProvisioner.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/ServerlessDataprocProvisioner.java new file mode 100644 index 000000000000..15b652f6c597 --- /dev/null +++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/ServerlessDataprocProvisioner.java @@ -0,0 +1,180 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.runtime.spi.provisioner.dataproc; + +import com.google.common.base.Strings; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.runtime.spi.RuntimeMonitorType; +import io.cdap.cdap.runtime.spi.common.DataprocImageVersion; +import io.cdap.cdap.runtime.spi.common.DataprocUtils; +import io.cdap.cdap.runtime.spi.provisioner.Cluster; +import io.cdap.cdap.runtime.spi.provisioner.ClusterStatus; +import io.cdap.cdap.runtime.spi.provisioner.PollingStrategies; +import io.cdap.cdap.runtime.spi.provisioner.PollingStrategy; +import io.cdap.cdap.runtime.spi.provisioner.ProvisionerContext; +import io.cdap.cdap.runtime.spi.provisioner.ProvisionerSpecification; +import io.cdap.cdap.runtime.spi.runtimejob.DataprocClusterInfo; +import io.cdap.cdap.runtime.spi.runtimejob.DataprocRuntimeJobManager; +import io.cdap.cdap.runtime.spi.runtimejob.RuntimeJobDetail; +import io.cdap.cdap.runtime.spi.runtimejob.RuntimeJobManager; +import io.cdap.cdap.runtime.spi.runtimejob.ServerlessDataprocRuntimeJobManager; +import io.cdap.cdap.runtime.spi.ssh.SSHKeyPair; +import io.cdap.cdap.runtime.spi.ssh.SSHPublicKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +/** + * Provisioner to submit a job to Dataproc's Serverless (Dataproc Batch) + */ +public class ServerlessDataprocProvisioner extends AbstractDataprocProvisioner { + + private static final Logger LOG = LoggerFactory.getLogger(ServerlessDataprocProvisioner.class); + + private static final ProvisionerSpecification SPEC = new ProvisionerSpecification( + "gcp-serverless-dataproc", "Serverless Dataproc", + "Connect and Execute jobs on Serverless Dataproc 
(Batches)."); + // Keys for looking up system properties + + private static final String CLUSTER_NAME = "SERVERLESS_DATAPROC"; + private static final DataprocClientFactory CLIENT_FACTORY = new DefaultDataprocClientFactory(); + + public ServerlessDataprocProvisioner() { + super(SPEC); + } + + @Override + public void validateProperties(Map properties) { + // Creates the DataprocConf for validation + DataprocConf.create(properties); + } + + @Override + protected String getClusterName(ProvisionerContext context) { + return context.getProperties().get(CLUSTER_NAME); + } + + @Override + public Cluster createCluster(ProvisionerContext context) throws Exception { + + // Responsibilities during existing dp cluster : + //TODO 1: Ensure labels are added while submitting a job. from AbstractDataprocProvisioner#getCommonDataprocLabels + //TODO 2: Ensure SparkRuntime Version (image) is compatible while submitting job. + Map contextProperties = createContextProperties(context); + DataprocConf conf = DataprocConf.create(contextProperties); + + // Return a FAKE CLUSTER for now + return new Cluster( + CLUSTER_NAME, + ClusterStatus.RUNNING, + Collections.emptyList(), Collections.emptyMap()); + } + + @Override + protected void doDeleteCluster(ProvisionerContext context, Cluster cluster, DataprocConf conf) { + // no-op + } + + @Override + public ClusterStatus getClusterStatus(ProvisionerContext context, Cluster cluster) { + ClusterStatus status = cluster.getStatus(); + return status == ClusterStatus.DELETING ? ClusterStatus.NOT_EXISTS : status; + } + + @Override + public Cluster getClusterDetail(ProvisionerContext context, Cluster cluster) { + return new Cluster(cluster, getClusterStatus(context, cluster)); + } + + @Override + public PollingStrategy getPollingStrategy(ProvisionerContext context, Cluster cluster) { + if (cluster.getStatus() == ClusterStatus.CREATING) { + return PollingStrategies.fixedInterval(0, TimeUnit.SECONDS); + } + DataprocConf conf = DataprocConf.create(createContextProperties(context)); + return PollingStrategies.fixedInterval(conf.getPollInterval(), TimeUnit.SECONDS); + } + + /** + * Provides implementation of {@link RuntimeJobManager}. + */ + @Override + public Optional getRuntimeJobManager(ProvisionerContext context) { + Map properties = createContextProperties(context); + DataprocConf conf = DataprocConf.create(properties); + + // if this system property is not provided, we will assume that ssh should be used instead of + // runtime job manager for job launch. +// if (!conf.isRuntimeJobManagerEnabled()) { +// return Optional.empty(); +// } + try { + String clusterName = getClusterName(context); + String projectId = conf.getProjectId(); + String region = conf.getRegion(); + String bucket = + conf.getGcsBucket() != null ? conf.getGcsBucket() : properties.get(DataprocUtils.BUCKET); + return Optional.of( + new ServerlessDataprocRuntimeJobManager( + new DataprocClusterInfo(context, clusterName, conf.getDataprocCredentials(), + getRootUrl(conf), projectId, + region, bucket, getCommonDataprocLabels(context)), + Collections.unmodifiableMap(properties), context.getCDAPVersionInfo(), getImageVersion(conf))); + } catch (Exception e) { + throw new RuntimeException("Error while getting credentials for dataproc. 
", e); + } + } +// +// @Override +// public ClusterStatus deleteClusterWithStatus(ProvisionerContext context, Cluster cluster) throws Exception { +// LOG.warn("SANKET here in deleteClusterWithStatus"); +// RuntimeJobManager jobManager = getRuntimeJobManager(context).orElse(null); +// +// if (jobManager != null) { +// LOG.warn("SANKET here in deleteClusterWithStatus : jobManager"); +// try { +// RuntimeJobDetail jobDetail = jobManager.getDetail(context.getProgramRunInfo()).orElse(null); +// if (jobDetail != null && !jobDetail.getStatus().isTerminated()) { +// LOG.warn("SANKET : trying to cancel for running " ); +// jobManager.kill(jobDetail); +// } +// } catch (Exception e) { +// LOG.warn(" Failed to cancel job "); +// return ClusterStatus.RUNNING; +// } finally { +// jobManager.close(); +// } +// +// } +// return ClusterStatus.DELETING; +// } + + String getImageVersion(DataprocConf conf) { + String imageVersion = conf.getImageVersion(); + if (imageVersion == null) { + imageVersion = "1.1"; + } + LOG.warn("Going for Serverless version : " + imageVersion); + return imageVersion; + } +} \ No newline at end of file diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocJobMain.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocJobMain.java index f18d4d151d64..af1e54780aba 100644 --- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocJobMain.java +++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocJobMain.java @@ -61,6 +61,8 @@ public class DataprocJobMain { * @throws Exception any exception while running the job */ public static void main(String[] args) throws Exception { + + LOG.info("SANKET start"); Map> arguments = fromPosixArray(args); if (!arguments.containsKey(RUNTIME_JOB_CLASS)) { @@ -99,7 +101,13 @@ public static void main(String[] args) throws Exception { String sparkCompat = arguments.get(SPARK_COMPAT).iterator().next(); String applicationJarLocalizedName = arguments.get(Constants.Files.APPLICATION_JAR).iterator() .next(); - String launchMode = arguments.get(LAUNCH_MODE).iterator().next(); + //TODO from serverless job manager + String launchMode = "CLIENT"; //arguments.get(LAUNCH_MODE).iterator().next(); + + ClassLoader cl = DataprocJobMain.class.getClassLoader(); + if (!(cl instanceof URLClassLoader)) { + throw new RuntimeException("Classloader is expected to be an instance of URLClassLoader"); + } // create classpath from resources, application and twill jars URL[] urls = getClasspath(Arrays.asList(Constants.Files.RESOURCES_JAR, @@ -115,13 +123,15 @@ public static void main(String[] args) throws Exception { CompletableFuture completion = new CompletableFuture<>(); try { Thread.currentThread().setContextClassLoader(newCl); - + LOG.warn("SANKET 2"); // load environment class and create instance of it String dataprocEnvClassName = DataprocRuntimeEnvironment.class.getName(); + LOG.warn("SANKET 3"); Class dataprocEnvClass = newCl.loadClass(dataprocEnvClassName); Object newDataprocEnvInstance = dataprocEnvClass.newInstance(); try { + LOG.warn("SANKET 4"); // call initialize() method on dataprocEnvClass Method initializeMethod = dataprocEnvClass.getMethod("initialize", String.class, String.class); @@ -186,6 +196,13 @@ private static URL[] getClasspath(List jarFiles) throws IOException { urls.addAll(createClassPathUrls(jarDir)); } + ClassLoader cl = DataprocJobMain.class.getClassLoader(); + + if (cl instanceof URLClassLoader && cl != 
ClassLoader.getSystemClassLoader()) { + urls.addAll(Arrays.asList(((URLClassLoader) cl).getURLs())); + } + + // Add the system class path to the URL list for (String path : System.getProperty("java.class.path").split(File.pathSeparator)) { try { @@ -288,7 +305,8 @@ private static Map> fromPosixArray(String[] args) { private static ClassLoader createContainerClassLoader(URL[] classpath) { String containerClassLoaderName = System.getProperty(Constants.TWILL_CONTAINER_CLASSLOADER); URLClassLoader classLoader = new URLClassLoader(classpath, - DataprocJobMain.class.getClassLoader().getParent()); +// DataprocJobMain.class.getClassLoader().getParent()); + ClassLoader.getSystemClassLoader().getParent()); if (containerClassLoaderName == null) { return classLoader; } diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeEnvironment.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeEnvironment.java index 85fdaf7851a5..13e9767383a6 100644 --- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeEnvironment.java +++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeEnvironment.java @@ -113,7 +113,9 @@ public void destroy() { if (zkServer != null) { zkServer.stopAndWait(); } - if (locationFactory != null) { + // TODO : skipping to test. +// if (locationFactory != null) { + if (false) { Location location = locationFactory.create("/"); try { location.delete(true); diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeJobManager.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeJobManager.java index 6d7003d6c801..a799bb9d6945 100644 --- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeJobManager.java +++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeJobManager.java @@ -135,7 +135,7 @@ public class DataprocRuntimeJobManager implements RuntimeJobManager { private volatile JobControllerClient jobControllerClient; private volatile ClusterControllerClient clusterControllerClient; // CDAP specific artifacts which will be cached in GCS. - private static final List artifactsCacheablePerCDAPVersion = new ArrayList<>( + static final List artifactsCacheablePerCDAPVersion = new ArrayList<>( Arrays.asList(Constants.Files.TWILL_JAR, Constants.Files.LAUNCHER_JAR) ); private static final int SNAPSHOT_EXPIRE_DAYS = 7; @@ -470,8 +470,8 @@ public void close() { /** * Returns list of runtime local files with twill.jar and launcher.jar added to it. */ - private List getRuntimeLocalFiles(Collection runtimeLocalFiles, - File tempDir) throws Exception { + List getRuntimeLocalFiles(Collection runtimeLocalFiles, + File tempDir) throws Exception { LocationFactory locationFactory = new LocalLocationFactory(tempDir); List localFiles = new ArrayList<>(runtimeLocalFiles); localFiles.add(getTwillJar(locationFactory)); @@ -505,7 +505,7 @@ private LocalFile getLauncherJar(LocationFactory locationFactory) throws IOExcep * * @return true if delete lifecycle with days since custom time is set on the bucket. 
*/ - private boolean validateDeleteLifecycle(String bucketName, String run) { + boolean validateDeleteLifecycle(String bucketName, String run) { Storage storage = getStorageClient(); Bucket bucket = storage.get(bucketName); for (BucketInfo.LifecycleRule rule : bucket.getLifecycleRules()) { @@ -547,8 +547,8 @@ private boolean validateDeleteLifecycle(String bucketName, String run) { * Upload cacheable files uploads the file to GCS if the file does not exists. Once uploaded, it * also sets custom time on the object. */ - private LocalFile uploadCacheableFile(String bucket, String targetFilePath, - LocalFile localFile) + LocalFile uploadCacheableFile(String bucket, String targetFilePath, + LocalFile localFile) throws IOException, StorageException { Storage storage = getStorageClient(); BlobId blobId = BlobId.of(bucket, targetFilePath); @@ -929,7 +929,7 @@ private void stopJob(String jobId) throws Exception { } } - private String getPath(String... pathSubComponents) { + String getPath(String... pathSubComponents) { return Joiner.on("/").join(pathSubComponents); } diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/ServerlessDataprocRuntimeJobManager.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/ServerlessDataprocRuntimeJobManager.java new file mode 100644 index 000000000000..24a879e7f8d6 --- /dev/null +++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/ServerlessDataprocRuntimeJobManager.java @@ -0,0 +1,532 @@ +package io.cdap.cdap.runtime.spi.runtimejob; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.longrunning.OperationFuture; +import com.google.api.gax.rpc.AlreadyExistsException; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.StatusCode; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.dataproc.v1.Batch; +import com.google.cloud.dataproc.v1.BatchControllerClient; +import com.google.cloud.dataproc.v1.BatchControllerSettings; +import com.google.cloud.dataproc.v1.BatchOperationMetadata; +import com.google.cloud.dataproc.v1.EnvironmentConfig; +import com.google.cloud.dataproc.v1.ExecutionConfig; +import com.google.cloud.dataproc.v1.JobControllerClient; +import com.google.cloud.dataproc.v1.LocationName; +import com.google.cloud.dataproc.v1.RuntimeConfig; +import com.google.cloud.dataproc.v1.SparkBatch; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import com.google.longrunning.OperationsClient; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorCodeType; +import io.cdap.cdap.api.exception.ErrorUtils; +import io.cdap.cdap.runtime.spi.CacheableLocalFile; +import io.cdap.cdap.runtime.spi.ProgramRunInfo; +import io.cdap.cdap.runtime.spi.VersionInfo; +import io.cdap.cdap.runtime.spi.common.DataprocMetric; +import io.cdap.cdap.runtime.spi.common.DataprocUtils; +import io.cdap.cdap.runtime.spi.provisioner.ProvisionerContext; +import io.cdap.cdap.runtime.spi.provisioner.dataproc.DataprocRuntimeException; +import org.apache.twill.api.LocalFile; +import org.apache.twill.internal.Constants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import 
java.util.concurrent.Future; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import javax.annotation.Nullable; + +public class ServerlessDataprocRuntimeJobManager extends DataprocRuntimeJobManager { + + private static final Logger LOG = LoggerFactory.getLogger(ServerlessDataprocRuntimeJobManager.class); + private static final Pattern DATAPROC_BATCH_ID_PATTERN = Pattern.compile("[a-z0-9][a-z0-9\\-]{2,61}[a-z0-9]"); + + private final ProvisionerContext provisionerContext; + private final String bucket; + private final String region; + private final Map provisionerProperties; + private final VersionInfo cdapVersionInfo; + private final String projectId; + private final Map labels; + //dataproc job labels (must match '[\p{Ll}\p{Lo}][\p{Ll}\p{Lo}\p{N}_-]{0,62}' pattern) + private static final String LABEL_CDAP_PROGRAM = "cdap-program"; + private static final String LABEL_CDAP_PROGRAM_TYPE = "cdap-program-type"; + private volatile BatchControllerClient batchControllerClient; + private final GoogleCredentials credentials; + private final String endpoint; + private final String imageVersion; + + + /** + * Created by dataproc provisioner with properties that are needed by dataproc runtime job + * manager. + * + * @param clusterInfo dataproc cluster information + * @param provisionerProperties + * @param cdapVersionInfo + */ + public ServerlessDataprocRuntimeJobManager(DataprocClusterInfo clusterInfo, + Map provisionerProperties, + VersionInfo cdapVersionInfo, String imageVersion) { + + super(clusterInfo, provisionerProperties, cdapVersionInfo); + this.provisionerContext = clusterInfo.getProvisionerContext(); + this.bucket = clusterInfo.getBucket(); + this.region = clusterInfo.getRegion(); + this.cdapVersionInfo = cdapVersionInfo; + this.provisionerProperties = provisionerProperties; + this.projectId = clusterInfo.getProjectId(); + this.labels = clusterInfo.getLabels(); + this.credentials = clusterInfo.getCredentials(); + this.endpoint = clusterInfo.getEndpoint(); + this.imageVersion = imageVersion; + } + + @Override + public void launch(RuntimeJobInfo runtimeJobInfo) throws Exception { + String bucket = DataprocUtils.getBucketName(this.bucket); + ProgramRunInfo runInfo = runtimeJobInfo.getProgramRunInfo(); + + // Caching is disabled if it's been explicitly disabled or delete lifecycle is not set on the bucket. + boolean gcsCacheEnabled = Boolean.parseBoolean( + provisionerContext.getProperties().getOrDefault(DataprocUtils.GCS_CACHE_ENABLED, "true")) + || !validateDeleteLifecycle(bucket, runInfo.getRun()); + + LOG.debug( + "Launching run {} with following configurations: project {}, region {}, bucket {}.", + runInfo.getRun(), projectId, region, bucket); + if (!gcsCacheEnabled) { + LOG.warn("Launching run {} without GCS caching. This slows launch time.", runInfo.getRun()); + } + + File tempDir = DataprocUtils.CACHE_DIR_PATH.toFile(); + boolean disableLocalCaching = Boolean.parseBoolean( + provisionerContext.getProperties().getOrDefault(DataprocUtils.LOCAL_CACHE_DISABLED, + "false")); + // In dataproc bucket, the run root will be /cdap-job//. All the files without _cache_ in their + // filename for this run will be copied under that base dir. + String runRootPath = getPath(DataprocUtils.CDAP_GCS_ROOT, runInfo.getRun()); + // In dataproc bucket, the shared folder for artifacts will be /cdap-job/cached-artifacts. + // All instances of CacheableLocalFile will be copied to the shared folder if they do not exist. 
+ String cacheRootPath = getPath(DataprocUtils.CDAP_GCS_ROOT, + DataprocUtils.CDAP_CACHED_ARTIFACTS); + String cdapVersion; + if (cdapVersionInfo.isSnapshot()) { + cdapVersion = String.format("%s.%s.%s-SNAPSHOT", cdapVersionInfo.getMajor(), + cdapVersionInfo.getMinor(), + cdapVersionInfo.getFix()); + } else { + cdapVersion = String.format("%s.%s.%s", cdapVersionInfo.getMajor(), + cdapVersionInfo.getMinor(), + cdapVersionInfo.getFix()); + } + + //Serverless needs to be in Client mode to make workflow run on master. + LaunchMode launchMode = LaunchMode.CLIENT; + + DataprocMetric.Builder submitJobMetric = + DataprocMetric.builder("provisioner.submitJob.response.count") + .setRegion(region) + .setLaunchMode(launchMode); + + try { + // step 1: build twill.jar and launcher.jar and add them to files to be copied to gcs + if (disableLocalCaching) { + LOG.debug("Local caching is disabled, " + + "continuing without caching twill and dataproc launcher jars."); + tempDir = Files.createTempDirectory("dataproc.launcher").toFile(); + } + List localFiles = getRuntimeLocalFiles(runtimeJobInfo.getLocalizeFiles(), tempDir); + + // step 2: upload all the necessary files to gcs so that those files are available to dataproc job + List> uploadFutures = new ArrayList<>(); + for (LocalFile fileToUpload : localFiles) { + boolean cacheable = gcsCacheEnabled && fileToUpload instanceof CacheableLocalFile; + String targetFilePath = getPath(cacheable ? cacheRootPath : runRootPath, + fileToUpload.getName()); + String targetFilePathWithVersion = getPath(cacheRootPath, cdapVersion, + fileToUpload.getName()); + + if (gcsCacheEnabled && artifactsCacheablePerCDAPVersion.contains(fileToUpload.getName())) { + // upload artifacts cacheable per cdap version to /cdap-job/cached-artifacts// + uploadFutures.add( + provisionerContext.execute( + () -> uploadCacheableFile(bucket, targetFilePathWithVersion, fileToUpload)) + .toCompletableFuture()); + } else { + if (cacheable) { + // upload cacheable artifacts to /cdap-job/cached-artifacts/ + uploadFutures.add( + provisionerContext.execute( + () -> uploadCacheableFile(bucket, targetFilePath, fileToUpload)) + .toCompletableFuture()); + } else { + // non-cacheable artifacts to /cdap-job// + uploadFutures.add(provisionerContext.execute( + () -> uploadFile(bucket, targetFilePath, fileToUpload, false)) + .toCompletableFuture()); + } + } + } + + List uploadedFiles = new ArrayList<>(); + for (Future uploadFuture : uploadFutures) { + uploadedFiles.add(uploadFuture.get()); + } + + // step 3: build the hadoop job request to be submitted to dataproc + Batch batch = getSubmitBatchRequest(runtimeJobInfo, uploadedFiles); + // step 4: submit hadoop job to dataproc + try { + LocationName locationName = LocationName.newBuilder() + .setProject(projectId).setLocation(region).build(); + OperationFuture submitJobAsOperationAsyncRequest = + getBatchControllerClient().createBatchAsync(locationName, batch, getBatchId(runInfo)); + LOG.warn("SANKET : afterjobsumbit"); +// LOG.warn("Successfully submitted BATCH job {} to Serverless", +// submitJobAsOperationAsyncRequest.get().getName()); + } catch (AlreadyExistsException ex) { + //the job id already exists, ignore the job. + LOG.warn("The dataproc job {} already exists. 
Ignoring resubmission of the job.", + getBatchId(runInfo)); + } + DataprocUtils.emitMetric(provisionerContext, submitJobMetric.build()); + } catch (Exception e) { + String errorReason = String.format("Error while launching job %s on Serverless Dataproc.", getBatchId(runInfo)); + // delete all uploaded gcs files in case of exception + DataprocUtils.deleteGcsPath(getStorageClient(), bucket, runRootPath); + DataprocUtils.emitMetric(provisionerContext, submitJobMetric.setException(e).build()); + // ResourceExhaustedException indicates Dataproc agent running on master node + // isn't emitting heartbeat. This usually indicates master VM crashing due to OOM. + ErrorCategory errorCategory = new ErrorCategory(ErrorCategory.ErrorCategoryEnum.STARTING); + if (e instanceof ApiException) { + int statusCode = + ((ApiException) e).getStatusCode().getCode().getHttpStatusCode(); + ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode); + throw new DataprocRuntimeException.Builder() + .withCause(e) + .withErrorCategory(errorCategory) + .withErrorMessage(e.getMessage()) + .withErrorReason(DataprocUtils.getErrorReason(errorReason, e)) + .withErrorType(pair.getErrorType()) + .withErrorCodeType(ErrorCodeType.HTTP) + .withErrorCode(String.valueOf(statusCode)) + .withDependency(true) + .build(); + } + throw new DataprocRuntimeException.Builder() + .withErrorMessage(e.getMessage()) + .withErrorReason(errorReason) + .withErrorCategory(errorCategory) + .withCause(e) + .build(); + } finally { + if (disableLocalCaching) { + DataprocUtils.deleteDirectoryContents(tempDir); + } + } + + } + + @Override + public Optional getDetail(ProgramRunInfo programRunInfo) throws Exception { + String jobId = getBatchId(programRunInfo); + try { + LOG.warn(" SANKET : in : jobId : {} : projectId : {} , region : {}", jobId, projectId, region); + + //TODO :: Just after "batchControllerClient.createBatchAsync" the below line may give NOT_FOUND . Need to figure + // how to handle this + + Batch batch = getBatchControllerClient().getBatch(getFullBatchName(projectId, region, jobId)); + return Optional.of(new DataprocRuntimeJobDetail(getProgramRunInfo(batch), + getRuntimeJobStatus(batch), + getJobStatusDetails(batch), jobId)); + } catch (ApiException e) { + /* + LOG.warn(" SANKET : e.getStatusCode().getCode() : " + e.getStatusCode().getCode()); + if (e.getStatusCode().getCode() != StatusCode.Code.NOT_FOUND + || e.getStatusCode().getCode() != StatusCode.Code.CANCELLED) { + throw new Exception(String.format("Error while getting details for job %s on cluster %s.", + jobId, clusterName), e); + } + // Status is not found if job is finished or manually deleted by the user + LOG.debug("Dataproc job {} does not exist in project {}, region {}.", jobId, projectId, + region);*/ + } + return Optional.empty(); + } + + /** + * Returns job state details, such as an error description if the state is ERROR. For other job + * states, returns null. + */ + @Nullable + private String getJobStatusDetails(Batch job) { + return job.getState().name(); //TODO : Check for better details + } + + + private ProgramRunInfo getProgramRunInfo(Batch batch) { + Map jobPropertiesPrefixed = batch.getRuntimeConfig().getPropertiesMap(); + String prefix = "spark:"; + Map jobProperties = jobPropertiesPrefixed.entrySet().stream() + .collect(Collectors.toMap( + entry -> { + String key = entry.getKey(); + return key.startsWith(prefix) ? 
key.substring(prefix.length()) : key; + }, + Map.Entry::getValue + )); + + //Returns the Map with key prefixed with `spark:` + ProgramRunInfo.Builder builder = new ProgramRunInfo.Builder() + .setNamespace(jobProperties.get(CDAP_RUNTIME_NAMESPACE)) + .setApplication(jobProperties.get(CDAP_RUNTIME_APPLICATION)) + .setVersion(jobProperties.get(CDAP_RUNTIME_VERSION)) + .setProgramType(jobProperties.get(CDAP_RUNTIME_PROGRAM_TYPE)) + .setProgram(jobProperties.get(CDAP_RUNTIME_PROGRAM)) + .setRun(jobProperties.get(CDAP_RUNTIME_RUNID)); + return builder.build(); + } + + private String getFullBatchName(String project, String region, String jobId){ + return String.format("projects/%s/locations/%s/batches/%s", project, region, jobId); + } + + + /** + * Returns {@link RuntimeJobStatus}. + */ + private RuntimeJobStatus getRuntimeJobStatus(Batch batch) { + Batch.State state = batch.getState(); + RuntimeJobStatus runtimeJobStatus; + switch (state) { + case STATE_UNSPECIFIED: + case PENDING: + runtimeJobStatus = RuntimeJobStatus.STARTING; + break; + case RUNNING: + runtimeJobStatus = RuntimeJobStatus.RUNNING; + break; + case SUCCEEDED: + runtimeJobStatus = RuntimeJobStatus.COMPLETED; + break; + case CANCELLING: + runtimeJobStatus = RuntimeJobStatus.STOPPING; + break; + case CANCELLED: + runtimeJobStatus = RuntimeJobStatus.STOPPED; + break; + case FAILED: + runtimeJobStatus = RuntimeJobStatus.FAILED; + break; + default: + // this needed for ATTEMPT_FAILURE state which is a state for restartable job. Currently we do not launch + // restartable jobs + throw new IllegalStateException( + String.format("Unsupported job state %s of the dataproc job %s ", batch.getState(), + batch.getName())); + } + return runtimeJobStatus; + } + + + + /** + * Creates and returns dataproc job submit request. 
+ */ + private Batch getSubmitBatchRequest(RuntimeJobInfo runtimeJobInfo, + List localFiles) { + String applicationJarLocalizedName = runtimeJobInfo.getArguments().get(Constants.Files.APPLICATION_JAR); + + LaunchMode launchMode = LaunchMode.CLIENT; + + SparkBatch.Builder sparkBatchBuilder = + SparkBatch.newBuilder() + .setMainClass(DataprocJobMain.class.getName()) + .addAllArgs(getArguments(runtimeJobInfo, localFiles, provisionerContext.getSparkCompat().getCompat(), + applicationJarLocalizedName, launchMode)); + + for (LocalFile localFile : localFiles) { + // add jar file + URI uri = localFile.getURI(); + if (localFile.getName().endsWith("jar")) { + sparkBatchBuilder.addJarFileUris(uri.toString()); + } else { + sparkBatchBuilder.addFileUris(uri.toString()); + } + } +// +// // MANUAL ADDING JARS FOR TEST +// String[] fileUris = { +// "gs://serverlessdataproc/sanket_lib/ch.qos.logback.logback-classic-1.2.11.jar", +// "gs://serverlessdataproc/sanket_lib/ch.qos.logback.logback-core-1.2.11.jar", +// "gs://serverlessdataproc/sanket_lib/com.101tec.zkclient-0.10.jar", +// "gs://serverlessdataproc/sanket_lib/com.google.code.findbugs.jsr305-2.0.1.jar", +// "gs://serverlessdataproc/sanket_lib/com.google.code.gson.gson-2.3.1.jar", +// "gs://serverlessdataproc/sanket_lib/com.google.errorprone.error_prone_annotations-2.18.0.jar", +// "gs://serverlessdataproc/sanket_lib/com.google.guava.guava-20.0.jar", +// "gs://serverlessdataproc/sanket_lib/com.yammer.metrics.metrics-core-2.2.0.jar", +// "gs://serverlessdataproc/sanket_lib/io.cdap.twill.twill-api-1.3.1.jar", +// "gs://serverlessdataproc/sanket_lib/io.cdap.twill.twill-common-1.3.1.jar", +// "gs://serverlessdataproc/sanket_lib/io.cdap.twill.twill-core-1.3.1.jar", +// "gs://serverlessdataproc/sanket_lib/io.cdap.twill.twill-discovery-api-1.3.1.jar", +// "gs://serverlessdataproc/sanket_lib/io.cdap.twill.twill-discovery-core-1.3.1.jar", +// "gs://serverlessdataproc/sanket_lib/io.cdap.twill.twill-yarn-1.3.1.jar", +// "gs://serverlessdataproc/sanket_lib/io.cdap.twill.twill-zookeeper-1.3.1.jar", +// "gs://serverlessdataproc/sanket_lib/io.netty.netty-buffer-4.1.75.Final.jar", +// "gs://serverlessdataproc/sanket_lib/io.netty.netty-codec-4.1.75.Final.jar", +// "gs://serverlessdataproc/sanket_lib/io.netty.netty-codec-http-4.1.75.Final.jar", +// "gs://serverlessdataproc/sanket_lib/io.netty.netty-common-4.1.75.Final.jar", +// "gs://serverlessdataproc/sanket_lib/io.netty.netty-transport-4.1.75.Final.jar", +// "gs://serverlessdataproc/sanket_lib/lib-ch.qos.logback.logback-classic-1.2.11.jar", +// "gs://serverlessdataproc/sanket_lib/net.sf.jopt-simple.jopt-simple-3.2.jar", +// "gs://serverlessdataproc/sanket_lib/org.apache.kafka.kafka-clients-0.10.2.2.jar", +// "gs://serverlessdataproc/sanket_lib/org.apache.kafka.kafka_2.12-0.10.2.2.jar", +// "gs://serverlessdataproc/sanket_lib/org.scala-lang.modules.scala-parser-combinators_2.12-1.0.4.jar", +// "gs://serverlessdataproc/sanket_lib/org.scala-lang.scala-library-2.12.15.jar", +// "gs://serverlessdataproc/sanket_lib/org.slf4j.slf4j-api-1.7.15.jar" +// }; +// +// for(String uri : fileUris) { +// LOG.info(" SANKET ADDING FILE : {}", uri); +// sparkBatchBuilder.addJarFileUris(uri); +// } + + // TODO : HARDCODED PROPS : Need to define flow for this + + ExecutionConfig executionConfig = ExecutionConfig.newBuilder() + .setNetworkUri(provisionerContext.getProperties().getOrDefault("network", "default")) + .setSubnetworkUri("pga-subnet") + .build(); + + //TODO : To make this an advanced option via UI +// SparkHistoryServerConfig 
sparkHistoryServerConfig = SparkHistoryServerConfig.newBuilder() +// .setDataprocCluster("projects/cdf-test-317207/regions/us-west1/clusters/sanket-spark-history").build(); +// +// PeripheralsConfig peripheralsConfig = PeripheralsConfig.newBuilder() +// .setSparkHistoryServerConfig(sparkHistoryServerConfig) +// .build(); +// + + EnvironmentConfig environmentConfig = EnvironmentConfig.newBuilder() + .setExecutionConfig(executionConfig) +// .setPeripheralsConfig(peripheralsConfig) + .build(); + + + RuntimeConfig runtimeConfig = RuntimeConfig.newBuilder() + .setVersion(imageVersion) + .putAllProperties(getProperties(runtimeJobInfo)).build(); + + ProgramRunInfo runInfo = runtimeJobInfo.getProgramRunInfo(); + Batch.Builder dataprocBatchBuilder = Batch.newBuilder() + // use program run uuid as hadoop job id on dataproc + // place the job on provisioned cluster +// .setPlacement(JobPlacement.newBuilder().setClusterName(clusterName).build()) //TODO figure out the use + // add same labels as provisioned cluster + .putAllLabels(labels) + // Job label values must match the pattern '[\p{Ll}\p{Lo}\p{N}_-]{0,63}' + // Since program name and type are class names they should follow that pattern once we remove all + // capitals + .putLabels(LABEL_CDAP_PROGRAM, runInfo.getProgram().toLowerCase()) + .putLabels(LABEL_CDAP_PROGRAM_TYPE, runInfo.getProgramType().toLowerCase()) + .setRuntimeConfig(runtimeConfig) + .setEnvironmentConfig(environmentConfig) + .setSparkBatch(sparkBatchBuilder.build()); + + return dataprocBatchBuilder.build(); + + } + + @Override + public void kill(RuntimeJobDetail jobDetail) throws Exception { + LOG.error("SANKET : in kill "); + if (jobDetail == null) { + return; + } + LOG.error("SANKET : in kill 2"); + RuntimeJobStatus status = jobDetail.getStatus(); + if (status.isTerminated() || status == RuntimeJobStatus.STOPPING) { + return; + } + + // stop dataproc job + stopJob(getBatchId(jobDetail.getRunInfo())); + } + + /** + * Stops the dataproc job. Returns job object if it was stopped. + */ + private void stopJob(String jobId) throws Exception { + LOG.error("SANKET : in stopJob 1"); + Batch batch = getBatchControllerClient().getBatch(getFullBatchName(projectId, region, jobId)); + + try { + OperationsClient operationsClient = getBatchControllerClient().getOperationsClient(); + String operationName = batch.getOperation(); + LOG.info("Try to stop batch {} with operation name {}", batch.getName(), operationName); + operationsClient.cancelOperation(operationName); + } catch (Exception e) { + LOG.error("Encountered exception while stopping batch {}", batch.getName()); + } + } + + /** + * Returns a {@link JobControllerClient} to interact with Dataproc Job API. + */ + private BatchControllerClient getBatchControllerClient() throws IOException { + BatchControllerClient client = batchControllerClient; + if (client != null) { + return client; + } + + synchronized (this) { + client = batchControllerClient; + if (client != null) { + return client; + } + + // instantiate a dataproc job controller client + CredentialsProvider credentialsProvider = FixedCredentialsProvider.create(credentials); + this.batchControllerClient = client = BatchControllerClient.create( + BatchControllerSettings.newBuilder().setCredentialsProvider(credentialsProvider) + .setEndpoint(String.format("%s-%s", region, endpoint)).build()); + } + return client; + } + + // Job name with Joiner_Test fails. 
+ public static String getBatchId(ProgramRunInfo runInfo) { + List parts = ImmutableList.of( + runInfo.getNamespace().substring(0,Math.min(runInfo.getNamespace().length(),5)).toLowerCase(), + runInfo.getApplication().substring(0,Math.min(runInfo.getApplication().length(),15)).toLowerCase(), + runInfo.getProgram().toLowerCase()); + String joined = Joiner.on("-").join(parts); + joined = joined.substring(0, Math.min(joined.length(), 26)); + joined = joined + "-" + runInfo.getRun(); + if (!DATAPROC_BATCH_ID_PATTERN.matcher(joined).matches()) { + throw new IllegalArgumentException( + String.format("Job ID %s is not a valid dataproc job id. ", joined)); + } + + //A batch ID must start and end in a letter or a number, be between 4 and 63 characters long, and contain only + //lowercase letters, numbers, and hyphens + + + return joined; + } + +} \ No newline at end of file diff --git a/cdap-runtime-ext-dataproc/src/main/resources/META-INF/services/io.cdap.cdap.runtime.spi.provisioner.Provisioner b/cdap-runtime-ext-dataproc/src/main/resources/META-INF/services/io.cdap.cdap.runtime.spi.provisioner.Provisioner index 581186e1f676..aa0bdf61eee5 100644 --- a/cdap-runtime-ext-dataproc/src/main/resources/META-INF/services/io.cdap.cdap.runtime.spi.provisioner.Provisioner +++ b/cdap-runtime-ext-dataproc/src/main/resources/META-INF/services/io.cdap.cdap.runtime.spi.provisioner.Provisioner @@ -16,3 +16,4 @@ io.cdap.cdap.runtime.spi.provisioner.dataproc.DataprocProvisioner io.cdap.cdap.runtime.spi.provisioner.dataproc.ExistingDataprocProvisioner +io.cdap.cdap.runtime.spi.provisioner.dataproc.ServerlessDataprocProvisioner diff --git a/cdap-runtime-ext-dataproc/src/main/resources/gcp-serverless-dataproc.json b/cdap-runtime-ext-dataproc/src/main/resources/gcp-serverless-dataproc.json new file mode 100644 index 000000000000..0656eebd0f66 --- /dev/null +++ b/cdap-runtime-ext-dataproc/src/main/resources/gcp-serverless-dataproc.json @@ -0,0 +1,181 @@ +{ + "icon": { + "type": "inline", + "arguments": { + "data": 
"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAFUAAABVCAYAAAA49ahaAAAOU0lEQVR4Xu2deXRU1R3Hv3febEkmewiEnUAQRaniUtuq1SyEsBSxDepxgR4Rat2ONSBJEAeSSdh6WihuqK3K0SqporIJWVuxQkux0oJKQAsBIfskmSSzvttzJyaTyczkvTfvTUg43L9g3u/+7u9+3u/d+7trCC4nxQkQxTVeVojLUEPgBJehDimolJJUk3Usp9EP43nHoHl5KpWGuhzW+oqVYadDwNOtUvHKGo1UdUDPr6aUPAXQiFAZLl8vaSeE/u4Wq+o5o5Hw8vV5NCgK9XYjNXB6vhoUI5Q0MqS6CC64rKqUKiOxKFWOYlCzjVTbpHN9ByBeKeMGUE9jnI0bWWIkdiXKVAxqerFzOwWylTDqYuggQElZrnqBEmUrAnXGBjrB5XCdAlG+jVaikqJ0UFBOw03cv4x8K0q+HyFFoKYVO04AJEWuMRc/PzlRnstdIdcO2VBT1zqzCKV75BoyWPJTQmZVrFDvlWOPLKjZ2ZRrmu5sBBAtx4hBlrcl7og6vqSEuIK1SxbUtGLHGgDPBlv4IM5XUJ6rWRWsfUFDzTI2Rtk1kU0ghAu28EGbj1KX1tEWt9cY3xqMjUFDTSuyVwC4I5hCh0ieyvI8bWowtgYFNa3IdhVAjgVT4NDKQ6eW5+mOS7U5SKiOGgI6WmphQ02egpwtz9OMkWq3ZKgZxbZsSsl2qQUNVXlC6ILSXF2JFPslQWXj+2atvQnAIJ59klJ9UbLtsXZtnJR5AUlQM4rsmyjoE6JMuYSECMjm0jztk2KrJBrq3I00odNurwWgEqv8EpLjw7Ta4TtzSIOYOomGmm6yfQaCm8UoVUpGrwGmjVUh3gDYncB5M8Xxc1Qp9dL0UBwsy9f9SEwmUVAz19unu5z8v8QoVEJmVBzBmp9rMCbBd9rL6QIqj7vw2z1OuBSdrxe2nFOrrt+3XHtESFIU1PSizlqAJAopU+L5/Bs5PJKmBhGwzOoAHtpqR13rQHourSvLCxsuVE9BqBmmzsWUkFeEFCnxPHMah5zZatGqHE4ge7MV7TbBaojWKSRIKH24ND/s1f7k+rUmazPVOSy2FgA6ocLkPg/TAjue0oHz0w2euuBEuI4gKdZ3muFMgxMPbXUAZMD6T5vGoIve+wSxBapzv1DTiju3EYr75QITkz9ntgbMU3un9w6248V9rWDtKEtR4Sq8vDTBB+5DLzTgtDkcZODAbivLC3tQMtRMU3uSC6pzA7VEsitHD53GY+bfjlux6p1mH7s1HMGuvOHQaTz+8PHnnVi7wwy1PnpgwLKlF/Cj9uVHnPcHNqCnphd1fglgihgvU0Jm/wq9V+c0t7gWbZ3+u/eFtxvwy9TInmKbLTzmr2chNAYOLPBVWV7YlaKhpq913ALe+YkSsMTo0HDAnuX6HlFKgYw153s++746bpqkw/oH43p+7rBRzDJd6Pn/gHmsSn1r2QrNgb72+XoqpSS9uKMJIDFigCglU5ob5qUqq/A8Ou3+w6V7fmLArzKjeuRbOnjMW+uBOnAeS81lueFxIMTLUB+o6UUdOQA2KAVLrJ6+UHcd7sDGj8w+2VUE2JmbhAi9x/TK/3Zi9Xbf9neAPHZZWV74xt6GekG93VhnUGsMzDrxwaJYagJypXnensrEX9rXincOeHbjqDmC5x9OwBWjevVoAB7d2oBjNf43lwwAWKfTYYmtMib2GOoFNd3UvgsgsxXiJEmNP6hMgdVOcfSMHQYdweRRWqj7hKP1rS5kb/D+9L0LJlDro0K8lEZ3l+VHzOkutwdqerE5GbzmZCh2AoqhW5oXLkbMS4anwIINF9DQJryaHGKPpVA5JpXlxnzDDPRANXWcBOhEyTVTKINUqDwPLHmxDicvOERbEFqw5FRZfvikHqhpBZaZREVk7coQXbMAgr2hspDKYuURGRZ46PnSvha8e8ACqdMpoQRLeZpV/qzhY5K9nXLN1R2smzXIBSMnf2+ozAvTjOcweaQW6x6IR0yEf7gslFr2egNOnBfvrSEOtyyxKeExJL2o9WFQ1VY5QJTIW5rnWfZiUFOfO+tWy6YAZ14XgZx5sX4nW5gM6/lXbGsIOALzZ1/IPJbwS0i6yXIGgORlWCVA9tYRCGq3jJoDnpwTiznXR/ida2Wd1o6DFjy/1wz2bzEpRGBrSHqhxQmCi751RwhqN6Q4A4e1DyRg8kjvWLX7ud1JsW5HMyqOdohqbxUHS+Ei6aY2ke9VzLsPXqY0z9Okd33+NQGVsZDlmnE6mO5LCNiZ1be48PTr9TjT0H97yxZsGFjBpQYJVSMZhW2DwlP394Ga1g/UniCbAL/4USSWZsYEbG8PnbBi9bsNAecRepoXpaYNmadmuNtUOiBtql5DcNMkDuOHqdwBck0Tj4PVLrBZpmCgetpbgmez43HbVN+hLpNhIdqOQ214fk//7a0yTQGpcff+hJKQ9v4x4QTF94Zh4nD/odHpBh7jEjzP3CGVCE/thjpllBZbliSCY7Mt/aTWDh6L/nABzZbAIzC5YCmhS9xxqrm6LWRx6k0T1VizIAwC9fVC0QWVBSX9p3CtCvnZ8fjxFP8e6i8368juWncO7dZA69tdbSwRWs71b5olJiUyxv1q0wpaZqpCMKKaMpLDpkXhko+sCEFVqYD7b4vGolRWef+1Y/DYI43aV+BMvQMLN/tdCelRFozH8u4RVfTHPSXOMLWepIBiY3+m+MNlBrB2tHdyuihOnneApxQpI7Vga059U39Qb5ioR+F9w7zWqHrnZ8CefK0W5nbeDXVUvBqbFw9HrME7anzslVocOxNwQdStUgpYApzanx/lGfszBWyWivAqxWap0q7W4JmfeZZIWBmffd2JlW/Xg0HrTs/Mj8PM6d4jZH9Qh0VzWP9gIsYn+o9Pmb4G9zTgOZ+XxLx1Z95orxfx+bdW/OaPdUItjFiwlKp431kqpj3D1LoLoIrMp77xiAFJsZ7Oh8WLCzexU5a+6cWlSZgyWtvzoAtq1yFnrZogZ148Mq4V3r257I06HD7Z6beMu26OxOOzPetabMJmrilwLOxRIqaNJbtL86N851OZEjbzr9HoFJn5/zAnEmFaz6e9/M06/LPaf4XZxMnLjyT11IOFQJmrz2DujQY8NostAQk6lFvgjmcDnzZnI7H3nvFs/mbrX7MKhDvD7pL7aQqcDoct8Mw/UzCj0JxDQWSvUe1ZEQU2Xu9OdxbXoKXdf4/LmFUUjvMix3rnCL3/EKzW7ET+W/V45ddJXsDv3nAWdS3+w6Vp4/XYtNizDcoNdY14qIHaWAK6bP/KmMBrVO5aUUoyTK1st7Ss1dQPl3l7at62Oneb6i9dOUaHF5YKn2ZnPfqGHY0o+6Ld3QntXz0WbN2qO+0+bMHGD9hZOd+0ZekITB3j2b3EXsw9G33bX6Fvoo/Hmkvzo4RXU5nS9MLWWwh4Wev+ry2NxJheAX1dS1clKPu2+6RXHx2JiUmeNrXvczbr9MHBVmzZ0+QeHXWnp+fF
Y86Nnk0V7PeC7fWoONrupeLeW6OxJDPW67cX9jah5NNgjkl52lgK1a1lK6NErPt/X3RGoVnWDpWfXqVB/nzvdadDJzrx3J/rYHN0kWHh1MoFCbhtauBO6MuzNqx4sxZsNNQ3seZhZ/5Ynzb3dJ0Dlf9pd3tx5nURGBbtvTjscLFP/3TAzRpC3uq2XRf1VemqePE7VFgmtpeK5x2y9lK9/3QUDL3W57uN/V+dAy6eInmENuDAwNzucsP8+lz/9xosSo3BwlRpLdWad+vd0GUk6nBwoz9dP9ZvONNvv5pR0LwNhAS9629CIoeXFhtE996sksyLtuxuwkf/aBNd5+V3JSCrT6wbKPOblWb8qdx3k4bowtyC5K0q0/iAXAT3pzpbW2TtT71uvBqmeyN81uv9VaLsCwvWvd8Q1GeZ/oMI5NyZEHCkxaKJVW/X4cg3Vmn8fKVt0CCmyjghoCLBCDDDZF4MSmXtpDbogYLsMLBevu/ECut4vjprQ/F79agRmFAWosHi2R+mhOOOayIwJqFr5HXqgh3lRy3497eyYbr1UYIlfy1M7peHIFSmKKOwme1TlLfnn1IQZyuunaDD+EStu0lgEA+f6oT9+45LCNrFfk4I6ioLk+Xv+e/qtMzTeUrln06hFA4ra00GxQqO5HdEVNwNlQXjBDmI8tQub236DFDgHNVQBUvIoarCZFHnyERDnbuxNcFmddZSJU78DTGwFITnedeIT4pT6sW4t2io37etm6DU2dQhBZZsqTIlPy4GqDvgEivI5LKNx7RmdZJyp6iHBFjSPkzTGVdinCr6djVJULu8tTEbUPC8/yAHS1Tk7sqCZEn3G0iGysDOKGisoQTK3UwxWMESnK0qnCR5+T4oqFlFjVe5eCh7h8pgBKuiV1cWpEiuZ1BQ3d5a2FABEGVv+xlUYGlVpSklqPoFDZXdS+XiaJPim9sGA1hKXHotjdtrTAlmwlVa7983UsgsqF9DQZS/Qe0igyWgpoqiySulREa9ZYP2VHeIlb2da5mWGpq7/i4aWNKSUH0kvqRkgfDpjADUZUF1t62mpizwfGhupbwoYOmcyqLJu4P1UsnBf6CCZhTUnwAQmvtTBxAsAamuKEqZLAeoglDNE0Adobvpd2DAUo7HpLK1k91noeQk2Z9/d+GZBQ3bKWjo7qQOMVgC8peKohRF7FcMKrtdrYWr/w4gobs9PURgeaApUetIkjK+78+TFYPKCmHbhnQcqaagwjsjgv2+FAdLaqk2ZlLvA7vBmtadT1GoTCn7ixR/VzWuhop/CjREdwIqAJYH7VAR8vuEE1+skhM++XsBikPtXUhq4XfjOGiGcbxT8XLYmRNnh/QBD+HUVAVSX2FKHjp/O0Xup3Mp5Ffcgy4FKHLrcBmqXIJ+8l+GGgKo/wfsUyhPEm1t6gAAAABJRU5ErkJggg==" + } + }, + "configuration-groups": [ + { + "label": "GCP Account Information", + "properties": [ + { + "widget-type": "textbox", + "label": "Project ID", + "name": "projectId", + "description": "Google Cloud Project ID, which uniquely identifies your project. You can find it on the Dashboard in the Cloud Platform Console. If the system is running on Google Cloud Platform, this can be left blank or set to 'auto-detect' and the system's project id will be used.", + "widget-attributes": { + "placeholder": "Specify your GCP Project ID" + } + }, + { + "widget-type": "securekey-textarea", + "label": "Creator Service Account Key", + "name": "accountKey", + "description": "Service account key used to create Dataproc clusters. Paste the contents of the service account key JSON file that you can download from the service accounts section under IAM and admin on the Cloud Console. If the system is running on Google Cloud Platform, this can be left blank or set to 'auto-detect' and the system's credentials will be used.", + "widget-attributes": { + "placeholder": "Specify the GCP service account" + } + } + ] + }, + { + "label": "General Settings", + "properties": [ + { + "widget-type": "select", + "label": "Region", + "name": "region", + "description": "Regions are collections of zones. Zones have high-bandwidth, low-latency network connections to other zones in the same region. For more information, refer to https://cloud.google.com/compute/docs/regions-zones/. 
If the system is running on Google Cloud Platform, this can be set to 'auto-detect' and the system's region will be used.", + "widget-attributes": { + "values": [ + "auto-detect", + "asia-east1", + "asia-east2", + "asia-northeast1", + "asia-northeast2", + "asia-northeast3", + "asia-south1", + "asia-southeast1", + "asia-southeast2", + "australia-southeast1", + "europe-north1", + "europe-west1", + "europe-west2", + "europe-west3", + "europe-west4", + "europe-west6", + "northamerica-northeast1", + "southamerica-east1", + "us-central1", + "us-east1", + "us-east4", + "us-west1", + "us-west2", + "us-west3", + "us-west4" + ], + "default": "us-east1", + "size": "medium" + } + }, + { + "widget-type": "textbox", + "label": "Zone", + "name": "zone", + "description": "A zone is an isolated location within a region. The fully-qualified name for a zone is made up of -. For example, the fully-qualified name for zone a in region us-central1 is us-central1-a. For more information, refer to https://cloud.google.com/compute/docs/regions-zones/. If the system is running on Google Cloud Platform, this can be set to 'auto-detect' and the system's zone will be used.", + "widget-attributes": { + "size": "medium" + } + }, + { + "widget-type": "textbox", + "label": "Network", + "name": "network", + "description": "Select a VPC network in the specified project to use when creating clusters with this profile. If this is left blank or set to 'auto-detect', a network from the project will be chosen.", + "widget-attributes": { + "default": "default", + "size": "medium" + } + }, + { + "widget-type": "textbox", + "label": "Network Host Project ID", + "name": "networkHostProjectId", + "description": "Google Cloud Project ID, which uniquely identifies the project where the network resides. This can be left blank if the network resides in the same project as specified in the Project ID. In case of Shared VPC this must be set to the host project ID where the network resides.", + "widget-attributes": { + "size": "medium" + } + }, + { + "widget-type": "textbox", + "label": "Subnet", + "name": "subnet", + "description": "Subnet to use when creating clusters. The subnet must be within the given network, and it must be for the same region that the zone is in. If this is left blank, a subnet will automatically be chosen based on the network and zone.", + "widget-attributes": { + "size": "medium" + } + }, + { + "widget-type": "textbox", + "label": "Runner Service Account", + "name": "serviceAccount", + "description": "Name of the service account of the Dataproc virtual machines (VM) that are used to run programs. If none is given, the default Compute Engine service account will be used.", + "widget-attributes": { + "size": "medium" + } + } + ] + }, + { + "label": "Advanced", + "properties": [ + { + "widget-type": "radio-group", + "label": "Launch Mode", + "name": "launchMode", + "description": "Whether to launch the program directly in the Dataproc job (client mode) or in a separate container (cluster mode). 
Client mode will result in faster start up times and fewer cluster resources used, but may run into errors if the launcher requires more memory.", + "widget-attributes": { + "layout": "inline", + "size": "medium", + "default": "cluster", + "options": [ + { + "id": "client", + "label": "Client" + }, + { + "id": "cluster", + "label": "Cluster" + } + ] + } + }, + { + "widget-type": "textbox", + "label": "GCS Bucket", + "name": "gcsBucket", + "description": "Google Cloud Storage bucket used to stage job dependencies and config files for running pipelines in Google Cloud Dataproc", + "widget-attributes": { + "size": "medium" + } + }, + { + "widget-type": "textbox", + "label": "Image Version", + "name": "imageVersion", + "description": "The Dataproc serverless image version.", + "widget-attributes": { + "size": "medium" + } + } + ] + } + ], + "filters": [ + { + "name": "Show disableGCSCaching", + "condition": { + "property": "disableGCSCaching", + "operator": "equal", + "value": "true" + }, + "show": [ + { + "name": "disableGCSCaching", + "type": "property" + } + ] + } + ] +} \ No newline at end of file diff --git a/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/SparkProgramRunner.java b/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/SparkProgramRunner.java index 9647532e75f6..4747281d4681 100644 --- a/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/SparkProgramRunner.java +++ b/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/SparkProgramRunner.java @@ -39,6 +39,7 @@ import io.cdap.cdap.app.runtime.spark.submit.DistributedSparkSubmitter; import io.cdap.cdap.app.runtime.spark.submit.LocalSparkSubmitter; import io.cdap.cdap.app.runtime.spark.submit.MasterEnvironmentSparkSubmitter; +import io.cdap.cdap.app.runtime.spark.submit.ServerlessDataprocSubmitter; import io.cdap.cdap.app.runtime.spark.submit.SparkSubmitter; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; @@ -220,7 +221,12 @@ public ProgramController run(Program program, ProgramOptions options) { SparkSubmitter submitter; // If MasterEnvironment is not available, use non-master env spark submitters MasterEnvironment masterEnv = MasterEnvironments.getMasterEnvironment(); - if (masterEnv != null && cConf.getBoolean(Constants.Environment.PROGRAM_SUBMISSION_MASTER_ENV_ENABLED, true)) { + //TODO : figure out that this is serverless + if (true) { + String schedulerQueue = options.getArguments().getOption(AppFabric.APP_SCHEDULER_QUEUE); + submitter = new ServerlessDataprocSubmitter(hConf, locationFactory, host, runtimeContext, + schedulerQueue, LaunchMode.CLIENT); + } else if (masterEnv != null && cConf.getBoolean(Constants.Environment.PROGRAM_SUBMISSION_MASTER_ENV_ENABLED, true)) { submitter = new MasterEnvironmentSparkSubmitter(cConf, locationFactory, host, runtimeContext, masterEnv, options); } else { diff --git a/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/SparkRuntimeService.java b/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/SparkRuntimeService.java index cdbb2e9fc613..e8e974ff802a 100644 --- a/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/SparkRuntimeService.java +++ b/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/SparkRuntimeService.java @@ -248,7 +248,10 @@ protected void startUp() throws Exception { SparkRuntimeEnv.setProperty(key, sparkDefaultConf.getProperty(key)); } + LOG.warn("SANKET : jobFile : " + jobFile.getPath()); + if (masterEnv != 
null) { + LOG.warn("SANKET : masterEnv != null : "); // Add cconf, hconf, metrics.properties, logback for master environment localizeResources.add(new LocalizeResource(saveCConf(cConfCopy, tempDir))); Configuration hConf = contextConfig.set(runtimeContext, pluginArchive).getConfiguration(); @@ -278,6 +281,7 @@ protected void startUp() throws Exception { // Localize all the files from user resources List files = copyUserResources(context.getLocalizeResources(), tempDir); for (File file : files) { + LOG.warn("SANKET : local files for loop : " + file.getAbsolutePath()); localizeResources.add(new LocalizeResource(file)); } @@ -289,6 +293,7 @@ protected void startUp() throws Exception { } } else if (isLocal) { + LOG.warn("SANKET : islocal"); // In local mode, always copy (or link if local) user requested resources copyUserResources(context.getLocalizeResources(), tempDir); @@ -299,6 +304,7 @@ protected void startUp() throws Exception { extractPySparkLibrary(tempDir, extraPySparkFiles); } else { + LOG.warn("SANKET : Master is NUL and not LOCAL : "); // Localize all user requested files in distributed mode distributedUserResources(context.getLocalizeResources(), localizeResources); @@ -342,6 +348,7 @@ protected void startUp() throws Exception { // Localize the spark.jar archive, which contains all CDAP and dependency jars File sparkJar = new File(tempDir, CDAP_SPARK_JAR); + LOG.warn("SANKET : sparkJar : " + sparkJar.getPath()); classpath = joiner.join(Iterables.transform(buildDependencyJar(sparkJar), name -> Paths.get("$PWD", CDAP_SPARK_JAR, name).toString())); localizeResources.add(new LocalizeResource(sparkJar, true)); @@ -356,6 +363,7 @@ protected void startUp() throws Exception { // Localize extra jars and append to the end of the classpath List extraJars = new ArrayList<>(); for (URI jarURI : CConfigurationUtil.getExtraJars(cConfCopy)) { + LOG.warn("SANKET : extra jarURI : " + jarURI.getPath()); extraJars.add(Paths.get("$PWD", LocalizationUtils.getLocalizedName(jarURI)).toString()); localizeResources.add(new LocalizeResource(jarURI, false)); } @@ -613,6 +621,8 @@ private Map createSubmitConfigs(File localDir, @Nullable String boolean localMode, Iterable pyFiles) { + LOG.warn("SANKET : createSubmitConfigs : classpath : " + classpath); + // Setup configs from the default spark conf Map configs = new HashMap<>(Maps.fromProperties(SparkPackageUtils.getSparkDefaultConf())); @@ -740,7 +750,7 @@ private Iterable buildDependencyJar(File targetFile) throws IOException, Set classpath = new TreeSet<>(); try (JarOutputStream jarOut = new JarOutputStream(new BufferedOutputStream(new FileOutputStream(targetFile)))) { jarOut.setLevel(Deflater.NO_COMPRESSION); - + LOG.warn("SANKET : targetFile : " + targetFile.getPath()); // Zip all the jar files under the same directory that contains the jar for this class and twill class. 
       // Those are the directory created by TWILL that contains all dependency jars for this container
       for (String className : Arrays.asList(getClass().getName(), TwillRunnable.class.getName())) {
@@ -750,6 +760,7 @@ private Iterable<String> buildDependencyJar(File targetFile) throws IOException,
         File libDir = new File(ClassLoaders.getClassPathURL(className, classURL).toURI()).getParentFile();
 
         for (File file : DirUtils.listFiles(libDir, "jar")) {
+          LOG.warn("SANKET : buildDependencyJar : " + file.getPath());
           if (classpath.add(file.getName())) {
             jarOut.putNextEntry(new JarEntry(file.getName()));
             Files.copy(file, jarOut);
@@ -946,7 +957,9 @@ private void distributedUserResources(Map<String, LocalizeResource> resources,
                                         List<LocalizeResource> result) throws URISyntaxException {
     for (Map.Entry<String, LocalizeResource> entry : resources.entrySet()) {
       URI uri = entry.getValue().getURI();
+      LOG.warn("SANKET : distributedUserResources : " + uri.getPath());
       URI actualURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), uri.getQuery(), entry.getKey());
+      LOG.warn("SANKET : distributedUserResources : actualURI : " + uri.getPath());
       result.add(new LocalizeResource(actualURI, entry.getValue().isArchive()));
     }
   }
diff --git a/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/distributed/SparkContainerLauncher.java b/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/distributed/SparkContainerLauncher.java
index 5e535872552a..718372631be6 100644
--- a/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/distributed/SparkContainerLauncher.java
+++ b/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/distributed/SparkContainerLauncher.java
@@ -184,7 +184,8 @@ public static void launch(String mainClassName, String[] args, boolean removeMai
     // that it passes executor environment via command line properties, which get resolved by yarn launcher,
     // which causes executor logs attempt to write to driver log directory
     if (System.getProperty("spark.executorEnv.CDAP_LOG_DIR") != null) {
-      System.setProperty("spark.executorEnv.CDAP_LOG_DIR", "<LOG_DIR>");
+//      System.setProperty("spark.executorEnv.CDAP_LOG_DIR", "<LOG_DIR>");
+      System.setProperty("spark.executorEnv.CDAP_LOG_DIR", "/tmp");
     }
 
     // Optionally starts Py4j Gateway server in the executor container
diff --git a/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/submit/AbstractSparkSubmitter.java b/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/submit/AbstractSparkSubmitter.java
index ece257c5bb43..6347d4251eae 100644
--- a/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/submit/AbstractSparkSubmitter.java
+++ b/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/submit/AbstractSparkSubmitter.java
@@ -23,6 +23,7 @@
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.io.Files;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.common.util.concurrent.Uninterruptibles;
 import io.cdap.cdap.api.spark.SparkSpecification;
@@ -35,7 +36,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -184,6 +187,10 @@ protected boolean waitForFinish() throws Exception {
     return true;
   }
 
+  protected Function<LocalizeResource, String> getLocalizeResourceToURIFunc() {
+    return RESOURCE_TO_PATH;
+  }
+
   /**
    * Submits the Spark job using {@link SparkSubmit}.
    *
@@ -219,6 +226,15 @@ private void submit(SparkRuntimeContext runtimeContext, String[] args) {
   private List<String> createSubmitArguments(SparkRuntimeContext runtimeContext, Map<String, String> configs,
                                              List<LocalizeResource> resources, URI jobFile) throws Exception {
     SparkSpecification spec = runtimeContext.getSparkSpecification();
+    LOG.warn("SANKET : createSubmitArguments : ALL LOCAL RESOURCE ");
+    for (LocalizeResource lr : resources) {
+      LOG.warn("SANKET : createSubmitArguments : LocalizeResource : " + lr.getURI().getPath());
+    }
+
+    LOG.warn("SANKET : createSubmitArguments : ALL CONFIGS ");
+    configs.entrySet().forEach(entry -> {
+      LOG.warn("SANKET : Key: " + entry.getKey() + ", Value: " + entry.getValue());
+    });
 
     ImmutableList.Builder<String> builder = ImmutableList.builder();
     Iterable<LocalizeResource> archivesIterable = getArchives(resources);
@@ -231,16 +247,74 @@ private List createSubmitArguments(SparkRuntimeContext runtimeContext, M
     BiConsumer<String, String> confAdder = (k, v) -> builder.add("--conf").add(k + "=" + v);
     configs.forEach(confAdder);
 
-    String archives = Joiner.on(',').join(Iterables.transform(archivesIterable, RESOURCE_TO_PATH));
-    String files = Joiner.on(',').join(Iterables.transform(filesIterable, RESOURCE_TO_PATH));
+
+    Map<String, String> mapArgs = runtimeContext.getProgramOptions().getUserArguments().asMap();
+
+
+    mapArgs.entrySet().forEach(entry -> {
+      LOG.warn("SANKET USER ARG : Key: " + entry.getKey() + ", Value: " + entry.getValue());
+    });
+
+    String artifactTry = null;
+    LOG.warn("SANKET : createSubmitArguments : ALL archives ");
+    for (LocalizeResource lr : archivesIterable){
+      LOG.warn("SANKET : archivesIterable : " + lr.getURI());
+      if (lr.getURI().getPath().contains("artifacts_archive")){
+        LOG.warn("SANKET : archivesIterable : COPYING : " + lr.getURI());
+        File tmpDir = Files.createTempDir();
+        File artifacts_archive_jar = tmpDir.toPath().resolve("artifacts_archive.jar").toFile();
+        File file = new File(lr.getURI());
+        Files.copy(file, artifacts_archive_jar);
+        LOG.warn("SANKET : archivesIterable : COPIED to : " + artifacts_archive_jar.getAbsolutePath());
+        artifactTry = artifacts_archive_jar.getAbsolutePath();
+
+      }
+    }
+
+    String archives = Joiner.on(',').join(Iterables.transform(archivesIterable,
+                                                              getLocalizeResourceToURIFunc()));
+
+    if (artifactTry != null){
+      archives = archives + ",file:" +artifactTry;
+    }
+
+    String files = Joiner.on(',').join(Iterables.transform(filesIterable, getLocalizeResourceToURIFunc()));
 
     if (!Strings.isNullOrEmpty(archives)) {
-      builder.add("--archives").add(archives);
+//      builder.add("--archives").add(archives);
     }
     if (!Strings.isNullOrEmpty(files)) {
       builder.add("--files").add(files);
     }
+
+    List<String> jarss = new ArrayList<>();
+    // Put all jars in jjars
+    for (LocalizeResource lr : archivesIterable) {
+      if (lr.getURI().getPath().contains("artifacts_archive")){
+        jarss.add(lr.getURI().getPath());
+      }
+
+      if (lr.getURI().getPath().contains("cdap-spark")){
+        jarss.add(lr.getURI().getPath());
+      }
+    }
+
+    for (LocalizeResource lr : filesIterable) {
+      if (lr.getURI().getPath().contains("program.jar")){
+        jarss.add(lr.getURI().getPath());
+      }
+
+      if (lr.getURI().getPath().contains("cdap-spark-launcher.jar")){
+        jarss.add(lr.getURI().getPath());
+      }
+    }
+    String jars = String.join(",", jarss);
+
+//    String jars =
"gs://0000_sanket/serverless_jars/jars/USER-datagen-plugins-0.1.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/USER-trash-plugin-1.2.0.jar,gs://0000_sanket/serverless_jars/jars/aopalliance.aopalliance-1.0.jar,gs://0000_sanket/serverless_jars/jars/cdap-etl-api-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/cdap-etl-api-spark-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/cdap-etl-batch-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/cdap-etl-core-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/cdap-etl-proto-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/ch.qos.logback.logback-classic-1.2.11.jar,gs://0000_sanket/serverless_jars/jars/ch.qos.logback.logback-core-1.2.11.jar,gs://0000_sanket/serverless_jars/jars/ch.qos.reload4j.reload4j-1.2.22.jar,gs://0000_sanket/serverless_jars/jars/com.fasterxml.jackson.core.jackson-annotations-2.15.1.jar,gs://0000_sanket/serverless_jars/jars/com.google.code.findbugs.jsr305-2.0.1.jar,gs://0000_sanket/serverless_jars/jars/com.google.code.gson.gson-2.3.1.jar,gs://0000_sanket/serverless_jars/jars/com.google.errorprone.error_prone_annotations-2.36.0.jar,gs://0000_sanket/serverless_jars/jars/com.google.inject.extensions.guice-assistedinject-4.0.jar,gs://0000_sanket/serverless_jars/jars/com.google.inject.extensions.guice-multibindings-4.0.jar,gs://0000_sanket/serverless_jars/jars/com.google.inject.guice-4.0.jar,gs://0000_sanket/serverless_jars/jars/commons-beanutils.commons-beanutils-1.7.0.jar,gs://0000_sanket/serverless_jars/jars/commons-io.commons-io-2.12.0.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-api-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-api-common-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-api-spark3_2.12-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-app-fabric-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-common-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-data-fabric-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-error-api-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-features-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-formats-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-log-publisher-spi-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-master-spi-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-messaging-spi-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-metadata-spi-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-proto-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-runtime-spi-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-security-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-security-spi-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-spark-core3_2.12-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-spark-python-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-storage-spi-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-system-app-api-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-tms-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-watchdog-
6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.cdap.cdap-watchdog-api-6.12.0-SNAPSHOT.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.common.common-http-0.13.1.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.common.common-io-0.13.1.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.http.netty-http-1.7.0.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.twill.twill-api-1.4.0.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.twill.twill-common-1.4.0.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.twill.twill-core-1.4.0.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.twill.twill-discovery-api-1.4.0.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.twill.twill-discovery-core-1.4.0.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.twill.twill-yarn-1.4.0.jar,gs://0000_sanket/serverless_jars/jars/io.cdap.twill.twill-zookeeper-1.4.0.jar,gs://0000_sanket/serverless_jars/jars/io.dropwizard.metrics.metrics-core-3.1.2.jar,gs://0000_sanket/serverless_jars/jars/io.netty.netty-buffer-4.1.75.Final.jar,gs://0000_sanket/serverless_jars/jars/io.netty.netty-codec-4.1.75.Final.jar,gs://0000_sanket/serverless_jars/jars/io.netty.netty-codec-http-4.1.75.Final.jar,gs://0000_sanket/serverless_jars/jars/io.netty.netty-common-4.1.75.Final.jar,gs://0000_sanket/serverless_jars/jars/io.netty.netty-handler-4.1.75.Final.jar,gs://0000_sanket/serverless_jars/jars/io.netty.netty-transport-4.1.75.Final.jar,gs://0000_sanket/serverless_jars/jars/it.unimi.dsi.fastutil-6.5.6.jar,gs://0000_sanket/serverless_jars/jars/javax.inject.javax.inject-1.jar,gs://0000_sanket/serverless_jars/jars/javax.ws.rs.javax.ws.rs-api-2.0.jar,gs://0000_sanket/serverless_jars/jars/net.sf.jopt-simple.jopt-simple-3.2.jar,gs://0000_sanket/serverless_jars/jars/org.apache.avro.avro-1.11.4.jar,gs://0000_sanket/serverless_jars/jars/org.apache.commons.commons-compress-1.22.jar,gs://0000_sanket/serverless_jars/jars/org.apache.commons.commons-dbcp2-2.9.0.jar,gs://0000_sanket/serverless_jars/jars/org.apache.commons.commons-pool2-2.10.0.jar,gs://0000_sanket/serverless_jars/jars/org.apache.tephra.tephra-api-0.15.0-incubating.jar,gs://0000_sanket/serverless_jars/jars/org.apache.tephra.tephra-core-0.15.0-incubating.jar,gs://0000_sanket/serverless_jars/jars/org.apache.thrift.libthrift-0.9.3.jar,gs://0000_sanket/serverless_jars/jars/org.bouncycastle.bcpkix-jdk15on-1.70.jar,gs://0000_sanket/serverless_jars/jars/org.bouncycastle.bcprov-jdk15on-1.70.jar,gs://0000_sanket/serverless_jars/jars/org.bouncycastle.bcutil-jdk15on-1.70.jar,gs://0000_sanket/serverless_jars/jars/org.conscrypt.conscrypt-openjdk-uber-2.5.1.jar,gs://0000_sanket/serverless_jars/jars/org.fusesource.leveldbjni.leveldbjni-all-1.8.jar,gs://0000_sanket/serverless_jars/jars/org.iq80.leveldb.leveldb-0.12-uber.jar,gs://0000_sanket/serverless_jars/jars/org.ow2.asm.asm-7.1.jar,gs://0000_sanket/serverless_jars/jars/org.ow2.asm.asm-commons-7.1.jar,gs://0000_sanket/serverless_jars/jars/org.ow2.asm.asm-tree-7.1.jar,gs://0000_sanket/serverless_jars/jars/org.quartz-scheduler.quartz-2.2.0.jar,gs://0000_sanket/serverless_jars/jars/org.slf4j.jcl-over-slf4j-1.7.15.jar,gs://0000_sanket/serverless_jars/jars/org.slf4j.jul-to-slf4j-1.7.15.jar,gs://0000_sanket/serverless_jars/jars/org.slf4j.slf4j-api-1.7.15.jar,gs://0000_sanket/serverless_jars/jars/zookeeper-3.4.6.jar";
+
+    builder.add("--jars").add(jars);
+
     URI newJobFile = getJobFile();
     if (newJobFile != null) {
       jobFile = newJobFile;
diff --git a/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/submit/ServerlessDataprocSubmitter.java b/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/submit/ServerlessDataprocSubmitter.java
new file mode 100644
index 000000000000..16dc4e33aeae
--- /dev/null
+++ b/cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/submit/ServerlessDataprocSubmitter.java
@@ -0,0 +1,78 @@
+package io.cdap.cdap.app.runtime.spark.submit;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import io.cdap.cdap.app.runtime.spark.SparkRuntimeContext;
+import io.cdap.cdap.runtime.spi.runtimejob.LaunchMode;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.twill.filesystem.LocationFactory;
+import org.jetbrains.annotations.Nullable;
+import io.cdap.cdap.internal.app.runtime.distributed.LocalizeResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class ServerlessDataprocSubmitter extends DistributedSparkSubmitter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ServerlessDataprocSubmitter.class);
+
+  private static final Function<LocalizeResource, String> RESOURCE_TO_PATH = input ->
+      input.getURI().toString().split("#")[0];
+  private static final Pattern LOCAL_MASTER_PATTERN = Pattern.compile("local\\[([0-9]+|\\*)\\]");
+
+
+  public ServerlessDataprocSubmitter(Configuration hConf, LocationFactory locationFactory,
+                                     String hostname, SparkRuntimeContext runtimeContext,
+                                     @Nullable String schedulerQueueName, LaunchMode launchMode) {
+    super(hConf, locationFactory, hostname, runtimeContext, schedulerQueueName, launchMode);
+  }
+
+  @Override
+  protected void addMaster(Map<String, String> configs, ImmutableList.Builder<String> argBuilder) {
+    // Use at least two threads for Spark Streaming
+    String masterArg = "local[2]";
+
+    String master = configs.get("spark.master");
+    if (master != null) {
+      Matcher matcher = LOCAL_MASTER_PATTERN.matcher(master);
+      if (matcher.matches()) {
+        masterArg = "local[" + matcher.group(1) + "]";
+      }
+    }
+//    argBuilder.add("--master").add(masterArg);
+  }
+
+  @Override
+  protected Map<String, String> generateSubmitConf(Map<String, String> appConf) {
+    Map<String, String> config = new HashMap<>();
+    config.put("spark.executorEnv.CDAP_LOG_DIR", ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+    // TODO : Error : for distributed spark : $destFile exists and does not match contents
+    config.put("spark.files","");
+    config.put("spark.jars","");
+    config.put("spark.repl.local.jars","");
+    // TODO : Error : DataprocMetricsListener is not a subclass of org.apache.spark.scheduler.SparkListenerInterface
+    config.put("spark.dataproc.listeners","");
+
+    // Make Spark UI runs on random port. By default, Spark UI runs on port 4040 and it will do a sequential search
+    // of the next port if 4040 is already occupied. However, during the process, it unnecessarily logs big stacktrace
+    // as WARN, which pollute the logs a lot if there are concurrent Spark job running (e.g. a fork in Workflow).
+    config.put("spark.ui.port", "0");
+
+    //// TODO : error : '-Xlog:gc*:file=/gc.log:time,level,tags:filecount=10,filesize=1M', see error log for details.
+ config.put("spark.driver.extraJavaOptions", "-XX:+UseG1GC -verbose:class -Xlog:gc*:file=/tmp/gc.log:time,level,tags:filecount=10,filesize=1M -XX:+ExitOnOutOfMemoryError -Dstreaming.checkpoint.rewrite.enabled=true"); + config.put("spark.executor.extraJavaOptions","-XX:+UseG1GC -verbose:class -Xlog:gc*:file=/tmp/gc.log:time,level,tags:filecount=10,filesize=1M -XX:+ExitOnOutOfMemoryError -Dstreaming.checkpoint.rewrite.enabled=true"); + + return config; + } + + @Override + protected Function getLocalizeResourceToURIFunc() { + return RESOURCE_TO_PATH; + } + +}