Skip to content

Commit 1245428

Browse files
author
James Halgren
committed
add parallel options
1 parent fc67d18 commit 1245428

File tree

2 files changed

+60
-19
lines changed

2 files changed

+60
-19
lines changed

ngen_forcing/TestConvert_NWMForcing_to_Ngen.py

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,12 @@ def get_forcing_dict_newway(
4848

4949

5050
def get_forcing_dict_newway_parallel(
51-
feature_list,
52-
folder_prefix,
53-
file_list,
54-
):
51+
feature_list,
52+
folder_prefix,
53+
file_list,
54+
para="thread",
55+
para_n=2,
56+
):
5557

5658
reng = "rasterio"
5759
_xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
@@ -67,8 +69,14 @@ def get_forcing_dict_newway_parallel(
6769
)
6870
filehandles = [xr.open_dataset("data/" + f) for f in file_list]
6971

70-
with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
71-
# with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
72+
if para == "process":
73+
pool = concurrent.futures.ProcessPoolExecutor
74+
elif para == "thread":
75+
pool = concurrent.futures.ThreadPoolExecutor
76+
else:
77+
pool = concurrent.futures.ThreadPoolExecutor
78+
79+
with pool(max_workers=para_n) as executor:
7280
stats = []
7381
future_list = []
7482

@@ -130,10 +138,12 @@ def get_forcing_dict_newway_inverted(
130138

131139

132140
def get_forcing_dict_newway_inverted_parallel(
133-
feature_list,
134-
folder_prefix,
135-
file_list,
136-
):
141+
feature_list,
142+
folder_prefix,
143+
file_list,
144+
para="thread",
145+
para_n=2,
146+
):
137147

138148
import concurrent.futures
139149

@@ -161,8 +171,14 @@ def get_forcing_dict_newway_inverted_parallel(
161171
stats = []
162172
future_list = []
163173

164-
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
165-
# with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
174+
if para == "process":
175+
pool = concurrent.futures.ProcessPoolExecutor
176+
elif para == "thread":
177+
pool = concurrent.futures.ThreadPoolExecutor
178+
else:
179+
pool = concurrent.futures.ThreadPoolExecutor
180+
181+
with pool(max_workers=para_n) as executor:
166182

167183
for f in filehandles:
168184
print(f"{i}, {round(i/len(file_list), 2)*100}".ljust(40), end="\r")

ngen_forcing/process_nwm_forcing.py

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,10 @@ def main():
106106
"SWDOWN",
107107
]
108108

109-
file_list = list_of_files[0:30]
110-
gpkg_subset = gpkg_divides[0:2000]
111-
#file_list = list_of_files[0:3]
112-
#gpkg_subset = gpkg_divides[0:20]
109+
# file_list = list_of_files[0:30]
110+
# gpkg_subset = gpkg_divides[0:2000]
111+
file_list = list_of_files[0:3]
112+
gpkg_subset = gpkg_divides[0:200]
113113
feature_list = gpkg_subset.geometry.to_list()
114114

115115
# start_time = time.time()
@@ -132,14 +132,26 @@ def main():
132132
print(time.time() - start_time)
133133

134134
start_time = time.time()
135-
print(f"Working on the new way with threading. (It's not much better.)")
135+
print(f"Working on the new way with threading parallel.")
136136
fd3 = get_forcing_dict_newway_parallel(
137137
feature_list,
138138
folder_prefix,
139139
file_list,
140-
)
140+
para="thread",
141+
para_n=16,
142+
)
141143
print(time.time() - start_time)
142144

145+
start_time = time.time()
146+
print(f"Working on the new way with process parallel.")
147+
fd3 = get_forcing_dict_newway_parallel(
148+
feature_list,
149+
folder_prefix,
150+
file_list,
151+
para="process",
152+
para_n=16,
153+
)
154+
print(time.time() - start_time)
143155

144156
start_time = time.time()
145157
print(f"Working on the new way with loops reversed.")
@@ -151,11 +163,24 @@ def main():
151163
print(time.time() - start_time)
152164

153165
start_time = time.time()
154-
print(f"Working on the new way with loops reversed with threading.")
166+
print(f"Working on the new way with loops reversed with threading parallel.")
167+
fd4 = get_forcing_dict_newway_inverted_parallel(
168+
feature_list,
169+
folder_prefix,
170+
file_list,
171+
para="thread",
172+
para_n=16,
173+
)
174+
print(time.time() - start_time)
175+
176+
start_time = time.time()
177+
print(f"Working on the new way with loops reversed with process parallel.")
155178
fd4 = get_forcing_dict_newway_inverted_parallel(
156179
feature_list,
157180
folder_prefix,
158181
file_list,
182+
para="process",
183+
para_n=16,
159184
)
160185
print(time.time() - start_time)
161186

0 commit comments

Comments
 (0)