Skip to content

Commit d835716

Browse files
committed
buffer: add feature to evacuate chunk files when retry limit
Signed-off-by: Daijiro Fukuda <[email protected]>
1 parent c43586e commit d835716

File tree

5 files changed

+193
-1
lines changed

5 files changed

+193
-1
lines changed

lib/fluent/plugin/buf_file.rb

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,22 @@ def handle_broken_files(path, mode, e)
229229
File.unlink(path, path + '.meta') rescue nil
230230
end
231231

232+
def evacuate_chunk(chunk)
233+
unless chunk.is_a?(Fluent::Plugin::Buffer::FileChunk)
234+
raise ArgumentError, "The chunk must be FileChunk, but it was #{chunk.class}."
235+
end
236+
safe_owner_id = owner.plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_')
237+
backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR
238+
backup_dir = File.join(backup_base_dir, 'buffer', safe_owner_id)
239+
240+
FileUtils.mkdir_p(backup_dir, mode: system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION) unless Dir.exist?(backup_dir)
241+
242+
FileUtils.copy([chunk.path, chunk.meta_path], backup_dir)
243+
log.warn "chunk files are evacuated to #{backup_dir}.", chunk_id: dump_unique_id_hex(chunk.unique_id)
244+
rescue => e
245+
log.error "unexpected error while evacuating chunk files.", error: e
246+
end
247+
232248
private
233249

234250
def escaped_patterns(patterns)

lib/fluent/plugin/buf_file_single.rb

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,22 @@ def handle_broken_files(path, mode, e)
241241
File.unlink(path) rescue nil
242242
end
243243

244+
def evacuate_chunk(chunk)
245+
unless chunk.is_a?(Fluent::Plugin::Buffer::FileSingleChunk)
246+
raise ArgumentError, "The chunk must be FileSingleChunk, but it was #{chunk.class}."
247+
end
248+
safe_owner_id = owner.plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_')
249+
backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR
250+
backup_dir = File.join(backup_base_dir, 'buffer', safe_owner_id)
251+
252+
FileUtils.mkdir_p(backup_dir, mode: system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION) unless Dir.exist?(backup_dir)
253+
254+
FileUtils.copy(chunk.path, backup_dir)
255+
log.warn "chunk files are evacuated to #{backup_dir}.", chunk_id: dump_unique_id_hex(chunk.unique_id)
256+
rescue => e
257+
log.error "unexpected error while evacuating chunk files.", error: e
258+
end
259+
244260
private
245261

246262
def escaped_patterns(patterns)

lib/fluent/plugin/buffer.rb

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,7 @@ def clear_queue!
625625
until @queue.empty?
626626
begin
627627
q = @queue.shift
628+
evacuate_chunk(q)
628629
log.trace("purging a chunk in queue"){ {id: dump_unique_id_hex(chunk.unique_id), bytesize: chunk.bytesize, size: chunk.size} }
629630
q.purge
630631
rescue => e
@@ -636,6 +637,25 @@ def clear_queue!
636637
end
637638
end
638639

640+
def evacuate_chunk(chunk)
641+
# Overwrite this on demand.
642+
#
643+
# Note: Difference from the `backup` feature.
644+
# The `backup` feature is for unrecoverable errors, mainly for bad chunks.
645+
# On the other hand, this feature is for normal chunks.
646+
# The main motivation for this feature is to enable recovery by evacuating buffer files
647+
# when the retry limit is reached due to external factors such as network issues.
648+
#
649+
# Note: Difference from the `secondary` feature.
650+
# The `secondary` feature is not suitable for recovery.
651+
# It can be difficult to recover files made by `out_secondary_file` because the metadata
652+
# is lost.
653+
# For file buffers, the easiest way for recovery is to evacuate the chunk files as is.
654+
# Once the issue is recovered, we can put back the chunk files, and restart Fluentd to
655+
# load them.
656+
# This feature enables it.
657+
end
658+
639659
def chunk_size_over?(chunk)
640660
chunk.bytesize > @chunk_limit_size || (@chunk_limit_records && chunk.size > @chunk_limit_records)
641661
end

lib/fluent/plugin/buffer/file_chunk.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class FileChunkError < StandardError; end
3737
# path_prefix: path prefix string, ended with '.'
3838
# path_suffix: path suffix string, like '.log' (or any other user specified)
3939

40-
attr_reader :path, :permission
40+
attr_reader :path, :meta_path, :permission
4141

4242
def initialize(metadata, path, mode, perm: nil, compress: :text)
4343
super(metadata, compress: compress)

test/plugin/test_buf_file.rb

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,30 @@ def write(chunk)
2020
# drop
2121
end
2222
end
23+
24+
class DummyErrorOutputPlugin < DummyOutputPlugin
25+
def register_write(&block)
26+
instance_variable_set("@write", block)
27+
end
28+
29+
def initialize
30+
super
31+
@should_fail_writing = true
32+
@write = nil
33+
end
34+
35+
def recover
36+
@should_fail_writing = false
37+
end
38+
39+
def write(chunk)
40+
if @should_fail_writing
41+
raise "failed writing chunk"
42+
else
43+
@write ? @write.call(chunk) : nil
44+
end
45+
end
46+
end
2347
end
2448

