Skip to content

Commit 1785832

Browse files
HIVE-27126: queue level resource stats for YARN RM.
1 parent 94af009 commit 1785832

20 files changed

Lines changed: 2073 additions & 10 deletions

File tree

beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,5 +96,10 @@ public String executionStatus() {
9696
public double progressedPercentage() {
9797
return response.getProgressedPercentage();
9898
}
99+
100+
@Override
101+
public String queueMetrics() {
102+
return response.isSetQueueMetrics() ? response.getQueueMetrics() : "";
103+
}
99104
}
100105
}

common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,13 @@ public void render(ProgressMonitor monitor) {
175175
reprintLine(SEPARATOR);
176176
reprintLineWithColorAsBold(footer, Ansi.Color.RED);
177177
reprintLine(SEPARATOR);
178+
179+
// Display queue metrics if available (may be multi-line: queue name + metrics)
180+
String queueMetrics = monitor.queueMetrics();
181+
if (queueMetrics != null && !queueMetrics.isEmpty()) {
182+
reprintMultiLine(queueMetrics);
183+
reprintLine(SEPARATOR);
184+
}
178185
}
179186

180187

common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ public String executionStatus() {
5252
public double progressedPercentage() {
5353
return 0;
5454
}
55+
56+
@Override
57+
public String queueMetrics() {
58+
return "";
59+
}
5560
};
5661

5762
List<String> headers();
@@ -65,4 +70,6 @@ public double progressedPercentage() {
6570
String executionStatus();
6671

6772
double progressedPercentage();
73+
74+
String queueMetrics();
6875
}

