diff --git a/coordination/pom.xml b/coordination/pom.xml
new file mode 100644
index 0000000..cf8a6bc
--- /dev/null
+++ b/coordination/pom.xml
@@ -0,0 +1,20 @@
+
+
+ 4.0.0
+
+ tech.ydb.examples
+ ydb-sdk-examples
+ 1.1.0-SNAPSHOT
+
+
+ tech.ydb.coordination.examples
+ ydb-coordination-examples
+
+ YDB SDK Coordination Service examples
+
+ pom
+
+
+ recipes
+
+
diff --git a/coordination/recipes/pom.xml b/coordination/recipes/pom.xml
new file mode 100644
index 0000000..9a6ed9c
--- /dev/null
+++ b/coordination/recipes/pom.xml
@@ -0,0 +1,58 @@
+
+
+ 4.0.0
+
+
+ tech.ydb.coordination.examples
+ ydb-coordination-examples
+ 1.1.0-SNAPSHOT
+
+
+ ydb-coordination-recipes-example
+ YDB Coordination Service recipes example
+ jar
+
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+
+
+
+ tech.ydb
+ ydb-sdk-coordination
+
+
+ tech.ydb.auth
+ yc-auth-provider
+
+
+
+
+ jdbc-coordination-recipes-example
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ true
+ libs/
+ tech.ydb.coordination.recipes.example.Main
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/LockApp.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/LockApp.java
new file mode 100644
index 0000000..c7b4b92
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/LockApp.java
@@ -0,0 +1,88 @@
+package tech.ydb.coordination.recipes.example;
+
+import tech.ydb.coordination.CoordinationClient;
+import tech.ydb.coordination.recipes.example.lib.locks.InterProcessLock;
+import tech.ydb.coordination.recipes.example.lib.locks.InterProcessMutex;
+
+import java.time.Duration;
+import java.util.Scanner;
+
+public class LockApp {
+
+ InterProcessLock lock;
+
+ LockApp(CoordinationClient client) {
+ client.createNode("examples/app").join().expectSuccess("cannot create coordination path");
+ lock = new InterProcessMutex(
+ client,
+ "examples/app",
+ "data".getBytes(),
+ "default_lock"
+ );
+ }
+
+ public void lock(Duration duration) {
+ try {
+ if (duration == null) {
+ lock.acquire();
+ } else {
+ lock.acquire(duration);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void release() {
+ try {
+ lock.release();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private boolean isAcquired() {
+ return lock.isAcquiredInThisProcess();
+ }
+
+ public void run() {
+ Scanner scanner = new Scanner(System.in);
+ System.out.println("Enter commands: lock [seconds] | release | reconnect | ?");
+
+ while (scanner.hasNextLine()) {
+ String commandLine = scanner.nextLine().trim();
+ String[] commandParts = commandLine.split("\\s+");
+ String command = commandParts[0];
+
+ switch (command.toLowerCase()) {
+ case "lock":
+ int seconds = -1;
+ if (commandParts.length > 1) {
+ try {
+ seconds = Integer.parseInt(commandParts[1]);
+ } catch (NumberFormatException e) {
+ System.out.println("Invalid number format, defaulting to 0 seconds");
+ }
+ }
+ if (seconds == -1) {
+ lock(null);
+ } else {
+ lock(Duration.ofSeconds(seconds));
+ }
+ break;
+ case "release":
+ release();
+ break;
+ case "?":
+ System.out.println("Lock is acquired: " + isAcquired());
+ break;
+ default:
+ System.out.println("Unknown command: " + command);
+ }
+ }
+
+ scanner.close();
+ }
+
+}
+
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/Main.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/Main.java
new file mode 100644
index 0000000..a9c9579
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/Main.java
@@ -0,0 +1,30 @@
+package tech.ydb.coordination.recipes.example;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import tech.ydb.auth.iam.CloudAuthHelper;
+import tech.ydb.coordination.CoordinationClient;
+import tech.ydb.core.grpc.GrpcTransport;
+
+public class Main {
+ private final static Logger logger = LoggerFactory.getLogger(Main.class);
+
+ public static void main(String[] args) {
+ if (args.length != 1) {
+ System.err.println("Usage: java -jar jdbc-coordination-api-example.jar ");
+ return;
+ }
+
+ try (GrpcTransport transport = GrpcTransport.forConnectionString(args[0])
+ .withAuthProvider(CloudAuthHelper.getAuthProviderFromEnviron())
+ .build()) {
+
+ logger.info("run lock app example");
+ CoordinationClient client = CoordinationClient.newClient(transport);
+ LockApp lockApp = new LockApp(client);
+ lockApp.run();
+ logger.info("lock app example finished");
+ }
+ }
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/election/LeaderElectionListener.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/election/LeaderElectionListener.java
new file mode 100644
index 0000000..957f403
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/election/LeaderElectionListener.java
@@ -0,0 +1,5 @@
+package tech.ydb.coordination.recipes.example.lib.election;
+
+public interface LeaderElectionListener {
+ void takeLeadership() throws Exception;
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/election/LeaderElector.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/election/LeaderElector.java
new file mode 100644
index 0000000..5ca863c
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/election/LeaderElector.java
@@ -0,0 +1,194 @@
+package tech.ydb.coordination.recipes.example.lib.election;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import tech.ydb.coordination.CoordinationClient;
+import tech.ydb.coordination.CoordinationSession;
+import tech.ydb.coordination.recipes.example.lib.watch.Participant;
+import tech.ydb.coordination.recipes.example.lib.locks.InterProcessMutex;
+import tech.ydb.coordination.recipes.example.lib.watch.SemaphoreWatchAdapter;
+import tech.ydb.coordination.recipes.example.lib.util.Listenable;
+import tech.ydb.coordination.recipes.example.lib.util.ListenableProvider;
+
+public class LeaderElector implements Closeable, ListenableProvider {
+ private static final Logger logger = LoggerFactory.getLogger(LeaderElector.class);
+
+ private final CoordinationClient client;
+ private final LeaderElectionListener leaderElectionListener;
+ private final String coordinationNodePath;
+ private final String semaphoreName;
+ private final ExecutorService electionExecutor;
+ private final InterProcessMutex lock;
+ private final SemaphoreWatchAdapter semaphoreWatchAdapter;
+
+ private AtomicReference state = new AtomicReference<>(State.STARTED);
+ private volatile boolean autoRequeue = false;
+ private volatile boolean isLeader = false;
+ private Future electionTask = null;
+
+
+ private enum State { // TODO: needs third state (CREATED)?
+ STARTED,
+ CLOSED
+ }
+
+ public LeaderElector(
+ CoordinationClient client,
+ LeaderElectionListener leaderElectionListener,
+ String coordinationNodePath,
+ String semaphoreName
+ ) {
+ this(client, leaderElectionListener, coordinationNodePath, semaphoreName, Executors.newSingleThreadExecutor());
+ }
+
+ public LeaderElector(
+ CoordinationClient client,
+ LeaderElectionListener leaderElectionListener,
+ String coordinationNodePath,
+ String semaphoreName,
+ ExecutorService executorService
+ ) {
+ this.client = client;
+ this.leaderElectionListener = leaderElectionListener;
+ this.coordinationNodePath = coordinationNodePath;
+ this.semaphoreName = semaphoreName;
+ this.electionExecutor = executorService;
+ this.lock = new InterProcessMutex(
+ client,
+ coordinationNodePath,
+ semaphoreName
+ );
+ this.semaphoreWatchAdapter = new SemaphoreWatchAdapter(lock.getSession(), semaphoreName);
+ semaphoreWatchAdapter.start();
+ }
+
+ public boolean isLeader() {
+ return isLeader;
+ }
+
+ public synchronized void interruptLeadership() {
+ Future> task = electionTask;
+ if (task != null) {
+ task.cancel(true);
+ }
+ }
+
+ /**
+ * Re-queue an attempt for leadership. If this instance is already queued, nothing
+ * happens and false is returned. If the instance was not queued, it is re-queued and true
+ * is returned
+ *
+ * @return true if re-enqueue was successful
+ */
+ public boolean requeue() {
+ Preconditions.checkState(state.get() == State.STARTED, "Already closed or not yet started");
+
+ return enqueueElection();
+ }
+
+ public void autoRequeue() {
+ autoRequeue = true;
+ }
+
+ private synchronized boolean enqueueElection() {
+ if (!isQueued() && state.get() == State.STARTED) {
+ electionTask = electionExecutor.submit(new Callable() {
+ @Override
+ public Void call() throws Exception {
+ try {
+ doWork();
+ } finally {
+ finishTask();
+ }
+ return null;
+ }
+ });
+ return true;
+ }
+
+ return false;
+ }
+
+ private void doWork() throws Exception {
+ isLeader = false;
+
+ try {
+ lock.acquire();
+ isLeader = true;
+ try {
+ leaderElectionListener.takeLeadership();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw e;
+ } catch (Throwable e) {
+ logger.debug("takeLeadership exception", e);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw e;
+ } finally {
+ if (isLeader) {
+ isLeader = false;
+ boolean wasInterrupted = Thread.interrupted();
+ try {
+ lock.release();
+ } catch (Exception e) {
+ logger.error("Lock release exception for: " + coordinationNodePath);
+ } finally {
+ if (wasInterrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+ }
+
+ private synchronized void finishTask() {
+ electionTask = null;
+ if (autoRequeue) { // TODO: requeue if critical exception?
+ enqueueElection();
+ }
+ }
+
+ private boolean isQueued() {
+ return electionTask != null;
+ }
+
+ public List getParticipants() {
+ return semaphoreWatchAdapter.getParticipants();
+ }
+
+ public Optional getLeader() {
+ return semaphoreWatchAdapter.getOwners().stream().findFirst();
+ }
+
+ @Override
+ public synchronized void close() {
+ Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed");
+
+ Future task = electionTask;
+ if (task != null) {
+ task.cancel(true);
+ }
+
+ electionTask = null;
+ electionExecutor.close();
+ semaphoreWatchAdapter.close();
+ getListenable().clearListeners();
+ }
+
+ @Override
+ public Listenable getListenable() {
+ return lock.getListenable();
+ }
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/InterProcessLock.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/InterProcessLock.java
new file mode 100644
index 0000000..b67d5ea
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/InterProcessLock.java
@@ -0,0 +1,25 @@
+package tech.ydb.coordination.recipes.example.lib.locks;
+
+import java.time.Duration;
+
+import tech.ydb.coordination.CoordinationSession;
+import tech.ydb.coordination.recipes.example.lib.util.Listenable;
+
+public interface InterProcessLock extends Listenable {
+ void acquire() throws Exception, LockAlreadyAcquiredException, LockAcquireFailedException;
+
+ /**
+ * @return true - if successfully acquired lock, false - if lock waiting time expired
+ */
+ boolean acquire(Duration waitDuration) throws Exception, LockAlreadyAcquiredException, LockAcquireFailedException;
+
+ /**
+ * @return false if nothing to release
+ */
+ boolean release() throws Exception;
+
+ /**
+ * @return true if the lock is acquired by a thread in this JVM
+ */
+ boolean isAcquiredInThisProcess();
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/InterProcessMutex.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/InterProcessMutex.java
new file mode 100644
index 0000000..11b18e3
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/InterProcessMutex.java
@@ -0,0 +1,251 @@
+package tech.ydb.coordination.recipes.example.lib.locks;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import tech.ydb.coordination.CoordinationClient;
+import tech.ydb.coordination.CoordinationSession;
+import tech.ydb.coordination.SemaphoreLease;
+import tech.ydb.coordination.description.SemaphoreDescription;
+import tech.ydb.coordination.recipes.example.lib.util.SessionListenerWrapper;
+import tech.ydb.coordination.recipes.example.lib.util.Listenable;
+import tech.ydb.coordination.recipes.example.lib.util.ListenableProvider;
+import tech.ydb.coordination.settings.DescribeSemaphoreMode;
+import tech.ydb.core.Result;
+import tech.ydb.core.Status;
+import tech.ydb.core.StatusCode;
+
+@ThreadSafe
+public class InterProcessMutex implements InterProcessLock, ListenableProvider {
+ private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);
+ private static final Logger logger = LoggerFactory.getLogger(InterProcessMutex.class);
+
+ private final Lock leaseLock = new ReentrantLock();
+ private final CoordinationSession session;
+ private final CompletableFuture sessionConnectionTask;
+ private final SessionListenerWrapper sessionListenerWrapper;
+ private final String semaphoreName;
+ private final String coordinationNodePath;
+
+ private volatile SemaphoreLease processLease = null;
+
+ public InterProcessMutex(
+ CoordinationClient client,
+ String coordinationNodePath,
+ String lockName
+ ) {
+ this.coordinationNodePath = coordinationNodePath;
+ this.session = client.createSession(coordinationNodePath);
+ this.sessionListenerWrapper = new SessionListenerWrapper(session);
+ this.semaphoreName = lockName;
+
+ this.sessionConnectionTask = session.connect().thenApply(status -> {
+ logger.debug("Session connection status: " + status);
+ return status;
+ });
+ session.addStateListener(state -> {
+ switch (state) {
+ case RECONNECTED: {
+ logger.debug("Session RECONNECTED");
+ reconnect();
+ break;
+ }
+ case CLOSED: {
+ logger.debug("Session CLOSED, releasing lock");
+ internalRelease();
+ break;
+ }
+ case LOST: {
+ logger.debug("Session LOST, releasing lock");
+ internalRelease();
+ break;
+ }
+ }
+ });
+ }
+
+ private CoordinationSession connectedSession() {
+ try {
+ sessionConnectionTask.get().expectSuccess("Unable to connect to session on: " + coordinationNodePath);
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ return session;
+ }
+
+ private void reconnect() {
+ connectedSession().describeSemaphore(
+ semaphoreName,
+ DescribeSemaphoreMode.WITH_OWNERS_AND_WAITERS
+ ).thenAccept(result -> {
+ if (!result.isSuccess()) {
+ logger.error("Unable to describe semaphore {}", semaphoreName);
+ return;
+ }
+ SemaphoreDescription semaphoreDescription = result.getValue();
+ SemaphoreDescription.Session owner = semaphoreDescription.getOwnersList().getFirst();
+ if (owner.getId() != session.getId()) {
+ logger.warn(
+ "Current session with id: {} lost lease after reconnection on semaphore: {}",
+ owner.getId(),
+ semaphoreName
+ );
+ internalRelease();
+ }
+ });
+ }
+
+ @Override
+ public void acquire() throws Exception {
+ logger.debug("Trying to acquire without timeout");
+ safeAcquire(null);
+ }
+
+ @Override
+ public boolean acquire(Duration duration) throws Exception {
+ logger.debug("Trying to acquire with deadline: {}", duration);
+ Instant deadline = Instant.now().plus(duration);
+ return safeAcquire(deadline);
+ }
+
+ @Override
+ public boolean release() throws Exception {
+ return internalRelease().get();
+ }
+
+ private CompletableFuture internalRelease() {
+ logger.debug("Trying to release");
+ if (processLease == null) {
+ logger.debug("Already released");
+ return CompletableFuture.completedFuture(false);
+ }
+
+ leaseLock.lock();
+ try {
+ if (processLease != null) {
+ return processLease.release().thenApply(it -> {
+ logger.debug("Released lock");
+ processLease = null;
+ leaseLock.unlock();
+ return true;
+ });
+ }
+ } finally {
+ leaseLock.unlock();
+ }
+
+ logger.debug("Already released");
+ return CompletableFuture.completedFuture(false);
+ }
+
+ @Override
+ public boolean isAcquiredInThisProcess() {
+ return processLease != null;
+ }
+
+ // TODO: implement interruption
+
+ /**
+ * @param deadline
+ * @return true - if successfully acquired lock
+ * @throws Exception
+ * @throws LockAlreadyAcquiredException
+ */
+ private boolean safeAcquire(@Nullable Instant deadline) throws Exception, LockAlreadyAcquiredException {
+ if (processLease != null) {
+ logger.debug("Already acquired lock: {}", semaphoreName);
+ throw new LockAlreadyAcquiredException(semaphoreName);
+ }
+
+ leaseLock.lock();
+ try {
+ if (processLease != null) {
+ logger.debug("Already acquired lock: {}", semaphoreName);
+ throw new LockAlreadyAcquiredException(semaphoreName);
+ }
+
+ SemaphoreLease lease = internalLock(deadline);
+ if (lease != null) {
+ processLease = lease;
+ logger.debug("Successfully acquired lock: {}", semaphoreName);
+ return true;
+ }
+ } finally {
+ leaseLock.unlock();
+ }
+
+ logger.debug("Unable to acquire lock: {}", semaphoreName);
+ return false;
+ }
+
+ private SemaphoreLease internalLock(@Nullable Instant deadline) throws ExecutionException, InterruptedException {
+ int retryCount = 0;
+ while (connectedSession().getState().isActive() && (deadline == null || Instant.now().isBefore(deadline))) {
+ retryCount++;
+
+ Duration timeout;
+ if (deadline == null) {
+ timeout = DEFAULT_TIMEOUT;
+ } else {
+ timeout = Duration.between(Instant.now(), deadline); // TODO: use external Clock instead of Instant?
+ }
+ CompletableFuture> acquireTask = connectedSession().acquireEphemeralSemaphore(
+ semaphoreName, true, null, timeout // TODO: change Session API to use deadlines
+ );
+ Result leaseResult;
+ try {
+ leaseResult = acquireTask.get();
+ } catch (InterruptedException e) {
+ // If acquire is interrupted, then release immediately
+ Thread.currentThread().interrupt();
+ acquireTask.thenAccept(acquireResult -> {
+ if (!acquireResult.getStatus().isSuccess()) {
+ return;
+ }
+ SemaphoreLease lease = acquireResult.getValue();
+ lease.release();
+ });
+ throw e;
+ }
+
+ Status status = leaseResult.getStatus();
+ logger.debug("Lease result status: {}", status);
+
+ if (status.isSuccess()) {
+ logger.debug("Successfully acquired the lock");
+ return leaseResult.getValue();
+ }
+
+ if (status.getCode() == StatusCode.TIMEOUT) {
+ logger.debug("Trying to acquire again, retries: {}", retryCount);
+ continue;
+ }
+
+ if (!status.getCode().isRetryable(true)) {
+ status.expectSuccess("Unable to retry acquiring semaphore");
+ return null;
+ }
+ }
+
+ // TODO: handle timeout and error differently
+ throw new LockAcquireFailedException(coordinationNodePath, semaphoreName);
+ }
+
+ @Override
+ public Listenable getListenable() {
+ return sessionListenerWrapper;
+ }
+
+ public CoordinationSession getSession() {
+ return session;
+ }
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockAcquireFailedException.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockAcquireFailedException.java
new file mode 100644
index 0000000..d4ea7b1
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockAcquireFailedException.java
@@ -0,0 +1,20 @@
+package tech.ydb.coordination.recipes.example.lib.locks;
+
+public class LockAcquireFailedException extends RuntimeException {
+ private final String coordinationNodePath;
+ private final String semaphoreName;
+
+ public LockAcquireFailedException(String coordinationNodePath, String semaphoreName) {
+ super("Failed to acquire semaphore=" + semaphoreName + ", on coordination node=" + coordinationNodePath);
+ this.coordinationNodePath = coordinationNodePath;
+ this.semaphoreName = semaphoreName;
+ }
+
+ public String getCoordinationNodePath() {
+ return coordinationNodePath;
+ }
+
+ public String getSemaphoreName() {
+ return semaphoreName;
+ }
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockAlreadyAcquiredException.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockAlreadyAcquiredException.java
new file mode 100644
index 0000000..1d1009a
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockAlreadyAcquiredException.java
@@ -0,0 +1,20 @@
+package tech.ydb.coordination.recipes.example.lib.locks;
+
+public class LockAlreadyAcquiredException extends RuntimeException {
+ private final String coordinationNodePath;
+ private final String semaphoreName;
+
+ public LockAlreadyAcquiredException(String coordinationNodePath, String semaphoreName) {
+ super("Semaphore=" + semaphoreName + " on path=" + coordinationNodePath + " is already acquired");
+ this.coordinationNodePath = coordinationNodePath;
+ this.semaphoreName = semaphoreName;
+ }
+
+ public String getCoordinationNodePath() {
+ return coordinationNodePath;
+ }
+
+ public String getSemaphoreName() {
+ return semaphoreName;
+ }
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockInternals.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockInternals.java
new file mode 100644
index 0000000..57d16bf
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockInternals.java
@@ -0,0 +1,267 @@
+package tech.ydb.coordination.recipes.example.lib.locks;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import tech.ydb.coordination.CoordinationClient;
+import tech.ydb.coordination.CoordinationSession;
+import tech.ydb.coordination.SemaphoreLease;
+import tech.ydb.coordination.description.SemaphoreDescription;
+import tech.ydb.coordination.recipes.example.lib.util.Listenable;
+import tech.ydb.coordination.recipes.example.lib.util.ListenableProvider;
+import tech.ydb.coordination.recipes.example.lib.util.SessionListenerWrapper;
+import tech.ydb.coordination.settings.DescribeSemaphoreMode;
+import tech.ydb.core.Result;
+import tech.ydb.core.Status;
+import tech.ydb.core.StatusCode;
+
+@ThreadSafe
+class LockInternals implements ListenableProvider, Closeable {
+ private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);
+ private static final Logger logger = LoggerFactory.getLogger(LockInternals.class);
+
+ private final String coordinationNodePath;
+ private final String semaphoreName;
+ private final CoordinationSession session;
+ private final SessionListenerWrapper sessionListenerWrapper;
+
+ private CompletableFuture sessionConnectionTask = null;
+ private volatile SemaphoreLease processLease = null; // TODO: volatile?
+
+ LockInternals(
+ CoordinationClient client,
+ String coordinationNodePath,
+ String lockName
+ ) {
+ this.coordinationNodePath = coordinationNodePath;
+ this.semaphoreName = lockName;
+ this.session = client.createSession(coordinationNodePath);
+ this.sessionListenerWrapper = new SessionListenerWrapper(session);
+ }
+
+ public void start() {
+ this.sessionConnectionTask = session.connect().thenApply(status -> {
+ logger.debug("Session connection status: {}", status);
+ return status;
+ });
+
+ Consumer listener = state -> {
+ switch (state) {
+ case RECONNECTED: {
+ logger.debug("Session RECONNECTED");
+ reconnect();
+ break;
+ }
+ case CLOSED: {
+ logger.debug("Session CLOSED, releasing lock");
+ internalRelease();
+ break;
+ }
+ case LOST: {
+ logger.debug("Session LOST, releasing lock");
+ internalRelease();
+ break;
+ }
+ }
+ };
+
+ session.addStateListener(listener);
+ }
+
+ private CoordinationSession connectedSession() {
+ try {
+ sessionConnectionTask.get().expectSuccess("Unable to connect to session on: " + coordinationNodePath);
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ return session;
+ }
+
+ private void reconnect() {
+ CoordinationSession coordinationSession = connectedSession();
+ coordinationSession.describeSemaphore(
+ semaphoreName,
+ DescribeSemaphoreMode.WITH_OWNERS_AND_WAITERS
+ ).thenAccept(result -> {
+ if (!result.isSuccess()) {
+ logger.error("Unable to describe semaphore {}", semaphoreName);
+ return;
+ }
+ SemaphoreDescription semaphoreDescription = result.getValue();
+ SemaphoreDescription.Session owner = semaphoreDescription.getOwnersList().getFirst();
+ if (owner.getId() != coordinationSession.getId()) {
+ logger.warn(
+ "Current session with id: {} lost lease after reconnection on semaphore: {}",
+ owner.getId(),
+ semaphoreName
+ );
+ internalRelease();
+ }
+ });
+ }
+
+ public boolean tryAcquire(@Nullable Duration duration, boolean exclusive, byte[] data) throws Exception {
+ logger.debug("Trying to acquire with deadline: {}", duration);
+ Instant deadline = Instant.now().plus(duration);
+ return safeAcquire(deadline, exclusive, data);
+ }
+
+ public boolean release() {
+ return internalRelease();
+ }
+
+ // TODO: interruptible?
+ private synchronized boolean internalRelease() {
+ logger.debug("Trying to release");
+ if (processLease == null) {
+ logger.debug("Already released");
+ return false;
+ }
+
+ try {
+ return processLease.release().thenApply(it -> {
+ logger.debug("Released lock");
+ processLease = null;
+ return true;
+ }).get();
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * @param deadline
+ * @return true - if successfully acquired lock
+ * @throws Exception
+ * @throws LockAlreadyAcquiredException
+ * @throws LockAcquireFailedException
+ */
+ // TODO: deadlock? Move synchronized?
+ private synchronized boolean safeAcquire(
+ @Nullable Instant deadline,
+ boolean exclusive,
+ byte[] data
+ ) throws Exception {
+ if (processLease != null) {
+ logger.debug("Already acquired lock: {}", semaphoreName);
+ throw new LockAlreadyAcquiredException(coordinationNodePath, semaphoreName);
+ }
+
+ Optional lease = tryBlockingLock(deadline, true, data);
+ if (lease.isPresent()) {
+ processLease = lease.get();
+ logger.debug("Successfully acquired lock: {}", semaphoreName);
+ return true;
+ }
+
+ logger.debug("Unable to acquire lock: {}", semaphoreName);
+ return false;
+ }
+
+ private Optional tryBlockingLock(
+ @Nullable Instant deadline,
+ boolean exclusive,
+ byte[] data
+ ) throws Exception {
+ int retryCount = 0;
+ CoordinationSession coordinationSession = connectedSession();
+
+ while (coordinationSession.getState().isActive() && (deadline == null || Instant.now().isBefore(deadline))) {
+ retryCount++;
+
+ Duration timeout;
+ if (deadline == null) {
+ timeout = DEFAULT_TIMEOUT;
+ } else {
+ timeout = Duration.between(Instant.now(), deadline); // TODO: use external Clock instead of Instant?
+ }
+
+ CompletableFuture> acquireTask = coordinationSession.acquireEphemeralSemaphore(
+ semaphoreName, exclusive, data, timeout // TODO: change Session API to use deadlines
+ );
+ Result leaseResult;
+ try {
+ leaseResult = acquireTask.get();
+ } catch (InterruptedException e) {
+ // If acquire is interrupted, then release immediately
+ Thread.currentThread().interrupt();
+ acquireTask.thenAccept(acquireResult -> {
+ if (!acquireResult.getStatus().isSuccess()) {
+ return;
+ }
+ SemaphoreLease lease = acquireResult.getValue();
+ lease.release();
+ });
+ throw e;
+ }
+
+ Status status = leaseResult.getStatus();
+ logger.debug("Lease result status: {}", status);
+
+ if (status.isSuccess()) {
+ logger.debug("Successfully acquired the lock");
+ return Optional.of(leaseResult.getValue());
+ }
+
+ if (status.getCode() == StatusCode.TIMEOUT) {
+ logger.debug("Trying to acquire semaphore {} again, retries: {}", semaphoreName, retryCount);
+ continue;
+ }
+
+ if (!status.getCode().isRetryable(true)) {
+ status.expectSuccess("Unable to retry acquiring semaphore");
+ throw new LockAcquireFailedException(coordinationNodePath, semaphoreName);
+ }
+ }
+
+ if (deadline != null && Instant.now().compareTo(deadline) >= 0) {
+ return Optional.empty();
+ }
+
+ throw new LockAcquireFailedException(coordinationNodePath, semaphoreName);
+ }
+
+ public String getCoordinationNodePath() {
+ return coordinationNodePath;
+ }
+
+ public String getSemaphoreName() {
+ return semaphoreName;
+ }
+
+ public CoordinationSession getCoordinationSession() {
+ return connectedSession();
+ }
+
+ public @Nullable SemaphoreLease getProcessLease() {
+ return processLease;
+ }
+
+ @Override
+ public Listenable getListenable() {
+ return sessionListenerWrapper;
+ }
+
+ @Override
+ public void close() {
+ try {
+ release();
+ } catch (Exception ignored) {
+ }
+
+ session.close();
+ sessionListenerWrapper.clearListeners();
+ }
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/ReadWriteLock.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/ReadWriteLock.java
new file mode 100644
index 0000000..7b5ad67
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/ReadWriteLock.java
@@ -0,0 +1,79 @@
+package tech.ydb.coordination.recipes.example.lib.locks;
+
+import java.time.Duration;
+
+import tech.ydb.coordination.CoordinationClient;
+import tech.ydb.coordination.CoordinationSession;
+import tech.ydb.coordination.recipes.example.lib.util.Listenable;
+import tech.ydb.coordination.recipes.example.lib.util.ListenableProvider;
+
+public class ReadWriteLock {
+ private final InternalLock readLock;
+ private final InternalLock writeLock;
+
+ public ReadWriteLock(
+ CoordinationClient client,
+ String coordinationNodePath,
+ String lockName
+ ) {
+ LockInternals lockInternals = new LockInternals(
+ client, coordinationNodePath, lockName
+ );
+ lockInternals.start();
+ // TODO: Share same lockInternals?
+ this.readLock = new InternalLock(lockInternals, false);
+ this.writeLock = new InternalLock(lockInternals, true);
+ }
+
+ public InterProcessLock writeLock() {
+ return readLock;
+ }
+
+ public InterProcessLock readLock() {
+ return writeLock;
+ }
+
+ private static class InternalLock implements InterProcessLock, ListenableProvider {
+ private final LockInternals lockInternals;
+ private final boolean isExclisive;
+
+ private InternalLock(LockInternals lockInternals, boolean isExclisive) {
+ this.lockInternals = lockInternals;
+ this.isExclisive = isExclisive;
+ }
+
+ @Override
+ public void acquire() throws Exception {
+ lockInternals.tryAcquire(
+ null,
+ isExclisive,
+ null
+ );
+ }
+
+ @Override
+ public boolean acquire(Duration waitDuration) throws Exception {
+ return lockInternals.tryAcquire(
+ waitDuration,
+ isExclisive,
+ null
+ );
+ }
+
+ @Override
+ public boolean release() {
+ return lockInternals.release();
+ }
+
+ @Override
+ public boolean isAcquiredInThisProcess() {
+ return lockInternals.getProcessLease() != null;
+ }
+
+ @Override
+ public Listenable getListenable() {
+ return lockInternals.getListenable();
+ }
+ }
+
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/Listenable.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/Listenable.java
new file mode 100644
index 0000000..fa887a3
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/Listenable.java
@@ -0,0 +1,19 @@
+package tech.ydb.coordination.recipes.example.lib.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+
+public interface Listenable {
+ void addListener(Consumer listener);
+
+ /**
+ * Listener call will be processed in executor
+ * @param listener
+ * @param executor
+ */
+ void addListener(Consumer listener, ExecutorService executor);
+
+ void removeListener(Consumer listener);
+
+ void clearListeners();
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/ListenableProvider.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/ListenableProvider.java
new file mode 100644
index 0000000..f4b80f2
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/ListenableProvider.java
@@ -0,0 +1,29 @@
+package tech.ydb.coordination.recipes.example.lib.util;
+
+
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+
+public interface ListenableProvider extends Listenable {
+ Listenable getListenable();
+
+ @Override
+ default void addListener(Consumer listener) {
+ getListenable().addListener(listener);
+ }
+
+ @Override
+ default void addListener(Consumer listener, ExecutorService executor) {
+ getListenable().addListener(listener, executor);
+ }
+
+ @Override
+ default void removeListener(Consumer listener) {
+ getListenable().removeListener(listener);
+ }
+
+ @Override
+ default void clearListeners() {
+ getListenable().clearListeners();
+ }
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/SessionListenerWrapper.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/SessionListenerWrapper.java
new file mode 100644
index 0000000..0ef56d7
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/SessionListenerWrapper.java
@@ -0,0 +1,54 @@
+package tech.ydb.coordination.recipes.example.lib.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+
+import tech.ydb.coordination.CoordinationSession;
+import tech.ydb.coordination.CoordinationSession.State;
+
+public class SessionListenerWrapper implements Listenable {
+ private final CoordinationSession session;
+ /**
+ * key - original (external) consumer, value - consumer wrapper or original consumer depending on executor
+ */
+ private final Map, Consumer> listenersMapping = new HashMap<>();
+
+ public SessionListenerWrapper(CoordinationSession session) {
+ this.session = session;
+ }
+
+ @Override
+ public void addListener(Consumer listener) {
+ if (listenersMapping.containsKey(listener)) {
+ return;
+ }
+
+ listenersMapping.put(listener, listener);
+ session.addStateListener(listener);
+ }
+
+ @Override
+ public void addListener(Consumer listener, ExecutorService executor) {
+ if (listenersMapping.containsKey(listener)) {
+ return;
+ }
+
+ Consumer wrapper = state -> executor.submit(() -> listener.accept(state));
+ listenersMapping.put(listener, wrapper);
+ session.addStateListener(wrapper);
+ }
+
+ @Override
+ public void removeListener(Consumer listener) {
+ Consumer removed = listenersMapping.remove(listener);
+ session.removeStateListener(removed);
+ }
+
+ @Override
+ public void clearListeners() {
+ listenersMapping.keySet().forEach(this::removeListener);
+ listenersMapping.clear();
+ }
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/watch/Participant.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/watch/Participant.java
new file mode 100644
index 0000000..568b6a3
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/watch/Participant.java
@@ -0,0 +1,61 @@
+package tech.ydb.coordination.recipes.example.lib.watch;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+public class Participant {
+ private final long id;
+ private final byte[] data;
+ private final long count;
+ private final boolean isLeader;
+
+ public Participant(long id, byte[] data, long count, boolean isLeader) {
+ this.id = id;
+ this.data = data;
+ this.count = count;
+ this.isLeader = isLeader;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public byte[] getData() {
+ return data;
+ }
+
+ public long getCount() {
+ return count;
+ }
+
+ public boolean isLeader() {
+ return isLeader;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Participant that = (Participant) o;
+ return id == that.id && count == that.count && isLeader == that.isLeader && Objects.deepEquals(data, that.data);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, Arrays.hashCode(data), count, isLeader);
+ }
+
+ @Override
+ public String toString() {
+ return "Participant{" +
+ "id=" + id +
+ ", data=" + Arrays.toString(data) +
+ ", count=" + count +
+ ", isLeader=" + isLeader +
+ '}';
+ }
+}
diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/watch/SemaphoreWatchAdapter.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/watch/SemaphoreWatchAdapter.java
new file mode 100644
index 0000000..d1b982d
--- /dev/null
+++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/watch/SemaphoreWatchAdapter.java
@@ -0,0 +1,178 @@
+package tech.ydb.coordination.recipes.example.lib.watch;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import tech.ydb.coordination.CoordinationSession;
+import tech.ydb.coordination.description.SemaphoreDescription;
+import tech.ydb.coordination.description.SemaphoreWatcher;
+import tech.ydb.coordination.settings.DescribeSemaphoreMode;
+import tech.ydb.coordination.settings.WatchSemaphoreMode;
+import tech.ydb.core.Result;
+import tech.ydb.core.Status;
+
+public class SemaphoreWatchAdapter implements Closeable {
+ private static final Logger logger = LoggerFactory.getLogger(SemaphoreWatchAdapter.class);
+
+ private final CoordinationSession session;
+ private final String semaphoreName;
+
+ private AtomicReference state;
+ private Future watchTask;
+ private volatile WatchData watchData;
+
+ private enum State {
+ CREATED,
+ STARTED,
+ CLOSED
+ }
+
+ private class WatchData {
+ final long count;
+ final byte[] data;
+ final List waiters;
+ final List owners;
+ final List participants;
+
+ WatchData(long count, byte[] data, List waiters, List owners) {
+ this.count = count;
+ this.data = data;
+ this.waiters = waiters;
+ this.owners = owners;
+ this.participants = Stream.concat(owners.stream(), waiters.stream()).collect(Collectors.toList());
+ }
+ }
+
+ public SemaphoreWatchAdapter(CoordinationSession session, String semaphoreName) {
+ this.session = session;
+ this.semaphoreName = semaphoreName;
+ this.state = new AtomicReference<>(State.CREATED);
+ this.watchTask = null;
+ this.watchData = null;
+ }
+
+ public List getOwners() {
+ // TODO: block until initialized or throw exception or return default value or return Optional.empty()
+ Preconditions.checkState(watchData == null, "Is not yet fetched state");
+
+ return Collections.unmodifiableList(watchData.owners); // TODO: copy Participant.data[]?
+ }
+
+ public List getWaiters() {
+ Preconditions.checkState(watchData == null, "Is not yet fetched state");
+
+ return Collections.unmodifiableList(watchData.waiters); // TODO: copy Participant.data[]?
+ }
+
+ public List getParticipants() {
+ Preconditions.checkState(watchData == null, "Is not yet fetched state");
+
+ return Collections.unmodifiableList(watchData.participants); // TODO: copy Participant.data[]?
+ }
+
+ public long getCount() {
+ Preconditions.checkState(watchData == null, "Is not yet fetched state");
+
+ return watchData.count;
+ }
+
+ public byte[] getData() {
+ Preconditions.checkState(watchData == null, "Is not yet fetched state");
+
+ return watchData.data.clone();
+ }
+
+ public boolean start() {
+ Preconditions.checkState(state.compareAndSet(State.CREATED, State.STARTED), "Already started or closed");
+
+ return enqueueWatch();
+ }
+
+ private synchronized boolean enqueueWatch() {
+ if (watchIsQueued() && state.get() == State.STARTED) {
+ return false;
+ }
+
+ watchTask = watchSemaphore().thenCompose(status -> {
+ if (!status.isSuccess()) {
+ // TODO: stop watching on error?
+ logger.error("Wailed to watch semaphore: {} with status: {}", semaphoreName, status);
+ }
+
+ finish();
+ return null;
+ });
+ return true;
+ }
+
+ private boolean watchIsQueued() {
+ return watchTask != null;
+ }
+
+ private synchronized void finish() {
+ watchTask = null;
+ enqueueWatch();
+ }
+
+ private CompletableFuture watchSemaphore() {
+ return session.watchSemaphore(
+ semaphoreName,
+ DescribeSemaphoreMode.WITH_OWNERS_AND_WAITERS,
+ WatchSemaphoreMode.WATCH_DATA_AND_OWNERS
+ ).thenCompose(result -> {
+ Status status = result.getStatus();
+ if (!status.isSuccess()) {
+ return CompletableFuture.completedFuture(status);
+ }
+ SemaphoreWatcher watcher = result.getValue();
+ saveWatchState(watcher.getDescription());
+ return watcher.getChangedFuture().thenApply(Result::getStatus);
+ });
+ }
+
+ private void saveWatchState(SemaphoreDescription description) {
+ List waitersList = description.getWaitersList().stream().map(it -> new Participant(
+ it.getId(),
+ it.getData(),
+ it.getCount(),
+ false
+ )).collect(Collectors.toList());
+ List ownersList = description.getOwnersList().stream().map(it -> new Participant(
+ it.getId(),
+ it.getData(),
+ it.getCount(),
+ true
+ )).collect(Collectors.toList());
+
+ watchData = new WatchData(
+ description.getCount(),
+ description.getData(),
+ waitersList,
+ ownersList
+ );
+ }
+
+ private synchronized void stopWatch() {
+ Future task = watchTask;
+ if (task != null) {
+ task.cancel(true);
+ }
+ watchTask = null;
+ }
+
+ @Override
+ public void close() {
+ Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Is not yet started");
+
+ stopWatch();
+ }
+}
diff --git a/coordination/recipes/src/main/resources/log4j2.xml b/coordination/recipes/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..1cf1cde
--- /dev/null
+++ b/coordination/recipes/src/main/resources/log4j2.xml
@@ -0,0 +1,27 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 1655ee9..d05d409 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,6 +29,7 @@
ydb-cookbook
url-shortener-demo
jdbc
+ coordination