From 52826e7e80f746004614a0e3da764e35103d5105 Mon Sep 17 00:00:00 2001 From: Mahesh Raju Somalaraju Date: Tue, 16 Jun 2026 10:42:44 +0530 Subject: [PATCH] TEZ-4725: Fix flaky tests in recovery, history-parser, and MRR integration suites --- .../java/org/apache/tez/client/TezClient.java | 22 ++++- .../org/apache/tez/client/TezClientUtils.java | 18 +++- .../dag/api/client/rpc/DAGClientRPCImpl.java | 6 +- .../tez/history/parser/ATSFileParser.java | 15 ++- .../tez/mapreduce/TestMRRJobsDAGApi.java | 27 +++--- .../TestAMRecoveryAggregationBroadcast.java | 94 +++++++++++++++++-- 6 files changed, 156 insertions(+), 26 deletions(-) diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index 74942ac06d..c91eae391c 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -1017,6 +1017,21 @@ public synchronized boolean waitTillReady(long timeout, TimeUnit unit) private void waitNonSessionTillReady() throws IOException, TezException { Preconditions.checkArgument(!isSession, "It is supposed to be only called in non-session mode"); + // In non-session mode the AM is launched per-DAG and YARN briefly reports the app state as + // RUNNING before the AM has actually bound its RPC port (rpcPort == 0 or -1). During that + // window every call to getAMStatus() returns INITIALIZING, so without a deadline this loop + // would spin forever if the AM never successfully starts (e.g. container launch failure, + // resource contention on slow CI machines). + // + // TEZ_SESSION_CLIENT_TIMEOUT_SECS is re-used here as the upper bound because it already + // represents "how long the client is willing to wait for the AM to become contactable", + // which is exactly the semantic we need. Callers (e.g. @Test methods) must set their own + // hard timeout larger than this value so that this TezException — which carries the AM + // application ID and last-known status — surfaces instead of a generic test-timeout. + final long timeoutSecs = amConfig.getTezConfiguration().getLong( + TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS, + TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT); + final long deadline = System.currentTimeMillis() + timeoutSecs * 1000; while (true) { TezAppMasterStatus status = getAppMasterStatus(); // DAGClient will handle the AM SHUTDOWN case @@ -1024,6 +1039,11 @@ private void waitNonSessionTillReady() throws IOException, TezException { || status.equals(TezAppMasterStatus.SHUTDOWN)) { return; } + if (System.currentTimeMillis() > deadline) { + throw new TezException("Timed out waiting for non-session AM to start (waited " + + timeoutSecs + "s). AM status was: " + status + + ". Check YARN logs for application " + lastSubmittedAppId); + } try { Thread.sleep(SLEEP_FOR_READY); } catch (InterruptedException e) { @@ -1058,7 +1078,7 @@ DAGClient submitDAGApplication(ApplicationId appId, DAG dag) throws TezException, IOException { LOG.info("Submitting DAG application with id: " + appId); try { - // Use the AMCredentials object in client mode, since this won't be re-used. + // Use the AMCredentials object in client mode, since this won't be reused. // Ensures we don't fetch credentials unnecessarily if the user has already provided them. Credentials credentials = amConfig.getCredentials(); if (credentials == null) { diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java index 9e6338ddd8..fa4ae266c7 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java @@ -961,7 +961,23 @@ static DAGClientAMProtocolBlockingPB getAMProxy(FrameworkClient frameworkClient, throw new TezException(e); } - return getAMProxy(conf, appReport.getHost(), appReport.getRpcPort(), + // YARN-808: When the AM container is first allocated, YARN briefly reports state=RUNNING + // before the AM has registered with the RM or bound its RPC listener. During that window + // the ApplicationReport contains sentinel values that must not be passed to + // NetUtils.createSocketAddrForHost(), which would throw IllegalArgumentException: + // host == null / "N/A" : RM has not yet received an AM registerApplicationMaster() call + // rpcPort == 0 : protobuf wire default; AM has not called registerApplicationMaster() yet + // rpcPort == -1 : AM container launched but RPC server port not yet bound + // Returning null signals the caller to back off and retry rather than crashing. + String amHost = appReport.getHost(); + int amPort = appReport.getRpcPort(); + if (amHost == null || amHost.equals("N/A") || amPort <= 0) { + LOG.debug("AM RPC endpoint not yet available for {} (host={}, port={}) — will retry", + applicationId, amHost, amPort); + return null; + } + + return getAMProxy(conf, amHost, amPort, appReport.getClientToAMToken(), ugi); } diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java index c0d8836634..f8388e2713 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java @@ -281,9 +281,11 @@ boolean createAMProxyIfNeeded() throws IOException, TezException, // YARN-808. Cannot ascertain if AM is ready until we connect to it. // workaround check the default string set by YARN + // rpcPort == 0 : YARN protobuf default, AM not yet registered + // rpcPort == -1 : AM container allocated (state=RUNNING) but AM has not yet bound its RPC port if(appReport.getHost() == null || appReport.getHost().equals("N/A") || - appReport.getRpcPort() == 0){ - // attempt not running + appReport.getRpcPort() <= 0){ + // attempt not running or RPC port not yet available return false; } diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java index 06f9db1fb0..c892695b9a 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java @@ -24,6 +24,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.Enumeration; import java.util.Iterator; import java.util.List; @@ -178,7 +179,11 @@ private JSONObject readJson(InputStream in) throws IOException, JSONException { //Read entire content to memory final NonSyncByteArrayOutputStream bout = new NonSyncByteArrayOutputStream(); IOUtils.copy(in, bout); - return new JSONObject(new String(bout.toByteArray(), "UTF-8")); + String content = new String(bout.toByteArray(), StandardCharsets.UTF_8).trim(); + if (content.isEmpty()) { + return null; + } + return new JSONObject(content); } /** @@ -195,9 +200,17 @@ private void parseATSZipFile(File atsFile) Enumeration zipEntries = atsZipFile.entries(); while (zipEntries.hasMoreElements()) { ZipEntry zipEntry = zipEntries.nextElement(); + if (zipEntry.isDirectory()) { + LOG.debug("Skipping directory entry: {}", zipEntry.getName()); + continue; + } LOG.debug("Processing " + zipEntry.getName()); InputStream inputStream = atsZipFile.getInputStream(zipEntry); JSONObject jsonObject = readJson(inputStream); + if (jsonObject == null) { + LOG.warn("Skipping empty zip entry: {}", zipEntry.getName()); + continue; + } //This json can contain dag, vertices, tasks, task_attempts JSONObject dagJson = jsonObject.optJSONObject(Constants.DAG); diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java index 5f5ca3702b..a89dfc9762 100644 --- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java +++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java @@ -186,7 +186,7 @@ public static void tearDown() { // TODO Add cleanup code. } - @Test(timeout = 60000) + @Test(timeout = 120000) public void testSleepJob() throws TezException, IOException, InterruptedException { SleepProcessorConfig spConf = new SleepProcessorConfig(1); @@ -229,7 +229,7 @@ public void testSleepJob() throws TezException, IOException, InterruptedExceptio tezSession.stop(); } - @Test(timeout = 60000) + @Test(timeout = 120000) public void testNonDefaultFSStagingDir() throws Exception { SleepProcessorConfig spConf = new SleepProcessorConfig(1); @@ -270,7 +270,7 @@ public void testNonDefaultFSStagingDir() throws Exception { } // Submits a simple 5 stage sleep job using tez session. Then kills it. - @Test(timeout = 60000) + @Test(timeout = 120000) public void testHistoryLogging() throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException { SleepProcessorConfig spConf = new SleepProcessorConfig(1); @@ -327,7 +327,7 @@ public void testHistoryLogging() throws IOException, // Submits a simple 5 stage sleep job using the DAG submit API instead of job // client. - @Test(timeout = 60000) + @Test(timeout = 120000) public void testMRRSleepJobDagSubmit() throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException { State finalState = testMRRSleepJobDagSubmitCore(false, false, false, false); @@ -338,7 +338,7 @@ public void testMRRSleepJobDagSubmit() throws IOException, } // Submits a simple 5 stage sleep job using the DAG submit API. Then kills it. - @Test(timeout = 60000) + @Test(timeout = 120000) public void testMRRSleepJobDagSubmitAndKill() throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException { State finalState = testMRRSleepJobDagSubmitCore(false, true, false, false); @@ -349,7 +349,7 @@ public void testMRRSleepJobDagSubmitAndKill() throws IOException, } // Submits a DAG to AM via RPC after AM has started - @Test(timeout = 60000) + @Test(timeout = 120000) public void testMRRSleepJobViaSession() throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException { State finalState = testMRRSleepJobDagSubmitCore(true, false, false, false); @@ -506,11 +506,16 @@ public void testMultipleMRRSleepJobViaSession() throws IOException, State finalState = testMRRSleepJobDagSubmitCore(true, false, false, tezSession, false, null, null); Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState); + // Wait for the AM session to transition back to READY after DAG completion. + // The DAG reaching SUCCEEDED does not guarantee the AM session has already + // returned to READY - there is a brief cleanup window where it is still RUNNING. + tezSession.waitTillReady(); Assert.assertEquals(TezAppMasterStatus.READY, tezSession.getAppMasterStatus()); finalState = testMRRSleepJobDagSubmitCore(true, false, false, tezSession, false, null, null); Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState); + tezSession.waitTillReady(); Assert.assertEquals(TezAppMasterStatus.READY, tezSession.getAppMasterStatus()); @@ -518,7 +523,7 @@ public void testMultipleMRRSleepJobViaSession() throws IOException, } // Submits a simple 5 stage sleep job using tez session. Then kills it. - @Test(timeout = 60000) + @Test(timeout = 120000) public void testMRRSleepJobDagSubmitAndKillViaRPC() throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException { State finalState = testMRRSleepJobDagSubmitCore(true, true, false, false); @@ -529,13 +534,13 @@ public void testMRRSleepJobDagSubmitAndKillViaRPC() throws IOException, } // Create and close a tez session without submitting a job - @Test(timeout = 60000) + @Test(timeout = 120000) public void testTezSessionShutdown() throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException { testMRRSleepJobDagSubmitCore(true, false, true, false); } - @Test(timeout = 60000) + @Test(timeout = 120000) public void testAMSplitGeneration() throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException { testMRRSleepJobDagSubmitCore(true, false, false, true); @@ -790,7 +795,7 @@ private static LocalResource createLocalResource(FileSystem fc, Path file, resourceSize, resourceModificationTime); } - @Test(timeout = 60000) + @Test(timeout = 120000) public void testVertexGroups() throws Exception { LOG.info("Running Group Test"); Path inPath = new Path(TEST_ROOT_DIR, "in-groups"); @@ -812,7 +817,7 @@ public void testVertexGroups() throws Exception { } } - @Test(timeout = 60000) + @Test(timeout = 120000) public void testBroadcastAndOneToOne() throws Exception { LOG.info("Running BroadcastAndOneToOne Test"); BroadcastAndOneToOneExample job = new BroadcastAndOneToOneExample(); diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecoveryAggregationBroadcast.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecoveryAggregationBroadcast.java index 979b696cbd..bd06f19b69 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecoveryAggregationBroadcast.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecoveryAggregationBroadcast.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -68,6 +69,7 @@ import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.DAGStatus.State; import org.apache.tez.dag.api.client.StatusGetOpts; +import org.apache.tez.dag.api.client.VertexStatus; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.app.RecoveryParser; import org.apache.tez.dag.history.HistoryEvent; @@ -105,7 +107,6 @@ public class TestAMRecoveryAggregationBroadcast { private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR + TestAMRecoveryAggregationBroadcast.class.getName() + "-tmpDir"; private static final Path INPUT_FILE = new Path(TEST_ROOT_DIR, "input.csv"); - private static final Path OUT_PATH = new Path(TEST_ROOT_DIR, "out-groups"); private static final String EXPECTED_OUTPUT = "1-5\n1-5\n1-5\n1-5\n1-5\n" + "2-4\n2-4\n2-4\n2-4\n" + "3-3\n3-3\n3-3\n" + "4-2\n4-2\n" + "5-1\n"; private static final String TABLE_SCAN_SLEEP = "tez.test.table.scan.sleep"; @@ -119,6 +120,7 @@ public class TestAMRecoveryAggregationBroadcast { private TezConfiguration tezConf; private TezClient tezSession; + private Path outPath; @BeforeClass public static void setupAll() { @@ -174,6 +176,9 @@ public void setup() throws Exception { .valueOf(new Random().nextInt(100000)))); TezClientUtils.ensureStagingDirExists(dfsConf, remoteStagingDir); + outPath = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, + "out-groups-" + new Random().nextInt(100000))); + tezConf = new TezConfiguration(tezCluster.getConfig()); tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 0); tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "INFO"); @@ -198,12 +203,20 @@ public void teardown() throws InterruptedException { } } tezSession = null; + if (outPath != null) { + try { + remoteFs.delete(outPath, true); + } catch (IOException e) { + LOG.warn("Failed to delete output path: " + outPath, e); + } + outPath = null; + } } @Test(timeout = 120000) public void testSucceed() throws Exception { DAG dag = createDAG("Succeed"); - TezCounters counters = runDAGAndVerify(dag, false); + TezCounters counters = runDAGAndVerify(dag, new String[0]); assertEquals(3, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue()); List historyEvents1 = readRecoveryLog(1); @@ -219,7 +232,8 @@ public void testSucceed() throws Exception { public void testTableScanTemporalFailure() throws Exception { tezConf.setBoolean(TABLE_SCAN_SLEEP, true); DAG dag = createDAG("TableScanTemporalFailure"); - TezCounters counters = runDAGAndVerify(dag, true); + // Kill the AM before TABLE_SCAN finishes - no vertices need to complete first + TezCounters counters = runDAGAndVerify(dag, new String[0], true); assertEquals(3, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue()); List historyEvents1 = readRecoveryLog(1); @@ -239,7 +253,8 @@ public void testTableScanTemporalFailure() throws Exception { public void testAggregationTemporalFailure() throws Exception { tezConf.setBoolean(AGGREGATION_SLEEP, true); DAG dag = createDAG("AggregationTemporalFailure"); - TezCounters counters = runDAGAndVerify(dag, true); + // Wait for TABLE_SCAN to complete before killing - AGGREGATION will still be sleeping + TezCounters counters = runDAGAndVerify(dag, new String[]{TABLE_SCAN}, true); assertEquals(3, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue()); List historyEvents1 = readRecoveryLog(1); @@ -259,7 +274,8 @@ public void testAggregationTemporalFailure() throws Exception { public void testMapJoinTemporalFailure() throws Exception { tezConf.setBoolean(MAP_JOIN_SLEEP, true); DAG dag = createDAG("MapJoinTemporalFailure"); - TezCounters counters = runDAGAndVerify(dag, true); + // Wait for TABLE_SCAN and AGGREGATION to complete before killing - MAP_JOIN will still be sleeping + TezCounters counters = runDAGAndVerify(dag, new String[]{TABLE_SCAN, AGGREGATION}, true); assertEquals(3, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue()); List historyEvents1 = readRecoveryLog(1); @@ -306,7 +322,7 @@ private DAG createDAG(String dagName) throws Exception { DataSinkDescriptor dataSink = MROutput .createConfigBuilder(new Configuration(tezConf), TextOutputFormat.class, - OUT_PATH.toString()) + outPath.toString()) .build(); // Broadcast Hash Join Vertex mapJoinVertex = Vertex @@ -335,16 +351,44 @@ private DAG createDAG(String dagName) throws Exception { return dag; } - TezCounters runDAGAndVerify(DAG dag, boolean killAM) throws Exception { + /** + * Run the DAG without killing the AM (success path). + */ + TezCounters runDAGAndVerify(DAG dag, String[] verticesToAwaitBeforeKill) throws Exception { + return runDAGAndVerify(dag, verticesToAwaitBeforeKill, false); + } + + /** + * Run the DAG, optionally killing the AM after waiting for specific vertices to succeed. + * + * @param dag the DAG to run + * @param verticesToAwaitBeforeKill names of vertices that must reach SUCCEEDED state before + * the AM is killed; empty array means kill immediately + * @param killAM whether to kill the AM during execution + */ + TezCounters runDAGAndVerify(DAG dag, String[] verticesToAwaitBeforeKill, boolean killAM) + throws Exception { tezSession.waitTillReady(); DAGClient dagClient = tezSession.submitDAG(dag); if (killAM) { - TimeUnit.SECONDS.sleep(10); + // Wait for each specified vertex to SUCCEED before killing the AM. + // This eliminates the race condition caused by a fixed sleep. + for (String vertexName : verticesToAwaitBeforeKill) { + waitForVertexSucceeded(dagClient, vertexName, 60, TimeUnit.SECONDS); + LOG.info("Vertex {} has SUCCEEDED, proceeding to next wait or AM kill.", vertexName); + } + // If no vertices to await (e.g. testTableScanTemporalFailure), give the AM + // a brief moment to start up so failApplicationAttempt can find attempt 1. + if (verticesToAwaitBeforeKill.length == 0) { + TimeUnit.SECONDS.sleep(5); + } YarnClient yarnClient = YarnClient.createYarnClient(); yarnClient.init(tezConf); yarnClient.start(); - ApplicationAttemptId id = ApplicationAttemptId.newInstance(tezSession.getAppMasterApplicationId(), 1); + ApplicationAttemptId id = ApplicationAttemptId.newInstance( + tezSession.getAppMasterApplicationId(), 1); + LOG.info("Killing application attempt: {}", id); yarnClient.failApplicationAttempt(id); yarnClient.close(); } @@ -352,7 +396,7 @@ TezCounters runDAGAndVerify(DAG dag, boolean killAM) throws Exception { LOG.info("Diagnosis: " + dagStatus.getDiagnostics()); Assert.assertEquals(State.SUCCEEDED, dagStatus.getState()); - FSDataInputStream in = remoteFs.open(new Path(OUT_PATH, "part-v002-o000-r-00000")); + FSDataInputStream in = remoteFs.open(new Path(outPath, "part-v002-o000-r-00000")); ByteBuffer buf = ByteBuffer.allocate(100); in.read(buf); buf.flip(); @@ -360,6 +404,36 @@ TezCounters runDAGAndVerify(DAG dag, boolean killAM) throws Exception { return dagStatus.getDAGCounters(); } + /** + * Poll the vertex status until it reaches SUCCEEDED, or throw TimeoutException. + * + * @param dagClient client connected to the running DAG + * @param vertexName vertex to wait for + * @param timeout maximum time to wait + * @param unit time unit for timeout + */ + private void waitForVertexSucceeded(DAGClient dagClient, String vertexName, + long timeout, TimeUnit unit) throws Exception { + long deadline = System.currentTimeMillis() + unit.toMillis(timeout); + while (true) { + try { + VertexStatus status = dagClient.getVertexStatus(vertexName, null); + if (status != null && status.getState() == VertexStatus.State.SUCCEEDED) { + return; + } + LOG.info("Waiting for vertex {} to succeed, current state: {}", + vertexName, status == null ? "null" : status.getState()); + } catch (Exception e) { + LOG.warn("Error getting vertex status for {}: {}", vertexName, e.getMessage()); + } + if (System.currentTimeMillis() > deadline) { + throw new TimeoutException("Timed out waiting for vertex " + vertexName + + " to reach SUCCEEDED state"); + } + TimeUnit.MILLISECONDS.sleep(500); + } + } + private List readRecoveryLog(int attemptNum) throws IOException { ApplicationId appId = tezSession.getAppMasterApplicationId(); Path tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath(tezConf, appId.toString());