#!/usr/bin/env python
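"""Generate a synthetic web-access event time series in Elasticsearch.

Writes one fake event every 5 seconds across the previous N days into a
'smoke_event' index via the Bulk API, then replays a short window at a
higher event rate to simulate a traffic anomaly.
"""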
import sys
import json
import datetime
from datetime import timedelta

from elasticsearch import Elasticsearch, helpers

MAX_BULKSIZE = 100000

ES_HOST = "localhost"
ES_PORT = "9200"
ES_AUTH = "elastic:changeme"
ES_SSL = False


def buildEvent(timestamp=None):
    """Build a single fake web-access event and return it as a JSON string."""
    event = {}
    # if we don't have a desired timestamp passed in for the event, use the current time in UTC
    if timestamp is None:
        event['timestamp'] = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
    else:
        event['timestamp'] = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
    # TODO make some of these random inputs from seed lists
    # add these 2 for bulk API goodness
    event['_index'] = 'smoke_event'
    event['_type'] = 'smoke_event'
    event['request'] = '/index.html'
    event['response'] = '200'
    event['agent'] = 'Firefox'
    event['remote_ip'] = '1.1.1.1'
    event['remote_user'] = ''
    event['bytes'] = '1234'
    event['referrer'] = 'http://example.com'
    json_event = json.dumps(event)
    return json_event
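
# Illustrative example of the JSON string buildEvent() returns (the exact
# field order is an assumption; it may differ on older interpreters):
#   {"timestamp": "2017-06-01T12:00:00Z", "_index": "smoke_event",
#    "_type": "smoke_event", "request": "/index.html", "response": "200",
#    "agent": "Firefox", "remote_ip": "1.1.1.1", "remote_user": "",
#    "bytes": "1234", "referrer": "http://example.com"}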


def buildEventSeries(daysBack=7, bulkSize=1000):
    print("creating time series for previous %s days" % daysBack)
    CURRENT_TIME = datetime.datetime.utcnow()
    print("End Time, aka CURRENT_TIME: %s %s" % (CURRENT_TIME, CURRENT_TIME.strftime('%Y-%m-%dT%H:%M:%SZ')))
    # set the starting time (e.g. 7 days ago)
    STEP_TIME = CURRENT_TIME - timedelta(days=daysBack)
    print("Start Time, aka STEP_TIME: %s %s" % (STEP_TIME, STEP_TIME.strftime('%Y-%m-%dT%H:%M:%SZ')))
    # connection to elasticsearch
    if ES_SSL:
        es = Elasticsearch(host=ES_HOST, port=ES_PORT, http_auth=ES_AUTH, use_ssl=True, verify_certs=True)
    else:
        es = Elasticsearch(host=ES_HOST, port=ES_PORT, http_auth=ES_AUTH)
    # create the index; ignore 400 so an already-existing index is not an error
    es.indices.create(index='smoke_event', ignore=400)
    print("elasticsearch bulk size is %i" % bulkSize)
    while True:
        # batch bulkSize events, then bulk index into ES, then resume the loop
        i = 0
        bulk_list = []
        while i < bulkSize:
            i = i + 1
            # stop once we reach the current time again
            if STEP_TIME >= CURRENT_TIME:
                print('STEP_TIME : %s' % STEP_TIME)
                print('CURRENT_TIME : %s' % CURRENT_TIME)
                print("STEP_TIME reached CURRENT_TIME, stopping event generation")
                # flush the partially filled batch so the final events are not lost
                print('Flushing remaining %i events to Elasticsearch Bulk API' % len(bulk_list))
                print(helpers.bulk(es, iter(bulk_list), stats_only=False, raise_on_exception=False))
                return
            # step 5 seconds forward for each event
            STEP_TIME = STEP_TIME + timedelta(seconds=5)
            json_event = buildEvent(STEP_TIME)
            bulk_list.append(json.loads(json_event))
            # decide which output to use
            # writeEventToLogstashHTTP(json_event)
            # writeEventToRedis(json_event)
            # writeEventToStdout(json_event)
        print("reached %s events, flushing to Elasticsearch" % bulkSize)
        # helpers.bulk returns (success_count, errors); print it for visibility
        print(helpers.bulk(es, iter(bulk_list), stats_only=False, raise_on_exception=False))


def buildAnomalyEventSeries(daysBack=7, anomalyPeriod=30, anomalyMagnification=10, bulkSize=10000):
    # TODO refactor this to share code with buildEventSeries rather than largely repeat it
    print("creating anomaly period with %sx magnification for %i minutes within the previous %s days" % (anomalyMagnification, anomalyPeriod, daysBack))
    # connection to elasticsearch
    if ES_SSL:
        es = Elasticsearch(host=ES_HOST, port=ES_PORT, http_auth=ES_AUTH, use_ssl=True, verify_certs=True)
    else:
        es = Elasticsearch(host=ES_HOST, port=ES_PORT, http_auth=ES_AUTH)
    print("Requested elasticsearch bulk size is %i" % bulkSize)
    # number of events per flush is magnified by anomalyMagnification
    calculatedBulkSize = bulkSize * anomalyMagnification
    if calculatedBulkSize > MAX_BULKSIZE:
        print("calculatedBulkSize = bulkSize * anomalyMagnification | %i = %i * %i" % (calculatedBulkSize, bulkSize, anomalyMagnification))
        # reduce the bulkSize proportionally based on the anomalyMagnification
        newBulkSize = (bulkSize // anomalyMagnification) // anomalyMagnification
        print("newBulkSize = (bulkSize // anomalyMagnification) // anomalyMagnification | %i = (%i // %i) // %i" % (newBulkSize, bulkSize, anomalyMagnification, anomalyMagnification))
        print("adjusting bulkSize %i down to %i since calculatedBulkSize %i is greater than MAX_BULKSIZE %i" % (bulkSize, newBulkSize, calculatedBulkSize, MAX_BULKSIZE))
        bulkSize = newBulkSize
    else:
        print("going with requested bulkSize of %i" % bulkSize)
    CURRENT_TIME = datetime.datetime.utcnow()
    # e.g. a 30-minute anomaly of high traffic somewhere within the relative time series
    anomaly_factor = daysBack * .3
    ANOMALY_START_TIME = CURRENT_TIME - timedelta(days=anomaly_factor)
    ANOMALY_END_TIME = ANOMALY_START_TIME + timedelta(minutes=anomalyPeriod)
    print("Anomaly Start Time %s" % ANOMALY_START_TIME.strftime('%Y-%m-%dT%H:%M:%SZ'))
    print("Anomaly End Time %s" % ANOMALY_END_TIME.strftime('%Y-%m-%dT%H:%M:%SZ'))
    # set the starting time to the beginning of the anomaly window
    STEP_TIME = ANOMALY_START_TIME
    while True:
        # batch bulkSize events, then bulk index into ES, then resume the loop
        # TODO: adjust bulkSize based on anomalyMagnification,
        # e.g. always bulk 1000 at a time regardless of a 10x anomaly (100*10 = 1000)
        i = 0
        bulk_list = []
        while i < bulkSize:
            i = i + 1
            # stop once we reach the end of the anomaly window
            if STEP_TIME >= ANOMALY_END_TIME:
                print('STEP_TIME : %s' % STEP_TIME)
                print('ANOMALY_END_TIME : %s' % ANOMALY_END_TIME)
                print("STEP_TIME reached ANOMALY_END_TIME, stopping event generation")
                # flush the partially filled batch so the final events are not lost
                print('Flushing remaining %i events to Elasticsearch Bulk API' % len(bulk_list))
                print(helpers.bulk(es, iter(bulk_list), stats_only=True, raise_on_exception=False))
                return
            # step 5 seconds forward for each event
            STEP_TIME = STEP_TIME + timedelta(seconds=5)
            # generate anomalyMagnification times more events than normal during the anomaly period
            anomaly_i = 0
            while anomaly_i < anomalyMagnification:
                json_event = buildEvent(STEP_TIME)
                anomaly_i = anomaly_i + 1
                bulk_list.append(json.loads(json_event))
                # decide which output to use
                # writeEventToLogstashHTTP(json_event)
                # writeEventToRedis(json_event)
                # writeEventToStdout(json_event)
        print("reached %i events, flushing to Elasticsearch" % len(bulk_list))
        print(helpers.bulk(es, iter(bulk_list), stats_only=True, raise_on_exception=False))


def writeEventToNull(json_event):
    # placeholder sink: echo the event instead of shipping it anywhere
    print(json_event)
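

# The batching loops above reference three output helpers that are not defined
# in this file (writeEventToLogstashHTTP, writeEventToRedis, writeEventToStdout).
# The sketches below are assumptions about what they might look like, not part
# of the original script: the Redis variant assumes a local redis-py client and
# a list named 'smoke_event'; the Logstash variant assumes an HTTP input
# listening on localhost:8080. Imports are kept local so the script still runs
# when the optional packages are absent.
def writeEventToStdout(json_event):
    # emit the raw JSON line on stdout, e.g. to pipe into another shipper
    sys.stdout.write(json_event + "\n")


def writeEventToRedis(json_event):
    # hypothetical sketch: push the event onto a Redis list for a downstream consumer
    import redis  # assumes the redis-py package is installed
    r = redis.Redis(host="localhost", port=6379)
    r.rpush("smoke_event", json_event)


def writeEventToLogstashHTTP(json_event):
    # hypothetical sketch: POST the event to a Logstash HTTP input
    import requests  # assumes the requests package is installed
    requests.post("http://localhost:8080", data=json_event,
                  headers={"Content-Type": "application/json"})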


def main():
    bulkSize = 10000  # elasticsearch bulk size
    daysBack = 7
    anomalyPeriod = 30  # period for anomaly to last, in minutes
    anomalyMagnification = 10  # e.g. 10x more than the normal
    buildEventSeries(daysBack, bulkSize)
    buildAnomalyEventSeries(daysBack, anomalyPeriod, anomalyMagnification, bulkSize)


if __name__ == "__main__":
    main()