Skip to content

Commit

Permalink
feature: Update dbcat version to 0.6.1
Browse files Browse the repository at this point in the history
This dependency update provides SQL migrations using Alembic as well as
managed sessions to eliminate the chance of leaked database connections.

Other minor improvements:

* Remove idea config files and setup.py
* Fix path in README
* Update version to 0.8.0
* Fix code in example.py to analyze queries; the wrong API was used.
  • Loading branch information
vrajat committed Jul 29, 2021
1 parent 070557b commit fa63423
Show file tree
Hide file tree
Showing 20 changed files with 297 additions and 382 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and WebStorm
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839

.idea

# User-specific stuff
.idea/**/workspace.xml
.idea/**/tasks.xml
Expand Down
2 changes: 0 additions & 2 deletions .idea/.gitignore

This file was deleted.

18 changes: 0 additions & 18 deletions .idea/data-lineage.iml

This file was deleted.

6 changes: 0 additions & 6 deletions .idea/inspectionProfiles/profiles_settings.xml

This file was deleted.

7 changes: 0 additions & 7 deletions .idea/misc.xml

This file was deleted.

8 changes: 0 additions & 8 deletions .idea/modules.xml

This file was deleted.

6 changes: 0 additions & 6 deletions .idea/vcs.xml

This file was deleted.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Download the docker-compose file from Github repository.
# in a new directory run
wget https://raw.githubusercontent.com/tokern/data-lineage/master/install-manifests/docker-compose/catalog-demo.yml
# or run
curl https://raw.githubusercontent.com/tokern/data-lineage/master/install-manifests/docker-compose/catalog-demo.yml -o docker-compose.yml
curl https://raw.githubusercontent.com/tokern/data-lineage/master/install-manifests/docker-compose/tokern-lineage-engine.yml -o docker-compose.yml


Run docker-compose
Expand Down
2 changes: 1 addition & 1 deletion data_lineage/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# flake8: noqa
__version__ = "0.7.8"
__version__ = "0.8.0"

