-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhandler.py
More file actions
747 lines (649 loc) · 36.2 KB
/
handler.py
File metadata and controls
747 lines (649 loc) · 36.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Send Wordfeud rating data to CDF
"""
import calendar
import time
import argparse
# Removed cryptography import - no longer needed for credential storage
from cognite.client import CogniteClient, ClientConfig
from cognite.client.credentials import OAuthClientCredentials
from cognite.client.data_classes import ExtractionPipeline
from cognite.client.data_classes import ExtractionPipelineRun
from cognite.client.data_classes import TimeSeries
from cognite.client.exceptions import CogniteDuplicatedError
from datetime import datetime
import sys
import os
# Add the parent directory to the path so we can import from src
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# Import Wordfeud API from local files (included in zip)
from wordfeud_api import Wordfeud
# Import credentials (for local development only). In production the values
# come from CDF function secrets instead, so a missing credentials.py must
# not be fatal - handle() checks LOCAL_CREDENTIALS_AVAILABLE before using
# these module-level fallbacks.
try:
    from credentials import EMAIL, PASSWORD, USERNAME
    LOCAL_CREDENTIALS_AVAILABLE = True
except ImportError:
    # No local file: leave placeholders so later `if not email ...` checks work.
    EMAIL = PASSWORD = USERNAME = None
    LOCAL_CREDENTIALS_AVAILABLE = False
# Populated by handle() with the active CogniteClient (module-level stash).
GLOBAL_CLIENT = None
def create_time_series(client, dataset_id, username):
    """Create the six Wordfeud time series for *username* in CDF.

    Args:
        client: Authenticated CogniteClient.
        dataset_id: Data set id to attach the series to, or -1 for none.
        username: Wordfeud username embedded in names and external ids.

    Re-running is safe: series that already exist are reported and skipped.
    """
    # (display label, external-id suffix, unit) for each metric.
    specs = [
        ('Rating', 'rating', 'rating'),
        ('Games Played', 'games_played', 'count'),
        ('Games Won', 'games_won', 'count'),
        ('Win Rate', 'win_rate', 'percentage'),
        ('Current Streak', 'current_streak', 'count'),
        ('Best Rating', 'best_rating', 'rating'),
    ]
    for label, suffix, unit in specs:
        ts = TimeSeries(
            name=f'Wordfeud {label} - {username}',
            external_id=f'WORDFEUD/{username}/{suffix}',
            unit=unit,
            # Step interpolation: a rating holds its value until the next game.
            is_step=True,
        )
        if dataset_id != -1:
            ts.data_set_id = dataset_id
        try:
            client.time_series.create(ts)
        except CogniteDuplicatedError:
            # Already created by a previous init run - not an error.
            print(f'{ts.external_id} already exists')
def delete_existing_timeseries(client, username):
    """Interactively delete this user's Wordfeud time series, if any exist.

    Args:
        client: Authenticated CogniteClient.
        username: Wordfeud username whose series should be removed.

    Returns:
        True when there was nothing to delete or deletion succeeded;
        False when the operator declined or the delete call failed.
    """
    suffixes = ['rating', 'games_played', 'games_won', 'win_rate',
                'current_streak', 'best_rating']
    existing_timeseries = []
    for suffix in suffixes:
        external_id = f'WORDFEUD/{username}/{suffix}'
        try:
            ts = client.time_series.retrieve(external_id=external_id)
            # BUGFIX: recent SDK versions return None (instead of raising)
            # for a missing series; appending None crashed on ts.name below.
            if ts is not None:
                existing_timeseries.append(ts)
        except Exception:
            # Treat any lookup failure as "does not exist" and move on.
            pass
    if not existing_timeseries:
        print(f"✅ No existing time series found for user '{username}'")
        return True
    print(f"Found {len(existing_timeseries)} existing time series for user '{username}':")
    for ts in existing_timeseries:
        print(f" - {ts.name} ({ts.external_id})")
    response = input(f"\n❓ Do you want to delete these time series? (yes/no): ")
    if response.lower() != 'yes':
        print("❌ Deletion cancelled")
        return False
    try:
        client.time_series.delete(external_id=[ts.external_id for ts in existing_timeseries])
        print("✅ Successfully deleted existing time series")
        return True
    except Exception as e:
        print(f"❌ Error deleting time series: {e}")
        return False
def create_extraction_pipeline(client, extraction_pipeline, dataset_id, username):
    """Create the Wordfeud extraction pipeline for *username*.

    Args:
        client: Authenticated CogniteClient.
        extraction_pipeline: External id for the pipeline.
        dataset_id: Data set id to attach, or -1 for none.
        username: Wordfeud username used in the display name.

    Re-running is safe: an existing pipeline is reported and skipped.
    """
    extpipe = ExtractionPipeline(
        name=f"Wordfeud Extractor - {username}",
        external_id=extraction_pipeline,
        description=f'Wordfeud to CDF extractor for {username}'
    )
    if dataset_id != -1:
        extpipe.data_set_id = dataset_id
    try:
        client.extraction_pipelines.create(extpipe)
    except CogniteDuplicatedError:
        # f-string for consistency with the rest of the file (same output text).
        print(f'Extraction pipeline {extraction_pipeline} already exists')
def report_extraction_pipeline_run(client, extraction_pipeline, status='success', message=None):
    """Record one run (status plus optional message) on the extraction pipeline."""
    run = ExtractionPipelineRun(status=status, extpipe_external_id=extraction_pipeline)
    if message:
        run.message = message
    client.extraction_pipelines.runs.create(run)
def get_latest_datapoint(client, external_id):
    """Return the newest datapoint of *external_id*, or None.

    None is returned both when the series is empty and when the lookup
    fails for any reason (the error is printed, never raised).
    """
    try:
        latest = client.time_series.data.retrieve_latest(external_id=external_id)
    except Exception as e:
        print(f"Could not retrieve latest datapoint for {external_id}: {e}")
        return None
    if latest and len(latest) > 0:
        return latest[0]
    return None
def _game_metadata(game):
    """Build the metadata dict stored with a rating datapoint.

    Copies the raw fields the API exposes and, when a players array is
    present, adds the opponent's name/score and a won/lost/tied result
    derived from the final scores of the local player vs. the opponent.
    """
    metadata = {
        'game_id': game.get('id', 'unknown'),
        'rating_delta': game.get('rating_delta', 0),
        'ruleset': game.get('ruleset'),
        'board': game.get('board'),
        'move_count': game.get('move_count'),
        'created': game.get('created'),
        'updated': game.get('updated')
    }
    players = game.get('players', [])
    local_player = None
    opponent_player = None
    for player in players:
        if player.get('is_local', False):
            local_player = player
        elif opponent_player is None:
            # First non-local player is the opponent (Wordfeud is 2-player).
            opponent_player = player
    if opponent_player is not None:
        metadata['opponent'] = opponent_player.get('username', 'unknown')
        metadata['opponent_score'] = opponent_player.get('score', 0)
    if local_player and opponent_player:
        local_score = local_player.get('score', 0)
        opponent_score = opponent_player.get('score', 0)
        if local_score > opponent_score:
            metadata['result'] = 'won'
        elif local_score < opponent_score:
            metadata['result'] = 'lost'
        else:
            metadata['result'] = 'tied'
    return metadata


def _append_game_datapoints(datapoints, game, all_games, current_best_rating):
    """Append all datapoints for one finished game to *datapoints*.

    Args:
        datapoints: The metric-name -> list accumulator being filled.
        game: One game dict from the ratings API.
        all_games: Snapshot of all games (for totals/win-rate), may be falsy.
        current_best_rating: Best rating seen so far.

    Returns:
        The possibly-improved best rating; unchanged when the game has an
        invalid 'updated' timestamp and is skipped.
    """
    game_updated = game.get('updated')
    if not game_updated or int(game_updated) <= 0:
        print(f"WARNING: Game {game.get('id')} has invalid updated timestamp: {game_updated}, skipping this game")
        return current_best_rating
    # API reports seconds; CDF timestamps are milliseconds.
    game_finished_time = int(game_updated) * 1000
    game_rating = game.get('rating')
    rating_delta = game.get('rating_delta', 0)
    game_end_date = datetime.utcfromtimestamp(game_finished_time/1000).strftime("%Y-%m-%d %H:%M:%S")
    print(f"Game {game.get('id')}: Rating {game_rating} (change: {rating_delta}) finished at {game_end_date}")
    game_metadata = _game_metadata(game)
    datapoints['rating'].append({
        'timestamp': game_finished_time,
        'value': game_rating,
        'metadata': game_metadata
    })
    # Short metadata attached to the derived metrics.
    summary = {
        'game_id': game.get('id', 'unknown'),
        'result': game_metadata.get('result', 'unknown'),
        'rating_delta': rating_delta
    }
    if game_rating > current_best_rating:
        current_best_rating = game_rating
        datapoints['best_rating'].append({
            'timestamp': game_finished_time,
            'value': game_rating,
            'metadata': dict(summary)
        })
    if all_games:
        total_games = len(all_games)
        won_games = sum(1 for g in all_games if g.get('result') == 'won')
        win_rate = (won_games / total_games * 100) if total_games > 0 else 0
        for metric, value in (('games_played', total_games),
                              ('games_won', won_games),
                              ('win_rate', win_rate)):
            datapoints[metric].append({
                'timestamp': game_finished_time,
                'value': value,
                'metadata': dict(summary)
            })
    return current_best_rating


def _best_rating_baseline(latest_best_rating_datapoint, label):
    """Return the best-rating baseline from the latest stored datapoint (0 when none)."""
    if latest_best_rating_datapoint:
        best_rating_date = datetime.utcfromtimestamp(latest_best_rating_datapoint.timestamp/1000).strftime("%Y-%m-%d %H:%M:%S")
        print(f"{label} best rating datapoint: timestamp={latest_best_rating_datapoint.timestamp} ({best_rating_date}), value={latest_best_rating_datapoint.value}")
        return latest_best_rating_datapoint.value
    print("No existing best rating datapoints found, starting baseline at 0")
    return 0


def get_wordfeud_data(wordfeud_client, client, username, start_time, end_time, board_type=None, rule_set=None):
    """Fetch Wordfeud data and create datapoints only for completed games.

    Incremental: if a rating datapoint already exists in CDF, only games that
    finished after it are processed; otherwise every completed game seeds the
    series. All errors are printed and swallowed - the function always
    returns the (possibly empty) accumulator.

    Args:
        wordfeud_client: Logged-in Wordfeud API client.
        client: Authenticated CogniteClient (used to read the latest stored points).
        username: Wordfeud username used to build external ids.
        start_time/end_time: Accepted for interface compatibility; the
            incremental cut-off actually comes from the stored data.
        board_type/rule_set: Filters forwarded to the ratings endpoint.

    Returns:
        Dict of metric name -> list of {'timestamp','value','metadata'} dicts.
    """
    datapoints = {key: [] for key in ('rating', 'games_played', 'games_won',
                                      'win_rate', 'current_streak', 'best_rating')}
    try:
        games_with_ratings = wordfeud_client.get_ratings(ruleset=rule_set, board_type=board_type)
        if not games_with_ratings:
            print(f"No games with rating information available for board_type={board_type}, rule_set={rule_set}")
            return datapoints
        print(f"Found {len(games_with_ratings)} games with ratings for board_type={board_type}, rule_set={rule_set}")
        rating_external_id = f'WORDFEUD/{username}/rating'
        latest_rating_datapoint = get_latest_datapoint(client, rating_external_id)
        best_rating_external_id = f'WORDFEUD/{username}/best_rating'
        latest_best_rating_datapoint = get_latest_datapoint(client, best_rating_external_id)
        if latest_rating_datapoint:
            # Incremental run: only games finished after the stored datapoint.
            last_timestamp = latest_rating_datapoint.timestamp
            last_datapoint_date = datetime.utcfromtimestamp(last_timestamp/1000).strftime("%Y-%m-%d %H:%M:%S")
            print(f"Found latest datapoint in time series {rating_external_id}: timestamp={last_timestamp} ({last_datapoint_date}), value={latest_rating_datapoint.value}")
            current_best_rating = _best_rating_baseline(latest_best_rating_datapoint, "Found latest")
            new_rating_games = [
                game for game in games_with_ratings
                if game.get('rating') is not None and game.get('updated')
                and int(game.get('updated')) > 0
                and int(game.get('updated')) * 1000 > last_timestamp
            ]
            if new_rating_games:
                # Chronological order so best-rating tracking is correct.
                new_rating_games.sort(key=lambda g: int(g['updated']))
                print(f"Found {len(new_rating_games)} new completed games")
                print(f"Processing games for time series: WORDFEUD/{username}/rating")
                print(f"Best rating baseline: {current_best_rating}")
                # Hoisted out of the loop: one API call gives the same
                # totals snapshot the old per-game calls returned.
                all_games = wordfeud_client.get_games()
                for game in new_rating_games:
                    current_best_rating = _append_game_datapoints(
                        datapoints, game, all_games, current_best_rating)
            else:
                print("No new completed games found")
        else:
            # First run: seed the series from every completed game.
            print(f"No existing datapoints found in time series {rating_external_id}")
            current_best_rating = _best_rating_baseline(latest_best_rating_datapoint, "Found existing")
            completed_games = [game for game in games_with_ratings
                               if game.get('rating') is not None and game.get('updated')]
            if completed_games:
                completed_games.sort(key=lambda g: int(g['updated']))
                print(f"First run: Found {len(completed_games)} completed games to create initial datapoints")
                print(f"Best rating baseline: {current_best_rating}")
                all_games = wordfeud_client.get_games()
                for game in completed_games:
                    current_best_rating = _append_game_datapoints(
                        datapoints, game, all_games, current_best_rating)
            else:
                print("First run: No completed games found - no initial datapoints created")
        total_datapoints = sum(len(d) for d in datapoints.values())
        print(f"Successfully processed Wordfeud data: {total_datapoints} new datapoints")
        if total_datapoints > 0:
            for metric, points in datapoints.items():
                if points:
                    print(f" - WORDFEUD/{username}/{metric}: {len(points)} datapoints")
    except Exception as e:
        print(f"Error processing Wordfeud data: {e}")
        import traceback
        traceback.print_exc()
    return datapoints
def store_wordfeud_data(client, data, username):
    """Store Wordfeud datapoints in CDF.

    Args:
        client: Authenticated CogniteClient.
        data: Metric name -> list of datapoints; each datapoint is either a
            dict with 'timestamp'/'value'/optional 'metadata', or a legacy
            (timestamp, value) pair.
        username: Wordfeud username used to build external ids.

    Series that do not exist in CDF are skipped with a warning. Raises
    whatever insert_multiple raises, after logging the failed series.
    """
    ts_point_list = []
    for metric, datapoints in data.items():
        if not datapoints:
            continue
        external_id = f'WORDFEUD/{username}/{metric}'
        cdf_datapoints = []
        for dp in datapoints:
            if isinstance(dp, dict):
                # Current format with optional metadata.
                cdf_datapoint = {'timestamp': dp['timestamp'], 'value': dp['value']}
                if 'metadata' in dp:
                    cdf_datapoint['metadata'] = dp['metadata']
                cdf_datapoints.append(cdf_datapoint)
            else:
                # Legacy (timestamp, value) pairs.
                cdf_datapoints.append({'timestamp': dp[0], 'value': dp[1]})
        if not cdf_datapoints:
            continue
        # Only insert into series that actually exist in CDF.
        try:
            client.time_series.retrieve(external_id=external_id)
            print(f"✓ Time series {external_id} exists")
        except Exception as ts_error:
            print(f"❌ Time series {external_id} does not exist: {ts_error}")
            print(f" Skipping data insertion for {external_id}")
            continue
        ts_point_list.append({'externalId': external_id, 'datapoints': cdf_datapoints})
    if not ts_point_list:
        return
    try:
        # Single debug pass (the old code logged the same payload twice).
        print(f"Attempting to insert {len(ts_point_list)} time series to CDF...")
        for ts_data in ts_point_list:
            print(f" - {ts_data['externalId']}: {len(ts_data['datapoints'])} datapoints")
            first_dp = ts_data['datapoints'][0]
            print(f" First datapoint: timestamp={first_dp['timestamp']}, value={first_dp['value']}")
        client.time_series.data.insert_multiple(ts_point_list)
        print(f"✓ CDF insert_multiple completed successfully")
        # Best-effort verification: confirm each series now has data.
        for ts_data in ts_point_list:
            external_id = ts_data['externalId']
            try:
                latest = client.time_series.data.retrieve_latest(external_id=external_id)
                if latest and len(latest) > 0:
                    print(f"✓ Verified: {external_id} has data (latest: {latest[0].timestamp})")
                else:
                    print(f"⚠️ Warning: {external_id} appears to be empty after insertion")
            except Exception as verify_error:
                print(f"⚠️ Warning: Could not verify {external_id}: {verify_error}")
    except Exception as insert_error:
        print(f"❌ Error inserting data to CDF: {insert_error}")
        print(f"Error type: {type(insert_error).__name__}")
        for ts_data in ts_point_list:
            print(f" Failed to insert: {ts_data['externalId']} ({len(ts_data['datapoints'])} datapoints)")
        raise
def _pipeline_external_id(data, username):
    """Resolve the pipeline external id, deriving one from the username when unset."""
    extraction_pipeline = data.get('extraction-pipeline')
    if not extraction_pipeline and username:
        extraction_pipeline = f'extractors/wordfeud-{username}'
    return extraction_pipeline


def handle(data, client, secrets):
    """Main handler function for the CDF function.

    Args:
        data: Call payload; honours optional keys 'start-time', 'end-time',
            'board_type', 'rule_set' and 'extraction-pipeline'.
        client: Authenticated CogniteClient supplied by the runtime.
        secrets: Function secrets: wordfeud-email/-pass/-user, plus optional
            board-type/rule-set defaults.

    Raises:
        Exception: re-raised after (best-effort) reporting a failed run to
            the extraction pipeline.
    """
    global GLOBAL_CLIENT
    GLOBAL_CLIENT = client
    # BUGFIX: bound before the try so the except-branch can reference it even
    # when the failure happens before credentials resolve (previously a
    # NameError there masked the real exception).
    username = None
    try:
        # Secrets first (production); fall back to credentials.py (local dev).
        email = secrets.get('wordfeud-email')
        password = secrets.get('wordfeud-pass')
        username = secrets.get('wordfeud-user')
        if not email and LOCAL_CREDENTIALS_AVAILABLE:
            email = EMAIL
        if not password and LOCAL_CREDENTIALS_AVAILABLE:
            password = PASSWORD
        if not username and LOCAL_CREDENTIALS_AVAILABLE:
            username = USERNAME
        if not email or not password or not username:
            raise Exception("Wordfeud credentials not found. Please configure function secrets (wordfeud-email, wordfeud-pass, wordfeud-user) or ensure credentials.py exists for local development.")
        wordfeud_client = Wordfeud()
        wordfeud_client.login_email(email, password)
        board_type = data.get('board_type', secrets.get('board-type', 'BoardNormal'))
        rule_set = data.get('rule_set', secrets.get('rule-set', 'RuleSetNorwegian'))
        # Resolve the configured names to the Wordfeud client's constants.
        wordfeud_client.board_type = getattr(wordfeud_client, board_type)
        wordfeud_client.rule_set = getattr(wordfeud_client, rule_set)
        print(f"✓ Wordfeud login successful")
        print(f"✓ Board configured: {board_type}, {rule_set}")
        # Default window: last 7 days, start aligned down to a whole hour.
        now_ms = int(time.time() * 1000)
        week_ago = now_ms - 7 * 24 * 3600000
        start_time = week_ago - (week_ago % 3600000)
        if 'start-time' in data:
            start_time = int(data['start-time'])
        end_time = int(data['end-time']) if 'end-time' in data else now_ms
        start_date = datetime.utcfromtimestamp(start_time/1000).strftime("%Y-%m-%d")
        end_date = datetime.utcfromtimestamp(end_time/1000).strftime("%Y-%m-%d")
        print(f'Starting Wordfeud data extraction from {start_date} to {end_date}')
        wordfeud_data = get_wordfeud_data(wordfeud_client, client, username, start_time, end_time,
                                          board_type=wordfeud_client.board_type, rule_set=wordfeud_client.rule_set)
        if any(wordfeud_data.values()):
            store_wordfeud_data(client, wordfeud_data, username)
            print(f'Successfully processed Wordfeud data for {username}')
        else:
            print('No Wordfeud data was collected for the specified time range')
        extraction_pipeline = _pipeline_external_id(data, username)
        if extraction_pipeline:
            report_extraction_pipeline_run(client, extraction_pipeline)
    except Exception as e:
        error_msg = f'Critical error in handle function: {type(e).__name__}: {str(e)}'
        print(error_msg)
        # Best effort: report the failure but never let reporting mask it.
        extraction_pipeline = _pipeline_external_id(data, username)
        if extraction_pipeline:
            try:
                report_extraction_pipeline_run(client, extraction_pipeline, status='failure', message=error_msg)
            except Exception as pipeline_error:
                print(f'Failed to report pipeline failure: {pipeline_error}')
        raise
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-k', '--key', type=str, required=True, help='Cognite OIDC client secret. Required.')
    parser.add_argument(
        '-c', '--client_id', type=str, required=True, help='Cognite OIDC client id. Required.')
    parser.add_argument(
        '-t', '--tenant_id', type=str, help='Tenant id in identity provider used for OIDC. Required for Azure AD, optional for other IDPs.')
    parser.add_argument(
        '-p', '--project', type=str, required=True, help='Project name. Required.')
    parser.add_argument(
        '-b', '--base_url', type=str, help='Base URL of the CDF cluster of the CDF project. Defaults to https://api.cognitedata.com.', default='https://api.cognitedata.com')
    parser.add_argument(
        '--token_url', type=str, help='Token URL of the CDF project. If not set, it will use the default Azure AD token url based on the tenant ID provided.')
    parser.add_argument(
        '-s', '--start_time', type=int, help='Begin at this UTC unix timestamp in ms. Defaults to one week ago.', default=-1)
    parser.add_argument(
        '-e', '--end_time', type=int, help='End at this UTC unix timestamp in ms. Defaults to now.', default=-1)
    # BUGFIX: argparse's type=bool treats ANY non-empty string (including
    # "False") as True. Parse true/false words properly and let a bare
    # `-i` mean True.
    parser.add_argument(
        '-i', '--init', nargs='?', const=True, default=False,
        type=lambda s: str(s).strip().lower() in ('true', '1', 'yes'),
        help='Create necessary time series and extraction pipeline, but do not do anything else.')
    parser.add_argument(
        '--cleanup', action='store_true', help='Delete existing time series before creating new ones (requires --init)')
    parser.add_argument(
        '-d', '--dataset', type=int, help='Dataset ID from Cognite Data Fusion', default=-1)
    parser.add_argument(
        '-a', '--admin_security_category', type=int, help='ID of admin security category for the Wordfeud credentials', default=-1)
    parser.add_argument(
        '--extraction_pipeline', type=str, help='External ID of extraction pipeline to update on every run', default=None)
    # Wordfeud credentials are loaded from credentials.py, not the CLI.
    parser.add_argument(
        '--board_type', type=str, choices=['BoardNormal', 'BoardRandom'], default='BoardNormal',
        help='Wordfeud board type. Default: BoardNormal')
    parser.add_argument(
        '--rule_set', type=str,
        choices=['RuleSetAmerican', 'RuleSetDanish', 'RuleSetDutch', 'RuleSetEnglish',
                 'RuleSetFrench', 'RuleSetNorwegian', 'RuleSetSpanish', 'RuleSetSwedish'],
        default='RuleSetNorwegian', help='Wordfeud rule set/language. Default: RuleSetNorwegian')
    args = parser.parse_args()
    # Default window: last 7 days, start aligned down to a whole hour.
    now_ms = int(time.time() * 1000)
    if args.start_time == -1:
        week_ago = now_ms - 7 * 24 * 3600000
        args.start_time = week_ago - (week_ago % 3600000)
    if args.end_time == -1:
        args.end_time = now_ms
    SCOPES = [f"{args.base_url}/.default"]
    if args.token_url:
        TOKEN_URL = args.token_url
    elif args.tenant_id:
        # Azure AD token endpoint derived from the tenant id.
        TOKEN_URL = f"https://login.microsoftonline.com/{args.tenant_id}/oauth2/v2.0/token"
    else:
        # Other IDPs must supply the token URL explicitly.
        raise ValueError("Either --token_url or --tenant_id must be provided for IDP configuration")
    creds = OAuthClientCredentials(
        token_url=TOKEN_URL,
        client_id=args.client_id,
        client_secret=args.key,
        scopes=SCOPES,
        audience=args.base_url)
    config = ClientConfig(
        client_name='wordfeud-rating-reader',
        project=args.project,
        base_url=args.base_url,
        credentials=creds)
    client = CogniteClient(config)
    if args.init:
        # Initialization needs a username; locally it only comes from credentials.py.
        init_username = USERNAME if LOCAL_CREDENTIALS_AVAILABLE else None
        if not init_username:
            print("❌ Error: USERNAME not found in credentials.py")
            print("Please add USERNAME to your credentials.py file for local initialization")
            sys.exit(1)
        if args.extraction_pipeline is None:
            args.extraction_pipeline = f'extractors/wordfeud-{init_username}'
        if args.cleanup:
            print(f"🧹 Cleanup mode: Checking for existing time series for user '{init_username}'...")
            if not delete_existing_timeseries(client, init_username):
                print("❌ Cleanup failed or was cancelled. Exiting.")
                sys.exit(1)
            print("✅ Cleanup completed successfully")
        create_time_series(client, args.dataset, init_username)
        create_extraction_pipeline(client, args.extraction_pipeline, args.dataset, init_username)
        print(f"✓ Time series and extraction pipeline created successfully for user: {init_username}")
        print("ℹ Wordfeud credentials are loaded from credentials.py file for initialization")
    else:
        # Local run: mimic the CDF function invocation with empty secrets.
        secrets = {}
        data = {'start-time': args.start_time, 'end-time': args.end_time}
        if args.extraction_pipeline is None:
            init_username = USERNAME if LOCAL_CREDENTIALS_AVAILABLE else None
            if init_username:
                args.extraction_pipeline = f'extractors/wordfeud-{init_username}'
        data['extraction-pipeline'] = args.extraction_pipeline
        data['board_type'] = args.board_type
        data['rule_set'] = args.rule_set
        handle(data, client, secrets)