Skip to content
This repository has been archived by the owner on Jan 10, 2025. It is now read-only.

Commit

Permalink
work on accumulations
Browse files Browse the repository at this point in the history
  • Loading branch information
b8raoult committed Mar 18, 2024
1 parent 5d9b090 commit 72ca923
Showing 1 changed file with 83 additions and 47 deletions.
130 changes: 83 additions & 47 deletions ecml_tools/create/functions/actions/acc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# nor does it submit to any jurisdiction.
#
import datetime
from collections import defaultdict
from copy import deepcopy

import climetlab as cml
Expand All @@ -20,29 +21,30 @@


class Accumulation:
def __init__(self, out, param, date, time, number, stepping):
def __init__(self, out, /, param, date, time, number, step, frequency, stream=None):
self.out = out
self.param = param
self.date = date
self.time = time
self.steps = step
self.number = number
self.values = None
self.seen = set()
self.startStep = None
self.endStep = None
self.done = False
self.stepping = stepping
self.frequency = frequency

@property
def key(self):
return (self.param, self.date, self.time, self.number)
return (self.param, self.date, self.time, self.steps, self.number)


class AccumulationFromStart(Accumulation):
def add(self, field, values):
step = field.metadata("step")
# if step not in self.steps:
# return
if step not in self.steps:
return

assert not self.done, (self.key, step)
assert step not in self.seen, (self.key, step)
Expand Down Expand Up @@ -88,8 +90,11 @@ def add(self, field, values):


class AccumulationFromLastStep(Accumulation):

def add(self, field, values):
step = field.metadata("step")
if step not in self.steps:
return

assert not self.done, (self.key, step)
assert step not in self.seen, (self.key, step)
Expand All @@ -100,7 +105,7 @@ def add(self, field, values):
assert endStep == step, (startStep, endStep, step)
assert step not in self.seen, (self.key, step)

assert endStep - startStep == self.stepping, (startStep, endStep)
assert endStep - startStep == self.frequency, (startStep, endStep)

if self.startStep is None:
self.startStep = startStep
Expand Down Expand Up @@ -132,35 +137,50 @@ def add(self, field, values):
self.done = True


def accumulations_from_start(dates, step1, step2):
def accumulations_from_start(dates, step1, step2, frequency, base_times):
assert frequency == 0, frequency
assert base_times is None, base_times

for valid_date in dates:
base_date = valid_date - datetime.timedelta(hours=step2)

yield (
base_date.year * 10000 + base_date.month * 100 + base_date.day,
base_date.hour * 100 + base_date.minute,
step1,
(step1, step2),
)


def accumulations_from_last_step(dates, step1, step2, frequency, base_times):

if base_times is None:
base_times = [0, 6, 12, 18]

base_times = [t // 100 if t > 100 else t for t in base_times]
assert frequency

for valid_date in dates:

print(f"====> {valid_date=}")

base_date = valid_date - datetime.timedelta(hours=step2)
add_step = 0
while base_date.hour not in base_times:
# print(f'{base_date=}, {base_times=}, {add_step=} {frequency=}')
base_date -= datetime.timedelta(hours=frequency)
add_step += frequency

steps = []
for step in range(step1 + frequency * 2, step2 + frequency, frequency):
steps.append(step)

yield (
base_date.year * 10000 + base_date.month * 100 + base_date.day,
base_date.hour * 100 + base_date.minute,
step2,
tuple(steps),
)


def accumulations_from_last_step(dates, step1, step2, frequency):
for valid_date in dates:
date1 = valid_date - datetime.timedelta(hours=step1 + frequency)

for step in range(step1, step2, frequency):
date = date1 + datetime.timedelta(hours=step)
yield (
date.year * 10000 + date.month * 100 + date.day,
date.hour * 100 + date.minute,
step,
)


def identity(x):
return x

Expand All @@ -171,6 +191,7 @@ def accumulations(
user_accumulation_period,
request,
patch=identity,
base_times=None,
):
if not isinstance(user_accumulation_period, (list, tuple)):
user_accumulation_period = (0, user_accumulation_period)
Expand All @@ -180,9 +201,9 @@ def accumulations(
assert step1 < step2, user_accumulation_period

if data_accumulation_period == 0:
mars_date_time_step = accumulations_from_start(dates, step1, step2)
mars_date_time_step = accumulations_from_start(dates, step1, step2, data_accumulation_period, base_times)
else:
mars_date_time_step = accumulations_from_last_step(dates, step1, step2, data_accumulation_period)
mars_date_time_step = accumulations_from_last_step(dates, step1, step2, data_accumulation_period, base_times)

request = deepcopy(request)

Expand All @@ -196,7 +217,7 @@ def accumulations(
number = request.get("number", [0])
assert isinstance(number, (list, tuple))

stepping = data_accumulation_period
frequency = data_accumulation_period

type_ = request.get("type", "an")
if type_ == "an":
Expand All @@ -214,7 +235,7 @@ def accumulations(

accumulations = {}

for date, time, step in mars_date_time_step:
for date, time, steps in mars_date_time_step:
for p in param:
for n in number:
requests.append(
Expand All @@ -223,42 +244,38 @@ def accumulations(
"param": p,
"date": date,
"time": time,
"step": step,
"step": sorted(steps),
"number": n,
}
)
)

key = (p, date, time, n)
if key not in accumulations:
accumulations[key] = AccumulationClass(
out,
stepping=stepping,
param=p,
date=date,
time=time,
number=number,
)

compressed = Availability(requests)
ds = cml.load_source("empty")
for r in compressed.iterate():
request.update(r)
ds = ds + cml.load_source("mars", **request)

accumulations = defaultdict(list)
for a in [AccumulationClass(out, frequency=frequency, **r) for r in requests]:
for s in a.steps:
accumulations[(a.param, a.date, a.time, s, a.number)].append(a)

for field in ds:
print(field)
key = (
field.metadata("param"),
field.metadata("date"),
field.metadata("time"),
field.metadata("step"),
field.metadata("number"),
)
values = field.values # optimisation
accumulations[key].add(field, values)
for a in accumulations[key]:
a.add(field, values)

for a in accumulations.values():
assert a.done, (a.key, a.seen)
for acc in accumulations.values():
for a in acc:
assert a.done, (a.key, a.seen, a.steps)

out.close()

Expand All @@ -277,6 +294,11 @@ def accumulations(
if __name__ == "__main__":
import yaml

dates = yaml.safe_load("[2022-12-30 18:00, 2022-12-31 00:00, 2022-12-31 06:00, 2022-12-31 12:00]")
dates = to_datetime_list(dates)

print(dates)

config = yaml.safe_load(
"""
class: od
Expand All @@ -286,10 +308,6 @@ def accumulations(
param: tp
"""
)
dates = yaml.safe_load("[2022-12-30 18:00, 2022-12-31 00:00, 2022-12-31 06:00, 2022-12-31 12:00]")
dates = to_datetime_list(dates)

print(dates)

def scda(request):
if request["time"] in (600, 1800):
Expand All @@ -298,7 +316,25 @@ def scda(request):
request["stream"] = "oper"
return request

ds = accumulations(dates, 0, (0, 6), config, scda)
ds = accumulations(dates, 0, (6, 12), config, scda)
print()
for f in ds:
print(f.valid_datetime())

################

config = yaml.safe_load(
"""
class: ea
expver: '0001'
grid: 20./20.
levtype: sfc
param: tp
"""
)
print()

ds = accumulations(dates, 1, (0, 6), config, base_times=[6, 18])
print()
for f in ds:
print(f.valid_datetime())

0 comments on commit 72ca923

Please sign in to comment.