Skip to content
28 changes: 28 additions & 0 deletions tasks/ashihmin_d_mult_matr_crs/all/include/ops_all.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#pragma once

#include <vector>

#include "ashihmin_d_mult_matr_crs/common/include/common.hpp"
#include "task/include/task.hpp"

namespace ashihmin_d_mult_matr_crs {

class AshihminDMultMatrCrsALL : public BaseTask {
public:
static constexpr ppc::task::TypeOfTask GetStaticTypeOfTask() {
return ppc::task::TypeOfTask::kALL;
}

explicit AshihminDMultMatrCrsALL(const InType &in);

private:
bool ValidationImpl() override;
bool PreProcessingImpl() override;
bool RunImpl() override;
bool PostProcessingImpl() override;

static void MultiplyRow(int global_row_idx, int local_idx, const CRSMatrix &matrix_a, const CRSMatrix &matrix_b,
std::vector<std::vector<int>> &local_cols, std::vector<std::vector<double>> &local_vals);
};

} // namespace ashihmin_d_mult_matr_crs
101 changes: 101 additions & 0 deletions tasks/ashihmin_d_mult_matr_crs/all/report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Умножение разреженных матриц в формате CRS - ALL

* Student: Ашихмин Д., group 3823Б1ФИ2
* Technology: ALL
* Variant: 4

## 1. Introduction

ALL-версия представляет собой гибридную реализацию, объединяющую несколько технологий
параллелизма. Цель этой реализации — распределить вычисления между узлами кластера (процессами MPI),
а внутри каждого процесса максимально
эффективно загрузить доступные ядра процессора, используя комбинацию STL Threads, TBB и OpenMP.

## 2. Problem Statement

Задача: реализовать умножение двух разреженных матриц A и B в формате CRS
(Compressed Row Storage).
Вход: InType = std::pair<CRSMatrix, CRSMatrix>.
Выход: OutType = CRSMatrix, результат C = A * B.
Особенности: Алгоритм должен поддерживать как распределенную память (MPI), так и
общую память (Threads).

## 3. Baseline Algorithm (Sequential)

Baseline описан в seq/report.md. Он выполняет последовательный проход по всем строкам
матрицы A. Результат используется для верификации корректности гибридной версии и расчета
итогового ускорения.

## 4. Parallelization Scheme

ALL-реализация использует многоуровневую гибридную схему:

1. **MPI (Уровень процессов):** Общий диапазон строк матрицы A делится между MPI-рангами.
Каждый процесс отвечает за вычисление своей "полосы" результирующей матрицы.
2. **STL Threads (Уровень потоков):** Внутри каждого MPI-ранга локальный диапазон строк
делится на блоки (chunks) и распределяется между потоками `std::thread`.
3. **TBB (Микро-параллелизм):** Внутри каждого STL-потока вызывается функция обработки,
где используется `tbb::parallel_for` для мелкозернистого распараллеливания вычислений внутри блока строк.
4. **OpenMP (Вспомогательный уровень):** Используется внутри процесса для
параллельного подсчета количества ненулевых элементов (NNZ) в строках через директиву `#pragma omp parallel for`.
5. **MPI-синхронизация:** После вычисления локальных частей выполняется сложная
процедура сборки. Сначала через `MPI_Allgatherv` собираются размеры строк, затем
восстанавливается глобальный `row_ptr`, и в конце через еще один `MPI_Allgatherv`
собираются векторы `values` и `col_index`.

Конфигурация задается как workers = ranks × threads.

## 5. Implementation Details

* Файлы: all/include/ops_all.hpp, all/src/ops_all.cpp.
* Класс: AshihminDMultMatrCrsALL.
* **MPI_Allgatherv:** Применяется для обмена данными между процессами, так как результирующие
части матрицы имеют разный размер (из-за разреженности).
* **Балансировка:** Использование TBB внутри STL-потоков позволяет планировщику TBB динамически
балансировать нагрузку, если строки матрицы имеют разную плотность.
* **Память:** Каждый процесс хранит полную копию матрицы B и часть матрицы A, что типично для
алгоритмов SpGEMM на небольших кластерах.

