-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathdata.py
168 lines (144 loc) · 5.97 KB
/
data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import sqlite3
connector = sqlite3.connect('storage.db')
class Data:
def __init__(self):
self.connector = sqlite3.connect('storage.db')
self.modEntry = self.ModEntry(self)
self.taggingEntry = self.TaggingEntry(self)
# Setup 'modLogs' Databases if isn't valid
try:
# modLogs table container
connector.execute('''CREATE TABLE modLogs (
ACTION TEXT NOT NULL,
DURATION INTEGER NULL,
REASON TEXT NOT NULL,
MODERATOR INTEGER NOT NULL,
TARGET INTEGER NOT NULL,
DATE TEXT NOT NULL
);''')
except Exception as error:
pass # Database table is already created
# Setup 'tagging' Database if isn't valid
try:
# Tagging table container
connector.execute('''CREATE TABLE tagging (
CHANNELID BIGINT NOT NULL,
NAME TEXT NOT NULL,
CONTENT TEXT NOT NULL,
DATE TEXT NOT NULL
);''')
except Exception as error:
pass # Database table is already created
class ModEntry:
def __init__(self, data):
self.Data = data
self.connector = data.connector
self.cursor = self.connector.cursor()
def insert(self, action, duration, reason, moderator, target, date):
try:
self.cursor.execute(
'''INSERT INTO modLogs (ACTION, DURATION, REASON, MODERATOR, TARGET, DATE) VALUES (?, ?, ?, ?, ?, ?);''',
(action, duration, reason, moderator, target, date))
self.connector.commit()
except sqlite3.Error as error:
print(f'{error}')
def fetch(self, id):
result = []
index = 0
try:
self.cursor.execute('''SELECT * FROM modLogs''')
except sqlite3.Error as error:
print(f'{error}')
for action, duration, reason, moderator, target, date in self.cursor.fetchall():
index += 1
print(index, action, duration, reason, moderator, target, date)
if int(target) == id:
result.append({
"index": index,
"action": action,
"duration": duration,
"reason": reason,
"moderator": moderator,
"target": target,
"date": date
})
return result
def update(self, id, **kwargs):
duration = kwargs.get("duration", None)
reason = kwargs.get("reason", None)
try:
if duration:
self.cursor.execute('''UPDATE modLogs SET DURATION = (?) WHERE rowid = (?);''', (duration, id))
elif reason:
self.cursor.execute('''UPDATE modLogs SET REASON = (?) WHERE rowid = (?);''', (reason, id))
except sqlite3.Error as error:
print(f'{error}')
return False
return True
class TaggingEntry:
def __init__(self, data):
self.Data = data
self.connector = data.connector
self.cursor = self.connector.cursor()
def insert(self, channelid, name, content, date):
try:
self.cursor.execute('''INSERT INTO tagging (CHANNELID, NAME, CONTENT, DATE) VALUES (?, ?, ?, ?);''',
(channelid, name, content, date))
self.connector.commit()
except sqlite3.Error as error:
print(f'{error}')
def fetch(self, identifier, channelIdentifier):
index = 0
try:
self.cursor.execute('''SELECT * FROM tagging''')
except sqlite3.Error as error:
print(f'{error}')
for channelid, name, content, date in self.cursor.fetchall():
index += 1
if name == identifier and channelid == channelIdentifier:
return {
"index": index,
"channelid": channelid,
"content": content,
"date": date
}
return None
def fetch_all(self, channelIdentifier):
index = 0
result = []
try:
self.cursor.execute('''SELECT * FROM tagging''')
except sqlite3.Error as error:
print(f'{error}')
for channelid, name, content, date in self.cursor.fetchall():
index += 1
if channelid == channelIdentifier:
result.append({
"index": index,
"channelid": channelid,
"name": name,
"content": content,
"date": date
})
return result
def update(self, id, **kwargs):
content = kwargs.get("content", None)
date = kwargs.get("date", None)
try:
if content and date:
self.cursor.execute('''UPDATE tagging SET CONTENT = (?) WHERE rowid = (?);''', (content, id))
self.cursor.execute('''UPDATE tagging SET DATE = (?) WHERE rowid = (?);''', (date, id))
except sqlite3.Error as error:
print(f'{error}')
return False
return True
def check_identifier(self, identifier):
try:
self.cursor.execute('''SELECT * FROM tagging''')
for channelid, name, content, date in self.cursor.fetchall():
if name == identifier:
return True
return False
except sqlite3.Error as error:
print(f'{error}')
return False