common/src/java/org/apache/hadoop/hive/conf/HiveConf.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3986,6 +3986,12 @@ public static enum ConfVars {
39863986
HIVE_SERVER2_TEZ_QUEUE_ACCESS_CHECK("hive.server2.tez.queue.access.check", false,
39873987
"Whether to check user access to explicitly specified YARN queues. " +
39883988
"yarn.resourcemanager.webapp.address must be configured to use this."),
3989+
HIVE_TEZ_QUEUE_METRICS_REFRESH_INTERVAL("hive.tez.queue.metrics.refresh.interval", "0s",
3990+
new TimeValidator(TimeUnit.SECONDS),
3991+
"Interval for refreshing YARN queue resource metrics during Tez query execution. " +
3992+
"When set to a positive value (e.g. 10s), displays real-time memory, vCore, capacity " +
3993+
"and application metrics for the YARN queue being used. " +
3994+
"Set to 0 or negative to disable. Minimum effective value is 1 second."),
39893995
HIVE_SERVER2_TEZ_SESSION_LIFETIME("hive.server2.tez.session.lifetime", "162h",
39903996
new TimeValidator(TimeUnit.HOURS),
39913997
"The lifetime of the Tez sessions launched by HS2 when default sessions are enabled.\n" +
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hive.common.log;
19+
20+
import org.apache.commons.lang3.StringUtils;
21+
import org.junit.Test;
22+
23+
import java.io.ByteArrayOutputStream;
24+
import java.io.PrintStream;
25+
import java.util.Arrays;
26+
import java.util.Collections;
27+
import java.util.List;
28+
29+
import static org.junit.Assert.assertEquals;
30+
import static org.junit.Assert.assertTrue;
31+
32+
/**
33+
* Unit tests for InPlaceUpdate
34+
* <p>
35+
* We capture stdout via a ByteArrayOutputStream and inspect the rendered output.
36+
*/
37+
public class TestInPlaceUpdate {
38+
39+
/**
40+
* Minimal ProgressMonitor stub — returns empty headers/rows/footer.
41+
*/
42+
private static ProgressMonitor makeMonitor(String queueMetrics) {
43+
return new ProgressMonitor() {
44+
@Override
45+
public List<String> headers() {
46+
return Arrays.asList("VERTICES", "MODE", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED", "KILLED");
47+
}
48+
49+
@Override
50+
public List<List<String>> rows() {
51+
return Collections.emptyList();
52+
}
53+
54+
@Override
55+
public String footerSummary() {
56+
return "VERTICES: 00/00";
57+
}
58+
59+
@Override
60+
public long startTime() {
61+
return System.currentTimeMillis();
62+
}
63+
64+
@Override
65+
public double progressedPercentage() {
66+
return 0.0;
67+
}
68+
69+
@Override
70+
public String executionStatus() {
71+
return "RUNNING";
72+
}
73+
74+
@Override
75+
public String queueMetrics() {
76+
return queueMetrics;
77+
}
78+
};
79+
}
80+
81+
/**
82+
* Expected separator: 94 dashes.
83+
*/
84+
private static final String SEPARATOR =
85+
new String(new char[InPlaceUpdate.MIN_TERMINAL_WIDTH]).replace("\0", "-");
86+
87+
88+
89+
/**
90+
* When queueMetrics() returns a non-empty string, InPlaceUpdate.render() must
91+
* print a separator line immediately after the metrics block — so total separators
92+
* = 4 (VERTICES table) + 1 (after queue metrics) = 5.
93+
*/
94+
@Test
95+
public void testSeparatorPrintedAfterQueueMetrics() {
96+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
97+
PrintStream ps = new PrintStream(baos);
98+
InPlaceUpdate inPlace = new InPlaceUpdate(ps);
99+
100+
String metrics = """
101+
QUEUE: default (1s ago)
102+
MEMORY: 2.0/8.0 GB (25.00%) | VCORES: 4/16 (25.00%)
103+
CAPACITY: 60.00% | ACTIVE_APPS: 1 | PENDING: 0""";
104+
105+
inPlace.render(makeMonitor(metrics));
106+
ps.flush();
107+
108+
String output = baos.toString();
109+
110+
// The metrics content should appear
111+
assertTrue("Output should contain QUEUE: line", output.contains("QUEUE: default"));
112+
assertTrue("Output should contain MEMORY: line", output.contains("MEMORY: 2.0/8.0 GB"));
113+
assertTrue("Output should contain CAPACITY: line", output.contains("CAPACITY: 60.00%"));
114+
115+
// The separator must appear AFTER the CAPACITY line in the rendered output
116+
int capacityIdx = output.indexOf("CAPACITY:");
117+
int separatorIdx = output.indexOf(SEPARATOR, capacityIdx);
118+
assertTrue("Separator must appear after CAPACITY: line (separatorIdx=" + separatorIdx
119+
+ ", capacityIdx=" + capacityIdx + ")",
120+
separatorIdx > capacityIdx);
121+
122+
// Total separators = 4 (VERTICES table) + 1 (after queue metrics) = 5
123+
int count = StringUtils.countMatches(output, SEPARATOR);
124+
assertEquals("With queue metrics, total separators should be 5 (4 VERTICES + 1 after metrics)",
125+
5, count);
126+
}
127+
128+
129+
/**
130+
* When queueMetrics() returns an empty string, InPlaceUpdate.render() must NOT
131+
* print an extra separator — so total remains 4 (the VERTICES table separators only).
132+
*/
133+
@Test
134+
public void testNoExtraSeparatorWhenQueueMetricsEmpty() {
135+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
136+
PrintStream ps = new PrintStream(baos);
137+
InPlaceUpdate inPlace = new InPlaceUpdate(ps);
138+
139+
inPlace.render(makeMonitor(""));
140+
ps.flush();
141+
142+
String output = baos.toString();
143+
144+
// VERTICES table renders 4 separators (before-header, after-header, before-footer, after-footer)
145+
// With empty queueMetrics there should be exactly 4, not 5.
146+
int count = StringUtils.countMatches(output, SEPARATOR);
147+
assertEquals("With empty queue metrics, only 4 VERTICES-table separators should appear",
148+
4, count);
149+
}
150+
151+
152+
/**
153+
* When queueMetrics() returns null, behaviour should be identical to empty.
154+
*/
155+
@Test
156+
public void testNoExtraSeparatorWhenQueueMetricsNull() {
157+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
158+
PrintStream ps = new PrintStream(baos);
159+
InPlaceUpdate inPlace = new InPlaceUpdate(ps);
160+
161+
inPlace.render(makeMonitor(null));
162+
ps.flush();
163+
164+
String output = baos.toString();
165+
166+
int count = StringUtils.countMatches(output, SEPARATOR);
167+
assertEquals("With null queue metrics, only 4 VERTICES-table separators should appear",
168+
4, count);
169+
}
170+
171+
// ── Verify separator constant length ────────────────────────────────────────
172+
173+
@Test
174+
public void testSeparatorLengthEqualsMinTerminalWidth() {
175+
assertTrue("Separator should consist only of dashes",
176+
SEPARATOR.matches("-{" + InPlaceUpdate.MIN_TERMINAL_WIDTH + "}"));
177+
}
178+
}

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
3434
import org.apache.hadoop.hive.ql.wm.WmContext;
3535
import org.apache.hadoop.yarn.api.records.LocalResource;
36+
import org.apache.hadoop.yarn.client.api.YarnClient;
3637
import org.apache.tez.client.TezClient;
3738
import org.apache.tez.dag.api.DAG;
3839
import org.apache.tez.dag.api.TezException;
@@ -89,6 +90,7 @@ public String toString() {
8990
HiveConf getConf();
9091
TezClient getTezClient();
9192
DAGClient submitDAG(DAG dag) throws TezException, IOException;
93+
YarnClient getYarnClient();
9294
boolean isOpen();
9395
boolean isOpening();
9496
boolean getDoAsEnabled();

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.hadoop.hive.ql.wm.WmContext;
3434
import org.apache.hadoop.hive.registry.impl.TezAmInstance;
3535
import org.apache.hadoop.yarn.api.records.LocalResource;
36+
import org.apache.hadoop.yarn.client.api.YarnClient;
3637
import org.apache.tez.client.TezClient;
3738
import org.apache.tez.dag.api.DAG;
3839
import org.apache.tez.dag.api.TezException;
@@ -339,6 +340,11 @@ public TezClient getTezClient() {
339340
return baseSession.getTezClient();
340341
}
341342

343+
@Override
344+
public YarnClient getYarnClient() {
345+
return baseSession.getYarnClient();
346+
}
347+
342348
@Override
343349
public DAGClient submitDAG(DAG dag) throws TezException, IOException {
344350
return baseSession.submitDAG(dag);

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import org.apache.hadoop.security.token.Token;
7474
import org.apache.hadoop.yarn.api.records.LocalResource;
7575
import org.apache.hadoop.yarn.api.records.LocalResourceType;
76+
import org.apache.hadoop.yarn.client.api.YarnClient;
7677
import org.apache.hadoop.yarn.conf.YarnConfiguration;
7778
import org.apache.tez.client.TezClient;
7879
import org.apache.tez.common.TezUtils;
@@ -119,6 +120,7 @@ public class TezSessionState implements TezSession {
119120
Path tezScratchDir;
120121
protected LocalResource appJarLr;
121122
private TezClient session;
123+
private YarnClient yarnClient;
122124
private Future<TezClient> sessionFuture;
123125
/** Console used for user feedback during async session opening. */
124126
private LogHelper console;
@@ -752,6 +754,17 @@ public void close(boolean keepDagFilesDir) throws Exception {
752754
closeClient(asyncSession);
753755
}
754756
}
757+
758+
// Stop YarnClient if it was initialized
759+
if (yarnClient != null) {
760+
try {
761+
LOG.info("Stopping YarnClient for session: {}", sessionId);
762+
yarnClient.stop();
763+
yarnClient = null;
764+
} catch (Exception e) {
765+
LOG.warn("Error stopping YarnClient for session {}: {}", sessionId, e.getMessage());
766+
}
767+
}
755768
} finally {
756769
try {
757770
cleanupScratchDir();
@@ -797,6 +810,19 @@ public String getSessionId() {
797810

798811
protected final void setTezClient(TezClient session) {
799812
this.session = session;
813+
814+
// Initialize YarnClient for queue metrics collection
815+
if (session != null && yarnClient == null) {
816+
try {
817+
yarnClient = YarnClient.createYarnClient();
818+
yarnClient.init(conf);
819+
yarnClient.start();
820+
LOG.info("YarnClient initialized for session: {}", sessionId);
821+
} catch (Exception e) {
822+
LOG.warn("Failed to initialize YarnClient for metrics collection", e);
823+
yarnClient = null;
824+
}
825+
}
800826
}
801827

802828
@Override
@@ -822,6 +848,11 @@ public TezClient getTezClient() {
822848
return session;
823849
}
824850

851+
@Override
852+
public YarnClient getYarnClient() {
853+
return yarnClient;
854+
}
855+
825856
@Override
826857
public DAGClient submitDAG(DAG dag) throws TezException, IOException {
827858
return getTezClient().submitDAG(dag);

0 commit comments

Comments
 (0)