From 65c0a21310197c79df950f26df33ff22ae9b52a7 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Wed, 27 Nov 2024 12:16:34 +0800 Subject: [PATCH 01/11] clb1734 bug fix --- .../celeborn/common/metrics/source/AbstractSource.scala | 5 ++--- .../apache/celeborn/service/deploy/worker/WorkerSource.scala | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index e9c4cfa3b0a..dd0200e925c 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -18,7 +18,7 @@ package org.apache.celeborn.common.metrics.source import java.util.{Map => JMap} -import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit} import scala.collection.JavaConverters._ import scala.collection.mutable @@ -105,8 +105,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) // filter out non-number type gauges if (gauge.getValue.isInstanceOf[Number]) { namedGauges.putIfAbsent( - metricNameWithCustomizedLabels(name, labels), - NamedGauge(name, gauge, labels ++ staticLabels)) + metricNameWithCustomizedLabels(name, labels), NamedGauge(name, gauge, labels ++ staticLabels)) } else { logWarning( s"Add gauge $name failed, the value type ${gauge.getValue.getClass} is not a number") diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala index 26532a6bf9a..33316dbcf1a 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala @@ -84,7 +84,7 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, Role.WORKER) def getCounterCount(metricsName: String): Long = { val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, Map.empty) - namedCounters.get(metricNameWithLabel).counter.getCount + namedCounters.get(metricNameWithLabel)._2.counter.getCount } def connectionActive(client: TransportClient): Unit = { From 94c2cd75a0aa4ffe2954c0770b9848002f50be06 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Thu, 28 Nov 2024 10:52:01 +0800 Subject: [PATCH 02/11] remove sort --- .../apache/celeborn/common/metrics/source/AbstractSource.scala | 1 + .../apache/celeborn/service/deploy/worker/WorkerSource.scala | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index dd0200e925c..81d8a1bbec2 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -17,6 +17,7 @@ package org.apache.celeborn.common.metrics.source +import java.{lang, util} import java.util.{Map => JMap} import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit} diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala index 33316dbcf1a..26532a6bf9a 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala @@ -84,7 +84,7 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, Role.WORKER) def getCounterCount(metricsName: String): Long = { val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, Map.empty) - namedCounters.get(metricNameWithLabel)._2.counter.getCount + namedCounters.get(metricNameWithLabel).counter.getCount } def connectionActive(client: TransportClient): Unit = { From d4146d804b64e6d5c8680cb37e62057936098de4 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Thu, 28 Nov 2024 10:56:40 +0800 Subject: [PATCH 03/11] remove useless sort --- .../apache/celeborn/common/metrics/source/AbstractSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index 81d8a1bbec2..7a988311e0a 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -17,7 +17,7 @@ package org.apache.celeborn.common.metrics.source -import java.{lang, util} +import java.lang import java.util.{Map => JMap} import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit} From 787d9f97d56b089abff3ca6022e87900bf1e029f Mon Sep 17 00:00:00 2001 From: zhengtao Date: Thu, 28 Nov 2024 19:19:22 +0800 Subject: [PATCH 04/11] reduce application metrics --- .../apache/celeborn/common/CelebornConf.scala | 9 ++ .../metrics/source/AbstractSource.scala | 99 ++++++++++++++++--- docs/configuration/metrics.md | 1 + .../service/deploy/worker/Worker.scala | 12 ++- .../service/deploy/worker/WorkerSource.scala | 2 +- 5 files changed, 102 insertions(+), 21 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index cbc66c40493..94bb7bdaa80 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -879,6 +879,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def metricsWorkerForceAppendPauseSpentTimeThreshold: Int = get(METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD) def metricsJsonPrettyEnabled: Boolean = get(METRICS_JSON_PRETTY_ENABLED) + def metricsAppEnabled: Boolean = get(METRICS_APP_ENABLED) // ////////////////////////////////////////////////////// // Quota // @@ -5340,6 +5341,14 @@ object CelebornConf extends Logging { .booleanConf .createWithDefault(true) + val METRICS_APP_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.metrics.application.enabled") + .categories("metrics") + .doc("When false, the metrics of application won't return to reduce the num of metrics.") + .version("0.6.0") + .booleanConf + .createWithDefault(true) + val IDENTITY_PROVIDER: ConfigEntry[String] = buildConf("celeborn.identity.provider") .withAlternative("celeborn.quota.identity.provider") diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index 7a988311e0a..c1d60eff58e 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -21,8 +21,8 @@ import java.lang import java.util.{Map => JMap} import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit} +import scala.collection.{breakOut, mutable} import scala.collection.JavaConverters._ -import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.Random @@ -35,10 +35,18 @@ import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils, Utils} // Can Remove this if celeborn don't support scala211 in future import org.apache.celeborn.common.util.FunctionConverter._ -case class NamedCounter(name: String, counter: Counter, labels: Map[String, String]) +case class NamedCounter( + name: String, + counter: Counter, + labels: Map[String, String], + isApp: Boolean = false) extends MetricLabels -case class NamedGauge[T](name: String, gauge: Gauge[T], labels: Map[String, String]) +case class NamedGauge[T]( + name: String, + gauge: Gauge[T], + labels: Map[String, String], + isApp: Boolean = false) extends MetricLabels case class NamedMeter(name: String, meter: Meter, labels: Map[String, String]) @@ -77,6 +85,8 @@ abstract class AbstractSource(conf: CelebornConf, role: String) val staticLabels: Map[String, String] = conf.metricsExtraLabels + roleLabel ++ instanceLabel val staticLabelsString: String = MetricLabels.labelString(staticLabels) + val metricsAppEnabled: Boolean = conf.metricsAppEnabled + val applicationLabel = "applicationId" val timerMetrics: ConcurrentLinkedQueue[String] = new ConcurrentLinkedQueue[String]() @@ -102,7 +112,8 @@ abstract class AbstractSource(conf: CelebornConf, role: String) def addGauge[T]( name: String, labels: Map[String, String], - gauge: Gauge[T]): Unit = { + gauge: Gauge[T], + isAppMetrics: Boolean): Unit = { // filter out non-number type gauges if (gauge.getValue.isInstanceOf[Number]) { namedGauges.putIfAbsent( @@ -113,6 +124,13 @@ abstract class AbstractSource(conf: CelebornConf, role: String) } } + def addGauge[T]( + name: String, + labels: Map[String, String], + gauge: Gauge[T]): Unit = { + addGauge(name, labels, gauge, false) + } + def addGauge[T]( name: String, labels: JMap[String, String], @@ -120,11 +138,15 @@ abstract class AbstractSource(conf: CelebornConf, role: String) addGauge(name, labels.asScala.toMap, gauge) } - def addGauge[T](name: String, labels: Map[String, String] = Map.empty)(f: () => T): Unit = { + def addGauge[T]( + name: String, + labels: Map[String, String] = Map.empty, + isAppMetrics: Boolean = false)(f: () => T): Unit = { addGauge( name, labels, - metricRegistry.gauge(metricNameWithCustomizedLabels(name, labels), new GaugeSupplier[T](f))) + metricRegistry.gauge(metricNameWithCustomizedLabels(name, labels), new GaugeSupplier[T](f)), + isAppMetrics) } def addGauge[T](name: String, gauge: Gauge[T]): Unit = { @@ -176,11 +198,15 @@ abstract class AbstractSource(conf: CelebornConf, role: String) def addCounter(name: String): Unit = addCounter(name, Map.empty[String, String]) - def addCounter(name: String, labels: Map[String, String]): Unit = { + def addCounter(name: String, labels: Map[String, String], isAppMetrics: Boolean = false): Unit = { val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels) namedCounters.putIfAbsent( metricNameWithLabel, - NamedCounter(name, metricRegistry.counter(metricNameWithLabel), labels ++ staticLabels)) + NamedCounter( + name, + metricRegistry.counter(metricNameWithLabel), + labels ++ staticLabels, + isAppMetrics)) } def counters(): List[NamedCounter] = { @@ -457,12 +483,18 @@ abstract class AbstractSource(conf: CelebornConf, role: String) override def getMetrics(): String = { var leftMetricsNum = metricsCapacity val sb = new mutable.StringBuilder - leftMetricsNum = fillInnerMetricsSnapshot(getAndClearTimerMetrics(), leftMetricsNum, sb) - leftMetricsNum = fillInnerMetricsSnapshot(timers(), leftMetricsNum, sb) - leftMetricsNum = fillInnerMetricsSnapshot(histograms(), leftMetricsNum, sb) - leftMetricsNum = fillInnerMetricsSnapshot(meters(), leftMetricsNum, sb) - leftMetricsNum = fillInnerMetricsSnapshot(gauges(), leftMetricsNum, sb) - leftMetricsNum = fillInnerMetricsSnapshot(counters(), leftMetricsNum, sb) + val appMetricsSnapshot = ArrayBuffer[String]() + leftMetricsNum = fillInnerMetricsSnapshot(getAndClearTimerMetrics(), leftMetricsNum, sb, appMetricsSnapshot) + leftMetricsNum = fillInnerMetricsSnapshot(timers(), leftMetricsNum, sb, appMetricsSnapshot) + leftMetricsNum = fillInnerMetricsSnapshot(histograms(), leftMetricsNum, sb, appMetricsSnapshot) + leftMetricsNum = fillInnerMetricsSnapshot(meters(), leftMetricsNum, sb, appMetricsSnapshot) + leftMetricsNum = fillInnerMetricsSnapshot(gauges(), leftMetricsNum, sb, appMetricsSnapshot) + leftMetricsNum = fillInnerMetricsSnapshot(counters(), leftMetricsNum, sb, appMetricsSnapshot) + if (leftMetricsNum > 0 && metricsAppEnabled) { + appMetricsSnapshot.toList.take(leftMetricsNum).foreach { appMetrics => + sb.append(appMetrics) + } + } if (leftMetricsNum <= 0) { logWarning( s"The number of metrics exceed the output metrics strings capacity! All metrics Num: $getAllMetricsNum") @@ -473,7 +505,8 @@ abstract class AbstractSource(conf: CelebornConf, role: String) private def fillInnerMetricsSnapshot( metricList: List[AnyRef], leftNum: Int, - sb: mutable.StringBuilder): Int = { + sb: mutable.StringBuilder, + appMetricsSnapshot: ArrayBuffer[String]): Int = { if (leftNum <= 0) { return 0 } @@ -495,8 +528,42 @@ abstract class AbstractSource(conf: CelebornConf, role: String) .asInstanceOf[ResettableSlidingWindowReservoir].reset() case s => sb.append(s.toString) + var addNum = 0 + val appCount0Metrics = ArrayBuffer[String]() + for (m <- metricList) { + if (addNum >= leftNum) breakOut + m match { + case c: NamedCounter => + val counterMetric = getCounterMetrics(c) + if (c.isApp) { + if (c.counter.getCount > 0) { + appMetricsSnapshot += counterMetric + } else { + appCount0Metrics += counterMetric + } + } else sb.append(counterMetric) + case g: NamedGauge[_] => + val gaugeMetric = getGaugeMetrics(g) + if (g.isApp) { + appMetricsSnapshot += gaugeMetric + } else sb.append(gaugeMetric) + case m: NamedMeter => + sb.append(getMeterMetrics(m)) + case h: NamedHistogram => + sb.append(getHistogramMetrics(h)) + h.asInstanceOf[CelebornHistogram].reservoir + .asInstanceOf[ResettableSlidingWindowReservoir].reset() + case t: NamedTimer => + sb.append(getTimerMetrics(t)) + t.timer.asInstanceOf[CelebornTimer].reservoir + .asInstanceOf[ResettableSlidingWindowReservoir].reset() + case s => + sb.append(s.toString) + } + addNum = addNum + 1 } - leftNum - addList.size + appMetricsSnapshot ++= appCount0Metrics + leftNum - addNum } override def destroy(): Unit = { diff --git a/docs/configuration/metrics.md b/docs/configuration/metrics.md index a5fe1731875..880598c34af 100644 --- a/docs/configuration/metrics.md +++ b/docs/configuration/metrics.md @@ -19,6 +19,7 @@ license: | | Key | Default | isDynamic | Description | Since | Deprecated | | --- | ------- | --------- | ----------- | ----- | ---------- | +| celeborn.metrics.application.enabled | true | false | When false, the metrics of application won't return to reduce the num of metrics. | 0.6.0 | | | celeborn.metrics.capacity | 4096 | false | The maximum number of metrics which a source can use to generate output strings. | 0.2.0 | | | celeborn.metrics.collectPerfCritical.enabled | false | false | It controls whether to collect metrics which may affect performance. When enable, Celeborn collects them. | 0.2.0 | | | celeborn.metrics.conf | <undefined> | false | Custom metrics configuration file path. Default use `metrics.properties` in classpath. | 0.3.0 | | diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 3439a2e86d1..62f3df9f673 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -703,23 +703,27 @@ private[celeborn] class Worker( resourceConsumptionLabel += (resourceConsumptionSource.applicationLabel -> applicationId) resourceConsumptionSource.addGauge( ResourceConsumptionSource.DISK_FILE_COUNT, - resourceConsumptionLabel) { () => + resourceConsumptionLabel, + true) { () => computeResourceConsumption(userIdentifier, resourceConsumption).diskFileCount } resourceConsumptionSource.addGauge( ResourceConsumptionSource.DISK_BYTES_WRITTEN, - resourceConsumptionLabel) { () => + resourceConsumptionLabel, + true) { () => computeResourceConsumption(userIdentifier, resourceConsumption).diskBytesWritten } if (hasHDFSStorage) { resourceConsumptionSource.addGauge( ResourceConsumptionSource.HDFS_FILE_COUNT, - resourceConsumptionLabel) { () => + resourceConsumptionLabel, + true) { () => computeResourceConsumption(userIdentifier, resourceConsumption).hdfsFileCount } resourceConsumptionSource.addGauge( ResourceConsumptionSource.HDFS_BYTES_WRITTEN, - resourceConsumptionLabel) { () => + resourceConsumptionLabel, + true) { () => computeResourceConsumption(userIdentifier, resourceConsumption).hdfsBytesWritten } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala index 26532a6bf9a..f3eaf4ee4fb 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala @@ -107,7 +107,7 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, Role.WORKER) val applicationIds = appActiveConnections.get(client.getChannel.id().asLongText()) val applicationId = Utils.splitShuffleKey(shuffleKey)._1 if (applicationIds != null && !applicationIds.contains(applicationId)) { - addCounter(ACTIVE_CONNECTION_COUNT, Map(applicationLabel -> applicationId)) + addCounter(ACTIVE_CONNECTION_COUNT, Map(applicationLabel -> applicationId), true) incCounter(ACTIVE_CONNECTION_COUNT, 1, Map(applicationLabel -> applicationId)) applicationIds.add(applicationId) } From 352f3ab3ce45de0890f7117133cc7bb98996069c Mon Sep 17 00:00:00 2001 From: zhengtao Date: Fri, 29 Nov 2024 11:26:27 +0800 Subject: [PATCH 05/11] add ut --- .../metrics/source/AbstractSource.scala | 31 ++++--- .../metrics/source/CelebornSourceSuite.scala | 83 ++++++++++++++----- 2 files changed, 81 insertions(+), 33 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index c1d60eff58e..845b2897439 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -532,35 +532,42 @@ abstract class AbstractSource(conf: CelebornConf, role: String) val appCount0Metrics = ArrayBuffer[String]() for (m <- metricList) { if (addNum >= leftNum) breakOut + var strMetrics = "" + var isApp = false m match { case c: NamedCounter => - val counterMetric = getCounterMetrics(c) + strMetrics = getCounterMetrics(c) if (c.isApp) { + isApp = true if (c.counter.getCount > 0) { - appMetricsSnapshot += counterMetric + appMetricsSnapshot += strMetrics } else { - appCount0Metrics += counterMetric + appCount0Metrics += strMetrics } - } else sb.append(counterMetric) + } case g: NamedGauge[_] => - val gaugeMetric = getGaugeMetrics(g) + strMetrics = getGaugeMetrics(g) if (g.isApp) { - appMetricsSnapshot += gaugeMetric - } else sb.append(gaugeMetric) + appMetricsSnapshot += strMetrics + isApp = true + } case m: NamedMeter => - sb.append(getMeterMetrics(m)) + strMetrics = getMeterMetrics(m) case h: NamedHistogram => - sb.append(getHistogramMetrics(h)) + strMetrics = getHistogramMetrics(h) h.asInstanceOf[CelebornHistogram].reservoir .asInstanceOf[ResettableSlidingWindowReservoir].reset() case t: NamedTimer => - sb.append(getTimerMetrics(t)) + strMetrics = getTimerMetrics(t) t.timer.asInstanceOf[CelebornTimer].reservoir .asInstanceOf[ResettableSlidingWindowReservoir].reset() case s => - sb.append(s.toString) + strMetrics = s.toString + } + if (!isApp) { + sb.append(strMetrics) + addNum = addNum + 1 } - addNum = addNum + 1 } appMetricsSnapshot ++= appCount0Metrics leftNum - addNum diff --git a/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala b/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala index d6eeb23581d..1afab5a1c1f 100644 --- a/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala @@ -22,16 +22,10 @@ import org.apache.celeborn.common.CelebornConf class CelebornSourceSuite extends CelebornFunSuite { - test("test getMetrics with customized label") { - val conf = new CelebornConf() - createAbstractSourceAndCheck(conf, "", Role.MASTER) - createAbstractSourceAndCheck(conf, "", Role.WORKER) - } - - def createAbstractSourceAndCheck( + def createAbstractSource( conf: CelebornConf, extraLabels: String, - role: String = "mock"): Unit = { + role: String = "mock"): (String, List[String]) = { val mockSource = new AbstractSource(conf, role) { override def sourceName: String = "mockSource" } @@ -39,12 +33,13 @@ class CelebornSourceSuite extends CelebornFunSuite { val user2 = Map("user" -> "user2") val user3 = Map("user" -> "user3") mockSource.addGauge("Gauge1") { () => 1000 } - mockSource.addGauge("Gauge2", user1) { () => 2000 } - mockSource.addCounter("Counter1") - mockSource.addCounter("Counter2", user2) + mockSource.addGauge("Gauge2", user1, true) { () => 2000 } + mockSource.addCounter("Counter1", Map.empty[String, String], true) + mockSource.addCounter("Counter2", user2, true) // test operation with and without label mockSource.incCounter("Counter1", 3000) mockSource.incCounter("Counter2", 4000, user2) + mockSource.incCounter("Counter2", -4000, user2) mockSource.addTimer("Timer1") mockSource.addTimer("Timer2", user3) // ditto @@ -66,37 +61,83 @@ class CelebornSourceSuite extends CelebornFunSuite { s"""metrics_Gauge2_Value{${extraLabelsStr}${instanceLabelStr}role="$role",user="user1"} 2000""" val exp3 = s"""metrics_Counter1_Count{${extraLabelsStr}${instanceLabelStr}role="$role"} 3000""" val exp4 = - s"""metrics_Counter2_Count{${extraLabelsStr}${instanceLabelStr}role="$role",user="user2"} 4000""" + s"""metrics_Counter2_Count{${extraLabelsStr}${instanceLabelStr}role="$role",user="user2"} 0""" val exp5 = s"""metrics_Timer1_Count{${extraLabelsStr}${instanceLabelStr}role="$role"} 1""" val exp6 = s"""metrics_Timer2_Count{${extraLabelsStr}${instanceLabelStr}role="$role",user="user3"} 1""" - assert(res.contains(exp1)) - assert(res.contains(exp2)) - assert(res.contains(exp3)) - assert(res.contains(exp4)) - assert(res.contains(exp5)) - assert(res.contains(exp6)) + val expList = List[String](exp1, exp2, exp3, exp4, exp5, exp6) + (res, expList) + } + + def checkMetricsRes(res: String, labelList: List[String]): Unit = { + labelList.foreach { exp => + assert(res.contains(exp)) + } } test("test getMetrics with customized label by conf") { val conf = new CelebornConf() + val (resM, expsM) = createAbstractSource(conf, "", Role.MASTER) + checkMetricsRes(resM, expsM) + val (resW, expsW) = createAbstractSource(conf, "", Role.WORKER) + checkMetricsRes(resW, expsW) + // label's is normal conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, "l1=v1,l2=v2,l3=v3") val extraLabels = """l1="v1",l2="v2",l3="v3"""" - createAbstractSourceAndCheck(conf, extraLabels) + val (res, exps) = createAbstractSource(conf, extraLabels) + checkMetricsRes(res, exps) // labels' kv not correct assertThrows[IllegalArgumentException] { conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, "l1=v1,l2=") val extraLabels2 = """l1="v1",l2="v2",l3="v3"""" - createAbstractSourceAndCheck(conf, extraLabels2) + val (res2, exps2) = createAbstractSource(conf, extraLabels2) + checkMetricsRes(res2, exps2) } // there are spaces in labels conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, " l1 = v1, l2 =v2 ,l3 =v3 ") val extraLabels3 = """l1="v1",l2="v2",l3="v3"""" - createAbstractSourceAndCheck(conf, extraLabels3) + val (res3, exps3) = createAbstractSource(conf, extraLabels3) + checkMetricsRes(res3, exps3) + } + + test("test getMetrics with full capacity and isAppEnable false") { + val conf = new CelebornConf() + // metrics won't contain appMetrics + conf.set(CelebornConf.METRICS_APP_ENABLED.key, "false") + conf.set(CelebornConf.METRICS_CAPACITY.key, "6") + val (res1, exps1) = createAbstractSource(conf, "") + List[Int](0, 4, 5).foreach { i => + assert(res1.contains(exps1(i))) + } + List[Int](1, 2, 3).foreach { i => + assert(!res1.contains(exps1(i))) + } + + // app metrics will fall behind when it reaches capacity + conf.set(CelebornConf.METRICS_APP_ENABLED.key, "true") + conf.set(CelebornConf.METRICS_CAPACITY.key, "3") + val (res2, exps2) = createAbstractSource(conf, "") + List[Int](0, 4, 5).foreach { i => + assert(res2.contains(exps2(i))) + } + List[Int](1, 2, 3).foreach { i => + assert(!res2.contains(exps2(i))) + } + + // app metrics count0 will fall behind + conf.set(CelebornConf.METRICS_APP_ENABLED.key, "true") + conf.set(CelebornConf.METRICS_CAPACITY.key, "5") + val (res3, exps3) = createAbstractSource(conf, "") + List[Int](0, 4, 5, 1, 2).foreach { i => + assert(res3.contains(exps3(i))) + } + List[Int](3).foreach { i => + assert(!res3.contains(exps3(i))) + } } } From 3e0a48dbf8e24a260bb03276db7f9b7b02bac92c Mon Sep 17 00:00:00 2001 From: zhengtao Date: Fri, 29 Nov 2024 11:38:05 +0800 Subject: [PATCH 06/11] modify ut --- .../metrics/source/CelebornSourceSuite.scala | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala b/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala index 1afab5a1c1f..a68e6b1a8ea 100644 --- a/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala @@ -49,6 +49,8 @@ class CelebornSourceSuite extends CelebornFunSuite { mockSource.stopTimer("Timer1", "key1") mockSource.stopTimer("Timer2", "key2", user3) + mockSource.timerMetricsMap.add("testTimerMetricsMap") + val res = mockSource.getMetrics() var extraLabelsStr = extraLabels if (extraLabels.nonEmpty) { @@ -65,8 +67,9 @@ class CelebornSourceSuite extends CelebornFunSuite { val exp5 = s"""metrics_Timer1_Count{${extraLabelsStr}${instanceLabelStr}role="$role"} 1""" val exp6 = s"""metrics_Timer2_Count{${extraLabelsStr}${instanceLabelStr}role="$role",user="user3"} 1""" + val exp7 = "testTimerMetricsMap" - val expList = List[String](exp1, exp2, exp3, exp4, exp5, exp6) + val expList = List[String](exp1, exp2, exp3, exp4, exp5, exp6, exp7) (res, expList) } @@ -111,7 +114,7 @@ class CelebornSourceSuite extends CelebornFunSuite { conf.set(CelebornConf.METRICS_APP_ENABLED.key, "false") conf.set(CelebornConf.METRICS_CAPACITY.key, "6") val (res1, exps1) = createAbstractSource(conf, "") - List[Int](0, 4, 5).foreach { i => + List[Int](0, 4, 5, 6).foreach { i => assert(res1.contains(exps1(i))) } List[Int](1, 2, 3).foreach { i => @@ -120,9 +123,9 @@ class CelebornSourceSuite extends CelebornFunSuite { // app metrics will fall behind when it reaches capacity conf.set(CelebornConf.METRICS_APP_ENABLED.key, "true") - conf.set(CelebornConf.METRICS_CAPACITY.key, "3") + conf.set(CelebornConf.METRICS_CAPACITY.key, "4") val (res2, exps2) = createAbstractSource(conf, "") - List[Int](0, 4, 5).foreach { i => + List[Int](0, 4, 5, 6).foreach { i => assert(res2.contains(exps2(i))) } List[Int](1, 2, 3).foreach { i => @@ -131,9 +134,9 @@ class CelebornSourceSuite extends CelebornFunSuite { // app metrics count0 will fall behind conf.set(CelebornConf.METRICS_APP_ENABLED.key, "true") - conf.set(CelebornConf.METRICS_CAPACITY.key, "5") + conf.set(CelebornConf.METRICS_CAPACITY.key, "6") val (res3, exps3) = createAbstractSource(conf, "") - List[Int](0, 4, 5, 1, 2).foreach { i => + List[Int](0, 4, 5, 6, 1, 2).foreach { i => assert(res3.contains(exps3(i))) } List[Int](3).foreach { i => From 11cebf292d1be62d93675165232b34b566b82672 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Fri, 29 Nov 2024 14:06:11 +0800 Subject: [PATCH 07/11] replace break --- .../celeborn/common/metrics/source/AbstractSource.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index 845b2897439..7855f9ae4dc 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -21,8 +21,8 @@ import java.lang import java.util.{Map => JMap} import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit} -import scala.collection.{breakOut, mutable} import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.Random @@ -530,8 +530,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) sb.append(s.toString) var addNum = 0 val appCount0Metrics = ArrayBuffer[String]() - for (m <- metricList) { - if (addNum >= leftNum) breakOut + for (m <- metricList if addNum < leftNum) { var strMetrics = "" var isApp = false m match { From a52901f37a8b4696661b1290829f550d02b4cecb Mon Sep 17 00:00:00 2001 From: zhengtao Date: Wed, 4 Dec 2024 11:56:09 +0800 Subject: [PATCH 08/11] add ut and rebase main --- .../metrics/source/AbstractSource.scala | 27 ++++--------------- .../metrics/source/CelebornSourceSuite.scala | 20 +++++++++++++- 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index 7855f9ae4dc..e28bde79b5c 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -17,9 +17,8 @@ package org.apache.celeborn.common.metrics.source -import java.lang import java.util.{Map => JMap} -import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, ScheduledExecutorService, TimeUnit} import scala.collection.JavaConverters._ import scala.collection.mutable @@ -117,7 +116,8 @@ abstract class AbstractSource(conf: CelebornConf, role: String) // filter out non-number type gauges if (gauge.getValue.isInstanceOf[Number]) { namedGauges.putIfAbsent( - metricNameWithCustomizedLabels(name, labels), NamedGauge(name, gauge, labels ++ staticLabels)) + metricNameWithCustomizedLabels(name, labels), + NamedGauge(name, gauge, labels ++ staticLabels, isAppMetrics)) } else { logWarning( s"Add gauge $name failed, the value type ${gauge.getValue.getClass} is not a number") @@ -484,7 +484,8 @@ abstract class AbstractSource(conf: CelebornConf, role: String) var leftMetricsNum = metricsCapacity val sb = new mutable.StringBuilder val appMetricsSnapshot = ArrayBuffer[String]() - leftMetricsNum = fillInnerMetricsSnapshot(getAndClearTimerMetrics(), leftMetricsNum, sb, appMetricsSnapshot) + leftMetricsNum = + fillInnerMetricsSnapshot(getAndClearTimerMetrics(), leftMetricsNum, sb, appMetricsSnapshot) leftMetricsNum = fillInnerMetricsSnapshot(timers(), leftMetricsNum, sb, appMetricsSnapshot) leftMetricsNum = fillInnerMetricsSnapshot(histograms(), leftMetricsNum, sb, appMetricsSnapshot) leftMetricsNum = fillInnerMetricsSnapshot(meters(), leftMetricsNum, sb, appMetricsSnapshot) @@ -510,24 +511,6 @@ abstract class AbstractSource(conf: CelebornConf, role: String) if (leftNum <= 0) { return 0 } - val addList = metricList.take(leftNum) - addList.foreach { - case c: NamedCounter => - sb.append(getCounterMetrics(c)) - case g: NamedGauge[_] => - sb.append(getGaugeMetrics(g)) - case m: NamedMeter => - sb.append(getMeterMetrics(m)) - case h: NamedHistogram => - sb.append(getHistogramMetrics(h)) - h.asInstanceOf[CelebornHistogram].reservoir - .asInstanceOf[ResettableSlidingWindowReservoir].reset() - case t: NamedTimer => - sb.append(getTimerMetrics(t)) - t.timer.asInstanceOf[CelebornTimer].reservoir - .asInstanceOf[ResettableSlidingWindowReservoir].reset() - case s => - sb.append(s.toString) var addNum = 0 val appCount0Metrics = ArrayBuffer[String]() for (m <- metricList if addNum < leftNum) { diff --git a/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala b/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala index a68e6b1a8ea..237bbc56396 100644 --- a/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala @@ -49,7 +49,7 @@ class CelebornSourceSuite extends CelebornFunSuite { mockSource.stopTimer("Timer1", "key1") mockSource.stopTimer("Timer2", "key2", user3) - mockSource.timerMetricsMap.add("testTimerMetricsMap") + mockSource.timerMetrics.add("testTimerMetricsMap") val res = mockSource.getMetrics() var extraLabelsStr = extraLabels @@ -143,4 +143,22 @@ class CelebornSourceSuite extends CelebornFunSuite { assert(!res3.contains(exps3(i))) } } + + test("test getAndClearTimerMetrics in timerMetrics") { + val conf = new CelebornConf() + conf.set(CelebornConf.METRICS_CAPACITY.key, "6") + val role = "mock" + val mockSource = new AbstractSource(conf, role) { + override def sourceName: String = "mockSource" + } + val exp1 = "testTimerMetrics1" + val exp2 = "testTimerMetrics2" + mockSource.timerMetrics.add(exp1) + val res1 = mockSource.getMetrics() + mockSource.timerMetrics.add(exp2) + val res2 = mockSource.getMetrics() + + assert(res1.contains(exp1) && !res1.contains(exp2)) + assert(res2.contains(exp2) && !res2.contains(exp1)) + } } From e0155fa3fbfe9c7f87378f8a71aa02f894e12b3b Mon Sep 17 00:00:00 2001 From: zhengtao Date: Wed, 4 Dec 2024 12:14:47 +0800 Subject: [PATCH 09/11] reduce appMetricsSnapshot --- .../common/metrics/source/AbstractSource.scala | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index e28bde79b5c..1736e6e6eea 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -516,21 +516,19 @@ abstract class AbstractSource(conf: CelebornConf, role: String) for (m <- metricList if addNum < leftNum) { var strMetrics = "" var isApp = false + var isCount0 = false m match { case c: NamedCounter => strMetrics = getCounterMetrics(c) if (c.isApp) { isApp = true - if (c.counter.getCount > 0) { - appMetricsSnapshot += strMetrics - } else { - appCount0Metrics += strMetrics + if (c.counter.getCount <= 0) { + isCount0 = true } } case g: NamedGauge[_] => strMetrics = getGaugeMetrics(g) if (g.isApp) { - appMetricsSnapshot += strMetrics isApp = true } case m: NamedMeter => @@ -549,6 +547,15 @@ abstract class AbstractSource(conf: CelebornConf, role: String) if (!isApp) { sb.append(strMetrics) addNum = addNum + 1 + } else { + val leftAppMetricsNum = leftNum - addNum - appMetricsSnapshot.size + if (leftAppMetricsNum > 0) { + if (isCount0) { + appCount0Metrics += strMetrics + } else { + appMetricsSnapshot += strMetrics + } + } } } appMetricsSnapshot ++= appCount0Metrics From 5029d4c90ed4c8f0b198cfe321a0eb3161830312 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Wed, 4 Dec 2024 13:57:49 +0800 Subject: [PATCH 10/11] minor --- .../common/metrics/source/AbstractSource.scala | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index 1736e6e6eea..da4103dc8c2 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -511,9 +511,9 @@ abstract class AbstractSource(conf: CelebornConf, role: String) if (leftNum <= 0) { return 0 } - var addNum = 0 + var nonAppMetricsAddNum = 0 val appCount0Metrics = ArrayBuffer[String]() - for (m <- metricList if addNum < leftNum) { + for (m <- metricList if nonAppMetricsAddNum < leftNum) { var strMetrics = "" var isApp = false var isCount0 = false @@ -546,10 +546,9 @@ abstract class AbstractSource(conf: CelebornConf, role: String) } if (!isApp) { sb.append(strMetrics) - addNum = addNum + 1 + nonAppMetricsAddNum = nonAppMetricsAddNum + 1 } else { - val leftAppMetricsNum = leftNum - addNum - appMetricsSnapshot.size - if (leftAppMetricsNum > 0) { + if (leftNum - nonAppMetricsAddNum - appMetricsSnapshot.size > 0) { if (isCount0) { appCount0Metrics += strMetrics } else { @@ -558,8 +557,11 @@ abstract class AbstractSource(conf: CelebornConf, role: String) } } } - appMetricsSnapshot ++= appCount0Metrics - leftNum - addNum + val leftAppMetricsNum = leftNum - nonAppMetricsAddNum - appMetricsSnapshot.size + if (appCount0Metrics.nonEmpty && leftAppMetricsNum > 0) { + appMetricsSnapshot ++= appCount0Metrics.toList.take(leftAppMetricsNum) + } + leftNum - nonAppMetricsAddNum } override def destroy(): Unit = { From 25224a0804bf43df60abf82c2a5a28cb23f7cbe2 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Sun, 8 Dec 2024 22:12:24 +0800 Subject: [PATCH 11/11] only enable control --- .../metrics/source/AbstractSource.scala | 127 +++++++----------- .../metrics/source/CelebornSourceSuite.scala | 24 +--- 2 files changed, 50 insertions(+), 101 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index da4103dc8c2..51e6efa71ae 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -113,14 +113,16 @@ abstract class AbstractSource(conf: CelebornConf, role: String) labels: Map[String, String], gauge: Gauge[T], isAppMetrics: Boolean): Unit = { - // filter out non-number type gauges - if (gauge.getValue.isInstanceOf[Number]) { - namedGauges.putIfAbsent( - metricNameWithCustomizedLabels(name, labels), - NamedGauge(name, gauge, labels ++ staticLabels, isAppMetrics)) - } else { - logWarning( - s"Add gauge $name failed, the value type ${gauge.getValue.getClass} is not a number") + if (metricsAppEnabled || (!metricsAppEnabled && !isAppMetrics)) { + // filter out non-number type gauges + if (gauge.getValue.isInstanceOf[Number]) { + namedGauges.putIfAbsent( + metricNameWithCustomizedLabels(name, labels), + NamedGauge(name, gauge, labels ++ staticLabels, isAppMetrics)) + } else { + logWarning( + s"Add gauge $name failed, the value type ${gauge.getValue.getClass} is not a number") + } } } @@ -199,14 +201,16 @@ abstract class AbstractSource(conf: CelebornConf, role: String) def addCounter(name: String): Unit = addCounter(name, Map.empty[String, String]) def addCounter(name: String, labels: Map[String, String], isAppMetrics: Boolean = false): Unit = { - val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels) - namedCounters.putIfAbsent( - metricNameWithLabel, - NamedCounter( - name, - metricRegistry.counter(metricNameWithLabel), - labels ++ staticLabels, - isAppMetrics)) + if (metricsAppEnabled || (!metricsAppEnabled && !isAppMetrics)) { + val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels) + namedCounters.putIfAbsent( + metricNameWithLabel, + NamedCounter( + name, + metricRegistry.counter(metricNameWithLabel), + labels ++ staticLabels, + isAppMetrics)) + } } def counters(): List[NamedCounter] = { @@ -483,19 +487,12 @@ abstract class AbstractSource(conf: CelebornConf, role: String) override def getMetrics(): String = { var leftMetricsNum = metricsCapacity val sb = new mutable.StringBuilder - val appMetricsSnapshot = ArrayBuffer[String]() - leftMetricsNum = - fillInnerMetricsSnapshot(getAndClearTimerMetrics(), leftMetricsNum, sb, appMetricsSnapshot) - leftMetricsNum = fillInnerMetricsSnapshot(timers(), leftMetricsNum, sb, appMetricsSnapshot) - leftMetricsNum = fillInnerMetricsSnapshot(histograms(), leftMetricsNum, sb, appMetricsSnapshot) - leftMetricsNum = fillInnerMetricsSnapshot(meters(), leftMetricsNum, sb, appMetricsSnapshot) - leftMetricsNum = fillInnerMetricsSnapshot(gauges(), leftMetricsNum, sb, appMetricsSnapshot) - leftMetricsNum = fillInnerMetricsSnapshot(counters(), leftMetricsNum, sb, appMetricsSnapshot) - if (leftMetricsNum > 0 && metricsAppEnabled) { - appMetricsSnapshot.toList.take(leftMetricsNum).foreach { appMetrics => - sb.append(appMetrics) - } - } + leftMetricsNum = fillInnerMetricsSnapshot(getAndClearTimerMetrics(), leftMetricsNum, sb) + leftMetricsNum = fillInnerMetricsSnapshot(timers(), leftMetricsNum, sb) + leftMetricsNum = fillInnerMetricsSnapshot(histograms(), leftMetricsNum, sb) + leftMetricsNum = fillInnerMetricsSnapshot(meters(), leftMetricsNum, sb) + leftMetricsNum = fillInnerMetricsSnapshot(gauges(), leftMetricsNum, sb) + leftMetricsNum = fillInnerMetricsSnapshot(counters(), leftMetricsNum, sb) if (leftMetricsNum <= 0) { logWarning( s"The number of metrics exceed the output metrics strings capacity! All metrics Num: $getAllMetricsNum") @@ -506,62 +503,30 @@ abstract class AbstractSource(conf: CelebornConf, role: String) private def fillInnerMetricsSnapshot( metricList: List[AnyRef], leftNum: Int, - sb: mutable.StringBuilder, - appMetricsSnapshot: ArrayBuffer[String]): Int = { + sb: mutable.StringBuilder): Int = { if (leftNum <= 0) { return 0 } - var nonAppMetricsAddNum = 0 - val appCount0Metrics = ArrayBuffer[String]() - for (m <- metricList if nonAppMetricsAddNum < leftNum) { - var strMetrics = "" - var isApp = false - var isCount0 = false - m match { - case c: NamedCounter => - strMetrics = getCounterMetrics(c) - if (c.isApp) { - isApp = true - if (c.counter.getCount <= 0) { - isCount0 = true - } - } - case g: NamedGauge[_] => - strMetrics = getGaugeMetrics(g) - if (g.isApp) { - isApp = true - } - case m: NamedMeter => - strMetrics = getMeterMetrics(m) - case h: NamedHistogram => - strMetrics = getHistogramMetrics(h) - h.asInstanceOf[CelebornHistogram].reservoir - .asInstanceOf[ResettableSlidingWindowReservoir].reset() - case t: NamedTimer => - strMetrics = getTimerMetrics(t) - t.timer.asInstanceOf[CelebornTimer].reservoir - .asInstanceOf[ResettableSlidingWindowReservoir].reset() - case s => - strMetrics = s.toString - } - if (!isApp) { - sb.append(strMetrics) - nonAppMetricsAddNum = nonAppMetricsAddNum + 1 - } else { - if (leftNum - nonAppMetricsAddNum - appMetricsSnapshot.size > 0) { - if (isCount0) { - appCount0Metrics += strMetrics - } else { - appMetricsSnapshot += strMetrics - } - } - } - } - val leftAppMetricsNum = leftNum - nonAppMetricsAddNum - appMetricsSnapshot.size - if (appCount0Metrics.nonEmpty && leftAppMetricsNum > 0) { - appMetricsSnapshot ++= appCount0Metrics.toList.take(leftAppMetricsNum) + val addList = metricList.take(leftNum) + addList.foreach { + case c: NamedCounter => + sb.append(getCounterMetrics(c)) + case g: NamedGauge[_] => + sb.append(getGaugeMetrics(g)) + case m: NamedMeter => + sb.append(getMeterMetrics(m)) + case h: NamedHistogram => + sb.append(getHistogramMetrics(h)) + h.asInstanceOf[CelebornHistogram].reservoir + .asInstanceOf[ResettableSlidingWindowReservoir].reset() + case t: NamedTimer => + sb.append(getTimerMetrics(t)) + t.timer.asInstanceOf[CelebornTimer].reservoir + .asInstanceOf[ResettableSlidingWindowReservoir].reset() + case s => + sb.append(s.toString) } - leftNum - nonAppMetricsAddNum + leftNum - addList.size } override def destroy(): Unit = { diff --git a/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala b/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala index 237bbc56396..8f90f547c53 100644 --- a/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala @@ -112,7 +112,7 @@ class CelebornSourceSuite extends CelebornFunSuite { // metrics won't contain appMetrics conf.set(CelebornConf.METRICS_APP_ENABLED.key, "false") - conf.set(CelebornConf.METRICS_CAPACITY.key, "6") + conf.set(CelebornConf.METRICS_CAPACITY.key, "7") val (res1, exps1) = createAbstractSource(conf, "") List[Int](0, 4, 5, 6).foreach { i => assert(res1.contains(exps1(i))) @@ -121,27 +121,11 @@ class CelebornSourceSuite extends CelebornFunSuite { assert(!res1.contains(exps1(i))) } - // app metrics will fall behind when it reaches capacity + // metrics contain appMetrics conf.set(CelebornConf.METRICS_APP_ENABLED.key, "true") - conf.set(CelebornConf.METRICS_CAPACITY.key, "4") + conf.set(CelebornConf.METRICS_CAPACITY.key, "7") val (res2, exps2) = createAbstractSource(conf, "") - List[Int](0, 4, 5, 6).foreach { i => - assert(res2.contains(exps2(i))) - } - List[Int](1, 2, 3).foreach { i => - assert(!res2.contains(exps2(i))) - } - - // app metrics count0 will fall behind - conf.set(CelebornConf.METRICS_APP_ENABLED.key, "true") - conf.set(CelebornConf.METRICS_CAPACITY.key, "6") - val (res3, exps3) = createAbstractSource(conf, "") - List[Int](0, 4, 5, 6, 1, 2).foreach { i => - assert(res3.contains(exps3(i))) - } - List[Int](3).foreach { i => - assert(!res3.contains(exps3(i))) - } + checkMetricsRes(res2, exps2) } test("test getAndClearTimerMetrics in timerMetrics") {