Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ RedisSingleFile.configuration do |config|
# config.port = '6379'
# config.name = 'default'
# config.expire_in = 300
# config.concurrency = 1
end
```

Expand Down Expand Up @@ -65,6 +66,14 @@ end
end
```

#### Support concurrent worker processing
```ruby
semaphore = RedisSingleFile.new(name: :concurrent_queue)
semaphore.synchronize(concurrency: 3) do
# synchronized logic defined here...
end
```

#### Use your own redis client instance
```ruby
redis = Redis.new(...)
Expand Down
16 changes: 14 additions & 2 deletions lib/redis_single_file/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Configuration
DEFAULT_EXPIRE_IN = 300 # 5 mins
DEFAULT_MUTEX_KEY = 'RedisSingleFile/Mutex/%s'
DEFAULT_QUEUE_KEY = 'RedisSingleFile/Queue/%s'
DEFAULT_CONCURRENCY = 1 # single slot enabled

# class delegation methods to singleton instance
#
Expand All @@ -33,13 +34,21 @@ class Configuration
# Configuration.port => Configuration.instance.port
#
class << self
%i[host port name expire_in mutex_key queue_key].each do |attr|
%i[
host
port
name
expire_in
concurrency
mutex_key
queue_key
].each do |attr|
define_method(attr) { instance.send(attr) }
end
end

# writers used in config block to set new values
attr_writer :host, :port, :name, :expire_in
attr_writer :host, :port, :name, :expire_in, :concurrency

# @return [String] redis server hostname value
def host = @host || DEFAULT_HOST
Expand All @@ -53,6 +62,9 @@ def name = @name || DEFAULT_NAME
# @return [String] redis keys expiration value
def expire_in = @expire_in || DEFAULT_EXPIRE_IN

# @return [String] redis lock concurrency value
def concurrency = @concurrency || DEFAULT_CONCURRENCY

# @note This attr is not configurable
# @return [String] synchronization mutex key name
def mutex_key = @mutex_key || DEFAULT_MUTEX_KEY
Expand Down
33 changes: 20 additions & 13 deletions lib/redis_single_file/semaphore.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module RedisSingleFile
# @attr name [String] custom sync queue name
# @attr host [String] host for redis server
# @attr port [String] port for redis server
# @attr concurrency [Integer] simultaneous slots allowed
#
# @example Default lock name and infinite blocking
# semaphore = RedisSingleFile::Semaphore.new
Expand Down Expand Up @@ -65,23 +66,26 @@ def initialize(
redis: nil, # provide your own redis instance
name: Configuration.name, # designate queue name per session
host: Configuration.host, # designate redis host per session
port: Configuration.port # designate redis port per session
port: Configuration.port, # designate redis port per session
concurrency: Configuration.concurrency # concurrent workers
)
@redis = redis || Redis.new(host:, port:)

@mutex_val = name
@mutex_key = format(Configuration.mutex_key, @mutex_val)
@queue_key = format(Configuration.queue_key, @mutex_val)
@concurrency = concurrency.to_i
end

# Queues up client and waits for turn to execute. Returns nil
# when queue wait time expires.
#
# @param timeout [Integer] seconds for client to wait in queue
# @param concurrency [Integer] override concurrent workers
# @yieldreturn [...] response from synchronized block execution
# @return [nil] redis blpop timeout
def synchronize(timeout: 0, &)
synchronize!(timeout:, &)
def synchronize(timeout: 0, concurrency: @concurrency, &)
synchronize!(timeout:, concurrency:, &)
rescue QueueTimeoutError => _e
nil
end
Expand All @@ -90,14 +94,15 @@ def synchronize(timeout: 0, &)
# when queue wait time expires.
#
# @param timeout [Integer] seconds for blpop to wait in queue
# @param concurrency [Integer] override concurrent workers
# @yieldreturn [...] response from synchronized block execution
# @raise [QueueTimeoutError] redis blpop timeout
def synchronize!(timeout: 0)
def synchronize!(timeout: 0, concurrency: @concurrency)
return unless block_given?

with_retry_protection do
prime_queue unless redis.getset(mutex_key, mutex_val)
raise QueueTimeoutError unless redis.blpop(queue_key, timeout:)
prime_queue(concurrency) unless redis.getset(mutex_key, mutex_val)
raise QueueTimeoutError unless redis.blpop(queue_key, timeout:)

redis.multi do
redis.persist(mutex_key) # unexpire during execution
Expand All @@ -108,7 +113,7 @@ def synchronize!(timeout: 0)
yield
ensure
# always cycle the queue when exiting
unlock_queue if block_given?
unlock_queue(concurrency) if block_given?
end

private #===================================================================
Expand All @@ -119,20 +124,22 @@ def expire_in
@expire_in ||= Configuration.expire_in
end

def prime_queue
def prime_queue(concurrency)
with_retry_protection do
redis.multi do
redis.del(queue_key) # remove existing queue
redis.lpush(queue_key, '1') # create and prime new queue
redis.del(queue_key) # remove existing queue
concurrency.times do # create and prime new queue
redis.lpush(queue_key, '1')
end
end
end
end

def unlock_queue
def unlock_queue(concurrency)
with_retry_protection do
redis.multi do
# queue next client execution if queue is empty
redis.lpush(queue_key, '1') if redis.llen(queue_key) == 0
# queue next client execution if queue has space (concurrency)
redis.lpush(queue_key, '1') if redis.llen(queue_key) < concurrency
redis.expire(mutex_key, expire_in) # set expiration for auto removal
redis.expire(queue_key, expire_in) # set expiration for auto removal
end
Expand Down
6 changes: 3 additions & 3 deletions redis-single-file.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ Gem::Specification.new do |spec|
end
end

# Identify Gem Exectuables
spec.bindir = 'exe'
spec.executables = spec.files.grep(%r{\Aexe/}) { |f| File.basename(f) }
# Identify Gem Executables
# spec.bindir = 'exe'
# spec.executables = spec.files.grep(%r{\Aexe/}) { |f| File.basename(f) }

# Redis Single File Dependencies
spec.add_dependency 'redis', '~> 5.3.0'
Expand Down
42 changes: 37 additions & 5 deletions spec/redis_single_file/semaphore_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,41 @@
expect(Redis).to have_received(:new).with(host: 'localhost', port: '1234')
end

it '#synchronize method calls synchronize! with default timeout' do
it '#synchronize method calls synchronize! with default timeout/concurrency' do
semaphore = described_class.new
expect(semaphore).to receive(:synchronize!).with(timeout: 0)
allow(semaphore).to receive(:synchronize!)

semaphore.synchronize { nil }
expect(semaphore).to(
have_received(:synchronize!).with(timeout: 0, concurrency: 1)
)
end

it '#synchronize method calls synchronize! with provided timeout' do
semaphore = described_class.new
expect(semaphore).to receive(:synchronize!).with(timeout: 15)
allow(semaphore).to receive(:synchronize!)

semaphore.synchronize(timeout: 15) { nil }
expect(semaphore).to(
have_received(:synchronize!).with(timeout: 15, concurrency: 1)
)
end

it '#synchronize method calls synchronize! with provided concurrency' do
semaphore = described_class.new
allow(semaphore).to receive(:synchronize!)

semaphore.synchronize(concurrency: 3) { nil }
expect(semaphore).to(
have_received(:synchronize!).with(timeout: 0, concurrency: 3)
)
end

it '#synchronize returns nil on timeout' do
semaphore = described_class.new
expect(semaphore).to(
receive(:synchronize!)
.with(timeout: 0)
.with(timeout: 0, concurrency: 1)
.and_raise(RedisSingleFile::QueueTimeoutError)
)

Expand All @@ -76,6 +92,7 @@
it '#synchronize! primes the queue when first client' do
expect(redis_mock).to receive(:del)
expect(redis_mock).to receive(:lpush)
expect(redis_mock).to receive(:llen).and_return(1)
expect(redis_mock).to receive(:blpop).and_return('1')

semaphore = described_class.new
Expand All @@ -84,9 +101,22 @@
expect(result).to eq('test body')
end

it '#synchronize! primes the queue with concurrency when first client' do
expect(redis_mock).to receive(:del)
expect(redis_mock).to receive(:lpush).exactly(3).times
expect(redis_mock).to receive(:llen).and_return(3)
expect(redis_mock).to receive(:blpop).and_return('1')

semaphore = described_class.new
result = semaphore.synchronize!(concurrency: 3) { 'test body' }

expect(result).to eq('test body')
end

it '#synchronize! skips priming the queue when not first client' do
expect(redis_mock).not_to receive(:del)
expect(redis_mock).not_to receive(:lpush)
expect(redis_mock).to receive(:llen).and_return(1)
expect(redis_mock).to receive(:getset).and_return('1')
expect(redis_mock).to receive(:blpop).and_return('1')

Expand All @@ -99,6 +129,7 @@
it '#synchronize! persists redis keys when executing block' do
expect(redis_mock).to receive(:blpop).and_return('1')
expect(redis_mock).to receive(:persist).twice
expect(redis_mock).to receive(:llen).and_return(1)

semaphore = described_class.new
result = semaphore.synchronize! { 'test body' }
Expand All @@ -108,7 +139,8 @@

it '#synchronize! unlocks queue when exiting' do
expect(redis_mock).to receive(:blpop).and_return('1')
expect(redis_mock).to receive(:lpush)
expect(redis_mock).to receive(:lpush).twice
expect(redis_mock).to receive(:llen).and_return(0)
expect(redis_mock).to receive(:expire).twice

semaphore = described_class.new
Expand Down
Loading