Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion tez-api/src/main/java/org/apache/tez/client/TezClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1017,13 +1017,33 @@ public synchronized boolean waitTillReady(long timeout, TimeUnit unit)

private void waitNonSessionTillReady() throws IOException, TezException {
Preconditions.checkArgument(!isSession, "It is supposed to be only called in non-session mode");
// In non-session mode the AM is launched per-DAG and YARN briefly reports the app state as
// RUNNING before the AM has actually bound its RPC port (rpcPort == 0 or -1). During that
// window every call to getAMStatus() returns INITIALIZING, so without a deadline this loop
// would spin forever if the AM never successfully starts (e.g. container launch failure,
// resource contention on slow CI machines).
//
// TEZ_SESSION_CLIENT_TIMEOUT_SECS is re-used here as the upper bound because it already
// represents "how long the client is willing to wait for the AM to become contactable",
// which is exactly the semantic we need. Callers (e.g. @Test methods) must set their own
// hard timeout larger than this value so that this TezException — which carries the AM
// application ID and last-known status — surfaces instead of a generic test-timeout.
final long timeoutSecs = amConfig.getTezConfiguration().getLong(
TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS,
TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT);
final long deadline = System.currentTimeMillis() + timeoutSecs * 1000;
while (true) {
TezAppMasterStatus status = getAppMasterStatus();
// DAGClient will handle the AM SHUTDOWN case
if (status.equals(TezAppMasterStatus.RUNNING)
|| status.equals(TezAppMasterStatus.SHUTDOWN)) {
return;
}
if (System.currentTimeMillis() > deadline) {
throw new TezException("Timed out waiting for non-session AM to start (waited "
+ timeoutSecs + "s). AM status was: " + status
+ ". Check YARN logs for application " + lastSubmittedAppId);
}
try {
Thread.sleep(SLEEP_FOR_READY);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -1058,7 +1078,7 @@ DAGClient submitDAGApplication(ApplicationId appId, DAG dag)
throws TezException, IOException {
LOG.info("Submitting DAG application with id: " + appId);
try {
// Use the AMCredentials object in client mode, since this won't be re-used.
// Use the AMCredentials object in client mode, since this won't be reused.
// Ensures we don't fetch credentials unnecessarily if the user has already provided them.
Credentials credentials = amConfig.getCredentials();
if (credentials == null) {
Expand Down
18 changes: 17 additions & 1 deletion tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,23 @@ static DAGClientAMProtocolBlockingPB getAMProxy(FrameworkClient frameworkClient,
throw new TezException(e);
}

return getAMProxy(conf, appReport.getHost(), appReport.getRpcPort(),
// YARN-808: When the AM container is first allocated, YARN briefly reports state=RUNNING
// before the AM has registered with the RM or bound its RPC listener. During that window
// the ApplicationReport contains sentinel values that must not be passed to
// NetUtils.createSocketAddrForHost(), which would throw IllegalArgumentException:
// host == null / "N/A" : RM has not yet received an AM registerApplicationMaster() call
// rpcPort == 0 : protobuf wire default; AM has not called registerApplicationMaster() yet
// rpcPort == -1 : AM container launched but RPC server port not yet bound
// Returning null signals the caller to back off and retry rather than crashing.
String amHost = appReport.getHost();
int amPort = appReport.getRpcPort();
if (amHost == null || amHost.equals("N/A") || amPort <= 0) {
LOG.debug("AM RPC endpoint not yet available for {} (host={}, port={}) — will retry",
applicationId, amHost, amPort);
return null;
}

return getAMProxy(conf, amHost, amPort,
appReport.getClientToAMToken(), ugi);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,11 @@ boolean createAMProxyIfNeeded() throws IOException, TezException,

// YARN-808. Cannot ascertain if AM is ready until we connect to it.
// workaround check the default string set by YARN
// rpcPort == 0 : YARN protobuf default, AM not yet registered
// rpcPort == -1 : AM container allocated (state=RUNNING) but AM has not yet bound its RPC port
if(appReport.getHost() == null || appReport.getHost().equals("N/A") ||
appReport.getRpcPort() == 0){
// attempt not running
appReport.getRpcPort() <= 0){
// attempt not running or RPC port not yet available
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -178,7 +179,11 @@ private JSONObject readJson(InputStream in) throws IOException, JSONException {
//Read entire content to memory
final NonSyncByteArrayOutputStream bout = new NonSyncByteArrayOutputStream();
IOUtils.copy(in, bout);
return new JSONObject(new String(bout.toByteArray(), "UTF-8"));
String content = new String(bout.toByteArray(), StandardCharsets.UTF_8).trim();
if (content.isEmpty()) {
return null;
}
return new JSONObject(content);
}

/**
Expand All @@ -195,9 +200,17 @@ private void parseATSZipFile(File atsFile)
Enumeration<? extends ZipEntry> zipEntries = atsZipFile.entries();
while (zipEntries.hasMoreElements()) {
ZipEntry zipEntry = zipEntries.nextElement();
if (zipEntry.isDirectory()) {
LOG.debug("Skipping directory entry: {}", zipEntry.getName());
continue;
}
LOG.debug("Processing " + zipEntry.getName());
InputStream inputStream = atsZipFile.getInputStream(zipEntry);
JSONObject jsonObject = readJson(inputStream);
if (jsonObject == null) {
LOG.warn("Skipping empty zip entry: {}", zipEntry.getName());
continue;
}

//This json can contain dag, vertices, tasks, task_attempts
JSONObject dagJson = jsonObject.optJSONObject(Constants.DAG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public static void tearDown() {
// TODO Add cleanup code.
}

@Test(timeout = 60000)
@Test(timeout = 120000)
public void testSleepJob() throws TezException, IOException, InterruptedException {
SleepProcessorConfig spConf = new SleepProcessorConfig(1);

Expand Down Expand Up @@ -229,7 +229,7 @@ public void testSleepJob() throws TezException, IOException, InterruptedExceptio
tezSession.stop();
}

@Test(timeout = 60000)
@Test(timeout = 120000)
public void testNonDefaultFSStagingDir() throws Exception {
SleepProcessorConfig spConf = new SleepProcessorConfig(1);

Expand Down Expand Up @@ -270,7 +270,7 @@ public void testNonDefaultFSStagingDir() throws Exception {
}

// Submits a simple 5 stage sleep job using tez session. Then kills it.
@Test(timeout = 60000)
@Test(timeout = 120000)
public void testHistoryLogging() throws IOException,
InterruptedException, TezException, ClassNotFoundException, YarnException {
SleepProcessorConfig spConf = new SleepProcessorConfig(1);
Expand Down Expand Up @@ -327,7 +327,7 @@ public void testHistoryLogging() throws IOException,

// Submits a simple 5 stage sleep job using the DAG submit API instead of job
// client.
@Test(timeout = 60000)
@Test(timeout = 120000)
public void testMRRSleepJobDagSubmit() throws IOException,
InterruptedException, TezException, ClassNotFoundException, YarnException {
State finalState = testMRRSleepJobDagSubmitCore(false, false, false, false);
Expand All @@ -338,7 +338,7 @@ public void testMRRSleepJobDagSubmit() throws IOException,
}

// Submits a simple 5 stage sleep job using the DAG submit API. Then kills it.
@Test(timeout = 60000)
@Test(timeout = 120000)
public void testMRRSleepJobDagSubmitAndKill() throws IOException,
InterruptedException, TezException, ClassNotFoundException, YarnException {
State finalState = testMRRSleepJobDagSubmitCore(false, true, false, false);
Expand All @@ -349,7 +349,7 @@ public void testMRRSleepJobDagSubmitAndKill() throws IOException,
}

// Submits a DAG to AM via RPC after AM has started
@Test(timeout = 60000)
@Test(timeout = 120000)
public void testMRRSleepJobViaSession() throws IOException,
InterruptedException, TezException, ClassNotFoundException, YarnException {
State finalState = testMRRSleepJobDagSubmitCore(true, false, false, false);
Expand Down Expand Up @@ -506,19 +506,24 @@ public void testMultipleMRRSleepJobViaSession() throws IOException,
State finalState = testMRRSleepJobDagSubmitCore(true, false, false,
tezSession, false, null, null);
Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
// Wait for the AM session to transition back to READY after DAG completion.
// The DAG reaching SUCCEEDED does not guarantee the AM session has already
// returned to READY - there is a brief cleanup window where it is still RUNNING.
tezSession.waitTillReady();
Assert.assertEquals(TezAppMasterStatus.READY,
tezSession.getAppMasterStatus());
finalState = testMRRSleepJobDagSubmitCore(true, false, false,
tezSession, false, null, null);
Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
tezSession.waitTillReady();
Assert.assertEquals(TezAppMasterStatus.READY,
tezSession.getAppMasterStatus());

stopAndVerifyYarnApp(tezSession);
}

// Submits a simple 5 stage sleep job using tez session. Then kills it.
@Test(timeout = 60000)
@Test(timeout = 120000)
public void testMRRSleepJobDagSubmitAndKillViaRPC() throws IOException,
InterruptedException, TezException, ClassNotFoundException, YarnException {
State finalState = testMRRSleepJobDagSubmitCore(true, true, false, false);
Expand All @@ -529,13 +534,13 @@ public void testMRRSleepJobDagSubmitAndKillViaRPC() throws IOException,
}

// Create and close a tez session without submitting a job
@Test(timeout = 60000)
@Test(timeout = 120000)
public void testTezSessionShutdown() throws IOException,
InterruptedException, TezException, ClassNotFoundException, YarnException {
testMRRSleepJobDagSubmitCore(true, false, true, false);
}

@Test(timeout = 60000)
@Test(timeout = 120000)
public void testAMSplitGeneration() throws IOException, InterruptedException,
TezException, ClassNotFoundException, YarnException {
testMRRSleepJobDagSubmitCore(true, false, false, true);
Expand Down Expand Up @@ -790,7 +795,7 @@ private static LocalResource createLocalResource(FileSystem fc, Path file,
resourceSize, resourceModificationTime);
}

@Test(timeout = 60000)
@Test(timeout = 120000)
public void testVertexGroups() throws Exception {
LOG.info("Running Group Test");
Path inPath = new Path(TEST_ROOT_DIR, "in-groups");
Expand All @@ -812,7 +817,7 @@ public void testVertexGroups() throws Exception {
}
}

@Test(timeout = 60000)
@Test(timeout = 120000)
public void testBroadcastAndOneToOne() throws Exception {
LOG.info("Running BroadcastAndOneToOne Test");
BroadcastAndOneToOneExample job = new BroadcastAndOneToOneExample();
Expand Down
Loading
Loading