@@ -104,6 +104,12 @@ def proc_once(eu, procs, args):
104
104
data_end_date = datetime .strptime (p .data_end_date , ES_DATETIME_FORMAT )
105
105
progress_percentage , frame_completion , last_processed_datetimes \
106
106
= cslc_utils .calculate_historical_progress (p .frame_states , data_end_date , disp_burst_map )
107
+
108
+ # If we've finshed the frame, then set the progress percentage to 100. Because we process only full k-sets,
109
+ # it's possible to be finished when there are a few datetimes left in which case the progress percentage
110
+ # would be less than 100
111
+ if finished is True :
112
+ progress_percentage = 100
107
113
eu .update_document (id = doc_id ,
108
114
body = {"doc_as_upsert" : True ,
109
115
"doc" : {"progress_percentage" : progress_percentage ,
@@ -159,6 +165,7 @@ def form_job_params(p, frame_id, sensing_time_position_zero_based, args, eu):
159
165
'''start and end data datetime is basically 1 hour window around the total k frame sensing time window.
160
166
TRICKY! the sensing time position is in user-friendly 1-based index, but we need to use 0-based index in code'''
161
167
try :
168
+ logger .info (f"Attempting to process frame { frame_id } at sensing time position { sensing_time_position_zero_based } " )
162
169
s_date = frame_sensing_datetimes [sensing_time_position_zero_based ] - timedelta (minutes = 30 )
163
170
except IndexError :
164
171
finished = True
@@ -174,16 +181,17 @@ def form_job_params(p, frame_id, sensing_time_position_zero_based, args, eu):
174
181
finished = True
175
182
do_submit = False
176
183
e_date = datetime .strptime ("2000-01-01T00:00:00" , ES_DATETIME_FORMAT )
177
- logger .info (f"{ frame_id = } reached end of historical processing. The rest of sensing times will be submitted as reprocessing jobs." )
178
184
185
+ '''
179
186
# Print out all the reprocessing job commands. This is temporary until it can be automated
180
- # TODO: submit reprocessing jobs instead of just printing them
187
+ # As of Dec 2024, the team's decision is that we will not perform any sub-k historical processing.
188
+ logger.info(f"{frame_id=} reached end of historical processing. The rest of sensing times will be submitted as reprocessing jobs.")
181
189
for i in range(sensing_time_position_zero_based, len(frame_sensing_datetimes)):
182
190
s_date = frame_sensing_datetimes[i] - timedelta(minutes=30)
183
191
e_date = frame_sensing_datetimes[i] + timedelta(minutes=30)
184
192
logger.info(f"python ~/mozart/ops/opera-pcm/data_subscriber/daac_data_subscriber.py query -c {CSLC_COLLECTION} \
185
193
--chunk-size=1 --k={p.k} --m={p.m} --job-queue={p.download_job_queue} --processing-mode=reprocessing --grace-mins=0 \
186
- --start-date={ convert_datetime (s_date )} --end-date={ convert_datetime (e_date )} --frame-id={ frame_id } " )
194
+ --start-date={convert_datetime(s_date)} --end-date={convert_datetime(e_date)} --frame-id={frame_id} ")'''
187
195
188
196
if s_date < data_start_date :
189
197
do_submit = False
@@ -196,15 +204,26 @@ def form_job_params(p, frame_id, sensing_time_position_zero_based, args, eu):
196
204
197
205
NOTE! While args, token, cmr, and settings are necessary arguments for CSLCDependency, they will not be used in
198
206
historical processing because all CSLC dependency information is contained in the disp_burst_map'''
199
- cslc_dependency = CSLCDependency (p .k , p .m , disp_burst_map , None , None , None , None , blackout_dates_obj )
200
- if cslc_dependency .compressed_cslc_satisfied (frame_id ,
201
- disp_burst_map [frame_id ].sensing_datetime_days_index [sensing_time_position_zero_based ], eu ):
202
- next_sensing_time_position = sensing_time_position_zero_based + p .k
203
- else :
207
+ logger .info (f"Checking Compressed CSLC satiety for frame { frame_id } at sensing time position { sensing_time_position_zero_based } " )
208
+ try :
209
+ cslc_dependency = CSLCDependency (p .k , p .m , disp_burst_map , None , None , None , None , blackout_dates_obj )
210
+ if cslc_dependency .compressed_cslc_satisfied (frame_id ,
211
+ disp_burst_map [frame_id ].sensing_datetime_days_index [sensing_time_position_zero_based ], eu ):
212
+ next_sensing_time_position = sensing_time_position_zero_based + p .k
213
+ else :
214
+ do_submit = False
215
+ next_sensing_time_position = sensing_time_position_zero_based
216
+ logger .info ("Compressed CSLC not satisfied for frame %s at sensing time position %s. \
217
+ Skipping now but will be retried in the future." % (frame_id , sensing_time_position_zero_based ))
218
+
219
+ except Exception as e :
220
+ logger .error (f"Error checking compressed cslc satiety for frame { frame_id } at sensing time position { sensing_time_position_zero_based } . Error: { e } " )
204
221
do_submit = False
205
222
next_sensing_time_position = sensing_time_position_zero_based
206
- logger .info ("Compressed CSLC not satisfied for frame %s at sensing time position %s. \
207
- Skipping now but will be retried in the future." % (frame_id , sensing_time_position_zero_based ))
223
+
224
+ # If we are at the end of the frame sensing times, we are done with this frame
225
+ if next_sensing_time_position >= len (frame_sensing_datetimes ):
226
+ finished = True
208
227
209
228
# Create job parameters used to submit query job into Mozart
210
229
# Note that if do_submit is False, none of this is actually used
0 commit comments