2549
class FileBufferTest < Test::Unit::TestCase
@@ -1311,4 +1335,120 @@ def compare_log(plugin, msg)
13111335
assert { not File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") }
13121336
end
13131337
end
1338+
1339+
sub_test_case 'evacuate_chunk' do
1340+
def setup
1341+
Fluent::Test.setup
1342+
1343+
@now = Time.local(2025, 5, 30, 17, 0, 0)
1344+
@base_dir = File.expand_path("../../tmp/evacuate_chunk", __FILE__)
1345+
@buf_dir = File.join(@base_dir, "buffer")
1346+
@root_dir = File.join(@base_dir, "root")
1347+
FileUtils.mkdir_p(@root_dir)
1348+
@output = nil
1349+
1350+
Fluent::SystemConfig.overwrite_system_config("root_dir" => @root_dir) do
1351+
Timecop.freeze(@now)
1352+
yield
1353+
end
1354+
ensure
1355+
Timecop.return
1356+
stop_plugin(@output)
1357+
FileUtils.rm_rf(@base_dir)
1358+
end
1359+
1360+
def start_plugin(plugin)
1361+
plugin.start
1362+
plugin.after_start
1363+
end
1364+
1365+
def stop_plugin(plugin)
1366+
plugin.stop unless plugin.stopped?
1367+
plugin.before_shutdown unless plugin.before_shutdown?
1368+
plugin.shutdown unless plugin.shutdown?
1369+
plugin.after_shutdown unless plugin.after_shutdown?
1370+
plugin.close unless plugin.closed?
1371+
plugin.terminate unless plugin.terminated?
1372+
end
1373+
1374+
def configure_output(id, chunk_key, buffer_conf)
1375+
@output = FluentPluginFileBufferTest::DummyErrorOutputPlugin.new
1376+
@output.configure(
1377+
config_element('ROOT', '', {'@id' => id}, [config_element('buffer', chunk_key, buffer_conf)])
1378+
)
1379+
end
1380+
1381+
def wait(sec: 4)
1382+
waiting(sec) do
1383+
Thread.pass until yield
1384+
end
1385+
end
1386+
1387+
def emit_events(tag, es)
1388+
@output.interrupt_flushes
1389+
@output.emit_events("test.1", dummy_event_stream)
1390+
@now += 1
1391+
Timecop.freeze(@now)
1392+
@output.enqueue_thread_wait
1393+
@output.flush_thread_wakeup
1394+
end
1395+
1396+
def dummy_event_stream
1397+
Fluent::ArrayEventStream.new([
1398+
[ event_time("2025-05-30 10:00:00"), {"message" => "data1"} ],
1399+
[ event_time("2025-05-30 10:10:00"), {"message" => "data2"} ],
1400+
[ event_time("2025-05-30 10:20:00"), {"message" => "data3"} ],
1401+
])
1402+
end
1403+
1404+
def evacuate_dir(plugin_id)
1405+
File.join(@root_dir, "buffer", plugin_id)
1406+
end
1407+
1408+
test 'foo' do
1409+
plugin_id = "test_output"
1410+
buffer_conf = {
1411+
"path" => @buf_dir,
1412+
"flush_mode" => "interval",
1413+
"flush_interval" => "1s",
1414+
"retry_type" => "periodic",
1415+
"retry_max_times" => 0,
1416+
"retry_randomize" => false,
1417+
}
1418+
1419+
configure_output(plugin_id, "tag", buffer_conf)
1420+
start_plugin(@output)
1421+
1422+
emit_events("test.1", dummy_event_stream)
1423+
1424+
wait { @output.write_count > 0 && @output.num_errors > 0 }
1425+
wait { Dir.empty?(@buf_dir) }
1426+
1427+
# Assert evacuated files
1428+
evacuated_files = Dir.children(evacuate_dir(plugin_id)).map do |child_name|
1429+
File.join(evacuate_dir(plugin_id), child_name)
1430+
end
1431+
assert { evacuated_files.size == 2 } # .log and .log.meta
1432+
1433+
# Put back evacuated files
1434+
FileUtils.move(evacuated_files, @buf_dir)
1435+
1436+
# Restart plugin to load
1437+
stop_plugin(@output)
1438+
configure_output(plugin_id, "tag", buffer_conf)
1439+
@output.recover
1440+
written_data = []
1441+
@output.register_write do |chunk|
1442+
written_data << chunk.read
1443+
end
1444+
start_plugin(@output)
1445+
1446+
wait { not written_data.empty? }
1447+
1448+
@output.log.out.logs.each do |log|
1449+
puts log
1450+
end
1451+
p written_data
1452+
end
1453+
end
13141454
end

0 commit comments

Comments
 (0)