-
Notifications
You must be signed in to change notification settings - Fork 2
/
lkb_manager.py
198 lines (124 loc) · 4.8 KB
/
lkb_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import pymongo
from bson.objectid import ObjectId
import configparser
import pandas as pd
import re
# Module-level settings, loaded once at import time from 'config.ini'
# (the path is relative, so it is resolved against the current working directory).
config = configparser.ConfigParser()
# NOTE: ConfigParser.read() silently skips files it cannot open, so a missing
# 'config.ini' only surfaces as an error raised by the lookups below.
config.read('config.ini')
# AGENT/FILE_EXPORT_LKB_NAME: target passed to DataFrame.to_excel() in ManageLKB.export_LKB().
export_file = config.get('AGENT', 'FILE_EXPORT_LKB_NAME')
# AGENT/FILE_EXPORT_GND_TERMS (boolean): when true, ManageLKB.export_LKB() runs
# remove_substrings() over the exported 'value' column.
gnd_terms = config.getboolean('AGENT', 'FILE_EXPORT_GND_TERMS')
class ManageLKB(object):
    """Read and write clauses in the ``clauses`` collection of the ``ad-caspar`` MongoDB database.

    Each stored document has three fields: ``value`` (the clause text),
    ``features`` (the tokens produced by :meth:`extract_features`) and
    ``sentence`` (the text the caller associates with the clause).  The
    message printed on a duplicate insert calls this store the "Lower KB".

    The instance also remembers two pieces of state filled in by
    :meth:`aggregate_clauses`: the first non-zero confidence computed and the
    first list of document ids collected.  Both keep their first value until
    they are explicitly reset.
    """

    def __init__(self, host, user, password):
        """Create the MongoDB client used by every other method.

        Args:
            host: Host part placed in the MongoDB URI.
            user: User name placed in the MongoDB URI.
            password: Password placed in the MongoDB URI.
        """
        self.host = host
        self.user = user
        self.password = password
        # NOTE(review): credentials are interpolated verbatim.  PyMongo expects
        # user/password containing reserved characters (':', '/', '@', '%') to
        # be percent-escaped; escaping is left to the caller here so that
        # already-escaped values are not escaped twice.
        uri = f'mongodb://{self.user}:{self.password}@{self.host}/ad-caspar'
        self.client = pymongo.MongoClient(uri)
        self.reason_keys = []
        self.confidence = 0.0

    def set_confidence(self, confidence):
        """Store ``confidence`` unless a non-zero value is already stored."""
        # 0.0 doubles as the "not set yet" marker, so only the first non-zero
        # value after a reset is retained.
        if self.confidence == 0.0:
            self.confidence = confidence

    def get_confidence(self):
        """Return the stored confidence (0.0 when none has been set)."""
        return self.confidence

    def reset_confidence(self):
        """Clear the stored confidence so the next ``set_confidence`` takes effect."""
        self.confidence = 0.0

    def add_reason_keys(self, keys):
        """Store ``keys`` unless some keys are already stored.

        Args:
            keys: Iterable of keys; its items are copied into the internal list.
        """
        if not self.reason_keys:
            self.reason_keys.extend(keys)

    def reset_last_keys(self):
        """Forget the stored keys so the next ``add_reason_keys`` takes effect."""
        self.reason_keys = []

    def get_last_keys(self):
        """Return the list of stored keys (empty when none have been stored)."""
        return self.reason_keys

    def insert_clause_db(self, cls, sentence):
        """Insert a clause document and print the id assigned to it.

        Args:
            cls: Clause text; stored as ``value`` and used to derive ``features``.
            sentence: Text stored alongside the clause as ``sentence``.

        A ``DuplicateKeyError`` raised by the insert is reported on stdout
        instead of being propagated (presumably the collection has a unique
        index defined outside this file - TODO confirm).
        """
        db = self.client["ad-caspar"]
        clauses = db["clauses"]
        features = self.extract_features(cls)
        print("\nfeatures:", features)
        clause = {
            "value": cls,
            "features": features,
            "sentence": sentence
        }
        try:
            sentence_id = clauses.insert_one(clause).inserted_id
            print("sentence_id: " + str(sentence_id))
        except pymongo.errors.DuplicateKeyError:
            print("\nClause already present in Lower KB!")

    def extract_features(self, sent):
        """Return the distinct feature tokens of a clause, in order of first appearance.

        The text is split on single spaces and each piece is split on ``(``.
        A fragment is kept when it is non-empty, is not the ``==>`` marker and
        contains neither ``)`` nor ``,``.

        Args:
            sent: Clause text.

        Returns:
            List of kept fragments without duplicates; empty when nothing qualifies.
        """
        features = []
        for chunk in sent.split(" "):
            for token in chunk.split("("):
                if not token or token == "==>":
                    continue
                if ')' in token or ',' in token:
                    continue
                if token not in features:
                    features.append(token)
        return features

    def remove_substrings(self, text):
        """Return ``text`` with every parenthesised word such as ``(x1)`` removed.

        Any ``(...)`` group made only of word characters is dropped, so
        ``Love(John(x1), Mary(x2))`` becomes ``Love(John, Mary)``.
        """
        return re.sub(r'\(\w+\)', '', text)

    def export_LKB(self):
        """Write the ``value`` and ``sentence`` of every clause to the configured Excel file.

        When the module-level ``gnd_terms`` flag is true, the ``value`` column
        is passed through :meth:`remove_substrings` before being written.

        Returns:
            Number of documents in the collection, counted after the export.
        """
        db = self.client["ad-caspar"]
        clauses = db["clauses"]

        # Pull every document from MongoDB.
        data = list(clauses.find())

        # Build a DataFrame restricted to the two exported columns.
        df = pd.DataFrame(data, columns=["value", "sentence"])

        if gnd_terms:
            # Strip the parenthesised words from the 'value' column.
            df['value'] = df['value'].apply(self.remove_substrings)

        # Save the DataFrame as an Excel file.
        df.to_excel(export_file, index=False)

        return clauses.count_documents({})

    def show_LKB(self):
        """Print ``value``, ``features`` and ``sentence`` of every stored clause.

        Returns:
            Number of documents in the collection.
        """
        db = self.client["ad-caspar"]
        clauses = db["clauses"]

        for doc in clauses.find():
            print("\n")
            print(doc['value'])
            print(doc['features'])
            print(doc['sentence'])
        return clauses.count_documents({})

    def clear_lkb(self):
        """Delete every document in the collection and return how many were deleted."""
        db = self.client["ad-caspar"]
        clauses = db["clauses"]
        result = clauses.delete_many({})
        return result.deleted_count

    def aggregate_clauses(self, cls, aggregated_clauses, min_confidence):
        """Collect stored clauses that share features with ``cls``, following matches recursively.

        Stored clauses are grouped by how many of their features also occur in
        ``cls``; only the two groups with the largest overlap are examined.
        A group's confidence is its overlap divided by the number of features
        of ``cls``.  Each clause of a group whose confidence reaches
        ``min_confidence`` and that is not yet in ``aggregated_clauses`` is
        appended, and the search is repeated starting from that clause.

        Side effects: the first non-zero confidence seen and the ids of the
        first group that contributes a clause are remembered on the instance
        (see :meth:`set_confidence` and :meth:`add_reason_keys`); every
        appended clause and its confidence are printed.

        Args:
            cls: Clause text to match against the stored clauses.
            aggregated_clauses: List extended in place with the matches.
            min_confidence: Minimum confidence a group needs to contribute.

        Returns:
            The same ``aggregated_clauses`` list that was passed in.
        """
        features = self.extract_features(cls)
        feat_num = len(features)
        if feat_num == 0:
            # A clause without features cannot overlap with anything, and the
            # confidence ratio below would divide by zero.
            return aggregated_clauses

        db = self.client["ad-caspar"]
        aggr = db.clauses.aggregate([
            {"$project": {
                "value": 1, "_id": 1,
                "intersection": {"$size": {"$setIntersection": ["$features", features]}}
            }},
            {"$group": {"_id": "$intersection", "group1": {"$push": "$value"}, "group2": {"$push": "$_id"}}},
            {"$sort": {"_id": -1}},
            {"$limit": 2}
        ])

        for group in aggr:
            # After $group, '_id' holds the overlap size shared by the group.
            occurrencies = group['_id']
            confidence = int(occurrencies) / feat_num

            self.set_confidence(confidence)
            if confidence < min_confidence:
                continue

            for c in group['group1']:
                if c not in aggregated_clauses:
                    aggregated_clauses.append(c)
                    self.add_reason_keys(group['group2'])
                    print("\naggregated: ", c)
                    print("confidence: ", confidence)
                    # c is already recorded, so the recursion cannot revisit it.
                    self.aggregate_clauses(c, aggregated_clauses, min_confidence)
        return aggregated_clauses

    def get_sentence_from_db(self, id):
        """Return the ``sentence`` of the clause with the given id.

        Args:
            id: Document id; converted with ``ObjectId(str(id))``.

        Returns:
            The stored sentence, or ``""`` when no document has that id.
        """
        db = self.client["ad-caspar"]
        clauses = db["clauses"]

        # '_id' identifies at most one document, so a single lookup suffices.
        doc = clauses.find_one({'_id': ObjectId(str(id))})
        if doc is None:
            return ""
        return doc['sentence']