Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
0e087e5
feat: cloudSupport library
FoamScience Nov 27, 2025
aa849b6
feat: basic cloud support for adaptiveFvMesh class
FoamScience Nov 27, 2025
dad0530
fix: conflicts of LB with refinement/coarsening when remapping cloud …
FoamScience Nov 27, 2025
70c5f95
feat: tutorial case showcasing cloud support
FoamScience Nov 27, 2025
c1ab17c
feat: handler types of different cloud types
FoamScience Nov 28, 2025
1e6181c
fix: particleInjection2D explicit cloudHandler setting
FoamScience Nov 28, 2025
0598f10
refactor: AMR/LB core functionality isolated
FoamScience Nov 28, 2025
08d57a4
refactor: use coreAMR for the dynamicFvMesh classes route
FoamScience Nov 28, 2025
1b8c995
feat: AMR/LB as function objects
FoamScience Nov 28, 2025
a51f380
build: compile new cloud-related libs and FOs
FoamScience Nov 28, 2025
70a0139
test: unit tests for cloud-related LB
FoamScience Nov 28, 2025
6aebf76
feat: MPPICcolumn tutorial for LB with mesh motion dynamicFvMesh types
FoamScience Nov 28, 2025
dee106e
fix: bug with cloud position storage gets triggered wrongly
FoamScience Nov 28, 2025
dfcea01
fix: cpu load time settings
FoamScience Nov 28, 2025
a53efb0
fix: cloud handlers behavior with injectors
FoamScience Nov 28, 2025
013a4a1
fix: particle coefficient in cellCount load policy
FoamScience Nov 28, 2025
29c9159
fix: random segfault in hex2D unit tests
FoamScience Nov 29, 2025
8f87651
test: update test cases
FoamScience Nov 29, 2025
8ea6440
fix: bugs with cloud parcel relocation during LB remapping
FoamScience Nov 30, 2025
d9ca87c
fix: compilation bug with coded refinement with function object route
FoamScience Nov 30, 2025
256cc57
ci: more extensive cloud LB unit tests
FoamScience Nov 30, 2025
1b266f1
fix: proper cpuLoad synchronization and load resets
FoamScience Dec 3, 2025
5dbf031
fix: pass distribution by reference
FoamScience Dec 3, 2025
3277003
fix: sampledSurface updates; works on #10
FoamScience Dec 3, 2025
a085501
build: move cleaning script
FoamScience Dec 3, 2025
720f2be
fix: updates to particle tutorials and damBreak2D
FoamScience Dec 3, 2025
000773e
ci: update to use new foamUT versions
FoamScience Dec 4, 2025
2b7961b
fix: mesh redistribution with motion solvers and add minimum cell con…
FoamScience Dec 4, 2025
e1c786c
fix: configure MPPIC column for dynamic mesh load balancing
FoamScience Dec 4, 2025
895f13b
fix: info statements cleanup
FoamScience Dec 5, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions .github/workflows/of2506.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,26 @@ jobs:
with:
detached: true

- name: Compile and Test
- name: Compile
run: |
source /usr/lib/openfoam/openfoam2506/etc/bashrc
mkdir -p $FOAM_USER_LIBBIN
./Allwmake

- name: Test in serial
run: |
source /usr/lib/openfoam/openfoam2506/etc/bashrc
git clone https://github.com/FoamScience/foamUT $FOAM_FOAMUT
sed -i 's/mpirun/mpirun --oversubscribe/g' $FOAM_FOAMUT/Alltest
ln -s "$PWD"/tests/adaptiveFvMeshTests "$FOAM_FOAMUT/tests/adaptiveFvMeshTests"
rm -rf "$FOAM_FOAMUT/cases"
cp -r tests/testCases "$FOAM_FOAMUT/cases"
cd $FOAM_FOAMUT || exit 1
rm -rf tests/exampleTests
./Alltest "$@"
./foamut
if [ -f $FOAM_FOAMUT/tests/adaptiveFvMeshTests/log.wmake ]; then cat $FOAM_FOAMUT/tests/adaptiveFvMeshTests/log.wmake; fi

