Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added the modifications for file md5 validation for logstash extended file plugin #83

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -2,3 +2,5 @@
*.rbc
core
*.gem
.idea
Gemfile.lock
1 change: 1 addition & 0 deletions filewatch.gemspec
Original file line number Diff line number Diff line change
@@ -20,4 +20,5 @@ Gem::Specification.new do |spec|
spec.homepage = "https://github.com/jordansissel/ruby-filewatch"

spec.add_development_dependency "stud"
spec.add_runtime_dependency 'rest-client', ['~> 1.8']
end
156 changes: 113 additions & 43 deletions lib/filewatch/observing_tail.rb
Original file line number Diff line number Diff line change
@@ -1,22 +1,39 @@
require 'filewatch/tail_base'
require 'rest-client'

module FileWatch
class ObservingTail
include TailBase
public

class NullListener
def initialize(path) @path = path; end
def accept(line) end
def deleted() end
def created() end
def error() end
def eof() end
def timed_out() end
def initialize(path)
@path = path;
end

def accept(line)
end

def deleted()
end

def created()
end

def error()
end

def eof()
end

def timed_out()
end
end

class NullObserver
def listener_for(path) NullListener.new(path); end
def listener_for(path)
NullListener.new(path);
end
end

def subscribe(observer = NullObserver.new)
@@ -25,48 +42,101 @@ def subscribe(observer = NullObserver.new)
path = watched_file.path
file_is_open = watched_file.file_open?
listener = observer.listener_for(path)
case event
when :unignore
listener.created
_add_to_sincedb(watched_file, event) unless @sincedb.member?(watched_file.inode)
when :create, :create_initial
if file_is_open
@logger.debug? && @logger.debug("#{event} for #{path}: file already open")
next
end
if _open_file(watched_file, event)
listener.created
observe_read_file(watched_file, listener)
end
when :modify
if file_is_open
observe_read_file(watched_file, listener)


# ----------------------------------------------------------------------------------
# Code modification to validate the file's MD5 Digest against a validation endpoint
# ----------------------------------------------------------------------------------
# check if an authentication endpoint is provided for the watch object
if [email protected]_endpoint.nil?
auth_endpoint = @watch.auth_endpoint
@logger.debug? && @logger.debug("An authentication endpoint was found for file validation: #{auth_endpoint}")

file_digest = Digest::MD5.file path
md5_hex_digest = file_digest.hexdigest
@logger.debug? && @logger.debug("Checksum MD5: #{md5_hex_digest} for file at path: #{path}")

url = auth_endpoint
query_string = "?md5="+md5_hex_digest

# check for other params and append them as query/path parameters accordingly
if [email protected]_params.nil?
@watch.auth_params.each { |param|
if param.include? "="
query_string += ("&"+param)
else
url += param
end
}

url += query_string
@logger.debug? && @logger.debug("Final validation URL: #{url}")
else
@logger.debug? && @logger.debug(":modify for #{path}, file is not open, opening now")
if _open_file(watched_file, event)
observe_read_file(watched_file, listener)
end
url += query_string
@logger.debug? && @logger.debug("No additional params found. Final validation URL: #{url}")
end
when :delete
if file_is_open
@logger.debug? && @logger.debug(":delete for #{path}, closing file")
observe_read_file(watched_file, listener)
watched_file.file_close
else
@logger.debug? && @logger.debug(":delete for #{path}, file already closed")

begin
response = RestClient.get(url)
@logger.debug? && @logger.debug("Response from validation endpoint: #{response.body}")
rescue RestClient::ExceptionWithResponse => err
@logger.warn("An invalid file at path - #{path} has the validation response: #{err.response}")
@logger.debug? && @logger.debug("Response from validation endpoint: #{err.response}")
watched_file.unwatch
end
end
# ----------------------------------------------------------------------------------
# End of Modifications
# ----------------------------------------------------------------------------------
# continue processing only if the file status has not been changed to ":unwatched" as
# result of failed validation above.

if !(watched_file.state == :unwatched)
case event
when :unignore
listener.created
_add_to_sincedb(watched_file, event) unless @sincedb.member?(watched_file.inode)
when :create, :create_initial
if file_is_open
@logger.debug? && @logger.debug("#{event} for #{path}: file already open")
next
end
if _open_file(watched_file, event)
listener.created
observe_read_file(watched_file, listener)
end
when :modify
if file_is_open
observe_read_file(watched_file, listener)
else
@logger.debug? && @logger.debug(":modify for #{path}, file is not open, opening now")
if _open_file(watched_file, event)
observe_read_file(watched_file, listener)
end
end
when :delete
if file_is_open
@logger.debug? && @logger.debug(":delete for #{path}, closing file")
observe_read_file(watched_file, listener)
watched_file.file_close
else
@logger.debug? && @logger.debug(":delete for #{path}, file already closed")
end
listener.deleted
when :timeout
@logger.debug? && @logger.debug(":timeout for #{path}, closing file")
watched_file.file_close
listener.timed_out
else
@logger.warn("unknown event type #{event} for #{path}")
end
listener.deleted
when :timeout
@logger.debug? && @logger.debug(":timeout for #{path}, closing file")
watched_file.file_close
listener.timed_out
else
@logger.warn("unknown event type #{event} for #{path}")
end
end # @watch.subscribe
# when watch.subscribe ends - its because we got quit
_sincedb_write
end # def subscribe
end

# def subscribe

private
def observe_read_file(watched_file, listener)
2 changes: 2 additions & 0 deletions lib/filewatch/tail_base.rb
Original file line number Diff line number Diff line change
@@ -57,6 +57,8 @@ def initialize(opts={})
@watch.ignore_older = @opts[:ignore_older]
@watch.delimiter = @opts[:delimiter]
@watch.max_open_files = @opts[:max_open_files]
@watch.auth_endpoint = @opts[:auth_endpoint]
@watch.auth_params = @opts[:auth_params]
@delimiter_byte_size = @opts[:delimiter].bytesize

_sincedb_open
2 changes: 2 additions & 0 deletions lib/filewatch/watch.rb
Original file line number Diff line number Diff line change
@@ -34,6 +34,8 @@ def self.inode(path, stat)

attr_accessor :logger
attr_accessor :delimiter
attr_accessor :auth_endpoint
attr_accessor :auth_params
attr_reader :max_active

def initialize(opts={})