-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathdb.py
77 lines (60 loc) · 2.2 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import config
from sqlalchemy import create_engine
from sqlalchemy import Table, Column, Integer, String, DateTime, Enum
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.types import TypeDecorator, CHAR
from sqlalchemy.dialects.postgresql import UUID
import uuid
# Declarative base class from which the ORM models in this module inherit
Base = declarative_base()
# Backend-agnostic GUID type, adapted from the SQLAlchemy docs
class GUID(TypeDecorator):
    """Platform-independent GUID type.

    Uses PostgreSQL's UUID type, otherwise uses
    CHAR(32), storing as stringified hex values.
    """

    impl = CHAR
    # The type takes no constructor arguments, so its compiled form can be
    # cached safely (avoids the caching warning on SQLAlchemy 1.4+).
    cache_ok = True

    def load_dialect_impl(self, dialect):
        """Return the column type to use for ``dialect``.

        PostgreSQL gets its native UUID type; every other dialect gets CHAR(32).
        """
        if dialect.name == 'postgresql':
            return dialect.type_descriptor(UUID())
        return dialect.type_descriptor(CHAR(32))

    def process_bind_param(self, value, dialect):
        """Convert a Python value into what is sent to the database.

        ``None`` is passed through. For PostgreSQL the value is stringified;
        for other dialects it is rendered as 32 zero-padded hex digits.
        Values that are not ``uuid.UUID`` instances are parsed with
        ``uuid.UUID(value)`` first, which raises if they are not valid.
        """
        if value is None:
            return value
        if dialect.name == 'postgresql':
            return str(value)
        if not isinstance(value, uuid.UUID):
            value = uuid.UUID(value)
        # Format the integer value: "%x" does not accept a UUID object itself
        # on Python 3.
        return "%.32x" % value.int

    def process_result_value(self, value, dialect):
        """Convert a database value back into a ``uuid.UUID`` (or ``None``)."""
        if value is None:
            return value
        # The backend may already hand back a uuid.UUID (e.g. a native UUID
        # column type); uuid.UUID() would fail if given one.
        if not isinstance(value, uuid.UUID):
            value = uuid.UUID(value)
        return value
class Work(Base):
    """ORM model mapped to the ``works`` table.

    A new instance is built from a handler and a URL and starts out in
    the ``'loaded'`` status.
    """

    __tablename__ = 'works'

    _status_type = Enum(
        'loaded', 'queued', 'processing', 'done', 'error',
        name='status',
    )

    id = Column(Integer, primary_key=True)
    handler = Column(String)

    # Unique constraint: an URL must never be processed more than once
    url = Column(String, unique=True)

    # Lifecycle: loaded -> queued -> processing -> done/error
    status = Column(_status_type)

    # Task currently processing this work
    task_id = Column(GUID)
    process_start = Column(DateTime)

    apidata = Column(String)
    hash = Column(String)
    updated = Column(DateTime)

    def __init__(self, handler, url):
        """Store ``handler`` and ``url`` and mark the work as ``'loaded'``."""
        self.status = 'loaded'
        self.handler = handler
        self.url = url
def open_session():
    """Return a scoped session bound to the configured database.

    Every call creates a new engine from ``config.SQLALCHEMY_URL``, runs
    ``Base.metadata.create_all`` against it, and then wraps a session
    factory bound to that engine in a ``scoped_session``.
    """
    db_engine = create_engine(config.SQLALCHEMY_URL, echo=False)

    # Issue CREATE statements for the tables registered on Base
    Base.metadata.create_all(bind=db_engine)

    session_factory = sessionmaker(
        bind=db_engine,
        autocommit=False,
        autoflush=False,
        expire_on_commit=False,
    )
    return scoped_session(session_factory)