From b0198425efaeca643ccaf607c5e555577e3f45a2 Mon Sep 17 00:00:00 2001 From: Gorsky Kirill Date: Tue, 8 Apr 2025 20:16:07 +0300 Subject: [PATCH 1/5] add metrics collector class --- .devcontainer.json | 14 +++ .gitignore | 1 + Dockerfile | 13 +++ Makefile | 13 +++ scripts/build.sh | 4 + scripts/get-prometheus-cpp.sh | 17 ++++ src/Makefile | 2 - src/hash.cpp | 55 ++++++++++-- src/metrics-collector.cpp | 157 ++++++++++++++++++++++++++++++++++ src/metrics-collector.hpp | 36 ++++++++ 10 files changed, 303 insertions(+), 9 deletions(-) create mode 100644 .devcontainer.json create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 Makefile create mode 100755 scripts/build.sh create mode 100644 scripts/get-prometheus-cpp.sh delete mode 100644 src/Makefile create mode 100644 src/metrics-collector.cpp create mode 100644 src/metrics-collector.hpp diff --git a/.devcontainer.json b/.devcontainer.json new file mode 100644 index 0000000..818d152 --- /dev/null +++ b/.devcontainer.json @@ -0,0 +1,14 @@ +{ + "image": "qemu-riscv-cluster/grpc_server", + "runArgs": [], + "customizations": { + "vscode": { + "extensions": [ + "ms-vscode.cpptools", + "EditorConfig.EditorConfig" + ] + } + }, + "workspaceMount": "source=${localWorkspaceFolder},target=/app,type=bind", + "workspaceFolder": "/app" +} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..722d5e7 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.vscode diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..1110b75 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,13 @@ +FROM alpine:3.16.3 + +RUN apk update && apk add g++ cmake make openssl-dev libcurl curl-dev + +WORKDIR /tmp +COPY scripts/get-prometheus-cpp.sh scripts/ +RUN sh scripts/get-prometheus-cpp.sh + +WORKDIR /app +COPY . . +RUN make + +ENTRYPOINT [ "build/grpc_server" ] diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..bdf782f --- /dev/null +++ b/Makefile @@ -0,0 +1,13 @@ +.PHONY: all clean + +all: build/grpc_server + +build: + mkdir build + +build/grpc_server: build src/hash.cpp src/metrics-collector.cpp src/metrics-collector.hpp + g++ -Wall -Werror -o build/grpc_server src/hash.cpp src/metrics-collector.cpp -lssl -lcrypto \ + $(shell pkg-config --libs prometheus-cpp-core prometheus-cpp-push) + +clean: + rm -rf build diff --git a/scripts/build.sh b/scripts/build.sh new file mode 100755 index 0000000..d4c6f43 --- /dev/null +++ b/scripts/build.sh @@ -0,0 +1,4 @@ +#!/bin/bash +set -e + +docker build -t qemu-riscv-cluster/grpc_server . diff --git a/scripts/get-prometheus-cpp.sh b/scripts/get-prometheus-cpp.sh new file mode 100644 index 0000000..0b38020 --- /dev/null +++ b/scripts/get-prometheus-cpp.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +set -e +wget https://github.com/jupp0r/prometheus-cpp/releases/download/v1.3.0/prometheus-cpp-with-submodules.tar.gz +tar -xvzf prometheus-cpp-with-submodules.tar.gz +cd prometheus-cpp-with-submodules + +mkdir build +cd build + +cmake .. -DBUILD_SHARED_LIBS=ON -DENABLE_COMPRESSION=OFF + +cmake --build . --parallel 4 + +ctest -V + +cmake --install . diff --git a/src/Makefile b/src/Makefile deleted file mode 100644 index 1034722..0000000 --- a/src/Makefile +++ /dev/null @@ -1,2 +0,0 @@ -all: - g++ -o hash hash.cpp -lssl -lcrypto diff --git a/src/hash.cpp b/src/hash.cpp index 9ee21ca..d5d8450 100644 --- a/src/hash.cpp +++ b/src/hash.cpp @@ -5,7 +5,10 @@ #include -int calculate_hash(char* path, char* algorithm, char* result) { +#include +#include "metrics-collector.hpp" + +int calculate_hash(char* path, char* algorithm) { std::ifstream stream(path, std::ios::in | std::ios::binary); if (!stream.good()) { @@ -37,7 +40,7 @@ int calculate_hash(char* path, char* algorithm, char* result) { stream.close(); std::cout << "Digest is: "; - for (int i = 0; i < md_len; ++i) { + for (int i = 0; i < (int)md_len; ++i) { std::cout << std::hex << std::setw(2) << std::setfill('0') << static_cast(md_value[i]); } std::cout << std::endl; @@ -46,10 +49,34 @@ int calculate_hash(char* path, char* algorithm, char* result) { return 0; } -int main(int argc, char* argv[]) { +int main_metrics(int argc, char* argv[]) { + if (argc != 5) { + std::cerr << "usage: " << argv[0] << ' ' << argv[1] + << " ADDRESS PORT WORKER_NAME" << std::endl; + return 1; + } + + MetricsCollector metrics_collector(argv[2], argv[3], argv[4]); - if (argc < 3) { - std::cerr << "usage: " << argv[0] + srand(time(NULL)); + while (true) { + std::this_thread::sleep_for(std::chrono::seconds(rand() % 5 + 1)); + metrics_collector.startTask(); + + // some "work" that requires memory + void *data = malloc(rand() % (1024 * 1024 * 50)); // 0 - 50 megabytes + std::this_thread::sleep_for(std::chrono::seconds(rand() % 5 + 1)); + free(data); + + metrics_collector.stopTask(); + } + + return 0; +} + +int main_hash(int argc, char* argv[]) { + if (argc != 4) { + std::cerr << "usage: " << argv[0] << ' ' << argv[1] << " PATH ALGORITHM" << std::endl << "Supported algorithms are: " @@ -60,11 +87,25 @@ int main(int argc, char* argv[]) { return 1; } - char* result; try { - calculate_hash(argv[1], argv[2], result); + calculate_hash(argv[2], argv[3]); } catch(const std::exception& e) { std::cerr << "Error: " << e.what() << std::endl; } return 0; } + +int main(int argc, char* argv[]) { + if (argc < 2) { + std::cerr << "usage: " << argv[0] << " hash/metrics" << std::endl; + return 1; + } + + if (strcmp(argv[1], "hash") == 0) + return main_hash(argc, argv); + if (strcmp(argv[1], "metrics") == 0) + return main_metrics(argc, argv); + + std::cerr << "specify hash/metrics" << std::endl; + return 1; +} diff --git a/src/metrics-collector.cpp b/src/metrics-collector.cpp new file mode 100644 index 0000000..54a27c6 --- /dev/null +++ b/src/metrics-collector.cpp @@ -0,0 +1,157 @@ +#include "metrics-collector.hpp" + +#include +#include +#include +#include + +MetricsCollector::MetricsCollector(const char *gateway_address, const char *gateway_port, const char *worker_name) + : gateway(gateway_address, gateway_port, worker_name), + registry(std::make_shared()) +{ + auto &cpu_usage_family = prometheus::BuildGauge() + .Name("cpu_usage") + .Help("CPU Usage in percents") + .Register(*registry); + + auto &memory_used_family = prometheus::BuildGauge() + .Name("memory_used") + .Help("Memory used by worker in bytes") + .Register(*registry); + + auto &task_processing_time_family = prometheus::BuildGauge() + .Name("task_processing_time") + .Help("Task processing time (in seconds)") + .Register(*registry); + + FILE *file = fopen("/proc/stat", "r"); + char cpu_name[32]; + + while (true) { + CPUInfo cpu; + + fscanf(file, "%s %lu %lu %lu %lu %*u %*u %*u %*u %*u %*u", + cpu_name, + &cpu.last_total_user, &cpu.last_total_user_low, + &cpu.last_total_sys, &cpu.last_total_idle + ); + + if (strstr(cpu_name, "cpu") != cpu_name) + break; + + cpu.gauge = &cpu_usage_family.Add({{ "cpu", std::string(cpu_name) }}); + cpu_usage.insert({ std::string(cpu_name), cpu }); + } + + fclose(file); + + memory_used_gauge = &memory_used_family.Add({}); + task_processing_time_gauge = &task_processing_time_family.Add({}); + + gateway.RegisterCollectable(registry); + + is_running = true; + thread = std::thread(&MetricsCollector::mainLoop, this); + + is_task_running = false; +} + +void MetricsCollector::mainLoop() +{ + while (is_running) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + + getMemoryUsed(); + getCPUUsage(); + + if (is_task_running) { + struct timespec time; + clock_gettime(CLOCK_MONOTONIC, &time); + task_processing_time_gauge->Set((time.tv_sec - task_start.tv_sec) + (time.tv_nsec - task_start.tv_nsec) * 1e-9); + } + else + task_processing_time_gauge->Set(0); + + gateway.PushAdd(); + } +} + +MetricsCollector::~MetricsCollector() +{ + is_running = false; + thread.join(); +} + +void MetricsCollector::getMemoryUsed() +{ + FILE *file = fopen("/proc/self/statm", "r"); + if (file == NULL) { + memory_used_gauge->Set(0); + return; + } + + long mem_pages = 0; + fscanf(file, "%ld", &mem_pages); + fclose(file); + + memory_used_gauge->Set(mem_pages * (double)getpagesize()); +} + +void MetricsCollector::getCPUUsage() +{ + FILE *file = fopen("/proc/stat", "r"); + uint64_t total_user, total_user_low, total_sys, total_idle, total; + double percent; + + char cpu_name[32]; + + while (true) { + fscanf(file, "%s %lu %lu %lu %lu %*u %*u %*u %*u %*u %*u", + cpu_name, + &total_user, &total_user_low, + &total_sys, &total_idle + ); + + + if (strstr(cpu_name, "cpu") != cpu_name) + break; + + CPUInfo &cpu = cpu_usage[cpu_name]; + if (total_user < cpu.last_total_user || total_user_low < cpu.last_total_user_low || + total_sys < cpu.last_total_sys || total_idle < cpu.last_total_idle) { + // overflow detection + percent = -1.0; + } + else { + total = (total_user - cpu.last_total_user) + (total_user_low - cpu.last_total_user_low) + + (total_sys - cpu.last_total_sys); + + percent = total; + total += (total_idle - cpu.last_total_idle); + percent /= total; + percent *= 100; + } + + cpu.last_total_user = total_user; + cpu.last_total_user_low = total_user_low; + cpu.last_total_sys = total_sys; + cpu.last_total_idle = total_idle; + + cpu.gauge->Set(percent); + } + + fclose(file); +} + +void MetricsCollector::startTask() +{ + is_task_running = true; + clock_gettime(CLOCK_MONOTONIC, &task_start); +} + +void MetricsCollector::stopTask() +{ + is_task_running = false; + task_processing_time_gauge->Set(0); + gateway.PushAdd(); +} \ No newline at end of file diff --git a/src/metrics-collector.hpp b/src/metrics-collector.hpp new file mode 100644 index 0000000..bc05458 --- /dev/null +++ b/src/metrics-collector.hpp @@ -0,0 +1,36 @@ +#include +#include +#include + +class MetricsCollector { + prometheus::Gateway gateway; + std::shared_ptr registry; + + struct CPUInfo { + prometheus::Gauge *gauge; + uint64_t last_total_user; + uint64_t last_total_user_low; + uint64_t last_total_sys; + uint64_t last_total_idle; + }; + + prometheus::Gauge *memory_used_gauge; + std::unordered_map cpu_usage; + prometheus::Gauge *task_processing_time_gauge; + bool is_running; + std::thread thread; + + bool is_task_running; + struct timespec task_start; + + void getMemoryUsed(); + void getCPUUsage(); + void mainLoop(); + +public: + MetricsCollector(const char *gateway_address, const char *gateway_port, const char *worker_name); + ~MetricsCollector(); + + void startTask(); + void stopTask(); +}; \ No newline at end of file From 60d8ead4f7a88d1d3359f17cdcd661186df8f7a2 Mon Sep 17 00:00:00 2001 From: Gorsky Kirill Date: Tue, 15 Apr 2025 22:55:23 +0300 Subject: [PATCH 2/5] use std::ifstream instead of FILE* --- src/metrics-collector.cpp | 48 ++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 26 deletions(-) diff --git a/src/metrics-collector.cpp b/src/metrics-collector.cpp index 54a27c6..86c90de 100644 --- a/src/metrics-collector.cpp +++ b/src/metrics-collector.cpp @@ -3,7 +3,7 @@ #include #include #include -#include +#include MetricsCollector::MetricsCollector(const char *gateway_address, const char *gateway_port, const char *worker_name) : gateway(gateway_address, gateway_port, worker_name), @@ -24,26 +24,25 @@ MetricsCollector::MetricsCollector(const char *gateway_address, const char *gate .Help("Task processing time (in seconds)") .Register(*registry); - FILE *file = fopen("/proc/stat", "r"); - char cpu_name[32]; + std::ifstream file("/proc/stat"); + int ign; + std::string cpu_name; while (true) { CPUInfo cpu; - fscanf(file, "%s %lu %lu %lu %lu %*u %*u %*u %*u %*u %*u", - cpu_name, - &cpu.last_total_user, &cpu.last_total_user_low, - &cpu.last_total_sys, &cpu.last_total_idle - ); - - if (strstr(cpu_name, "cpu") != cpu_name) + file >> cpu_name >> cpu.last_total_user >> cpu.last_total_user_low + >> cpu.last_total_sys >> cpu.last_total_idle + >> ign >> ign >> ign >> ign >> ign >> ign; + + if (cpu_name.find("cpu") != 0) break; cpu.gauge = &cpu_usage_family.Add({{ "cpu", std::string(cpu_name) }}); cpu_usage.insert({ std::string(cpu_name), cpu }); } - fclose(file); + file.close(); memory_used_gauge = &memory_used_family.Add({}); task_processing_time_gauge = &task_processing_time_family.Add({}); @@ -84,37 +83,34 @@ MetricsCollector::~MetricsCollector() void MetricsCollector::getMemoryUsed() { - FILE *file = fopen("/proc/self/statm", "r"); - if (file == NULL) { + std::ifstream file("/proc/self/statm"); + if (!file.is_open()) { memory_used_gauge->Set(0); return; } long mem_pages = 0; - fscanf(file, "%ld", &mem_pages); - fclose(file); + file >> mem_pages; + file.close(); memory_used_gauge->Set(mem_pages * (double)getpagesize()); } void MetricsCollector::getCPUUsage() { - FILE *file = fopen("/proc/stat", "r"); + std::ifstream file("/proc/stat"); uint64_t total_user, total_user_low, total_sys, total_idle, total; double percent; - char cpu_name[32]; + std::string cpu_name; + int ign; while (true) { - fscanf(file, "%s %lu %lu %lu %lu %*u %*u %*u %*u %*u %*u", - cpu_name, - &total_user, &total_user_low, - &total_sys, &total_idle - ); - + file >> cpu_name >> total_user >> total_user_low >> total_sys >> total_idle + >> ign >> ign >> ign >> ign >> ign >> ign; - if (strstr(cpu_name, "cpu") != cpu_name) - break; + if (cpu_name.find("cpu") != 0) + break; CPUInfo &cpu = cpu_usage[cpu_name]; if (total_user < cpu.last_total_user || total_user_low < cpu.last_total_user_low || @@ -140,7 +136,7 @@ void MetricsCollector::getCPUUsage() cpu.gauge->Set(percent); } - fclose(file); + file.close(); } void MetricsCollector::startTask() From 3695e63643e8984ea227558969616488c8edf639 Mon Sep 17 00:00:00 2001 From: Gorsky Kirill Date: Sun, 20 Apr 2025 23:09:37 +0300 Subject: [PATCH 3/5] merge main --- .devcontainer.json | 14 -- Dockerfile | 13 - Makefile | 13 - controller/.env | 3 + controller/cmd/grpc_server/main.go | 34 +++ controller/cmd/manager/main.go | 23 ++ controller/cmd/worker/main.go | 9 + controller/go.mod | 20 ++ controller/go.sum | 46 ++++ controller/internal/config/config.go | 48 ++++ .../internal/grpcserver/integration_test.go | 78 ++++++ controller/internal/grpcserver/server.go | 78 ++++++ controller/internal/grpcserver/unit_test.go | 77 ++++++ controller/internal/manager/manager.go | 233 ++++++++++++++++++ controller/internal/worker/worker.go | 122 +++++++++ controller/pkg/binary/binary.go | 25 ++ controller/pkg/converter/converter.go | 20 ++ .../pkg/proto/file_service/file_service.proto | 19 ++ scripts/build.sh | 4 - src/.env | 2 + src/Dockerfile.hash | 21 ++ src/Makefile | 3 + src/data/test.txt | 1 + src/docker-compose.yml | 19 ++ src/file.cpp | 28 +++ src/file.hpp | 8 + {scripts => src}/get-prometheus-cpp.sh | 0 src/hash.cpp | 111 --------- src/main.cpp | 28 +++ src/md_calculator.cpp | 47 ++++ src/md_calculator.hpp | 20 ++ wiki/using_server_grpc.md | 16 ++ wiki/using_tests.md | 5 + 33 files changed, 1033 insertions(+), 155 deletions(-) delete mode 100644 .devcontainer.json delete mode 100644 Dockerfile delete mode 100644 Makefile create mode 100644 controller/.env create mode 100644 controller/cmd/grpc_server/main.go create mode 100644 controller/cmd/manager/main.go create mode 100644 controller/cmd/worker/main.go create mode 100644 controller/go.mod create mode 100644 controller/go.sum create mode 100644 controller/internal/config/config.go create mode 100644 controller/internal/grpcserver/integration_test.go create mode 100644 controller/internal/grpcserver/server.go create mode 100644 controller/internal/grpcserver/unit_test.go create mode 100644 controller/internal/manager/manager.go create mode 100644 controller/internal/worker/worker.go create mode 100644 controller/pkg/binary/binary.go create mode 100644 controller/pkg/converter/converter.go create mode 100644 controller/pkg/proto/file_service/file_service.proto delete mode 100755 scripts/build.sh create mode 100644 src/.env create mode 100644 src/Dockerfile.hash create mode 100644 src/Makefile create mode 100644 src/data/test.txt create mode 100644 src/docker-compose.yml create mode 100644 src/file.cpp create mode 100644 src/file.hpp rename {scripts => src}/get-prometheus-cpp.sh (100%) delete mode 100644 src/hash.cpp create mode 100644 src/main.cpp create mode 100644 src/md_calculator.cpp create mode 100644 src/md_calculator.hpp create mode 100644 wiki/using_server_grpc.md create mode 100644 wiki/using_tests.md diff --git a/.devcontainer.json b/.devcontainer.json deleted file mode 100644 index 818d152..0000000 --- a/.devcontainer.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "image": "qemu-riscv-cluster/grpc_server", - "runArgs": [], - "customizations": { - "vscode": { - "extensions": [ - "ms-vscode.cpptools", - "EditorConfig.EditorConfig" - ] - } - }, - "workspaceMount": "source=${localWorkspaceFolder},target=/app,type=bind", - "workspaceFolder": "/app" -} diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 1110b75..0000000 --- a/Dockerfile +++ /dev/null @@ -1,13 +0,0 @@ -FROM alpine:3.16.3 - -RUN apk update && apk add g++ cmake make openssl-dev libcurl curl-dev - -WORKDIR /tmp -COPY scripts/get-prometheus-cpp.sh scripts/ -RUN sh scripts/get-prometheus-cpp.sh - -WORKDIR /app -COPY . . -RUN make - -ENTRYPOINT [ "build/grpc_server" ] diff --git a/Makefile b/Makefile deleted file mode 100644 index bdf782f..0000000 --- a/Makefile +++ /dev/null @@ -1,13 +0,0 @@ -.PHONY: all clean - -all: build/grpc_server - -build: - mkdir build - -build/grpc_server: build src/hash.cpp src/metrics-collector.cpp src/metrics-collector.hpp - g++ -Wall -Werror -o build/grpc_server src/hash.cpp src/metrics-collector.cpp -lssl -lcrypto \ - $(shell pkg-config --libs prometheus-cpp-core prometheus-cpp-push) - -clean: - rm -rf build diff --git a/controller/.env b/controller/.env new file mode 100644 index 0000000..6548374 --- /dev/null +++ b/controller/.env @@ -0,0 +1,3 @@ +SERVER_HOST=localhost +SERVER_PORT=50051 +MAX_MESSAGE_SIZE=4194304 \ No newline at end of file diff --git a/controller/cmd/grpc_server/main.go b/controller/cmd/grpc_server/main.go new file mode 100644 index 0000000..4f034d0 --- /dev/null +++ b/controller/cmd/grpc_server/main.go @@ -0,0 +1,34 @@ +package main + +import ( + "log" + "net" + "github.com/moevm/grpc_server/internal/config" + "github.com/moevm/grpc_server/internal/grpcserver" + pb "github.com/moevm/grpc_server/pkg/proto/file_service" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" +) + +func main() { + cfg := config.Load() + + lis, err := net.Listen("tcp", net.JoinHostPort(cfg.Host, cfg.Port)) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + + serverOpts := []grpc.ServerOption{ + grpc.MaxRecvMsgSize(cfg.MaxMessageSize), + grpc.MaxSendMsgSize(cfg.MaxMessageSize), + } + + service := grpc.NewServer(serverOpts...) + pb.RegisterFileServiceServer(service, grpcserver.NewServer(cfg.AllowedChars)) + reflection.Register(service) + + log.Printf("Server starting on %s:%s", cfg.Host, cfg.Port) + if err := service.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + } +} \ No newline at end of file diff --git a/controller/cmd/manager/main.go b/controller/cmd/manager/main.go new file mode 100644 index 0000000..9b1e731 --- /dev/null +++ b/controller/cmd/manager/main.go @@ -0,0 +1,23 @@ +package main + +import ( + "crypto/rand" + "github.com/moevm/grpc_server/internal/manager" + "log" +) + +func main() { + tasks := make([][]byte, 5) + tasks[0] = make([]byte, 888) + tasks[1] = make([]byte, 1848588) + tasks[2] = make([]byte, 50138788) + tasks[3] = make([]byte, 170338664) + tasks[4] = make([]byte, 558777333) + for i := range tasks { + _, err := rand.Read(tasks[i]) + if err != nil { + log.Fatalln(err) + } + } + manager.ClusterInit(tasks) +} diff --git a/controller/cmd/worker/main.go b/controller/cmd/worker/main.go new file mode 100644 index 0000000..cb6aa7d --- /dev/null +++ b/controller/cmd/worker/main.go @@ -0,0 +1,9 @@ +package main + +import ( + "github.com/moevm/grpc_server/internal/worker" +) + +func main() { + worker.Start() +} diff --git a/controller/go.mod b/controller/go.mod new file mode 100644 index 0000000..258b099 --- /dev/null +++ b/controller/go.mod @@ -0,0 +1,20 @@ +module github.com/moevm/grpc_server + +go 1.23.7 + +require ( + github.com/joho/godotenv v1.5.1 + github.com/stretchr/testify v1.8.4 + google.golang.org/grpc v1.71.1 + google.golang.org/protobuf v1.36.6 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/net v0.39.0 // indirect + golang.org/x/sys v0.32.0 // indirect + golang.org/x/text v0.24.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250407143221-ac9807e6c755 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/controller/go.sum b/controller/go.sum new file mode 100644 index 0000000..f0584f9 --- /dev/null +++ b/controller/go.sum @@ -0,0 +1,46 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= +go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= +go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= +go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= +go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= +go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU= +go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk= +go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= +go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= +go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= +golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY= +golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E= +golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= +golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250407143221-ac9807e6c755 h1:TwXJCGVREgQ/cl18iY0Z4wJCTL/GmW+Um2oSwZiZPnc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250407143221-ac9807e6c755/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= +google.golang.org/grpc v1.71.1 h1:ffsFWr7ygTUscGPI0KKK6TLrGz0476KUvvsbqWK0rPI= +google.golang.org/grpc v1.71.1/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/controller/internal/config/config.go b/controller/internal/config/config.go new file mode 100644 index 0000000..68bcab0 --- /dev/null +++ b/controller/internal/config/config.go @@ -0,0 +1,48 @@ +package config + +import ( + "log" + "os" + "strconv" + "github.com/joho/godotenv" +) + +type ServerConfig struct { + Host string + Port string + MaxMessageSize int + AllowedChars string +} + +func Load() *ServerConfig { + if err := godotenv.Load(); err != nil { + log.Printf("No .env file found, using default values") + } + + return &ServerConfig{ + Host: getEnv("SERVER_HOST", "localhost"), + Port: getEnv("SERVER_PORT", "50051"), + MaxMessageSize: getEnvAsInt("MAX_MESSAGE_SIZE", 4*1024*1024), // 4MB + AllowedChars: getEnv("ALLOWED_CHARS", "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 !?.,\n"), + } +} + +func getEnv(key, defaultValue string) string { + if value, exists := os.LookupEnv(key); exists { + return value + } + return defaultValue +} + +func getEnvAsInt(key string, defaultValue int) int { + strValue := getEnv(key, "") + if strValue == "" { + return defaultValue + } + intValue, err := strconv.Atoi(strValue) + if err != nil { + log.Printf("Invalid integer value for %s: %s. Using default: %d", key, strValue, defaultValue) + return defaultValue + } + return intValue +} \ No newline at end of file diff --git a/controller/internal/grpcserver/integration_test.go b/controller/internal/grpcserver/integration_test.go new file mode 100644 index 0000000..98c601c --- /dev/null +++ b/controller/internal/grpcserver/integration_test.go @@ -0,0 +1,78 @@ +package grpcserver_test + +import ( + "context" + "net" + "testing" + "time" + "github.com/moevm/grpc_server/internal/config" + "github.com/moevm/grpc_server/internal/grpcserver" + pb "github.com/moevm/grpc_server/pkg/proto/file_service" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func startTestServer(t *testing.T) (string, func()) { + cfg := config.Load() + cfg.Host = "localhost" + cfg.Port = "0" // random port + + lis, err := net.Listen("tcp", net.JoinHostPort(cfg.Host, cfg.Port)) + require.NoError(t, err) + + server := grpc.NewServer( + grpc.MaxRecvMsgSize(cfg.MaxMessageSize), + grpc.MaxSendMsgSize(cfg.MaxMessageSize), + ) + pb.RegisterFileServiceServer(server, grpcserver.NewServer(cfg.AllowedChars)) + + go func() { + _ = server.Serve(lis) + }() + + return lis.Addr().String(), func() { + server.Stop() + lis.Close() + } +} + +func TestIntegration_ServerClientCommunication(t *testing.T) { + addr, stop := startTestServer(t) + defer stop() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + conn, err := grpc.NewClient( + addr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { + d := net.Dialer{Timeout: 2 * time.Second} + return d.DialContext(ctx, "tcp", addr) + }), + ) + + require.NoError(t, err) + defer conn.Close() + + client := pb.NewFileServiceClient(conn) + + t.Run("valid text file", func(t *testing.T) { + resp, err := client.UploadFile(ctx, &pb.FileRequest{ + Content: []byte("Valid content"), + FileType: "text", + }) + require.NoError(t, err) + require.True(t, resp.IsValid) + }) + + t.Run("invalid binary file", func(t *testing.T) { + resp, err := client.UploadFile(ctx, &pb.FileRequest{ + Content: make([]byte, 1024), + FileType: "binary", + }) + require.NoError(t, err) + require.True(t, resp.IsValid) + }) +} \ No newline at end of file diff --git a/controller/internal/grpcserver/server.go b/controller/internal/grpcserver/server.go new file mode 100644 index 0000000..e5b2ba8 --- /dev/null +++ b/controller/internal/grpcserver/server.go @@ -0,0 +1,78 @@ +package grpcserver + +import ( + "context" + "unicode" + "unicode/utf8" + pb "github.com/moevm/grpc_server/pkg/proto/file_service" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type Server struct { + pb.UnimplementedFileServiceServer + allowedChars string + charMap map[rune]bool +} + +func NewServer(allowedChars string) *Server { + s := &Server{ + allowedChars: allowedChars, + } + s.charMap = s.initCharMap() + return s +} + +func (s *Server) initCharMap() map[rune]bool { + chars := make(map[rune]bool) + for _, c := range s.allowedChars { + chars[c] = true + } + return chars +} + +func (s *Server) UploadFile(ctx context.Context, req *pb.FileRequest) (*pb.FileResponse, error) { + content := req.GetContent() + fileType := req.GetFileType() + + var isValid bool + size := int64(len(content)) + + switch fileType { + case "text": + isValid = s.validateText(content) + case "binary": + isValid = validateBinary(content) + default: + return nil, status.Errorf(codes.InvalidArgument, "invalid file type: %s", fileType) + } + + msg := "Validation successful" + if !isValid { + msg = "Invalid file content" + } + + return &pb.FileResponse{ + Size: size, + IsValid: isValid, + Message: msg, + }, nil +} + +func (s *Server) validateText(content []byte) bool { + if !utf8.Valid(content) { + return false + } + + str := string(content) + for _, r := range str { + if !s.charMap[r] && !unicode.IsSpace(r) { + return false + } + } + return true +} + +func validateBinary(content []byte) bool { + return true +} \ No newline at end of file diff --git a/controller/internal/grpcserver/unit_test.go b/controller/internal/grpcserver/unit_test.go new file mode 100644 index 0000000..f6abafe --- /dev/null +++ b/controller/internal/grpcserver/unit_test.go @@ -0,0 +1,77 @@ +package grpcserver_test + +import ( + "context" + "testing" + "github.com/moevm/grpc_server/internal/grpcserver" + pb "github.com/moevm/grpc_server/pkg/proto/file_service" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestUploadFile_TextValidation(t *testing.T) { + testCases := []struct { + name string + content string + allowedChars string + isValid bool + }{ + { + name: "valid content", + content: "Hello 123!", + allowedChars: "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789! ", + isValid: true, + }, + { + name: "invalid character", + content: "Hello@123", + allowedChars: "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789! ", + isValid: false, + }, + { + name: "invalid utf8", + content: string([]byte{0xff, 0xfe, 0xfd}), + allowedChars: "abc", + isValid: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + server := grpcserver.NewServer(tc.allowedChars) + resp, err := server.UploadFile(context.Background(), &pb.FileRequest{ + Content: []byte(tc.content), + FileType: "text", + }) + + require.NoError(t, err) + assert.Equal(t, tc.isValid, resp.IsValid) + }) + } +} + +func TestUploadFile_BinaryValidation(t *testing.T) { + server := grpcserver.NewServer("") + resp, err := server.UploadFile(context.Background(), &pb.FileRequest{ + Content: []byte{0x00, 0x01, 0x02}, + FileType: "binary", + }) + + require.NoError(t, err) + assert.True(t, resp.IsValid) +} + +func TestUploadFile_InvalidFileType(t *testing.T) { + server := grpcserver.NewServer("") + _, err := server.UploadFile(context.Background(), &pb.FileRequest{ + Content: []byte("test"), + FileType: "invalid", + }) + + require.Error(t, err) + st, ok := status.FromError(err) + require.True(t, ok) + assert.Equal(t, codes.InvalidArgument, st.Code()) +} \ No newline at end of file diff --git a/controller/internal/manager/manager.go b/controller/internal/manager/manager.go new file mode 100644 index 0000000..b8c82aa --- /dev/null +++ b/controller/internal/manager/manager.go @@ -0,0 +1,233 @@ +package manager + +import ( + "fmt" + "github.com/moevm/grpc_server/pkg/converter" + "log" + "net" + "os" + "os/signal" + "strings" + "syscall" + "time" +) + +const ( + TaskCreated = 3 + TaskRedirection = 2 + TaskWip = 1 + TaskSolved = 0 + WorkerBusy = 1 + WorkerFree = 0 + WorkerInitSocketPath = "/run/controller/init.sock" + WorkerSocketPath = "/run/controller/" +) + +type Task struct { + id int + state int + body []byte + solve []byte +} + +type Worker struct { + id int + state int + taskChan chan *Task + conn net.Conn +} + +var Workers = make([]*Worker, 0) + +var Tasks = make([]*Task, 0) + +var TaskIdCounter = 0 + +var Listener net.Listener + +func init() { + if err := os.RemoveAll(WorkerInitSocketPath); err != nil { + log.Fatalln(err) + } + c := make(chan os.Signal, 10) + var err error + Listener, err = net.Listen("unix", WorkerInitSocketPath) + if err != nil { + log.Fatalln(err) + } + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + Listener.Close() + os.RemoveAll(WorkerInitSocketPath) + os.Exit(1) + }() +} + +func GenTaskId() int { + defer func() { + TaskIdCounter += 1 + }() + return TaskIdCounter +} + +func NewWorker(workerId int) { + Workers = append(Workers, &Worker{ + id: workerId, + state: WorkerBusy, + taskChan: make(chan *Task, 10), + }) +} + +func NewTask(taskBody []byte) { + Tasks = append(Tasks, &Task{ + id: GenTaskId(), + state: TaskCreated, + body: taskBody, + solve: make([]byte, 0), + }) +} + +func WorkerInit() { + fmt.Println("Ready for initialization Workers...") + for { + conn, err := Listener.Accept() + if err != nil { + log.Fatalln() + } + workerRawId := make([]byte, 8) + _, err = conn.Read(workerRawId) + if err != nil { + log.Fatalln(err) + } + NewWorker(converter.ByteSliceToInt(workerRawId)) + _, err = conn.Write(converter.IntToByteSlice(1)) + if err != nil { + log.Fatalln(err) + } + go Workers[len(Workers)-1].Run() + fmt.Printf("Worker %v Started\n", Workers[len(Workers)-1].id) + conn.Close() + } +} + +func (t Task) String() string { + return fmt.Sprintf("Task id:%v, solution:%v", t.id, t.solve) +} + +func (w *Worker) AddTask(task *Task) { + if w.taskChan == nil || w.state != WorkerFree { + log.Fatalln("worker is broken") + } + if task.body == nil { + log.Fatalln("task is empty") + } + w.state = WorkerBusy + task.state = TaskWip + defer func() { + w.state = WorkerFree + }() + var err error + offset := 0 + taskLen := len(task.body) + _, err = w.conn.Write(converter.IntToByteSlice(taskLen)) + if err != nil { + log.Fatalln(err) + } + fullFrame := 1024 + for flag := 1; flag != 0; { + switch { + case taskLen-offset >= fullFrame: + _, err = w.conn.Write(converter.IntToByteSlice(fullFrame)) + if err != nil { + log.Fatalln(err) + } + _, err = w.conn.Write(task.body[offset : offset+fullFrame]) + if err != nil { + log.Fatalln(err) + } + offset += fullFrame + case taskLen-offset < fullFrame: + frameLen := taskLen - offset + _, err = w.conn.Write(converter.IntToByteSlice(frameLen)) + if err != nil { + log.Fatalln(err) + } + _, err = w.conn.Write(task.body[offset : offset+frameLen]) + if err != nil { + log.Fatalln(err) + } + offset += frameLen + flag = 0 + } + } + _, err = w.conn.Write(converter.IntToByteSlice(0)) + buf := make([]byte, 8) + _, err = w.conn.Read(buf) + if err != nil { + log.Fatalln(err) + } + responseLen := converter.ByteSliceToInt(buf) + response := make([]byte, responseLen) + _, err = w.conn.Read(response) + if err != nil { + log.Fatalln(err) + } + task.solve = append(task.solve, response...) + task.state = TaskSolved + fmt.Printf("Task solved: ") + fmt.Println(task) +} + +func (w *Worker) Run() { + var socketPath strings.Builder + socketPath.WriteString(WorkerSocketPath) + socketPath.WriteString(fmt.Sprintf("%v.sock", w.id)) + w.state = WorkerFree + for { + task := <-w.taskChan + conn, err := net.Dial("unix", socketPath.String()) + if err != nil { + log.Fatalln(err) + } + w.conn = conn + fmt.Printf("Task %v accepted by worker %v\n", task.id, w.id) + w.AddTask(task) + conn.Close() + } +} + +func LoadBalancer(task *Task) { + for task.state == TaskCreated { + for i := range Workers { + if Workers[i].state == WorkerFree { + Workers[i].taskChan <- task + task.state = TaskRedirection + fmt.Printf("Task %v send to worker %v\n", task.id, Workers[i].id) + break + } + } + } +} + +func TaskManager() { + for { + for i := range Tasks { + if Tasks[i].state == TaskCreated { + LoadBalancer(Tasks[i]) + } + } + } +} + +func ClusterInit(rawTaskArray [][]byte) { + go WorkerInit() + go TaskManager() + for i := range rawTaskArray { + NewTask(rawTaskArray[i]) + time.Sleep(1 * time.Second) + } + for { + time.Sleep(1 * time.Second) + } +} diff --git a/controller/internal/worker/worker.go b/controller/internal/worker/worker.go new file mode 100644 index 0000000..7f76cd9 --- /dev/null +++ b/controller/internal/worker/worker.go @@ -0,0 +1,122 @@ +package worker + +import ( + "crypto/md5" + "crypto/rand" + "fmt" + "github.com/moevm/grpc_server/pkg/converter" + "log" + "net" + "os" + "os/signal" + "strings" + "syscall" +) + +const ( + InitSocketPath = "/run/controller/init.sock" + SocketDir = "/run/controller/" +) + +var SocketPath strings.Builder + +var Listener net.Listener + +func DoTask(c net.Conn) { + fmt.Println("worker started performing the task...") + buf := make([]byte, 8) + _, err := c.Read(buf) + if err != nil { + log.Fatalln(err) + } + taskSize := converter.ByteSliceToInt(buf) + task := make([]byte, 0, taskSize) + for flag := 1; flag != 0; { + buf := make([]byte, 8) + _, err := c.Read(buf) + if err != nil { + log.Fatalln(err) + } + frameLen := converter.ByteSliceToInt(buf) + switch { + case frameLen > 0: + buf := make([]byte, frameLen) + _, err := c.Read(buf) + if err != nil { + log.Fatalln(err) + } + task = append(task, buf...) + case frameLen == 0: + flag = 0 + } + } + md5sum := md5.Sum(task) + _, err = c.Write(converter.IntToByteSlice(len(md5sum[:]))) + if err != nil { + log.Fatalln(err) + } + _, err = c.Write(md5sum[:]) + if err != nil { + log.Fatalln(err) + } + fmt.Println("task complete") +} + +func Start() { + idBuff := make([]byte, 8) + respBuf := make([]byte, 8) + var err error + var conn net.Conn + _, err = rand.Read(idBuff) + if err != nil { + log.Fatalln(err) + } + id := converter.ByteSliceToInt(idBuff) + SocketPath.WriteString(SocketDir) + SocketPath.WriteString(fmt.Sprintf("%v.sock", id)) + if err = os.RemoveAll(SocketPath.String()); err != nil { + log.Fatalln(err) + } + Listener, err = net.Listen("unix", SocketPath.String()) + if err != nil { + log.Fatalln(err) + } + conn, err = net.Dial("unix", InitSocketPath) + if err != nil { + log.Fatalln(err) + } + _, err = conn.Write(idBuff) + if err != nil { + log.Fatalln(err) + } + _, err = conn.Read(respBuf) + if err != nil { + log.Fatalln(err) + } + conn.Close() + if converter.ByteSliceToInt(respBuf) != 1 { + log.Fatalln("worker init error") + } else { + fmt.Println("worker init soccessfully") + } + fmt.Println("worker start listen...") + for { + conn, err := Listener.Accept() + if err != nil { + log.Fatalln(err) + } + DoTask(conn) + conn.Close() + } +} + +func init() { + c := make(chan os.Signal, 10) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + Listener.Close() + os.RemoveAll(SocketPath.String()) + os.Exit(1) + }() +} diff --git a/controller/pkg/binary/binary.go b/controller/pkg/binary/binary.go new file mode 100644 index 0000000..92e387e --- /dev/null +++ b/controller/pkg/binary/binary.go @@ -0,0 +1,25 @@ +package binary + +import ( + "io" + "log" + "os" +) + +func Read(filepath string) []byte { + f, err := os.Open(filepath) + if err != nil { + log.Fatalln(err) + } + defer f.Close() + dump := make([]byte, 0) + for { + buff := make([]byte, 1024) + _, err := f.Read(buff) + dump = append(dump, buff...) + if err == io.EOF { + break + } + } + return dump +} diff --git a/controller/pkg/converter/converter.go b/controller/pkg/converter/converter.go new file mode 100644 index 0000000..d7a1ec0 --- /dev/null +++ b/controller/pkg/converter/converter.go @@ -0,0 +1,20 @@ +package converter + +func ByteSliceToInt(slice []byte) int { + var num int + for i := 0; i < 8; i += 1 { + num += int(slice[7-i]) + if i != 7 { + num = num << 8 + } + } + return num +} + +func IntToByteSlice(num int) []byte { + slice := []byte{} + for i := 0; i < 8; i += 1 { + slice = append(slice, byte(num>>(i*8))) + } + return slice +} diff --git a/controller/pkg/proto/file_service/file_service.proto b/controller/pkg/proto/file_service/file_service.proto new file mode 100644 index 0000000..e84b8ea --- /dev/null +++ b/controller/pkg/proto/file_service/file_service.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +option go_package = "github.com/moevm/grpc_server/pkg/proto/file_service"; + +service FileService { + rpc UploadFile(FileRequest) returns (FileResponse) {} +} + +message FileRequest { + string filename = 1; + bytes content = 2; + string file_type = 3; +} + +message FileResponse { + int64 size = 1; + bool is_valid = 2; + string message = 3; +} \ No newline at end of file diff --git a/scripts/build.sh b/scripts/build.sh deleted file mode 100755 index d4c6f43..0000000 --- a/scripts/build.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/bash -set -e - -docker build -t qemu-riscv-cluster/grpc_server . diff --git a/src/.env b/src/.env new file mode 100644 index 0000000..182ce3e --- /dev/null +++ b/src/.env @@ -0,0 +1,2 @@ +LOG_PORT=3100 +LOKI_IP=172.29.0.2 \ No newline at end of file diff --git a/src/Dockerfile.hash b/src/Dockerfile.hash new file mode 100644 index 0000000..fa44e89 --- /dev/null +++ b/src/Dockerfile.hash @@ -0,0 +1,21 @@ +FROM alpine:3.16.3 AS builder + +RUN apk update && apk add --no-cache g++ make cmake openssl-dev curl-dev + +WORKDIR /app + +COPY get-prometheus-cpp.sh . +RUN sh get-prometheus-cpp.sh + +COPY *.cpp *.hpp Makefile ./ +RUN make + +FROM alpine:3.16.3 + +RUN apk update && apk add --no-cache libstdc++ libgcc libssl3 libcurl + +COPY --from=builder /app/hash /usr/local/bin/hash + +WORKDIR /data + +ENTRYPOINT ["hash"] diff --git a/src/Makefile b/src/Makefile new file mode 100644 index 0000000..aa31903 --- /dev/null +++ b/src/Makefile @@ -0,0 +1,3 @@ +all: md_calculator.cpp md_calculator.hpp main.cpp file.cpp file.hpp metrics-collector.hpp metrics-collector.cpp + g++ -o hash main.cpp md_calculator.cpp metrics-collector.cpp file.cpp -lssl -lcrypto \ + $(shell pkg-config --libs prometheus-cpp-core prometheus-cpp-push) diff --git a/src/data/test.txt b/src/data/test.txt new file mode 100644 index 0000000..53e9728 --- /dev/null +++ b/src/data/test.txt @@ -0,0 +1 @@ +fsdfsdfsdfвыа \ No newline at end of file diff --git a/src/docker-compose.yml b/src/docker-compose.yml new file mode 100644 index 0000000..2c40895 --- /dev/null +++ b/src/docker-compose.yml @@ -0,0 +1,19 @@ +version: '3.8' + +services: + hash: + build: + context: . + dockerfile: Dockerfile.hash + container_name: worker + volumes: + - ./data:/data + working_dir: /data + logging: + driver: loki + options: + loki-timeout: 10s + no-file: "true" + loki-external-labels: "container_name={{.Name}}" + labels: "container_name, host" + loki-url: "http://${LOKI_IP}:${LOG_PORT}/loki/api/v1/push" \ No newline at end of file diff --git a/src/file.cpp b/src/file.cpp new file mode 100644 index 0000000..9a8f96f --- /dev/null +++ b/src/file.cpp @@ -0,0 +1,28 @@ +#include +#include +#include + +#include "file.hpp" +#include "md_calculator.hpp" + +std::string File::calculate_hash(const std::string& file_path, const std::string& algorithm) { + std::ifstream stream(file_path, std::ios::binary); + if (!stream) { + throw std::runtime_error("Failed to open file: " + file_path); + } + + MDCalculator calculator(algorithm); + std::vector buffer(4096); + + while (stream) { + stream.read(buffer.data(), buffer.size()); + calculator.update( + reinterpret_cast(buffer.data()), + stream.gcount() + ); + } + + stream.close(); + + return calculator.finalize(); +} \ No newline at end of file diff --git a/src/file.hpp b/src/file.hpp new file mode 100644 index 0000000..1f92453 --- /dev/null +++ b/src/file.hpp @@ -0,0 +1,8 @@ +#pragma once + +#include + +class File { +public: + static std::string calculate_hash(const std::string& file_path, const std::string& algorithm); +}; \ No newline at end of file diff --git a/scripts/get-prometheus-cpp.sh b/src/get-prometheus-cpp.sh similarity index 100% rename from scripts/get-prometheus-cpp.sh rename to src/get-prometheus-cpp.sh diff --git a/src/hash.cpp b/src/hash.cpp deleted file mode 100644 index d5d8450..0000000 --- a/src/hash.cpp +++ /dev/null @@ -1,111 +0,0 @@ -#include -#include -#include -#include - -#include - -#include -#include "metrics-collector.hpp" - -int calculate_hash(char* path, char* algorithm) { - std::ifstream stream(path, std::ios::in | std::ios::binary); - - if (!stream.good()) { - throw std::runtime_error(std::strerror(errno)); - } - - EVP_MD_CTX *ctx; - const EVP_MD *md; - unsigned char md_value[EVP_MAX_MD_SIZE]; - char buffer[4096]; - unsigned int md_len; - - OpenSSL_add_all_digests(); - - md = EVP_get_digestbyname(algorithm); - if(!md) { - std::cerr << "Unknown message digest" << std::endl; - return 1; - } - - ctx = EVP_MD_CTX_create(); - EVP_DigestInit_ex(ctx, md, nullptr); - while (stream.good()) { - stream.read(buffer, 4096); - EVP_DigestUpdate(ctx, buffer, stream.gcount()); - } - EVP_DigestFinal_ex(ctx, md_value, &md_len); - EVP_MD_CTX_destroy(ctx); - stream.close(); - - std::cout << "Digest is: "; - for (int i = 0; i < (int)md_len; ++i) { - std::cout << std::hex << std::setw(2) << std::setfill('0') << static_cast(md_value[i]); - } - std::cout << std::endl; - - EVP_cleanup(); - return 0; -} - -int main_metrics(int argc, char* argv[]) { - if (argc != 5) { - std::cerr << "usage: " << argv[0] << ' ' << argv[1] - << " ADDRESS PORT WORKER_NAME" << std::endl; - return 1; - } - - MetricsCollector metrics_collector(argv[2], argv[3], argv[4]); - - srand(time(NULL)); - while (true) { - std::this_thread::sleep_for(std::chrono::seconds(rand() % 5 + 1)); - metrics_collector.startTask(); - - // some "work" that requires memory - void *data = malloc(rand() % (1024 * 1024 * 50)); // 0 - 50 megabytes - std::this_thread::sleep_for(std::chrono::seconds(rand() % 5 + 1)); - free(data); - - metrics_collector.stopTask(); - } - - return 0; -} - -int main_hash(int argc, char* argv[]) { - if (argc != 4) { - std::cerr << "usage: " << argv[0] << ' ' << argv[1] - << " PATH ALGORITHM" - << std::endl - << "Supported algorithms are: " - << "md2, md5, sha, " - << "sha1, sha224, sha256, sha384, sha512, " - << "mdc2 and ripemd160" - << std::endl; - return 1; - } - - try { - calculate_hash(argv[2], argv[3]); - } catch(const std::exception& e) { - std::cerr << "Error: " << e.what() << std::endl; - } - return 0; -} - -int main(int argc, char* argv[]) { - if (argc < 2) { - std::cerr << "usage: " << argv[0] << " hash/metrics" << std::endl; - return 1; - } - - if (strcmp(argv[1], "hash") == 0) - return main_hash(argc, argv); - if (strcmp(argv[1], "metrics") == 0) - return main_metrics(argc, argv); - - std::cerr << "specify hash/metrics" << std::endl; - return 1; -} diff --git a/src/main.cpp b/src/main.cpp new file mode 100644 index 0000000..430d282 --- /dev/null +++ b/src/main.cpp @@ -0,0 +1,28 @@ +#include +#include "file.hpp" + +void print_usage(const std::string& program_name) { + std::cerr << "Usage: " << program_name + << " PATH ALGORITHM" << std::endl + << "Supported algorithms are: " + << "md2, md5, sha, " + << "sha1, sha224, sha256, sha384, sha512, " + << "mdc2 and ripemd160" << std::endl; +} + +int main(int argc, char* argv[]) { + if (argc < 3) { + print_usage(argv[0]); + return 1; + } + + try { + std::string hash = File::calculate_hash(argv[1], argv[2]); + std::cout << "Digest is: " << hash << std::endl; + } catch (const std::exception& e) { + std::cerr << "Error: " << e.what() << std::endl; + return 1; + } + + return 0; +} \ No newline at end of file diff --git a/src/md_calculator.cpp b/src/md_calculator.cpp new file mode 100644 index 0000000..18b8447 --- /dev/null +++ b/src/md_calculator.cpp @@ -0,0 +1,47 @@ +#include +#include +#include + +#include "md_calculator.hpp" + +MDCalculator::MDCalculator(const std::string& algorithm) + : md_(nullptr), ctx_(nullptr, EVP_MD_CTX_free) { + + OpenSSL_add_all_digests(); + md_ = EVP_get_digestbyname(algorithm.c_str()); + if (!md_) { + throw std::runtime_error("Unknown message digest: " + algorithm); + } + + ctx_.reset(EVP_MD_CTX_new()); + if (!ctx_) { + throw std::runtime_error("Failed to create EVP_MD_CTX"); + } + + if (EVP_DigestInit_ex(ctx_.get(), md_, nullptr) != 1) { + throw std::runtime_error("Failed to initialize digest"); + } +} + +void MDCalculator::update(const unsigned char* data, size_t length) { + if (EVP_DigestUpdate(ctx_.get(), data, length) != 1) { + throw std::runtime_error("Failed to update digest"); + } +} + +std::string MDCalculator::finalize() { + unsigned char md_value[EVP_MAX_MD_SIZE]; + unsigned int md_len; + + if (EVP_DigestFinal_ex(ctx_.get(), md_value, &md_len) != 1) { + throw std::runtime_error("Failed to finalize digest"); + } + + std::ostringstream oss; + for (int i = 0; i < md_len; ++i) { + oss << std::hex << std::setw(2) << std::setfill('0') + << static_cast(md_value[i]); + } + + return oss.str(); +} \ No newline at end of file diff --git a/src/md_calculator.hpp b/src/md_calculator.hpp new file mode 100644 index 0000000..3daa6e2 --- /dev/null +++ b/src/md_calculator.hpp @@ -0,0 +1,20 @@ +#pragma once + +#include +#include +#include + +class MDCalculator { +public: + MDCalculator(const std::string& algorithm); + + void update(const unsigned char* data, size_t length); + std::string finalize(); + + MDCalculator(const MDCalculator&) = delete; + MDCalculator& operator=(const MDCalculator&) = delete; + +private: + const EVP_MD* md_; + std::unique_ptr ctx_; +}; \ No newline at end of file diff --git a/wiki/using_server_grpc.md b/wiki/using_server_grpc.md new file mode 100644 index 0000000..53bd227 --- /dev/null +++ b/wiki/using_server_grpc.md @@ -0,0 +1,16 @@ +## Instructions for starting the grpc server + +- installing dependencies from the folder controller: ```go get -u google.golang.org/protobuf``` and ```go get -u google.golang.org/grpc```. It is also worth installing protoc-gen-go and protoc-gen-go-grpc ```go install google.golang.org/protobuf/cmd/protoc-gen-go@latest``` and ```go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest``` +- Make sure that $GOPATH/bin is added to the PATH ```export PATH="$PATH:$(go env GOPATH)/bin"``` +- Check the protocol version ```protoc --version``` and check for plugins ```which protoc-gen-go``` ```which protoc-gen-go-grpc``` +- To use env: ```go get github.com/joho/godotenv```. File .env should be in the same folder as go.mod. +- Generate the code with explicit paths: +```bash +protoc \ + --plugin=protoc-gen-go=$(which protoc-gen-go) \ + --plugin=protoc-gen-go-grpc=$(which protoc-gen-go-grpc) \ + --go_out=. --go_opt=paths=source_relative \ + --go-grpc_out=. --go-grpc_opt=paths=source_relative \ + pkg/proto/file_service/file_service.proto +``` +- run server ```go run cmd/grpc_server/main.go``` \ No newline at end of file diff --git a/wiki/using_tests.md b/wiki/using_tests.md new file mode 100644 index 0000000..32bef96 --- /dev/null +++ b/wiki/using_tests.md @@ -0,0 +1,5 @@ +## Instructions for using tests + +- download all dependencies of ```go mod download``` or add a separate testify ```go get github.com/stretchr/testify```. If you have any problems, you can use ```go mod tidy``` to synchronize dependencies. + +- you need to run the tests from the root of the project ```go test -v ./internal/grpcserver```. Also can run only unit tests ```go test -v -run ^TestUploadFile_ ./internal/grpcserver``` or specific test ```go test -v -run TestUploadFile_TextValidation ./internal/grpcserver``` \ No newline at end of file From 0329b735ed098976eb1a132d5918527ba079ae2a Mon Sep 17 00:00:00 2001 From: Gorsky Kirill Date: Sun, 20 Apr 2025 23:20:26 +0300 Subject: [PATCH 4/5] metrics-collector fixes --- src/main.cpp | 74 ++++++++++++++++++++++++++++--- src/metrics-collector.cpp | 92 +++++++++++++++++++-------------------- src/metrics-collector.hpp | 33 ++++++++------ 3 files changed, 132 insertions(+), 67 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 430d282..c4c0045 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,8 +1,13 @@ -#include +#include "metrics-collector.hpp" #include "file.hpp" +#include +#include +#include +#include + void print_usage(const std::string& program_name) { - std::cerr << "Usage: " << program_name + std::cerr << "Usage: " << program_name << " hash" << " PATH ALGORITHM" << std::endl << "Supported algorithms are: " << "md2, md5, sha, " @@ -10,14 +15,14 @@ void print_usage(const std::string& program_name) { << "mdc2 and ripemd160" << std::endl; } -int main(int argc, char* argv[]) { - if (argc < 3) { +int hash_main(int argc, char* argv[]) { + if (argc < 4) { print_usage(argv[0]); return 1; } try { - std::string hash = File::calculate_hash(argv[1], argv[2]); + std::string hash = File::calculate_hash(argv[2], argv[3]); std::cout << "Digest is: " << hash << std::endl; } catch (const std::exception& e) { std::cerr << "Error: " << e.what() << std::endl; @@ -25,4 +30,61 @@ int main(int argc, char* argv[]) { } return 0; -} \ No newline at end of file +} + + +bool is_running = true; + +void signal_handler(int signo) { + std::cerr << "[INFO] Received " << strsignal(signo) << " signal. Stopping" << std::endl; + is_running = false; +} + +int metrics_main(int argc, char *argv[]) { + signal(SIGTERM, signal_handler); + signal(SIGINT, signal_handler); + + if (argc != 5) { + std::cerr << "usage: worker" << argv[0] << " metrics" + << " ADDRESS PORT WORKER_NAME" << std::endl; + return 1; + } + + MetricsCollector metrics_collector(argv[2], argv[3], argv[4]); + + srand(time(NULL)); + while (is_running) { + std::this_thread::sleep_for(std::chrono::seconds(rand() % 5 + 1)); + std::cerr << "[INFO] Doing task" << std::endl; + metrics_collector.StartTask(); + + // some "work" that requires memory + void *data = malloc(rand() % (1024 * 1024 * 50)); // 0 - 50 megabytes + for (volatile int i = 0; i < 1000000000 + rand() % 10000000000; i++) {} + free(data); + + metrics_collector.StopTask(); + std::cerr << "[INFO] Task done" << std::endl; + } + + return 0; +} + +int main(int argc, char *argv[]) +{ + static const char usage[] = "Specify hash/metrics"; + + if (argc < 2) { + std::cerr << usage << std::endl; + return 1; + } + + if (strcmp(argv[1], "hash") == 0) { + return hash_main(argc, argv); + } else if (strcmp(argv[1], "metrics") == 0) { + return metrics_main(argc, argv); + } + + std::cerr << usage << std::endl; + return 1; +} diff --git a/src/metrics-collector.cpp b/src/metrics-collector.cpp index 86c90de..eed7fe9 100644 --- a/src/metrics-collector.cpp +++ b/src/metrics-collector.cpp @@ -4,6 +4,22 @@ #include #include #include +#include + +namespace { + double GetMemoryUsed() + { + std::ifstream file("/proc/self/statm"); + if (!file.is_open()) { + return 0; + } + + long mem_pages = 0; + file >> mem_pages; + file.close(); + return mem_pages * (double)getpagesize(); + } +} MetricsCollector::MetricsCollector(const char *gateway_address, const char *gateway_port, const char *worker_name) : gateway(gateway_address, gateway_port, worker_name), @@ -31,8 +47,8 @@ MetricsCollector::MetricsCollector(const char *gateway_address, const char *gate while (true) { CPUInfo cpu; - file >> cpu_name >> cpu.last_total_user >> cpu.last_total_user_low - >> cpu.last_total_sys >> cpu.last_total_idle + file >> cpu_name >> cpu.time.user >> cpu.time.user_low + >> cpu.time.sys >> cpu.time.idle >> ign >> ign >> ign >> ign >> ign >> ign; if (cpu_name.find("cpu") != 0) @@ -48,30 +64,30 @@ MetricsCollector::MetricsCollector(const char *gateway_address, const char *gate task_processing_time_gauge = &task_processing_time_family.Add({}); gateway.RegisterCollectable(registry); - - is_running = true; - thread = std::thread(&MetricsCollector::mainLoop, this); - + thread = std::thread(&MetricsCollector::MainLoop, this); is_task_running = false; } -void MetricsCollector::mainLoop() +void MetricsCollector::MainLoop() { while (is_running) { std::this_thread::sleep_for(std::chrono::seconds(1)); - getMemoryUsed(); - getCPUUsage(); + memory_used_gauge->Set(::GetMemoryUsed()); + GetCPUUsage(); if (is_task_running) { - struct timespec time; - clock_gettime(CLOCK_MONOTONIC, &time); - task_processing_time_gauge->Set((time.tv_sec - task_start.tv_sec) + (time.tv_nsec - task_start.tv_nsec) * 1e-9); + auto cur_time = std::chrono::high_resolution_clock::now(); + task_processing_time_gauge->Set(std::chrono::duration(cur_time - task_start).count()); } - else + else { task_processing_time_gauge->Set(0); + } - gateway.PushAdd(); + int status = gateway.PushAdd(); + if (status != 200) { + std::cerr << "[ERROR] Failed to push metrics. Status " << status << std::endl; + } } } @@ -81,73 +97,53 @@ MetricsCollector::~MetricsCollector() thread.join(); } -void MetricsCollector::getMemoryUsed() -{ - std::ifstream file("/proc/self/statm"); - if (!file.is_open()) { - memory_used_gauge->Set(0); - return; - } - - long mem_pages = 0; - file >> mem_pages; - file.close(); - - memory_used_gauge->Set(mem_pages * (double)getpagesize()); -} - -void MetricsCollector::getCPUUsage() +void MetricsCollector::GetCPUUsage() { std::ifstream file("/proc/stat"); - uint64_t total_user, total_user_low, total_sys, total_idle, total; + CPUInfo::Time cur_time; double percent; std::string cpu_name; int ign; while (true) { - file >> cpu_name >> total_user >> total_user_low >> total_sys >> total_idle + file >> cpu_name >> cur_time.user >> cur_time.user_low >> cur_time.sys >> cur_time.idle >> ign >> ign >> ign >> ign >> ign >> ign; if (cpu_name.find("cpu") != 0) break; CPUInfo &cpu = cpu_usage[cpu_name]; - if (total_user < cpu.last_total_user || total_user_low < cpu.last_total_user_low || - total_sys < cpu.last_total_sys || total_idle < cpu.last_total_idle) { + if (cur_time.user < cpu.time.user || cur_time.user_low < cpu.time.user_low || + cur_time.sys < cpu.time.sys || cur_time.idle < cpu.time.idle) { // overflow detection percent = -1.0; } else { - total = (total_user - cpu.last_total_user) + (total_user_low - cpu.last_total_user_low) + - (total_sys - cpu.last_total_sys); + uint64_t total = (cur_time.user - cpu.time.user) + (cur_time.user_low - cpu.time.user_low) + + (cur_time.sys - cpu.time.sys); percent = total; - total += (total_idle - cpu.last_total_idle); - percent /= total; - percent *= 100; + total += (cur_time.idle - cpu.time.idle); + percent = (total == 0) ? -1.0 : (percent / total) * 100.0; } - cpu.last_total_user = total_user; - cpu.last_total_user_low = total_user_low; - cpu.last_total_sys = total_sys; - cpu.last_total_idle = total_idle; - + cpu.time = cur_time; cpu.gauge->Set(percent); } file.close(); } -void MetricsCollector::startTask() +void MetricsCollector::StartTask() { is_task_running = true; - clock_gettime(CLOCK_MONOTONIC, &task_start); + task_start = std::chrono::high_resolution_clock::now(); } -void MetricsCollector::stopTask() +void MetricsCollector::StopTask() { is_task_running = false; task_processing_time_gauge->Set(0); gateway.PushAdd(); -} \ No newline at end of file +} diff --git a/src/metrics-collector.hpp b/src/metrics-collector.hpp index bc05458..d1470b9 100644 --- a/src/metrics-collector.hpp +++ b/src/metrics-collector.hpp @@ -1,6 +1,7 @@ #include #include #include +#include class MetricsCollector { prometheus::Gateway gateway; @@ -8,29 +9,35 @@ class MetricsCollector { struct CPUInfo { prometheus::Gauge *gauge; - uint64_t last_total_user; - uint64_t last_total_user_low; - uint64_t last_total_sys; - uint64_t last_total_idle; + + struct Time { + uint64_t user; + uint64_t user_low; + uint64_t sys; + uint64_t idle; + }; + + Time time; }; - prometheus::Gauge *memory_used_gauge; std::unordered_map cpu_usage; + + prometheus::Gauge *memory_used_gauge; prometheus::Gauge *task_processing_time_gauge; - bool is_running; + + std::atomic is_running {true}; std::thread thread; bool is_task_running; - struct timespec task_start; + std::chrono::time_point task_start; - void getMemoryUsed(); - void getCPUUsage(); - void mainLoop(); + void GetCPUUsage(); + void MainLoop(); public: MetricsCollector(const char *gateway_address, const char *gateway_port, const char *worker_name); ~MetricsCollector(); - void startTask(); - void stopTask(); -}; \ No newline at end of file + void StartTask(); + void StopTask(); +}; From c0b67bcb2f77edf213bf552ea4e5f12bbd3bf453 Mon Sep 17 00:00:00 2001 From: Gorsky Kirill Date: Mon, 5 May 2025 02:59:48 +0300 Subject: [PATCH 5/5] make is_running atomic --- worker/include/metrics_collector.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/include/metrics_collector.hpp b/worker/include/metrics_collector.hpp index f2f6919..82702f8 100644 --- a/worker/include/metrics_collector.hpp +++ b/worker/include/metrics_collector.hpp @@ -28,7 +28,7 @@ class MetricsCollector { std::atomic is_running{true}; std::thread thread; - bool is_task_running; + std::atomic is_task_running; std::chrono::time_point task_start; void GetCPUUsage();