This repository was archived by the owner on May 17, 2024. It is now read-only.

Commit 3991b74

Merge pull request #782 from datafold/revert_databricks_information_schema
revert databricks information_schema
2 parents 0c21e43 + 73331fc · commit 3991b74

File tree

1 file changed (+30, −39 lines)


data_diff/databases/databricks.py

+30 −39
@@ -139,47 +139,38 @@ def create_connection(self):
             raise ConnectionError(*e.args) from e
 
     def query_table_schema(self, path: DbPath) -> Dict[str, tuple]:
+        # Databricks has INFORMATION_SCHEMA only for Databricks Runtime, not for Databricks SQL.
+        # https://docs.databricks.com/spark/latest/spark-sql/language-manual/information-schema/columns.html
+        # So, to obtain information about schema, we should use another approach.
+
         conn = self.create_connection()
-        table_schema = {}
 
-        try:
-            table_schema = super().query_table_schema(path)
-        except:
-            logging.warning("Failed to get schema from information_schema, falling back to legacy approach.")
-
-        if not table_schema:
-            # This legacy approach can cause bugs. e.g. VARCHAR(255) -> VARCHAR(255)
-            # and not the expected VARCHAR
-
-            # I don't think we'll fall back to this approach, but if so, see above
-            catalog, schema, table = self._normalize_table_path(path)
-            with conn.cursor() as cursor:
-                cursor.columns(catalog_name=catalog, schema_name=schema, table_name=table)
-                try:
-                    rows = cursor.fetchall()
-                finally:
-                    conn.close()
-                if not rows:
-                    raise RuntimeError(f"{self.name}: Table '{'.'.join(path)}' does not exist, or has no columns")
-
-                table_schema = {r.COLUMN_NAME: (r.COLUMN_NAME, r.TYPE_NAME, r.DECIMAL_DIGITS, None, None) for r in rows}
-                assert len(table_schema) == len(rows)
-                return table_schema
-        else:
-            return table_schema
-
-    def select_table_schema(self, path: DbPath) -> str:
-        """Provide SQL for selecting the table schema as (name, type, date_prec, num_prec)"""
-        database, schema, name = self._normalize_table_path(path)
-        info_schema_path = ["information_schema", "columns"]
-        if database:
-            info_schema_path.insert(0, database)
-
-        return (
-            "SELECT column_name, data_type, datetime_precision, numeric_precision, numeric_scale "
-            f"FROM {'.'.join(info_schema_path)} "
-            f"WHERE table_name = '{name}' AND table_schema = '{schema}'"
-        )
+        catalog, schema, table = self._normalize_table_path(path)
+        with conn.cursor() as cursor:
+            cursor.columns(catalog_name=catalog, schema_name=schema, table_name=table)
+            try:
+                rows = cursor.fetchall()
+            finally:
+                conn.close()
+            if not rows:
+                raise RuntimeError(f"{self.name}: Table '{'.'.join(path)}' does not exist, or has no columns")
+
+            d = {r.COLUMN_NAME: (r.COLUMN_NAME, r.TYPE_NAME, r.DECIMAL_DIGITS, None, None) for r in rows}
+            assert len(d) == len(rows)
+            return d
+
+    # def select_table_schema(self, path: DbPath) -> str:
+    #     """Provide SQL for selecting the table schema as (name, type, date_prec, num_prec)"""
+    #     database, schema, name = self._normalize_table_path(path)
+    #     info_schema_path = ["information_schema", "columns"]
+    #     if database:
+    #         info_schema_path.insert(0, database)
+
+    #     return (
+    #         "SELECT column_name, data_type, datetime_precision, numeric_precision, numeric_scale "
+    #         f"FROM {'.'.join(info_schema_path)} "
+    #         f"WHERE table_name = '{name}' AND table_schema = '{schema}'"
+    #     )
 
     def _process_table_schema(
         self, path: DbPath, raw_schema: Dict[str, tuple], filter_columns: Sequence[str], where: str = None

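For context, here is a minimal standalone sketch of the metadata approach this commit reverts to: rather than querying information_schema (which, per the comment in the diff, exists for Databricks Runtime but not for Databricks SQL), column metadata is pulled through the Databricks SQL connector's cursor.columns() call. The hostname, HTTP path, access token, catalog, schema, and table names below are placeholders, not values from this repository.

# Sketch only: fetch a table's schema via cursor metadata instead of information_schema.
# Every connection detail and table identifier below is a placeholder.
from databricks import sql  # databricks-sql-connector

conn = sql.connect(
    server_hostname="example.cloud.databricks.com",    # placeholder
    http_path="/sql/1.0/warehouses/0123456789abcdef",  # placeholder
    access_token="dapi-REDACTED",                      # placeholder
)
try:
    with conn.cursor() as cursor:
        # Ask the connector for column metadata; no information_schema query is issued.
        cursor.columns(catalog_name="hive_metastore", schema_name="default", table_name="my_table")
        rows = cursor.fetchall()
finally:
    conn.close()

# Same shape the reverted method builds: column name -> (name, type name, decimal digits, None, None)
table_schema = {r.COLUMN_NAME: (r.COLUMN_NAME, r.TYPE_NAME, r.DECIMAL_DIGITS, None, None) for r in rows}
print(table_schema)

As in the diff, DECIMAL_DIGITS stands in for numeric precision, while datetime precision and numeric scale are left as None.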