1
+ import json
1
2
import os
2
3
import re
3
4
import time
13
14
from . import connect_to_table , diff_tables , Algorithm
14
15
from .cloud import DatafoldAPI , TCloudApiDataDiff , TCloudApiOrgMeta , get_or_create_data_source
15
16
from .dbt_parser import DbtParser , PROJECT_FILE , TDatadiffConfig
17
+ from .diff_tables import DiffResultWrapper
18
+ from .format import jsonify , jsonify_error
16
19
from .tracking import (
17
20
bool_ask_for_email ,
18
21
create_email_signup_event_json ,
@@ -49,13 +52,15 @@ class TDiffVars(pydantic.BaseModel):
49
52
where_filter : Optional [str ] = None
50
53
include_columns : List [str ]
51
54
exclude_columns : List [str ]
55
+ dbt_model : Optional [str ] = None
52
56
53
57
54
58
def dbt_diff (
55
59
profiles_dir_override : Optional [str ] = None ,
56
60
project_dir_override : Optional [str ] = None ,
57
61
is_cloud : bool = False ,
58
62
dbt_selection : Optional [str ] = None ,
63
+ json_output : bool = False ,
59
64
state : Optional [str ] = None ,
60
65
) -> None :
61
66
print_version_info ()
@@ -66,7 +71,6 @@ def dbt_diff(
66
71
config = dbt_parser .get_datadiff_config ()
67
72
_initialize_events (dbt_parser .dbt_user_id , dbt_parser .dbt_version , dbt_parser .dbt_project_id )
68
73
69
-
70
74
if not state and not (config .prod_database or config .prod_schema ):
71
75
doc_url = "https://docs.datafold.com/development_testing/open_source#configure-your-dbt-project"
72
76
raise DataDiffDbtProjectVarsNotFoundError (
@@ -122,12 +126,25 @@ def dbt_diff(
122
126
diff_thread = run_as_daemon (_cloud_diff , diff_vars , config .datasource_id , api , org_meta )
123
127
diff_threads .append (diff_thread )
124
128
else :
125
- _local_diff (diff_vars )
129
+ _local_diff (diff_vars , json_output )
126
130
else :
127
- rich .print (
128
- _diff_output_base ("." .join (diff_vars .dev_path ), "." .join (diff_vars .prod_path ))
129
- + "Skipped due to unknown primary key. Add uniqueness tests, meta, or tags.\n "
130
- )
131
+ if json_output :
132
+ print (
133
+ json .dumps (
134
+ jsonify_error (
135
+ table1 = diff_vars .prod_path ,
136
+ table2 = diff_vars .dev_path ,
137
+ dbt_model = diff_vars .dbt_model ,
138
+ error = "No primary key found. Add uniqueness tests, meta, or tags." ,
139
+ )
140
+ ),
141
+ flush = True ,
142
+ )
143
+ else :
144
+ rich .print (
145
+ _diff_output_base ("." .join (diff_vars .dev_path ), "." .join (diff_vars .prod_path ))
146
+ + "Skipped due to unknown primary key. Add uniqueness tests, meta, or tags.\n "
147
+ )
131
148
132
149
# wait for all threads
133
150
if diff_threads :
@@ -162,6 +179,7 @@ def _get_diff_vars(
162
179
datadiff_model_config = dbt_parser .get_datadiff_model_config (model .meta )
163
180
164
181
return TDiffVars (
182
+ dbt_model = model .unique_id ,
165
183
dev_path = dev_qualified_list ,
166
184
prod_path = prod_qualified_list ,
167
185
primary_keys = primary_keys ,
@@ -212,15 +230,15 @@ def _get_prod_path_from_manifest(model, prod_manifest) -> Union[Tuple[str, str],
212
230
return prod_database , prod_schema
213
231
214
232
215
- def _local_diff (diff_vars : TDiffVars ) -> None :
233
+ def _local_diff (diff_vars : TDiffVars , json_output : bool = False ) -> None :
216
234
dev_qualified_str = "." .join (diff_vars .dev_path )
217
235
prod_qualified_str = "." .join (diff_vars .prod_path )
218
236
diff_output_str = _diff_output_base (dev_qualified_str , prod_qualified_str )
219
237
220
- table1 = connect_to_table (diff_vars .connection , dev_qualified_str , tuple (diff_vars .primary_keys ), diff_vars .threads )
221
- table2 = connect_to_table (
238
+ table1 = connect_to_table (
222
239
diff_vars .connection , prod_qualified_str , tuple (diff_vars .primary_keys ), diff_vars .threads
223
240
)
241
+ table2 = connect_to_table (diff_vars .connection , dev_qualified_str , tuple (diff_vars .primary_keys ), diff_vars .threads )
224
242
225
243
table1_columns = table1 .get_schema ()
226
244
try :
@@ -235,11 +253,11 @@ def _local_diff(diff_vars: TDiffVars) -> None:
235
253
table1_column_names = set (table1_columns .keys ())
236
254
table2_column_names = set (table2_columns .keys ())
237
255
column_set = table1_column_names .intersection (table2_column_names )
238
- columns_added = table1_column_names .difference (table2_column_names )
239
- columns_removed = table2_column_names .difference (table1_column_names )
256
+ columns_added = table2_column_names .difference (table1_column_names )
257
+ columns_removed = table1_column_names .difference (table2_column_names )
240
258
# col type is i = 1 in tuple
241
259
columns_type_changed = {
242
- k for k , v in table1_columns .items () if k in table2_columns and v [1 ] != table2_columns [k ][1 ]
260
+ k for k , v in table2_columns .items () if k in table1_columns and v [1 ] != table1_columns [k ][1 ]
243
261
}
244
262
245
263
if columns_added :
@@ -262,7 +280,7 @@ def _local_diff(diff_vars: TDiffVars) -> None:
262
280
263
281
extra_columns = tuple (column_set )
264
282
265
- diff = diff_tables (
283
+ diff : DiffResultWrapper = diff_tables (
266
284
table1 ,
267
285
table2 ,
268
286
threaded = True ,
@@ -271,6 +289,35 @@ def _local_diff(diff_vars: TDiffVars) -> None:
271
289
where = diff_vars .where_filter ,
272
290
skip_null_keys = True ,
273
291
)
292
+ if json_output :
293
+ # drain the iterator to get accumulated stats in diff.info_tree
294
+ try :
295
+ list (diff )
296
+ except Exception as e :
297
+ print (
298
+ json .dumps (
299
+ jsonify_error (list (table1 .table_path ), list (table2 .table_path ), diff_vars .dbt_model , str (e ))
300
+ ),
301
+ flush = True ,
302
+ )
303
+ return
304
+
305
+ print (
306
+ json .dumps (
307
+ jsonify (
308
+ diff ,
309
+ dbt_model = diff_vars .dbt_model ,
310
+ with_summary = True ,
311
+ with_columns = {
312
+ "added" : columns_added ,
313
+ "removed" : columns_removed ,
314
+ "changed" : columns_type_changed ,
315
+ },
316
+ )
317
+ ),
318
+ flush = True ,
319
+ )
320
+ return
274
321
275
322
if list (diff ):
276
323
diff_output_str += f"{ diff .get_stats_string (is_dbt = True )} \n "
@@ -425,7 +472,7 @@ def _initialize_events(dbt_user_id: Optional[str], dbt_version: Optional[str], d
425
472
426
473
427
474
def _email_signup () -> None :
428
- email_regex = r' ^[\w\.\+-]+@[\w\.-]+\.\w+$'
475
+ email_regex = r" ^[\w\.\+-]+@[\w\.-]+\.\w+$"
429
476
prompt = "\n Would you like to be notified when a new data-diff version is available?\n \n Enter email or leave blank to opt out (we'll only ask once).\n "
430
477
431
478
if bool_ask_for_email ():
0 commit comments