#!/usr/bin/env python3
#
# Generates an Excel workbook of dbt test failures that can be shared with
# other teams for review and correction, along with metadata parquet files
# that can be uploaded to S3 for long-term result tracking.
#
# Run `python3 run_iasworld_data_tests.py --help` for detailed
# documentation.
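#
# A typical invocation, combining options documented in CLI_EXAMPLE below:
#
#   python3 run_iasworld_data_tests.py --township 77 --use-cached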
import argparse
import dataclasses
import datetime
import decimal
import enum
import hashlib
import os
import pathlib
import re
import subprocess
import typing
import openpyxl
import openpyxl.cell
import openpyxl.styles
import openpyxl.styles.colors
import openpyxl.utils
import pyarrow as pa
import pyarrow.parquet
import pyathena
import pyathena.arrow.cursor
import pyathena.cursor
import simplejson as json
import yaml
from dbt.artifacts.schemas.results import TestStatus
from dbt.cli.main import dbtRunner
from utils import constants
DBT = dbtRunner()
# Prefix for the URL location of a test in the dbt docs
DOCS_URL_PREFIX = "https://ccao-data.github.io/data-architecture/#!/test"
# The S3 bucket where Athena query results are stored
AWS_ATHENA_S3_STAGING_DIR = os.getenv(
"AWS_ATHENA_S3_STAGING_DIR", "s3://ccao-athena-results-us-east-1/"
)
# Field names that are used in the output workbook
SOURCE_TABLE_FIELD = "source_table"
DESCRIPTION_FIELD = "description"
TEST_NAME_FIELD = "test_name"
DOCS_URL_FIELD = "docs_url"
TAXYR_FIELD = "taxyr"
PARID_FIELD = "parid"
CARD_FIELD = "card"
LAND_LINE_FIELD = "lline"
TOWNSHIP_FIELD = "township_code"
CLASS_FIELD = "class"
WHO_FIELD = "who"
WEN_FIELD = "wen"
# Mapping that defines category names that should be reported for tests
# based on their generics
TEST_CATEGORIES = {
"test_accepted_range": "incorrect_values",
"test_accepted_values": "incorrect_values",
"test_not_accepted_values": "incorrect_values",
"test_unique_combination_of_columns": "duplicate_records",
"test_not_null": "missing_values",
"test_is_null": "missing_values",
"test_res_class_matches_pardat": "class_mismatch_or_issue",
}
# Fallback for tests whose category we can't determine from the test name,
# the `meta.category` attribute, or the TEST_CATEGORIES mapping
DEFAULT_TEST_CATEGORY = "miscellaneous"
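# For illustration, resolution through the mapping looks like:
#   TEST_CATEGORIES.get("test_accepted_range")  # -> "incorrect_values"
#   TEST_CATEGORIES.get("test_unknown_generic") or DEFAULT_TEST_CATEGORY
#   # -> "miscellaneous" ("test_unknown_generic" is a made-up name)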
# Directory to store failed test caches
TEST_CACHE_DIR = "test_cache"
class Status(enum.Enum):
"""Status of an individual dbt test result."""
PASS = "pass"
FAIL = "fail"
WARN = "warn"
def __repr__(self) -> str:
return self.value.upper()
class TestResult:
"""Class to store results for an individual test."""
def __init__(
self,
name: str,
table_name: str,
column_name: typing.Optional[str],
status: Status,
description: str,
elapsed_time: decimal.Decimal,
failing_rows: typing.Optional[typing.List[typing.Dict]] = None,
) -> None:
"""
The failing_rows list should be formatted like the rows
returned by a csv.DictReader or a DictCursor, i.e. a list of
dicts mapping `{field_name: row_value}`.
"""
self.name = name
self.table_name = table_name
self.column_name = column_name
self.status = status
self.description = description
self.elapsed_time = elapsed_time
self.failing_rows: typing.List[typing.Dict] = failing_rows or []
@property
def fieldnames(self) -> typing.List[str]:
"""Return a list of strings representing the fieldnames for any
failing_rows of this test. Returns an empty list if the test
passed."""
fieldnames = []
for row in self.failing_rows:
for fieldname in row.keys():
if fieldname not in fieldnames:
fieldnames.append(fieldname)
return fieldnames
def to_dict(self) -> typing.Dict:
"""Serialize the TestResult object as a dictionary."""
return {
"name": self.name,
"table_name": self.table_name,
"column_name": self.column_name,
"status": self.status.value,
"description": self.description,
"elapsed_time": self.elapsed_time,
"failing_rows": self.failing_rows,
}
@classmethod
def from_dict(cls, result_dict: typing.Dict) -> "TestResult":
"""Deserialize a TestResult object from a dictionary."""
return TestResult(
name=result_dict["name"],
table_name=result_dict["table_name"],
column_name=result_dict["column_name"],
status=Status(result_dict["status"]),
description=result_dict["description"],
elapsed_time=result_dict["elapsed_time"],
failing_rows=result_dict["failing_rows"],
)
def __repr__(self) -> str:
return f"TestResult({self.to_dict()!r})"
def split_by_township(self) -> typing.List["TownshipTestResult"]:
"""Split out this TestResult object into one or more
TownshipTestResults based on the township code of each failing row. If
there are no failing rows, or if all the failing rows have a null
township, the return value will be a list of one TownshipTestResult
whose township_code is None."""
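        # Sketch of the expected behavior (hypothetical codes): failing rows
        # tagged with township_code "77" and "10" yield two
        # TownshipTestResults, one per code; a test with no failing rows
        # yields a single TownshipTestResult with township_code=None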
# Split out the failing rows by township so that we know which
# townships are represented in this test's failures
failing_rows_by_township: typing.Dict[
typing.Optional[str], typing.List[typing.Dict]
] = {}
for row in self.failing_rows:
township_code = row.get(TOWNSHIP_FIELD)
            failing_rows_by_township.setdefault(township_code, []).append(row)
# These kwargs are shared by all TownshipTestResults, regardless of
# township code or failure status
base_kwargs = {
"name": self.name,
"table_name": self.table_name,
"column_name": self.column_name,
"status": self.status,
"description": self.description,
"elapsed_time": self.elapsed_time,
}
# If we have any failing rows, split out separate TownshipTestResult
# objects for each township/row group; otherwise, just create one
# object with a null township mapped to the passing test
if failing_rows_by_township:
return [
TownshipTestResult(
township_code=township_code,
failing_rows=rows,
**base_kwargs,
)
for township_code, rows in failing_rows_by_township.items()
]
return [
TownshipTestResult(
township_code=None, failing_rows=[], **base_kwargs
)
]
class TownshipTestResult(TestResult):
"""A variant of TestResult for a test whose results all share the same
township. Note that township_code is only present in the case of failing
tests; the township_code will always be None in the case of a passing test,
since passing tests have no township (or, thinking of it differently,
passing tests encompass all of the townships)."""
def __init__(
self, township_code: typing.Optional[str], *args, **kwargs
) -> None:
self.township_code = township_code
super().__init__(*args, **kwargs)
class TestCategory:
"""Class to store TestResult objects for a group of dbt tests that share
the same category. Provides convenience methods for formatting those
results for output to a workbook and saving them to a cache."""
# Names of fields that are used for debugging
possible_debugging_fieldnames = [TEST_NAME_FIELD, DOCS_URL_FIELD]
# Names of fields that identify the test
    possible_test_metadata_fieldnames = [
        SOURCE_TABLE_FIELD,
        DESCRIPTION_FIELD,
        *possible_debugging_fieldnames,
    ]
# Names of fields that are used for diagnostics
possible_diagnostic_fieldnames = [
TAXYR_FIELD,
PARID_FIELD,
CARD_FIELD,
LAND_LINE_FIELD,
CLASS_FIELD,
TOWNSHIP_FIELD,
WHO_FIELD,
WEN_FIELD,
]
# The complete set of fixed fields
possible_fixed_fieldnames = [
*possible_test_metadata_fieldnames,
*possible_diagnostic_fieldnames,
]
def __init__(
self,
category: str,
results: typing.Optional[typing.List[TestResult]] = None,
) -> None:
self.category = category
self.test_results: typing.List[TestResult] = results or []
def to_dict(self) -> typing.Dict:
"""Serialize the TestCategory object as a dictionary."""
return {
"category": self.category,
"test_results": [result.to_dict() for result in self.test_results],
}
@classmethod
def from_dict(cls, category_dict: typing.Dict) -> "TestCategory":
"""Deserialize a TestCategory object from a dictionary."""
return TestCategory(
category=category_dict["category"],
results=[
TestResult.from_dict(result_dict)
for result_dict in category_dict["test_results"]
],
)
def __repr__(self) -> str:
num_failing_rows = sum(
len(result.failing_rows) for result in self.test_results
)
return (
f"TestCategory(category={self.category!r}, "
f"status={self.status!r}, "
f"num_tests={len(self.test_results)}, "
f"num_failing_rows={num_failing_rows})"
)
@property
def fieldnames(self) -> typing.List[str]:
"""Get a list of fieldnames that encapsulates all of the fieldnames
for all of the rows of tests tracked by this group."""
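        # e.g. (hypothetical): if the union of result fieldnames is
        # ["class", "docs_url", "parid"], the fixed fields are pulled to the
        # front in possible_fixed_fieldnames order, yielding
        # ["docs_url", "parid", "class"]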
fieldnames = []
for result in self.test_results:
for fieldname in result.fieldnames:
if fieldname not in fieldnames:
fieldnames.append(fieldname)
# Remove any fixed fieldnames from the ordered list that are not
# present in this group
fixed_field_order = [
field
for field in self.possible_fixed_fieldnames
if field in fieldnames
]
# Reorder the fieldnames so that diagnostic fields are presented in the
# correct order
for field in reversed(fixed_field_order):
fieldnames.insert(0, fieldnames.pop(fieldnames.index(field)))
return fieldnames
@property
def rows(self) -> typing.List[typing.List]:
"""Format the rows of tests tracked by this group, with
fieldname data excluded. The combination of this property and the
`fieldnames` property can be used to write to a csv.Writer or
to an openpyxl.Workbook sheet for the tests tracked by this group."""
fieldnames = self.fieldnames
return [
[row.get(fieldname) for fieldname in fieldnames]
for result in self.test_results
for row in result.failing_rows
]
@property
def status(self) -> Status:
"""Return an aggregate status for this category based on the statuses
of its TestResult objects."""
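        # Sketch of the aggregation rules below: e.g. a category whose tests
        # produced {PASS, FAIL} reports FAIL, while {PASS, WARN} reports WARN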
        statuses = {result.status for result in self.test_results}
        if not statuses:
            raise ValueError(
                "Cannot determine the status of a category with no results"
            )
        # Any failure fails the whole category; otherwise any warning
        # downgrades the category to WARN; a category passes only if every
        # one of its tests passed
        if Status.FAIL in statuses:
            return Status.FAIL
        if Status.WARN in statuses:
            return Status.WARN
        return Status.PASS
def add_to_workbook(self, workbook: openpyxl.Workbook) -> None:
"""Add a sheet of failed dbt tests to an openpyxl Workbook using data
from the TestCategory object. Note that we expect the workbook to be
initialized with write_only=True."""
# Only add this category to the workbook if it has any failing tests
if self.status in (Status.PASS, Status.WARN):
print(
f"Skipping add_to_workbook for category {self.category} since "
f"its status is '{self.status!r}'"
)
return
# openpyxl Workbooks are typically created with one untitled active
# sheet by default, but write-only sheets are an exception to this
# rule, so we always have to create a new sheet
sheet = workbook.create_sheet()
sheet.title = self.category
# Freeze the header row. The syntax for the freeze_panes attribute is
# undocumented, but it freezes all rows above and all columns to the
# left of the given cell identifier. Note that freeze operations must
# be performed before any data is added to a sheet in a write-only
# workbook
data_header_idx = 3 # We have 3 headers; 2 for grouping and 1 for data
freeze_pane_letter = openpyxl.utils.get_column_letter(
len(self.test_metadata_fieldnames) + 1
)
freeze_pane_number = data_header_idx + 1
sheet.freeze_panes = f"{freeze_pane_letter}{freeze_pane_number}"
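        # e.g. with four test-metadata columns (source_table, description,
        # test_name, docs_url) the freeze pane is "E4": rows 1-3 (the two
        # grouping headers plus the data header) and columns A-D stay
        # visible while scrolling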
# Hide columns that are intended for debugging only, so that they don't
# get in the way of non-technical workbook consumers
for col_idx in self.debugging_field_indexes:
sheet.column_dimensions[col_idx].hidden = True
# Create groupings for columns with a special group header
bold_font = openpyxl.styles.Font(bold=True)
italic_font = openpyxl.styles.Font(italic=True)
        title_row, subtitle_row, header_row, merged_cell_ranges = [], [], [], []
column_groups = {
self.test_metadata_field_indexes: {
"title": "Test description fields",
"subtitle": "These fields identify a failing test.",
"fieldnames": self.test_metadata_fieldnames,
"style": "20 % - Accent4",
"header_style": "Accent4",
},
self.diagnostic_field_indexes: {
"title": "Unique identifier fields",
"subtitle": (
"These fields identify the row that is failing a test."
),
"fieldnames": self.diagnostic_fieldnames,
"style": "20 % - Accent1",
"header_style": "Accent1",
},
self.nonfixed_field_indexes: {
"title": "Problematic fields",
"subtitle": (
"These fields contain values that are causing the test "
"to fail."
),
"fieldnames": self.nonfixed_fieldnames,
"style": "20 % - Accent2",
"header_style": "Accent2",
},
}
for col_group_indexes, col_metadata in column_groups.items():
# Sometimes there are no problematic fields for a given test;
# if this is the case, skip it
if not col_group_indexes:
continue
# Save merged cell info
for cell_range in [
f"{col_group_indexes[0]}1:{col_group_indexes[-1]}1",
f"{col_group_indexes[0]}2:{col_group_indexes[-1]}2",
]:
                merged_cell_ranges.append(cell_range)
# Fill out and format grouping header
title_cell = openpyxl.cell.WriteOnlyCell(
sheet, value=col_metadata["title"]
)
title_cell.style = "Note"
title_cell.font = bold_font
title_row.append(title_cell)
# Flesh out the empty title row cells that will be merged later on
for _ in range(len(col_group_indexes) - 1):
title_row.append("")
subtitle_cell = openpyxl.cell.WriteOnlyCell(
sheet, value=col_metadata["subtitle"]
)
subtitle_cell.style = "Note"
subtitle_cell.font = italic_font
subtitle_row.append(subtitle_cell)
for _ in range(len(col_group_indexes) - 1):
subtitle_row.append("")
# Fill out and format the data header
for fieldname in col_metadata["fieldnames"]:
header_cell = openpyxl.cell.WriteOnlyCell(
sheet, value=fieldname
)
header_cell.style = col_metadata["header_style"]
header_cell.font = openpyxl.styles.Font(
bold=True, color=openpyxl.styles.colors.WHITE
)
header_row.append(header_cell)
# Initialize the column widths based on the length of values in
# the header row
column_widths = {
openpyxl.utils.get_column_letter(idx + 1): len(fieldname) + 2
for idx, fieldname in enumerate(self.fieldnames)
}
# Iterate the rows to extract data and optionally update the column
# widths if the length of the cell value exceeds the length of the
# header value
data_rows = []
for row in self.rows:
data_row = []
# Start enumeration at 1 since openpyxl columns are 1-indexed
for col_idx, cell in enumerate(row, 1):
# Convert row values to string so that Excel doesn't apply
# autoformatting
cell_str = str(cell) if cell is not None else ""
cell = openpyxl.cell.WriteOnlyCell(sheet, value=cell_str)
# Retrieve the cell style from the column groupings if one
# exists
cell_style = None
column_letter = openpyxl.utils.get_column_letter(col_idx)
for col_group_indexes, col_metadata in column_groups.items():
if column_letter in col_group_indexes:
cell_style = col_metadata["style"]
if cell_style:
cell.style = cell_style
data_row.append(cell)
# Check if this cell is longer than the longest cell we've seen
# so far, and adjust the column dimensions accordingly
column_letter = openpyxl.utils.get_column_letter(col_idx)
column_widths[column_letter] = max(
column_widths.get(column_letter, 0), len(cell_str)
)
data_rows.append(data_row)
# Update column widths so that they fit the longest column
        for column_letter, column_width in column_widths.items():
# Pad with an extra two characters to account for the fact that
# non-monospace fonts do not have consistent character widths,
# and set a hard limit of 75 characters so no one field takes over
# the viewport of the spreadsheet
width = min(column_width + 2, 75)
sheet.column_dimensions[column_letter].width = width
# Add filters to fixed columns (i.e. columns that appear in every sheet
# in the same position)
fixed_field_indexes = self.fixed_field_indexes
sheet_max_row_idx = data_header_idx + len(data_rows)
min_fixed_idx = f"{fixed_field_indexes[0]}{data_header_idx}"
max_fixed_idx = f"{fixed_field_indexes[-1]}{sheet_max_row_idx}"
fixed_field_range = f"{min_fixed_idx}:{max_fixed_idx}"
sheet.auto_filter.ref = fixed_field_range
# Add the data to the sheet. This should be one of the last steps in
# this function, since write-only sheets require all formatting to be
# set before data is added
sheet.append(title_row)
sheet.append(subtitle_row)
sheet.append(header_row)
for data_row in data_rows:
sheet.append(data_row)
# Merge cells in the grouping headers. This approach is a bit of a hack
# since merged cells are not fully supported in write-only workbooks,
# hence why it takes place _after_ rows have been added to the sheet
# whereas most formatting options for write-only workbooks need to
# happen _before_ data is added. See here for details:
# https://stackoverflow.com/a/66159254
        for cell_range in merged_cell_ranges:
sheet.merged_cells.ranges.add(cell_range)
@property
def debugging_fieldnames(self) -> typing.List[str]:
"""Get a list of fieldnames (e.g. ["foo", "bar"]) for fields that
are used for debugging."""
return self._filter_for_existing_fieldnames(
self.possible_debugging_fieldnames
)
@property
def debugging_field_indexes(self) -> tuple:
"""Get a tuple of field indexes (e.g. ["A", "B"]) for fields that
are used for debugging."""
return self._filter_for_existing_field_indexes(
self.possible_debugging_fieldnames
)
@property
def test_metadata_fieldnames(self) -> typing.List[str]:
"""Get a list of fieldnames (e.g. ["foo", "bar"]) for fields that
are used for identifying tests."""
return self._filter_for_existing_fieldnames(
self.possible_test_metadata_fieldnames
)
@property
def test_metadata_field_indexes(self) -> tuple:
"""Get a tuple of field indexes (e.g. ["A", "B"]) for fields that
are used for identifying tests."""
return self._filter_for_existing_field_indexes(
self.possible_test_metadata_fieldnames
)
@property
def diagnostic_fieldnames(self) -> typing.List[str]:
"""Get a list of fieldnames (e.g. ["foo", "bar"]) for fields that
are used for diagnostics."""
return self._filter_for_existing_fieldnames(
self.possible_diagnostic_fieldnames
)
@property
def diagnostic_field_indexes(self) -> tuple:
"""Get a tuple of field indexes (e.g. ["A", "B"]) for fields that
are used for diagnostics."""
return self._filter_for_existing_field_indexes(
self.possible_diagnostic_fieldnames
)
@property
def fixed_fieldnames(self) -> typing.List[str]:
"""Get a list of fieldnames (e.g. ["foo", "bar"]) for fields that
are fixed (i.e. whose position is always at the start of the sheet,
for diagnostic purposes)."""
return self._filter_for_existing_fieldnames(
self.possible_fixed_fieldnames
)
@property
def fixed_field_indexes(self) -> tuple:
"""Get a list of field indexes (e.g. ["A", "B"]) for fields that
are fixed (i.e. whose position is always at the start of the sheet,
for diagnostic purposes)."""
return self._filter_for_existing_field_indexes(
self.possible_fixed_fieldnames
)
@property
def nonfixed_fieldnames(self) -> typing.List[str]:
"""Get a list of field names (e.g. ["foo", "bar"]) for fields that
are nonfixed (i.e. whose position comes after the fixed fields in the
sheet and are thus variable)."""
fieldnames = self.fieldnames
fixed_fieldnames = self.possible_fixed_fieldnames
return [field for field in fieldnames if field not in fixed_fieldnames]
@property
def nonfixed_field_indexes(self) -> tuple:
"""Get a list of field indexes (e.g. ["A", "B"]) for fields that
are nonfixed (i.e. whose position comes after the fixed fields in the
sheet and are thus variable)."""
nonfixed_fieldnames = self.nonfixed_fieldnames
return self._filter_for_existing_field_indexes(nonfixed_fieldnames)
def _filter_for_existing_fieldnames(
self, possible_fieldnames: typing.List[str]
) -> typing.List[str]:
"""Helper function to filter a list of `possible_fieldnames` for
only those fields that exist in the test group, returning the
names of the fields (e.g. ["foo", "bar"])."""
existing_fieldnames = self.fieldnames
return [
field
for field in possible_fieldnames
if field in existing_fieldnames
]
def _filter_for_existing_field_indexes(
self, possible_fieldnames: typing.List[str]
) -> tuple:
"""Helper function to filter a list of `possible_fieldnames` for
only those fields that exist in the test group, returning the
indexes of the fields (e.g. ["A", "B"])."""
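        # e.g. (hypothetical): if self.fieldnames is ["taxyr", "parid"] and
        # possible_fieldnames is ["parid"], this returns ("B",), since
        # "parid" sits at 0-indexed position 1, i.e. column 2, letter "B"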
existing_fieldnames = self.fieldnames
return tuple(
openpyxl.utils.get_column_letter(
# openpyxl is 1-indexed while the index() method is 0-indexed
existing_fieldnames.index(field) + 1
)
for field in self._filter_for_existing_fieldnames(
possible_fieldnames
)
)
# Help docstring for the command line interface
CLI_DESCRIPTION = """Runs iasWorld data tests and generates an Excel workbook of dbt test failures that can be shared with other teams
for review and correction, along with metadata parquet files that can be uploaded to S3 for long-term result tracking.
This script expects that Python dependencies have been installed from [project.optional-dependencies].dbt_tests.
Expects one required environment variable to be set:
1. USER: The username of the user who ran the script. This is automatically set during login on Unix systems, but should be set manually elsewhere.
Expects four optional environment variables:
1. AWS_ATHENA_S3_STAGING_DIR: Location in S3 where Athena query results should be written (defaults to s3://ccao-athena-results-us-east-1/)
2. GIT_SHA: The SHA of the latest git commit (defaults to the output of `git rev-parse HEAD`)
3. GIT_REF: The name of the ref for the latest git commit (defaults to the output of `git rev-parse --abbrev-ref HEAD`)
4. GIT_AUTHOR: The author of the latest git commit (defaults to the output of `git log -1 --pretty=format:'%an <%ae>'`)
Outputs four artifacts to the directory specified by the `--output-dir` flag:
1. `iasworld_test_failures_<date>.xlsx`: Excel workbook to share with other teams
2. `metadata/test_run/run_year=YYYY/*.parquet`: Metadata about this run, partitioned by year of run and prepped for upload to S3
3. `metadata/test_run_result/run_year=YYYY/*.parquet`: Metadata about test results (pass, fail, number of failing rows, etc.) in this run,
partitioned by year of run and prepped for upload to S3
4. `metadata/test_run_failing_row/run_year=YYYY/*.parquet`: Metadata about each failing row in this run, partitioned by year of run and
prepped for upload to S3
Each sheet in the output workbook represents a category of test, e.g. "incorrect_values" or "missing_values"; each row in a sheet represents a row in
a database that failed a test, with enough metadata that a reader can figure out what conditions caused the test to fail and investigate the root cause.""" # noqa: E501
# Examples to use in the command line interface docstring
CLI_EXAMPLE = """Example usage with no options provided:
python3 run_iasworld_data_tests.py
Example usage with all options provided:
AWS_ATHENA_S3_STAGING_DIR=s3://foo-bar-baz/ python3 run_iasworld_data_tests.py
--output-dir ./iasworld_test_results/
--township 77
--no-use-cached
Example usage to filter for multiple townships:
python3 run_iasworld_data_tests.py --township 76 77
Example usage to skip running tests and instead reuse results from a previous run:
python3 run_iasworld_data_tests.py --use-cached
""" # noqa: E501
def main() -> None:
"""Entrypoint to this script. Runs dbt tests and writes artifacts
to the output directory with metadata about test results."""
parser = argparse.ArgumentParser(
description=CLI_DESCRIPTION,
epilog=CLI_EXAMPLE,
# Parse the description and epilog as raw text so that newlines
# get preserved
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument(
"--output-dir",
required=False,
help=(
"The directory to which output artifacts should be written; "
"if the directory does not exist, it will be created. Defaults to "
"'./iasworld_test_results_<date>/'."
),
)
parser.add_argument(
"--township",
required=False,
nargs="*",
help=(
"One or more optional township codes which will be used to filter "
"results. Can be provided with multiple space-separated values "
"to select multiple townships. Defaults to all townships, "
"including null townships (which typically indicate invalid PINs)."
),
)
parser.add_argument(
"--use-cached",
action=argparse.BooleanOptionalAction,
required=False,
help=(
"Toggle using cached results from the most recent run. Useful when debugging "
"transformation steps. Defaults to False."
),
)
parser.add_argument(
"--skip-artifacts",
action=argparse.BooleanOptionalAction,
required=False,
help=(
"Just run tests and skip the step that parses test output. "
"Ignored if --use-cached is set, since --use-cached implies "
"that the script should skip running tests"
),
)
parser.add_argument(
"--defer",
action=argparse.BooleanOptionalAction,
required=False,
default=False,
help=(
"Same as the dbt --defer option, resolves unselected nodes by "
"deferring to the manifest within the --state directory"
),
)
parser.add_argument(
"--state",
required=False,
help=(
"Same as the dbt --state option, use this state directory for "
"deferral"
),
)
parser.add_argument(
*constants.SELECT_ARGUMENT_ARGS, **constants.SELECT_ARGUMENT_KWARGS
)
parser.add_argument(
*constants.SELECTOR_ARGUMENT_ARGS, **constants.SELECTOR_ARGUMENT_KWARGS
)
parser.add_argument(
*constants.TARGET_ARGUMENT_ARGS, **constants.TARGET_ARGUMENT_KWARGS
)
args = parser.parse_args()
output_dir = args.output_dir
townships = args.township if args.township else tuple()
use_cached = args.use_cached
skip_artifacts = args.skip_artifacts
defer = args.defer
state = args.state
select = args.select
selector = args.selector
target = args.target
run_results_filepath = os.path.join("target", "run_results.json")
manifest_filepath = os.path.join("target", "manifest.json")
date_today = datetime.datetime.today().strftime("%Y-%m-%d")
if output_dir is None:
output_dir = f"iasworld_test_results_{date_today}"
if (not defer and state) or (defer and not state):
raise ValueError("--defer and --state must be used together")
select_args = ["--selector", "select_data_test_iasworld"]
if select:
select_args = ["--select", *select]
if selector:
select_args = ["--selector", selector]
if use_cached:
test_cache_path = get_test_cache_path(
run_results_filepath, manifest_filepath, townships
)
if os.path.isfile(test_cache_path):
print(f"Loading test results from cache at {test_cache_path}")
test_categories = get_test_categories_from_file(test_cache_path)
else:
raise ValueError(
f"Test cache not found at {test_cache_path}, try rerunning "
"without --use-cached"
)
else:
print("Running tests")
dbt_run_args = ["test", "--target", target, *select_args]
if not skip_artifacts:
dbt_run_args.append("--store-failures")
if defer and state:
dbt_run_args += ["--defer", "--state", state]
print(f"> dbt {' '.join(dbt_run_args)}")
dbt_test_result = DBT.invoke(dbt_run_args)
if dbt_test_result.exception is not None:
raise dbt_test_result.exception
if any(
result.status == TestStatus.Error
for result in getattr(dbt_test_result.result, "results", [])
):
# No need to report the exception, since the dbt process
# will have printed it already
raise ValueError("Quitting due to error in dbt test run")
if skip_artifacts:
print("Skipping artifact generation since --skip-artifacts is set")
return
print("Loading test results from Athena")
test_categories = get_test_categories_from_athena(
run_results_filepath, manifest_filepath, townships
)
new_test_cache_path = get_test_cache_path(
run_results_filepath, manifest_filepath, townships
)
print(f"Saving test results to the cache at {new_test_cache_path}")
save_test_categories_to_file(test_categories, new_test_cache_path)
print("Generating the output workbook")
# It's important to use a write-only workbook here because otherwise
# the metadata required to store cell info about a large number of failing
# tests can cause the process to run out of memory
workbook = openpyxl.Workbook(write_only=True)
for test_category in test_categories:
print(f"Adding sheet for {test_category.category}")
test_category.add_to_workbook(workbook)
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
workbook_filepath = os.path.join(
output_dir, f"iasworld_test_failures_{date_today}.xlsx"
)
workbook.save(workbook_filepath)
print(f"Output workbook saved to {workbook_filepath}")
# Get run metadata from the environment
try:
run_by = os.environ["USER"]
except KeyError:
raise ValueError("USER env variable must be set")
    git_sha = os.getenv("GIT_SHA") or subprocess.getoutput(
        "git rev-parse HEAD"
    )
    git_ref = os.getenv("GIT_REF") or subprocess.getoutput(
        "git rev-parse --abbrev-ref HEAD"
    )
    git_author = os.getenv("GIT_AUTHOR") or subprocess.getoutput(
        "git log -1 --pretty=format:'%an <%ae>'"
    )
# Generate and save metadata tables as parquet
test_run_metadata = TestRunMetadata.create(
run_results_filepath, run_by, git_sha, git_ref, git_author
)
test_run_result_metadata_list = TestRunResultMetadata.create_list(
test_categories, run_results_filepath
)
test_run_failing_row_metadata_list = TestRunFailingRowMetadata.create_list(
test_categories, run_results_filepath
)
run_date = get_run_date_from_run_results(run_results_filepath)
run_id = get_run_id_from_run_results(run_results_filepath)
for metadata_list, tablename, partition_cols in [
([test_run_metadata], "test_run", ["run_year"]),
(test_run_result_metadata_list, "test_run_result", ["run_year"]),
(
test_run_failing_row_metadata_list,
"test_run_failing_row",
["run_year"],
),
]:
if not metadata_list:
print(f"{tablename} is empty, skipping metadata output")
continue
table = pa.Table.from_pylist(
[meta_obj.to_dict() for meta_obj in metadata_list], # type: ignore
)
metadata_root_path = os.path.join(output_dir, "metadata", tablename)
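        # With partition_cols=["run_year"] and the basename template below,
        # files land at paths like (for a hypothetical 2024 run):
        #   <output_dir>/metadata/test_run/run_year=2024/
        #     <run_date>_<run_id>_0.parquet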
pyarrow.parquet.write_to_dataset(
table,
metadata_root_path,
partition_cols,
basename_template="%s_%s_{i}.parquet" % (run_date, run_id),
)
print(f"{tablename} metadata saved to {metadata_root_path}/")
@dataclasses.dataclass
class TestRunMetadata:
"""Metadata object storing information about a test run."""
run_id: str
run_date: str
run_year: str # Separate from run_date for partitioning
run_by: str
elapsed_time: decimal.Decimal
var_year_start: str
var_year_end: str
git_sha: str
git_ref: str
git_author: str
@classmethod
def create(
cls,
run_results_filepath: str,
run_by: str,
git_sha: str,
git_ref: str,
git_author: str,
) -> "TestRunMetadata":
"""Generate a TestRunMetadata object from a filepath to a
run_results.json file."""
run_id = get_run_id_from_run_results(run_results_filepath)
run_date = get_run_date_from_run_results(run_results_filepath)
run_year = run_date[:4]
elapsed_time = get_key_from_run_results(
"elapsed_time", run_results_filepath
)
# Extract dbt vars
run_vars = get_key_from_run_results("args", run_results_filepath)[
"vars"
]
var_year_start = run_vars.get("data_test_iasworld_year_start")
var_year_end = run_vars.get("data_test_iasworld_year_end")
# If dbt vars weren't set on the command line, the defaults won't exist
# in run_results.json, so we have to parse them from the dbt project
# config
if not var_year_start or not var_year_end:
with open("dbt_project.yml") as project_fobj:
project = yaml.safe_load(project_fobj)
var_year_start = (
var_year_start
or project["vars"]["data_test_iasworld_year_start"]
)
var_year_end = (
var_year_end or project["vars"]["data_test_iasworld_year_end"]
)
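        # i.e. this assumes a dbt_project.yml layout like the following
        # (example values are hypothetical):
        #   vars:
        #     data_test_iasworld_year_start: "2021"
        #     data_test_iasworld_year_end: "2024"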
return cls(
run_id=run_id,
run_year=run_year,
run_date=run_date,
run_by=run_by,
elapsed_time=elapsed_time,
var_year_start=var_year_start,
var_year_end=var_year_end,
git_sha=git_sha,
git_ref=git_ref,
git_author=git_author,
)
def to_dict(self) -> typing.Dict:
return dataclasses.asdict(self)
@dataclasses.dataclass
class TestRunResultMetadata:
"""Metadata object storing aggregated information about township-level
test results in a run."""