Skip to content

Commit a2f5304

Browse files
authored
Merge pull request #1344 from nats-io/kv-purge-per-message-ttl
KV Purge Per Message TTL
2 parents 12625ef + 4a98253 commit a2f5304

File tree

3 files changed

+116
-23
lines changed

3 files changed

+116
-23
lines changed

src/main/java/io/nats/client/KeyValue.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,27 @@ public interface KeyValue {
182182
*/
183183
void purge(String key, long expectedRevision) throws IOException, JetStreamApiException;
184184

185+
/**
186+
* Purge all values/history from the specific key
187+
* @param key the key
188+
* @param messageTtl the individual ttl for the key
189+
* @throws IOException covers various communication issues with the NATS
190+
* server such as timeout or interruption
191+
* @throws JetStreamApiException the request had an error related to the data
192+
*/
193+
void purge(String key, MessageTtl messageTtl) throws IOException, JetStreamApiException;
194+
195+
/**
196+
* Purge all values/history from the specific key iff the key exists and its last revision matches the expected
197+
* @param key the key
198+
* @param expectedRevision the expected last revision
199+
* @param messageTtl the individual ttl for the key
200+
* @throws IOException covers various communication issues with the NATS
201+
* server such as timeout or interruption
202+
* @throws JetStreamApiException the request had an error related to the data
203+
*/
204+
void purge(String key, long expectedRevision, MessageTtl messageTtl) throws IOException, JetStreamApiException;
205+
185206
/**
186207
* Watch updates for a specific key.
187208
* @param key the key.

src/main/java/io/nats/client/impl/NatsKeyValue.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,22 @@ public void purge(String key, long expectedRevision) throws IOException, JetStre
233233
_write(key, null, getPurgeHeaders(), getPublishOptions(expectedRevision, null));
234234
}
235235

236+
/**
237+
* {@inheritDoc}
238+
*/
239+
@Override
240+
public void purge(String key, MessageTtl messageTtl) throws IOException, JetStreamApiException {
241+
_write(key, null, getPurgeHeaders(), getPublishOptions(-1, messageTtl));
242+
}
243+
244+
/**
245+
* {@inheritDoc}
246+
*/
247+
@Override
248+
public void purge(String key, long expectedRevision, MessageTtl messageTtl) throws IOException, JetStreamApiException {
249+
_write(key, null, getPurgeHeaders(), getPublishOptions(expectedRevision, messageTtl));
250+
}
251+
236252
private PublishAck _write(String key, byte[] data, Headers h, PublishOptions popts) throws IOException, JetStreamApiException {
237253
validateNonWildcardKvKeyRequired(key);
238254
return js.publish(NatsMessage.builder().subject(writeSubject(key)).data(data).headers(h).build(), popts);

src/test/java/io/nats/client/impl/KeyValueTests.java

Lines changed: 79 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1847,59 +1847,115 @@ public void testLimitMarker() throws Exception {
18471847
public void testLimitMarkerAlso() throws Exception {
18481848
jsServer.run(TestBase::atLeast2_11_2, nc -> {
18491849
String bucket = bucket();
1850-
String key = key();
1850+
String key1 = key();
1851+
String key2 = key();
1852+
String key3 = key();
18511853

18521854
KeyValueManagement kvm = nc.keyValueManagement();
18531855
KeyValueConfiguration config = KeyValueConfiguration.builder()
18541856
.name(bucket)
18551857
.storageType(StorageType.Memory)
1856-
.limitMarker(Duration.ofSeconds(6))
1858+
.limitMarker(Duration.ofSeconds(5))
18571859
.build();
18581860
kvm.create(config);
18591861

1862+
Dispatcher d = nc.createDispatcher();
1863+
18601864
KeyValue kv = nc.keyValue(bucket);
18611865

1862-
AtomicInteger puts = new AtomicInteger();
1863-
AtomicInteger dels = new AtomicInteger();
1864-
AtomicInteger purge = new AtomicInteger();
1865-
AtomicInteger eod = new AtomicInteger();
1866+
AtomicInteger wPuts = new AtomicInteger();
1867+
AtomicInteger wDels = new AtomicInteger();
1868+
AtomicInteger wPurges = new AtomicInteger();
1869+
AtomicInteger wEod = new AtomicInteger();
18661870

18671871
KeyValueWatcher watcher = new KeyValueWatcher() {
18681872
@Override
18691873
public void watch(KeyValueEntry keyValueEntry) {
18701874
if (keyValueEntry.getOperation() == KeyValueOperation.PUT) {
1871-
puts.incrementAndGet();
1875+
wPuts.incrementAndGet();
18721876
}
18731877
else if (keyValueEntry.getOperation() == KeyValueOperation.DELETE) {
1874-
dels.incrementAndGet();
1878+
wDels.incrementAndGet();
18751879
}
18761880
else if (keyValueEntry.getOperation() == KeyValueOperation.PURGE) {
1877-
purge.incrementAndGet();
1881+
wPurges.incrementAndGet();
18781882
}
18791883
}
18801884

18811885
@Override
18821886
public void endOfData() {
1883-
eod.incrementAndGet();
1887+
wEod.incrementAndGet();
18841888
}
18851889
};
18861890

18871891
NatsKeyValueWatchSubscription watch = kv.watchAll(watcher);
18881892

1889-
kv.create(key, dataBytes(), MessageTtl.seconds(2));
1890-
1891-
KeyValueEntry kve = kv.get(key);
1892-
assertNotNull(kve);
1893-
1894-
sleep(2100); // longer than the message ttl
1895-
1896-
kve = kv.get(key);
1897-
assertNull(kve);
1893+
AtomicInteger rMessages = new AtomicInteger();
1894+
AtomicInteger rPurges = new AtomicInteger();
1895+
AtomicInteger rMaxAges = new AtomicInteger();
1896+
AtomicInteger rTtl2 = new AtomicInteger();
1897+
AtomicInteger rTtl5 = new AtomicInteger();
1898+
1899+
MessageHandler rawHandler = msg -> {
1900+
rMessages.incrementAndGet();
1901+
if (msg.hasHeaders()) {
1902+
String h = msg.getHeaders().getFirst("KV-Operation");
1903+
if (h != null && h.equals("PURGE")) {
1904+
rPurges.incrementAndGet();
1905+
}
1906+
h = msg.getHeaders().getFirst("Nats-Marker-Reason");
1907+
if (h != null && h.equals("MaxAge")) {
1908+
rMaxAges.incrementAndGet();
1909+
}
1910+
h = msg.getHeaders().getFirst("Nats-TTL");
1911+
if (h != null) {
1912+
if (h.equals("2s")) {
1913+
rTtl2.incrementAndGet();
1914+
}
1915+
else {
1916+
rTtl5.incrementAndGet();
1917+
}
1918+
}
1919+
}
1920+
};
18981921

1899-
assertEquals(1, puts.get());
1900-
assertEquals(0, dels.get());
1901-
assertEquals(1, purge.get());
1902-
assertEquals(1, eod.get());
1922+
JetStreamSubscription sub = nc.jetStream().subscribe(null, d, rawHandler, true,
1923+
PushSubscribeOptions.builder()
1924+
.stream("KV_" + bucket)
1925+
.configuration(ConsumerConfiguration.builder().filterSubject(">")
1926+
.build())
1927+
.build());
1928+
1929+
kv.create(key1, dataBytes(), MessageTtl.seconds(2));
1930+
kv.create(key2, dataBytes());
1931+
kv.create(key3, dataBytes());
1932+
1933+
assertNotNull(kv.get(key1));
1934+
assertNotNull(kv.get(key2));
1935+
assertNotNull(kv.get(key3));
1936+
1937+
kv.purge(key2, MessageTtl.seconds(2));
1938+
kv.purge(key3);
1939+
1940+
// This section will have to be modified if there are changes
1941+
// to how purge markers are handled (double purge on ttl purge, fix for no purge of non-ttl purge)
1942+
sleep(8000); // longer than the message ttl plus the limit marker since double purge plus some extra
1943+
1944+
assertNull(kv.get(key1));
1945+
assertNull(kv.get(key2));
1946+
assertNull(kv.get(key3));
1947+
1948+
// create and put
1949+
assertEquals(3, wPuts.get());
1950+
assertEquals(4, wPurges.get()); // the 2 message ttl purge markers, the manual purge and the manual purge's purge.
1951+
assertEquals(0, wDels.get());
1952+
assertEquals(1, wEod.get());
1953+
1954+
assertEquals(7, rMessages.get());
1955+
assertEquals(2, rPurges.get());
1956+
assertEquals(2, rMaxAges.get());
1957+
assertEquals(2, rTtl2.get());
1958+
assertEquals(2, rTtl5.get());
19031959
});
19041960
}
19051961
}

0 commit comments

Comments
 (0)