-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpromoapps.py
113 lines (103 loc) · 3.01 KB
/
promoapps.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import telebot
import time
import urllib
import requests
import sqlite3
import sys
import feedparser
from telebot import types
from bs4 import BeautifulSoup
from atproto import Client, client_utils
import shutil
import xml.etree.ElementTree as ET
import os
# Runtime configuration. The three os.environ[...] lookups raise KeyError at
# import time when the variable is not set.
# Telegram channel username, without the leading '@' (it is added where used).
DESTINATION = os.environ['DESTINATION']
# Token handed to telebot.TeleBot below.
TOKEN = os.environ['BOT_TOKEN']
# URL of the feed parsed by the script entry point.
FEED_URL = os.environ['FEED_URL']
# Shared Telegram bot client used to publish messages.
bot = telebot.TeleBot(TOKEN)
# NOTE(review): this value matches the SQLite table name used by the history
# helpers, but the helpers hard-code that name themselves -- confirm whether
# this variable is still needed.
lastUpdates = 'history'
# HTTP header dict carrying a browser-like User-Agent string.
user_agent = {'User-agent': 'Mozilla/5.1'}
def get_post_photo(url):
    """Return the Open Graph image URL advertised by the page at *url*.

    The page is downloaded with the module-level ``user_agent`` header and a
    3 second timeout, then searched for ``<meta property="og:image">``.

    Args:
        url: Address of the page to inspect.

    Returns:
        The value of the tag's ``content`` attribute.

    Raises:
        requests.RequestException: The download failed, timed out, failed
            TLS verification, or returned an HTTP error status.
        ValueError: The page has no usable ``og:image`` meta tag.
    """
    # TLS verification is left at the requests default (enabled); the previous
    # verify=False accepted any certificate.
    response = requests.get(url, headers=user_agent, timeout=3)
    # Do not try to parse an error page as if it were the real content.
    response.raise_for_status()

    page = BeautifulSoup(response.content, 'html.parser')
    tag = page.find('meta', {'property': 'og:image'})
    if tag is None or not tag.get('content'):
        # Explicit error instead of "'NoneType' object is not subscriptable".
        raise ValueError(f'no og:image meta tag found at {url}')
    return tag['content']
def bluesky_post(link, title):
    """Publish *title* as a link to *link* on Bluesky, with a preview image if possible.

    Logs in to https://bsky.social as ``promoapps.grf.xyz`` using the
    ``BLUESKY_PASSWORD`` environment variable. The post text is *title*
    (with '[' turned into a newline and ']' removed) rendered as a hyperlink
    to *link*. The page's Open Graph image is attached when it can be
    fetched and uploaded; otherwise a text-only post is sent.

    Args:
        link: URL the post points to; also the page the image is taken from.
        title: Raw title, possibly containing a ``[...]`` suffix.

    Raises:
        Exception: Whatever the atproto client raises on login or when the
            text-only fallback post itself fails.
    """
    title = title.replace('[', '\n').replace(']', '')

    client = Client(base_url='https://bsky.social')
    client.login('promoapps.grf.xyz', os.environ.get('BLUESKY_PASSWORD'))

    text_builder = client_utils.TextBuilder()
    text_builder.link(title, link)

    try:
        photo = get_post_photo(link)
        # Keep the image in memory: no temp file named after the URL tail is
        # written to the working directory, so nothing is left behind if the
        # upload fails, and a cleanup error can no longer cause a duplicate post.
        image_response = requests.get(photo, timeout=10)
        image_response.raise_for_status()
        client.send_image(
            text=text_builder,
            image=image_response.content,
            image_alt=title,
        )
    except Exception as exc:
        # Best effort: any problem with the image degrades to a text-only post.
        # `except Exception` lets KeyboardInterrupt/SystemExit propagate.
        print(f'Bluesky image post failed, sending text only: {exc}', file=sys.stderr)
        client.send_post(text_builder)
