From 794f1b947ecb5814927893600852e1fbc18e4557 Mon Sep 17 00:00:00 2001
From: Franck Delache
Date: Mon, 3 Nov 2025 16:43:53 -0500
Subject: [PATCH] Add Parallelizable concern for thread-based parallel processing
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Adds a new concern that enables maintenance tasks to process items in parallel
using threads. This is useful for I/O-bound operations where significant speedup
can be achieved through concurrent processing.

Key features:
- Task authors batch their collections using existing framework features
  (.in_batches, csv_collection(in_batches:), .each_slice)
- Batch size directly determines thread count (explicit relationship)
- Spawns one thread per item within each batch
- Maintains all maintenance_tasks guarantees (pausable, resumable)
- Requires idempotent and thread-safe process_item implementations

Implementation:
- ParallelExecutor: Handles thread creation, execution, and error handling
- Parallelizable concern: Converts batches to arrays and delegates to executor
- Simple API: include concern + implement process_item instead of process

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude
---
 .../concerns/parallelizable.rb                | 112 +++++++
 .../maintenance_tasks/parallel_executor.rb    |  69 ++++
 .../concerns/parallelizable_test.rb           | 265 ++++++++++++++++++
 .../parallel_executor_test.rb                 | 108 +++++++
 4 files changed, 554 insertions(+)
 create mode 100644 app/models/maintenance_tasks/concerns/parallelizable.rb
 create mode 100644 app/models/maintenance_tasks/parallel_executor.rb
 create mode 100644 test/models/maintenance_tasks/concerns/parallelizable_test.rb
 create mode 100644 test/models/maintenance_tasks/parallel_executor_test.rb

