Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics for ExecutorService and ScheduledExecutorService #57

Closed
jeremyspb opened this issue Sep 5, 2024 · 1 comment
Closed

Metrics for ExecutorService and ScheduledExecutorService #57

jeremyspb opened this issue Sep 5, 2024 · 1 comment
Assignees
Labels
Milestone

Comments

@jeremyspb
Copy link

jeremyspb commented Sep 5, 2024

Currently there are a lot of frameworks for metrics which provides possibility to monitor metrics for executor service and scheduled executor service.

For example, dropwizard framework provides Instrumented implementation for executor service:
https://github.com/dropwizard/metrics/blob/release/4.2.x/metrics-core/src/main/java/com/codahale/metrics/InstrumentedExecutorService.java
and for scheduled executor service:
https://github.com/dropwizard/metrics/blob/release/4.2.x/metrics-core/src/main/java/com/codahale/metrics/InstrumentedScheduledExecutorService.java

Micrometer provides binders for executor service like:
https://github.com/micrometer-metrics/micrometer/blob/main/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/jvm/ExecutorServiceMetrics.java

As a possible implementation it might be some like that:

`
public class InstrumentedExecutorService implements ExecutorService {
private static final AtomicLong NAME_COUNTER = new AtomicLong();
private final ExecutorService delegate;
private final Rate submitted;
private final Counter running;
private final Rate completed;
private final Timer idle;
private final Timer duration;

public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry registry) {
    this(delegate, registry, "instrumented-delegate-" + NAME_COUNTER.incrementAndGet());
}

public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry registry, String name) {
    this.delegate = delegate;
    this.submitted = registry.rate(MetricName.name(name, "submitted"));
    this.running = registry.counter(MetricName.name(name, "running"));
    this.completed = registry.rate(MetricName.name(name, "completed"));
    this.idle = registry.timer(MetricName.name(name, "idle"));
    this.duration = registry.timer(MetricName.name(name, "duration"));
    MetricKey metricKey;
    if (delegate instanceof ThreadPoolExecutor) {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) delegate;
        metricKey = MetricName.name(name, "pool.size");
        registry.longVar(metricKey, () -> (long) executor.getPoolSize());
        metricKey = MetricName.name(name, "pool.core");
        registry.longVar(metricKey, () -> (long) executor.getCorePoolSize());
        metricKey = MetricName.name(name, "pool.max");
        registry.longVar(metricKey, () -> (long) executor.getMaximumPoolSize());
        BlockingQueue<Runnable> queue = executor.getQueue();
        metricKey = MetricName.name(name, "tasks.active");
        registry.longVar(metricKey, () -> (long) executor.getActiveCount());
        metricKey = MetricName.name(name, "tasks.completed");
        registry.longVar(metricKey, executor::getCompletedTaskCount);
        metricKey = MetricName.name(name, "tasks.queued");
        registry.longVar(metricKey, () -> (long) queue.size());
        metricKey = MetricName.name(name, "tasks.capacity");
        registry.longVar(metricKey, () -> (long) queue.remainingCapacity());
    } else if (delegate instanceof ForkJoinPool) {
        ForkJoinPool forkJoinPool = (ForkJoinPool) delegate;
        metricKey = MetricName.name(name, "tasks.stolen");
        registry.longVar(metricKey, forkJoinPool::getStealCount);
        metricKey = MetricName.name(name, "tasks.queued");
        registry.longVar(metricKey, forkJoinPool::getQueuedTaskCount);
        metricKey = MetricName.name(name, "threads.active");
        registry.longVar(metricKey, () -> (long) forkJoinPool.getActiveThreadCount());
        metricKey = MetricName.name(name, "threads.running");
        registry.longVar(metricKey, () -> (long) forkJoinPool.getRunningThreadCount());
    }
}

public void execute(@Nonnull Runnable runnable) {
    this.submitted.mark();
    this.delegate.execute(new InstrumentedRunnable(runnable));
}

public @Nonnull Future<?> submit(@Nonnull Runnable runnable) {
    this.submitted.mark();
    return this.delegate.submit(new InstrumentedRunnable(runnable));
}

public <T> @Nonnull Future<T> submit(@Nonnull Runnable runnable, @Nullable T result) {
    this.submitted.mark();
    return this.delegate.submit(new InstrumentedRunnable(runnable), result);
}

public <T> @Nonnull Future<T> submit(@Nonnull Callable<T> task) {
    this.submitted.mark();
    return this.delegate.submit(new InstrumentedCallable<>(task));
}

public <T> @Nonnull List<Future<T>> invokeAll(@Nonnull Collection<? extends Callable<T>> tasks) throws InterruptedException {
    this.submitted.mark(tasks.size());
    Collection<? extends Callable<T>> instrumented = this.instrument(tasks);
    return this.delegate.invokeAll(instrumented);
}

public <T> @Nonnull List<Future<T>> invokeAll(@Nonnull Collection<? extends Callable<T>> tasks, long timeout, @Nonnull TimeUnit unit) throws InterruptedException {
    this.submitted.mark(tasks.size());
    Collection<? extends Callable<T>> instrumented = this.instrument(tasks);
    return this.delegate.invokeAll(instrumented, timeout, unit);
}

public <T> @Nonnull T invokeAny(@Nonnull Collection<? extends Callable<T>> tasks) throws ExecutionException, InterruptedException {
    this.submitted.mark(tasks.size());
    Collection<? extends Callable<T>> instrumented = this.instrument(tasks);
    return this.delegate.invokeAny(instrumented);
}

public <T> @Nonnull T invokeAny(@Nonnull Collection<? extends Callable<T>> tasks, long timeout, @Nonnull TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {
    this.submitted.mark(tasks.size());
    Collection<? extends Callable<T>> instrumented = this.instrument(tasks);
    return this.delegate.invokeAny(instrumented, timeout, unit);
}

private <T> @Nonnull Collection<? extends Callable<T>> instrument(@Nonnull Collection<? extends Callable<T>> tasks) {
    List<InstrumentedCallable<T>> instrumented = new ArrayList<>(tasks.size());

    for (Callable<T> task : tasks) {
        instrumented.add(new InstrumentedCallable<>(task));
    }

    return instrumented;
}

public void shutdown() {
    this.delegate.shutdown();
}

public @Nonnull List<Runnable> shutdownNow() {
    return this.delegate.shutdownNow();
}

public boolean isShutdown() {
    return this.delegate.isShutdown();
}

public boolean isTerminated() {
    return this.delegate.isTerminated();
}

public boolean awaitTermination(long timeout, @Nonnull TimeUnit timeUnit) throws InterruptedException {
    return this.delegate.awaitTermination(timeout, timeUnit);
}

private class InstrumentedRunnable implements Runnable {
    private final Runnable task;
    private final Stopwatch idleStopwatch;

    InstrumentedRunnable(Runnable task) {
        this.task = task;
        this.idleStopwatch = idle.stopwatch();
    }

    public void run() {
        idleStopwatch.stop();
        running.inc();

        try {
            Stopwatch durationContext = duration.stopwatch();

            try {
                task.run();
            } catch (Throwable ex) {
                if (durationContext != null) {
                    try {
                        durationContext.stop();
                    } catch (Throwable subEx) {
                        ex.addSuppressed(subEx);
                    }
                }

                throw ex;
            }

            if (durationContext != null) {
                durationContext.stop();
            }
        } finally {
            running.dec();
            completed.mark();
        }

    }
}

private class InstrumentedCallable<T> implements Callable<T> {
    private final Callable<T> callable;
    private final Stopwatch idleStopwatch;

    InstrumentedCallable(Callable<T> callable) {
        this.callable = callable;
        this.idleStopwatch = idle.stopwatch();
    }

    public T call() throws Exception {
        idleStopwatch.stop();
        running.inc();

        Object result;
        try {
            Stopwatch durationStopwatch = duration.stopwatch();

            try {
                result = this.callable.call();
            } catch (Throwable ex) {
                if (durationStopwatch != null) {
                    try {
                        durationStopwatch.stop();
                    } catch (Throwable subEx) {
                        ex.addSuppressed(subEx);
                    }
                }

                throw ex;
            }

            if (durationStopwatch != null) {
                durationStopwatch.stop();
            }
        } finally {
            running.dec();
            completed.mark();
        }

        return (T) result;
    }
}

}
`

