Coming from an initial issue where we need to provide an implementation for vert.x cluster manager for the shared data feature, provide concurrent and clustered programming capabilities.
-
Clustered Locks
-
Clustered CountDownLatch
package org.infinispan.lock.api;
public class ClusteredLockManager {
boolean defineLock(String name, LockConfiguration configuration);
InfinispanLock get(String name);
LockConfiguration getConfiguration(String name);
boolean isDefined(String name);
CompletableFuture<Boolean> remove(String name);
CompletableFuture<Void> reset(String name);
}
public class LockConfiguration {
private final RentrancyLevel lentrancyLevel; // default NOT_REENTRANT
private final boolean silentFailover; // default true
// The maximum length of time for which a client can hold and renew a lock aquisition
private final long maxLeaseTime;
// Maximum length of time a lock may be held without updating the lease,
// after that time any attempt to lock it will succeed
private final long renewalLeaseTime;
}
public enum RentrancyLevel {
NODE, // Node can lock if it owns the lock without blocking, only the owner node can unlock
INSTANCE, // Instance can lock multiple times if it owns the lock without blocking, only the owner instance can unlock
NOT_REENTRANT // Nobody can take the lock if already taken, but everybody can release it
}
@NODE : when a node owns the lock,
Defines a lock with the specific name and LockConfiguration. It does not overwrite existing configurations.
Returns true if successfully defined or false if the lock is already defined or any other failure. If silentFailover is false, then InfinispanLockException will be raised.
Get’s a InfinipanLock by it’s name and throws InfinispanLockException if the lock is not not defined. User must call defineLock before this method.
Get’s the configuration for a Lock. If the Lock does not exist, Optional.empty() will be returned.
Removes a Lock from the system. Returns true when it was removed, false when the lock does not exist. If any other Runtime problems appear, InfinispanLockException will be raised withe the reason. As Locks are not removed automatically, so this has to be done programatically when the Lock is no longer needed. Otherwise, OutOfMemoryException could happen.
Remove must be executed when the lock is locked, because running that without exclusive access should result in an exception. Internally, the implementation should contain generation number so that attempts to acquire a lock of a removed generation will result it exceptions in the other callers, too.
When a cluster node holding a Lock dies, this lock is released and available for the others.
public interface InfinispanLock {
CompletableFuture<Void> lock();
CompletableFuture<Boolean> tryLock();
CompletableFuture<Boolean> tryLock(long time, TimeUnit unit);
CompletableFuture<Void> unlock();
}
CompletableFuture is completed successfully when the lock is acquired When a lock is aquired by a client, it will be automatically released after the maxLeaseTime specified. RenewalLeaseTime is the interval time is the time a client can aquire a lock consecutively User should set the timeouts to non-positive value The initial embedded implementation does not have to support positive values
Acquires the lock only if it is free at the time of invocation. Acquires the lock if it is available and returns with the value true. If the lock is not available then this method with the value false.
Acquires the lock if it is free within the given waiting time. If the lock is available this method returns with the value true.
Parameters: time - the maximum time to wait for the lock unit - the time unit of the time argument Returns: true if the lock was acquired and false if the waiting time elapsed before the lock was acquired
CompletableFuture fails with InfinispanLockException in case of error (InterruptedException, or any other non checked exceptions)
If the lock is rentrant (Node or Instance), only the instance or node holding the lock will be able to unlock, otherwise, anybody can unlock and it will behave as a Semaphore with one permit. True answer will say that the operation was succesul and the lock has been released, false the lock has not been relased
This main is an example on calling the API as if it was blocking, the corrent implementation should be done
public static void main(String[] args) throws Exception {
EmbeddedCacheManager cm = Infinispan.createClustered();
CounterManager counterManager = EmbeddedCounterManagerFactory.asCounterManager(cm);
counterManager.defineCounter("counter", ...);
WeakCounter counter = counterManager.weakCounter("counter");
ClusteredLockManager lockManager = EmbeddedLockManagerFactory.asClusteredLockManager(cm);
lockManager.defineLock("lock", ...);
InfinispanLock lock = lockManager.get("lock");
for (int i = 0; i < 100; i++) {
System.out.println("Counter on " + i + " is => " + counter.getValue());
lock.lock()
.thenRun(new CounterExample(counter))
.whenComplete((nil, t) -> lock.unlock());
}
cm.stop();
}
static class CounterExample implements Runnable {
private WeakCounter counter;
public CounterExample(WeakCounter counter) {
this.counter = counter;
}
@Override
public void run() {
counter.increment();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
counter.decrement();
}
}
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
public class CountDownLatchManager {
InfinispanCountDownLatch define(String name, CountDownLatchConfiguration configuration);
InfinispanCountDownLatch get(String name);
boolean remove(String name);
}
public interface InfinispanCountDownLatch {
CompletableFuture<Void> await();
CompletableFuture<Boolean> await(long timeout, TimeUnit unit);
CompletableFuture<Long> getCount();
void countDown();
}
public class CountDownLatchConfiguration {
private long count;
private boolean autoRemove;
}
* @name A unique name
* @count initial count. If the countDownLatch already exists, override the current value
* @autoRemove when the countdown is zero, destroy it automatically.
public class Driver {
public static void main(String[] args) throws InterruptedException {
EmbeddedCacheManager cm = Infinispan.createClustered();
InfinispanCountDownLatch startSignal = cm.countDownLatch("startSignal", CountDownLatchConfig.count(1).create().build());
InfinispanCountDownLatch doneSignal = cm.countDownLatch("doneSignal", CountDownLatchConfig.count(1).create().build());
new Thread(new Worker()).start();
System.out.println("Do something on driver");
Thread.sleep(5000); // don't let run yet
System.out.println("End something on driver");
startSignal.countDown(); // let all threads proceed
System.out.println("Do something on driver");
Thread.sleep(5000);
System.out.println("End something on driver");
boolean await = doneSignal.await(10, TimeUnit.SECONDS);// wait for all to finish
if(await)
System.out.println("END DRIVER");
Infinispan.stopAll();
}
static class Worker implements Runnable {
private final InfinispanCountDownLatch startSignal;
private final InfinispanCountDownLatch doneSignal;
Worker() {
EmbeddedCacheManager cm = Infinispan.createClustered();
this.startSignal = cm.countDownLatch("startSignal");
this.doneSignal = cm.countDownLatch("doneSignal");
}
public void run() {
try {
boolean await = startSignal.await(4, TimeUnit.SECONDS);
if (await)
doWork();
else
System.out.println("work could not be done !");
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() throws InterruptedException {
System.out.println("Start work");
Thread.sleep(10000);
System.out.println("End work");
}
}
}