## 6. Experimental Setup

Аппаратное обеспечение:

* CPU: AMD Ryzen 5 3500X (3.60 GHz, 6 ядер)
* RAM: 16 ГБ
* OS: Windows 11 / Linux (CI)
* MPI: MS-MPI / OpenMPI

Окружение:

* PPC_NUM_THREADS: количество потоков внутри процесса.
* PPC_NUM_PROC: количество MPI-процессов.

## 7. Results and Discussion

### 7.1 Correctness

Корректность проверялась функциональными тестами. После сборки `MPI_Allgatherv` каждый процесс обладает
полной и корректной копией матрицы C, идентичной результату последовательной версии.

### 7.2 Performance

Mode | Count (R x T) | Time, s | Speedup | Efficiency
--- | --- | --- | --- | ---
seq | 1 | 0.852412 | 1.00 | N/A
all | 2 x 1 | 0.453211 | 1.88 | 94.00%
all | 2 x 3 | 0.192451 | 4.43 | 73.83%
all | 3 x 2 | 0.198523 | 4.29 | 71.50%

## 8. Conclusions

Гибридная реализация демонстрирует возможность масштабирования алгоритма умножения CRS матриц.
Основная сложность заключается в накладных расходах на коммуникации MPI при сборке разреженной
структуры, так как объемы передаваемых данных заранее неизвестны. Комбинация TBB и MPI позволяет
эффективно использовать как ресурсы одного узла, так и вычислительную мощность всей сети.

## 9. References

1. OpenMP Architecture Review Board. OpenMP API.
2. oneAPI Threading Building Blocks Documentation.
3. Microsoft MPI / MPICH Documentation.
4. ISO C++ Standard Library: std::thread.
146 changes: 146 additions & 0 deletions tasks/ashihmin_d_mult_matr_crs/all/src/ops_all.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
#include "ashihmin_d_mult_matr_crs/all/include/ops_all.hpp"

#include <mpi.h>
#include <omp.h>
#include <tbb/tbb.h>

#include <algorithm>
#include <cmath>
#include <map>
#include <thread>
#include <vector>

#include "ashihmin_d_mult_matr_crs/common/include/common.hpp"
#include "util/include/util.hpp"

namespace ashihmin_d_mult_matr_crs {

AshihminDMultMatrCrsALL::AshihminDMultMatrCrsALL(const InType &in) {
SetTypeOfTask(GetStaticTypeOfTask());
GetInput() = in;
}

bool AshihminDMultMatrCrsALL::ValidationImpl() {
return GetInput().first.cols == GetInput().second.rows;
}

bool AshihminDMultMatrCrsALL::PreProcessingImpl() {
auto &matrix_c = GetOutput();

matrix_c.rows = GetInput().first.rows;
matrix_c.cols = GetInput().second.cols;
return true;
}

void AshihminDMultMatrCrsALL::MultiplyRow(int global_row_idx, int local_idx, const CRSMatrix &matrix_a,
const CRSMatrix &matrix_b, std::vector<std::vector<int>> &local_cols,
std::vector<std::vector<double>> &local_vals) {
std::map<int, double> row_accumulator;
for (int j = matrix_a.row_ptr[global_row_idx]; j < matrix_a.row_ptr[global_row_idx + 1]; ++j) {
int col_a = matrix_a.col_index[j];
double val_a = matrix_a.values[j];
for (int k = matrix_b.row_ptr[col_a]; k < matrix_b.row_ptr[col_a + 1]; ++k) {
row_accumulator[matrix_b.col_index[k]] += val_a * matrix_b.values[k];
}
}

for (const auto &entry : row_accumulator) {
if (std::abs(entry.second) > 1e-15) {
local_cols[local_idx].push_back(entry.first);
local_vals[local_idx].push_back(entry.second);
}
}
}

bool AshihminDMultMatrCrsALL::RunImpl() {
const auto &matrix_a = GetInput().first;
const auto &matrix_b = GetInput().second;
auto &matrix_c = GetOutput();

int rank = 0;
int size = 0;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);

int rows_a = matrix_a.rows;
int base_rows = rows_a / size;
int rem = rows_a % size;
int my_start = (rank * base_rows) + std::min(rank, rem);
int my_end = my_start + base_rows + (rank < rem ? 1 : 0);
int my_row_count = my_end - my_start;

std::vector<std::vector<int>> local_cols(my_row_count);
std::vector<std::vector<double>> local_vals(my_row_count);

int thread_count = ppc::util::GetNumThreads();
std::vector<std::thread> threads;

auto compute_rows = [&](int start_idx, int end_idx) {
tbb::parallel_for(start_idx, end_idx,
[&](int i) { MultiplyRow(my_start + i, i, matrix_a, matrix_b, local_cols, local_vals); });
};

int stl_chunk = (my_row_count + thread_count - 1) / thread_count;
for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
int start_chunk = thread_idx * stl_chunk;
int end_chunk = std::min(start_chunk + stl_chunk, my_row_count);
if (start_chunk < end_chunk) {
threads.emplace_back(compute_rows, start_chunk, end_chunk);
}
}
for (auto &th : threads) {
th.join();
}

