Skip to content

Commit 0fd1017

Browse files
authored
[CLOUDTRUST-1635] Fix concurrency issue
* Fix concurrency issue, * Exit loop if any failure * Improve logging
1 parent 0493c60 commit 0fd1017

File tree

5 files changed

+68
-102
lines changed

5 files changed

+68
-102
lines changed

src/main/java/io/cloudtrust/keycloak/eventemitter/ConcurrentEvictingQueue.java

Lines changed: 0 additions & 45 deletions
This file was deleted.

src/main/java/io/cloudtrust/keycloak/eventemitter/EventEmitterProvider.java

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.cloudtrust.keycloak.eventemitter;
22

3+
import com.fasterxml.jackson.core.JsonProcessingException;
34
import io.cloudtrust.keycloak.snowflake.IdGenerator;
45
import org.apache.http.client.methods.CloseableHttpResponse;
56
import org.apache.http.client.methods.HttpPost;
@@ -14,6 +15,7 @@
1415
import java.io.IOException;
1516
import java.nio.ByteBuffer;
1617
import java.util.Base64;
18+
import java.util.concurrent.LinkedBlockingQueue;
1719

1820

1921
/**
@@ -36,8 +38,8 @@ public class EventEmitterProvider implements EventListenerProvider{
3638
private CloseableHttpClient httpClient;
3739
private HttpClientContext httpContext;
3840
private IdGenerator idGenerator;
39-
private ConcurrentEvictingQueue<IdentifiedEvent> pendingEvents;
40-
private ConcurrentEvictingQueue<IdentifiedAdminEvent> pendingAdminEvents;
41+
private LinkedBlockingQueue<IdentifiedEvent> pendingEvents;
42+
private LinkedBlockingQueue<IdentifiedAdminEvent> pendingAdminEvents;
4143
private String targetUri;
4244
private String username;
4345
private String secretToken;
@@ -55,8 +57,8 @@ public class EventEmitterProvider implements EventListenerProvider{
5557
*/
5658
EventEmitterProvider(CloseableHttpClient httpClient, IdGenerator idGenerator,
5759
String targetUri, SerialisationFormat format,
58-
ConcurrentEvictingQueue<IdentifiedEvent> pendingEvents,
59-
ConcurrentEvictingQueue<IdentifiedAdminEvent> pendingAdminEvents){
60+
LinkedBlockingQueue<IdentifiedEvent> pendingEvents,
61+
LinkedBlockingQueue<IdentifiedAdminEvent> pendingAdminEvents){
6062
logger.debug("EventEmitterProvider contructor call");
6163
this.httpClient = httpClient;
6264
this.httpContext = HttpClientContext.create();
@@ -86,7 +88,19 @@ public void onEvent(Event event) {
8688
logger.debugf("Pending Events to send stored in buffer: %d", pendingEvents.size());
8789
long uid = idGenerator.nextValidId();
8890
IdentifiedEvent identifiedEvent = new IdentifiedEvent(uid, event);
89-
pendingEvents.offer(identifiedEvent);
91+
92+
while(!pendingEvents.offer(identifiedEvent)){
93+
Event skippedEvent = pendingEvents.poll();
94+
if(skippedEvent != null) {
95+
String strEvent = null;
96+
try {
97+
strEvent = SerialisationUtils.toJson(skippedEvent);
98+
} catch (JsonProcessingException e) {
99+
strEvent = "SerializationFailure";
100+
}
101+
logger.errorf("Event dropped(%s) due to full queue", strEvent);
102+
}
103+
}
90104

91105
sendEvents();
92106
}
@@ -97,7 +111,19 @@ public void onEvent(AdminEvent adminEvent, boolean b) {
97111
logger.debugf("Pending AdminEvents to send stored in buffer: %d", pendingAdminEvents.size());
98112
long uid = idGenerator.nextValidId();
99113
IdentifiedAdminEvent identifiedAdminEvent = new IdentifiedAdminEvent(uid, adminEvent);
100-
pendingAdminEvents.offer(identifiedAdminEvent);
114+
115+
while(!pendingAdminEvents.offer(identifiedAdminEvent)){
116+
AdminEvent skippedAdminEvent = pendingAdminEvents.poll();
117+
if(skippedAdminEvent != null) {
118+
String strAdminEvent = null;
119+
try {
120+
strAdminEvent = SerialisationUtils.toJson(skippedAdminEvent);
121+
} catch (JsonProcessingException e) {
122+
strAdminEvent = "SerializationFailure";
123+
}
124+
logger.errorf("AdminEvent dropped(%s) due to full queue", strAdminEvent);
125+
}
126+
}
101127

102128
sendEvents();
103129
}
@@ -125,27 +151,37 @@ private void sendEventsWithJsonFormat() {
125151
for (int i=0; i < pendingEventsSize; i++){
126152
IdentifiedEvent event = pendingEvents.poll();
127153

154+
if(event == null){
155+
break;
156+
}
157+
128158
try {
129159
String json = SerialisationUtils.toJson(event);
130160
sendJson(json);
131161
} catch (EventEmitterException | IOException e) {
132162
pendingEvents.offer(event);
133163
logger.infof("Failed to send event(ID=%s), try again later.", event.getUid());
134164
logger.debug("Failed to serialize or send event", e);
165+
break;
135166
}
136167
}
137168

138169
int pendingAdminEventsSize = pendingAdminEvents.size();
139170
for (int i=0; i < pendingAdminEventsSize; i++){
140171
IdentifiedAdminEvent event = pendingAdminEvents.poll();
141172

173+
if(event == null){
174+
break;
175+
}
176+
142177
try {
143178
String json = SerialisationUtils.toJson(event);
144179
sendJson(json);
145180
} catch (EventEmitterException | IOException e) {
146181
pendingAdminEvents.offer(event);
147182
logger.infof("Failed to send adminEvent(ID=%s), try again later.", event.getUid());
148183
logger.debug("Failed to serialize or send adminEvent", e);
184+
break;
149185
}
150186
}
151187
}
@@ -156,27 +192,37 @@ private void sendEventsWithFlatbufferFormat() {
156192
for (int i=0; i < pendingEventsSize; i++){
157193
IdentifiedEvent event = pendingEvents.poll();
158194

195+
if(event == null){
196+
break;
197+
}
198+
159199
try {
160200
ByteBuffer buffer = SerialisationUtils.toFlat(event);
161201
sendBytes(buffer, Event.class.getSimpleName());
162202
} catch (EventEmitterException | IOException e) {
163203
pendingEvents.offer(event);
164204
logger.infof("Failed to send event(ID=%s), try again later.", event.getUid());
165205
logger.debug("Failed to serialize or send event", e);
206+
break;
166207
}
167208
}
168209

169210
int pendingAdminEventsSize = pendingAdminEvents.size();
170211
for (int i=0; i < pendingAdminEventsSize; i++){
171212
IdentifiedAdminEvent event = pendingAdminEvents.poll();
172213

214+
if(event == null){
215+
break;
216+
}
217+
173218
try {
174219
ByteBuffer buffer = SerialisationUtils.toFlat(event);
175220
sendBytes(buffer, AdminEvent.class.getSimpleName());
176221
} catch (EventEmitterException | IOException e) {
177222
pendingAdminEvents.offer(event);
178223
logger.infof("Failed to send adminEvent(ID=%s), try again later.", event.getUid());
179224
logger.debug("Failed to serialize or send adminEvent", e);
225+
break;
180226
}
181227
}
182228
}

src/main/java/io/cloudtrust/keycloak/eventemitter/EventEmitterProviderFactory.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.io.IOException;
1616
import java.util.LinkedHashMap;
1717
import java.util.Map;
18+
import java.util.concurrent.LinkedBlockingQueue;
1819

1920
/**
2021
* Factory for EventEmitterProvider.
@@ -50,8 +51,8 @@ public class EventEmitterProviderFactory implements EventListenerProviderFactory
5051

5152
private CloseableHttpClient httpClient;
5253
private IdGenerator idGenerator;
53-
private ConcurrentEvictingQueue<IdentifiedEvent> pendingEventsToSend;
54-
private ConcurrentEvictingQueue<IdentifiedAdminEvent> pendingAdminEventsToSend;
54+
private LinkedBlockingQueue<IdentifiedEvent> pendingEventsToSend;
55+
private LinkedBlockingQueue<IdentifiedAdminEvent> pendingAdminEventsToSend;
5556

5657

5758

@@ -132,8 +133,8 @@ public void init(Config.Scope config) {
132133
// Initialisation
133134
httpClient = HttpClients.createDefault();
134135
idGenerator = new IdGenerator(keycloakId, datacenterId);
135-
pendingEventsToSend = new ConcurrentEvictingQueue<>(bufferCapacity);
136-
pendingAdminEventsToSend = new ConcurrentEvictingQueue<>(bufferCapacity);
136+
pendingEventsToSend = new LinkedBlockingQueue<>(bufferCapacity);
137+
pendingAdminEventsToSend = new LinkedBlockingQueue<>(bufferCapacity);
137138

138139
}
139140

src/test/java/io/cloudtrust/keycloak/eventemitter/ConcurrentEvictingQueueTest.java

Lines changed: 0 additions & 37 deletions
This file was deleted.

src/test/java/io/cloudtrust/keycloak/eventemitter/EventEmitterProviderTest.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.nio.ByteBuffer;
2323
import java.nio.charset.StandardCharsets;
2424
import java.util.Base64;
25+
import java.util.concurrent.LinkedBlockingQueue;
2526

2627
import org.xnio.streams.ChannelInputStream;
2728

@@ -58,8 +59,8 @@ public void testFlatbufferFormatOutput() throws IOException, InterruptedExceptio
5859
Undertow server = startHttpServer(handler);
5960
CloseableHttpClient httpClient = HttpClients.createDefault();
6061
IdGenerator idGenerator = new IdGenerator(1,1);
61-
ConcurrentEvictingQueue<IdentifiedEvent> pendingEvents = new ConcurrentEvictingQueue<>(BUFFER_CAPACITY);
62-
ConcurrentEvictingQueue<IdentifiedAdminEvent> pendingAdminEvents = new ConcurrentEvictingQueue<>(BUFFER_CAPACITY);
62+
LinkedBlockingQueue<IdentifiedEvent> pendingEvents = new LinkedBlockingQueue<>(BUFFER_CAPACITY);
63+
LinkedBlockingQueue<IdentifiedAdminEvent> pendingAdminEvents = new LinkedBlockingQueue<>(BUFFER_CAPACITY);
6364
EventEmitterProvider eventEmitterProvider = new EventEmitterProvider(httpClient,
6465
idGenerator, TARGET, SerialisationFormat.FLATBUFFER, pendingEvents,pendingAdminEvents);
6566

@@ -88,8 +89,8 @@ public void testJsonFormatOutput() throws IOException, InterruptedException {
8889
Undertow server = startHttpServer(handler);
8990
CloseableHttpClient httpClient = HttpClients.createDefault();
9091
IdGenerator idGenerator = new IdGenerator(1,1);
91-
ConcurrentEvictingQueue<IdentifiedEvent> pendingEvents = new ConcurrentEvictingQueue<>(BUFFER_CAPACITY);
92-
ConcurrentEvictingQueue<IdentifiedAdminEvent> pendingAdminEvents = new ConcurrentEvictingQueue<>(BUFFER_CAPACITY);
92+
LinkedBlockingQueue<IdentifiedEvent> pendingEvents = new LinkedBlockingQueue<>(BUFFER_CAPACITY);
93+
LinkedBlockingQueue<IdentifiedAdminEvent> pendingAdminEvents = new LinkedBlockingQueue<>(BUFFER_CAPACITY);
9394
EventEmitterProvider eventEmitterProvider = new EventEmitterProvider(httpClient,
9495
idGenerator, TARGET, SerialisationFormat.JSON, pendingEvents,pendingAdminEvents);
9596

@@ -111,8 +112,8 @@ public void testJsonFormatOutput() throws IOException, InterruptedException {
111112
public void testNoConnection() throws IOException {
112113
CloseableHttpClient httpClient = HttpClients.createDefault();
113114
IdGenerator idGenerator = new IdGenerator(1,1);
114-
ConcurrentEvictingQueue<IdentifiedEvent> pendingEvents = new ConcurrentEvictingQueue<>(BUFFER_CAPACITY);
115-
ConcurrentEvictingQueue<IdentifiedAdminEvent> pendingAdminEvents = new ConcurrentEvictingQueue<>(BUFFER_CAPACITY);
115+
LinkedBlockingQueue<IdentifiedEvent> pendingEvents = new LinkedBlockingQueue<>(BUFFER_CAPACITY);
116+
LinkedBlockingQueue<IdentifiedAdminEvent> pendingAdminEvents = new LinkedBlockingQueue<>(BUFFER_CAPACITY);
116117
EventEmitterProvider eventEmitterProvider = new EventEmitterProvider(httpClient,
117118
idGenerator, TARGET, SerialisationFormat.JSON, pendingEvents,pendingAdminEvents);
118119

@@ -138,8 +139,8 @@ public void testServerError() throws IOException, InterruptedException {
138139
Undertow server = startHttpServer(handler);
139140
CloseableHttpClient httpClient = HttpClients.createDefault();
140141
IdGenerator idGenerator = new IdGenerator(1,1);
141-
ConcurrentEvictingQueue<IdentifiedEvent> pendingEvents = new ConcurrentEvictingQueue<>(BUFFER_CAPACITY);
142-
ConcurrentEvictingQueue<IdentifiedAdminEvent> pendingAdminEvents = new ConcurrentEvictingQueue<>(BUFFER_CAPACITY);
142+
LinkedBlockingQueue<IdentifiedEvent> pendingEvents = new LinkedBlockingQueue<>(BUFFER_CAPACITY);
143+
LinkedBlockingQueue<IdentifiedAdminEvent> pendingAdminEvents = new LinkedBlockingQueue<>(BUFFER_CAPACITY);
143144
EventEmitterProvider eventEmitterProvider = new EventEmitterProvider(httpClient,
144145
idGenerator, TARGET, SerialisationFormat.JSON, pendingEvents,pendingAdminEvents);
145146

@@ -166,8 +167,8 @@ public void testBufferAndSend() throws IOException, InterruptedException {
166167

167168
CloseableHttpClient httpClient = HttpClients.createDefault();
168169
IdGenerator idGenerator = new IdGenerator(1,1);
169-
ConcurrentEvictingQueue<IdentifiedEvent> pendingEvents = new ConcurrentEvictingQueue<>(BUFFER_CAPACITY);
170-
ConcurrentEvictingQueue<IdentifiedAdminEvent> pendingAdminEvents = new ConcurrentEvictingQueue<>(BUFFER_CAPACITY);
170+
LinkedBlockingQueue<IdentifiedEvent> pendingEvents = new LinkedBlockingQueue<>(BUFFER_CAPACITY);
171+
LinkedBlockingQueue<IdentifiedAdminEvent> pendingAdminEvents = new LinkedBlockingQueue<>(BUFFER_CAPACITY);
171172
EventEmitterProvider eventEmitterProvider = new EventEmitterProvider(httpClient,
172173
idGenerator, TARGET, SerialisationFormat.JSON, pendingEvents,pendingAdminEvents);
173174

0 commit comments

Comments
 (0)