Skip to content

Commit 57f33b2

Browse files
authored
Change static schedulers to AsyncHandler to hold for heartbeat and reconnect logic. (#148)
1 parent 85cba9f commit 57f33b2

File tree

5 files changed

+45
-39
lines changed

5 files changed

+45
-39
lines changed

Client/src/main/java/io/github/jwdeveloper/tiktok/TikTokLiveClient.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
package io.github.jwdeveloper.tiktok;
2424

2525
import com.google.protobuf.ByteString;
26+
import io.github.jwdeveloper.tiktok.common.AsyncHandler;
2627
import io.github.jwdeveloper.tiktok.data.events.*;
2728
import io.github.jwdeveloper.tiktok.data.events.common.TikTokEvent;
2829
import io.github.jwdeveloper.tiktok.data.events.control.*;
@@ -39,7 +40,7 @@
3940
import lombok.Getter;
4041

4142
import java.util.Base64;
42-
import java.util.concurrent.CompletableFuture;
43+
import java.util.concurrent.*;
4344
import java.util.function.Consumer;
4445
import java.util.logging.Logger;
4546

@@ -89,12 +90,11 @@ public void connect() {
8990
tikTokEventHandler.publish(this, new TikTokDisconnectedEvent("Exception: " + e.getMessage()));
9091

9192
if (e instanceof TikTokLiveOfflineHostException && clientSettings.isRetryOnConnectionFailure()) {
92-
try {
93-
Thread.sleep(clientSettings.getRetryConnectionTimeout().toMillis());
94-
} catch (Exception ignored) {}
95-
logger.info("Reconnecting");
96-
tikTokEventHandler.publish(this, new TikTokReconnectingEvent());
97-
this.connect();
93+
AsyncHandler.getReconnectScheduler().schedule(() -> {
94+
logger.info("Reconnecting");
95+
tikTokEventHandler.publish(this, new TikTokReconnectingEvent());
96+
this.connect();
97+
}, clientSettings.getRetryConnectionTimeout().toMillis(), TimeUnit.MILLISECONDS);
9898
}
9999
throw e;
100100
} catch (Exception e) {

Client/src/main/java/io/github/jwdeveloper/tiktok/TikTokLiveClientBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public LiveClient build() {
133133

134134
//networking
135135
dependance.registerSingleton(HttpClientFactory.class);
136-
dependance.registerSingleton(WebSocketHeartbeatTask.class); // True global singleton - Static objects are located to serve as global
136+
dependance.registerSingleton(WebSocketHeartbeatTask.class);
137137
if (clientSettings.isOffline()) {
138138
dependance.registerSingleton(LiveSocketClient.class, TikTokWebSocketOfflineClient.class);
139139
dependance.registerSingleton(LiveHttpClient.class, TikTokLiveHttpOfflineClient.class);
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package io.github.jwdeveloper.tiktok.common;
2+
3+
import lombok.Getter;
4+
5+
import java.util.concurrent.*;
6+
7+
public class AsyncHandler
8+
{
9+
@Getter
10+
private static final ScheduledExecutorService heartBeatScheduler = Executors.newScheduledThreadPool(1, r -> {
11+
Thread t = new Thread(r, "heartbeat-pool");
12+
t.setDaemon(true);
13+
return t;
14+
});
15+
16+
@Getter
17+
private static final ScheduledExecutorService reconnectScheduler = Executors.newScheduledThreadPool(0, r -> {
18+
Thread t = new Thread(r, "reconnect-pool");
19+
t.setDaemon(true);
20+
return t;
21+
});
22+
}

Client/src/main/java/io/github/jwdeveloper/tiktok/websocket/TikTokWebSocketClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public void stop(LiveClientStopType type) {
142142
case DISCONNECT -> webSocketClient.closeConnection(CloseFrame.NORMAL, "");
143143
default -> webSocketClient.close();
144144
}
145-
heartbeatTask.stop(webSocketClient);
145+
heartbeatTask.stop();
146146
}
147147
webSocketClient = null;
148148
}

Client/src/main/java/io/github/jwdeveloper/tiktok/websocket/WebSocketHeartbeatTask.java

Lines changed: 14 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,54 +22,38 @@
2222
*/
2323
package io.github.jwdeveloper.tiktok.websocket;
2424

25+
import io.github.jwdeveloper.tiktok.common.AsyncHandler;
2526
import org.java_websocket.WebSocket;
2627

2728
import java.util.*;
2829
import java.util.concurrent.*;
2930

3031
public class WebSocketHeartbeatTask
3132
{
32-
// Single shared pool for all heartbeat tasks
33-
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, r -> {
34-
Thread t = new Thread(r, "heartbeat-pool");
35-
t.setDaemon(true);
36-
return t;
37-
});
38-
private static final Map<WebSocket, ScheduledFuture<?>> tasks = new ConcurrentHashMap<>();
39-
private static final Map<WebSocket, Long> commTime = new ConcurrentHashMap<>();
33+
private ScheduledFuture<?> task;
34+
private Long commTime;
4035

41-
private final byte[] heartbeatBytes = Base64.getDecoder().decode("MgJwYjoCaGI="); // Used to be '3A026862' aka ':\x02hb', now is '2\x02pb:\x02hb'.
36+
private final static byte[] heartbeatBytes = Base64.getDecoder().decode("MgJwYjoCaGI="); // Used to be '3A026862' aka ':\x02hb', now is '2\x02pb:\x02hb'.
4237

4338
public void run(WebSocket webSocket, long pingTaskTime) {
44-
stop(webSocket); // remove existing task if any
39+
stop(); // remove existing task if any
4540

46-
tasks.put(webSocket, scheduler.scheduleAtFixedRate(() -> {
41+
task = AsyncHandler.getHeartBeatScheduler().scheduleAtFixedRate(() -> {
4742
try {
4843
if (webSocket.isOpen()) {
4944
webSocket.send(heartbeatBytes);
50-
commTime.put(webSocket, System.currentTimeMillis());
51-
} else {
52-
Long time = commTime.get(webSocket);
53-
if (time != null && System.currentTimeMillis() - time >= 60_000) // Stop if disconnected longer than 60s
54-
stop(webSocket);
55-
}
45+
commTime = System.currentTimeMillis();
46+
} else if (commTime != null && System.currentTimeMillis() - commTime >= 60_000) // Stop if disconnected longer than 60s
47+
stop();
5648
} catch (Exception e) {
5749
e.printStackTrace();
58-
stop(webSocket);
50+
stop();
5951
}
60-
}, 0, pingTaskTime, TimeUnit.MILLISECONDS));
52+
}, 0, pingTaskTime, TimeUnit.MILLISECONDS);
6153
}
6254

63-
public void stop(WebSocket webSocket) {
64-
ScheduledFuture<?> future = tasks.remove(webSocket);
65-
if (future != null)
66-
future.cancel(true);
67-
commTime.remove(webSocket);
68-
}
69-
70-
public void shutdown() {
71-
tasks.values().forEach(f -> f.cancel(true));
72-
commTime.clear();
73-
scheduler.shutdownNow();
55+
public void stop() {
56+
if (task != null)
57+
task.cancel(true);
7458
}
7559
}

0 commit comments

Comments
 (0)