forked from jobaTheBuilder/covid19de_monitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
COVIDUpdate.py
107 lines (92 loc) · 4.12 KB
/
COVIDUpdate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import requests
import json
import argparse
class COVIDResultData:
def __init__(self, data, dates):
self.data = data
self.dates = dates
self.str = None
def __str__(self):
if not self.str:
self.str = 'RKI ' + ', '.join(self.dates) + ': ' + ', '.join(key + ': ' + str(self.data[key]) for key in sorted(self.data))
return self.str
class COVIDUpdate:
DEFAULT_REST_URI = 'https://services7.arcgis.com/mOBPykOjAyBO2ZKk/arcgis/rest/services/RKI_Landkreisdaten/FeatureServer/0/query?where=1%3D1&outFields=OBJECTID,GEN,BEZ,last_update,cases7_per_100k&outSR=4326&f=json'
def __init__(self, rest_url=DEFAULT_REST_URI):
self.rest_url = rest_url
def check(self, area_list):
data = {}
dates = set()
response = requests.get(self.rest_url, verify=True)
if response.ok:
response_data = json.loads(response.content)
features = response_data['features']
for feature in features:
if self.in_area(feature, area_list):
dates.add(self.get(feature, 'last_update'))
gf = str(self.get(feature, 'GEN'))
bf = str(self.get(feature, 'BEZ'))
c7p100 = round(self.get(feature, 'cases7_per_100k'), 1)
data['{0} ({1})'.format(gf, bf)] = c7p100
return COVIDResultData(data, dates)
def find_areas(self, filter_string=None):
area_list = []
response = requests.get(self.rest_url, verify=True)
if response.ok:
response_data = json.loads(response.content)
if 'error' in response_data:
error_code = response_data['error']['code']
error_message = response_data['error']['message']
area_list.append({'GEN': error_code, 'BEZ': error_message})
else:
features = response_data['features']
for feature in features:
gf = self.get(feature, 'GEN')
bf = self.get(feature, 'BEZ')
if not filter_string:
area_list.append({'GEN': gf, 'BEZ': bf})
elif filter_string in gf or filter_string in bf:
area_list.append({'GEN': gf, 'BEZ': bf})
return area_list
def list_areas(self):
return self.find_areas()
def in_area(self, feature, areas):
f_gen = self.get(feature, 'GEN')
f_bez = self.get(feature, 'BEZ')
for area in areas:
gen = area['GEN']
bez = area['BEZ']
if gen == f_gen and bez == f_bez:
return area
return None
def get(self, feature, attribute_name):
return feature['attributes'][attribute_name]
def print_me(self, json_strct):
if json_strct:
print(json.dumps(json_strct, indent=4))
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-l", "--list", help="Lists the available areas of interest as config JSON.", action="store_true")
parser.add_argument("-f", "--find", help="Find & filters the available areas of interest according to the given string (case sensitive!).")
parser.add_argument("-a", "--areas", help="Receives JSON file with defined areas of interest.")
parser.add_argument("-i", "--incidence", help="Find all areas with names including the given string and return the 100k-7 incidence.")
args = parser.parse_args()
cu = COVIDUpdate()
if args.list:
cu.print_me(cu.list_areas())
elif args.find:
cu.print_me(cu.find_areas(args.find))
elif args.areas:
with open(args.areas) as json_file:
example_area = json.load(json_file)
result = cu.check(example_area)
print(result)
elif args.incidence:
areas = cu.find_areas(args.incidence)
result = cu.check(areas)
print(result)
else:
print("Please use help to see your options (--help).\nHere is an example...")
example_area = [{'GEN': 'Würzburg', 'BEZ': 'Kreisfreie Stadt'}]
result = cu.check(example_area)
print(str(result))