Skip to content

Commit

Permalink
mpas particles checkpoint options
Browse files Browse the repository at this point in the history
  • Loading branch information
hguo committed Oct 29, 2023
1 parent 08f5967 commit b7b05b3
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 36 deletions.
10 changes: 10 additions & 0 deletions include/ftk/filters/particle_tracer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ struct particle_tracer : public virtual tracker
void write_trajectories(const std::string& filename);
void write_geo_trajectories(const std::string& filename);

public:
void set_nsteps_per_day(int n) { nsteps_per_day = n; }
void set_checkpoint_days(int n) { checkpoint_days = n; }
void set_checkpoint_months(int n) { checkpoint_months = n; }

public: // will be overriden by above parameters
void set_delta_t(double d) { current_delta_t = d; } // overriding delta t
void set_nsteps_per_interval(int n) { nsteps_per_interval = n; }
void set_nsteps_per_checkpoint(int n) { nsteps_per_checkpoint = 16; }
Expand All @@ -55,9 +61,13 @@ protected:

double current_t = 0.0, current_delta_t = 1.0;

int nsteps_per_day = 1024;

int nsteps_per_interval = 128;
int nsteps_per_checkpoint = 16;

int checkpoint_days = 0, checkpoint_months = 0;

const int nd_;
int integrator = PARTICLE_TRACER_INTEGRATOR_RK4;
};
Expand Down
64 changes: 55 additions & 9 deletions include/ftk/filters/particle_tracer_mpas_ocean.hh
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,15 @@ protected:
int nch() const { return 7; }
std::vector<std::string> scalar_names() const { return {"vertVelocity", "salinity", "temperature"}; }

static double deltaT(std::tm t0, std::tm t1);

protected:
// std::shared_ptr<ndarray<double>> V[2]; // inherited from particle_tracer
std::shared_ptr<ndarray<double>> zTop[2];
std::shared_ptr<ndarray<double>> vertVelocityTop[2];
std::shared_ptr<ndarray<double>> salinity[2];
std::shared_ptr<ndarray<double>> temperature[2];
std::tm timestamp[2];

#if FTK_HAVE_CUDA
mop_ctx_t *ctx;
Expand Down Expand Up @@ -220,14 +223,26 @@ inline void particle_tracer_mpas_ocean::load_particles_cuda()

