From 0f67c504c46243f90b7698bda308381be4d39f7e Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Tue, 23 Jan 2024 11:29:35 +0100 Subject: [PATCH] - Removed synchronization on RouterStub.writeRequest(), not needed: https://issues.redhat.com/browse/JGRP-2746 - RouterStubManager uses CopyOnWriteList for stubs to prevent over-synchronization - Separate tests ServerTest (Byteman) and ServerTests (which don't require byteman) - Removed Reader/connected in NioConnection --- conf/tcp-nio.xml | 1 - src/org/jgroups/blocks/cs/NioBaseServer.java | 15 +- src/org/jgroups/blocks/cs/NioConnection.java | 296 +++++------------- src/org/jgroups/blocks/cs/NioServer.java | 21 +- src/org/jgroups/blocks/cs/TcpConnection.java | 7 +- src/org/jgroups/nio/Buffers.java | 33 +- src/org/jgroups/stack/RouterStub.java | 4 +- src/org/jgroups/stack/RouterStubManager.java | 99 +++--- src/org/jgroups/util/Util.java | 4 +- .../org/jgroups/tests/byteman/ServerTest.java | 129 +------- .../org/jgroups/tests/BuffersTest.java | 27 +- .../org/jgroups/tests/ServerTests.java | 255 +++++++++++++++ 12 files changed, 444 insertions(+), 447 deletions(-) create mode 100644 tests/junit-functional/org/jgroups/tests/ServerTests.java diff --git a/conf/tcp-nio.xml b/conf/tcp-nio.xml index 507ad45f7fe..c2cc6a90cc4 100644 --- a/conf/tcp-nio.xml +++ b/conf/tcp-nio.xml @@ -33,6 +33,5 @@ - \ No newline at end of file diff --git a/src/org/jgroups/blocks/cs/NioBaseServer.java b/src/org/jgroups/blocks/cs/NioBaseServer.java index ec2623686fc..0802bca2867 100644 --- a/src/org/jgroups/blocks/cs/NioBaseServer.java +++ b/src/org/jgroups/blocks/cs/NioBaseServer.java @@ -23,7 +23,7 @@ public abstract class NioBaseServer extends BaseServer { protected volatile boolean registration; // set to true after a registration; the acceptor sets it back to false @ManagedAttribute(description="Max number of send buffers. Changing this value affects new buffers only",writable=true) - protected int max_send_buffers=5; // size of WriteBuffers send buffer array + protected int max_send_buffers=12; // size of send buffer array, will be doubled @ManagedAttribute(description="Number of times select() was called") protected int num_selects; @@ -129,7 +129,6 @@ public void run() { while(it.hasNext()) { SelectionKey key=it.next(); NioConnection conn=(NioConnection)key.attachment(); - it.remove(); try { if(!key.isValid()) continue; @@ -139,21 +138,23 @@ public void run() { // https://issues.redhat.com/browse/JGRP-2727 if((ch.isConnectionPending() && ch.finishConnect()) || ch.isConnected()) { conn.clearSelectionKey(SelectionKey.OP_CONNECT); - conn.connected(true); } } else if(key.isAcceptable()) handleAccept(key); else { - if (key.isReadable()) - conn.receive(); - if (key.isWritable()) + if(key.isReadable()) + conn.read(); + if(key.isWritable()) conn.send(); } } catch(Throwable ex) { closeConnection(conn); } + finally { + it.remove(); + } } } acceptorDone(); @@ -165,7 +166,7 @@ protected boolean doSelect() { int num=selector.select(); num_selects++; checkforPendingRegistrations(); - if(num == 0) return true; + return num >= 0; } catch(ClosedSelectorException closed_ex) { log.trace("selector was closed; acceptor terminating"); diff --git a/src/org/jgroups/blocks/cs/NioConnection.java b/src/org/jgroups/blocks/cs/NioConnection.java index 351ce408c9b..7d87d523d18 100644 --- a/src/org/jgroups/blocks/cs/NioConnection.java +++ b/src/org/jgroups/blocks/cs/NioConnection.java @@ -7,10 +7,8 @@ import org.jgroups.stack.IpAddress; import org.jgroups.util.ByteArrayDataInputStream; import org.jgroups.util.ByteArrayDataOutputStream; -import org.jgroups.util.CondVar; import org.jgroups.util.Util; -import java.io.Closeable; import java.io.EOFException; import java.io.IOException; import java.net.InetSocketAddress; @@ -25,7 +23,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.BooleanSupplier; + +import static java.nio.channels.SelectionKey.*; /** * An NIO based impl of {@link Connection} @@ -36,19 +35,12 @@ public class NioConnection extends Connection { protected SocketChannel channel; // the channel to the peer protected SelectionKey key; protected final NioBaseServer server; - protected final Buffers send_buf; // send messages via gathering writes - protected boolean write_interest_set; // set when a send() didn't manage to send all data + protected final ByteBuffer length_buf=ByteBuffer.allocate(Integer.BYTES); // reused: send the length of the next buf protected boolean copy_on_partial_write=true; protected int partial_writes; // number of partial writes (write which did not write all bytes) protected final Lock send_lock=new ReentrantLock(); // serialize send() - - // creates an array of 2: length buffer (for reading the length of the following data buffer) and data buffer - // protected Buffers recv_buf=new Buffers(2).add(ByteBuffer.allocate(Global.INT_SIZE), null); - protected Buffers recv_buf=new Buffers(4).add(ByteBuffer.allocate(cookie.length)); - protected Reader reader=new Reader(); // manages the thread which receives messages - protected long reader_idle_time=20000; // number of ms a reader can be idle (no msgs) until it terminates - protected boolean connected; + protected Buffers recv_buf=new Buffers(4).add(ByteBuffer.allocate(cookie.length)); //new Buffers(2).add(ByteBuffer.allocate(cookie.length)); @@ -71,7 +63,6 @@ public NioConnection(SocketChannel channel, NioBaseServer server) throws Excepti this.server=server; setSocketParameters(this.channel.socket()); channel.configureBlocking(false); - this.connected=channel.isConnected(); send_buf=new Buffers(server.maxSendBuffers() *2); // space for actual bufs and length bufs! this.peer_addr=server.usePeerConnections()? null /* read by first receive() */ : new IpAddress((InetSocketAddress)channel.getRemoteAddress()); @@ -79,32 +70,19 @@ public NioConnection(SocketChannel channel, NioBaseServer server) throws Excepti recv_buf.maxLength(server.getMaxLength()); } - - - - @Override - public boolean isConnected() { - return connected; - } + public boolean isConnected() {return channel != null && channel.isConnected();} @Override public boolean isConnectionPending() {return channel != null && channel.isConnectionPending();} @Override - public boolean isClosed() { - return channel == null || !channel.isOpen(); - } + public boolean isClosed() {return channel == null || !channel.isOpen();} @Override - public boolean isExpired(long now) { - return server.connExpireTime() > 0 && now - last_access >= server.connExpireTime(); - } + public boolean isExpired(long now) {return server.connExpireTime() > 0 && now - last_access >= server.connExpireTime();} - protected void updateLastAccessed() { - if(server.connExpireTime() > 0) - last_access=getTimestamp(); - } + protected void updateLastAccessed() {if(server.connExpireTime() > 0) last_access=getTimestamp();} @Override public Address localAddress() { @@ -121,24 +99,17 @@ public Address localAddress() { public NioConnection copyOnPartialWrite(boolean b) {this.copy_on_partial_write=b; return this;} public boolean copyOnPartialWrite() {return copy_on_partial_write;} public int numPartialWrites() {return partial_writes;} - public long readerIdleTime() {return reader_idle_time;} - public NioConnection readerIdleTime(long t) {this.reader_idle_time=t; return this;} - public boolean readerRunning() {return this.reader.isRunning();} - public NioConnection connected(boolean c) {connected=c; return this;} public synchronized void registerSelectionKey(int interest_ops) { - if(key == null) - return; - key.interestOps(key.interestOps() | interest_ops); + if(key != null && key.isValid()) + key.interestOps(key.interestOps() | interest_ops); } public synchronized void clearSelectionKey(int interest_ops) { - if(key == null) - return; - key.interestOps(key.interestOps() & ~interest_ops); + if(key != null && key.isValid()) + key.interestOps(key.interestOps() & ~interest_ops); } - @Override public void connect(Address dest) throws Exception { connect(dest, server.usePeerConnections()); @@ -149,12 +120,10 @@ protected void connect(Address dest, boolean send_local_addr) throws Exception { try { if(!server.deferClientBinding()) this.channel.bind(new InetSocketAddress(server.clientBindAddress(), server.clientBindPort())); - this.key=server.register(channel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ, this); + this.key=server.register(channel, OP_CONNECT | OP_READ, this); boolean success=Util.connect(channel, destAddr); - if(success || channel.finishConnect()) { - clearSelectionKey(SelectionKey.OP_CONNECT); - this.connected=channel.isConnected(); - } + if(success || channel.finishConnect()) + clearSelectionKey(OP_CONNECT); if(this.channel.getLocalAddress() != null && this.channel.getLocalAddress().equals(destAddr)) throw new IllegalStateException("socket's bind and connect address are the same: " + destAddr); if(send_local_addr) @@ -187,47 +156,47 @@ public void send(ByteBuffer buf) throws Exception { send(buf, true); } - - public void send() throws Exception { + protected void send(ByteBuffer buf, boolean send_length) throws Exception { send_lock.lock(); try { + // makeLengthBuffer() reuses the same pre-allocated buffer and copies it only if the write didn't complete + if(send_length) + send_buf.add(makeLengthBuffer(buf.remaining()), buf); + else + send_buf.add(buf); boolean success=send_buf.write(channel); - writeInterest(!success); - if(success) - updateLastAccessed(); if(!success) { + registerSelectionKey(OP_WRITE); if(copy_on_partial_write) - send_buf.copy(); // copy data on partial write as subsequent writes might corrupt data (https://issues.redhat.com/browse/JGRP-1991) + send_buf.copy(); // copy data on partial write as further writes might corrupt data (https://issues.redhat.com/browse/JGRP-1991) partial_writes++; } + updateLastAccessed(); + } + catch(Exception ex) { + if(!(ex instanceof SocketException || ex instanceof EOFException || ex instanceof ClosedChannelException)) + server.log().error("%s: failed sending message to %s: %s", server.localAddress(), peerAddress(), ex); + throw ex; } finally { send_lock.unlock(); } } - - - /** Read the length first, then the actual data. This method is not reentrant and access must be synchronized */ - public void receive() throws Exception { - reader.receive(); - } - - protected void send(ByteBuffer buf, boolean send_length) throws Exception { + public void send() throws Exception { send_lock.lock(); try { - // makeLengthBuffer() reuses the same pre-allocated buffer and copies it only if the write didn't complete - if(send_length) - send_buf.add(makeLengthBuffer(buf), buf); - else - send_buf.add(buf); boolean success=send_buf.write(channel); - writeInterest(!success); - if(success) + if(success) { + clearSelectionKey(OP_WRITE); updateLastAccessed(); - if(!success) { - if(copy_on_partial_write) - send_buf.copy(); // copy data on partial write as subsequent writes might corrupt data (https://issues.redhat.com/browse/JGRP-1991) + } + else { + // registerSelectionKey(OP_WRITE); + if(copy_on_partial_write) { + // copy data on partial write as further writes might corrupt data (https://issues.redhat.com/browse/JGRP-1991) + send_buf.copy(); + } partial_writes++; } } @@ -236,7 +205,25 @@ protected void send(ByteBuffer buf, boolean send_length) throws Exception { } } - protected boolean _receive(boolean update) throws Exception { + + /** Read the length first, then the actual data. This method is not reentrant and access must be synchronized */ + public void read() throws Exception { + for(;;) { // try to receive as many msgs as possible, until no more msgs are ready or the conn is closed + try { + if(!_read()) + break; + updateLastAccessed(); + } + catch(Exception ex) { + if(!(ex instanceof SocketException || ex instanceof EOFException || ex instanceof ClosedChannelException)) + server.log.warn("failed handling message", ex); + server.closeConnection(NioConnection.this); + return; + } + } + } + + protected boolean _read() throws Exception { ByteBuffer msg; Receiver receiver=server.receiver(); @@ -245,17 +232,13 @@ protected boolean _receive(boolean update) throws Exception { server.addConnection(peer_addr, this); return true; } - if((msg=recv_buf.readLengthAndData(channel)) == null) return false; if(receiver != null) receiver.receive(peer_addr, msg); - if(update) - updateLastAccessed(); return true; } - @Override public void close() throws IOException { send_lock.lock(); @@ -263,11 +246,9 @@ public void close() throws IOException { if(send_buf.remaining() > 0) { // try to flush send buffer if it still has pending data to send try {send();} catch(Throwable e) {} } - Util.close(reader); server.socketFactory().close(channel); } finally { - connected=false; send_lock.unlock(); } } @@ -290,9 +271,9 @@ public String toString() { try {remote=channel != null? (InetSocketAddress)channel.getRemoteAddress() : null;} catch(Throwable t) {} String loc=local == null ? "n/a" : local.getHostString() + ":" + local.getPort(), rem=remote == null? "n/a" : remote.getHostString() + ":" + remote.getPort(); - return String.format("<%s --> %s> (%d secs old) [%s] [recv_buf: %d, reader=%b]", + return String.format("<%s --> %s> (%d secs old) [%s] [recv_buf: %d]", loc, rem, TimeUnit.SECONDS.convert(getTimestamp() - last_access, TimeUnit.NANOSECONDS), - status(), recv_buf.get(1) != null? recv_buf.get(1).capacity() : 0, readerRunning()); + status(), recv_buf.get(1) != null? recv_buf.get(1).capacity() : 0); } @Override @@ -308,21 +289,6 @@ protected long getTimestamp() { return server.timeService() != null? server.timeService().timestamp() : System.nanoTime(); } - protected void writeInterest(boolean register) { - if(register) { - if(!write_interest_set) { - write_interest_set=true; - registerSelectionKey(SelectionKey.OP_WRITE); - } - } - else { - if(write_interest_set) { - write_interest_set=false; - clearSelectionKey(SelectionKey.OP_WRITE); - } - } - } - protected void setSocketParameters(Socket client_sock) throws SocketException { try { if(server.sendBufferSize() > 0) @@ -341,15 +307,10 @@ protected void setSocketParameters(Socket client_sock) throws SocketException { client_sock.setKeepAlive(true); client_sock.setTcpNoDelay(server.tcpNodelay()); - try { // todo: remove try-catch clause one https://github.com/oracle/graal/issues/1087 has been fixed - if(server.linger() >= 0) - client_sock.setSoLinger(true, server.linger()); - else - client_sock.setSoLinger(false, -1); - } - catch(Throwable t) { - server.log().warn("%s: failed setting SO_LINGER option: %s", server.localAddress(), t); - } + if(server.linger() >= 0) + client_sock.setSoLinger(true, server.linger()); + else + client_sock.setSoLinger(false, -1); } protected void sendLocalAddress(Address local_addr) throws Exception { @@ -417,134 +378,17 @@ protected static byte[] getBuffer(final ByteBuffer buf) { } - protected static ByteBuffer makeLengthBuffer(ByteBuffer buf) { - ByteBuffer buffer = ByteBuffer.allocate(Global.INT_SIZE).putInt(buf.remaining()); + protected ByteBuffer makeLengthBuffer(int length) { // Workaround for JDK8 compatibility // clear() returns java.nio.Buffer in JDK8, but java.nio.ByteBuffer since JDK9. - ((java.nio.Buffer) buffer).clear(); - return buffer; - } - - protected enum State {reading, waiting_to_terminate, done} - - protected class Reader implements Runnable, Closeable { - protected final Lock lock=new ReentrantLock(); // to synchronize receive() and state transitions - protected State state=State.done; - protected volatile boolean data_available=true; - protected final CondVar data_available_cond=new CondVar(); - protected volatile Thread thread; - protected volatile boolean running; - - protected void start() { - running=true; - thread=server.factory.newThread(this, String.format("NioConnection.Reader [%s]", peer_addr)); - thread.setDaemon(true); - thread.start(); - } - - - protected void stop() { - running=false; - data_available=true; - data_available_cond.signal(false); - } + ((java.nio.Buffer)length_buf).clear(); // buf was used before to write, so reset it again + length_buf.putInt(0, length); // absolute put; doesn't move position or limit - public void close() throws IOException {stop();} - public boolean isRunning() {Thread tmp=thread; return tmp != null && tmp.isAlive();} - - /** Called by the selector when data is ready to be read from the SocketChannel */ - public void receive() { - lock.lock(); - try { - data_available=true; - // only a single receive() at a time, until OP_READ is registered again (by the reader thread) - clear(SelectionKey.OP_READ); - switch(state) { - case reading: - break; - case waiting_to_terminate: - data_available_cond.signal(false); // only 1 consumer - break; - case done: - // make sure the selector doesn't wake up for our connection while the reader is reading msgs - state=State.reading; - start(); - break; - } - } - finally { - lock.unlock(); - } - } - public void run() { - try { - _run(); - } - finally { - register(SelectionKey.OP_READ); - } - } - - protected void _run() { - final BooleanSupplier is_data_available=() -> data_available || !running; - while(running) { - for(;;) { // try to receive as many msgs as possible, until no more msgs are ready or the conn is closed - try { - if(!_receive(false)) - break; - } - catch(Exception ex) { - if(!(ex instanceof SocketException || ex instanceof EOFException - || ex instanceof ClosedChannelException)) { - if(server.logDetails()) - server.log.warn("failed handling message", ex); - else - server.log.warn("failed handling message: " + ex); - } - server.closeConnection(NioConnection.this); - state(State.done); - return; - } - } - updateLastAccessed(); - - // Transition to state waiting_to_terminate and wait for server.readerIdleTime() ms - state(State.waiting_to_terminate); - data_available=false; - register(SelectionKey.OP_READ); // now we might get receive() calls again - if(data_available_cond.waitFor(is_data_available, server.readerIdleTime(), TimeUnit.MILLISECONDS)) - state(State.reading); - else { - state(State.done); - return; - } - } - } - - protected void register(int op) { - try { - registerSelectionKey(op); - key.selector().wakeup(); // no-op if the selector is not blocked in select() - } - catch(Exception t) { - } - } - - protected void clear(int op) { - try { - clearSelectionKey(op); - } - catch(Exception t) { - } - } + // ((java.nio.Buffer)length_buf).clear(); // to read from the correct offset (0) + return length_buf; + } - protected void state(State st) { - lock.lock(); - try {this.state=st;} - finally {lock.unlock();} - } - } } diff --git a/src/org/jgroups/blocks/cs/NioServer.java b/src/org/jgroups/blocks/cs/NioServer.java index e81128a1f6d..4a59a588311 100644 --- a/src/org/jgroups/blocks/cs/NioServer.java +++ b/src/org/jgroups/blocks/cs/NioServer.java @@ -5,14 +5,21 @@ import org.jgroups.util.*; import java.net.InetAddress; -import java.nio.channels.*; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +import static java.nio.channels.SelectionKey.OP_READ; /** * Server for sending and receiving messages via NIO channels. Uses only a single thread to accept, connect, write and - * read connections. Read messages are passed to a receiver, which typically uses a thread pool to process messages.

