-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathGet-DeepRacerWaypointsList.py
89 lines (79 loc) · 3.17 KB
/
Get-DeepRacerWaypointsList.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import boto3
import datetime
import re
import matplotlib.path as mpath
import argparse
# Command-line arguments.
parser = argparse.ArgumentParser(description='Generate list of waypoints for a Deepracer circuit')
parser.add_argument('--profile', required=True, help='aws credentials profile')
parser.add_argument('--logstreamname', required=True, help='e.g. sim-nd2x8c3ph1d3/2019-06-06T14-39-31.940Z_3f5ddea9-6555-44d0-b1c7-ceb4b054f884/SimulationApplicationLogs')
# Optional so existing invocations keep working; the default is the region
# that used to be hard-coded here.
parser.add_argument('--region', default='us-east-1', help='aws region that holds the log group (default: us-east-1)')
args = parser.parse_args()

# Constants.
# Search window passed to the log query, in epoch milliseconds. The datetimes
# are naive, so .timestamp() interprets them in the machine's local timezone;
# the window is deliberately wide enough that this does not matter.
starttimeepoch = int(datetime.datetime(2018, 1, 30, 18, 0, 0).timestamp()) * 1000
endtimeepoch = int(datetime.datetime(2030, 1, 30, 18, 0, 0).timestamp()) * 1000
loggroupname = '/aws/robomaker/SimulationJobs'
logstreamname = args.logstreamname
profile = args.profile
region = args.region

session = boto3.Session(profile_name=profile, region_name=region)
# Any clients created from this session use the credentials of the profile
# named by --profile.
logs_client = session.client('logs')
def _fetch_log_events(client, loggroupname, logstreamname, filterPattern,
                      start_time, end_time):
    """Return every matching event of one log stream, following pagination."""
    request = {
        'logGroupName': loggroupname,
        'logStreamNames': [logstreamname],
        'startTime': start_time,
        'endTime': end_time,
        'filterPattern': filterPattern,
        'limit': 5000,
    }
    events = []
    while True:
        response = client.filter_log_events(**request)
        events += response['events']
        print(len(events))  # running total, printed as progress output
        next_token = response.get('nextToken')
        # A missing (or empty) token means there are no further pages.
        if not next_token:
            print('End')
            break
        request['nextToken'] = next_token
    return events


def _parse_waypoint_message(message):
    """Turn one log message into a ``{'waypoint', 'x', 'y'}`` dict.

    The first three comma-separated fields must each look like
    ``label:value``; the text after the first colon is converted to
    ``int`` (waypoint) or ``float`` (x, y). Any further fields are ignored.
    """
    fields = message.split(',')
    return {
        'waypoint': int(fields[0].split(':')[1].strip()),
        'x': float(fields[1].split(':')[1].strip()),
        'y': float(fields[2].split(':')[1].strip()),
    }


def get_string_path_data(loggroupname, logstreamname, filterPattern,
                         client=None, start_time=None, end_time=None):
    """Collect the unique waypoints logged in a stream as ``"id,x,y"`` lines.

    All events of ``logstreamname`` (in ``loggroupname``) that match
    ``filterPattern`` are fetched, each message is parsed into a waypoint
    index and x/y coordinates, duplicates are collapsed (the last event seen
    for a waypoint index wins) and the result is ordered by waypoint index.
    Progress counters and one line per waypoint are printed along the way.

    Args:
        loggroupname: Name of the log group to query.
        logstreamname: Name of the single log stream to query.
        filterPattern: Filter pattern handed to ``filter_log_events``.
        client: Object providing ``filter_log_events``; defaults to the
            module-level ``logs_client``.
        start_time: Start of the search window in epoch milliseconds;
            defaults to the module-level ``starttimeepoch``.
        end_time: End of the search window in epoch milliseconds; defaults
            to the module-level ``endtimeepoch``.

    Returns:
        A list of ``"waypoint,x,y"`` strings, sorted by waypoint index
        (empty when no event matched).

    Raises:
        IndexError, ValueError: If a matching message does not start with
            three ``label:value`` fields holding an int and two floats.
    """
    if client is None:
        client = logs_client
    if start_time is None:
        start_time = starttimeepoch
    if end_time is None:
        end_time = endtimeepoch

    events = _fetch_log_events(client, loggroupname, logstreamname,
                               filterPattern, start_time, end_time)
    print(len(events))

    print('Parsing Data...')
    coords = [_parse_waypoint_message(event['message']) for event in events]

    # Keyed by waypoint index, so a later event overwrites an earlier one.
    latest_by_id = {coord['waypoint']: coord for coord in coords}
    print("Unique Waypoints:{}".format(len(latest_by_id)))

    string_path_data = []
    for waypoint_id in sorted(latest_by_id):
        coord = latest_by_id[waypoint_id]
        line = "{},{},{}".format(coord['waypoint'], coord['x'], coord['y'])
        print(line)
        string_path_data.append(line)
    return string_path_data
# Run the extraction against the configured log group/stream, using
# 'Waypoint0' as the log filter pattern.
collected_string_path_data = get_string_path_data(
    loggroupname=loggroupname,
    logstreamname=logstreamname,
    filterPattern='Waypoint0',
)
# Debug aid: uncomment to dump the value returned above.
#print(collected_string_path_data)