From 21217da6d8c2529b610339d5dc3af88a26ba34ff Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Tue, 16 Mar 2021 12:43:43 -0400 Subject: [PATCH 1/8] Fix file `cleans up sincedb entry` Add `ExpectationNotMetError` to the arguments to `Stud.try`. Without this, the retry logic does not trigger, leading to flakiness in this test as not enough time has elapsed for the sincedb entry to be cleaned up --- spec/inputs/file_read_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/inputs/file_read_spec.rb b/spec/inputs/file_read_spec.rb index 54c3685..f55d6e6 100644 --- a/spec/inputs/file_read_spec.rb +++ b/spec/inputs/file_read_spec.rb @@ -338,7 +338,7 @@ sincedb_content = File.read(sincedb_path).strip expect( sincedb_content ).to_not be_empty - Stud.try(3.times) do + Stud.try(3.times, RSpec::Expectations::ExpectationNotMetError) do sleep(1.5) # > sincedb_clean_after sincedb_content = File.read(sincedb_path).strip From 11938bb63d90a1af09830ae059e9ae9a87be260f Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Tue, 16 Mar 2021 13:05:31 -0400 Subject: [PATCH 2/8] Give more sleep time to execute removal --- spec/inputs/file_read_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/inputs/file_read_spec.rb b/spec/inputs/file_read_spec.rb index f55d6e6..ac08c9e 100644 --- a/spec/inputs/file_read_spec.rb +++ b/spec/inputs/file_read_spec.rb @@ -296,7 +296,7 @@ it 'removes watched file from collection' do wait_for_file_removal(sample_file) # watched discovery - sleep(0.25) # give CI some space to execute the removal + sleep(1) # give CI some space to execute the removal # TODO shouldn't be necessary once WatchedFileCollection does proper locking watched_files = plugin.watcher.watch.watched_files_collection expect( watched_files ).to be_empty From ac99a7248c0fb4de7823ffbd754a358528d42476 Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Tue, 16 Mar 2021 13:35:03 -0400 Subject: [PATCH 3/8] More lenient waiting for file removal --- spec/inputs/file_read_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/inputs/file_read_spec.rb b/spec/inputs/file_read_spec.rb index ac08c9e..096e8e6 100644 --- a/spec/inputs/file_read_spec.rb +++ b/spec/inputs/file_read_spec.rb @@ -36,7 +36,7 @@ end events = input(conf) do |pipeline, queue| - wait(0.5).for{File.exist?(tmpfile_path)}.to be_falsey + wait(2).for{File.exist?(tmpfile_path)}.to be_falsey 2.times.collect { queue.pop } end From 381a4ee176553a84d168905127dded2db2a4cc39 Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Wed, 17 Mar 2021 13:49:45 -0400 Subject: [PATCH 4/8] Attempt to further reduce test flakiness This plugin suffers from a large amount of test flakiness in the travis ci environment. This commit continues the attempts to reduce this flakiness, starting with https://github.com/logstash-plugins/logstash-input-file/pull/263 Mostly, this is increasing timeouts to deal with sluggish build servers, but also includes a fix to a Stud.try call which would not retry due to failure exceptions not matching. --- spec/filewatch/tailing_spec.rb | 2 +- spec/inputs/file_read_spec.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/filewatch/tailing_spec.rb b/spec/filewatch/tailing_spec.rb index fbc4f9e..30eaf81 100644 --- a/spec/filewatch/tailing_spec.rb +++ b/spec/filewatch/tailing_spec.rb @@ -450,7 +450,7 @@ module FileWatch FileUtils.mv(file_path2, file_path3) end .then("wait") do - wait(4).for do + wait(8).for do listener1.lines.size == 32 && listener2.calls == [:delete] && listener3.calls == [:open, :accept, :timed_out] end.to eq(true), "listener1.lines != 32 or listener2.calls != [:delete] or listener3.calls != [:open, :accept, :timed_out]" end diff --git a/spec/inputs/file_read_spec.rb b/spec/inputs/file_read_spec.rb index 096e8e6..94e8af9 100644 --- a/spec/inputs/file_read_spec.rb +++ b/spec/inputs/file_read_spec.rb @@ -363,7 +363,7 @@ def wait_for_start_processing(run_thread, timeout: 1.0) end end - def wait_for_file_removal(path, timeout: 3 * interval) + def wait_for_file_removal(path, timeout: 5 * interval) wait(timeout).for { File.exist?(path) }.to be_falsey end end From 60b4015fecf7d010ed8ae19fcd57165754234bf8 Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Thu, 18 Mar 2021 15:06:14 -0400 Subject: [PATCH 5/8] Add additional sleep delays for tests --- spec/filewatch/rotate_spec.rb | 63 ++++++++++++++++++---------------- spec/filewatch/tailing_spec.rb | 13 +++---- spec/inputs/file_read_spec.rb | 6 ++-- spec/inputs/file_tail_spec.rb | 10 +++--- 4 files changed, 49 insertions(+), 43 deletions(-) diff --git a/spec/filewatch/rotate_spec.rb b/spec/filewatch/rotate_spec.rb index cdef967..121a03f 100644 --- a/spec/filewatch/rotate_spec.rb +++ b/spec/filewatch/rotate_spec.rb @@ -53,6 +53,9 @@ module FileWatch end context "create + rename rotation: when a new logfile is renamed to a path we have seen before and the open file is fully read, renamed outside glob" do + let(:stat_interval) { 0.04 } + let(:discover_interval) { 15 } + let(:watch_dir) { directory.join("*A.log") } let(:file_path) { directory.join("1A.log") } subject { described_class.new(conf) } @@ -60,21 +63,23 @@ module FileWatch let(:listener2) { observer.listener_for(second_file.to_path) } let(:actions) do RSpec::Sequencing - .run_after(0.25, "create file") do - file_path.open("wb") { |file| file.write("#{line1}\n") } - end - .then_after(0.25, "write a 'unfinished' line") do - file_path.open("ab") { |file| file.write(line2) } - end - .then_after(0.25, "rotate once") do - tmpfile = directory.join("1.logtmp") - tmpfile.open("wb") { |file| file.write("\n#{line3}\n")} + .run_after(1.25, "create file") do + file_path.open("wb") { |file| file.write("#{line1}\n") } + end + .then_after(1.25, "write a 'unfinished' line") do + file_path.open("ab") { |file| file.write("#{line2}") } + end + .then_after(1.25, "rotate once") do + tmpfile = directory.join("1.logtmp") + tmpfile.open("wb") { |file| + file.write("\n#{line3}\n") + } file_path.rename(directory.join("1.log.1")) FileUtils.mv(directory.join("1.logtmp").to_path, file1_path) - end + end .then("wait for expectation") do - sleep(0.25) # if ENV['CI'] - wait(2).for { listener1.calls }.to eq([:open, :accept, :accept, :accept]) + sleep(1.25) # if ENV['CI'] + wait(2).for { listener1.calls }.to include(:open, :accept, :accept, :accept) end .then("quit") do tailing.quit @@ -104,14 +109,14 @@ module FileWatch let(:listener3) { observer.listener_for(third_file.to_path) } let(:actions) do RSpec::Sequencing - .run_after(0.25, "create file") do + .run_after(0.75, "create file") do file_path.open("wb") { |file| file.write("#{line1}\n") } end - .then_after(0.25, "rotate 1 - line1(66) is in 2B.log, line2(61) is in 1B.log") do + .then_after(0.75, "rotate 1 - line1(66) is in 2B.log, line2(61) is in 1B.log") do file_path.rename(second_file) file_path.open("wb") { |file| file.write("#{line2}\n") } end - .then_after(0.25, "rotate 2 - line1(66) is in 3B.log, line2(61) is in 2B.log, line3(47) is in 1B.log") do + .then_after(0.75, "rotate 2 - line1(66) is in 3B.log, line2(61) is in 2B.log, line3(47) is in 1B.log") do second_file.rename(third_file) file_path.rename(second_file) file_path.open("wb") { |file| file.write("#{line3}\n") } @@ -145,19 +150,19 @@ module FileWatch let(:listener2) { observer.listener_for(second_file.to_path) } let(:actions) do RSpec::Sequencing - .run_after(0.25, "create original - write line 1, 66 bytes") do + .run_after(0.75, "create original - write line 1, 66 bytes") do file_path.open("wb") { |file| file.write("#{line1}\n") } end - .then_after(0.25, "rename to 2.log") do + .then_after(0.75, "rename to 2.log") do file_path.rename(second_file) end - .then_after(0.25, "write line 2 to original, 61 bytes") do + .then_after(0.75, "write line 2 to original, 61 bytes") do file_path.open("wb") { |file| file.write("#{line2}\n") } end - .then_after(0.25, "rename to 2.log again") do + .then_after(0.75, "rename to 2.log again") do file_path.rename(second_file) end - .then_after(0.25, "write line 3 to original, 47 bytes") do + .then_after(0.75, "write line 3 to original, 47 bytes") do file_path.open("wb") { |file| file.write("#{line3}\n") } end .then("wait for expectations to be met") do @@ -267,14 +272,14 @@ module FileWatch let(:listener1) { observer.listener_for(file1_path) } let(:actions) do RSpec::Sequencing - .run_after(0.25, "create file") do + .run_after(0.75, "create file") do file_path.open("wb") { |file| file.puts(line1); file.puts(line2) } end - .then_after(0.25, "rotate") do + .then_after(0.75, "rotate") do FileUtils.cp(file1_path, directory.join("1F.log.1").to_path) file_path.truncate(0) end - .then_after(0.25, "write to truncated file") do + .then_after(0.75, "write to truncated file") do file_path.open("wb") { |file| file.puts(line3) } end .then("wait for expectations to be met") do @@ -342,13 +347,13 @@ module FileWatch let(:listener2) { observer.listener_for(file2.to_path) } let(:actions) do RSpec::Sequencing - .run_after(0.25, "create file") do + .run_after(0.75, "create file") do file_path.open("wb") { |file| file.puts(line1); file.puts(line2) } end - .then_after(0.25, "rename") do + .then_after(0.75, "rename") do FileUtils.mv(file1_path, file2.to_path) end - .then_after(0.25, "write to renamed file") do + .then_after(0.75, "write to renamed file") do file2.open("ab") { |file| file.puts(line3) } end .then("wait for expectations to be met") do @@ -464,14 +469,14 @@ module FileWatch .run_after(0.75, "create file") do file_path.open("wb") { |file| file.puts(line1); file.puts(line2) } end - .then_after(0.5, "rename") do + .then_after(0.75, "rename") do file_path.rename(second_file) file_path.open("wb") { |file| file.puts("#{line3}") } end .then("wait for expectations to be met") do wait(2.0).for{listener1.lines.size + listener2.lines.size}.to eq(3) - end - .then_after(0.5, "rename again") do + end + .then_after(0.75, "rename again") do file_path.rename(second_file) file_path.open("wb") { |file| file.puts("#{line4}") } end diff --git a/spec/filewatch/tailing_spec.rb b/spec/filewatch/tailing_spec.rb index 30eaf81..924c042 100644 --- a/spec/filewatch/tailing_spec.rb +++ b/spec/filewatch/tailing_spec.rb @@ -58,6 +58,7 @@ module FileWatch ENV["FILEWATCH_MAX_FILES_WARN_INTERVAL"] = "0" File.open(file_path, "wb") { |file| file.write("line1\nline2\n") } File.open(file_path2, "wb") { |file| file.write("line-A\nline-B\n") } + sleep(0.25) # if ENV['CI'] end context "when max_active is 1" do @@ -76,7 +77,7 @@ module FileWatch end context "when close_older is set" do - let(:wait_before_quit) { 0.8 } + let(:wait_before_quit) { 1.5 } let(:opts) { super().merge(:close_older => 0.1, :max_open_files => 1, :stat_interval => 0.1) } let(:suffix) { "B" } it "opens both files" do @@ -134,7 +135,7 @@ module FileWatch File.open(file_path, "wb") { |file| file.write("line1\nline2\n") } end .then("wait") do - wait(0.75).for { listener1.lines }.to_not be_empty + wait(1.5).for { listener1.lines }.to_not be_empty end .then("quit") do tailing.quit @@ -155,7 +156,7 @@ module FileWatch # it simulates that the user deleted the file # so when a stat is taken on the file an error is raised let(:suffix) { "E" } - let(:quit_after) { 0.2 } + let(:quit_after) { 1 } let(:stat) { double("stat", :size => 100, :modified_at => Time.now.to_f, :inode => 234567, :inode_struct => InodeStruct.new("234567", 1, 5)) } let(:watched_file) { WatchedFile.new(file_path, stat, tailing.settings) } before do @@ -253,10 +254,10 @@ module FileWatch # create file after first discovery, will be read from the beginning File.open(file_path, "wb") { |file| file.write("line1\nline2\n") } end - .then_after(0.55, "rename file") do + .then_after(0.75, "rename file") do FileUtils.mv(file_path, new_file_path) end - .then_after(0.55, "then write to renamed file") do + .then_after(0.75, "then write to renamed file") do File.open(new_file_path, "ab") { |file| file.write("line3\nline4\n") } wait(0.5).for{listener1.lines.size}.to eq(2), "listener1.lines.size not eq(2)" end @@ -354,7 +355,7 @@ module FileWatch end .then("watch and wait") do tailing.watch_this(watch_dir) - wait(1.25).for{listener1.calls}.to eq([:open, :timed_out]) + wait(2).for{listener1.calls}.to eq([:open, :timed_out]) end .then("quit") do tailing.quit diff --git a/spec/inputs/file_read_spec.rb b/spec/inputs/file_read_spec.rb index 94e8af9..7cd7e9f 100644 --- a/spec/inputs/file_read_spec.rb +++ b/spec/inputs/file_read_spec.rb @@ -102,7 +102,7 @@ end events = input(conf) do |pipeline, queue| - wait(0.5).for{File.exist?(tmpfile_path)}.to be_falsey + wait(1).for{File.exist?(tmpfile_path)}.to be_falsey 3.times.collect { queue.pop } end @@ -363,7 +363,7 @@ def wait_for_start_processing(run_thread, timeout: 1.0) end end - def wait_for_file_removal(path, timeout: 5 * interval) - wait(timeout).for { File.exist?(path) }.to be_falsey + def wait_for_file_removal(path) + wait(5).for { File.exist?(path) }.to be_falsey end end diff --git a/spec/inputs/file_tail_spec.rb b/spec/inputs/file_tail_spec.rb index 7aefc69..2eb1575 100644 --- a/spec/inputs/file_tail_spec.rb +++ b/spec/inputs/file_tail_spec.rb @@ -288,7 +288,7 @@ .run("create file") do File.open(tmpfile_path, "wb") { |file| file.puts(line) } end - .then_after(0.1, "identity is mapped") do + .then_after(0.25, "identity is mapped") do wait(0.75).for{subject.codec.identity_map[tmpfile_path]}.not_to be_nil, "identity is not mapped" end .then("wait for auto_flush") do @@ -395,7 +395,7 @@ let(:suffix) { "M" } it "an event is generated via auto_flush" do actions = RSpec::Sequencing - .run_after(0.1, "create files") do + .run_after(0.25, "create files") do File.open(tmpfile_path, "wb") do |fd| fd.puts("line1.1-of-a") fd.puts(" line1.2-of-a") @@ -478,7 +478,7 @@ "sincedb_path" => sincedb_path, "stat_interval" => 0.1, "max_open_files" => 1, - "close_older" => 0.5, + "close_older" => 1, "start_position" => "beginning", "file_sort_by" => "path", "delimiter" => TEST_FILE_DELIMITER) @@ -491,9 +491,9 @@ wait(0.4).for{subject.codec.identity_count == 1 && events.size == 2}.to eq(true), "both identities are not mapped and the first two events are not built" end .then("wait for close to flush last event of each identity") do - wait(0.8).for{events.size}.to eq(4), "close does not flush last event of each identity" + wait(1.6).for{events.size}.to eq(4), "close does not flush last event of each identity" end - .then_after(0.1, "stop") do + .then_after(0.2, "stop") do subject.stop end subject.run(events) From 31a3346dc6447ce9b63d908a66c898f17fa9dafc Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Thu, 8 Apr 2021 10:06:00 -0400 Subject: [PATCH 6/8] nxt --- spec/inputs/file_read_spec.rb | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/spec/inputs/file_read_spec.rb b/spec/inputs/file_read_spec.rb index 7cd7e9f..d2b4213 100644 --- a/spec/inputs/file_read_spec.rb +++ b/spec/inputs/file_read_spec.rb @@ -353,12 +353,22 @@ def wait_for_start_processing(run_thread, timeout: 1.0) begin Timeout.timeout(timeout) do - sleep(0.01) while run_thread.status != 'sleep' - sleep(timeout) unless plugin.queue + # sleep(0.01) while run_thread.status != 'sleep' + while run_thread.status != 'sleep' + puts "not sleep" + sleep(0.01) + end + # sleep(0.1) while !plugin.queue + while !plugin.queue + sleep(0.1) + puts "no queue" + end end - rescue Timeout::Error + rescue Timeout::Error => e + puts "plugin timed out #{e}" raise "plugin did not start processing (timeout: #{timeout})" unless plugin.queue else + puts "plugin did not time out" raise "plugin did not start processing" unless plugin.queue end end From c1b44aff684b8fb21014e25032e56eb8a42a9d80 Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Thu, 8 Apr 2021 10:51:40 -0400 Subject: [PATCH 7/8] nxy --- spec/filewatch/rotate_spec.rb | 8 ++++---- spec/filewatch/tailing_spec.rb | 2 +- spec/inputs/file_read_spec.rb | 20 ++++++++++++++++---- 3 files changed, 21 insertions(+), 9 deletions(-) diff --git a/spec/filewatch/rotate_spec.rb b/spec/filewatch/rotate_spec.rb index 121a03f..122ab58 100644 --- a/spec/filewatch/rotate_spec.rb +++ b/spec/filewatch/rotate_spec.rb @@ -193,19 +193,19 @@ module FileWatch let(:listener2) { observer.listener_for(second_file.to_path) } let(:actions) do RSpec::Sequencing - .run_after(0.25, "create original - write line 1, 66 bytes") do + .run_after(1.5, "create original - write line 1, 66 bytes") do file_path.open("wb") { |file| file.write("#{line1}\n") } end - .then_after(0.25, "rename to 2.log") do + .then_after(1.5, "rename to 2.log") do file_path.rename(second_file) file_path.open("wb") { |file| file.write("#{line2}\n") } end - .then_after(0.25, "rename to 2.log again") do + .then_after(1.5, "rename to 2.log again") do file_path.rename(second_file) file_path.open("wb") { |file| file.write("#{line3}\n") } end .then("wait for expectations to be met") do - wait(0.5).for{listener1.lines.size == 3 && listener2.lines.empty?}.to eq(true) + wait(1).for{listener1.lines.size == 3 && listener2.lines.empty?}.to eq(true) end .then("quit") do tailing.quit diff --git a/spec/filewatch/tailing_spec.rb b/spec/filewatch/tailing_spec.rb index 924c042..e369e01 100644 --- a/spec/filewatch/tailing_spec.rb +++ b/spec/filewatch/tailing_spec.rb @@ -482,7 +482,7 @@ module FileWatch tailing.watch_this(watch_dir) end .then("wait for lines") do - wait(1.5).for{listener1.calls}.to eq([:open, :accept, :accept, :timed_out]) + wait(2.5).for{listener1.calls}.to eq([:open, :accept, :accept, :timed_out]) end .then("quit") do tailing.quit diff --git a/spec/inputs/file_read_spec.rb b/spec/inputs/file_read_spec.rb index d2b4213..a9cb995 100644 --- a/spec/inputs/file_read_spec.rb +++ b/spec/inputs/file_read_spec.rb @@ -284,7 +284,10 @@ wait_for_start_processing(@run_thread) end - after { plugin.stop } + after { + plugin.stop + @run_thread.join + } it 'processes a file' do wait_for_file_removal(sample_file) # watched discovery @@ -330,7 +333,10 @@ wait_for_start_processing(@run_thread) end - after { plugin.stop } + after { + plugin.stop + @run_thread.join + } it 'cleans up sincedb entry' do wait_for_file_removal(sample_file) # watched discovery @@ -350,7 +356,7 @@ private - def wait_for_start_processing(run_thread, timeout: 1.0) + def wait_for_start_processing(run_thread, timeout: 10.0) begin Timeout.timeout(timeout) do # sleep(0.01) while run_thread.status != 'sleep' @@ -363,7 +369,13 @@ def wait_for_start_processing(run_thread, timeout: 1.0) sleep(0.1) puts "no queue" end - end + puts "the queue size is #{plugin.queue.size}" + # sleep(0.1) while !plugin.queue + while plugin.queue.size == 0 + sleep(0.1) + puts "no item on queue" + end + end rescue Timeout::Error => e puts "plugin timed out #{e}" raise "plugin did not start processing (timeout: #{timeout})" unless plugin.queue From 4af4e3aab99b66980327853d3587efb701eb71c9 Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Thu, 8 Apr 2021 16:49:09 -0400 Subject: [PATCH 8/8] Fix `discover_interval` in tests --- spec/inputs/file_read_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/inputs/file_read_spec.rb b/spec/inputs/file_read_spec.rb index a9cb995..9a13663 100644 --- a/spec/inputs/file_read_spec.rb +++ b/spec/inputs/file_read_spec.rb @@ -255,7 +255,7 @@ 'mode' => "read", 'path' => "#{temp_directory}/*", 'stat_interval' => interval, - 'discover_interval' => interval, + 'discover_interval' => 1, 'sincedb_path' => "#{temp_directory}/.sincedb", 'sincedb_write_interval' => interval }