Skip to content

Commit b3957d5

Browse files
authored
Merge pull request #11 from moevm/metrics-collection
Metrics collection
2 parents b9fa9e8 + c0b67bc commit b3957d5

File tree

6 files changed

+247
-6
lines changed

6 files changed

+247
-6
lines changed

Dockerfile.hash

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
FROM alpine:latest AS builder
22

3-
RUN apk update && apk add --no-cache g++ openssl-dev
3+
RUN apk update && apk add --no-cache g++ openssl-dev cmake make curl-dev
44
RUN apk add bazel --repository=http://dl-cdn.alpinelinux.org/alpine/edge/testing/
55

66
WORKDIR /app
77

8+
COPY scripts/get_prometheus_cpp.sh scripts/
9+
RUN sh scripts/get_prometheus_cpp.sh
10+
811
COPY worker/src/ ./src/
912
COPY worker/include/ ./include/
1013
COPY worker/BUILD ./
@@ -14,9 +17,11 @@ RUN bazel build //:worker
1417

1518
FROM alpine:latest
1619

17-
RUN apk update && apk add --no-cache libstdc++ libgcc libssl3
20+
RUN apk update && apk add --no-cache libstdc++ libgcc libssl3 libcurl
1821

1922
COPY --from=builder /app/bazel-bin/worker /usr/local/bin/worker
23+
COPY --from=builder /app/prometheus-cpp-with-submodules/build/lib/ /usr/lib
24+
2025

2126
WORKDIR /data
2227

scripts/get_prometheus_cpp.sh

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#!/bin/bash
2+
3+
set -e
4+
wget https://github.com/jupp0r/prometheus-cpp/releases/download/v1.3.0/prometheus-cpp-with-submodules.tar.gz
5+
tar -xvzf prometheus-cpp-with-submodules.tar.gz
6+
cd prometheus-cpp-with-submodules
7+
8+
mkdir build
9+
cd build
10+
11+
cmake .. -DCMAKE_INSTALL_PREFIX:PATH=/usr -DBUILD_SHARED_LIBS=ON -DENABLE_COMPRESSION=OFF
12+
13+
cmake --build . --parallel 4
14+
15+
ctest -V
16+
17+
cmake --install .

worker/BUILD

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ cc_binary(
55
"src/md_calculator.cpp",
66
"src/file.cpp",
77
"src/worker.cpp",
8+
"src/metrics_collector.cpp",
89
"include/file.hpp",
910
"include/md_calculator.hpp",
10-
"include/worker.hpp"
11+
"include/worker.hpp",
12+
"include/metrics_collector.hpp",
1113
],
12-
linkopts = ["-lssl", "-lcrypto"],
14+
linkopts = ["-lssl", "-lcrypto", "-lprometheus-cpp-push", "-lprometheus-cpp-core", "-lcurl"],
1315
)
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#include <chrono>
2+
#include <prometheus/gateway.h>
3+
#include <prometheus/gauge.h>
4+
#include <prometheus/registry.h>
5+
6+
class MetricsCollector {
7+
prometheus::Gateway gateway;
8+
std::shared_ptr<prometheus::Registry> registry;
9+
10+
struct CPUInfo {
11+
prometheus::Gauge *gauge;
12+
13+
struct Time {
14+
uint64_t user;
15+
uint64_t user_low;
16+
uint64_t sys;
17+
uint64_t idle;
18+
};
19+
20+
Time time;
21+
};
22+
23+
std::unordered_map<std::string, CPUInfo> cpu_usage;
24+
25+
prometheus::Gauge *memory_used_gauge;
26+
prometheus::Gauge *task_processing_time_gauge;
27+
28+
std::atomic<bool> is_running{true};
29+
std::thread thread;
30+
31+
std::atomic<bool> is_task_running;
32+
std::chrono::time_point<std::chrono::high_resolution_clock> task_start;
33+
34+
void GetCPUUsage();
35+
void MainLoop();
36+
37+
public:
38+
MetricsCollector(const char *gateway_address, const char *gateway_port,
39+
const char *worker_name);
40+
~MetricsCollector();
41+
42+
void StartTask();
43+
void StopTask();
44+
};

