-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathUtil.py
124 lines (111 loc) · 4.11 KB
/
Util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import re
import yaml
import sys
from googletrans import Translator
from PIL import Image
import pytesseract
import math
class Util:
    """Utility helpers: config loading, markdown-link masking, message
    chunking, translation and OCR wrappers."""

    # Parsed contents of config.yml; filled in lazily by get_config().
    config = None

    def __init__(self):
        """Create a helper instance; no state is initialised here."""

    def get_config(self):
        """Return the parsed configuration, loading config.yml on first use.

        The file is only read while ``self.config`` is ``None``, so a valid
        but falsy configuration (e.g. an empty mapping) is not re-read on
        every call.
        """
        if self.config is None:
            self.load_config()
        return self.config

    def load_config(self):
        """Read ``config.yml`` from the working directory into ``self.config``.

        On a YAML parse error an error message is printed and the process
        exits with status 1.
        """
        with open("config.yml", "r") as stream:
            try:
                self.config = yaml.safe_load(stream)
            except yaml.YAMLError as exc:
                print("[-] Error while loading config.yml")
                # Surface the parser detail so the user can locate the problem.
                print("[-] " + str(exc))
                sys.exit(1)

    @staticmethod
    def recover_url(msg, urls):
        """Replace every ``{{i}}`` placeholder in *msg* with ``urls[i]``.

        Args:
            msg: Text containing placeholders produced by
                ``extract_message_url``.
            urls: List of link strings; the list position is the
                placeholder number.

        Returns:
            The message with all known placeholders substituted.
        """
        msg_out = msg
        # enumerate() keeps the position even when the same link appears more
        # than once in the list (list.index() would always return the first).
        for index, url in enumerate(urls):
            msg_out = msg_out.replace("{{" + str(index) + "}}", url)
        return msg_out

    @staticmethod
    def extract_message_url(msg):
        """Mask markdown links ``[text](http...)`` in *msg* with placeholders.

        Each matched link (optionally wrapped in ``**``) is replaced by
        ``{{i}}``, where ``i`` is the position of the cleaned link in the
        returned list. The cleaned link has newlines, carriage returns, tabs
        and ``**`` markers removed. Links that are identical after cleaning
        share a single list entry and placeholder number.

        Returns:
            A tuple ``(urls, masked_msg)``.
        """
        # NOTE(review): the \ud83c-style escapes come from a UTF-16 surrogate
        # pattern; on Python 3 str they only match lone surrogates, so emoji
        # above U+FFFF in link text are presumably not matched - confirm.
        match = re.findall(
            r"((?:[*]{2})?\[(?:[*]{2})?[\w\s\n\u00a9\u00ae\u2000-\u3300\ud83c\ud000-\udfff\ud83d\ud000-\udfff\ud83e\ud000-\udfffа-яА-Я]+(?:[*]{2})?\]\((?:[*]{2})?https?://[\S\n]+(?:[*]{2})?\)(?:[*]{2})?)",
            msg,
            re.IGNORECASE | re.MULTILINE,
        )
        urls = []
        index_by_url = {}
        for raw in match:
            cleaned = (
                raw.replace("\n", "")
                .replace("\r", "")
                .replace("\t", "")
                .replace("**", "")
            )
            index = index_by_url.get(cleaned)
            if index is None:
                # First time this link is seen: give it the next free slot.
                index = len(urls)
                index_by_url[cleaned] = index
                urls.append(cleaned)
            # The placeholder number always refers to an existing list entry.
            msg = msg.replace(raw, "{{" + str(index) + "}}")
        return urls, msg

    @staticmethod
    def clean_message(msg):
        """Remove the stray space in ``"] ("`` and ``") ,"`` sequences."""
        return msg.replace("] (", "](").replace(") ,", "),")

    @staticmethod
    def split_message(msg, chunk_size=1800):
        """Split *msg* into consecutive pieces of at most *chunk_size* chars.

        Args:
            msg: Text to split; ``None`` or an empty string yields no pieces.
            chunk_size: Maximum length of each piece (default 1800).

        Returns:
            ``{"len": <number of pieces>, "messages": [{"msg": <text>,
            "piece": <1-based index>}, ...]}`` in original order.

        Raises:
            ValueError: If *chunk_size* is smaller than 1.
        """
        if chunk_size < 1:
            raise ValueError("chunk_size must be a positive integer")
        if not msg:
            return {"len": 0, "messages": []}
        messages = [
            {"msg": msg[start : start + chunk_size], "piece": piece}
            for piece, start in enumerate(range(0, len(msg), chunk_size), 1)
        ]
        return {"len": len(messages), "messages": messages}

    @staticmethod
    def detect_image_lang(img_path):
        """Detect the script of the text in an image via Tesseract OSD.

        Parses the ``Script:`` and ``Script confidence:`` lines of the
        ``pytesseract.image_to_osd`` output.

        Returns:
            ``(script, confidence)``; ``(None, 0.0)`` when detection or
            parsing fails for any reason (best effort).
        """
        try:
            osd = pytesseract.image_to_osd(img_path)
            script = re.search(r"Script: ([a-zA-Z]+)\n", osd).group(1)
            conf = re.search(r"Script confidence: (\d+\.?(\d+)?)", osd).group(1)
            return script, float(conf)
        except Exception:
            # Deliberate best effort: any failure means "unknown script".
            return None, 0.0

    @staticmethod
    def process_message(mess, dest_lang):
        """Translate *mess* into *dest_lang* with googletrans.

        The text is wrapped in double quotes before it is sent to the
        translator, and surrounding double quotes are stripped from the
        translated text.

        Returns:
            A dict with keys ``tmessage`` (translated text), ``confidence``
            (language-detection confidence), ``olanguage`` (detected source
            language) and ``omessage`` (the quoted text that was sent). When
            *mess* is ``None`` the text fields are empty strings and the
            confidence is 0.
        """
        out = {"tmessage": "", "confidence": 0, "olanguage": "", "omessage": ""}
        if mess is None:
            return out
        mess_txt = '"' + mess + '"'
        translator = Translator()
        detection = translator.detect(mess_txt)
        translation_confidence = detection.confidence
        translation = translator.translate(mess_txt, dest=dest_lang)
        return {
            "tmessage": translation.text.strip('"'),
            "confidence": translation_confidence,
            "olanguage": translation.src,
            "omessage": mess_txt,
        }

    @staticmethod
    def process_ocr_image(image_path, lang=None, timeout=0):
        """Run Tesseract OCR on an image and return the recognised text.

        Args:
            image_path: Image (path or object) handed to
                ``pytesseract.image_to_string``.
            lang: Optional Tesseract language code(s); omitted when falsy.
            timeout: Optional timeout in seconds; ``0`` (default) passes no
                timeout to pytesseract.

        Returns:
            The recognised text, or an empty string if pytesseract raises
            ``RuntimeError``.
        """
        ocr_kwargs = {}
        if lang:
            ocr_kwargs["lang"] = lang
        if timeout:
            ocr_kwargs["timeout"] = timeout
        try:
            return pytesseract.image_to_string(image_path, **ocr_kwargs)
        except RuntimeError as timeout_error:
            # Tesseract processing is terminated
            print("[-] Timeout error: " + str(timeout_error), "SETTINGS: ", lang or "")
            return ""