diff --git a/README.md b/README.md index e1bd8ab..1ea4cb1 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,7 @@ RedisSingleFile.configuration do |config| # config.port = '6379' # config.name = 'default' # config.expire_in = 300 + # config.concurrency = 1 end ``` @@ -65,6 +66,14 @@ end end ``` +#### Support concurrent worker processing +```ruby + semaphore = RedisSingleFile.new(name: :concurrent_queue) + semaphore.synchronize(concurrency: 3) do + # synchronized logic defined here... + end +``` + #### Use your own redis client instance ```ruby redis = Redis.new(...) diff --git a/lib/redis_single_file/configuration.rb b/lib/redis_single_file/configuration.rb index 04ecbf1..13894a9 100644 --- a/lib/redis_single_file/configuration.rb +++ b/lib/redis_single_file/configuration.rb @@ -25,6 +25,7 @@ class Configuration DEFAULT_EXPIRE_IN = 300 # 5 mins DEFAULT_MUTEX_KEY = 'RedisSingleFile/Mutex/%s' DEFAULT_QUEUE_KEY = 'RedisSingleFile/Queue/%s' + DEFAULT_CONCURRENCY = 1 # single slot enabled # class delegation methods to singleton instance # @@ -33,13 +34,21 @@ class Configuration # Configuration.port => Configuration.instance.port # class << self - %i[host port name expire_in mutex_key queue_key].each do |attr| + %i[ + host + port + name + expire_in + concurrency + mutex_key + queue_key + ].each do |attr| define_method(attr) { instance.send(attr) } end end # writers used in config block to set new values - attr_writer :host, :port, :name, :expire_in + attr_writer :host, :port, :name, :expire_in, :concurrency # @return [String] redis server hostname value def host = @host || DEFAULT_HOST @@ -53,6 +62,9 @@ def name = @name || DEFAULT_NAME # @return [String] redis keys expiration value def expire_in = @expire_in || DEFAULT_EXPIRE_IN + # @return [String] redis lock concurrency value + def concurrency = @concurrency || DEFAULT_CONCURRENCY + # @note This attr is not configurable # @return [String] synchronization mutex key name def mutex_key = @mutex_key || DEFAULT_MUTEX_KEY diff --git a/lib/redis_single_file/semaphore.rb b/lib/redis_single_file/semaphore.rb index fb233c1..a08561d 100644 --- a/lib/redis_single_file/semaphore.rb +++ b/lib/redis_single_file/semaphore.rb @@ -12,6 +12,7 @@ module RedisSingleFile # @attr name [String] custom sync queue name # @attr host [String] host for redis server # @attr port [String] port for redis server + # @attr concurrency [Integer] simultaneous slots allowed # # @example Default lock name and infinite blocking # semaphore = RedisSingleFile::Semaphore.new @@ -65,23 +66,26 @@ def initialize( redis: nil, # provide your own redis instance name: Configuration.name, # designate queue name per session host: Configuration.host, # designate redis host per session - port: Configuration.port # designate redis port per session + port: Configuration.port, # designate redis port per session + concurrency: Configuration.concurrency # concurrent workers ) @redis = redis || Redis.new(host:, port:) @mutex_val = name @mutex_key = format(Configuration.mutex_key, @mutex_val) @queue_key = format(Configuration.queue_key, @mutex_val) + @concurrency = concurrency.to_i end # Queues up client and waits for turn to execute. Returns nil # when queue wait time expires. # # @param timeout [Integer] seconds for client to wait in queue + # @param concurrency [Integer] override concurrent workers # @yieldreturn [...] response from synchronized block execution # @return [nil] redis blpop timeout - def synchronize(timeout: 0, &) - synchronize!(timeout:, &) + def synchronize(timeout: 0, concurrency: @concurrency, &) + synchronize!(timeout:, concurrency:, &) rescue QueueTimeoutError => _e nil end @@ -90,14 +94,15 @@ def synchronize(timeout: 0, &) # when queue wait time expires. # # @param timeout [Integer] seconds for blpop to wait in queue + # @param concurrency [Integer] override concurrent workers # @yieldreturn [...] response from synchronized block execution # @raise [QueueTimeoutError] redis blpop timeout - def synchronize!(timeout: 0) + def synchronize!(timeout: 0, concurrency: @concurrency) return unless block_given? with_retry_protection do - prime_queue unless redis.getset(mutex_key, mutex_val) - raise QueueTimeoutError unless redis.blpop(queue_key, timeout:) + prime_queue(concurrency) unless redis.getset(mutex_key, mutex_val) + raise QueueTimeoutError unless redis.blpop(queue_key, timeout:) redis.multi do redis.persist(mutex_key) # unexpire during execution @@ -108,7 +113,7 @@ def synchronize!(timeout: 0) yield ensure # always cycle the queue when exiting - unlock_queue if block_given? + unlock_queue(concurrency) if block_given? end private #=================================================================== @@ -119,20 +124,22 @@ def expire_in @expire_in ||= Configuration.expire_in end - def prime_queue + def prime_queue(concurrency) with_retry_protection do redis.multi do - redis.del(queue_key) # remove existing queue - redis.lpush(queue_key, '1') # create and prime new queue + redis.del(queue_key) # remove existing queue + concurrency.times do # create and prime new queue + redis.lpush(queue_key, '1') + end end end end - def unlock_queue + def unlock_queue(concurrency) with_retry_protection do redis.multi do - # queue next client execution if queue is empty - redis.lpush(queue_key, '1') if redis.llen(queue_key) == 0 + # queue next client execution if queue has space (concurrency) + redis.lpush(queue_key, '1') if redis.llen(queue_key) < concurrency redis.expire(mutex_key, expire_in) # set expiration for auto removal redis.expire(queue_key, expire_in) # set expiration for auto removal end diff --git a/redis-single-file.gemspec b/redis-single-file.gemspec index 157b7f1..13ad5af 100644 --- a/redis-single-file.gemspec +++ b/redis-single-file.gemspec @@ -36,9 +36,9 @@ Gem::Specification.new do |spec| end end - # Identify Gem Exectuables - spec.bindir = 'exe' - spec.executables = spec.files.grep(%r{\Aexe/}) { |f| File.basename(f) } + # Identify Gem Executables + # spec.bindir = 'exe' + # spec.executables = spec.files.grep(%r{\Aexe/}) { |f| File.basename(f) } # Redis Single File Dependencies spec.add_dependency 'redis', '~> 5.3.0' diff --git a/spec/redis_single_file/semaphore_spec.rb b/spec/redis_single_file/semaphore_spec.rb index 08525fd..97f1a1a 100644 --- a/spec/redis_single_file/semaphore_spec.rb +++ b/spec/redis_single_file/semaphore_spec.rb @@ -37,25 +37,41 @@ expect(Redis).to have_received(:new).with(host: 'localhost', port: '1234') end - it '#synchronize method calls synchronize! with default timeout' do + it '#synchronize method calls synchronize! with default timeout/concurrency' do semaphore = described_class.new - expect(semaphore).to receive(:synchronize!).with(timeout: 0) + allow(semaphore).to receive(:synchronize!) semaphore.synchronize { nil } + expect(semaphore).to( + have_received(:synchronize!).with(timeout: 0, concurrency: 1) + ) end it '#synchronize method calls synchronize! with provided timeout' do semaphore = described_class.new - expect(semaphore).to receive(:synchronize!).with(timeout: 15) + allow(semaphore).to receive(:synchronize!) semaphore.synchronize(timeout: 15) { nil } + expect(semaphore).to( + have_received(:synchronize!).with(timeout: 15, concurrency: 1) + ) + end + + it '#synchronize method calls synchronize! with provided concurrency' do + semaphore = described_class.new + allow(semaphore).to receive(:synchronize!) + + semaphore.synchronize(concurrency: 3) { nil } + expect(semaphore).to( + have_received(:synchronize!).with(timeout: 0, concurrency: 3) + ) end it '#synchronize returns nil on timeout' do semaphore = described_class.new expect(semaphore).to( receive(:synchronize!) - .with(timeout: 0) + .with(timeout: 0, concurrency: 1) .and_raise(RedisSingleFile::QueueTimeoutError) ) @@ -76,6 +92,7 @@ it '#synchronize! primes the queue when first client' do expect(redis_mock).to receive(:del) expect(redis_mock).to receive(:lpush) + expect(redis_mock).to receive(:llen).and_return(1) expect(redis_mock).to receive(:blpop).and_return('1') semaphore = described_class.new @@ -84,9 +101,22 @@ expect(result).to eq('test body') end + it '#synchronize! primes the queue with concurrency when first client' do + expect(redis_mock).to receive(:del) + expect(redis_mock).to receive(:lpush).exactly(3).times + expect(redis_mock).to receive(:llen).and_return(3) + expect(redis_mock).to receive(:blpop).and_return('1') + + semaphore = described_class.new + result = semaphore.synchronize!(concurrency: 3) { 'test body' } + + expect(result).to eq('test body') + end + it '#synchronize! skips priming the queue when not first client' do expect(redis_mock).not_to receive(:del) expect(redis_mock).not_to receive(:lpush) + expect(redis_mock).to receive(:llen).and_return(1) expect(redis_mock).to receive(:getset).and_return('1') expect(redis_mock).to receive(:blpop).and_return('1') @@ -99,6 +129,7 @@ it '#synchronize! persists redis keys when executing block' do expect(redis_mock).to receive(:blpop).and_return('1') expect(redis_mock).to receive(:persist).twice + expect(redis_mock).to receive(:llen).and_return(1) semaphore = described_class.new result = semaphore.synchronize! { 'test body' } @@ -108,7 +139,8 @@ it '#synchronize! unlocks queue when exiting' do expect(redis_mock).to receive(:blpop).and_return('1') - expect(redis_mock).to receive(:lpush) + expect(redis_mock).to receive(:lpush).twice + expect(redis_mock).to receive(:llen).and_return(0) expect(redis_mock).to receive(:expire).twice semaphore = described_class.new