Skip to content

Commit eb4b7a2

Browse files
committed
Adding ServerlessDataprocSubmitter
1 parent e655d12 commit eb4b7a2

File tree

3 files changed

+90
-29
lines changed

3 files changed

+90
-29
lines changed

cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/SparkProgramRunner.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import io.cdap.cdap.app.runtime.spark.submit.DistributedSparkSubmitter;
4040
import io.cdap.cdap.app.runtime.spark.submit.LocalSparkSubmitter;
4141
import io.cdap.cdap.app.runtime.spark.submit.MasterEnvironmentSparkSubmitter;
42+
import io.cdap.cdap.app.runtime.spark.submit.ServerlessDataprocSubmitter;
4243
import io.cdap.cdap.app.runtime.spark.submit.SparkSubmitter;
4344
import io.cdap.cdap.common.conf.CConfiguration;
4445
import io.cdap.cdap.common.conf.Constants;
@@ -220,7 +221,12 @@ public ProgramController run(Program program, ProgramOptions options) {
220221
SparkSubmitter submitter;
221222
// If MasterEnvironment is not available, use non-master env spark submitters
222223
MasterEnvironment masterEnv = MasterEnvironments.getMasterEnvironment();
223-
if (masterEnv != null && cConf.getBoolean(Constants.Environment.PROGRAM_SUBMISSION_MASTER_ENV_ENABLED, true)) {
224+
//TODO : determine at runtime whether this program is running in a serverless Dataproc environment, instead of hard-coding `if (true)`
225+
if (true) {
226+
String schedulerQueue = options.getArguments().getOption(AppFabric.APP_SCHEDULER_QUEUE);
227+
submitter = new ServerlessDataprocSubmitter(hConf, locationFactory, host, runtimeContext,
228+
schedulerQueue, LaunchMode.CLIENT);
229+
} else if (masterEnv != null && cConf.getBoolean(Constants.Environment.PROGRAM_SUBMISSION_MASTER_ENV_ENABLED, true)) {
224230
submitter = new MasterEnvironmentSparkSubmitter(cConf, locationFactory, host, runtimeContext,
225231
masterEnv, options);
226232
} else {

cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/submit/AbstractSparkSubmitter.java

Lines changed: 9 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@
4545
import java.util.concurrent.Executors;
4646
import java.util.concurrent.TimeUnit;
4747
import java.util.function.BiConsumer;
48-
import java.util.regex.Matcher;
49-
import java.util.regex.Pattern;
5048
import javax.annotation.Nullable;
5149

5250
/**
@@ -57,8 +55,7 @@ public abstract class AbstractSparkSubmitter implements SparkSubmitter {
5755
private static final Logger LOG = LoggerFactory.getLogger(AbstractSparkSubmitter.class);
5856

5957
// Transforms LocalizeResource to URI string
60-
private static final Function<LocalizeResource, String> RESOURCE_TO_PATH = input ->
61-
input.getURI().toString().split("#")[0];
58+
private static final Function<LocalizeResource, String> RESOURCE_TO_PATH = input -> input.getURI().toString();
6259

6360
@Override
6461
public final <V> SparkJobFuture<V> submit(SparkRuntimeContext runtimeContext,
@@ -187,6 +184,10 @@ protected boolean waitForFinish() throws Exception {
187184
return true;
188185
}
189186

187+
protected Function<LocalizeResource, String> getLocalizeResourceToURIFunc() {
188+
return RESOURCE_TO_PATH;
189+
}
190+
190191
/**
191192
* Submits the Spark job using {@link SparkSubmit}.
192193
*
@@ -208,21 +209,7 @@ private void submit(SparkRuntimeContext runtimeContext, String[] args) {
208209
ClassLoaders.setContextClassLoader(oldClassLoader);
209210
}
210211
}
211-
private static final Pattern LOCAL_MASTER_PATTERN = Pattern.compile("local\\[([0-9]+|\\*)\\]");
212-
protected void addMasterPOC(Map<String, String> configs, ImmutableList.Builder<String> argBuilder) {
213-
// Use at least two threads for Spark Streaming
214-
String masterArg = "local[2]";
215-
216-
String master = configs.get("spark.master");
217-
if (master != null) {
218-
Matcher matcher = LOCAL_MASTER_PATTERN.matcher(master);
219-
if (matcher.matches()) {
220-
masterArg = "local[" + matcher.group(1) + "]";
221-
}
222-
}
223212

224-
argBuilder.add("--master").add(masterArg);
225-
}
226213
/**
227214
* Creates the list of arguments that will be used for calling {@link SparkSubmit#main(String[])}.
228215
*
@@ -241,22 +228,16 @@ private List<String> createSubmitArguments(SparkRuntimeContext runtimeContext, M
241228
Iterable<LocalizeResource> archivesIterable = getArchives(resources);
242229
Iterable<LocalizeResource> filesIterable = getFiles(resources);
243230

244-
// addMaster(configs, builder);
245-
addMasterPOC(configs, builder);
231+
addMaster(configs, builder);
246232
builder.add("--conf").add("spark.app.name=" + spec.getName());
247233

248234
configs.putAll(generateSubmitConf(configs));
249-
// TODO : Error : for distributed spark : $destFile exists and does not match contents
250-
configs.put("spark.files","");
251-
configs.put("spark.jars","");
252-
configs.put("spark.repl.local.jars","");
253-
// TODO : Error : DataprocMetricsListener is not a subclass of org.apache.spark.scheduler.SparkListenerInterface
254-
configs.put("spark.dataproc.listeners","");
255235
BiConsumer<String, String> confAdder = (k, v) -> builder.add("--conf").add(k + "=" + v);
256236
configs.forEach(confAdder);
257237

258-
String archives = Joiner.on(',').join(Iterables.transform(archivesIterable, RESOURCE_TO_PATH));
259-
String files = Joiner.on(',').join(Iterables.transform(filesIterable, RESOURCE_TO_PATH));
238+
String archives = Joiner.on(',').join(Iterables.transform(archivesIterable,
239+
getLocalizeResourceToURIFunc()));
240+
String files = Joiner.on(',').join(Iterables.transform(filesIterable, getLocalizeResourceToURIFunc()));
260241

261242
if (!Strings.isNullOrEmpty(archives)) {
262243
builder.add("--archives").add(archives);
cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/submit/ServerlessDataprocSubmitter.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package io.cdap.cdap.app.runtime.spark.submit;

import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import io.cdap.cdap.app.runtime.spark.SparkRuntimeContext;
import io.cdap.cdap.internal.app.runtime.distributed.LocalizeResource;
import io.cdap.cdap.runtime.spi.runtimejob.LaunchMode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.twill.filesystem.LocationFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;

/**
 * A {@link SparkSubmitter} that submits Spark programs for execution on serverless Dataproc.
 * <p>
 * It customizes {@link DistributedSparkSubmitter} in three ways: the {@code --master} argument is
 * forced to a {@code local[n]} value, several submit-time configurations that break on serverless
 * Dataproc are cleared out (see {@link #generateSubmitConf(Map)}), and localized resource URIs are
 * stripped of their {@code #fragment} alias before being passed to spark-submit.
 */
public class ServerlessDataprocSubmitter extends DistributedSparkSubmitter {

  // Transforms a LocalizeResource to its URI string with the "#alias" fragment removed;
  // the fragment is a localization rename hint, not part of the actual resource path.
  private static final Function<LocalizeResource, String> RESOURCE_TO_PATH =
    input -> input.getURI().toString().split("#")[0];

  // Matches "local[N]" (N digits) or "local[*]" master specs so the requested
  // thread count can be carried over into the --master argument.
  private static final Pattern LOCAL_MASTER_PATTERN = Pattern.compile("local\\[([0-9]+|\\*)\\]");

  /**
   * Creates a submitter for serverless Dataproc.
   *
   * @param hConf the Hadoop configuration for the program run
   * @param locationFactory factory for resolving program resource locations
   * @param hostname the hostname used for callbacks from the Spark program
   * @param runtimeContext the runtime context of the Spark program
   * @param schedulerQueueName the scheduler queue to submit to, or {@code null} for the default
   * @param launchMode the launch mode for the Spark program
   */
  public ServerlessDataprocSubmitter(Configuration hConf, LocationFactory locationFactory,
                                     String hostname, SparkRuntimeContext runtimeContext,
                                     @Nullable String schedulerQueueName, LaunchMode launchMode) {
    super(hConf, locationFactory, hostname, runtimeContext, schedulerQueueName, launchMode);
  }

  /**
   * Adds the {@code --master} argument, always using a {@code local[n]} master. If the
   * {@code spark.master} config already specifies a valid {@code local[n]} value, its thread
   * count is preserved; otherwise {@code local[2]} is used (at least two threads are needed
   * for Spark Streaming).
   */
  @Override
  protected void addMaster(Map<String, String> configs, ImmutableList.Builder<String> argBuilder) {
    // Use at least two threads for Spark Streaming
    String masterArg = "local[2]";

    String master = configs.get("spark.master");
    if (master != null) {
      Matcher matcher = LOCAL_MASTER_PATTERN.matcher(master);
      if (matcher.matches()) {
        masterArg = "local[" + matcher.group(1) + "]";
      }
    }
    argBuilder.add("--master").add(masterArg);
  }

  /**
   * Generates the submit-time Spark configurations for serverless Dataproc.
   *
   * @param appConf the application-provided configurations (not modified here)
   * @return the extra configurations to apply at submission time
   */
  @Override
  protected Map<String, String> generateSubmitConf(Map<String, String> appConf) {
    Map<String, String> config = new HashMap<>();
    config.put("spark.executorEnv.CDAP_LOG_DIR", ApplicationConstants.LOG_DIR_EXPANSION_VAR);
    // TODO : Error : for distributed spark : $destFile exists and does not match contents
    config.put("spark.files", "");
    config.put("spark.jars", "");
    config.put("spark.repl.local.jars", "");
    // TODO : Error : DataprocMetricsListener is not a subclass of org.apache.spark.scheduler.SparkListenerInterface
    config.put("spark.dataproc.listeners", "");

    // Make Spark UI runs on random port. By default, Spark UI runs on port 4040 and it will do a sequential search
    // of the next port if 4040 is already occupied. However, during the process, it unnecessarily logs big stacktrace
    // as WARN, which pollute the logs a lot if there are concurrent Spark job running (e.g. a fork in Workflow).
    config.put("spark.ui.port", "0");

    return config;
  }

  /**
   * Returns the resource-to-URI transform used when building the {@code --files} and
   * {@code --archives} arguments; this variant strips the {@code #fragment} alias.
   */
  @Override
  protected Function<LocalizeResource, String> getLocalizeResourceToURIFunc() {
    return RESOURCE_TO_PATH;
  }
}

0 commit comments

Comments
 (0)