import datetime
import json
Expand Down
143 changes: 71 additions & 72 deletions data_lineage/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import flask_restless
import gunicorn.app.base
from dbcat import Catalog
from dbcat import Catalog, init_db
from dbcat.catalog import CatColumn
from dbcat.catalog.db import DbScanner
from dbcat.catalog.models import (
Expand Down Expand Up @@ -66,23 +66,24 @@ def get(self):
edges = []

args = self._parser.parse_args()
column_edges = self._catalog.get_column_lineages(args["job_ids"])
for edge in column_edges:
nodes.append(self._column_info(edge.source))
nodes.append(self._column_info(edge.target))
nodes.append(self._job_info(edge.job_execution.job))
edges.append(
{
"source": "column:{}".format(edge.source_id),
"target": "task:{}".format(edge.job_execution.job_id),
}
)
edges.append(
{
"source": "task:{}".format(edge.job_execution.job_id),
"target": "column:{}".format(edge.target_id),
}
)
with self._catalog.managed_session:
column_edges = self._catalog.get_column_lineages(args["job_ids"])
for edge in column_edges:
nodes.append(self._column_info(edge.source))
nodes.append(self._column_info(edge.target))
nodes.append(self._job_info(edge.job_execution.job))
edges.append(
{
"source": "column:{}".format(edge.source_id),
"target": "task:{}".format(edge.job_execution.job_id),
}
)
edges.append(
{
"source": "task:{}".format(edge.job_execution.job_id),
"target": "column:{}".format(edge.target_id),
}
)

return {"nodes": nodes, "edges": edges}

Expand All @@ -106,14 +107,12 @@ def __init__(self, catalog: Catalog):
self._parser.add_argument("id", required=True, help="ID of the resource")

def post(self):
try:
args = self._parser.parse_args()
logging.debug("Args for scanning: {}".format(args))
args = self._parser.parse_args()
logging.debug("Args for scanning: {}".format(args))
with self._catalog.managed_session:
source = self._catalog.get_source_by_id(int(args["id"]))
DbScanner(self._catalog, source).scan()
return "Scanned {}".format(source.fqdn), 200
finally:
self._catalog.scoped_session.remove()


class Parse(Resource):
Expand All @@ -134,27 +133,26 @@ def post(self):
raise ParseErrorHTTP(description=str(error))

try:
source = self._catalog.get_source_by_id(args["source_id"])
logging.debug("Parsing query for source {}".format(source))
binder = parse_dml_query(
catalog=self._catalog, parsed=parsed, source=source
)

return (
{
"select_tables": [table.name for table in binder.tables],
"select_columns": [context.alias for context in binder.columns],
},
200,
)
with self._catalog.managed_session:
source = self._catalog.get_source_by_id(args["source_id"])
logging.debug("Parsing query for source {}".format(source))
binder = parse_dml_query(
catalog=self._catalog, parsed=parsed, source=source
)

return (
{
"select_tables": [table.name for table in binder.tables],
"select_columns": [context.alias for context in binder.columns],
},
200,
)
except TableNotFound as table_error:
raise TableNotFoundHTTP(description=str(table_error))
except ColumnNotFound as column_error:
raise ColumnNotFoundHTTP(description=str(column_error))
except SemanticError as semantic_error:
raise SemanticErrorHTTP(description=str(semantic_error))
finally:
self._catalog.scoped_session.remove()


class Analyze(Resource):
Expand Down Expand Up @@ -182,45 +180,44 @@ def post(self):
raise ParseErrorHTTP(description=str(error))

try:
source = self._catalog.get_source_by_id(args["source_id"])
logging.debug("Parsing query for source {}".format(source))
chosen_visitor = analyze_dml_query(self._catalog, parsed, source)
job_execution = extract_lineage(
catalog=self._catalog,
visited_query=chosen_visitor,
source=source,
parsed=parsed,
start_time=datetime.datetime.fromisoformat(args["start_time"]),
end_time=datetime.datetime.fromisoformat(args["end_time"]),
)

return (
{
"data": {
"id": job_execution.id,
"type": "job_executions",
"attributes": {
"job_id": job_execution.job_id,
"started_at": job_execution.started_at.strftime(
"%Y-%m-%d %H:%M:%S"
),
"ended_at": job_execution.ended_at.strftime(
"%Y-%m-%d %H:%M:%S"
),
"status": job_execution.status.name,
},
}
},
200,
)
with self._catalog.managed_session:
source = self._catalog.get_source_by_id(args["source_id"])
logging.debug("Parsing query for source {}".format(source))
chosen_visitor = analyze_dml_query(self._catalog, parsed, source)
job_execution = extract_lineage(
catalog=self._catalog,
visited_query=chosen_visitor,
source=source,
parsed=parsed,
start_time=datetime.datetime.fromisoformat(args["start_time"]),
end_time=datetime.datetime.fromisoformat(args["end_time"]),
)

return (
{
"data": {
"id": job_execution.id,
"type": "job_executions",
"attributes": {
"job_id": job_execution.job_id,
"started_at": job_execution.started_at.strftime(
"%Y-%m-%d %H:%M:%S"
),
"ended_at": job_execution.ended_at.strftime(
"%Y-%m-%d %H:%M:%S"
),
"status": job_execution.status.name,
},
}
},
200,
)
except TableNotFound as table_error:
raise TableNotFoundHTTP(description=str(table_error))
except ColumnNotFound as column_error:
raise ColumnNotFoundHTTP(description=str(column_error))
except SemanticError as semantic_error:
raise SemanticErrorHTTP(description=str(semantic_error))
finally:
self._catalog.scoped_session.remove()


class Server(gunicorn.app.base.BaseApplication):
Expand Down Expand Up @@ -289,6 +286,8 @@ def create_server(
pool_pre_ping=True
)

init_db(catalog)

restful_catalog = Catalog(
**catalog_options,
connect_args={"application_name": "data-lineage:restful"},
Expand All @@ -300,7 +299,7 @@ def create_server(
# Create CRUD APIs
methods = ["DELETE", "GET", "PATCH", "POST"]
url_prefix = "/api/v1/catalog"
api_manager = flask_restless.APIManager(app, catalog.scoped_session)
api_manager = flask_restless.APIManager(app, catalog.get_scoped_session())
api_manager.create_api(
CatSource,
methods=methods,
Expand Down
53 changes: 42 additions & 11 deletions example.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
Expand All @@ -62,9 +62,20 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 2,
"metadata": {},
"outputs": [],
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Setup a connection to catalog using the SDK.\n",
"from data_lineage import Catalog\n",
Expand All @@ -82,7 +93,7 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
Expand All @@ -94,19 +105,39 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 5,
"metadata": {
"scrolled": true
},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'name': 'LOAD page_lookup_nonredirect', 'query': 'INSERT INTO page_lookup_nonredirect SELECT page.page_id as redircet_id, page.page_title as redirect_title, page.page_title true_title, page.page_id, page.page_latest FROM page LEFT OUTER JOIN redirect ON page.page_id = redirect.rd_from WHERE redirect.rd_from IS NULL '}\n"
]
},
{
"ename": "TypeError",
"evalue": "analyze() missing 2 required positional arguments: 'start_time' and 'end_time'",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mTypeError\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m/tmp/ipykernel_2259588/1883341295.py\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m 5\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mquery\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mqueries\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 6\u001b[0m \u001b[0mprint\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mquery\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 7\u001b[0;31m \u001b[0manalyze\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0manalyze\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m**\u001b[0m\u001b[0mquery\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0msource\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0msource\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[0;31mTypeError\u001b[0m: analyze() missing 2 required positional arguments: 'start_time' and 'end_time'"
]
}
],
"source": [
"from data_lineage import Parser\n",
"from datetime import datetime\n",
"from data_lineage import Analyze\n",
"\n",
"parser = Parser(docker_address)\n",
"analyze = Analyze(docker_address)\n",
"\n",
"for query in queries:\n",
" print(query)\n",
" parser.parse(**query, source=source)"
" analyze.analyze(**query, source=source, start_time=datetime.now(), end_time=datetime.now())"
]
},
{
Expand All @@ -128,7 +159,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
Expand All @@ -142,7 +173,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.5"
"version": "3.8.10"
}
},
"nbformat": 4,
Expand Down
Loading

0 comments on commit fa63423

Please sign in to comment.