- name: Test in parallel
run: |
source /usr/lib/openfoam/openfoam2506/etc/bashrc
cd $FOAM_FOAMUT || exit 1
./foamut --parallel --mpirun-options oversubscribe,use-hwthread-cpus
File renamed without changes.
2 changes: 2 additions & 0 deletions Allwmake
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ fi

#------------------------------------------------------------------------------

wmake src/lagrangian
wmake src/loadPolicies
wmake src/cpuLoad
wmake src/errorEstimators
wmake src/dynamicMesh
wmake src/dynamicFvMesh
wmake src/functionObjects

wmake applications/utilities/updateMesh
wmake applications/solvers/combustion/reactingDyMFoam
Expand Down
197 changes: 153 additions & 44 deletions src/cpuLoad/cpuLoadPolicy/cpuLoadPolicy.C
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ namespace Foam

__attribute__((constructor))
void onCPULoadLib() {
WarningInFunction
<< "'libamrCPULoad.so' library loaded. Some MPI calls are overrided just by loading this library!"
<< nl << tab << "This is the case even if you don't use cpuLoad for load-balancing..."
<< nl << tab << "Of course this is possible only when this library is loaded before real MPI libs."
<< nl << endl;
if (Pstream::master()) {
WarningInFunction
<< "'libamrCPULoad.so' library loaded. Some MPI calls are overriden just by loading this library!"
<< nl << tab << "This is the case even if you don't use cpuLoad for load-balancing..."
<< nl << tab << "Of course this is possible only when this library is loaded before real MPI libs."
<< nl << endl;
}
}
}

