From a27ac7aad435022e3e4587865cb3f63fc96aa066 Mon Sep 17 00:00:00 2001 From: X1pster Date: Tue, 29 Apr 2025 20:31:27 +0300 Subject: [PATCH 1/4] Correct BUILD files according to the latest changes --- controller/cmd/manager/BUILD | 1 - controller/internal/conn/BUILD | 11 +++++++++++ controller/internal/manager/BUILD | 1 + controller/internal/worker/BUILD | 1 + controller/pkg/binary/BUILD | 8 -------- 5 files changed, 13 insertions(+), 9 deletions(-) create mode 100644 controller/internal/conn/BUILD delete mode 100644 controller/pkg/binary/BUILD diff --git a/controller/cmd/manager/BUILD b/controller/cmd/manager/BUILD index 89a6495..f2ae5ac 100644 --- a/controller/cmd/manager/BUILD +++ b/controller/cmd/manager/BUILD @@ -5,6 +5,5 @@ go_binary( srcs = ["main.go"], deps = [ "//internal/manager", - "//pkg/binary", ], ) diff --git a/controller/internal/conn/BUILD b/controller/internal/conn/BUILD new file mode 100644 index 0000000..1528ca9 --- /dev/null +++ b/controller/internal/conn/BUILD @@ -0,0 +1,11 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "conn", + srcs = ["unix.go"], + deps = [ + "//pkg/converter", + ], + importpath = "github.com/moevm/grpc_server/internal/conn", + visibility = ["//visibility:public"], +) diff --git a/controller/internal/manager/BUILD b/controller/internal/manager/BUILD index 0a452d3..dbbdf9b 100644 --- a/controller/internal/manager/BUILD +++ b/controller/internal/manager/BUILD @@ -5,6 +5,7 @@ go_library( srcs = ["manager.go"], deps = [ "//pkg/converter", + "//internal/conn" ], importpath = "github.com/moevm/grpc_server/internal/manager", visibility = ["//visibility:public"], diff --git a/controller/internal/worker/BUILD b/controller/internal/worker/BUILD index ae6f43b..24dfb1f 100644 --- a/controller/internal/worker/BUILD +++ b/controller/internal/worker/BUILD @@ -5,6 +5,7 @@ go_library( srcs = ["worker.go"], deps = [ "//pkg/converter", + "//internal/conn" ], importpath = "github.com/moevm/grpc_server/internal/worker", visibility = ["//visibility:public"], diff --git a/controller/pkg/binary/BUILD b/controller/pkg/binary/BUILD deleted file mode 100644 index 7b5d3d6..0000000 --- a/controller/pkg/binary/BUILD +++ /dev/null @@ -1,8 +0,0 @@ -load("@rules_go//go:def.bzl", "go_library") - -go_library( - name = "binary", - srcs = ["binary.go"], - importpath = "github.com/moevm/grpc_server/pkg/binary", - visibility = ["//visibility:public"], -) From 257914659c86004262be272414a62792a713436b Mon Sep 17 00:00:00 2001 From: X1pster Date: Tue, 29 Apr 2025 23:54:03 +0300 Subject: [PATCH 2/4] Bind process to least loaded cpu --- worker/src/main.cpp | 119 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 118 insertions(+), 1 deletion(-) diff --git a/worker/src/main.cpp b/worker/src/main.cpp index de94b7d..9b2c5db 100644 --- a/worker/src/main.cpp +++ b/worker/src/main.cpp @@ -1,5 +1,106 @@ #include "../include/file.hpp" #include +#include +#include +#include +#include +#include +#include +#include +#include + +std::vector getCoreLoads() { + std::vector loads; + std::ifstream stat("/proc/stat"); + + if (!stat.is_open()) { + std::cerr << "Failed to open /proc/stat" << std::endl; + return loads; + } + + // Pass 1st string (with total cpu data) + std::string line; + std::getline(stat, line); + + // Read each cpu data + std::vector> prevCpuData; + while (std::getline(stat, line)) { + if (line.find("cpu") == 0) { + std::istringstream stream(line); + std::string cpuString; + stream >> cpuString; + if (cpuString.substr(0, 3) != "cpu") break; + + std::vector coreData; + unsigned long long value; + while (stream >> value) { + coreData.push_back(value); + } + prevCpuData.push_back(coreData); + } + } + stat.close(); + + // Wait for some time + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + stat.open("/proc/stat"); + std::getline(stat, line); // Pass 1st string + + std::vector> currCpuData; + for (size_t i = 0; i < prevCpuData.size(); ++i) { + std::getline(stat, line); + std::istringstream stream(line); + std::string cpuString; + stream >> cpuString; + + std::vector coreData; + unsigned long long value; + while (stream >> value) { + coreData.push_back(value); + } + currCpuData.push_back(coreData); + } + stat.close(); + + // Calculate load for all cpus + for (size_t i = 0; i < prevCpuData.size(); ++i) { + unsigned long long prevIdle = prevCpuData[i][3] + prevCpuData[i][4]; + unsigned long long currIdle = currCpuData[i][3] + currCpuData[i][4]; + + unsigned long long prevTotal = 0; + + for(int j = 0; j < prevCpuData[i].size(); j++) { + prevTotal += prevCpuData[i][j]; + } + + unsigned long long currTotal = 0; + + for(int j = 0; j < currCpuData[i].size(); j++) { + currTotal += currCpuData[i][j]; + } + + unsigned long long totalDiff = currTotal - prevTotal; + unsigned long long idleDiff = currIdle - prevIdle; + + float usage = 100.0f * (totalDiff - idleDiff) / totalDiff; + loads.push_back(usage); + } + + return loads; +} + +bool bindProcessToCore(pid_t pid, int core) { + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(core, &cpuset); + + if (sched_setaffinity(pid, sizeof(cpu_set_t), &cpuset) != 0) { + std::cerr << "Failed to bind process to cpu: " << strerror(errno) << std::endl; + return false; + } + return true; +} void print_usage(const std::string &program_name) { std::cerr << "Usage: " << program_name << " PATH ALGORITHM" << std::endl @@ -14,12 +115,28 @@ int main(int argc, char *argv[]) { return 1; } + std::vector loads = getCoreLoads(); + if (loads.empty()) { + std::cerr << "Failed to get cpu load." << std::endl; + return 2; + } + + int leastLoadedCPU = 0; + for (int i = 1; i < loads.size(); i++) { + if (loads[i] < loads[leastLoadedCPU]) leastLoadedCPU = i; + } + + // Bind process to cpu + if (!bindProcessToCore(0, leastLoadedCPU)) { + return 3; + } + try { std::string hash = File::calculate_hash(argv[1], argv[2]); std::cout << "Digest is: " << hash << std::endl; } catch (const std::exception &e) { std::cerr << "Error: " << e.what() << std::endl; - return 1; + return 4; } return 0; From 5094963977a3887c23fe9f3eec31d2f1870f5260 Mon Sep 17 00:00:00 2001 From: X1pster Date: Tue, 6 May 2025 22:54:03 +0300 Subject: [PATCH 3/4] Revert "Bind process to least loaded cpu" This reverts commit 257914659c86004262be272414a62792a713436b. --- worker/src/main.cpp | 119 +------------------------------------------- 1 file changed, 1 insertion(+), 118 deletions(-) diff --git a/worker/src/main.cpp b/worker/src/main.cpp index 9b2c5db..de94b7d 100644 --- a/worker/src/main.cpp +++ b/worker/src/main.cpp @@ -1,106 +1,5 @@ #include "../include/file.hpp" #include -#include -#include -#include -#include -#include -#include -#include -#include - -std::vector getCoreLoads() { - std::vector loads; - std::ifstream stat("/proc/stat"); - - if (!stat.is_open()) { - std::cerr << "Failed to open /proc/stat" << std::endl; - return loads; - } - - // Pass 1st string (with total cpu data) - std::string line; - std::getline(stat, line); - - // Read each cpu data - std::vector> prevCpuData; - while (std::getline(stat, line)) { - if (line.find("cpu") == 0) { - std::istringstream stream(line); - std::string cpuString; - stream >> cpuString; - if (cpuString.substr(0, 3) != "cpu") break; - - std::vector coreData; - unsigned long long value; - while (stream >> value) { - coreData.push_back(value); - } - prevCpuData.push_back(coreData); - } - } - stat.close(); - - // Wait for some time - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - - stat.open("/proc/stat"); - std::getline(stat, line); // Pass 1st string - - std::vector> currCpuData; - for (size_t i = 0; i < prevCpuData.size(); ++i) { - std::getline(stat, line); - std::istringstream stream(line); - std::string cpuString; - stream >> cpuString; - - std::vector coreData; - unsigned long long value; - while (stream >> value) { - coreData.push_back(value); - } - currCpuData.push_back(coreData); - } - stat.close(); - - // Calculate load for all cpus - for (size_t i = 0; i < prevCpuData.size(); ++i) { - unsigned long long prevIdle = prevCpuData[i][3] + prevCpuData[i][4]; - unsigned long long currIdle = currCpuData[i][3] + currCpuData[i][4]; - - unsigned long long prevTotal = 0; - - for(int j = 0; j < prevCpuData[i].size(); j++) { - prevTotal += prevCpuData[i][j]; - } - - unsigned long long currTotal = 0; - - for(int j = 0; j < currCpuData[i].size(); j++) { - currTotal += currCpuData[i][j]; - } - - unsigned long long totalDiff = currTotal - prevTotal; - unsigned long long idleDiff = currIdle - prevIdle; - - float usage = 100.0f * (totalDiff - idleDiff) / totalDiff; - loads.push_back(usage); - } - - return loads; -} - -bool bindProcessToCore(pid_t pid, int core) { - cpu_set_t cpuset; - CPU_ZERO(&cpuset); - CPU_SET(core, &cpuset); - - if (sched_setaffinity(pid, sizeof(cpu_set_t), &cpuset) != 0) { - std::cerr << "Failed to bind process to cpu: " << strerror(errno) << std::endl; - return false; - } - return true; -} void print_usage(const std::string &program_name) { std::cerr << "Usage: " << program_name << " PATH ALGORITHM" << std::endl @@ -115,28 +14,12 @@ int main(int argc, char *argv[]) { return 1; } - std::vector loads = getCoreLoads(); - if (loads.empty()) { - std::cerr << "Failed to get cpu load." << std::endl; - return 2; - } - - int leastLoadedCPU = 0; - for (int i = 1; i < loads.size(); i++) { - if (loads[i] < loads[leastLoadedCPU]) leastLoadedCPU = i; - } - - // Bind process to cpu - if (!bindProcessToCore(0, leastLoadedCPU)) { - return 3; - } - try { std::string hash = File::calculate_hash(argv[1], argv[2]); std::cout << "Digest is: " << hash << std::endl; } catch (const std::exception &e) { std::cerr << "Error: " << e.what() << std::endl; - return 4; + return 1; } return 0; From 33cbb2c86fbeaad75536e899b9ebe793534296d7 Mon Sep 17 00:00:00 2001 From: X1pster Date: Tue, 6 May 2025 22:59:04 +0300 Subject: [PATCH 4/4] Add script to bind workers to cpus --- docker-compose.yml | 3 +-- scripts/start.sh | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) create mode 100755 scripts/start.sh diff --git a/docker-compose.yml b/docker-compose.yml index 5ee41b4..01c076f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,11 +1,10 @@ version: '3.8' services: - hash: + worker: build: context: . dockerfile: Dockerfile.hash - container_name: worker volumes: - ./worker/data:/data working_dir: /data diff --git a/scripts/start.sh b/scripts/start.sh new file mode 100755 index 0000000..037ccbb --- /dev/null +++ b/scripts/start.sh @@ -0,0 +1,15 @@ +#!/bin/bash +set -e + +CPU_COUNT=$(nproc) + +docker-compose up -d --scale worker=$CPU_COUNT + +for (( i=0; i<$CPU_COUNT; i++ )) +do + container_id=$(docker-compose ps -a -q worker | sed -n "$((i+1))p") + if [ -n "$container_id" ]; then + docker update --cpuset-cpus="$i" "$container_id" + fi +done +