Skip to content

Commit 7a11bb1

Browse files
committed
Ensure that cursor requiring getmore ops will not
be affected by replica set refresh. Prep for sending commands to secondaries.
1 parent 1aad8d1 commit 7a11bb1

File tree

8 files changed

+183
-94
lines changed

8 files changed

+183
-94
lines changed

lib/mongo/connection.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,13 @@ def read_primary?
565565
end
566566
alias :primary? :read_primary?
567567

568+
# The socket pool that this connection reads from.
569+
#
570+
# @return [Mongo::Pool]
571+
def read_pool
572+
@primary_pool
573+
end
574+
568575
# The value of the read preference. Because
569576
# this is a single-node connection, the value
570577
# is +:primary+, and the connection will read

lib/mongo/cursor.rb

Lines changed: 62 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ def initialize(collection, opts={})
9898
else
9999
@command = false
100100
end
101+
102+
@checkin_read_pool = false
103+
@checkin_connection = false
104+
@read_pool = nil
101105
end
102106

103107
# Guess whether the cursor is alive on the server.
@@ -460,10 +464,15 @@ def refresh
460464
def send_initial_query
461465
message = construct_query_message
462466
payload = instrument_payload if @logger
467+
sock = @socket || checkout_socket_from_connection
463468
instrument(:find, payload) do
469+
begin
464470
results, @n_received, @cursor_id = @connection.receive_message(
465-
Mongo::Constants::OP_QUERY, message, nil, @socket, @command,
466-
@read_preference, @options & OP_QUERY_EXHAUST != 0)
471+
Mongo::Constants::OP_QUERY, message, nil, sock, @command,
472+
nil, @options & OP_QUERY_EXHAUST != 0)
473+
ensure
474+
checkin_socket(sock) unless @socket
475+
end
467476
@returned += @n_received
468477
@cache += results
469478
@query_run = true
@@ -491,13 +500,63 @@ def send_get_more
491500
# Cursor id.
492501
message.put_long(@cursor_id)
493502
log(:debug, "cursor.refresh() for cursor #{@cursor_id}") if @logger
503+
sock = @socket || checkout_socket_for_op_get_more
504+
505+
begin
494506
results, @n_received, @cursor_id = @connection.receive_message(
495-
Mongo::Constants::OP_GET_MORE, message, nil, @socket, @command, @read_preference)
507+
Mongo::Constants::OP_GET_MORE, message, nil, sock, @command, nil)
508+
ensure
509+
checkin_socket(sock) unless @socket
510+
end
496511
@returned += @n_received
497512
@cache += results
498513
close_cursor_if_query_complete
499514
end
500515

516+
def checkout_socket_from_connection
517+
@checkin_connection = true
518+
if @read_preference == :primary
519+
@connection.checkout_writer
520+
else
521+
@read_pool = @connection.read_pool
522+
@connection.checkout_reader
523+
end
524+
end
525+
526+
def checkout_socket_for_op_get_more
527+
if @read_pool && (@read_pool != @connection.read_pool)
528+
checkout_socket_from_read_pool
529+
else
530+
checkout_socket_from_connection
531+
end
532+
end
533+
534+
def checkout_socket_from_read_pool
535+
new_pool = @connection.secondary_pools.detect do |pool|
536+
pool.host == @read_pool.host && pool.port == @read_pool.port
537+
end
538+
if new_pool
539+
@read_pool = new_pool
540+
sock = new_pool.checkout
541+
@checkin_read_pool = true
542+
return sock
543+
else
544+
raise Mongo::OperationFailure, "Failure to continue iterating " +
545+
"cursor because the the replica set member persisting this " +
546+
" cursor cannot be found."
547+
end
548+
end
549+
550+
def checkin_socket(sock)
551+
if @checkin_read_pool
552+
@read_pool.checkin(sock)
553+
@checkin_read_pool = false
554+
elsif @checkin_connection
555+
@connection.checkin(sock)
556+
@checkin_connection = false
557+
end
558+
end
559+
501560
def construct_query_message
502561
message = BSON::ByteBuffer.new
503562
message.put_int(@options)

lib/mongo/repl_set_connection.rb

Lines changed: 75 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ def initialize(*args)
119119
# Refresh
120120
@refresh_mode = opts.fetch(:refresh_mode, :sync)
121121
@refresh_interval = opts[:refresh_interval] || 90
122+
@last_refresh = Time.now
122123

123124
if ![:sync, :async, false].include?(@refresh_mode)
124125
raise MongoArgumentError,
@@ -339,6 +340,80 @@ def logout_pools(db)
339340
end
340341
end
341342

343+
# Checkout a socket for reading (i.e., a secondary node).
344+
# Note that @read_pool might point to the primary pool
345+
# if no read pool has been defined.
346+
def checkout_reader
347+
connect unless connected?
348+
socket = get_socket_from_pool(@read_pool)
349+
350+
if !socket
351+
refresh
352+
socket = get_socket_from_pool(@primary_pool)
353+
end
354+
355+
if socket
356+
socket
357+
else
358+
raise ConnectionFailure.new("Could not connect to a node for reading.")
359+
end
360+
end
361+
362+
# Checkout a socket for writing (i.e., a primary node).
363+
def checkout_writer
364+
connect unless connected?
365+
socket = get_socket_from_pool(@primary_pool)
366+
367+
if !socket
368+
refresh
369+
socket = get_socket_from_pool(@primary_pool)
370+
end
371+
372+
if socket
373+
socket
374+
else
375+
raise ConnectionFailure.new("Could not connect to primary node.")
376+
end
377+
end
378+
379+
def checkin(socket)
380+
sync_synchronize(:SH) do
381+
if pool = @sockets_to_pools[socket]
382+
pool.checkin(socket)
383+
elsif socket
384+
begin
385+
socket.close
386+
rescue IOError
387+
log(:info, "Tried to close socket #{socket} but already closed.")
388+
end
389+
end
390+
end
391+
392+
# Refresh synchronously every @refresh_interval seconds
393+
# if synchronous refresh mode is enabled.
394+
if @refresh_mode == :sync &&
395+
((Time.now - @last_refresh) > @refresh_interval)
396+
refresh
397+
@last_refresh = Time.now
398+
end
399+
end
400+
401+
def get_socket_from_pool(pool)
402+
begin
403+
sync_synchronize(:SH) do
404+
if pool
405+
socket = pool.checkout
406+
@sockets_to_pools[socket] = pool
407+
socket
408+
end
409+
end
410+
411+
rescue ConnectionFailure => ex
412+
log(:info, "Failed to checkout from #{pool} with #{ex.class}; #{ex.message}")
413+
return nil
414+
end
415+
end
416+
342417
private
343418

344419
# Given a pool manager, update this connection's
@@ -377,25 +452,6 @@ def initiate_refresh_mode
377452
@last_refresh = Time.now
378453
end
379454

380-
# Checkout a socket for reading (i.e., a secondary node).
381-
# Note that @read_pool might point to the primary pool
382-
# if no read pool has been defined.
383-
def checkout_reader
384-
connect unless connected?
385-
socket = get_socket_from_pool(@read_pool)
386-
387-
if !socket
388-
refresh
389-
socket = get_socket_from_pool(@primary_pool)
390-
end
391-
392-
if socket
393-
socket
394-
else
395-
raise ConnectionFailure.new("Could not connect to a node for reading.")
396-
end
397-
end
398-
399455
# Checkout a socket connected to a node with one of
400456
# the provided tags. If no such node exists, raise
401457
# an exception.
@@ -417,39 +473,6 @@ def checkout_tagged(tags)
417473
"Could not find a connection tagged with #{tags}."
418474
end
419475

420-
# Checkout a socket for writing (i.e., a primary node).
421-
def checkout_writer
422-
connect unless connected?
423-
socket = get_socket_from_pool(@primary_pool)
424-
425-
if !socket
426-
refresh
427-
socket = get_socket_from_pool(@primary_pool)
428-
end
429-
430-
if socket
431-
socket
432-
else
433-
raise ConnectionFailure.new("Could not connect to primary node.")
434-
end
435-
end
436-
437-
def get_socket_from_pool(pool)
438-
begin
439-
sync_synchronize(:SH) do
440-
if pool
441-
socket = pool.checkout
442-
@sockets_to_pools[socket] = pool
443-
socket
444-
end
445-
end
446-
447-
rescue ConnectionFailure => ex
448-
log(:info, "Failed to checkout from #{pool} with #{ex.class}; #{ex.message}")
449-
return nil
450-
end
451-
end
452-
453476
# Checkin a socket used for reading.
454477
def checkin_reader(socket)
455478
warn "ReplSetConnection#checkin_writer is deprecated and will be removed " +
@@ -463,27 +486,5 @@ def checkin_writer(socket)
463486
"in driver v2.0. Use ReplSetConnection#checkin instead."
464487
checkin(socket)
465488
end
466-
467-
def checkin(socket)
468-
sync_synchronize(:SH) do
469-
if pool = @sockets_to_pools[socket]
470-
pool.checkin(socket)
471-
elsif socket
472-
begin
473-
socket.close
474-
rescue IOError
475-
log(:info, "Tried to close socket #{socket} but already closed.")
476-
end
477-
end
478-
end
479-
480-
# Refresh synchronously every @refresh_interval seconds
481-
# if synchronous refresh mode is enabled.
482-
if @refresh_mode == :sync &&
483-
((Time.now - @last_refresh) > @refresh_interval)
484-
refresh
485-
@last_refresh = Time.now
486-
end
487-
end
488489
end
489490
end