and
`
public class InstrumentedScheduledExecutorService implements ScheduledExecutorService {
private static final AtomicLong NAME_COUNTER = new AtomicLong();
private final ScheduledExecutorService delegate;
private final Rate submitted;
private final Counter running;
private final Rate completed;
private final Timer duration;
private final Rate scheduledOnce;
private final Rate scheduledRepetitively;
private final Counter scheduledOverrun;
private final Histogram percentOfPeriod;

public InstrumentedScheduledExecutorService(ScheduledExecutorService delegate, MetricRegistry registry) {
    this(delegate, registry, "instrumented-scheduled-executor-service-" + NAME_COUNTER.incrementAndGet());
}

public InstrumentedScheduledExecutorService(ScheduledExecutorService delegate, MetricRegistry registry, String name) {
    this.delegate = delegate;
    this.submitted = registry.rate(MetricName.name(name, "submitted"));
    this.running = registry.counter(MetricName.name(name, "running"));
    this.completed = registry.rate(MetricName.name(name, "completed"));
    this.duration = registry.timer(MetricName.name(name, "duration"));
    this.scheduledOnce = registry.rate(MetricName.name(name, "scheduled.once"));
    this.scheduledRepetitively = registry.rate(MetricName.name(name, "scheduled.repetitively"));
    this.scheduledOverrun = registry.counter(MetricName.name(name, "scheduled.overrun"));
    this.percentOfPeriod = registry.histogram(MetricName.name(name, "scheduled.percent-of-period"));
}

@Override
public @Nonnull ScheduledFuture<?> schedule(@Nonnull Runnable command, long delay, @Nonnull TimeUnit unit) {
    this.scheduledOnce.mark();
    return this.delegate.schedule(new InstrumentedRunnable(command), delay, unit);
}

@Override
public <V> @Nonnull ScheduledFuture<V> schedule(@Nonnull Callable<V> callable, long delay, @Nonnull TimeUnit unit) {
    this.scheduledOnce.mark();
    return this.delegate.schedule(new InstrumentedCallable<>(callable), delay, unit);
}

@Override
public @Nonnull ScheduledFuture<?> scheduleAtFixedRate(@Nonnull Runnable command, long initialDelay, long period, @Nonnull TimeUnit unit) {
    this.scheduledRepetitively.mark();
    return this.delegate.scheduleAtFixedRate(new InstrumentedPeriodicRunnable(command, period, unit), initialDelay, period, unit);
}

@Override
public @Nonnull ScheduledFuture<?> scheduleWithFixedDelay(@Nonnull Runnable command, long initialDelay, long delay, @Nonnull TimeUnit unit) {
    this.scheduledRepetitively.mark();
    return this.delegate.scheduleWithFixedDelay(new InstrumentedRunnable(command), initialDelay, delay, unit);
}

@Override
public void shutdown() {
    this.delegate.shutdown();
}

@Override
public @Nonnull List<Runnable> shutdownNow() {
    return this.delegate.shutdownNow();
}

@Override
public boolean isShutdown() {
    return this.delegate.isShutdown();
}

@Override
public boolean isTerminated() {
    return this.delegate.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, @Nonnull TimeUnit unit) throws InterruptedException {
    return this.delegate.awaitTermination(timeout, unit);
}

@Override
public <T> @Nonnull Future<T> submit(@Nonnull Callable<T> task) {
    this.submitted.mark();
    return this.delegate.submit(new InstrumentedCallable<>(task));
}

@Override
public <T> @Nonnull Future<T> submit(@Nonnull Runnable task, T result) {
    this.submitted.mark();
    return this.delegate.submit(new InstrumentedRunnable(task), result);
}

@Override
public @Nonnull Future<?> submit(@Nonnull Runnable task) {
    this.submitted.mark();
    return this.delegate.submit(new InstrumentedRunnable(task));
}

@Override
public <T> @Nonnull List<Future<T>> invokeAll(@Nonnull Collection<? extends Callable<T>> tasks) throws InterruptedException {
    this.submitted.mark(tasks.size());
    Collection<? extends Callable<T>> instrumented = this.instrument(tasks);
    return this.delegate.invokeAll(instrumented);
}

@Override
public <T> @Nonnull List<Future<T>> invokeAll(@Nonnull Collection<? extends Callable<T>> tasks, long timeout, @Nonnull TimeUnit unit) throws InterruptedException {
    this.submitted.mark(tasks.size());
    Collection<? extends Callable<T>> instrumented = this.instrument(tasks);
    return this.delegate.invokeAll(instrumented, timeout, unit);
}

@Override
public <T> @Nonnull T invokeAny(@Nonnull Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
    this.submitted.mark(tasks.size());
    Collection<? extends Callable<T>> instrumented = this.instrument(tasks);
    return this.delegate.invokeAny(instrumented);
}

@Override
public <T> @Nonnull T invokeAny(@Nonnull Collection<? extends Callable<T>> tasks, long timeout, @Nonnull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    this.submitted.mark(tasks.size());
    Collection<? extends Callable<T>> instrumented = this.instrument(tasks);
    return this.delegate.invokeAny(instrumented, timeout, unit);
}

private <T> @Nonnull Collection<? extends Callable<T>> instrument(@Nonnull Collection<? extends Callable<T>> tasks) {
    List<InstrumentedCallable<T>> instrumented = new ArrayList<>(tasks.size());

    for (Callable<T> task : tasks) {
        instrumented.add(new InstrumentedCallable<>(task));
    }

    return instrumented;
}

@Override
public void execute(@Nonnull Runnable command) {
    this.submitted.mark();
    this.delegate.execute(new InstrumentedRunnable(command));
}

private class InstrumentedRunnable implements Runnable {
    private final Runnable command;

    InstrumentedRunnable(Runnable command) {
        this.command = command;
    }

    @Override
    public void run() {
        running.inc();
        Stopwatch context = duration.stopwatch();

        try {
            this.command.run();
        } finally {
            context.stop();
            running.dec();
            completed.mark();
        }

    }
}

private class InstrumentedCallable<T> implements Callable<T> {
    private final Callable<T> task;

    InstrumentedCallable(Callable<T> task) {
        this.task = task;
    }

    @Override
    public T call() throws Exception {
        running.inc();
        Stopwatch context = duration.stopwatch();

        Object result;
        try {
            result = this.task.call();
        } finally {
            context.stop();
            running.dec();
            completed.mark();
        }

        return (T) result;
    }
}

private class InstrumentedPeriodicRunnable implements Runnable {
    private final Runnable command;
    private final long periodInNanos;

    InstrumentedPeriodicRunnable(Runnable command, long period, TimeUnit unit) {
        this.command = command;
        this.periodInNanos = unit.toNanos(period);
    }

    @Override
    public void run() {
        running.inc();
        Stopwatch context = duration.stopwatch();

        try {
            this.command.run();
        } finally {
            long elapsed = context.stop();
            running.dec();
            completed.mark();
            if (elapsed > this.periodInNanos) {
                scheduledOverrun.inc();
            }

            percentOfPeriod.update(100L * elapsed / this.periodInNanos);
        }
    }
}

}
`

@devromik devromik added this to the 4.1 milestone Sep 23, 2024
@devromik devromik added the enhancement New feature or request label Sep 23, 2024
@devromik devromik self-assigned this Sep 23, 2024
@devromik
Copy link
Collaborator

Hi @jeremyspb, thank you! We will complete this task within the upcoming release_4-1.

@devromik devromik added feature and removed enhancement New feature or request labels Sep 23, 2024
devromik added a commit that referenced this issue Oct 1, 2024
devromik added a commit that referenced this issue Oct 1, 2024
devromik added a commit that referenced this issue Oct 1, 2024
devromik added a commit that referenced this issue Oct 1, 2024
devromik added a commit that referenced this issue Oct 3, 2024
#57: metrics for ExecutorService and ScheduledExecutorService
@devromik devromik closed this as completed Oct 3, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants