Skip to content

Commit aa2bda9

Browse files
authored
Support for concurrency. (#8)
* Support for concurrency. * debug cleanup
1 parent f4cd341 commit aa2bda9

File tree

5 files changed

+83
-23
lines changed

5 files changed

+83
-23
lines changed

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ RedisSingleFile.configuration do |config|
3636
# config.port = '6379'
3737
# config.name = 'default'
3838
# config.expire_in = 300
39+
# config.concurrency = 1
3940
end
4041
```
4142

@@ -65,6 +66,14 @@ end
6566
end
6667
```
6768

69+
#### Support concurrent worker processing
70+
```ruby
71+
semaphore = RedisSingleFile.new(name: :concurrent_queue)
72+
semaphore.synchronize(concurrency: 3) do
73+
# synchronized logic defined here...
74+
end
75+
```
76+
6877
#### Use your own redis client instance
6978
```ruby
7079
redis = Redis.new(...)

lib/redis_single_file/configuration.rb

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class Configuration
2525
DEFAULT_EXPIRE_IN = 300 # 5 mins
2626
DEFAULT_MUTEX_KEY = 'RedisSingleFile/Mutex/%s'
2727
DEFAULT_QUEUE_KEY = 'RedisSingleFile/Queue/%s'
28+
DEFAULT_CONCURRENCY = 1 # single slot enabled
2829

2930
# class delegation methods to singleton instance
3031
#
@@ -33,13 +34,21 @@ class Configuration
3334
# Configuration.port => Configuration.instance.port
3435
#
3536
class << self
36-
%i[host port name expire_in mutex_key queue_key].each do |attr|
37+
%i[
38+
host
39+
port
40+
name
41+
expire_in
42+
concurrency
43+
mutex_key
44+
queue_key
45+
].each do |attr|
3746
define_method(attr) { instance.send(attr) }
3847
end
3948
end
4049

4150
# writers used in config block to set new values
42-
attr_writer :host, :port, :name, :expire_in
51+
attr_writer :host, :port, :name, :expire_in, :concurrency
4352

4453
# @return [String] redis server hostname value
4554
def host = @host || DEFAULT_HOST
@@ -53,6 +62,9 @@ def name = @name || DEFAULT_NAME
5362
# @return [String] redis keys expiration value
5463
def expire_in = @expire_in || DEFAULT_EXPIRE_IN
5564

65+
# @return [String] redis lock concurrency value
66+
def concurrency = @concurrency || DEFAULT_CONCURRENCY
67+
5668
# @note This attr is not configurable
5769
# @return [String] synchronization mutex key name
5870
def mutex_key = @mutex_key || DEFAULT_MUTEX_KEY

lib/redis_single_file/semaphore.rb

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ module RedisSingleFile
1212
# @attr name [String] custom sync queue name
1313
# @attr host [String] host for redis server
1414
# @attr port [String] port for redis server
15+
# @attr concurrency [Integer] simultaneous slots allowed
1516
#
1617
# @example Default lock name and infinite blocking
1718
# semaphore = RedisSingleFile::Semaphore.new
@@ -65,23 +66,26 @@ def initialize(
6566
redis: nil, # provide your own redis instance
6667
name: Configuration.name, # designate queue name per session
6768
host: Configuration.host, # designate redis host per session
68-
port: Configuration.port # designate redis port per session
69+
port: Configuration.port, # designate redis port per session
70+
concurrency: Configuration.concurrency # concurrent workers
6971
)
7072
@redis = redis || Redis.new(host:, port:)
7173

7274
@mutex_val = name
7375
@mutex_key = format(Configuration.mutex_key, @mutex_val)
7476
@queue_key = format(Configuration.queue_key, @mutex_val)
77+
@concurrency = concurrency.to_i
7578
end
7679

7780
# Queues up client and waits for turn to execute. Returns nil
7881
# when queue wait time expires.
7982
#
8083
# @param timeout [Integer] seconds for client to wait in queue
84+
# @param concurrency [Integer] override concurrent workers
8185
# @yieldreturn [...] response from synchronized block execution
8286
# @return [nil] redis blpop timeout
83-
def synchronize(timeout: 0, &)
84-
synchronize!(timeout:, &)
87+
def synchronize(timeout: 0, concurrency: @concurrency, &)
88+
synchronize!(timeout:, concurrency:, &)
8589
rescue QueueTimeoutError => _e
8690
nil
8791
end
@@ -90,14 +94,15 @@ def synchronize(timeout: 0, &)
9094
# when queue wait time expires.
9195
#
9296
# @param timeout [Integer] seconds for blpop to wait in queue
97+
# @param concurrency [Integer] override concurrent workers
9398
# @yieldreturn [...] response from synchronized block execution
9499
# @raise [QueueTimeoutError] redis blpop timeout
95-
def synchronize!(timeout: 0)
100+
def synchronize!(timeout: 0, concurrency: @concurrency)
96101
return unless block_given?
97102

98103
with_retry_protection do
99-
prime_queue unless redis.getset(mutex_key, mutex_val)
100-
raise QueueTimeoutError unless redis.blpop(queue_key, timeout:)
104+
prime_queue(concurrency) unless redis.getset(mutex_key, mutex_val)
105+
raise QueueTimeoutError unless redis.blpop(queue_key, timeout:)
101106

102107
redis.multi do
103108
redis.persist(mutex_key) # unexpire during execution
@@ -108,7 +113,7 @@ def synchronize!(timeout: 0)
108113
yield
109114
ensure
110115
# always cycle the queue when exiting
111-
unlock_queue if block_given?
116+
unlock_queue(concurrency) if block_given?
112117
end
113118

114119
private #===================================================================
@@ -119,20 +124,22 @@ def expire_in
119124
@expire_in ||= Configuration.expire_in
120125
end
121126

122-
def prime_queue
127+
def prime_queue(concurrency)
123128
with_retry_protection do
124129
redis.multi do
125-
redis.del(queue_key) # remove existing queue
126-
redis.lpush(queue_key, '1') # create and prime new queue
130+
redis.del(queue_key) # remove existing queue
131+
concurrency.times do # create and prime new queue
132+
redis.lpush(queue_key, '1')
133+
end
127134
end
128135
end
129136
end
130137

131-
def unlock_queue
138+
def unlock_queue(concurrency)
132139
with_retry_protection do
133140
redis.multi do
134-
# queue next client execution if queue is empty
135-
redis.lpush(queue_key, '1') if redis.llen(queue_key) == 0
141+
# queue next client execution if queue has space (concurrency)
142+
redis.lpush(queue_key, '1') if redis.llen(queue_key) < concurrency
136143
redis.expire(mutex_key, expire_in) # set expiration for auto removal
137144
redis.expire(queue_key, expire_in) # set expiration for auto removal
138145
end

redis-single-file.gemspec

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ Gem::Specification.new do |spec|
3636
end
3737
end
3838

39-
# Identify Gem Exectuables
40-
spec.bindir = 'exe'
41-
spec.executables = spec.files.grep(%r{\Aexe/}) { |f| File.basename(f) }
39+
# Identify Gem Executables
40+
# spec.bindir = 'exe'
41+
# spec.executables = spec.files.grep(%r{\Aexe/}) { |f| File.basename(f) }
4242

4343
# Redis Single File Dependencies
4444
spec.add_dependency 'redis', '~> 5.3.0'

spec/redis_single_file/semaphore_spec.rb

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,25 +37,41 @@
3737
expect(Redis).to have_received(:new).with(host: 'localhost', port: '1234')
3838
end
3939

40-
it '#synchronize method calls synchronize! with default timeout' do
40+
it '#synchronize method calls synchronize! with default timeout/concurrency' do
4141
semaphore = described_class.new
42-
expect(semaphore).to receive(:synchronize!).with(timeout: 0)
42+
allow(semaphore).to receive(:synchronize!)
4343

4444
semaphore.synchronize { nil }
45+
expect(semaphore).to(
46+
have_received(:synchronize!).with(timeout: 0, concurrency: 1)
47+
)
4548
end
4649

4750
it '#synchronize method calls synchronize! with provided timeout' do
4851
semaphore = described_class.new
49-
expect(semaphore).to receive(:synchronize!).with(timeout: 15)
52+
allow(semaphore).to receive(:synchronize!)
5053

5154
semaphore.synchronize(timeout: 15) { nil }
55+
expect(semaphore).to(
56+
have_received(:synchronize!).with(timeout: 15, concurrency: 1)
57+
)
58+
end
59+
60+
it '#synchronize method calls synchronize! with provided concurrency' do
61+
semaphore = described_class.new
62+
allow(semaphore).to receive(:synchronize!)
63+
64+
semaphore.synchronize(concurrency: 3) { nil }
65+
expect(semaphore).to(
66+
have_received(:synchronize!).with(timeout: 0, concurrency: 3)
67+
)
5268
end
5369

5470
it '#synchronize returns nil on timeout' do
5571
semaphore = described_class.new
5672
expect(semaphore).to(
5773
receive(:synchronize!)
58-
.with(timeout: 0)
74+
.with(timeout: 0, concurrency: 1)
5975
.and_raise(RedisSingleFile::QueueTimeoutError)
6076
)
6177

@@ -76,6 +92,7 @@
7692
it '#synchronize! primes the queue when first client' do
7793
expect(redis_mock).to receive(:del)
7894
expect(redis_mock).to receive(:lpush)
95+
expect(redis_mock).to receive(:llen).and_return(1)
7996
expect(redis_mock).to receive(:blpop).and_return('1')
8097

8198
semaphore = described_class.new
@@ -84,9 +101,22 @@
84101
expect(result).to eq('test body')
85102
end
86103

104+
it '#synchronize! primes the queue with concurrency when first client' do
105+
expect(redis_mock).to receive(:del)
106+
expect(redis_mock).to receive(:lpush).exactly(3).times
107+
expect(redis_mock).to receive(:llen).and_return(3)
108+
expect(redis_mock).to receive(:blpop).and_return('1')
109+
110+
semaphore = described_class.new
111+
result = semaphore.synchronize!(concurrency: 3) { 'test body' }
112+
113+
expect(result).to eq('test body')
114+
end
115+
87116
it '#synchronize! skips priming the queue when not first client' do
88117
expect(redis_mock).not_to receive(:del)
89118
expect(redis_mock).not_to receive(:lpush)
119+
expect(redis_mock).to receive(:llen).and_return(1)
90120
expect(redis_mock).to receive(:getset).and_return('1')
91121
expect(redis_mock).to receive(:blpop).and_return('1')
92122

@@ -99,6 +129,7 @@
99129
it '#synchronize! persists redis keys when executing block' do
100130
expect(redis_mock).to receive(:blpop).and_return('1')
101131
expect(redis_mock).to receive(:persist).twice
132+
expect(redis_mock).to receive(:llen).and_return(1)
102133

103134
semaphore = described_class.new
104135
result = semaphore.synchronize! { 'test body' }
@@ -108,7 +139,8 @@
108139

109140
it '#synchronize! unlocks queue when exiting' do
110141
expect(redis_mock).to receive(:blpop).and_return('1')
111-
expect(redis_mock).to receive(:lpush)
142+
expect(redis_mock).to receive(:lpush).twice
143+
expect(redis_mock).to receive(:llen).and_return(0)
112144
expect(redis_mock).to receive(:expire).twice
113145

114146
semaphore = described_class.new

0 commit comments

Comments
 (0)