Commit 993e665

Adding logs for debugging the archive artifact
1 parent 5662c1e commit 993e665

File tree: 2 files changed (+28, -1 lines)

cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/SparkRuntimeService.java

Lines changed: 14 additions & 1 deletion
@@ -248,7 +248,10 @@ protected void startUp() throws Exception {
       SparkRuntimeEnv.setProperty(key, sparkDefaultConf.getProperty(key));
     }

+    LOG.warn("SANKET : jobFile : " + jobFile.getPath());
+
     if (masterEnv != null) {
+      LOG.warn("SANKET : masterEnv != null : ");
       // Add cconf, hconf, metrics.properties, logback for master environment
       localizeResources.add(new LocalizeResource(saveCConf(cConfCopy, tempDir)));
       Configuration hConf = contextConfig.set(runtimeContext, pluginArchive).getConfiguration();
@@ -278,6 +281,7 @@ protected void startUp() throws Exception {
       // Localize all the files from user resources
       List<File> files = copyUserResources(context.getLocalizeResources(), tempDir);
       for (File file : files) {
+        LOG.warn("SANKET : local files for loop : " + file.getAbsolutePath());
         localizeResources.add(new LocalizeResource(file));
       }

@@ -289,6 +293,7 @@ protected void startUp() throws Exception {
       }

     } else if (isLocal) {
+      LOG.warn("SANKET : islocal");
       // In local mode, always copy (or link if local) user requested resources
       copyUserResources(context.getLocalizeResources(), tempDir);

@@ -299,6 +304,7 @@ protected void startUp() throws Exception {

       extractPySparkLibrary(tempDir, extraPySparkFiles);
     } else {
+      LOG.warn("SANKET : Master is NUL and not LOCAL : ");
       // Localize all user requested files in distributed mode
       distributedUserResources(context.getLocalizeResources(), localizeResources);

@@ -342,6 +348,7 @@ protected void startUp() throws Exception {

     // Localize the spark.jar archive, which contains all CDAP and dependency jars
     File sparkJar = new File(tempDir, CDAP_SPARK_JAR);
+    LOG.warn("SANKET : sparkJar : " + sparkJar.getPath());
     classpath = joiner.join(Iterables.transform(buildDependencyJar(sparkJar),
                                                 name -> Paths.get("$PWD", CDAP_SPARK_JAR, name).toString()));
     localizeResources.add(new LocalizeResource(sparkJar, true));
@@ -356,6 +363,7 @@ protected void startUp() throws Exception {
     // Localize extra jars and append to the end of the classpath
     List<String> extraJars = new ArrayList<>();
     for (URI jarURI : CConfigurationUtil.getExtraJars(cConfCopy)) {
+      LOG.warn("SANKET : extra jarURI : " + jarURI.getPath());
       extraJars.add(Paths.get("$PWD", LocalizationUtils.getLocalizedName(jarURI)).toString());
       localizeResources.add(new LocalizeResource(jarURI, false));
     }
@@ -613,6 +621,8 @@ private Map<String, String> createSubmitConfigs(File localDir, @Nullable String
                                                   boolean localMode,
                                                   Iterable<URI> pyFiles) {

+    LOG.warn("SANKET : createSubmitConfigs : classpath : " + classpath);
+
     // Setup configs from the default spark conf
     Map<String, String> configs = new HashMap<>(Maps.fromProperties(SparkPackageUtils.getSparkDefaultConf()));

@@ -740,7 +750,7 @@ private Iterable<String> buildDependencyJar(File targetFile) throws IOException,
     Set<String> classpath = new TreeSet<>();
     try (JarOutputStream jarOut = new JarOutputStream(new BufferedOutputStream(new FileOutputStream(targetFile)))) {
       jarOut.setLevel(Deflater.NO_COMPRESSION);
-
+      LOG.warn("SANKET : targetFile : " + targetFile.getPath());
       // Zip all the jar files under the same directory that contains the jar for this class and twill class.
       // Those are the directory created by TWILL that contains all dependency jars for this container
       for (String className : Arrays.asList(getClass().getName(), TwillRunnable.class.getName())) {
@@ -750,6 +760,7 @@ private Iterable<String> buildDependencyJar(File targetFile) throws IOException,
         File libDir = new File(ClassLoaders.getClassPathURL(className, classURL).toURI()).getParentFile();

         for (File file : DirUtils.listFiles(libDir, "jar")) {
+          LOG.warn("SANKET : buildDependencyJar : " + file.getPath());
          if (classpath.add(file.getName())) {
            jarOut.putNextEntry(new JarEntry(file.getName()));
            Files.copy(file, jarOut);
@@ -946,7 +957,9 @@ private void distributedUserResources(Map<String, LocalizeResource> resources,
                                         List<LocalizeResource> result) throws URISyntaxException {
     for (Map.Entry<String, LocalizeResource> entry : resources.entrySet()) {
       URI uri = entry.getValue().getURI();
+      LOG.warn("SANKET : distributedUserResources : " + uri.getPath());
       URI actualURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), uri.getQuery(), entry.getKey());
+      LOG.warn("SANKET : distributedUserResources : actualURI : " + uri.getPath());
       result.add(new LocalizeResource(actualURI, entry.getValue().isArchive()));
     }
   }
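The statements added above log at WARN level with string concatenation, which is typical for throwaway debugging. For reference, here is a minimal sketch (not part of the commit) of the same localization tracing written with SLF4J parameterized logging at DEBUG level, assuming LOG is an org.slf4j.Logger as is common throughout CDAP; the class and method names (LocalizationTrace, traceLocalizedFiles) are hypothetical and only for illustration:

import java.io.File;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical helper, not part of the commit: the same trace output with
// SLF4J parameterized logging, so the messages can stay in the code and be
// toggled through the logging configuration instead of being removed later.
final class LocalizationTrace {
  private static final Logger LOG = LoggerFactory.getLogger(LocalizationTrace.class);

  static void traceLocalizedFiles(File jobFile, List<File> userFiles) {
    // The {} placeholders defer string building until DEBUG is actually enabled.
    LOG.debug("jobFile: {}", jobFile.getPath());
    for (File file : userFiles) {
      LOG.debug("localized user resource: {}", file.getAbsolutePath());
    }
  }
}

Parameterized messages also avoid the cost of concatenation when the level is disabled, which matters for logging inside the localization loops shown in the diff.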

cdap-spark-core-base/src/main/java/io/cdap/cdap/app/runtime/spark/submit/AbstractSparkSubmitter.java

Lines changed: 14 additions & 0 deletions
@@ -223,6 +223,15 @@ private void submit(SparkRuntimeContext runtimeContext, String[] args) {
   private List<String> createSubmitArguments(SparkRuntimeContext runtimeContext, Map<String, String> configs,
                                              List<LocalizeResource> resources, URI jobFile) throws Exception {
     SparkSpecification spec = runtimeContext.getSparkSpecification();
+    LOG.warn("SANKET : createSubmitArguments : ALL LOCAL RESOURCE ");
+    for (LocalizeResource lr : resources) {
+      LOG.warn("SANKET : createSubmitArguments : LocalizeResource : " + lr.getURI().getPath());
+    }
+
+    LOG.warn("SANKET : createSubmitArguments : ALL CONFIGS ");
+    configs.entrySet().forEach(entry -> {
+      LOG.warn("SANKET : Key: " + entry.getKey() + ", Value: " + entry.getValue());
+    });

     ImmutableList.Builder<String> builder = ImmutableList.builder();
     Iterable<LocalizeResource> archivesIterable = getArchives(resources);
@@ -235,6 +244,11 @@ private List<String> createSubmitArguments(SparkRuntimeContext runtimeContext, M
     BiConsumer<String, String> confAdder = (k, v) -> builder.add("--conf").add(k + "=" + v);
     configs.forEach(confAdder);

+    LOG.warn("SANKET : createSubmitArguments : ALL archives ");
+    for (LocalizeResource lr : archivesIterable){
+      LOG.warn("SANKET : archivesIterable : " + lr.getURI());
+    }
+
     String archives = Joiner.on(',').join(Iterables.transform(archivesIterable,
                                                               getLocalizeResourceToURIFunc()));
     String files = Joiner.on(',').join(Iterables.transform(filesIterable, getLocalizeResourceToURIFunc()));
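The logging in the second hunk sits around the code that turns the submit configs and localized archives into spark-submit arguments. Below is a simplified, self-contained sketch of that pattern, not the actual CDAP code: the class SubmitArgsSketch and method buildArgs are hypothetical, and the real method also handles files, py-files, and the program arguments.

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical sketch: every config entry becomes a "--conf key=value" pair and
// archive URIs are joined into one comma-separated "--archives" value, mirroring
// the confAdder / Joiner pattern visible in the diff above.
final class SubmitArgsSketch {

  static List<String> buildArgs(Map<String, String> configs, List<URI> archiveUris) {
    ImmutableList.Builder<String> builder = ImmutableList.builder();
    BiConsumer<String, String> confAdder = (k, v) -> builder.add("--conf").add(k + "=" + v);
    configs.forEach(confAdder);
    String archives = Joiner.on(',').join(archiveUris);
    if (!archives.isEmpty()) {
      builder.add("--archives").add(archives);
    }
    return builder.build();
  }

  public static void main(String[] args) {
    Map<String, String> configs = new LinkedHashMap<>();
    configs.put("spark.executor.memory", "2g");
    List<URI> archives = ImmutableList.of(URI.create("file:/tmp/cdap-spark.jar"));
    // Prints something like: [--conf, spark.executor.memory=2g, --archives, file:/tmp/cdap-spark.jar]
    System.out.println(buildArgs(configs, archives));
  }
}

Dumping the resources, configs, and archives right before they are joined, as this commit does, makes it possible to compare the final spark-submit arguments against what was localized earlier in SparkRuntimeService.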
