Skip to content

Metrics collection #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Dockerfile.hash
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
FROM alpine:latest AS builder

RUN apk update && apk add --no-cache g++ openssl-dev
RUN apk update && apk add --no-cache g++ openssl-dev cmake make curl-dev
RUN apk add bazel --repository=http://dl-cdn.alpinelinux.org/alpine/edge/testing/

WORKDIR /app

COPY scripts/get_prometheus_cpp.sh scripts/
RUN sh scripts/get_prometheus_cpp.sh

COPY worker/src/ ./src/
COPY worker/include/ ./include/
COPY worker/BUILD ./
Expand All @@ -14,9 +17,11 @@ RUN bazel build //:worker

FROM alpine:latest

RUN apk update && apk add --no-cache libstdc++ libgcc libssl3
RUN apk update && apk add --no-cache libstdc++ libgcc libssl3 libcurl

COPY --from=builder /app/bazel-bin/worker /usr/local/bin/worker
COPY --from=builder /app/prometheus-cpp-with-submodules/build/lib/ /usr/lib


WORKDIR /data

Expand Down
17 changes: 17 additions & 0 deletions scripts/get_prometheus_cpp.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/sh
# Downloads the prometheus-cpp v1.3.0 source bundle, builds it as shared
# libraries, runs its test suite and installs it under /usr.
#
# The script uses only POSIX sh features: the builder image installs no bash
# and invokes this file with `sh`, so the shebang says /bin/sh to match.
#
# Optional environment:
#   BUILD_JOBS  number of parallel build jobs (default: 4)

set -eu

archive=prometheus-cpp-with-submodules.tar.gz

# -O makes a re-run overwrite the archive instead of leaving a stale copy.
wget -O "$archive" "https://github.com/jupp0r/prometheus-cpp/releases/download/v1.3.0/$archive"
tar -xzf "$archive"
cd prometheus-cpp-with-submodules

# -p keeps the script re-runnable when the build directory already exists.
mkdir -p build
cd build

cmake .. -DCMAKE_INSTALL_PREFIX:PATH=/usr -DBUILD_SHARED_LIBS=ON -DENABLE_COMPRESSION=OFF

cmake --build . --parallel "${BUILD_JOBS:-4}"

ctest -V

cmake --install .
6 changes: 4 additions & 2 deletions worker/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ cc_binary(
"src/md_calculator.cpp",
"src/file.cpp",
"src/worker.cpp",
"src/metrics_collector.cpp",
"include/file.hpp",
"include/md_calculator.hpp",
"include/worker.hpp"
"include/worker.hpp",
"include/metrics_collector.hpp",
],
linkopts = ["-lssl", "-lcrypto"],
linkopts = ["-lssl", "-lcrypto", "-lprometheus-cpp-push", "-lprometheus-cpp-core", "-lcurl"],
)
44 changes: 44 additions & 0 deletions worker/include/metrics_collector.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#pragma once

#include <atomic>
#include <chrono>
#include <cstdint>
#include <memory>
#include <string>
#include <thread>
#include <unordered_map>

#include <prometheus/gateway.h>
#include <prometheus/gauge.h>
#include <prometheus/registry.h>

/// Collects worker metrics and pushes them to a Prometheus push gateway.
///
/// The constructor registers the gauges and starts a background thread that
/// runs MainLoop(); the destructor stops and joins that thread.
/// StartTask()/StopTask() bracket one unit of work so that the elapsed time of
/// the task currently being processed can be exported.
class MetricsCollector {
  prometheus::Gateway gateway;
  std::shared_ptr<prometheus::Registry> registry;

  /// Per-CPU state: the exported gauge and the previous /proc/stat sample.
  struct CPUInfo {
    /// Non-owning; obtained from the "cpu_usage" gauge family.
    prometheus::Gauge *gauge = nullptr;

    /// Cumulative counters read from the first four numeric columns of a
    /// "cpu*" line in /proc/stat, in file order.
    struct Time {
      uint64_t user = 0;
      /// Second column; proc(5) documents it as "nice" time.
      uint64_t user_low = 0;
      uint64_t sys = 0;
      uint64_t idle = 0;
    };

    Time time;
  };

  /// Keyed by the CPU name as it appears in /proc/stat ("cpu", "cpu0", ...).
  std::unordered_map<std::string, CPUInfo> cpu_usage;

  prometheus::Gauge *memory_used_gauge = nullptr;
  prometheus::Gauge *task_processing_time_gauge = nullptr;

  /// Cleared by the destructor to make MainLoop() return.
  std::atomic<bool> is_running{true};
  std::thread thread;

  /// True between StartTask() and StopTask(). Initialised here so the
  /// collector thread can never observe an indeterminate value.
  std::atomic<bool> is_task_running{false};
  std::chrono::time_point<std::chrono::high_resolution_clock> task_start;

