diff --git a/src/main/java/db/SlowCompletableFutureDb.java b/src/main/java/db/SlowCompletableFutureDb.java index 4d2b373..b64c412 100755 --- a/src/main/java/db/SlowCompletableFutureDb.java +++ b/src/main/java/db/SlowCompletableFutureDb.java @@ -7,7 +7,6 @@ import java.util.concurrent.*; public class SlowCompletableFutureDb implements DataStorage, Closeable { - private volatile Map values; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); diff --git a/src/main/java/part2/cache/CachingDataStorageImpl.java b/src/main/java/part2/cache/CachingDataStorageImpl.java index a2ae460..6bc8902 100755 --- a/src/main/java/part2/cache/CachingDataStorageImpl.java +++ b/src/main/java/part2/cache/CachingDataStorageImpl.java @@ -1,7 +1,6 @@ package part2.cache; import db.DataStorage; -import db.SlowCompletableFutureDb; import java.util.concurrent.*; @@ -32,12 +31,27 @@ public CachingDataStorageImpl(DataStorage db, int timeout, TimeUnit t @Override public OutdatableResult getOutdatable(String key) { - // TODO implement - // TODO use ScheduledExecutorService to remove outdated result from cache - see SlowCompletableFutureDb implementation - // TODO complete OutdatableResult::outdated after removing outdated result from cache - // TODO don't use obtrudeException on result - just don't - // TODO use remove(Object key, Object value) to remove target value - // TODO Start timeout after receiving result in CompletableFuture, not after receiving CompletableFuture itself - throw new UnsupportedOperationException(); + final OutdatableResult newResult + = new OutdatableResult<>(new CompletableFuture<>(), new CompletableFuture<>()); + + final OutdatableResult cashed = + cache.putIfAbsent(key, newResult); + if (cashed != null) { + return cashed; + } + + db.get(key).whenComplete((res, ex) -> { + if (ex != null) { + newResult.getResult().completeExceptionally(ex); + } else { + newResult.getResult().complete(res); + } + scheduledExecutorService.schedule(() -> { + cache.remove(key, newResult); + newResult.getOutdated().complete(null); + }, timeout, timeoutUnits); + }); + + return newResult; } } diff --git a/src/main/java/part2/cache/TypedEmployeeCachedStorage.java b/src/main/java/part2/cache/TypedEmployeeCachedStorage.java index e9049a5..fbd31b7 100755 --- a/src/main/java/part2/cache/TypedEmployeeCachedStorage.java +++ b/src/main/java/part2/cache/TypedEmployeeCachedStorage.java @@ -2,14 +2,25 @@ import data.typed.Employee; import data.typed.Employer; +import data.typed.JobHistoryEntry; import data.typed.Position; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.function.Function; +import java.util.function.IntFunction; + +import static java.util.stream.Collectors.toList; + public class TypedEmployeeCachedStorage implements CachingDataStorage { private final CachingDataStorage employeeStorage; private final CachingDataStorage positionStorage; private final CachingDataStorage employerStorage; + public TypedEmployeeCachedStorage(CachingDataStorage employeeStorage, CachingDataStorage positionStorage, CachingDataStorage employerStorage) { @@ -18,9 +29,77 @@ public TypedEmployeeCachedStorage(CachingDataStorage empl this.employerStorage = employerStorage; } + private OutdatableResult asyncToTyped(data.Employee e) { + final List> collect = e.getJobHistory().stream() + .map(this::asyncToTyped) + .collect(toList()); + + return new OutdatableResult<>( + CompletableFuture.allOf(collect.stream() + .map(OutdatableResult::getResult) + .toArray((IntFunction[]>) CompletableFuture[]::new)) + .thenApplyAsync(x -> { + final List jobHistory = collect.stream() + .map(OutdatableResult::getResult) + .map(this::getOrNull) + .collect(toList()); + return new data.typed.Employee(e.getPerson(), jobHistory); + }) + .thenApply(Function.identity()), + CompletableFuture.anyOf(collect.stream() + .map(OutdatableResult::getOutdated) + .toArray((IntFunction[]>) CompletableFuture[]::new)) + .thenApply(x -> null) + ); + } + + private OutdatableResult asyncToTyped(final data.JobHistoryEntry j) { + final OutdatableResult outdatable = employerStorage.getOutdatable(j.getEmployer()); + final OutdatableResult outdatable1 = positionStorage.getOutdatable(j.getPosition()); + + return new OutdatableResult<>(outdatable.getResult().thenCombine(outdatable1.getResult(), + (e, p) -> new JobHistoryEntry(p, e, j.getDuration())), + CompletableFuture.anyOf(outdatable.getOutdated(), + outdatable1.getOutdated()) + .thenApply(x -> null)); + } + + private T getOrNull(Future f) { + try { + return f.get(); + } catch (InterruptedException | ExecutionException e1) { + e1.printStackTrace(); + return null; + } + } + + private void complete(Void res, Throwable ex, OutdatableResult result) { + if (ex != null) { + result.getOutdated().completeExceptionally(ex); + } else { + result.getOutdated().complete(res); + } + } + @Override public OutdatableResult getOutdatable(String key) { - // TODO note that you don't know timeouts for different storage. And timeouts can be different. - throw new UnsupportedOperationException(); + final OutdatableResult outdatable = employeeStorage.getOutdatable(key); + + final CompletableFuture> future = outdatable.getResult().thenApply(this::asyncToTyped); + final OutdatableResult result = new OutdatableResult<>(new CompletableFuture<>(), new CompletableFuture<>()); + + //thenComplete - + outdatable.getOutdated().whenComplete((res, ex) -> complete(res, ex, result)); + + future.whenComplete((res, ex) -> { + if (ex != null) { + result.getResult().completeExceptionally(ex); + } else { + result.getResult().complete(getOrNull(res.getResult())); + res.getOutdated().whenComplete((res2, ex2) -> complete(res2, ex2, result)); + } + }); + return result; } } + diff --git a/src/test/java/part2/cache/CachingDataStorageImplTest.java b/src/test/java/part2/cache/CachingDataStorageImplTest.java index 041370d..d0b3173 100755 --- a/src/test/java/part2/cache/CachingDataStorageImplTest.java +++ b/src/test/java/part2/cache/CachingDataStorageImplTest.java @@ -5,10 +5,10 @@ import data.typed.Employer; import data.typed.Position; import db.SlowCompletableFutureDb; -import part2.cache.CachingDataStorage.OutdatableResult; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import part2.cache.CachingDataStorage.OutdatableResult; import java.io.IOException; import java.util.Arrays; @@ -83,5 +83,4 @@ public void expiration() throws InterruptedException, ExecutionException, Timeou assertEquals(person2, result3.getResult().get().getPerson()); } - } diff --git a/src/test/java/part2/cache/TypedEmployeeCachedStorageTest.java b/src/test/java/part2/cache/TypedEmployeeCachedStorageTest.java index fecb3b4..c8d7a0a 100755 --- a/src/test/java/part2/cache/TypedEmployeeCachedStorageTest.java +++ b/src/test/java/part2/cache/TypedEmployeeCachedStorageTest.java @@ -1,6 +1,8 @@ package part2.cache; import data.Employee; +import data.JobHistoryEntry; +import data.Person; import data.typed.Employer; import data.typed.Position; import db.SlowCompletableFutureDb; @@ -10,18 +12,22 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Function; import static java.util.stream.Collectors.toMap; +import static org.junit.Assert.assertEquals; public class TypedEmployeeCachedStorageTest { private static SlowCompletableFutureDb employeeDb; private static SlowCompletableFutureDb employerDb; private static SlowCompletableFutureDb positionDb; + @BeforeClass public static void defore() { final Map employerMap = @@ -49,19 +55,45 @@ public static void after() { } @Test - public void expiration() { + public void expiration() throws ExecutionException, InterruptedException { final CachingDataStorageImpl employeeCache = - new CachingDataStorageImpl<>(employeeDb, 1, TimeUnit.SECONDS); + new CachingDataStorageImpl<>(employeeDb, 3, TimeUnit.SECONDS); final CachingDataStorageImpl employerCache = new CachingDataStorageImpl<>(employerDb, 2, TimeUnit.SECONDS); final CachingDataStorageImpl positionCache = - new CachingDataStorageImpl<>(positionDb, 100, TimeUnit.MILLISECONDS); + new CachingDataStorageImpl<>(positionDb, 1, TimeUnit.SECONDS); + + Map employeeTmp = new HashMap<>(); + + final Person person1 = new Person("John", "Doe", 30); + employeeTmp.put("a", new Employee(person1, + Collections.singletonList(new JobHistoryEntry(1, Position.BA.name(), Employer.EPAM.name())))); + employeeDb.setValues(employeeTmp); final TypedEmployeeCachedStorage typedCache = new TypedEmployeeCachedStorage(employeeCache, positionCache, employerCache); - // TODO check than cache gets outdated with the firs outdated inner cache + final CachingDataStorage.OutdatableResult aPerson = typedCache.getOutdatable("a"); + + assertEquals(aPerson.getResult().get().getPerson(), person1); + assertEquals(aPerson.getResult().get().getJobHistoryEntries(), + Collections.singletonList(new data.typed.JobHistoryEntry(Position.BA, Employer.EPAM, 1))); + + Thread.sleep(500); + + employeeTmp = new HashMap<>(); + final Person person2 = new Person("Dagni", "Taggart", 30); + employeeTmp.put("a", new Employee(person2, Collections.emptyList())); + employeeDb.setValues(employeeTmp); + + final CachingDataStorage.OutdatableResult aPerson2 = typedCache.getOutdatable("a"); + assertEquals(aPerson2.getResult().get().getPerson(), person1); + + Thread.sleep(700); + final CachingDataStorage.OutdatableResult aPerson3 = typedCache.getOutdatable("a"); + + assertEquals(aPerson3.getResult().get().getPerson(), person2); } }