Skip to content

Commit 3676af3

Browse files
committed
New IO: Split QueryIO into Connection and CommandQueue
1 parent 9147281 commit 3676af3

File tree

12 files changed

+607
-442
lines changed

12 files changed

+607
-442
lines changed

example/ReconnectExample.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ public static void main(String[] args) {
6464
config.setConnectionHandler(new ConnectionHandler() {
6565

6666
@Override
67-
public void onConnect(TS3Query ts3Query) {
68-
stuffThatNeedsToRunEveryTimeTheQueryConnects(ts3Query.getApi());
67+
public void onConnect(TS3Api api) {
68+
stuffThatNeedsToRunEveryTimeTheQueryConnects(api);
6969
}
7070

7171
@Override
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
package com.github.theholywaffle.teamspeak3;
2+
3+
/*
4+
* #%L
5+
* TeamSpeak 3 Java API
6+
* %%
7+
* Copyright (C) 2018 Bert De Geyter, Roger Baumgartner
8+
* %%
9+
* Permission is hereby granted, free of charge, to any person obtaining a copy
10+
* of this software and associated documentation files (the "Software"), to deal
11+
* in the Software without restriction, including without limitation the rights
12+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13+
* copies of the Software, and to permit persons to whom the Software is
14+
* furnished to do so, subject to the following conditions:
15+
*
16+
* The above copyright notice and this permission notice shall be included in
17+
* all copies or substantial portions of the Software.
18+
*
19+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25+
* THE SOFTWARE.
26+
* #L%
27+
*/
28+
29+
import com.github.theholywaffle.teamspeak3.api.exception.TS3QueryShutDownException;
30+
import com.github.theholywaffle.teamspeak3.commands.Command;
31+
32+
import java.util.ArrayDeque;
33+
import java.util.ArrayList;
34+
import java.util.Collection;
35+
import java.util.Queue;
36+
import java.util.concurrent.locks.Condition;
37+
import java.util.concurrent.locks.Lock;
38+
import java.util.concurrent.locks.ReentrantLock;
39+
40+
class CommandQueue {
41+
42+
private static final int INITIAL_QUEUE_SIZE = 16;
43+
44+
private final Queue<Command> sendQueue;
45+
private final Queue<Command> receiveQueue;
46+
private final Lock queueLock;
47+
// Signalled when a command is added to sendQueue or removed from receiveQueue, or when rejectNew is set to true
48+
private final Condition canTransfer;
49+
50+
// API objects that insert commands into this queue
51+
private final TS3Api api;
52+
private final TS3ApiAsync asyncApi;
53+
54+
private final boolean unlimitedInFlightCommands;
55+
private final boolean isGlobal;
56+
57+
private boolean rejectNew = false;
58+
private long firstEnqueueTimeAfterEmpty;
59+
60+
static CommandQueue newGlobalQueue(TS3Query query, boolean unlimited) {
61+
return new CommandQueue(query, true, unlimited);
62+
}
63+
64+
static CommandQueue newConnectQueue(TS3Query query) {
65+
return new CommandQueue(query, false, true);
66+
}
67+
68+
private CommandQueue(TS3Query query, boolean global, boolean unlimited) {
69+
isGlobal = global;
70+
unlimitedInFlightCommands = unlimited;
71+
72+
sendQueue = new ArrayDeque<>(INITIAL_QUEUE_SIZE);
73+
receiveQueue = new ArrayDeque<>(unlimited ? INITIAL_QUEUE_SIZE : 1);
74+
queueLock = new ReentrantLock();
75+
canTransfer = queueLock.newCondition();
76+
77+
asyncApi = new TS3ApiAsync(query, this);
78+
api = new TS3Api(asyncApi);
79+
}
80+
81+
TS3Api getApi() {
82+
return api;
83+
}
84+
85+
TS3ApiAsync getAsyncApi() {
86+
return asyncApi;
87+
}
88+
89+
boolean isGlobal() {
90+
return isGlobal;
91+
}
92+
93+
void enqueueCommand(Command command) {
94+
queueLock.lock();
95+
try {
96+
if (rejectNew) {
97+
command.getFuture().fail(new TS3QueryShutDownException());
98+
return;
99+
}
100+
101+
if (isEmpty()) {
102+
firstEnqueueTimeAfterEmpty = System.currentTimeMillis();
103+
}
104+
sendQueue.add(command);
105+
canTransfer.signalAll();
106+
} finally {
107+
queueLock.unlock();
108+
}
109+
}
110+
111+
Command transferCommand() throws InterruptedException {
112+
queueLock.lockInterruptibly();
113+
try {
114+
while (sendQueue.isEmpty() || (!receiveQueue.isEmpty() && !unlimitedInFlightCommands)) {
115+
if (sendQueue.isEmpty() && rejectNew) return null;
116+
canTransfer.await();
117+
}
118+
119+
Command command = sendQueue.remove();
120+
receiveQueue.add(command);
121+
122+
return command;
123+
} finally {
124+
queueLock.unlock();
125+
}
126+
}
127+
128+
Command peekReceiveQueue() {
129+
queueLock.lock();
130+
try {
131+
return receiveQueue.peek();
132+
} finally {
133+
queueLock.unlock();
134+
}
135+
}
136+
137+
void removeFromReceiveQueue() {
138+
queueLock.lock();
139+
try {
140+
if (receiveQueue.isEmpty()) throw new IllegalStateException("Empty receive queue");
141+
142+
receiveQueue.remove();
143+
canTransfer.signalAll();
144+
} finally {
145+
queueLock.unlock();
146+
}
147+
}
148+
149+
void resetSentCommands() {
150+
queueLock.lock();
151+
try {
152+
Collection<Command> allCommands = getAllCommands();
153+
154+
sendQueue.clear();
155+
receiveQueue.clear();
156+
sendQueue.addAll(allCommands);
157+
158+
rejectNew = false;
159+
firstEnqueueTimeAfterEmpty = System.currentTimeMillis();
160+
161+
canTransfer.signalAll();
162+
} finally {
163+
queueLock.unlock();
164+
}
165+
}
166+
167+
boolean isEmpty() {
168+
queueLock.lock();
169+
try {
170+
return receiveQueue.isEmpty() && sendQueue.isEmpty();
171+
} finally {
172+
queueLock.unlock();
173+
}
174+
}
175+
176+
long getBusyTime() {
177+
queueLock.lock();
178+
try {
179+
if (isEmpty()) {
180+
return 0L;
181+
} else {
182+
return System.currentTimeMillis() - firstEnqueueTimeAfterEmpty;
183+
}
184+
} finally {
185+
queueLock.unlock();
186+
}
187+
}
188+
189+
void shutDown() {
190+
queueLock.lock();
191+
try {
192+
rejectNew = true;
193+
canTransfer.signalAll();
194+
195+
while (!isEmpty()) {
196+
canTransfer.awaitUninterruptibly();
197+
}
198+
} finally {
199+
queueLock.unlock();
200+
}
201+
}
202+
203+
void quit() {
204+
queueLock.lock();
205+
try {
206+
// Enqueue the last command - don't wait for a response, we'll wait in shutDown
207+
if (!rejectNew) asyncApi.quit();
208+
// And wait until all commands have been sent
209+
shutDown();
210+
} finally {
211+
queueLock.unlock();
212+
}
213+
}
214+
215+
void failRemainingCommands() {
216+
queueLock.lock();
217+
try {
218+
rejectNew = true;
219+
canTransfer.signalAll();
220+
221+
Collection<Command> allCommands = getAllCommands();
222+
for (Command command : allCommands) {
223+
command.getFuture().fail(new TS3QueryShutDownException());
224+
}
225+
} finally {
226+
queueLock.unlock();
227+
}
228+
}
229+
230+
// Only call this when holding queueLock
231+
private Collection<Command> getAllCommands() {
232+
Collection<Command> allCommands = new ArrayList<>(sendQueue.size() + receiveQueue.size());
233+
allCommands.addAll(sendQueue);
234+
allCommands.addAll(receiveQueue);
235+
return allCommands;
236+
}
237+
}

0 commit comments

Comments
 (0)