def send_message(url, title):
    """Send a bold title followed by *url* to the Telegram channel ``@DESTINATION``.

    '[' in *title* becomes a newline and ']' is dropped. The message is sent
    with ``parse_mode='HTML'``, so the title and URL are HTML-escaped first;
    otherwise a title containing ``<``, ``>`` or ``&`` could be rejected by
    Telegram or rendered incorrectly.

    Delivery is best effort: failures are reported on stderr and not raised.

    Args:
        url: Link appended on its own line below the title.
        title: Raw title text.
    """
    import html  # stdlib; imported locally so the file's import block is untouched

    title = title.replace('[', '\n').replace(']', '')
    safe_title = html.escape(title, quote=False)
    safe_url = html.escape(url, quote=False)
    message = f'<b>{safe_title}</b>\n{safe_url}'
    try:
        bot.send_message(f'@{DESTINATION}', message, parse_mode='HTML')
    except Exception as exc:
        # Still non-fatal, but no longer silent.
        print(f'Telegram send failed for {url}: {exc}', file=sys.stderr)
def get_site():
    """Download and parse the page at ``https://t.me/s/<DESTINATION>``.

    Returns:
        A ``BeautifulSoup`` document for the page, or ``False`` when the
        server answers with a status other than 200.

    Raises:
        requests.RequestException: The request failed or timed out.
    """
    # Without a timeout requests can wait forever on an unresponsive server.
    response = requests.get(f'https://t.me/s/{DESTINATION}', timeout=10)
    if response.status_code != 200:
        return False
    return BeautifulSoup(response.content, 'html.parser')
def add_to_history(link):
    """Record *link* in the ``history`` table of ``links_history.db``.

    The value is bound as a query parameter rather than spliced into the SQL
    text, so links containing quotes are stored verbatim and cannot alter the
    statement. The connection is closed even when the insert fails.

    Args:
        link: URL to remember as already processed.

    Raises:
        sqlite3.Error: The database cannot be opened or written, or the
            ``history`` table does not exist.
    """
    from contextlib import closing  # stdlib; local import keeps the block self-contained

    with closing(sqlite3.connect('links_history.db')) as conn:
        # `with conn` commits on success and rolls back on error.
        with conn:
            conn.execute('INSERT INTO history (link) VALUES (?)', (link,))
def checkUpdates(link):
    """Look up *link* in the ``history`` table of ``links_history.db``.

    The value is bound as a query parameter. Building the SQL by string
    formatting with a double-quoted literal let SQLite read some inputs as
    identifiers (``"link"`` compared the column with itself and matched any
    row) and allowed crafted values to change the query.

    Args:
        link: URL to search for.

    Returns:
        The first matching row as a tuple, or ``None`` when the link has not
        been recorded.

    Raises:
        sqlite3.Error: The database cannot be opened or the ``history``
            table does not exist.
    """
    from contextlib import closing  # stdlib; local import keeps the block self-contained

    with closing(sqlite3.connect('links_history.db')) as conn:
        cursor = conn.execute('SELECT * FROM history WHERE link = ?', (link,))
        return cursor.fetchone()
def main():
    """Announce new App Store links found in the first five feed entries.

    For each entry, the summary is split on double quotes and the last piece
    containing ``://apps.apple.com/`` is taken as the link. Entries without
    such a link are skipped. A link not yet in the history database is sent
    to Telegram and Bluesky and then recorded, whether or not posting
    succeeded, so a failing link is not retried on every run.
    """
    feed = feedparser.parse(FEED_URL)
    for item in feed['items'][:5]:
        # Splitting on '"' isolates quoted attribute values such as href targets.
        candidates = [
            part
            for part in item.get('summary', '').split('"')
            if '://apps.apple.com/' in part
        ]
        if not candidates:
            # Previously `link`/`title` silently carried over from the
            # preceding entry (or were unbound on the first one).
            continue
        link = candidates[-1]  # same choice as the original loop: last match wins
        title = item['title']

        print(link)
        if checkUpdates(link):
            continue

        try:
            send_message(link, title)
            bluesky_post(link, title)
        except Exception as exc:
            # Keep going with the remaining entries, but report the failure.
            print(f'Posting failed for {link}: {exc}', file=sys.stderr)
        add_to_history(link)


if __name__ == "__main__":
    main()