worker/src/main.cpp

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
#include "../include/md_calculator.hpp"
2+
#include "../include/metrics_collector.hpp"
23
#include "../include/worker.hpp"
34
#include <iostream>
45
#include <vector>
56

67
class HashWorker : public Worker {
8+
MetricsCollector &metrics_collector;
9+
710
protected:
811
void DoTask(int client_fd) {
12+
metrics_collector.StartTask();
13+
914
uint64_t task_size;
1015
UnixRead(client_fd, &task_size, sizeof(task_size));
1116
MDCalculator md_calculator("md5");
@@ -19,12 +24,32 @@ class HashWorker : public Worker {
1924
uint64_t hash_size = hash.size();
2025
UnixWrite(client_fd, &hash_size, sizeof(hash_size));
2126
UnixWrite(client_fd, hash.data(), hash_size);
27+
28+
metrics_collector.StopTask();
2229
}
30+
31+
public:
32+
HashWorker(MetricsCollector &metrics_collector)
33+
: metrics_collector(metrics_collector) {}
2334
};
2435

2536
int main() {
37+
const char *gateway_address = getenv("METRICS_GATEWAY_ADDRESS");
38+
const char *gateway_port = getenv("METRICS_GATEWAY_PORT");
39+
const char *worker_name = getenv("METRICS_WORKER_NAME");
40+
41+
if (gateway_address == nullptr || gateway_port == nullptr ||
42+
worker_name == nullptr) {
43+
std::cerr << "[ERROR] Environment variables are not fully specified\n"
44+
"[INFO] Specify METRICS_GATEWAY_ADDRESS METRICS_GATEWAY_PORT "
45+
"METRICS_WORKER_NAME\n";
46+
return 1;
47+
}
48+
2649
try {
27-
HashWorker().MainLoop();
50+
MetricsCollector metrics_collector(gateway_address, gateway_port,
51+
worker_name);
52+
HashWorker(metrics_collector).MainLoop();
2853
} catch (WorkerException &e) {
2954
std::cerr << "[ERROR] " << e.what() << std::endl;
3055
return 1;
@@ -35,4 +60,4 @@ int main() {
3560
}
3661

3762
return 0;
38-
}
63+
}

worker/src/metrics_collector.cpp

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
#include "../include/metrics_collector.hpp"
2+
3+
#include <chrono>
4+
#include <fstream>
5+
#include <iostream>
6+
#include <thread>
7+
#include <unistd.h>
8+
9+
namespace {
10+
double GetMemoryUsed() {
11+
std::ifstream file("/proc/self/statm");
12+
if (!file.is_open()) {
13+
return 0;
14+
}
15+
16+
long mem_pages = 0;
17+
file >> mem_pages;
18+
file.close();
19+
return mem_pages * (double)getpagesize();
20+
}
21+
} // namespace
22+
23+
MetricsCollector::MetricsCollector(const char *gateway_address,
24+
const char *gateway_port,
25+
const char *worker_name)
26+
: gateway(gateway_address, gateway_port, worker_name),
27+
registry(std::make_shared<prometheus::Registry>()) {
28+
auto &cpu_usage_family = prometheus::BuildGauge()
29+
.Name("cpu_usage")
30+
.Help("CPU Usage in percents")
31+
.Register(*registry);
32+
33+
auto &memory_used_family = prometheus::BuildGauge()
34+
.Name("memory_used")
35+
.Help("Memory used by worker in bytes")
36+
.Register(*registry);
37+
38+
auto &task_processing_time_family =
39+
prometheus::BuildGauge()
40+
.Name("task_processing_time")
41+
.Help("Task processing time (in seconds)")
42+
.Register(*registry);
43+
44+
std::ifstream file("/proc/stat");
45+
int ign;
46+
std::string cpu_name;
47+
48+
while (true) {
49+
CPUInfo cpu;
50+
51+
file >> cpu_name >> cpu.time.user >> cpu.time.user_low >> cpu.time.sys >>
52+
cpu.time.idle >> ign >> ign >> ign >> ign >> ign >> ign;
53+
54+
if (cpu_name.find("cpu") != 0)
55+
break;
56+
57+
cpu.gauge = &cpu_usage_family.Add({{"cpu", std::string(cpu_name)}});
58+
cpu_usage.insert({std::string(cpu_name), cpu});
59+
}
60+
61+
file.close();
62+
63+
memory_used_gauge = &memory_used_family.Add({});
64+
task_processing_time_gauge = &task_processing_time_family.Add({});
65+
66+
gateway.RegisterCollectable(registry);
67+
thread = std::thread(&MetricsCollector::MainLoop, this);
68+
is_task_running = false;
69+
}
70+
71+
void MetricsCollector::MainLoop() {
72+
while (is_running) {
73+
std::this_thread::sleep_for(std::chrono::seconds(1));
74+
75+
memory_used_gauge->Set(::GetMemoryUsed());
76+
GetCPUUsage();
77+
78+
if (is_task_running) {
79+
auto cur_time = std::chrono::high_resolution_clock::now();
80+
task_processing_time_gauge->Set(
81+
std::chrono::duration<double>(cur_time - task_start).count());
82+
} else {
83+
task_processing_time_gauge->Set(0);
84+
}
85+
86+
int status = gateway.PushAdd();
87+
if (status != 200) {
88+
std::cerr << "[ERROR] Failed to push metrics. Status " << status
89+
<< std::endl;
90+
}
91+
}
92+
}
93+
94+
MetricsCollector::~MetricsCollector() {
95+
is_running = false;
96+
thread.join();
97+
}
98+
99+
void MetricsCollector::GetCPUUsage() {
100+
std::ifstream file("/proc/stat");
101+
CPUInfo::Time cur_time;
102+
double percent;
103+
104+
std::string cpu_name;
105+
int ign;
106+
107+
while (true) {
108+
file >> cpu_name >> cur_time.user >> cur_time.user_low >> cur_time.sys >>
109+
cur_time.idle >> ign >> ign >> ign >> ign >> ign >> ign;
110+
111+
if (cpu_name.find("cpu") != 0)
112+
break;
113+
114+
CPUInfo &cpu = cpu_usage[cpu_name];
115+
if (cur_time.user < cpu.time.user ||
116+
cur_time.user_low < cpu.time.user_low || cur_time.sys < cpu.time.sys ||
117+
cur_time.idle < cpu.time.idle) {
118+
// overflow detection
119+
percent = -1.0;
120+
} else {
121+
uint64_t total = (cur_time.user - cpu.time.user) +
122+
(cur_time.user_low - cpu.time.user_low) +
123+
(cur_time.sys - cpu.time.sys);
124+
125+
percent = total;
126+
total += (cur_time.idle - cpu.time.idle);
127+
percent = (total == 0) ? -1.0 : (percent / total) * 100.0;
128+
}
129+
130+
cpu.time = cur_time;
131+
cpu.gauge->Set(percent);
132+
}
133+
134+
file.close();
135+
}
136+
137+
void MetricsCollector::StartTask() {
138+
is_task_running = true;
139+
task_start = std::chrono::high_resolution_clock::now();
140+
task_processing_time_gauge->Set(0);
141+
gateway.PushAdd();
142+
}
143+
144+
void MetricsCollector::StopTask() {
145+
is_task_running = false;
146+
task_processing_time_gauge->Set(0);
147+
gateway.PushAdd();
148+
}

0 commit comments

Comments
 (0)