diff --git a/app/models/maintenance_tasks/concerns/parallelizable.rb b/app/models/maintenance_tasks/concerns/parallelizable.rb
new file mode 100644
index 00000000..bfc8ec5e
--- /dev/null
+++ b/app/models/maintenance_tasks/concerns/parallelizable.rb
@@ -0,0 +1,112 @@
+# frozen_string_literal: true
+
+module MaintenanceTasks
+  module Concerns
+    # Concern that adds parallel processing capability to maintenance tasks.
+    #
+    # When included in a task, this concern enables processing items in parallel
+    # using threads. Task authors define their collection with batching
+    # (using in_batches, csv_collection(in_batches:), or each_slice), and
+    # implement process_item(item) instead of process(item).
+    #
+    # The concern works by:
+    # 1. Receiving a batch from the job iteration framework
+    # 2. Converting the batch to an array of items
+    # 3. Spawning one thread per item to process them concurrently
+    # 4. Waiting for all threads to complete before moving to the next batch
+    #
+    # @example ActiveRecord with batching
+    #   class Maintenance::UpdateUsersTask < MaintenanceTasks::Task
+    #     include MaintenanceTasks::Concerns::Parallelizable
+    #
+    #     def collection
+    #       User.where(status: 'pending').in_batches(of: 10)
+    #     end
+    #
+    #     def process_item(user)
+    #       # This will be called in parallel (10 concurrent threads per batch)
+    #       user.update!(status: 'processed', processed_at: Time.current)
+    #     end
+    #   end
+    #
+    # @example CSV processing with batching
+    #   class Maintenance::ProcessCsvTask < MaintenanceTasks::Task
+    #     include MaintenanceTasks::Concerns::Parallelizable
+    #
+    #     csv_collection(in_batches: 10)
+    #
+    #     def process_item(row)
+    #       # Process CSV row in parallel (10 concurrent threads per batch)
+    #       User.create!(name: row['name'], email: row['email'])
+    #     end
+    #   end
+    #
+    # @example Array processing with batching
+    #   class Maintenance::ProcessIdsTask < MaintenanceTasks::Task
+    #     include MaintenanceTasks::Concerns::Parallelizable
+    #
+    #     def collection
+    #       [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].each_slice(5)
+    #     end
+    #
+    #     def process_item(id)
+    #       # Process each ID in parallel (5 concurrent threads per batch)
+    #       SomeService.call(id)
+    #     end
+    #   end
+    #
+    # @note Cursor granularity: The cursor tracks batches, not individual items.
+    #   If the task is interrupted mid-batch, items from that batch will be
+    #   reprocessed on resume. Ensure your process_item method is idempotent.
+    #
+    # @note Thread safety requirements:
+    #   - Your process_item method MUST be thread-safe
+    #   - Avoid shared mutable state between items
+    #   - Each thread checks out its own ActiveRecord connection while it runs
+    #     process_item, so keep the batch size within the connection pool size
+    #     (the thread running the job may hold a connection as well). With
+    #     larger batches, threads wait for a free connection and can fail with
+    #     ActiveRecord::ConnectionTimeoutError.
+    #
+    # @note Error handling: If any thread raises an exception, the entire batch
+    #   fails and the exception is propagated to the maintenance task's error handler.
+    #   If several items fail, the first error recorded is raised.
+    #
+    # @note Progress tracking: Progress is tracked per batch, not per item.
+    #   The UI will show "X batches processed" rather than "X items processed".
+    module Parallelizable
+      extend ActiveSupport::Concern
+
+      # Process a batch by spawning threads for parallel execution.
+      # This is called by the job iteration framework with a batch of items.
+      #
+      # @param batch [Object] batch (ActiveRecord::Relation, Array of items/rows)
+      # @raise [StandardError] the error re-raised from ParallelExecutor.execute
+      def process(batch)
+        # Kernel#Array converts relations and enumerables via to_ary/to_a and
+        # returns arrays unchanged, so no explicit respond_to? check is needed.
+        items = Array(batch)
+
+        # Execute items in parallel, storing errored item for context
+        ParallelExecutor.execute(items) do |item|
+          process_item(item)
+        end
+      rescue => error
+        # Store the errored item for maintenance tasks error reporting
+        @errored_element = error.errored_item if error.respond_to?(:errored_item)
+        raise
+      end
+
+      # Task authors implement this method instead of process(item).
+      # It will be called in parallel for each item in a batch.
+      #
+      # @param item [Object] the individual item to process
+      def process_item(item)
+        raise NoMethodError, <<~MSG.squish
+          #{self.class.name} must implement `process_item(item)` when using
+          Parallelizable concern.
+        MSG
+      end
+    end
+  end
+end
diff --git a/app/models/maintenance_tasks/parallel_executor.rb b/app/models/maintenance_tasks/parallel_executor.rb
new file mode 100644
index 00000000..b9832a5f
--- /dev/null
+++ b/app/models/maintenance_tasks/parallel_executor.rb
@@ -0,0 +1,69 @@
+# frozen_string_literal: true
+
+module MaintenanceTasks
+  # Executes items in parallel, spawning one thread per item.
+  #
+  # Handles thread creation, error collection, and ensures all threads
+  # complete before raising exceptions.
+  #
+  # @api private
+  class ParallelExecutor
+    # Executes a block for each item in parallel.
+    #
+    # Every thread runs the block inside
+    # ActiveRecord::Base.connection_pool.with_connection, so each item is
+    # processed while holding its own pooled connection.
+    #
+    # @param items [Array] items to process
+    # @yield [item] block to execute for each item
+    # @return [void]
+    # @raise [StandardError] the first exception recorded during execution
+    def self.execute(items, &block)
+      exceptions = []
+      exception_mutex = Mutex.new
+
+      threads = items.map do |item|
+        Thread.new do
+          ActiveRecord::Base.connection_pool.with_connection do
+            block.call(item)
+          end
+        rescue => error
+          # Rescue around the connection checkout as well as the block: a
+          # checkout failure is then recorded with its item instead of killing
+          # the thread, which would make join re-raise before the remaining
+          # threads have been waited for.
+          exception_mutex.synchronize do
+            exceptions << { item: item, error: error }
+          end
+        end
+      end
+
+      # Wait for all threads to complete
+      threads.each(&:join)
+
+      # Raise first exception if any occurred
+      raise_first_exception(exceptions) if exceptions.any?
+    end
+
+    # Raises the first exception from the collection.
+    #
+    # @param exceptions [Array] array of {item:, error:} hashes
+    # @return [void]
+    # @raise [StandardError] the first error from the collection
+    def self.raise_first_exception(exceptions)
+      first_exception = exceptions.first
+      error = first_exception[:error]
+      item = first_exception[:item]
+
+      # Store context for error reporting (matches maintenance_tasks convention)
+      # The calling task will set @errored_element for error context.
+      # A frozen exception cannot take a singleton method, so it is raised
+      # untouched rather than being masked by a FrozenError.
+      error.define_singleton_method(:errored_item) { item } unless error.frozen?
+
+      raise error
+    end
+
+    private_class_method :raise_first_exception
+  end
+end
diff --git a/test/models/maintenance_tasks/concerns/parallelizable_test.rb b/test/models/maintenance_tasks/concerns/parallelizable_test.rb
new file mode 100644
index 00000000..ea08bff5
--- /dev/null
+++ b/test/models/maintenance_tasks/concerns/parallelizable_test.rb
@@ -0,0 +1,265 @@
+# frozen_string_literal: true
+
+require "test_helper"
+
+module MaintenanceTasks
+  module Concerns
+    class ParallelizableTest < ActiveSupport::TestCase
+      class TestTask < MaintenanceTasks::Task
+        include Parallelizable
+
+        attr_accessor :processed_items
+
+        def initialize
+          super
+          @processed_items = Concurrent::Array.new
+        end
+
+        def collection
+          [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].each_slice(5)
+        end
+
+        def process_item(item)
+          # Simulate some work
+          sleep(0.01)
+          @processed_items << { item: item, thread_id: Thread.current.object_id }
+        end
+      end
+
+      test "processes items in parallel within each batch" do
+        task = TestTask.new
+        collection = task.collection
+
+        # Get the first batch
+        first_batch = collection.first
+
+        # Process the batch
+        task.process(first_batch)
+
+        # Should have processed 5 items (batch size)
+        assert_equal 5, task.processed_items.size
+
+        # Items should have been processed by different threads
+        thread_ids = task.processed_items.map { |item| item[:thread_id] }.uniq
+        assert thread_ids.size > 1, "Expected multiple threads, got #{thread_ids.size}"
+      end
+
+      test "raises error if process_item not implemented" do
+        task = Class.new(MaintenanceTasks::Task) do
+          include Parallelizable
+
+          def collection
+            [1, 2, 3].each_slice(3)
+          end
+        end.new
+
+        batch = task.collection.first
+
+        error = assert_raises(NoMethodError) do
+          task.process(batch)
+        end
+
+        assert_includes error.message, "must implement `process_item(item)`"
+      end
+
+      test "propagates exceptions from threads" do
+        task = Class.new(MaintenanceTasks::Task) do
+          include Parallelizable
+
+          def collection
+            [1, 2, 3, 4, 5].each_slice(5)
+          end
+
+          def process_item(item)
+            # Sleep briefly to ensure all threads run
+            sleep(0.001)
+            raise StandardError, "Error processing item #{item}" if item == 3
+          end
+        end.new
+
+        batch = task.collection.first
+
+        error = assert_raises(StandardError) do
+          task.process(batch)
+        end
+
+        assert_includes error.message, "Error processing item 3"
+        assert_equal 3, task.instance_variable_get(:@errored_element)
+      end
+
+      test "all threads complete even if one fails" do
+        task = Class.new(MaintenanceTasks::Task) do
+          include Parallelizable
+
+          attr_reader :processed_items
+
+          def initialize
+            super
+            @processed_items = Concurrent::Array.new
+          end
+
+          def collection
+            [1, 2, 3, 4, 5].each_slice(5)
+          end
+
+          def process_item(item)
+            sleep(0.01) # Ensure other threads have time to run
+            @processed_items << item
+            raise StandardError, "Error on item 3" if item == 3
+          end
+        end.new
+
+        batch = task.collection.first
+
+        assert_raises(StandardError) do
+          task.process(batch)
+        end
+
+        # All 5 threads should have attempted to process
+        # (though one failed, all should have been spawned)
+        assert task.processed_items.size >= 4, "Expected at least 4 items processed, got #{task.processed_items.size}"
+      end
+
+      test "works with ActiveRecord::Relation batches" do
+        # Create a simple model for testing
+        Post.delete_all
+        5.times { |i| Post.create!(title: "Post #{i}", content: "Content #{i}") }
+
+        task = Class.new(MaintenanceTasks::Task) do
+          include Parallelizable
+
+          attr_accessor :processed_ids
+
+          def initialize
+            super
+            @processed_ids = Concurrent::Array.new
+          end
+
+          def collection
+            Post.all.in_batches(of: 5)
+          end
+
+          def process_item(post)
+            @processed_ids << post.id
+          end
+        end.new
+
+        # Get first batch (AR::Relation)
+        first_batch = task.collection.first
+        task.process(first_batch)
+
+        assert_equal 5, task.processed_ids.size
+      ensure
+        Post.delete_all
+      end
+
+      test "works with batched CSV collections" do
+        csv_content = <<~CSV
+          name,age
+          Alice,30
+          Bob,25
+          Charlie,35
+          David,28
+          Eve,32
+        CSV
+
+        task = Class.new(MaintenanceTasks::Task) do
+          include Parallelizable
+
+          csv_collection(in_batches: 3)
+
+          attr_accessor :processed_names
+
+          def initialize
+            super
+            @processed_names = Concurrent::Array.new
+          end
+
+          def process_item(row)
+            @processed_names << row["name"]
+          end
+        end.new
+
+        task.csv_content = csv_content
+
+        # Get the collection
+        collection = task.collection
+        assert_kind_of BatchCsvCollectionBuilder::BatchCsv, collection
+
+        # Simulate what job-iteration does
+        csv_enum = JobIteration::CsvEnumerator.new(collection.csv).batches(
+          batch_size: collection.batch_size,
+          cursor: nil
+        )
+
+        # Get first batch - job-iteration yields [rows, cursor]
+        # but only passes rows to process()
+        rows, _cursor = csv_enum.first
+        task.process(rows)
+
+        assert_equal 3, task.processed_names.size
+        assert_includes task.processed_names, "Alice"
+        assert_includes task.processed_names, "Bob"
+        assert_includes task.processed_names, "Charlie"
+      end
+
+      test "converts ActiveRecord::Relation to array" do
+        Post.delete_all
+        3.times { |i| Post.create!(title: "Post #{i}", content: "Content") }
+
+        task = Class.new(MaintenanceTasks::Task) do
+          include Parallelizable
+
+          attr_accessor :processed_count
+
+          def initialize
+            super
+            @processed_count = Concurrent::AtomicFixnum.new(0)
+          end
+
+          def collection
+            Post.all.in_batches(of: 3)
+          end
+
+          def process_item(post)
+            @processed_count.increment
+          end
+        end.new
+
+        batch = task.collection.first
+        assert_kind_of ActiveRecord::Relation, batch
+
+        task.process(batch)
+        assert_equal 3, task.processed_count.value
+      ensure
+        Post.delete_all
+      end
+
+      test "handles plain arrays" do
+        task = Class.new(MaintenanceTasks::Task) do
+          include Parallelizable
+
+          attr_accessor :sum
+
+          def initialize
+            super
+            @sum = Concurrent::AtomicFixnum.new(0)
+          end
+
+          def collection
+            [1, 2, 3, 4, 5].each_slice(5)
+          end
+
+          def process_item(num)
+            @sum.update { |v| v + num }
+          end
+        end.new
+
+        batch = task.collection.first
+        task.process(batch)
+
+        assert_equal 15, task.sum.value
+      end
+    end
+  end
+end
diff --git a/test/models/maintenance_tasks/parallel_executor_test.rb b/test/models/maintenance_tasks/parallel_executor_test.rb
new file mode 100644
index 00000000..9e21dda4
--- /dev/null
+++ b/test/models/maintenance_tasks/parallel_executor_test.rb
@@ -0,0 +1,108 @@
+# frozen_string_literal: true
+
+require "test_helper"
+
+module MaintenanceTasks
+  class ParallelExecutorTest < ActiveSupport::TestCase
+    test "executes block for each item in parallel" do
+      results = Concurrent::Array.new
+
+      items = [1, 2, 3, 4, 5]
+      ParallelExecutor.execute(items) do |item|
+        sleep(0.001) # Simulate work
+        results << item
+      end
+
+      assert_equal 5, results.size
+      assert_equal [1, 2, 3, 4, 5].sort, results.sort
+    end
+
+    test "uses multiple threads" do
+      thread_ids = Concurrent::Array.new
+
+      items = [1, 2, 3, 4, 5]
+      ParallelExecutor.execute(items) do |_item|
+        thread_ids << Thread.current.object_id
+      end
+
+      # Should use more than one thread
+      assert thread_ids.uniq.size > 1, "Expected multiple threads, got #{thread_ids.uniq.size}"
+    end
+
+    test "waits for all threads to complete" do
+      completion_order = Concurrent::Array.new
+
+      items = [1, 2, 3]
+      ParallelExecutor.execute(items) do |item|
+        # Item 2 finishes last
+        sleep(0.01) if item == 2
+        completion_order << item
+      end
+
+      # All 3 items should be completed
+      assert_equal 3, completion_order.size
+    end
+
+    test "raises first exception after all threads complete" do
+      processed = Concurrent::Array.new
+
+      items = [1, 2, 3, 4, 5]
+      error = assert_raises(StandardError) do
+        ParallelExecutor.execute(items) do |item|
+          sleep(0.001) # Give other threads time to run
+          processed << item
+          raise StandardError, "Error on item #{item}" if item == 3
+        end
+      end
+
+      assert_equal "Error on item 3", error.message
+
+      # All threads should have attempted to run
+      assert processed.size >= 4, "Expected at least 4 items processed"
+    end
+
+    test "attaches errored item to exception" do
+      items = [1, 2, 3]
+      error = assert_raises(StandardError) do
+        ParallelExecutor.execute(items) do |item|
+          raise StandardError, "Error" if item == 2
+        end
+      end
+
+      assert_respond_to error, :errored_item
+      assert_equal 2, error.errored_item
+    end
+
+    test "handles exceptions from ActiveRecord operations" do
+      Post.delete_all
+      post = Post.create!(title: "Test", content: "Content")
+
+      items = [post.id, post.id + 1, post.id + 2]
+      error = assert_raises(ActiveRecord::RecordNotFound) do
+        ParallelExecutor.execute(items) do |post_id|
+          Post.find(post_id) # Will raise for non-existent IDs
+        end
+      end
+
+      # Should capture the error
+      assert_kind_of ActiveRecord::RecordNotFound, error
+    ensure
+      Post.delete_all
+    end
+
+    test "each thread gets own database connection" do
+      connection_ids = Concurrent::Array.new
+
+      items = [1, 2, 3, 4, 5]
+      ParallelExecutor.execute(items) do |_item|
+        # Each thread should have its own connection from the pool
+        conn = ActiveRecord::Base.connection
+        connection_ids << conn.object_id
+      end
+
+      # Connections may be reused from the pool, but should be managed safely
+      # At minimum, verify we didn't crash due to connection issues
+      assert_equal 5, connection_ids.size
+    end
+  end
+end