-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdb_connection.py
53 lines (42 loc) · 1.53 KB
/
db_connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import pymongo
import datetime
import os
class DbConnection(object):
    """Thin wrapper around the MongoDB database used by the alert service.

    Documents live in two collections: ``QUEUE`` holds pending items and
    ``USED`` holds items that have been taken out of the queue.

    The instance can be used as a context manager; the underlying client
    is closed when the ``with`` block exits.
    """

    QUEUE = 'queue'
    USED = 'used'
    DB_NAME = 'psuRegisAlert'

    def __init__(self):
        """Connect to MongoDB on port 27017.

        The host is read from the ``MONGO_URL`` environment variable and
        falls back to ``localhost`` when the variable is not set.
        """
        mongo_host = os.getenv('MONGO_URL', 'localhost')
        db_host = "mongodb://{0}:27017".format(mongo_host)
        connection = pymongo.MongoClient(db_host)
        self.con = connection
        self.db = connection[self.DB_NAME]

    def __enter__(self):
        """Return the connection itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; exceptions from the block are not suppressed."""
        self.close()
        return False

    def _query_all(self, collection):
        """Return a cursor over every document in ``collection``.

        Returns ``None`` when the collection holds no documents, so callers
        can test the result for truthiness before iterating.
        """
        target = self.db[collection]
        # Cursor.count() is gone from current PyMongo releases; ask the
        # collection for the document count instead.
        if target.count_documents({}) > 0:
            return target.find()
        return None

    def _insert_item(self, item, collection):
        """Upsert ``item`` into ``collection``.

        Every field except ``date`` forms the match filter, so storing the
        same item again replaces the existing document (refreshing its
        ``date``) instead of creating a duplicate.
        """
        # Compare keys by value: identity comparison against a string
        # literal only works by accident of interning.
        match_filter = {
            field: value for field, value in item.items() if field != "date"
        }
        self.db[collection].replace_one(match_filter, item, upsert=True)

    def query_queue_all(self):
        """Return all documents in the queue collection, or ``None`` if empty."""
        return self._query_all(self.QUEUE)

    def query_used_all(self):
        """Return all documents in the used collection, or ``None`` if empty."""
        return self._query_all(self.USED)

    def remove(self, item):
        """Delete ``item`` from the queue and record it in the used collection.

        ``item`` is used as the delete filter, so every queue document that
        matches it is removed. The caller's dict is not modified; a copy
        stamped with the current UTC time is what gets archived.
        """
        self.db[self.QUEUE].delete_many(item)
        archived = item.copy()
        # NOTE: 'achived_date' is misspelled, but it is the field name
        # written to stored documents, so it is kept unchanged here.
        archived['achived_date'] = datetime.datetime.utcnow()
        self._insert_item(archived, self.USED)

    def insert_item(self, url, subject_code, email, line_id, sec='*'):
        """Add an item to the queue, stamped with the current UTC time.

        ``sec`` defaults to ``'*'``. If an item with the same field values
        is already queued, it is replaced rather than duplicated.
        """
        item = {
            'url': url,
            'subject_code': subject_code,
            'email': email,
            'sec': sec,
            'line_id': line_id,
            'date': datetime.datetime.utcnow(),
        }
        self._insert_item(item, self.QUEUE)

    def close(self):
        """Close the underlying MongoDB client connection."""
        self.con.close()