- * Note that writes can get dropped, e.g. in the case where we have a previous write pending and a new write is received. + * read to/from connections. + *
+ * Note that writes can get dropped, e.g. in the case where we have a previous write pending and a new write is + * invoked. * This is typically not an issue as JGroups retransmits messages, but might become one when using NioServer standalone, - * ie. outside of JGroups. + * i.e. outside JGroups. * @author Bela Ban * @since 3.6.5 */ @@ -75,11 +82,11 @@ public NioServer(ThreadFactory thread_factory, SocketFactory socket_factory, Ine @Override protected void handleAccept(SelectionKey key) throws Exception { SocketChannel client_channel=channel.accept(); - NioConnection conn=null; if(client_channel == null) return; // can happen if no connection is available to accept + NioConnection conn=null; try { conn=new NioConnection(client_channel, NioServer.this); - SelectionKey client_key=client_channel.register(selector, SelectionKey.OP_READ, conn); + SelectionKey client_key=client_channel.register(selector, OP_READ, conn); conn.key(client_key); // we need to set the selection key of the client channel *not* the server channel Address peer_addr=conn.peerAddress(); if(use_peer_connections) @@ -113,8 +120,6 @@ public synchronized void start() throws Exception { public synchronized void stop() { super.stop(); if(running.compareAndSet(true, false)) { - // Util.close(selector); // closing the selector also stops the acceptor thread - // socket_factory.close(channel); selector.wakeup(); // Wait for server channel to close (via acceptorDone()) Util.interruptAndWaitToDie(acceptor); diff --git a/src/org/jgroups/blocks/cs/TcpConnection.java b/src/org/jgroups/blocks/cs/TcpConnection.java index a49e9e757d5..f6801d9d185 100644 --- a/src/org/jgroups/blocks/cs/TcpConnection.java +++ b/src/org/jgroups/blocks/cs/TcpConnection.java @@ -147,7 +147,7 @@ public void start() { /** * - * @param data Guaranteed to be non null + * @param data Guaranteed to be non-null * @param offset * @param length */ @@ -235,9 +235,8 @@ protected void setSocketParameters(Socket client_sock) throws SocketException { /** - * Send the cookie first, then the our port number. If the cookie - * doesn't match the receiver's cookie, the receiver will reject the - * connection and close it. + * Send the cookie first, then our port number. If the cookie doesn't match the receiver's cookie, + * the receiver will reject the connection and close it. */ protected void sendLocalAddress(Address local_addr) throws Exception { try { diff --git a/src/org/jgroups/nio/Buffers.java b/src/org/jgroups/nio/Buffers.java index 26b76e24462..556a9274cea 100644 --- a/src/org/jgroups/nio/Buffers.java +++ b/src/org/jgroups/nio/Buffers.java @@ -15,14 +15,20 @@ /** * Class to do scattering reads or gathering writes on a sequence of {@link ByteBuffer} instances. The buffers are * kept in an array with fixed capacity (max Short.MAX_VALUE). - * Buffers can be added and removed dynamically (they're dropped when the capacity is exceeded).

- * A read is successful when all non-null buffers from left to right are filled, ie. all {@link ByteBuffer#remaining()} - * methods return 0.

+ * Buffers can be added and removed dynamically, but they're dropped when the capacity is exceeded. This + * means that, if a we have configured a capacity of 5 buffers, and want to add 6, none of the 6 buffers will + * be added! + *
+ * A read is successful when all non-null buffers from left to right are filled, i.e. all {@link ByteBuffer#remaining()} + * methods return 0. + *
* Same for writes: when all non-null buffers (from left to right) have been written ({@link ByteBuffer#remaining()} == 0), - * a write is considered successful; otherwise it is partial.

+ * a write is considered successful; otherwise it is partial. + *
* Individual buffers can be accessed; e.g. for reading its value after a read. It is also possible to add buffers * dynamically, e.g. after reading a 'length' buffer, a user may want to add a new buffer allocated for reading - * 'length' bytes.

+ * 'length' bytes. + *
* This class is not synchronized. * @author Bela Ban * @since 3.6.5 @@ -82,7 +88,10 @@ public boolean hasRemaining() { return false; } - + /** + * Adds a number of buffers. Note that if the buffers cannot be added as a whole, + * e.g. because of exceeding the capacity, none of them will be added! + */ public Buffers add(ByteBuffer ... buffers) { if(buffers == null) return this; @@ -95,6 +104,7 @@ public Buffers add(ByteBuffer ... buffers) { return this; } + /** Adds a buffer. If there's no capacity, the buffer will not be added and will be silently dropped */ public Buffers add(ByteBuffer buf) { if(buf == null) return this; @@ -118,7 +128,6 @@ public Buffers remove(int index) { } - /** * Reads length and then length bytes into the data buffer, which is grown if needed. * @param ch The channel to read data from @@ -198,10 +207,9 @@ public boolean write(GatheringByteChannel ch) throws Exception { throw closed_ex; } catch(NotYetConnectedException | IOException others) { - ; // ignore, we'll queue 1 write + ; // ignore, we'll queue writes up to Buffers.capacity } } - return nullData(); } @@ -289,12 +297,6 @@ protected static int assertPositiveUnsignedShort(int num) { return num; } - /*protected void assertNextToCopy() { - boolean condition=position <= next_to_copy && next_to_copy <= limit; - assert condition - : String.format("position=%d next_to_copy=%d limit=%d\n", position, next_to_copy, limit); - }*/ - /** Copies a ByteBuffer by copying and wrapping the underlying array of a heap-based buffer. Direct buffers are converted to heap-based buffers */ public static ByteBuffer copyBuffer(final ByteBuffer buf) { @@ -305,7 +307,6 @@ public static ByteBuffer copyBuffer(final ByteBuffer buf) { if(!buf.isDirect()) System.arraycopy(buf.array(), offset, tmp, 0, len); else { - // buf.get(tmp, 0, len); for(int i=0; i < len; i++) tmp[i]=buf.get(i+offset); } diff --git a/src/org/jgroups/stack/RouterStub.java b/src/org/jgroups/stack/RouterStub.java index cffb9eae7e6..cada2e1b425 100644 --- a/src/org/jgroups/stack/RouterStub.java +++ b/src/org/jgroups/stack/RouterStub.java @@ -101,7 +101,7 @@ public RouterStub(InetSocketAddress local_sa, InetSocketAddress remote_sa, boole /** * Registers mbr with the GossipRouter under the given group, with the given logical name and physical address. * Establishes a connection to the GossipRouter and sends a CONNECT message. - * @param group The group cluster) name under which to register the member + * @param group The group cluster name under which to register the member * @param addr The address of the member * @param logical_name The logical name of the member * @param phys_addr The physical address of the member @@ -267,7 +267,7 @@ protected BaseServer createClient(SocketFactory sf) { return cl; } - public synchronized void writeRequest(GossipData req) throws Exception { + public void writeRequest(GossipData req) throws Exception { int size=req.serializedSize(); ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(size+5); req.writeTo(out); diff --git a/src/org/jgroups/stack/RouterStubManager.java b/src/org/jgroups/stack/RouterStubManager.java index e0956658e6d..4d19fb53e70 100644 --- a/src/org/jgroups/stack/RouterStubManager.java +++ b/src/org/jgroups/stack/RouterStubManager.java @@ -10,21 +10,21 @@ import org.jgroups.util.Util; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; /** - * Manages a list of RouterStubs (e.g. health checking, reconnecting etc. + * Manages a list of RouterStubs (e.g. health checking, reconnecting). * @author Vladimir Blagojevic * @author Bela Ban */ public class RouterStubManager implements Runnable, RouterStub.CloseListener { - protected final List stubs=new ArrayList<>(); + protected final List stubs=new CopyOnWriteArrayList<>(); protected final TimeScheduler timer; protected final String cluster_name; protected final Address local_addr; @@ -97,9 +97,7 @@ public RouterStubManager heartbeat(long heartbeat_interval, long heartbeat_timeo * Applies action to all connected RouterStubs */ public void forEach(Consumer action) { - synchronized(stubs) { - stubs.stream().filter(RouterStub::isConnected).forEach(action); - } + stubs.stream().filter(RouterStub::isConnected).forEach(action); } /** @@ -120,35 +118,29 @@ public RouterStub createAndRegisterStub(InetSocketAddress local, InetSocketAddre public RouterStub createAndRegisterStub(InetSocketAddress local, InetSocketAddress router_addr, int linger) { RouterStub stub=new RouterStub(local, router_addr, use_nio, this, socket_factory, linger) .handleHeartbeats(heartbeat_interval > 0); - synchronized(stubs) { - this.stubs.add(stub); - } + this.stubs.add(stub); return stub; } public RouterStub unregisterStub(InetSocketAddress router_addr_sa) { - synchronized(stubs) { - RouterStub s=stubs.stream().filter(st -> Objects.equals(st.remote_sa, router_addr_sa)).findFirst().orElse(null); - if(s != null) { - s.destroy(); - stubs.remove(s); - } - return s; + RouterStub s=stubs.stream().filter(st -> Objects.equals(st.remote_sa, router_addr_sa)).findFirst().orElse(null); + if(s != null) { + s.destroy(); + stubs.remove(s); } + return s; } public void connectStubs() { boolean failed_connect_attempts=false; - synchronized(stubs) { - for(RouterStub stub : stubs) { - if(!stub.isConnected()) { - try { - stub.connect(cluster_name, local_addr, logical_name, phys_addr); - } - catch(Exception ex) { - failed_connect_attempts=true; - } + for(RouterStub stub: stubs) { + if(!stub.isConnected()) { + try { + stub.connect(cluster_name, local_addr, logical_name, phys_addr); + } + catch(Exception ex) { + failed_connect_attempts=true; } } } @@ -159,23 +151,19 @@ public void connectStubs() { public void disconnectStubs() { stopReconnector(); - synchronized(stubs) { - for(RouterStub stub: stubs) { - try { - stub.disconnect(cluster_name, local_addr); - } - catch(Throwable ignored) { - } + for(RouterStub stub: stubs) { + try { + stub.disconnect(cluster_name, local_addr); + } + catch(Throwable ignored) { } } } public void destroyStubs() { stopReconnector(); - synchronized(stubs) { - stubs.forEach(RouterStub::destroy); - stubs.clear(); - } + stubs.forEach(RouterStub::destroy); + stubs.clear(); } public String printStubs() { @@ -194,17 +182,15 @@ public String print() { public void run() { int failed_reconnect_attempts=0; - synchronized(stubs) { - for(RouterStub stub : stubs) { - if(!stub.isConnected()) { - try { - stub.connect(this.cluster_name, this.local_addr, this.logical_name, this.phys_addr); - log.debug("%s: re-established connection to GossipRouter %s (group: %s)", - local_addr, stub.remote(), this.cluster_name); - } - catch(Exception ex) { - failed_reconnect_attempts++; - } + for(RouterStub stub: stubs) { + if(!stub.isConnected()) { + try { + stub.connect(this.cluster_name, this.local_addr, this.logical_name, this.phys_addr); + log.debug("%s: re-established connection to GossipRouter %s (group: %s)", + local_addr, stub.remote(), this.cluster_name); + } + catch(Exception ex) { + failed_reconnect_attempts++; } } } @@ -259,14 +245,12 @@ protected synchronized void stopTimeoutChecker() { protected RouterStub findRandomConnectedStub() { RouterStub stub=null; - synchronized(stubs) { - while(connectedStubs() > 0) { - RouterStub tmp=Util.pickRandomElement(stubs); - if(tmp != null && tmp.isConnected()) - return tmp; - } - return stub; + while(connectedStubs() > 0) { + RouterStub tmp=Util.pickRandomElement(stubs); + if(tmp != null && tmp.isConnected()) + return tmp; } + return stub; } protected void sendHeartbeat() { @@ -295,15 +279,12 @@ protected void checkTimeouts() { startReconnector(); } - // unsynchronized - protected int connectedStubs() { + public int connectedStubs() { return (int)stubs.stream().filter(RouterStub::isConnected).count(); } public boolean disconnectedStubs() { - synchronized(stubs) { - return stubs.stream().anyMatch(st -> !st.isConnected()); - } + return stubs.stream().anyMatch(st -> !st.isConnected()); } } diff --git a/src/org/jgroups/util/Util.java b/src/org/jgroups/util/Util.java index 56a95a46a68..402090d6915 100644 --- a/src/org/jgroups/util/Util.java +++ b/src/org/jgroups/util/Util.java @@ -476,7 +476,7 @@ public static void close(Closeable closeable) { public static void close(Closeable... closeables) { if(closeables != null) { - for(Closeable closeable : closeables) + for(Closeable closeable: closeables) Util.close(closeable); } } @@ -2990,7 +2990,7 @@ public static T pickRandomElement(List list) { return list.get(index); } - public static T pickRandomElement(Set set) { + public static T pickRandomElement(Collection set) { if(set == null || set.isEmpty()) return null; int size=set.size(); int random=(int)Util.random(size)-1; diff --git a/tests/byteman/org/jgroups/tests/byteman/ServerTest.java b/tests/byteman/org/jgroups/tests/byteman/ServerTest.java index 7fe06bd4683..cbe3658998a 100644 --- a/tests/byteman/org/jgroups/tests/byteman/ServerTest.java +++ b/tests/byteman/org/jgroups/tests/byteman/ServerTest.java @@ -4,10 +4,7 @@ import org.jboss.byteman.contrib.bmunit.BMScript; import org.jgroups.Address; import org.jgroups.Global; -import org.jgroups.blocks.cs.BaseServer; -import org.jgroups.blocks.cs.NioServer; -import org.jgroups.blocks.cs.ReceiverAdapter; -import org.jgroups.blocks.cs.TcpServer; +import org.jgroups.blocks.cs.*; import org.jgroups.util.Bits; import org.jgroups.util.ResourceManager; import org.jgroups.util.Util; @@ -19,7 +16,6 @@ import java.net.InetAddress; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CountDownLatch; /** * Tests concurrent connection establishments in TcpServer @@ -34,7 +30,6 @@ public class ServerTest extends BMNGRunner { protected static final int PORT_A, PORT_B; public static Address A=null, B=null; // need to be static for the byteman rule scripts to access them protected static final String STRING_A="a.req", STRING_B="b.req"; - protected static final int NUM_SENDERS=50; static { @@ -57,10 +52,6 @@ protected Object[][] configProvider() { }; } - protected void setup(BaseServer one, BaseServer two) throws Exception { - setup(one,two, true); - } - protected void setup(BaseServer one, BaseServer two, boolean use_peer_conns) throws Exception { a=one; a.usePeerConnections(use_peer_conns); @@ -81,39 +72,6 @@ protected void destroy() { } - public void testStart(BaseServer a, BaseServer b) throws Exception { - setup(a, b); - assert !a.hasConnection(B) && !b.hasConnection(A); - assert a.getNumConnections() == 0 && b.getNumConnections() == 0; - } - - public void testSimpleSend(BaseServer a, BaseServer b) throws Exception { - setup(a,b); - send(STRING_A, a, B); - check(receiver_b.getList(), STRING_A); - } - - - /** - * Tests A connecting to B, and then B connecting to A; no concurrent connections - */ - public void testSimpleConnection(BaseServer first, BaseServer second) throws Exception { - setup(first,second); - send("hello", a, B); - waitForOpenConns(1, a, b); - assert a.getNumOpenConnections() == 1 : "number of connections for conn_a: " + a.getNumOpenConnections(); - assert b.getNumOpenConnections() == 1 : "number of connections for conn_b: " + b.getNumOpenConnections(); - check(receiver_b.getList(),"hello"); - - send("hello", b, A); - waitForOpenConns(1, a, b); - assert a.getNumOpenConnections() == 1 : "number of connections for conn_a: " + a.getNumOpenConnections(); - assert b.getNumOpenConnections() == 1 : "number of connections for conn_b: " + b.getNumOpenConnections(); - check(receiver_b.getList(), "hello"); - check(receiver_a.getList(), "hello"); - } - - /** * Tests the case where A and B connect to each other concurrently: *

    @@ -124,42 +82,7 @@ public void testSimpleConnection(BaseServer first, BaseServer second) throws Exc @BMScript(dir="scripts/ServerTest", value="testConcurrentConnect") // @Test(dataProvider="configProvider",invocationCount=5) public void testConcurrentConnect(BaseServer first, BaseServer second) throws Exception { - setup(first, second); - _testConcurrentConnect(1, 1, 0); - } - - /** - * Tests multiple threads sending a message to the same (unconnected) server; the first thread should establish - * the connection to the server and the other threads should be blocked until the connection has been created.
    - * JIRA: https://issues.redhat.com/browse/JGRP-2271 - */ - // @Test(invocationCount=50,dataProvider="configProvider") - public void testConcurrentConnect2(BaseServer first, BaseServer second) throws Exception { - setup(first, second, false); - final CountDownLatch latch=new CountDownLatch(1); - Sender2[] senders=new Sender2[NUM_SENDERS]; - for(int i=0; i < senders.length; i++) { - senders[i]=new Sender2(latch, first, B, String.valueOf(i)); - senders[i].start(); - } - latch.countDown(); - for(Thread sender: senders) - sender.join(); - - final List list=receiver_b.getList(); - for(int i=0; i < 10; i++) { - if(list.size() == NUM_SENDERS) - break; - Util.sleep(1000); - } - assert list.size() == NUM_SENDERS : String.format("list (%d elements): %s", list.size(), list); - for(int i=0; i < list.size(); i++) - assert list.contains(String.valueOf(i)); - } - - - - protected void _testConcurrentConnect(int expected_msgs_in_A, int expected_msgs_in_B, int alt_b) throws Exception { + setup(first, second, true); new Thread(new Sender(a,B, STRING_A), "sender-1").start(); new Thread(new Sender(b,A, STRING_B), "sender-2").start(); @@ -175,29 +98,18 @@ protected void _testConcurrentConnect(int expected_msgs_in_A, int expected_msgs_ List list_b=receiver_b.getList(); for(int i=0; i < 10; i++) { - if(list_a.size() == expected_msgs_in_A && (list_b.size() == expected_msgs_in_B || list_b.size() == alt_b)) + if(list_a.size() == 1 && (list_b.size() == 1 || list_b.isEmpty())) break; Util.sleep(500); } - System.out.println("list A=" + list_a + " (expected=" + expected_msgs_in_A + ")" + - "\nlist B=" + list_b + "( expected=" + expected_msgs_in_B + " or " + alt_b + ")"); - assert list_a.size() == expected_msgs_in_A && (list_b.size() == expected_msgs_in_B || list_b.size() == alt_b) + System.out.println("list A=" + list_a + " (expected=1)" + + "\nlist B=" + list_b + "( expected=1 or 0)"); + assert list_a.size() == 1 && (list_b.size() == 1 || list_b.isEmpty()) : "list A=" + list_a + "\nlist B=" + list_b; } - protected static void check(List list, String expected_str) { - for(int i=0; i < 20; i++) { - if(list.isEmpty()) - Util.sleep(500); - else - break; - } - assert !list.isEmpty() && list.get(0).equals(expected_str) : " list: " + list + ", expected " + expected_str; - } - - protected static void waitForOpenConns(int expected, BaseServer... servers) { for(int i=0; i < 10; i++) { boolean all_ok=true; @@ -254,32 +166,8 @@ public void run() { } } - protected static class Sender2 extends Thread { - protected final CountDownLatch latch; - protected final BaseServer server; - protected final Address dest; - protected final String payload; - public Sender2(CountDownLatch latch, BaseServer server, Address dest, String payload) { - this.latch=latch; - this.server=server; - this.dest=dest; - this.payload=payload; - } - - public void run() { - try { - latch.await(); - Util.sleep(1000); - send(payload, server, dest); - } - catch(InterruptedException e) { - e.printStackTrace(); - } - } - } - - protected static class MyReceiver extends ReceiverAdapter { + protected static class MyReceiver extends ReceiverAdapter { protected final String name; protected final List reqs=new ArrayList<>(); @@ -303,7 +191,7 @@ public void receive(Address sender, DataInput in) throws Exception { int len=in.readInt(); byte[] data=new byte[len]; in.readFully(data, 0, data.length); - String str=new String(data, 0, data.length); + String str=new String(data); System.out.println("[" + name + "] received request \"" + str + "\" from " + sender); synchronized(reqs) { reqs.add(str); @@ -312,5 +200,4 @@ public void receive(Address sender, DataInput in) throws Exception { } - } diff --git a/tests/junit-functional/org/jgroups/tests/BuffersTest.java b/tests/junit-functional/org/jgroups/tests/BuffersTest.java index 73b005f6aa0..c331d0bff27 100644 --- a/tests/junit-functional/org/jgroups/tests/BuffersTest.java +++ b/tests/junit-functional/org/jgroups/tests/BuffersTest.java @@ -320,6 +320,31 @@ public void testMakeSpaceOverlappingMove() throws Exception { check(buf, 0, 2, 2, remaining(buf2,buf3)); } + /** + * post=limit=capacity=0: add an element that greater than capacity + */ + public void testAddAtEnd() throws Exception { + MockSocketChannel ch=new MockSocketChannel().bytesToWrite(1000); + Buffers buf=new Buffers(4); + for(int i=0; i < 4; i++) + buf.add(buf1); + check(buf, 0, 4, 4, remaining(buf1)*4); + boolean rc=buf.write(ch); + assert rc; + check(buf, 0, 0, 0, 0); + buf.add(buf1, buf1, buf1); + check(buf, 0, 3, 3, remaining(buf1)*3); + buf.add(buf1, buf1, buf1); // not added as capacity has been exceeded! + check(buf, 0, 3, 3, remaining(buf1)*3); + + rc=buf.write(ch); + assert rc; + check(buf, 3, 3, 0, 0); + buf1.flip(); + buf.add(buf1, buf1, buf1); // will be added; space was made for this + check(buf, 0, 3, 3, remaining(buf1)*3); + } + public void testNullData() throws Exception { MockSocketChannel ch=new MockSocketChannel().bytesToWrite(1000); Buffers buf=new Buffers(3); @@ -489,7 +514,7 @@ public void testCopyWithPartialWrite() throws Exception { Buffers bufs=new Buffers(a,b,c,d); ByteBuffer recorder=ByteBuffer.allocate(100); MockSocketChannel ch=new MockSocketChannel().bytesToWrite(13) // a, b: OK, c: partial, d: fail - .recorder(recorder); // we're recording the writes so we can compare expected bytes to actually written bytes + .recorder(recorder); // we're recording the writes, so we can compare expected bytes to actually written bytes boolean success=bufs.write(ch); System.out.println("bufs = " + bufs); assert !success; diff --git a/tests/junit-functional/org/jgroups/tests/ServerTests.java b/tests/junit-functional/org/jgroups/tests/ServerTests.java new file mode 100644 index 00000000000..e6be52f13a0 --- /dev/null +++ b/tests/junit-functional/org/jgroups/tests/ServerTests.java @@ -0,0 +1,255 @@ +package org.jgroups.tests; + +import org.jgroups.Address; +import org.jgroups.Global; +import org.jgroups.blocks.cs.BaseServer; +import org.jgroups.blocks.cs.NioServer; +import org.jgroups.blocks.cs.ReceiverAdapter; +import org.jgroups.blocks.cs.TcpServer; +import org.jgroups.util.Bits; +import org.jgroups.util.ResourceManager; +import org.jgroups.util.Util; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.DataInput; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +/** + * @author Bela Ban + * @since 5.3.2 + */ +@Test(groups= Global.FUNCTIONAL,singleThreaded=true,dataProvider="configProvider") +public class ServerTests { + protected BaseServer a, b; + protected static final InetAddress loopback; + protected MyReceiver receiver_a, receiver_b; + protected static int PORT_A, PORT_B, NUM_SENDERS=50; + public static Address A=null, B=null; // need to be static for the byteman rule scripts to access them + protected static final String STRING_A="a.req"; + + + static { + try { + loopback=Util.getLoopback(); + PORT_A=ResourceManager.getNextTcpPort(loopback); + PORT_B=ResourceManager.getNextTcpPort(loopback); + } + catch(Exception ex) { + throw new RuntimeException(ex); + } + } + + + @DataProvider + protected Object[][] configProvider() { + return new Object[][] { + {create(false, PORT_A), create(false, PORT_B)}, + {create(true, PORT_A), create(true, PORT_B)} + }; + } + + protected void setup(BaseServer one, BaseServer two) throws Exception { + setup(one,two, true); + } + + protected void setup(BaseServer one, BaseServer two, boolean use_peer_conns) throws Exception { + a=one.usePeerConnections(use_peer_conns); + b=two.usePeerConnections(use_peer_conns); + A=a.localAddress(); + B=b.localAddress(); + assert A.compareTo(B) < 0; + a.receiver(receiver_a=new MyReceiver("A").verbose(false)); + a.start(); + b.receiver(receiver_b=new MyReceiver("B").verbose(false)); + b.start(); + } + + @AfterMethod + protected void destroy() throws TimeoutException { + Util.close(a,b); + } + + + public void testStart(BaseServer a, BaseServer b) throws Exception { + setup(a, b); + assert !a.hasConnection(B) && !b.hasConnection(A); + assert a.getNumConnections() == 0 && b.getNumConnections() == 0; + } + + public void testSimpleSend(BaseServer a, BaseServer b) throws Exception { + setup(a,b); + send(STRING_A, a, B); + check(receiver_b.getList(), STRING_A); + } + + + /** + * Tests A connecting to B, and then B connecting to A; no concurrent connections + */ + public void testSimpleConnection(BaseServer first, BaseServer second) throws Exception { + setup(first,second); + send("hello", a, B); + waitForOpenConns(1, a, b); + assert a.getNumOpenConnections() == 1 : "number of connections for conn_a: " + a.getNumOpenConnections(); + assert b.getNumOpenConnections() == 1 : "number of connections for conn_b: " + b.getNumOpenConnections(); + check(receiver_b.getList(),"hello"); + + send("hello", b, A); + waitForOpenConns(1, a, b); + assert a.getNumOpenConnections() == 1 : "number of connections for conn_a: " + a.getNumOpenConnections(); + assert b.getNumOpenConnections() == 1 : "number of connections for conn_b: " + b.getNumOpenConnections(); + check(receiver_b.getList(), "hello"); + check(receiver_a.getList(), "hello"); + } + + + + /** + * Tests multiple threads sending a message to the same (unconnected) server; the first thread should establish + * the connection to the server and the other threads should be blocked until the connection has been created.
    + * JIRA: https://issues.redhat.com/browse/JGRP-2271 + */ + // @Test(invocationCount=50,dataProvider="configProvider") + public void testConcurrentConnect(BaseServer first, BaseServer second) throws Exception { + setup(first, second, false); + final List list=receiver_b.getList(); + final CountDownLatch latch=new CountDownLatch(1); + Sender[] senders=new Sender[NUM_SENDERS]; + for(int i=0; i < senders.length; i++) { + senders[i]=new Sender(latch, first, B, receiver_b.getList()); + senders[i].start(); + } + latch.countDown(); + for(Sender sender: senders) + sender.join(); + List ids=Arrays.stream(senders).map(t -> String.valueOf(t.getId())).collect(Collectors.toList()); + Util.waitUntil(1000, 100, () -> list.size() == NUM_SENDERS, + () -> { + list.sort(String::compareTo); + return String.format("list (%d): %s", list.size(), list); + }); + list.sort(String::compareTo); + ids.sort(String::compareTo); + System.out.printf("list (%d elements): %s\n", list.size(), list); + assert ids.equals(list) : String.format("expected:\n%s\nactual:\n%s\n", ids, list); + } + + + protected static void check(List list, String expected_str) { + for(int i=0; i < 20; i++) { + if(list.isEmpty()) + Util.sleep(500); + else + break; + } + assert !list.isEmpty() && list.get(0).equals(expected_str) : " list: " + list + ", expected " + expected_str; + } + + + protected static void waitForOpenConns(int expected, BaseServer... servers) { + for(int i=0; i < 10; i++) { + boolean all_ok=true; + for(BaseServer server: servers) { + if(server.getNumOpenConnections() != expected) { + all_ok=false; + break; + } + } + if(all_ok) + return; + Util.sleep(500); + } + } + + + protected static BaseServer create(boolean nio, int port) { + try { + return nio? new NioServer(loopback, port) + : new TcpServer(loopback, port); + } + catch(Exception ex) { + return null; + } + } + + protected static void send(String str, BaseServer server, Address dest) throws Exception { + byte[] request=str.getBytes(); + byte[] data=new byte[request.length + Global.INT_SIZE]; + Bits.writeInt(request.length, data, 0); + System.arraycopy(request, 0, data, Global.INT_SIZE, request.length); + server.send(dest, data, 0, data.length); + } + + + protected static class Sender extends Thread { + protected final CountDownLatch latch; + protected final BaseServer server; + protected final Address dest; + protected final List receiver; + + public Sender(CountDownLatch latch, BaseServer server, Address dest, List r) { + this.latch=latch; + this.server=server; + this.dest=dest; + this.receiver=r; + } + + public void run() { + try { + latch.await(); + String payload=String.valueOf(Thread.currentThread().getId()); + send(payload, server, dest); + } + catch(Exception ex) { + System.err.printf("[%d]: %s\n", getId(), ex); + } + } + } + + protected static class MyReceiver extends ReceiverAdapter { + protected final String name; + protected final List reqs=new ArrayList<>(); + protected boolean verbose=true; + + public MyReceiver(String name) { + this.name=name; + } + + public List getList() {return reqs;} + public void clear() {reqs.clear();} + public int size() {synchronized(reqs) {return reqs.size();}} + public MyReceiver verbose(boolean v) {verbose=v; return this;} + + + public void receive(Address sender, byte[] data, int offset, int length) { + int len=Bits.readInt(data, offset); + String str=new String(data, offset+Global.INT_SIZE, len); + if(verbose) + System.out.println("[" + name + "] received request \"" + str + "\" from " + sender); + synchronized(reqs) { + reqs.add(str); + } + } + + public void receive(Address sender, DataInput in) throws Exception { + int len=in.readInt(); + byte[] data=new byte[len]; + in.readFully(data, 0, data.length); + String str=new String(data); + if(verbose) + System.out.println("[" + name + "] received request \"" + str + "\" from " + sender); + synchronized(reqs) { + reqs.add(str); + } + } + } + +}