std::vector<int> my_nnz_per_row(my_row_count);
#pragma omp parallel for default(none) shared(my_nnz_per_row, local_cols, my_row_count)
for (int i = 0; i < my_row_count; ++i) {
my_nnz_per_row[i] = static_cast<int>(local_cols[i].size());
}

std::vector<int> my_flat_cols;
std::vector<double> my_flat_vals;
for (int i = 0; i < my_row_count; ++i) {
my_flat_cols.insert(my_flat_cols.end(), local_cols[i].begin(), local_cols[i].end());
my_flat_vals.insert(my_flat_vals.end(), local_vals[i].begin(), local_vals[i].end());
}

std::vector<int> all_nnz_per_row(rows_a);
std::vector<int> recv_counts(size);
std::vector<int> displs(size);
for (int i = 0; i < size; ++i) {
recv_counts[i] = (rows_a / size) + (i < (rows_a % size) ? 1 : 0);
displs[i] = (i == 0) ? 0 : displs[i - 1] + recv_counts[i - 1];
}

MPI_Allgatherv(my_nnz_per_row.data(), my_row_count, MPI_INT, all_nnz_per_row.data(), recv_counts.data(),
displs.data(), MPI_INT, MPI_COMM_WORLD);

matrix_c.row_ptr.assign(rows_a + 1, 0);
for (int i = 0; i < rows_a; ++i) {
matrix_c.row_ptr[i + 1] = matrix_c.row_ptr[i] + all_nnz_per_row[i];
}

matrix_c.col_index.resize(matrix_c.row_ptr[rows_a]);
matrix_c.values.resize(matrix_c.row_ptr[rows_a]);

std::vector<int> val_recv_counts(size);
std::vector<int> val_displs(size);
for (int i = 0; i < size; ++i) {
val_recv_counts[i] = matrix_c.row_ptr[displs[i] + recv_counts[i]] - matrix_c.row_ptr[displs[i]];
val_displs[i] = matrix_c.row_ptr[displs[i]];
}

MPI_Allgatherv(my_flat_cols.data(), static_cast<int>(my_flat_cols.size()), MPI_INT, matrix_c.col_index.data(),
val_recv_counts.data(), val_displs.data(), MPI_INT, MPI_COMM_WORLD);
MPI_Allgatherv(my_flat_vals.data(), static_cast<int>(my_flat_vals.size()), MPI_DOUBLE, matrix_c.values.data(),
val_recv_counts.data(), val_displs.data(), MPI_DOUBLE, MPI_COMM_WORLD);

return true;
}

bool AshihminDMultMatrCrsALL::PostProcessingImpl() {
return true;
}

} // namespace ashihmin_d_mult_matr_crs
Loading
Loading