inline void particle_tracer_mpas_ocean::update_timestep()
{
prepare_timestep();

typedef std::chrono::high_resolution_clock clock_type;

const int ndays = current_delta_t / 86400.;
nsteps_per_interval = nsteps_per_day * ndays;

if (checkpoint_days)
nsteps_per_checkpoint = nsteps_per_day * checkpoint_days;
else if (checkpoint_months)
nsteps_per_checkpoint = nsteps_per_day * 30 * checkpoint_months;
else
nsteps_per_checkpoint = nsteps_per_interval;

if (xl == FTK_XL_CUDA) {
#if FTK_HAVE_CUDA
if (comm.rank() == 0) fprintf(stderr, "current_timestep=%d\n", current_timestep);
if (comm.rank() == 0) fprintf(stderr, "current_timestep=%d, delta_t=%f\n",
current_timestep, current_delta_t);

current_t = current_timestep;

prepare_timestep();

bool streamlines = false;
if (this->ntimesteps == 1) {
Expand All @@ -237,20 +252,23 @@ inline void particle_tracer_mpas_ocean::update_timestep()

// if ((!streamlines) && this->snapshots.size() < 2)
// return; // nothing can be done
const int nsteps_per_day = 1024;

#if 0
const int years = 1;
const double T = 86400.0 * 365 * years;
const int nsteps = nsteps_per_day * 365 * years;
const int nsubsteps = nsteps_per_day * 10; // 10 days
#endif

for (int istep = 0; istep < nsteps; istep += nsubsteps) {
for (int istep = 0; istep < nsteps_per_interval; istep += nsteps_per_checkpoint) {
auto t1 = clock_type::now();
mop_execute(ctx, T, nsteps, nsubsteps);
mop_execute(ctx, current_delta_t, nsteps_per_interval, nsteps_per_checkpoint);

auto t2 = clock_type::now();
fprintf(stderr, "day=%d, t_comp=%f\n", istep / nsteps_per_day,
std::chrono::duration_cast<std::chrono::nanoseconds>(t2 - t1).count() * 1e-9);
fprintf(stderr, "checkpoint=%d, t_comp=%f, istep=%d, nsteps_per_interval=%d, nsteps_per_checkpoint=%d\n",
istep / nsteps_per_day,
std::chrono::duration_cast<std::chrono::nanoseconds>(t2 - t1).count() * 1e-9,
istep, nsteps_per_interval, nsteps_per_checkpoint);

// push resulting particles to the trajectory
for (int i = 0; i < ctx->nparticles; i ++) {
Expand Down Expand Up @@ -320,6 +338,20 @@ inline void particle_tracer_mpas_ocean::push_field_data_snapshot(std::shared_ptr
particle_tracer::push_field_data_snapshot(g);
}

inline double particle_tracer_mpas_ocean::deltaT(std::tm tm0, std::tm tm1)
{
if (tm0.tm_year < 2000) // a workaround for year before 1970
tm0.tm_year += 2000;

if (tm1.tm_year < 2000)
tm1.tm_year += 2000;

std::time_t t0 = std::mktime(&tm0),
t1 = std::mktime(&tm1);

return std::difftime(t1, t0);
}

inline void particle_tracer_mpas_ocean::prepare_timestep()
{
V[0] = snapshots[0]->get_ptr<double>("velocity");
Expand All @@ -336,6 +368,20 @@ inline void particle_tracer_mpas_ocean::prepare_timestep()

temperature[0] = snapshots[0]->get_ptr<double>("temperature");
temperature[1] = snapshots.size() > 1 ? snapshots[1]->get_ptr<double>("temperature") : nullptr;

// timestamp
for (int i = 0; i < snapshots.size(); i ++)
{
const auto xtime = snapshots[i]->get_ptr<char>("xtime");
const std::string xtimestr(xtime->data(), xtime->size());
std::istringstream ss(xtimestr);
ss >> std::get_time(&timestamp[i], "%Y-%m-%d_%H:%M:%S");
}

if (snapshots.size() > 1) {
// fprintf(stderr, "difftime=%f\n", deltaT(timestamp[0], timestamp[1]));
set_delta_t(deltaT(timestamp[0], timestamp[1]));
}
}

inline bool particle_tracer_mpas_ocean::eval_v_vertical(int t, const double *x, double *v, int *hint)
Expand Down
42 changes: 24 additions & 18 deletions include/ftk/io/mpas_stream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ public:

std::shared_ptr<mpas_mesh<>> m;
std::function<void(int, std::shared_ptr<ndarray_group>)> callback;

int ncid;
size_t start_timestep = 0, current_timestep = 0, ntimesteps = 0;
size_t time_strlen;
bool c2v = true;
};

Expand Down Expand Up @@ -61,23 +62,10 @@ void mpas_stream::initialize()
NC_SAFE_CALL( nc_inq_dimlen(ncid, dim_unlimited, &ntimesteps) );

fprintf(stderr, "mpas_ntimesteps=%zu\n", ntimesteps);

{
ndarray<char> xtime;
xtime.read_netcdf(ncid, "xtime");
std::string stime(xtime.data(), xtime.size());

for (int i = 0; i < xtime.dim(1); i ++) {
std::string str(stime, i*xtime.dim(0), xtime.dim(0));

std::tm t = {};
std::istringstream ss(str);
ss >> std::get_time(&t, "%Y-%m-%d_%H:%M:%S");

// fprintf(stderr, "xtime=%s\n", str.c_str());
// std::cout << std::put_time(&t, "%c") << std::endl;
}
}

int dim_strlen;
NC_SAFE_CALL( nc_inq_dimid(ncid, "StrLen", &dim_strlen) );
NC_SAFE_CALL( nc_inq_dimlen(ncid, dim_strlen, &time_strlen) );

#else
fatal(FTK_ERR_NOT_BUILT_WITH_NETCDF);
Expand All @@ -92,6 +80,24 @@ bool mpas_stream::advance_timestep()
// fprintf(stderr, "current_timestep=%zu\n", current_timestep);
std::shared_ptr<ndarray_group> g(new ndarray_group);

// timestamp
{
const size_t st[3] = {current_timestep, 0},
sz[3] = {1, time_strlen};

ndarray<char> xtime;
xtime.read_netcdf(ncid, "xtime", st, sz);
g->set("xtime", xtime);

#if 0
std::string str(xtime.data(), xtime.size());
std::tm t = {};
std::istringstream ss(str);
ss >> std::get_time(&t, "%Y-%m-%d_%H:%M:%S");
std::cerr << std::put_time(&t, "%c") << std::endl;
#endif
}

{
const size_t st[3] = {current_timestep, 0, 0},
sz[3] = {1, m->n_cells(), m->n_layers()};
Expand Down
23 changes: 16 additions & 7 deletions src/cli/ftk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ std::vector<int> pt_seed_strides;
std::vector<double> pt_seed_box;

// geo particle tracer
int ptgeo_nsteps_per_day = 1024;
int ptgeo_checkpoint_per_timestep = 0;
int ptgeo_nsteps_per_day = 0; // 1024;
int ptgeo_checkpoint_per_timestep = 0; //
int ptgeo_checkpoint_months = 0;
int ptgeo_checkpoint_days = 0;

Expand Down Expand Up @@ -590,8 +590,17 @@ void initialize_particle_tracer_mpas_ocean(diy::mpi::communicator comm)
pt_nsteps_per_interval, pt_nsteps_per_checkpoint, pt_delta_t);

tracker_particle_mpas_ocean->set_ntimesteps(mpas_data_stream->ntimesteps);
tracker_particle_mpas_ocean->set_nsteps_per_interval(pt_nsteps_per_interval);
tracker_particle_mpas_ocean->set_nsteps_per_checkpoint(pt_nsteps_per_checkpoint);
if (ptgeo_nsteps_per_day)
tracker_particle_mpas_ocean->set_nsteps_per_day( ptgeo_nsteps_per_day );

// checkpoint
if (ptgeo_checkpoint_days)
tracker_particle_mpas_ocean->set_checkpoint_days(ptgeo_checkpoint_days);
else if (ptgeo_checkpoint_months)
tracker_particle_mpas_ocean->set_checkpoint_months(ptgeo_checkpoint_months);

// tracker_particle_mpas_ocean->set_nsteps_per_interval(pt_nsteps_per_interval);
// tracker_particle_mpas_ocean->set_nsteps_per_checkpoint(pt_nsteps_per_checkpoint);
tracker_particle_mpas_ocean->set_delta_t( pt_delta_t );

tracker_particle_mpas_ocean->initialize();
Expand Down Expand Up @@ -922,7 +931,7 @@ int parse_arguments(int argc, char **argv, diy::mpi::communicator comm)
("pt-nsteps-per-checkpoint", "Particle tracing: Number of steps per checkpoint", cxxopts::value<int>(pt_nsteps_per_checkpoint))
("pt-delta-t", "Particle tracing: Delta t per timestep", cxxopts::value<double>(pt_delta_t))
("pt-seed-strides", "Particle tracing: Seed strides, e.g., '4', '1,4', or '1,4,4'", cxxopts::value<std::string>())
("ptgeo-seed-box", "Geo particle tracing: Seed in a geobox: nlat,lat0,lat1,nlon,lon0,lon1,nz,z0,z1", cxxopts::value<std::string>())
("ptgeo-seeds", "Geo particle tracing: Seed in a geobox: nlat,lat0,lat1,nlon,lon0,lon1,nz,z0,z1", cxxopts::value<std::string>())
("ptgeo-nsteps-per-day", "Geo particle tracing: Number of steps per earth day", cxxopts::value<int>(ptgeo_nsteps_per_day))
("ptgeo-checkpoint-per-timestep", "Geo particle tracing: Number of checkpoints per timestep", cxxopts::value<int>(ptgeo_checkpoint_per_timestep))
("ptgeo-checkpoint-days", "Geo particle tracing: Number of days per checkpoint", cxxopts::value<int>(ptgeo_checkpoint_days))
Expand Down Expand Up @@ -1004,8 +1013,8 @@ int parse_arguments(int argc, char **argv, diy::mpi::communicator comm)
pt_seed_strides.push_back( std::atoi(s.c_str()) );
}

if (results.count("pt-seed-geo")) {
const auto str = results["pt-seed-geo"].as<std::string>();
if (results.count("ptgeo-seeds")) {
const auto str = results["ptgeo-seeds"].as<std::string>();
const auto strs = split(str, ",");
if (strs.size() < 9)
fatal("Insufficient number for seeding geospatially.");
Expand Down
4 changes: 2 additions & 2 deletions src/filters/particle_tracer_mpas.cu
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ static void load_cw_data(
cudaMalloc((void**)ddbuf, sizeof(double*) * 2);
checkLastCudaError("memcpy to c2w buffer: allocating ddbuf");
}
fprintf(stderr, "ddbuf=%p, dbuf=%p\n", *ddbuf, dbuf);
// fprintf(stderr, "ddbuf=%p, dbuf=%p\n", *ddbuf, dbuf);
cudaMemcpy(*ddbuf, dbuf, sizeof(double*) * 2,
cudaMemcpyHostToDevice);
checkLastCudaError("memcpy to c2w buffer: dev ptrs");
Expand All @@ -739,7 +739,7 @@ static void load_cw_data(
sizeof(double) * c->ncells * nch * nlayers,
cudaMemcpyHostToDevice);
cudaDeviceSynchronize();
fprintf(stderr, "dcw=%p\n", c->dcw);
// fprintf(stderr, "dcw=%p\n", c->dcw);
checkLastCudaError("memcpy to c2w buffer");

// c2w interpolation
Expand Down

0 comments on commit b7b05b3

Please sign in to comment.