  /// Refreshes every per-CPU gauge from a new /proc/stat sample.
  void GetCPUUsage();
  /// Body of the background thread: sample, then push, once per period.
  void MainLoop();

public:
  /// @param gateway_address host of the push gateway
  /// @param gateway_port    port of the push gateway
  /// @param worker_name     forwarded to prometheus::Gateway as its third
  ///                        constructor argument
  MetricsCollector(const char *gateway_address, const char *gateway_port,
                   const char *worker_name);
  ~MetricsCollector();

  // The background thread holds `this`, so the object must stay where it is.
  MetricsCollector(const MetricsCollector &) = delete;
  MetricsCollector &operator=(const MetricsCollector &) = delete;

  /// Marks the beginning of a task and pushes the metrics immediately.
  void StartTask();
  /// Marks the end of the current task and pushes the metrics immediately.
  void StopTask();
};
29 changes: 27 additions & 2 deletions worker/src/main.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
#include "../include/md_calculator.hpp"
#include "../include/metrics_collector.hpp"
#include "../include/worker.hpp"
#include <iostream>
#include <vector>

class HashWorker : public Worker {
MetricsCollector &metrics_collector;

protected:
void DoTask(int client_fd) {
metrics_collector.StartTask();

uint64_t task_size;
UnixRead(client_fd, &task_size, sizeof(task_size));
MDCalculator md_calculator("md5");
Expand All @@ -19,12 +24,32 @@ class HashWorker : public Worker {
uint64_t hash_size = hash.size();
UnixWrite(client_fd, &hash_size, sizeof(hash_size));
UnixWrite(client_fd, hash.data(), hash_size);

metrics_collector.StopTask();
}

public:
HashWorker(MetricsCollector &metrics_collector)
: metrics_collector(metrics_collector) {}
};

int main() {
const char *gateway_address = getenv("METRICS_GATEWAY_ADDRESS");
const char *gateway_port = getenv("METRICS_GATEWAY_PORT");
const char *worker_name = getenv("METRICS_WORKER_NAME");

if (gateway_address == nullptr || gateway_port == nullptr ||
worker_name == nullptr) {
std::cerr << "[ERROR] Environment variables are not fully specified\n"
"[INFO] Specify METRICS_GATEWAY_ADDRESS METRICS_GATEWAY_PORT "
"METRICS_WORKER_NAME\n";
return 1;
}

try {
HashWorker().MainLoop();
MetricsCollector metrics_collector(gateway_address, gateway_port,
worker_name);
HashWorker(metrics_collector).MainLoop();
} catch (WorkerException &e) {
std::cerr << "[ERROR] " << e.what() << std::endl;
return 1;
Expand All @@ -35,4 +60,4 @@ int main() {
}

return 0;
}
}
148 changes: 148 additions & 0 deletions worker/src/metrics_collector.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
#include "../include/metrics_collector.hpp"

#include <chrono>
#include <fstream>
#include <iostream>
#include <thread>
#include <unistd.h>

namespace {
// Returns the first field of /proc/self/statm converted from pages to bytes,
// or 0 when the file cannot be opened.
double GetMemoryUsed() {
  std::ifstream statm("/proc/self/statm");
  long page_count = 0;

  if (statm.is_open()) {
    // A failed extraction leaves page_count at 0.
    statm >> page_count;
  }

  return static_cast<double>(getpagesize()) * page_count;
}
} // namespace

/// Registers the gauges, takes the initial per-CPU sample from /proc/stat and
/// starts the background collector thread.
///
/// @param gateway_address host of the push gateway
/// @param gateway_port    port of the push gateway
/// @param worker_name     forwarded to prometheus::Gateway as its third
///                        constructor argument
MetricsCollector::MetricsCollector(const char *gateway_address,
                                   const char *gateway_port,
                                   const char *worker_name)
    : gateway(gateway_address, gateway_port, worker_name),
      registry(std::make_shared<prometheus::Registry>()) {
  auto &cpu_usage_family = prometheus::BuildGauge()
                               .Name("cpu_usage")
                               .Help("CPU Usage in percents")
                               .Register(*registry);

  auto &memory_used_family = prometheus::BuildGauge()
                                 .Name("memory_used")
                                 .Help("Memory used by worker in bytes")
                                 .Register(*registry);

  auto &task_processing_time_family =
      prometheus::BuildGauge()
          .Name("task_processing_time")
          .Help("Task processing time (in seconds)")
          .Register(*registry);

  std::ifstream file("/proc/stat");
  std::string cpu_name;
  // 64-bit so that large cumulative counters cannot overflow the extraction
  // and put the stream into a failed state.
  uint64_t ign = 0;

  while (true) {
    CPUInfo cpu;

    file >> cpu_name >> cpu.time.user >> cpu.time.user_low >> cpu.time.sys >>
        cpu.time.idle >> ign >> ign >> ign >> ign >> ign >> ign;

    // A failed read leaves cpu_name holding the previous "cpuN" value, so the
    // stream state must be checked as well or the loop would never terminate.
    if (!file || cpu_name.find("cpu") != 0)
      break;

    cpu.gauge = &cpu_usage_family.Add({{"cpu", cpu_name}});
    cpu_usage.emplace(cpu_name, cpu);
  }

  memory_used_gauge = &memory_used_family.Add({});
  task_processing_time_gauge = &task_processing_time_family.Add({});

  gateway.RegisterCollectable(registry);

  // Every field the collector thread reads must be set before it starts.
  is_task_running = false;
  thread = std::thread(&MetricsCollector::MainLoop, this);
}

/// Collector thread body. Until `is_running` is cleared it repeats: sleep one
/// second, refresh the memory, CPU and task-time gauges, push them to the
/// gateway. A push status other than 200 is reported on stderr.
void MetricsCollector::MainLoop() {
  using Clock = std::chrono::high_resolution_clock;
  const auto period = std::chrono::seconds(1);

  while (is_running.load()) {
    std::this_thread::sleep_for(period);

    memory_used_gauge->Set(::GetMemoryUsed());
    GetCPUUsage();

    // Report 0 while idle, otherwise the seconds elapsed since the task began.
    double task_seconds = 0;
    if (is_task_running.load()) {
      const auto now = Clock::now();
      task_seconds = std::chrono::duration<double>(now - task_start).count();
    }
    task_processing_time_gauge->Set(task_seconds);

    const int push_status = gateway.PushAdd();
    if (push_status == 200) {
      continue;
    }
    std::cerr << "[ERROR] Failed to push metrics. Status " << push_status
              << std::endl;
  }
}

/// Asks the collector thread to stop and blocks until it has exited. The
/// thread only re-checks the flag between iterations, so this can wait for the
/// sleep and push that are currently in progress.
MetricsCollector::~MetricsCollector() {
  is_running.store(false);
  thread.join();
}

/// Re-reads /proc/stat and updates each known per-CPU gauge with the share of
/// non-idle time since the previous sample, in percent.
///
/// A gauge is set to -1 when any counter went backwards or when no time
/// elapsed between the two samples. CPUs that were not present when the
/// collector was constructed have no gauge and are skipped.
void MetricsCollector::GetCPUUsage() {
  std::ifstream file("/proc/stat");
  std::string cpu_name;
  CPUInfo::Time cur_time{};
  // 64-bit so that large cumulative counters cannot overflow the extraction
  // and put the stream into a failed state.
  uint64_t ign = 0;

  // The stream is the loop condition: a failed read leaves cpu_name holding
  // the previous "cpuN" value, so testing only the name would loop forever.
  while (file >> cpu_name >> cur_time.user >> cur_time.user_low >>
         cur_time.sys >> cur_time.idle >> ign >> ign >> ign >> ign >> ign >>
         ign) {
    // The cpu lines come first; the first other line ends the scan.
    if (cpu_name.find("cpu") != 0)
      break;

    // Use find(), not operator[]: the latter would insert an entry whose
    // gauge pointer is invalid and then dereference it.
    const auto it = cpu_usage.find(cpu_name);
    if (it == cpu_usage.end())
      continue;
    CPUInfo &cpu = it->second;

    double percent = -1.0;
    const bool went_backwards =
        cur_time.user < cpu.time.user ||
        cur_time.user_low < cpu.time.user_low ||
        cur_time.sys < cpu.time.sys || cur_time.idle < cpu.time.idle;

    if (!went_backwards) {
      const uint64_t busy = (cur_time.user - cpu.time.user) +
                            (cur_time.user_low - cpu.time.user_low) +
                            (cur_time.sys - cpu.time.sys);
      const uint64_t total = busy + (cur_time.idle - cpu.time.idle);

      if (total != 0) {
        percent =
            static_cast<double>(busy) / static_cast<double>(total) * 100.0;
      }
    }

    cpu.time = cur_time;
    cpu.gauge->Set(percent);
  }
}

/// Marks the beginning of a task: records the start time, resets the
/// task-time gauge to 0 and pushes the metrics immediately. A push status
/// other than 200 is reported on stderr, as in MainLoop().
void MetricsCollector::StartTask() {
  // Write the start time before publishing the flag, so a collector that sees
  // is_task_running == true does not compute elapsed time from a stale start.
  // NOTE(review): task_start is still a plain object shared with the collector
  // thread; removing the race entirely needs a lock or an atomic timestamp.
  task_start = std::chrono::high_resolution_clock::now();
  is_task_running = true;
  task_processing_time_gauge->Set(0);

  const int status = gateway.PushAdd();
  if (status != 200) {
    std::cerr << "[ERROR] Failed to push metrics. Status " << status
              << std::endl;
  }
}

/// Marks the end of the current task: clears the running flag, resets the
/// task-time gauge to 0 and pushes the metrics immediately. A push status
/// other than 200 is reported on stderr, as in MainLoop().
void MetricsCollector::StopTask() {
  is_task_running = false;
  task_processing_time_gauge->Set(0);

  const int status = gateway.PushAdd();
  if (status != 200) {
    std::cerr << "[ERROR] Failed to push metrics. Status " << status
              << std::endl;
  }
}
Loading