-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathplanet_diffs.py
106 lines (80 loc) · 3.97 KB
/
planet_diffs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import gzip
import re
from collections.abc import Sequence
from datetime import UTC, datetime
from itertools import chain
import xmltodict
from anyio import create_task_group, fail_after
from sentry_sdk import start_span, trace
from config import AED_REBUILD_THRESHOLD, PLANET_DIFF_TIMEOUT, PLANET_REPLICA_URL
from utils import HTTP, retry_exponential
from xmltodict_postprocessor import xmltodict_postprocessor
_action_open_re = re.compile(r'<(create|modify|delete)>')
_action_close_re = re.compile(r'</(create|modify|delete)>')
@trace
async def get_planet_diffs(last_update: float) -> tuple[Sequence[dict], float]:
with fail_after(PLANET_DIFF_TIMEOUT.total_seconds()):
sequence_numbers = []
sequence_timestamps = []
while True:
next_sequence_number = sequence_numbers[-1] - 1 if sequence_numbers else None
sequence_number, sequence_timestamp = await _get_state(next_sequence_number)
if sequence_timestamp <= last_update:
break
sequence_numbers.append(sequence_number)
sequence_timestamps.append(sequence_timestamp)
if not sequence_numbers:
return (), last_update
result: list[tuple[int, list[dict]]] = []
with start_span(description=f'Processing {len(sequence_numbers)} planet diffs'):
@retry_exponential(AED_REBUILD_THRESHOLD)
async def _get_planet_diff(sequence_number: int) -> None:
path = f'{_format_sequence_number(sequence_number)}.osc.gz'
r = await HTTP.get(f'{PLANET_REPLICA_URL}{path}')
r.raise_for_status()
xml = gzip.decompress(r.content).decode()
xml = _format_actions(xml)
actions: list[dict] = xmltodict.parse(
xml,
postprocessor=xmltodict_postprocessor,
force_list=('action', 'node', 'way', 'relation', 'member', 'tag', 'nd'),
)['osmChange']['action']
node_actions: list[dict] = []
for action in actions:
# ignore everything that is not a node
if 'node' in action:
action.pop('way', None)
action.pop('relation', None)
node_actions.append(action)
result.append((sequence_number, node_actions))
async with create_task_group() as tg:
for sequence_number in sequence_numbers:
tg.start_soon(_get_planet_diff, sequence_number)
# sort by sequence number in ascending order
result.sort(key=lambda x: x[0])
data = tuple(chain.from_iterable(data for _, data in result))
data_timestamp = sequence_timestamps[0]
return data, data_timestamp
@retry_exponential(AED_REBUILD_THRESHOLD)
@trace
async def _get_state(sequence_number: int | None) -> tuple[int, float]:
path = 'state.txt' if sequence_number is None else f'{_format_sequence_number(sequence_number)}.state.txt'
r = await HTTP.get(f'{PLANET_REPLICA_URL}{path}')
r.raise_for_status()
text = r.text.replace('\\:', ':')
sequence_number = int(re.search(r'sequenceNumber=(\d+)', text).group(1)) # pyright: ignore [reportOptionalMemberAccess]
sequence_date_str = re.search(r'timestamp=(\S+)', text).group(1) # pyright: ignore [reportOptionalMemberAccess]
sequence_date = datetime.strptime(sequence_date_str, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=UTC)
sequence_timestamp = sequence_date.timestamp()
return sequence_number, sequence_timestamp
def _format_sequence_number(sequence_number: int) -> str:
result = f'{sequence_number:09d}'
result = '/'.join(result[i : i + 3] for i in range(0, 9, 3))
return result
def _format_actions(xml: str) -> str:
# <create> -> <action type="create">
# </create> -> </action>
# etc.
xml = _action_open_re.sub(r'<action type="\1">', xml)
xml = _action_close_re.sub(r'</action>', xml)
return xml