Expand Down Expand Up @@ -263,16 +265,46 @@ WRAP_MPI_FUNCTION(

// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //

namespace
{
// Helper to parse time unit from string
Foam::TimeUnit parseTimeUnit(const Foam::word& unitStr)
{
if (unitStr == "nano" || unitStr == "nanoseconds" || unitStr == "ns")
{
return Foam::TimeUnit::Nano;
}
else if (unitStr == "milli" || unitStr == "milliseconds" || unitStr == "ms")
{
return Foam::TimeUnit::Milli;
}
// Default to microseconds
return Foam::TimeUnit::Micro;
}
}

Foam::cpuLoadPolicy::cpuLoadPolicy
(
const fvMesh& mesh,
const dictionary& dict
)
:
loadPolicy(mesh, dict),
maxCycleLength_(dict.lookupOrDefault("maxLBCycleLength", 5*readLabel(dict.lookup("refineInterval")))),
isActive_(true)
maxCycleLength_(dict.getOrDefault<label>("maxLBCycleLength", 50)),
lastResetTimeIndex_(0),
isActive_(true),
timeUnit_(parseTimeUnit(dict.getOrDefault<word>("timeUnit", "milli")))
{
// Set the global time unit for output formatting
profilerTimeUnit() = timeUnit_;

Info<< " maxLBCycleLength: " << maxCycleLength_ << " timesteps" << nl
<< " timeUnit: " << timeUnitSuffix(timeUnit_) << endl;

// Synchronize all processors before resetting cycleStart
// This ensures all processors start measurement from approximately
// the same wall-clock time, even if construction happens at different times
UPstream::barrier(UPstream::worldComm);
mpiCommsStats.reset();
}

Expand All @@ -287,65 +319,142 @@ Foam::cpuLoadPolicy::~cpuLoadPolicy()

bool Foam::cpuLoadPolicy::canBalance()
{
if (isActive_ && mesh_.time().timeIndex() > 5) {
// check that MPI calls where intercepted
// it's highly unlikely that after 5 iterations, no measurements were picked up!
if (
returnReduce(mpiCommsStats.nP2PSends, sumOp<int>()) == 0 &&
returnReduce(mpiCommsStats.nP2PRecvs, sumOp<int>()) == 0 &&
returnReduce(mpiCommsStats.nCollectives, sumOp<int>()) == 0 &&
returnReduce(mpiCommsStats.nOthers, sumOp<int>()) == 0
) {
const label currentTimeIndex = mesh_.time().timeIndex();

// Increment timestep counter in cycle
mpiCommsStats.nTSInCycle++;

// Compute elapsed wall-clock time since last reset
auto now = std::chrono::high_resolution_clock::now();
long long elapsed = std::chrono::duration_cast<Duration>(now - mpiCommsStats.cycleStart).count();

if (elapsed == 0)
{
return false;
}

// Total MPI communication time (waiting/transferring, not computing)
long long totalMPITime = mpiCommsStats.p2pSendTime
+ mpiCommsStats.p2pRecvTime
+ mpiCommsStats.collectiveTime
+ mpiCommsStats.otherTime;

// Computational load = wall-clock time - MPI communication time
// This represents time spent actually computing (not waiting on MPI)
// Ensure non-negative (edge case protection)
myLoad_ = std::max(0LL, elapsed - totalMPITime);

// Check if we should reset stats (every maxCycleLength timesteps)
// Use synchronized decision across all processors to avoid cycleStart drift
const label stepsSinceReset = currentTimeIndex - lastResetTimeIndex_;
const bool localShouldReset = (stepsSinceReset >= maxCycleLength_)
|| (mpiCommsStats.nTSInCycle > maxCycleLength_);

// Synchronize reset decision - if ANY processor needs reset, ALL reset
const bool shouldReset = returnReduce(localShouldReset, orOp<bool>());

if (shouldReset)
{
// Report in the configured time unit
long long divisor = timeUnitDivisor(timeUnit_);
Pout<< "Load measurement for interval [" << lastResetTimeIndex_
<< ", " << currentTimeIndex << "] (" << stepsSinceReset << " timesteps):"
<< nl << " Wall-clock time: " << label(elapsed/divisor) << " " << timeUnitSuffix(timeUnit_)
<< nl << " MPI comm time: " << label(totalMPITime/divisor) << " " << timeUnitSuffix(timeUnit_)
<< nl << " Compute load: " << label(myLoad_/divisor) << " " << timeUnitSuffix(timeUnit_)
<< nl << mpiCommsStats << endl;

// Synchronize all processors before resetting cycleStart
// This ensures all processors start their new measurement interval
// at approximately the same wall-clock time
UPstream::barrier(UPstream::worldComm);

// Reset stats and start new measurement interval
mpiCommsStats.reset();
lastResetTimeIndex_ = currentTimeIndex;
}

// Check MPI interception is working (after enough timesteps)
// Once verified, disable this check to avoid repeated overhead
if (isActive_ && currentTimeIndex > 5)
{
bool hasMPIActivity =
returnReduce(mpiCommsStats.nP2PSends, sumOp<int>()) > 0 ||
returnReduce(mpiCommsStats.nP2PRecvs, sumOp<int>()) > 0 ||
returnReduce(mpiCommsStats.nCollectives, sumOp<int>()) > 0 ||
returnReduce(mpiCommsStats.nOthers, sumOp<int>()) > 0;

if (hasMPIActivity)
{
// MPI interception is working, no need to check again
isActive_ = false;
}
else if (currentTimeIndex > lastResetTimeIndex_ + 2)
{
// Give a few timesteps after reset before failing
FatalErrorInFunction
<< "Seems like MPI call interception is not working. Make sure to preload the intercepting libraries:"
<< nl << nl << "LD_PRELOAD=\"$FOAM_USER_LIBBIN/libamrLoadPolicies.so $FOAM_USER_LIBBIN/libamrCPULoad.so\" <your-solver-command>"
<< nl << nl<< "Library loading order is important, libamrCPULoad.so must load before MPI libs."
<< nl << nl << "Library loading order is important, libamrCPULoad.so must load before MPI libs."
<< nl << "So, no point in continuing..."
<< abort(FatalError);
}
}
// Start a new measuring cycle when LB is viable, bound by maxCycleLength_ of
// time steps
bool newCycle = !returnReduce(stillSameCycle(), orOp<bool>());
if (mpiCommsStats.nTSInCycle > maxCycleLength_) newCycle = true;
auto now = std::chrono::high_resolution_clock::now();
auto elapsed = std::chrono::duration_cast<Duration>(now-mpiCommsStats.cycleStart).count();
if (elapsed == 0) return false;
if (newCycle) myLoad_ = 0;
myLoad_ += elapsed
- mpiCommsStats.p2pSendTime - mpiCommsStats.p2pRecvTime
- mpiCommsStats.collectiveTime - mpiCommsStats.otherTime;
scalar idealLoad = returnReduce(myLoad_, sumOp<scalar>()) / scalar(Pstream::nProcs());

// Calculate imbalance across processors
scalar totalLoad = returnReduce(myLoad_, sumOp<scalar>());
scalar idealLoad = totalLoad / scalar(Pstream::nProcs());

// Protect against division by zero (e.g., at simulation start or after reset)
if (idealLoad < SMALL)
{
Pout<< "Maximum imbalance found = 0 % (no meaningful load yet)" << endl;
return false;
}

scalar maxImbalance = returnReduce(mag(myLoad_ - idealLoad) / idealLoad, maxOp<scalar>());

Pout<< "Maximum imbalance found = " << 100*maxImbalance << " %" << endl;
Pout<< "Current processor load stats:" << nl << mpiCommsStats << endl;

if (maxImbalance < allowedImbalance_)
{
return false;
}
// start new cycle only if rebalancing...
mpiCommsStats.reset();
myLoadHistory_.set(mesh_.time().timeIndex(), myLoad_);
//Pout << "Last history datapoint: " << myLoadHistory_.toc() << tab << myLoadHistory_[myLoadHistory_.toc().last()] << endl;
if (!myLoadHistory_.empty()) {
Pout<< "Current processor load computed from Time index = "
<< myLoadHistory_.toc().last()
<< " to Time index = " << mesh_.time().timeIndex() << "s"
<< nl;
}

// Record load history
myLoadHistory_.set(currentTimeIndex, myLoad_);

long long divisor = timeUnitDivisor(timeUnit_);
Pout<< "Load balancing triggered at Time index = " << currentTimeIndex
<< ", myLoad = " << label(myLoad_/divisor) << " " << timeUnitSuffix(timeUnit_) << nl;

return true;
}

Foam::scalarField Foam::cpuLoadPolicy::cellWeights() {
label idealLoad = returnReduce(myLoad_, sumOp<scalar>()) / scalar(Pstream::nProcs());
scalar totalLoad = returnReduce(myLoad_, sumOp<scalar>());
scalar idealLoad = totalLoad / scalar(Pstream::nProcs());

// Protect against division by zero
if (idealLoad < SMALL || myLoad_ < SMALL)
{
return scalarField(mesh_.nCells(), 1.0);
}

scalar imbalance = mag(myLoad_ - idealLoad) / idealLoad;
if (myLoad_ > idealLoad) imbalance = 1.0/imbalance;
return scalarField(mesh_.nCells(), myLoad_ != 0 ? imbalance : 1.0);

// Protect against division by zero when inverting imbalance
if (myLoad_ > idealLoad && imbalance > SMALL)
{
imbalance = 1.0/imbalance;
}

return scalarField(mesh_.nCells(), imbalance > SMALL ? imbalance : 1.0);
}

bool Foam::cpuLoadPolicy::willBeBeneficial
(
const labelList distribution
const labelList& distribution
) {
// WARN: hard to know beforehand which cells will be beneficial
// to move; it's a very dynamic property anyway, so no attempt
Expand Down
11 changes: 9 additions & 2 deletions src/cpuLoad/cpuLoadPolicy/cpuLoadPolicy.H
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,18 @@ class cpuLoadPolicy
: public loadPolicy
{
protected:
// Maximum number of time steps to resut MPI data
// Maximum cycle length (load is tracked over this many timesteps)
label maxCycleLength_;

// Time index when stats were last reset
label lastResetTimeIndex_;

// Is MPI call interception active
bool isActive_;

// Time unit for reporting (nano, micro, milli)
TimeUnit timeUnit_;

public:

//- Runtime type information
Expand Down Expand Up @@ -98,7 +105,7 @@ public:
//- Predict if LB would be viable
virtual bool willBeBeneficial
(
const labelList distribution
const labelList& distribution
);

inline bool stillSameCycle() const {
Expand Down
Loading