lib/mongo/util/pool.rb

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
module Mongo
1919
class Pool
2020
PING_ATTEMPTS = 6
21+
MAX_PING_TIME = 1_000_000
2122

2223
attr_accessor :host, :port, :address,
2324
:size, :timeout, :safe, :checked_out, :connection
@@ -35,7 +36,7 @@ def initialize(connection, host, port, opts={})
3536
@address = "#{@host}:#{@port}"
3637

3738
# Pool size and timeout.
38-
@size = opts[:size] || 1
39+
@size = opts[:size] || 10
3940
@timeout = opts[:timeout] || 5.0
4041

4142
# Mutex for synchronizing pool access
@@ -52,6 +53,7 @@ def initialize(connection, host, port, opts={})
5253
@checked_out = []
5354
@ping_time = nil
5455
@last_ping = nil
56+
@closed = false
5557
end
5658

5759
def close
@@ -67,9 +69,14 @@ def close
6769
@sockets.clear
6870
@pids.clear
6971
@checked_out.clear
72+
@closed = true
7073
end
7174
end
7275

76+
def closed?
77+
@closed
78+
end
79+
7380
def inspect
7481
"#<Mongo::Pool:0x#{self.object_id.to_s(16)} @host=#{@host} @port=#{port} " +
7582
"@ping_time=#{@ping_time} #{@checked_out.size}/#{@size} sockets available.>"
@@ -101,14 +108,12 @@ def ping_time
101108
# to do a round-trip against this node.
102109
def refresh_ping_time
103110
trials = []
104-
begin
105-
PING_ATTEMPTS.times do
106-
t1 = Time.now
107-
self.connection['admin'].command({:ping => 1}, :socket => @node.socket)
108-
trials << (Time.now - t1) * 1000
109-
end
110-
rescue OperationFailure, SocketError, SystemCallError, IOError => ex
111-
return nil
111+
PING_ATTEMPTS.times do
112+
t1 = Time.now
113+
if !self.ping
114+
return MAX_PING_TIME
115+
end
116+
trials << (Time.now - t1) * 1000
112117
end
113118

114119
trials.sort!
@@ -123,6 +128,14 @@ def refresh_ping_time
123128
(total / trials.length).ceil
124129
end
125130

131+
def ping
132+
begin
133+
return self.connection['admin'].command({:ping => 1}, :socket => @node.socket)
134+
rescue OperationFailure, SocketError, SystemCallError, IOError => ex
135+
return false
136+
end
137+
end
138+
126139
# Return a socket to the pool.
127140
def checkin(socket)
128141
@connection_mutex.synchronize do

test/connection_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ def test_connection_activity
318318
TCPSocket.stubs(:new).returns(fake_socket)
319319

320320
@con.primary_pool.checkout_new_socket
321-
assert_equal [], @con.primary_pool.close
321+
assert @con.primary_pool.close
322322
end
323323
end
324324
end

test/cursor_test.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def test_exhaust
5454
if @@version >= "2.0"
5555
@@coll.remove
5656
data = "1" * 100_000
57-
10_000.times do |n|
57+
5000.times do |n|
5858
@@coll.insert({:n => n, :data => data})
5959
end
6060

@@ -65,7 +65,7 @@ def test_exhaust
6565

6666
c = Cursor.new(@@coll)
6767
c.add_option(OP_QUERY_EXHAUST)
68-
9999.times do
68+
4999.times do
6969
c.next
7070
end
7171
assert c.has_next?

0 commit comments

Comments
 (0)