Skip to content

Commit

Permalink
remov mesh passing
Browse files Browse the repository at this point in the history
  • Loading branch information
lroberts36 committed Nov 1, 2024
1 parent 675895e commit 9bb4747
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 41 deletions.
4 changes: 2 additions & 2 deletions src/bvals/comms/boundary_communication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ TaskStatus SendBoundBufs(std::shared_ptr<MeshData<Real>> &md) {
RebuildBufferCache<bound_type, true>(md, nbound, BndInfo::GetSendBndInfo,
ProResInfo::GetSend);
}
pmesh->pcombined_buffers->RepointSendBuffers(pmesh, md->partition, bound_type);
pmesh->pcombined_buffers->RepointSendBuffers(md->partition, bound_type);
}
// Restrict
if (md->NumBlocks() > 0) {
Expand Down Expand Up @@ -219,7 +219,7 @@ TaskStatus ReceiveBoundBufs(std::shared_ptr<MeshData<Real>> &md) {
false);

// Receive any messages that are around
pmesh->pcombined_buffers->TryReceiveAny(pmesh, bound_type);
pmesh->pcombined_buffers->TryReceiveAny(bound_type);

bool all_received = true;
int nreceived{0};
Expand Down
45 changes: 21 additions & 24 deletions src/bvals/comms/combined_buffers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ void CombinedBuffersRankPartition::AddVarBoundary(
//----------------------------------------------------------------------------------------
//----------------------------------------------------------------------------------------
CombinedBuffersRank::CombinedBuffersRank(int o_rank, BoundaryType b_type, bool send,
mpi_comm_t comm)
mpi_comm_t comm, Mesh *pmesh)
: other_rank(o_rank), b_type(b_type), sender(send), buffers_built(false),
comm_(comm) {
comm_(comm), pmesh(pmesh) {

int tag = 1234 + static_cast<int>(GetAssociatedSender(b_type));
if (sender) {
Expand Down Expand Up @@ -195,7 +195,7 @@ void CombinedBuffersRank::AddSendBuffer(int partition, MeshBlock *pmb,
}

//----------------------------------------------------------------------------------------
bool CombinedBuffersRank::TryReceiveBufInfo(Mesh *pmesh) {
bool CombinedBuffersRank::TryReceiveBufInfo() {
PARTHENON_REQUIRE(!sender, "Trying to receive on a combined sender.");
if (buffers_built) return buffers_built;

Expand Down Expand Up @@ -237,7 +237,7 @@ bool CombinedBuffersRank::TryReceiveBufInfo(Mesh *pmesh) {
}

//----------------------------------------------------------------------------------------
void CombinedBuffersRank::ResolveSendBuffersAndSendInfo(Mesh *pmesh) {
void CombinedBuffersRank::ResolveSendBuffersAndSendInfo() {
// First calculate the total size of the message
int total_buffers{0};
for (auto &[partition, combined_buf] : combined_bufs)
Expand Down Expand Up @@ -274,7 +274,7 @@ void CombinedBuffersRank::ResolveSendBuffersAndSendInfo(Mesh *pmesh) {
}

//----------------------------------------------------------------------------------------
void CombinedBuffersRank::RepointBuffers(Mesh *pmesh, int partition) {
void CombinedBuffersRank::RepointBuffers(int partition) {
if (combined_bufs.count(partition) == 0) return;
combined_bufs.at(partition).RebuildBndIdsOnDevice();
return;
Expand All @@ -298,8 +298,7 @@ bool CombinedBuffersRank::IsAvailableForWrite(int partition) {
}

//----------------------------------------------------------------------------------------
bool CombinedBuffersRank::TryReceiveAndUnpack(Mesh *pmesh, int partition,
mpi_message_t *message) {
bool CombinedBuffersRank::TryReceiveAndUnpack(int partition, mpi_message_t *message) {
PARTHENON_REQUIRE(buffers_built,
"Trying to recv combined buffers before they have been built");
PARTHENON_REQUIRE(combined_bufs.count(partition) > 0,
Expand All @@ -317,7 +316,7 @@ void CombinedBuffers::AddSendBuffer(int partition, MeshBlock *pmb,
if (combined_send_buffers.count({nb.rank, b_type}) == 0)
combined_send_buffers.emplace(std::make_pair(
std::make_pair(nb.rank, b_type),
CombinedBuffersRank(nb.rank, b_type, true, comms_[GetAssociatedSender(b_type)])));
CombinedBuffersRank(nb.rank, b_type, true, comms_[GetAssociatedSender(b_type)], pmesh)));
combined_send_buffers.at({nb.rank, b_type}).AddSendBuffer(partition, pmb, nb, var);
}

Expand All @@ -331,23 +330,23 @@ void CombinedBuffers::AddRecvBuffer(MeshBlock *pmb, const NeighborBlock &nb,
combined_recv_buffers.emplace(
std::make_pair(std::make_pair(nb.rank, b_type),
CombinedBuffersRank(nb.rank, b_type, false,
comms_[GetAssociatedSender(b_type)])));
comms_[GetAssociatedSender(b_type)], pmesh)));
}

void CombinedBuffers::ResolveAndSendSendBuffers(Mesh *pmesh) {
void CombinedBuffers::ResolveAndSendSendBuffers() {
for (auto &[id, buf] : combined_send_buffers)
buf.ResolveSendBuffersAndSendInfo(pmesh);
buf.ResolveSendBuffersAndSendInfo();
}

void CombinedBuffers::ReceiveBufferInfo(Mesh *pmesh) {
void CombinedBuffers::ReceiveBufferInfo() {
constexpr std::int64_t max_it = 1e10;
std::vector<bool> received(combined_recv_buffers.size(), false);
bool all_received;
std::int64_t receive_iters = 0;
do {
all_received = true;
for (auto &[id, buf] : combined_recv_buffers)
all_received = buf.TryReceiveBufInfo(pmesh) && all_received;
all_received = buf.TryReceiveBufInfo() && all_received;
receive_iters++;
} while (!all_received && receive_iters < max_it);
PARTHENON_REQUIRE(
Expand All @@ -374,23 +373,21 @@ void CombinedBuffers::PackAndSend(int partition, BoundaryType b_type) {
}
}

void CombinedBuffers::RepointSendBuffers(Mesh *pmesh, int partition,
BoundaryType b_type) {
void CombinedBuffers::RepointSendBuffers(int partition, BoundaryType b_type) {
for (int rank = 0; rank < Globals::nranks; ++rank) {
if (combined_send_buffers.count({rank, b_type}))
combined_send_buffers.at({rank, b_type}).RepointBuffers(pmesh, partition);
combined_send_buffers.at({rank, b_type}).RepointBuffers(partition);
}
}

void CombinedBuffers::RepointRecvBuffers(Mesh *pmesh, int partition,
BoundaryType b_type) {
void CombinedBuffers::RepointRecvBuffers(int partition, BoundaryType b_type) {
for (int rank = 0; rank < Globals::nranks; ++rank) {
if (combined_recv_buffers.count({rank, b_type}))
combined_recv_buffers.at({rank, b_type}).RepointBuffers(pmesh, partition);
combined_recv_buffers.at({rank, b_type}).RepointBuffers(partition);
}
}

void CombinedBuffers::TryReceiveAny(Mesh *pmesh, BoundaryType b_type) {
void CombinedBuffers::TryReceiveAny(BoundaryType b_type) {
#ifdef MPI_PARALLEL
// This was an attempt at another method for receiving, it seemed to work
// but was subject to the same problems as the Iprobe based code
Expand All @@ -414,7 +411,7 @@ void CombinedBuffers::TryReceiveAny(Mesh *pmesh, BoundaryType b_type) {
const int rank = status.MPI_SOURCE;
const int partition = status.MPI_TAG;
bool finished = combined_recv_buffers.at({rank, b_type})
.TryReceiveAndUnpack(pmesh, partition, nullptr);
.TryReceiveAndUnpack(partition, nullptr);
if (!finished)
processing_messages.insert(
std::make_pair(std::pair<int, int>{rank, partition}, message));
Expand All @@ -427,7 +424,7 @@ void CombinedBuffers::TryReceiveAny(Mesh *pmesh, BoundaryType b_type) {
int rank = p.first;
int partition = p.second;
bool finished = combined_recv_buffers.at({rank, b_type})
.TryReceiveAndUnpack(pmesh, partition, nullptr);
.TryReceiveAndUnpack(partition, nullptr);
if (finished) finished_messages.push_back({rank, partition});
}

Expand All @@ -444,7 +441,7 @@ void CombinedBuffers::TryReceiveAny(Mesh *pmesh, BoundaryType b_type) {
const int rank = status.MPI_SOURCE;
const int partition = status.MPI_TAG;
bool finished = combined_recv_buffers.at({rank, b_type})
.TryReceiveAndUnpack(pmesh, partition, &message);
.TryReceiveAndUnpack(partition, &message);
if (!finished)
processing_messages.insert(
std::make_pair(std::pair<int, int>{rank, partition}, message));
Expand All @@ -457,7 +454,7 @@ void CombinedBuffers::TryReceiveAny(Mesh *pmesh, BoundaryType b_type) {
int rank = p.first;
int partition = p.second;
bool finished = combined_recv_buffers.at({rank, b_type})
.TryReceiveAndUnpack(pmesh, partition, &message);
.TryReceiveAndUnpack(partition, &message);
if (finished) finished_messages.push_back({rank, partition});
}

Expand Down
28 changes: 16 additions & 12 deletions src/bvals/comms/combined_buffers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,22 +101,23 @@ struct CombinedBuffersRank {
com_buf_t message;

mpi_comm_t comm_;

Mesh *pmesh;
bool sender{true};
CombinedBuffersRank(int o_rank, BoundaryType b_type, bool send, mpi_comm_t comm);

CombinedBuffersRank(int o_rank, BoundaryType b_type, bool send, mpi_comm_t comm, Mesh *pmesh);

void AddSendBuffer(int partition, MeshBlock *pmb, const NeighborBlock &nb,
const std::shared_ptr<Variable<Real>> &var);

bool TryReceiveBufInfo(Mesh *pmesh);
bool TryReceiveBufInfo();

void ResolveSendBuffersAndSendInfo(Mesh *pmesh);
void ResolveSendBuffersAndSendInfo();

void PackAndSend(int partition);

bool TryReceiveAndUnpack(Mesh *pmesh, int partition, mpi_message_t *message);
bool TryReceiveAndUnpack(int partition, mpi_message_t *message);

void RepointBuffers(Mesh *pmesh, int partition);
void RepointBuffers(int partition);

bool IsAvailableForWrite(int partition);
};
Expand All @@ -129,7 +130,10 @@ struct CombinedBuffers {
std::map<std::pair<int, int>, mpi_message_t> processing_messages;

std::map<BoundaryType, mpi_comm_t> comms_;
CombinedBuffers() {

Mesh *pmesh;

CombinedBuffers(Mesh *pmesh) : pmesh(pmesh) {
#ifdef MPI_PARALLEL
// TODO(LFR): Switch to a different communicator for each BoundaryType pair
for (auto b_type :
Expand Down Expand Up @@ -164,17 +168,17 @@ struct CombinedBuffers {
void AddRecvBuffer(MeshBlock *pmb, const NeighborBlock &nb,
const std::shared_ptr<Variable<Real>>, BoundaryType b_type);

void ResolveAndSendSendBuffers(Mesh *pmesh);
void ResolveAndSendSendBuffers();

void ReceiveBufferInfo(Mesh *pmesh);
void ReceiveBufferInfo();

void PackAndSend(int partition, BoundaryType b_type);

void RepointSendBuffers(Mesh *pmesh, int partition, BoundaryType b_type);
void RepointSendBuffers(int partition, BoundaryType b_type);

void RepointRecvBuffers(Mesh *pmesh, int partition, BoundaryType b_type);
void RepointRecvBuffers(int partition, BoundaryType b_type);

void TryReceiveAny(Mesh *pmesh, BoundaryType b_type);
void TryReceiveAny(BoundaryType b_type);

bool IsAvailableForWrite(int partition, BoundaryType b_type);
};
Expand Down
6 changes: 3 additions & 3 deletions src/mesh/mesh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ Mesh::Mesh(ParameterInput *pin, ApplicationInput *app_in, Packages_t &packages,
nref(Globals::nranks), nderef(Globals::nranks), rdisp(Globals::nranks),
ddisp(Globals::nranks), bnref(Globals::nranks), bnderef(Globals::nranks),
brdisp(Globals::nranks), bddisp(Globals::nranks),
pcombined_buffers(std::make_shared<CombinedBuffers>()),
pcombined_buffers(std::make_shared<CombinedBuffers>(this)),
receive_type{pin->GetOrAddString("parthenon/mesh", "receive_type", "iprobe")} {
// Allow for user overrides to default Parthenon functions
if (app_in->InitUserMeshData != nullptr) {
Expand Down Expand Up @@ -647,9 +647,9 @@ void Mesh::BuildTagMapAndBoundaryBuffers() {
}
}

pcombined_buffers->ResolveAndSendSendBuffers(this);
pcombined_buffers->ResolveAndSendSendBuffers();
// This operation is blocking
pcombined_buffers->ReceiveBufferInfo(this);
pcombined_buffers->ReceiveBufferInfo();
}

void Mesh::CommunicateBoundaries(std::string md_name,
Expand Down

0 comments on commit 9bb4747

Please sign in to comment.