-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhelpers.py
121 lines (87 loc) · 2.86 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import os
import logging
import datetime
import yaml
import pytz
import tweepy
import requests
# Assemble the API credential table from the process environment at import
# time. Every variable below is looked up with os.environ[...], so a missing
# one raises KeyError; the failure is logged with its traceback and re-raised,
# which makes importing this module fail.
try:
    CREDENTIALS = {
        'raindrop': {
            'client_id': os.environ['RAINDROP_CLIENT_ID'],
            'client_secret': os.environ['RAINDROP_CLIENT_SECRET'],
            'token': os.environ['RAINDROP_TOKEN'],
        },
        'twitter': {
            'consumer_key': os.environ['TWITTER_KEY'],
            'consumer_secret': os.environ['TWITTER_SECRET'],
            'access_token': {
                'key': os.environ['TWITTER_ACCESS_TOKEN'],
                'secret': os.environ['TWITTER_ACCESS_SECRET'],
            },
        },
    }
except Exception:
    # logging.exception logs at ERROR level and attaches the active traceback.
    logging.exception('Exception:')
    raise
else:
    logging.info('Loaded credentials from environment variables')
def load_config():
    '''
    Load the bot configuration and mark it as running.

    The configuration is first requested from the project's GitHub raw URL.
    If that attempt fails for any reason (network error, timeout, non-200
    status, YAML that does not parse, or a document that is not a mapping),
    the local file ``config.yml`` in the current working directory is read
    instead.

    Returns:
        The parsed configuration with its ``'running'`` key set to ``True``.

    Raises:
        OSError: if the remote attempt failed and ``config.yml`` cannot be
            opened.
        yaml.YAMLError: if the remote attempt failed and ``config.yml`` does
            not parse.
    '''
    logging.info('Loading config ...')
    url = 'https://raw.githubusercontent.com/nwspk/lcpt_twitter_bot/main/config.yml'
    try:
        # The request lives inside the try so connection errors and timeouts
        # trigger the local fallback instead of propagating. The timeout
        # keeps an unresponsive host from blocking forever.
        response = requests.get(url, timeout=10)
        if response.status_code != 200:
            raise Exception('Unable to download')
        config = yaml.safe_load(response.text)
        # An empty or non-mapping document cannot take the 'running' key
        # assigned below, so treat it as a failed download.
        if not isinstance(config, dict):
            raise ValueError('Remote config is not a mapping')
    except Exception:
        # `except Exception` (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        logging.warning(
            'Could not load remote config, falling back to config.yml',
            exc_info=True,
        )
        with open('config.yml', 'r') as ifile:
            # NOTE(review): FullLoader is kept for the local file to preserve
            # existing behaviour; the remote document uses safe_load. Confirm
            # whether safe_load is sufficient here as well.
            config = yaml.load(ifile, Loader=yaml.FullLoader)
    config['running'] = True
    return config
def remove_url_get_params(url):
    '''
    Return ``url`` truncated just before its first ``?``.

    A URL that contains no ``?`` is returned unchanged.
    '''
    # partition() yields (head, sep, tail); head is everything before the
    # first '?', or the whole string when there is none.
    head, _sep, _tail = url.partition('?')
    return head
def tweet_image(api, url, message):
    '''
    Download the image at ``url`` and post it with ``message`` as the status.

    The image is streamed into a uniquely named temporary ``.jpg`` file,
    handed to ``api.update_with_media`` and then deleted. The file is removed
    and the HTTP response is closed even when the download or the upload
    fails.

    Args:
        api: object exposing ``update_with_media(filename, status=...)``.
        url: address of the image to download.
        message: status text to post alongside the image.

    Raises:
        Exception: if the download does not return HTTP 200.
    '''
    # Function-scope import so this block does not depend on the file's
    # top-level import list.
    import tempfile

    response = requests.get(url, stream=True, timeout=30)
    try:
        if response.status_code != 200:
            raise Exception('Unable to download image')

        # A unique temp file replaces the fixed '/tmp/temp.jpg', which
        # concurrent runs would overwrite. The '.jpg' suffix is kept because
        # the original always used a .jpg path; presumably the upload call
        # relies on the extension - TODO confirm.
        image = tempfile.NamedTemporaryFile(suffix='.jpg', delete=False)
        fp = image.name
        try:
            with image:
                for chunk in response.iter_content(chunk_size=8192):
                    image.write(chunk)
            api.update_with_media(fp, status=message)
        finally:
            # Always clean up, including when the upload raises.
            os.remove(fp)
    finally:
        # stream=True keeps the connection open until it is closed.
        response.close()
    return
def fetch_interval_since_last_tweet():
    '''
    Return how long ago the authenticated account last tweeted.

    Authenticates against Twitter with the module-level ``CREDENTIALS``,
    fetches the single most recent tweet of the authenticated account and
    subtracts its creation time (localized to UTC) from the current UTC time.

    Returns:
        A ``datetime.timedelta`` between now and the latest tweet.

    Raises:
        IndexError: if the timeline request returns no tweets.
    '''
    twitter_creds = CREDENTIALS['twitter']
    token_creds = twitter_creds['access_token']

    auth = tweepy.OAuthHandler(
        consumer_key=twitter_creds['consumer_key'],
        consumer_secret=twitter_creds['consumer_secret'],
    )
    auth.set_access_token(
        key=token_creds['key'],
        secret=token_creds['secret'],
    )
    client = tweepy.API(
        auth,
        wait_on_rate_limit=True,
        wait_on_rate_limit_notify=True,
    )

    # Ask for only the newest tweet of the account we are logged in as.
    account_id = client.me().id
    newest_tweet = client.user_timeline(user_id=account_id, count=1)[0]

    # localize() attaches UTC to created_at so it can be subtracted from the
    # timezone-aware "now" below.
    tweeted_at_utc = pytz.utc.localize(newest_tweet.created_at)
    return datetime.datetime.now(tz=datetime.timezone.utc) - tweeted_at_utc