
Commit 7a887a9

Author: Richard Pijnenburg (committed)
This came from elasticsearch/logstash at cf2242170011fbf2d264ae87660192754697671a

File tree

7 files changed, +577 −0 lines changed


.gitignore

+4
@@ -0,0 +1,4 @@
*.gem
Gemfile.lock
.bundle
vendor

Gemfile

+4
@@ -0,0 +1,4 @@
source 'https://rubygems.org'
gem 'rake'
gem 'gem_publisher'
gem 'archive-tar-minitar'

Rakefile

+6
@@ -0,0 +1,6 @@
@files=[]

task :default do
  system("rake -T")
end

lib/logstash/outputs/s3.rb

+357
@@ -0,0 +1,357 @@
# encoding: utf-8
require "logstash/outputs/base"
require "logstash/namespace"
require "socket" # for Socket.gethostname

# TODO integrate aws_config in the future
#require "logstash/plugin_mixins/aws_config"

# INFORMATION:

# This plugin was created to store Logstash events in Amazon Simple Storage Service (Amazon S3).
# To use it you need AWS credentials and an S3 bucket.
# Make sure you have permission to write files to the S3 bucket, and run Logstash with enough
# privileges to establish the connection.

# The S3 plugin lets you do something fairly complex, so let's explain :)

# The S3 output creates temporary files in "/opt/logstash/S3_temp/". If you want, you can change
# the path at the start of the register method.
# These files have a special name, for example:

# ls.s3.ip-10-228-27-95.2013-04-18T10.00.tag_hello.part0.txt

# "ls.s3"            : indicates the Logstash S3 plugin.
# "ip-10-228-27-95"  : your machine's hostname, useful if several Logstash instances write to the same bucket.
# "2013-04-18T10.00" : the time section you specify with time_file.
# "tag_hello"        : the event's tag, so you can collect events that share a tag.
# "part0"            : when you set size_file, more parts are generated whenever file.size > size_file.

# When a file is full it is pushed to the bucket and deleted from the temporary directory.
# If a file is empty it is not pushed, only deleted.

# This plugin has a mechanism to restore the previous temporary files if something crashes.

##[Note] :

## If you specify both size_file and time_file, a file is created for each tag (if specified).
## Whenever time_file elapses or a file's size exceeds size_file, the files are pushed to the
## S3 bucket and deleted from the local disk.

## If you specify time_file but not size_file, only one file is created for each tag (if specified).
## When time_file elapses, the files are pushed to the S3 bucket and deleted from the local disk.

## If you specify size_file but not time_file, files are created for each tag (if specified);
## whenever a file's size exceeds size_file, it is pushed to the S3 bucket and deleted from the local disk.

## If you specify neither size_file nor time_file you get a curious mode: only one file is created
## for each tag (if specified). The file then rests in the temporary directory and is not pushed
## to the bucket until Logstash is restarted.

# INFORMATION ABOUT CLASS:

# I tried to comment the class as best I could.
# I think there is still much to improve; if you want some points to develop, here is a list:

# TODO Integrate aws_config in the future
# TODO Find a method to push all remaining files when Logstash closes the session.
# TODO Integrate @field into the file path.
# TODO Permanent connection or on demand? For now on demand, but it isn't a good implementation.
#      Use a loop or a thread to retry the connection before hitting a time_out and signaling an error.
# TODO If you have bug reports or helpful advice, contact me; but remember that this code is as much
#      yours as it is mine, so feel free to work on it if you want :)

# USAGE:

# This is an example Logstash config:

# output {
#   s3 {
#     access_key_id => "crazy_key"             (required)
#     secret_access_key => "monkey_access_key" (required)
#     endpoint_region => "eu-west-1"           (required)
#     bucket => "boss_please_open_your_bucket" (required)
#     size_file => 2048                        (optional)
#     time_file => 5                           (optional)
#     format => "plain"                        (optional)
#     canned_acl => "private"                  (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private")
#   }
# }

# Let's analyze this:

# access_key_id => "crazy_key"
# Amazon gives you this key when you buy or try their service. (Not very open source, anyway.)

# secret_access_key => "monkey_access_key"
# Amazon gives you the secret_access_key when you buy or try their service. (Not very open source, anyway.)

# endpoint_region => "eu-west-1"
# From your contract with Amazon you should know in which region your services run.

# bucket => "boss_please_open_your_bucket"
# Make sure you have permission to write to the bucket and that you know its name.

# size_file => 2048
# The size, in KB, that files may reach in the temporary directory before being pushed to the bucket.
# Useful if you have a small server with little disk space and you don't want to blow up the server
# with unnecessary temporary log files.

# time_file => 5
# The time, in minutes, before the files are pushed to the bucket. Useful if you want to push the
# files at a fixed interval.

# format => "plain"
# The format of the events you want to store in the files.

# canned_acl => "private"
# The S3 canned ACL to use when putting the file. Defaults to "private".

# LET'S ROCK AND ROLL ON THE CODE!

class LogStash::Outputs::S3 < LogStash::Outputs::Base
  #TODO integrate aws_config in the future
  # include LogStash::PluginMixins::AwsConfig

  config_name "s3"
  milestone 1

  # Aws access_key.
  config :access_key_id, :validate => :string

  # Aws secret_access_key
  config :secret_access_key, :validate => :string

  # S3 bucket
  config :bucket, :validate => :string

  # Aws endpoint_region
  config :endpoint_region, :validate => ["us-east-1", "us-west-1", "us-west-2",
                                         "eu-west-1", "ap-southeast-1", "ap-southeast-2",
                                         "ap-northeast-1", "sa-east-1", "us-gov-west-1"], :default => "us-east-1"

  # Set the file size in KB: when files would exceed size_file, they are stored in the bucket
  # as two or more parts. If you have tags, a separate file is generated for every tag.
  ##NOTE: defining a file size is the better choice, because a local temporary file is generated
  ## on disk and then put in the bucket.
  config :size_file, :validate => :number, :default => 0

  # Set the time, in minutes, after which the current sub_time_section of the bucket is closed.
  # If you define file_size, you get a number of files per time section and per tag.
  # 0 means stay on the listener forever; beware of specifying both time_file 0 and size_file 0,
  # because the file will never be put on the bucket. For now, the only thing this plugin can do
  # in that case is put the file when Logstash restarts.
  config :time_file, :validate => :number, :default => 0

  # The event format you want to store in files. Defaults to plain text.
  config :format, :validate => [ "json", "plain", "nil" ], :default => "plain"

  ## IMPORTANT: if you use multiple instances of the s3 output, specify "restore => true" on one of
  ## them and "restore => false" on the others.
  ## This is a hack so that restoring the initial files does not destroy the new ones.
  ## If you do not specify "restore => true", files left behind when Logstash crashes or restarts
  ## are never sent to the bucket, for example if you have a single instance.
  config :restore, :validate => :boolean, :default => false

  # Aws canned ACL
  config :canned_acl, :validate => ["private", "public_read", "public_read_write", "authenticated_read"],
         :default => "private"

  # Method to set up the aws configuration and establish the connection
  def aws_s3_config

    @endpoint_region = @endpoint_region == 'us-east-1' ? 's3.amazonaws.com' : 's3-' + @endpoint_region + '.amazonaws.com'

    @logger.info("Registering s3 output", :bucket => @bucket, :endpoint_region => @endpoint_region)

    AWS.config(
      :access_key_id => @access_key_id,
      :secret_access_key => @secret_access_key,
      :s3_endpoint => @endpoint_region
    )
    @s3 = AWS::S3.new

  end
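
  # Note (illustrative, derived from the mapping above): endpoint_region "eu-west-1" yields the
  # endpoint "s3-eu-west-1.amazonaws.com", while "us-east-1" yields "s3.amazonaws.com".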

  # This method manages the sleep/wake cycle of the timer thread.
  def time_alert(interval)

    Thread.new do
      loop do
        start_time = Time.now
        yield
        elapsed = Time.now - start_time
        sleep([interval - elapsed, 0].max)
      end
    end

  end
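
  # A minimal usage sketch for time_alert (illustrative only; the block body and variable name
  # are hypothetical, not part of the plugin):
  #
  #   timer = time_alert(300) do      # runs the block roughly every 300 seconds,
  #     puts "five minutes elapsed"   # compensating for the time the block itself takes
  #   end
  #   timer.kill                      # time_alert returns a Thread, so it can be stopped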

  # This method writes files to the bucket. It accepts the file and the name of the file.
  def write_on_bucket (file_data, file_basename)

    # in case we lost the connection with S3; bad control implementation.
    if ( @s3 == nil )
      aws_s3_config
    end

    # find and use the bucket
    bucket = @s3.buckets[@bucket]

    @logger.debug "S3: ready to write "+file_basename+" in bucket "+@bucket+", Fire in the hole!"

    # prepare to write the file
    object = bucket.objects[file_basename]
    object.write(:file => file_data, :acl => @canned_acl)

    @logger.debug "S3: has written "+file_basename+" in bucket "+@bucket + " with canned ACL \"" + @canned_acl + "\""

  end
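
  # Note: this relies on the aws-sdk (v1) Ruby API, where bucket.objects[key].write(:file => path, :acl => acl)
  # uploads the local file at `path` under the given key with the given canned ACL.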

  # this method builds a new path used to name the file
  def getFinalPath

    @pass_time = Time.now
    return @temp_directory+"ls.s3."+Socket.gethostname+"."+(@pass_time).strftime("%Y-%m-%dT%H.%M")

  end
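
  # For example (illustrative), on a host named "ip-10-228-27-95" this returns something like:
  #   /opt/logstash/S3_temp/ls.s3.ip-10-228-27-95.2013-04-18T10.00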

  # This method restores the temporary files from a previous Logstash crash, or prepares the files
  # to be sent to the bucket. It takes two parameters: flag and name. Flag indicates whether you
  # want to restore or not; name is the name of the file.
  def upFile(flag, name)

    Dir[@temp_directory+name].each do |file|
      name_file = File.basename(file)

      if (flag == true)
        @logger.warn "S3: found temporary file: "+name_file+", something crashed before... Prepare for upload to bucket!"
      end

      if (!File.zero?(file))
        write_on_bucket(file, name_file)

        if (flag == true)
          @logger.debug "S3: file: "+name_file+" restored on bucket "+@bucket
        else
          @logger.debug "S3: file: "+name_file+" was put on bucket "+@bucket
        end
      end

      File.delete(file)

    end
  end

  # This method creates new empty temporary files for use. Flag indicates a new time_file subsection.
  def newFile (flag)

    if (flag == true)
      @current_final_path = getFinalPath
      @sizeCounter = 0
    end

    if (@tags.size != 0)
      @tempFile = File.new(@current_final_path+".tag_"+@tag_path+"part"+@sizeCounter.to_s+".txt", "w")
    else
      @tempFile = File.new(@current_final_path+".part"+@sizeCounter.to_s+".txt", "w")
    end

  end
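
  # Illustrative result: with tags ["hello"], this creates a file such as
  #   ls.s3.ip-10-228-27-95.2013-04-18T10.00.tag_hello.part0.txt
  # (@tag_path is built with a trailing ".", so ".tag_"+@tag_path+"part0" joins cleanly).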

  public
  def register
    require "aws-sdk"
    @temp_directory = "/opt/logstash/S3_temp/"

    if (@tags.size != 0)
      @tag_path = ""
      for i in (0..@tags.size-1)
        @tag_path += @tags[i].to_s+"."
      end
    end

    if !(File.directory? @temp_directory)
      @logger.debug "S3: Directory "+@temp_directory+" doesn't exist, let's make it!"
      Dir.mkdir(@temp_directory)
    else
      @logger.debug "S3: Directory "+@temp_directory+" exists, nothing to do"
    end

    if (@restore == true )
      @logger.debug "S3: attempting to verify previous crashes..."

      upFile(true, "*.txt")
    end

    newFile(true)

    if (time_file != 0)
      first_time = true
      @thread = time_alert(@time_file*60) do
        if (first_time == false)
          @logger.debug "S3: time_file triggered, let's put the file on the bucket if it isn't empty, and create a new file"
          upFile(false, File.basename(@tempFile))
          newFile(true)
        else
          first_time = false
        end
      end
    end

  end

  public
  def receive(event)
    return unless output?(event)

    # Prepare the format of the events
    if (@format == "plain")
      message = self.class.format_message(event)
    elsif (@format == "json")
      message = event.to_json
    else
      message = event.to_s
    end

    if (time_file != 0)
      @logger.debug "S3: trigger files after "+((@pass_time+60*time_file)-Time.now).to_s
    end

    # if a size is specified
    if (size_file != 0)

      if (@tempFile.size < @size_file)

        @logger.debug "S3: File has size: "+@tempFile.size.to_s+" and size_file is: "+ @size_file.to_s
        @logger.debug "S3: put event into: "+File.basename(@tempFile)

        # Put the event in the file, now!
        File.open(@tempFile, 'a') do |file|
          file.puts message
          file.write "\n"
        end

      else

        @logger.debug "S3: file: "+File.basename(@tempFile)+" is too large, let's put it on the bucket and create a new file"
        upFile(false, File.basename(@tempFile))
        @sizeCounter += 1
        newFile(false)

      end

    # else we put everything in one file
    else

      @logger.debug "S3: put event into "+File.basename(@tempFile)
      File.open(@tempFile, 'a') do |file|
        file.puts message
        file.write "\n"
      end
    end

  end

  def self.format_message(event)
    message = "Date: #{event[LogStash::Event::TIMESTAMP]}\n"
    message << "Source: #{event["source"]}\n"
    message << "Tags: #{event["tags"].join(', ')}\n"
    message << "Fields: #{event.to_hash.inspect}\n"
    message << "Message: #{event["message"]}"
  end
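
  # Illustrative output of format_message (the field values here are hypothetical):
  #
  #   Date: 2013-04-18T10:00:00.000Z
  #   Source: ip-10-228-27-95
  #   Tags: hello
  #   Fields: {"message"=>"some log line", "tags"=>["hello"]}
  #   Message: some log line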

end

# Enjoy it, by Bistic:)
