-
Notifications
You must be signed in to change notification settings - Fork 1
/
example-0-plus-requests-reuse-conn.py
147 lines (113 loc) · 4.15 KB
/
example-0-plus-requests-reuse-conn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import contextlib
from functools import wraps, partial
import requests
import socket
import time
from requests.exceptions import RequestException
from yarl import URL
# ----------------------------- Sending stats to Telegraf -----------------------------------
STATS_UDP_ADDR = ('localhost', 8094)
def prepare_str_for_telegraf(value):
if not isinstance(value, str):
return value
# there could be issues with some special chars in metric name, value or tags
# replace ':', '_', '|' with '-'
# https://github.com/influxdata/telegraf/issues/3508
return str(value).replace(':', '-').replace('_', '-').replace('|', '-')
def format_measurement_influxline(metric_name, metric_value, tags):
tags_str = ''
if tags:
tags = {
prepare_str_for_telegraf(k): prepare_str_for_telegraf(v) for k, v in tags.items()
}
tags_strs = [
f'{tag_key}={tag_value}' for tag_key, tag_value in tags.items()
]
tags_str = (',' + ','.join(tags_strs))
metric_value = prepare_str_for_telegraf(metric_value)
metric_name = prepare_str_for_telegraf(metric_name)
return f'{metric_name}{tags_str} value={metric_value}\n'
def send_stats(metric_name, metric_value, tags=None):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(
format_measurement_influxline(
metric_name,
metric_value,
tags
).encode(),
STATS_UDP_ADDR
)
print(f'Reported stats: {metric_name}={metric_value}, tags={tags}')
sock.close()
except socket.error as e:
print(f'Got error: {e}')
# ------------------ Profiling context manager and decorator ------------------------
@contextlib.contextmanager
def profiler(metric_name, **tags):
start = time.perf_counter()
yield
end = time.perf_counter()
elapsed_time = int(round((end - start) * 1000))
send_stats(metric_name, elapsed_time, tags)
def profile(f=None, metric_name=None):
"""
This profile decorator works for sync functions,
and for class methods. Default metric name will be name of
profiled function plus '_exec_time'.
Usage:
@profile(metric_name='my_exec_time')
def something_that_takes_time(...):
...
"""
def actual_decorator(f):
nonlocal metric_name
if not metric_name:
metric_name = f'{f.__name__}_exec_time'
@wraps(f)
def decorated_function(*args, **kwargs):
with profiler(metric_name):
return f(*args, **kwargs)
return decorated_function
return actual_decorator(f) if f else actual_decorator
# ------------------ Requests send requests ------------------------
def profile_request(start_time, response, *args, **kwargs):
elapsed_time = round((
time.perf_counter() - start_time
) * 1000)
send_stats(
'requests_request_exec_time',
elapsed_time,
{'domain': URL(response.url).raw_host}
)
def get_response_text(url, session):
try:
response = session.get(
url,
hooks={'response': partial(profile_request, time.perf_counter())}
)
response.raise_for_status()
return response.content.decode()
except RequestException as e:
send_stats(
'requests_request_exception',
1,
{'domain': URL(url).raw_host, 'exception_class': e.__class__.__name__}
)
return f'Exception occured: {e}'
@profile
def call_python_and_mozilla_using_requests(session):
# set domain to python1 to see errors
py_response = get_response_text('https://www.python.org/', session)
moz_response = get_response_text('https://www.mozilla.org/en-US/', session)
return (
f'Py response piece: {py_response[:60].strip()}... ,\n'
f'Moz response piece: {moz_response[:60].strip()}...'
)
# ----------------------------- main -----------------------------------
if __name__ == '__main__':
session = requests.Session()
while True:
result = call_python_and_mozilla_using_requests(session)
print(result)
time.sleep(3)