Commit 92d8672

Author: Guy Boertje
WIP: Better support for NFS, step 1 (#199)
* switch to loop controlled reading
* fix test

Fixes #189

1 parent 498eb5e · commit 92d8672

8 files changed (+99, -58 lines)


Diff for: lib/filewatch/bootstrap.rb (+1)

@@ -42,6 +42,7 @@ def to_s
   end

   BufferExtractResult = Struct.new(:lines, :warning, :additional)
+  LoopControlResult = Struct.new(:count, :size, :more)

   class NoSinceDBPathGiven < StandardError; end
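Note: LoopControlResult is the contract between a watched file and its handler: how many chunk reads to attempt this pass, how many bytes per read, and whether another pass is needed. A minimal Ruby sketch with illustrative values (not taken from any real file):

# Illustrative values only: plan a pass of three 30-byte reads,
# with more content still left for a follow-up pass.
LoopControlResult = Struct.new(:count, :size, :more)

control = LoopControlResult.new(3, 30, true)
control.count #=> 3     chunk reads to perform this pass
control.size  #=> 30    bytes per read
control.more  #=> true  the caller should loop again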

Diff for: lib/filewatch/read_mode/handlers/read_file.rb (+43, -34)

@@ -5,42 +5,51 @@ class ReadFile < Base
     def handle_specifically(watched_file)
       if open_file(watched_file)
         add_or_update_sincedb_collection(watched_file) unless sincedb_collection.member?(watched_file.sincedb_key)
-        changed = false
-        logger.trace("reading...", "amount" => watched_file.read_bytesize_description, "filename" => watched_file.filename)
-        watched_file.read_loop_count.times do
+        loop do
           break if quit?
-          begin
-            # expect BufferExtractResult
-            result = watched_file.read_extract_lines
-            # read_extract_lines will increment bytes_read
-            logger.trace(result.warning, result.additional) unless result.warning.empty?
-            changed = true
-            result.lines.each do |line|
-              watched_file.listener.accept(line)
-              # sincedb position is independent from the watched_file bytes_read
-              sincedb_collection.increment(watched_file.sincedb_key, line.bytesize + @settings.delimiter_byte_size)
-            end
-            sincedb_collection.request_disk_flush
-          rescue EOFError
-            # flush the buffer now in case there is no final delimiter
-            line = watched_file.buffer.flush
-            watched_file.listener.accept(line) unless line.empty?
-            watched_file.listener.eof
-            watched_file.file_close
-            key = watched_file.sincedb_key
-            sincedb_collection.reading_completed(key)
-            sincedb_collection.clear_watched_file(key)
-            watched_file.listener.deleted
-            watched_file.unwatch
-            break
-          rescue Errno::EWOULDBLOCK, Errno::EINTR
-            watched_file.listener.error
-            break
-          rescue => e
-            logger.error("read_to_eof: general error reading file", "path" => watched_file.path, "error" => e.inspect, "backtrace" => e.backtrace.take(8))
-            watched_file.listener.error
-            break
+          loop_control = watched_file.loop_control_adjusted_for_stat_size
+          controlled_read(watched_file, loop_control)
+          sincedb_collection.request_disk_flush
+          break unless loop_control.more
+        end
+        if watched_file.all_read?
+          # flush the buffer now in case there is no final delimiter
+          line = watched_file.buffer.flush
+          watched_file.listener.accept(line) unless line.empty?
+          watched_file.listener.eof
+          watched_file.file_close
+          key = watched_file.sincedb_key
+          sincedb_collection.reading_completed(key)
+          sincedb_collection.clear_watched_file(key)
+          watched_file.listener.deleted
+          watched_file.unwatch
+        end
+      end
+    end
+
+    def controlled_read(watched_file, loop_control)
+      logger.trace("reading...", "iterations" => loop_control.count, "amount" => loop_control.size, "filename" => watched_file.filename)
+      loop_control.count.times do
+        begin
+          result = watched_file.read_extract_lines(loop_control.size) # expect BufferExtractResult
+          logger.info(result.warning, result.additional) unless result.warning.empty?
+          result.lines.each do |line|
+            watched_file.listener.accept(line)
+            # sincedb position is independent from the watched_file bytes_read
+            delta = line.bytesize + @settings.delimiter_byte_size
+            sincedb_collection.increment(watched_file.sincedb_key, delta)
           end
+        rescue EOFError
+          logger.error("controlled_read: eof error reading file", "path" => watched_file.path, "error" => e.inspect, "backtrace" => e.backtrace.take(8))
+          break
+        rescue Errno::EWOULDBLOCK, Errno::EINTR
+          logger.error("controlled_read: block or interrupt error reading file", "path" => watched_file.path, "error" => e.inspect, "backtrace" => e.backtrace.take(8))
+          watched_file.listener.error
+          break
+        rescue => e
+          logger.error("controlled_read: general error reading file", "path" => watched_file.path, "error" => e.inspect, "backtrace" => e.backtrace.take(8))
+          watched_file.listener.error
+          break
         end
       end
     end
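Note: for every extracted line the sincedb offset advances by `line.bytesize + delimiter_byte_size`, so the recorded position always lands just past a delimiter. A worked example in Ruby, assuming a one-byte "\n" delimiter (delimiter and values are illustrative):

# Assumes delimiter_byte_size == 1 (a "\n" delimiter); values are illustrative.
lines = ["foo", "barbaz"]
delimiter_byte_size = 1
position = 0
lines.each { |line| position += line.bytesize + delimiter_byte_size }
position #=> 11  ("foo\n" is 4 bytes, "barbaz\n" is 7 bytes)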

Diff for: lib/filewatch/tail_mode/handlers/base.rb (+4, -4)

@@ -30,19 +30,19 @@ def update_existing_specifically(watched_file, sincedb_value)

     private

-    def read_to_eof(watched_file)
-      logger.trace("reading...", "amount" => watched_file.read_bytesize_description, "filename" => watched_file.filename)
+    def controlled_read(watched_file, loop_control)
       changed = false
+      logger.trace("reading...", "iterations" => loop_control.count, "amount" => loop_control.size, "filename" => watched_file.filename)
       # from a real config (has 102 file inputs)
       # -- This cfg creates a file input for every log file to create a dedicated file pointer and read all file simultaneously
       # -- If we put all log files in one file input glob we will have indexing delay, because Logstash waits until the first file becomes EOF
       # by allowing the user to specify a combo of `file_chunk_count` X `file_chunk_size`...
       # we enable the pseudo parallel processing of each file.
       # user also has the option to specify a low `stat_interval` and a very high `discover_interval`to respond
       # quicker to changing files and not allowing too much content to build up before reading it.
-      watched_file.read_loop_count.times do
+      loop_control.count.times do
         begin
-          result = watched_file.read_extract_lines # expect BufferExtractResult
+          result = watched_file.read_extract_lines(loop_control.size) # expect BufferExtractResult
           logger.trace(result.warning, result.additional) unless result.warning.empty?
           changed = true
           result.lines.each do |line|
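Note: the comments above describe a per-pass budget: a file yields after at most file_chunk_count reads of file_chunk_size bytes each, so many watched files can advance in round-robin instead of one file being read to EOF first. With illustrative (non-default) numbers:

# Illustrative settings, not the plugin defaults.
file_chunk_count = 4
file_chunk_size  = 65536
per_pass_budget = file_chunk_count * file_chunk_size #=> 262144 bytes from one file before the next gets a turn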

Diff for: lib/filewatch/tail_mode/handlers/grow.rb (+5, -1)

@@ -4,7 +4,11 @@ module FileWatch module TailMode module Handlers
   class Grow < Base
     def handle_specifically(watched_file)
       watched_file.file_seek(watched_file.bytes_read)
-      read_to_eof(watched_file)
+      loop do
+        loop_control = watched_file.loop_control_adjusted_for_stat_size
+        controlled_read(watched_file, loop_control)
+        break unless loop_control.more
+      end
     end
   end
 end end end

Diff for: lib/filewatch/tail_mode/handlers/shrink.rb (+6, -3)

@@ -3,10 +3,13 @@
 module FileWatch module TailMode module Handlers
   class Shrink < Base
     def handle_specifically(watched_file)
-      sdbv = add_or_update_sincedb_collection(watched_file)
+      add_or_update_sincedb_collection(watched_file)
       watched_file.file_seek(watched_file.bytes_read)
-      read_to_eof(watched_file)
-      logger.trace("handle_specifically: after read_to_eof", "watched file" => watched_file.details, "sincedb value" => sdbv)
+      loop do
+        loop_control = watched_file.loop_control_adjusted_for_stat_size
+        controlled_read(watched_file, loop_control)
+        break unless loop_control.more
+      end
     end

     def update_existing_specifically(watched_file, sincedb_value)

Diff for: lib/filewatch/tail_mode/processor.rb (+2, -2)

@@ -155,9 +155,9 @@ def process_rotation_in_progress(watched_files)
           # we need to keep reading the open file, if we close it we lose it because the path is now pointing at a different file.
           logger.trace(">>> Rotation In Progress - inode change detected and original content is not fully read, reading all", "watched_file details" => watched_file.details)
           # need to fully read open file while we can
-          watched_file.set_depth_first_read_loop
+          watched_file.set_maximum_read_loop
           grow(watched_file)
-          watched_file.set_user_defined_read_loop
+          watched_file.set_standard_read_loop
         else
           logger.warn(">>> Rotation In Progress - inode change detected and original content is not fully read, file is closed and path points to new content", "watched_file details" => watched_file.details)
         end

Diff for: lib/filewatch/watched_file.rb (+37, -13)

@@ -7,7 +7,8 @@ class WatchedFile

     attr_reader :bytes_read, :state, :file, :buffer, :recent_states, :bytes_unread
     attr_reader :path, :accessed_at, :modified_at, :pathname, :filename
-    attr_reader :listener, :read_loop_count, :read_chunk_size, :stat, :read_bytesize_description
+    attr_reader :listener, :read_loop_count, :read_chunk_size, :stat
+    attr_reader :loop_count_type, :loop_count_mode
     attr_accessor :last_open_warning_at

     # this class represents a file that has been discovered

@@ -19,7 +20,7 @@ def initialize(pathname, stat, settings)
       @filename = @pathname.basename.to_s
       full_state_reset(stat)
       watch
-      set_user_defined_read_loop
+      set_standard_read_loop
       set_accessed_at
     end

@@ -52,7 +53,7 @@ def full_state_reset(this_stat = nil)

     def rotate_from(other)
       # move all state from other to this one
-      set_user_defined_read_loop
+      set_standard_read_loop
       file_close
       @bytes_read = other.bytes_read
       @bytes_unread = other.bytes_unread

@@ -231,8 +232,8 @@ def reset_buffer
       @buffer.flush
     end

-    def read_extract_lines
-      data = file_read
+    def read_extract_lines(amount)
+      data = file_read(amount)
       result = buffer_extract(data)
       increment_bytes_read(data.bytesize)
       result

@@ -342,16 +343,39 @@ def expiry_ignore_enabled?
       !@settings.ignore_older.nil?
     end

-    def set_depth_first_read_loop
-      @read_loop_count = FileWatch::MAX_ITERATIONS
-      @read_chunk_size = FileWatch::FILE_READ_SIZE
-      @read_bytesize_description = "All"
-    end
-
-    def set_user_defined_read_loop
+    def set_standard_read_loop
       @read_loop_count = @settings.file_chunk_count
       @read_chunk_size = @settings.file_chunk_size
-      @read_bytesize_description = @read_loop_count == FileWatch::MAX_ITERATIONS ? "All" : (@read_loop_count * @read_chunk_size).to_s
+      # e.g. 1 * 10 bytes -> 10 or 256 * 65536 -> 1677716 or 140737488355327 * 32768 -> 4611686018427355136
+      @standard_loop_max_bytes = @read_loop_count * @read_chunk_size
+    end
+
+    def set_maximum_read_loop
+      # used to quickly fully read an open file when rotation is detected
+      @read_loop_count = FileWatch::MAX_ITERATIONS
+      @read_chunk_size = FileWatch::FILE_READ_SIZE
+      @standard_loop_max_bytes = @read_loop_count * @read_chunk_size
+    end
+
+    def loop_control_adjusted_for_stat_size
+      more = false
+      to_read = current_size - @bytes_read
+      return LoopControlResult.new(0, 0, more) if to_read < 1
+      return LoopControlResult.new(1, to_read, more) if to_read < @read_chunk_size
+      # set as if to_read is greater than or equal to max_bytes
+      # use the ones from settings and don't indicate more
+      count = @read_loop_count
+      if to_read < @standard_loop_max_bytes
+        # if the defaults are used then this branch will be taken
+        # e.g. to_read is 100 and max_bytes is 4 * 30 -> 120
+        # will overrun and trigger EOF, build less iterations
+        # will generate 3 * 30 -> 90 this time and we indicate more
+        # a 2GB file in read mode will get one loop of 64666 x 32768 (2119006656 / 32768)
+        # and a second loop with 1 x 31168
+        count = to_read / @read_chunk_size
+        more = true
+      end
+      LoopControlResult.new(count, @read_chunk_size, more)
     end

     def reset_bytes_unread
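Note: stepping through loop_control_adjusted_for_stat_size with the numbers from its own comment (chunk size 30, loop count 4, 100 unread bytes) shows why the final partial chunk is deferred to a second pass:

# Numbers taken from the in-code comment above; this mirrors the branch where
# to_read is smaller than the standard per-pass maximum.
read_loop_count = 4
read_chunk_size = 30
standard_loop_max_bytes = read_loop_count * read_chunk_size #=> 120
to_read = 100 # current_size - bytes_read

count = to_read / read_chunk_size #=> 3, i.e. 90 bytes this pass
more  = true                      #   signal the handler to come back
# next pass: to_read == 10 < read_chunk_size, so LoopControlResult.new(1, 10, false)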

Diff for: spec/filewatch/read_mode_handlers_read_file_spec.rb (+1, -1)

@@ -20,7 +20,7 @@ module FileWatch
       let(:watch) { double("watch", :quit? => false) }
       it "calls 'sincedb_write' exactly 2 times" do
         allow(FileOpener).to receive(:open).with(watched_file.path).and_return(file)
-        expect(sdb_collection).to receive(:sincedb_write).exactly(2).times
+        expect(sdb_collection).to receive(:sincedb_write).exactly(1).times
         watched_file.activate
         processor.initialize_handlers(sdb_collection, TestObserver.new)
         processor.read_file(watched_file)
