diff --git a/src/main/java/com/spotify/reaper/resources/CommonTools.java b/src/main/java/com/spotify/reaper/resources/CommonTools.java index bee43916..11728339 100644 --- a/src/main/java/com/spotify/reaper/resources/CommonTools.java +++ b/src/main/java/com/spotify/reaper/resources/CommonTools.java @@ -157,12 +157,6 @@ private static void storeNewRepairSegments(AppContext context, List t repairSegmentBuilders.add(repairSegment); } context.storage.addRepairSegments(repairSegmentBuilders, repairRun.getId()); - if (repairRun.getSegmentCount() != tokenSegments.size()) { - LOG.debug("created segment amount differs from expected default {} != {}", - repairRun.getSegmentCount(), tokenSegments.size()); - context.storage.updateRepairRun( - repairRun.with().segmentCount(tokenSegments.size()).build(repairRun.getId())); - } } /** diff --git a/src/main/java/com/spotify/reaper/service/RepairRunner.java b/src/main/java/com/spotify/reaper/service/RepairRunner.java index 68dd4885..6e86a17e 100644 --- a/src/main/java/com/spotify/reaper/service/RepairRunner.java +++ b/src/main/java/com/spotify/reaper/service/RepairRunner.java @@ -40,6 +40,8 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicLongArray; +import javax.annotation.Nullable; + public class RepairRunner implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(RepairRunner.class); @@ -163,15 +165,15 @@ public void run() { LOG.error(Arrays.toString(e.getStackTrace())); e.printStackTrace(); synchronized (this) { - Optional repairRun = context.storage.getRepairRun(repairRunId); - if (repairRun.isPresent()) { - context.storage.updateRepairRun(repairRun.get() - .with() - .runState(RepairRun.RunState.ERROR) - .lastEvent(String.format("Exception: %s", e.getMessage())) - .endTime(DateTime.now()) - .build(repairRunId)); - } + context.storage.modifyRepairRun(repairRunId, new Function() { + @Override + public RepairRun.Builder apply(RepairRun.Builder original) { + return original + 
.runState(RepairRun.RunState.ERROR) + .lastEvent(String.format("Exception: %s", e.getMessage())) + .endTime(DateTime.now()); + } + }); context.repairManager.removeRunner(this); } } @@ -182,13 +184,14 @@ public void run() { */ private void start() throws ReaperException { LOG.info("Repairs for repair run #{} starting", repairRunId); - synchronized (this) { - RepairRun repairRun = context.storage.getRepairRun(repairRunId).get(); - context.storage.updateRepairRun(repairRun.with() - .runState(RepairRun.RunState.RUNNING) - .startTime(DateTime.now()) - .build(repairRun.getId())); - } + context.storage.modifyRepairRun(repairRunId, new Function() { + @Override + public RepairRun.Builder apply(RepairRun.Builder original) { + return original + .runState(RepairRun.RunState.RUNNING) + .startTime(DateTime.now()); + } + }); startNextSegment(); } @@ -198,12 +201,16 @@ private void start() throws ReaperException { private void end() { LOG.info("Repairs for repair run #{} done", repairRunId); synchronized (this) { - RepairRun repairRun = context.storage.getRepairRun(repairRunId).get(); - context.storage.updateRepairRun(repairRun.with() - .runState(RepairRun.RunState.DONE) - .endTime(DateTime.now()) - .lastEvent("All done") - .build(repairRun.getId())); + context.storage.modifyRepairRun(repairRunId, + new Function() { + @Override + public RepairRun.Builder apply(RepairRun.Builder original) { + return original + .runState(RepairRun.RunState.DONE) + .endTime(DateTime.now()) + .lastEvent("All done"); + } + }); context.repairManager.removeRunner(this); } } @@ -255,7 +262,8 @@ private void startNextSegment() throws ReaperException { * @param segmentId id of the segment to repair. * @param tokenRange token range of the segment to repair. 
*/ - private void repairSegment(final int rangeIndex, final long segmentId, RingRange tokenRange) throws ReaperException { + private void repairSegment(final int rangeIndex, final long segmentId, final RingRange tokenRange) + throws ReaperException { final long unitId; final double intensity; final RepairParallelism validationParallelism; @@ -289,13 +297,15 @@ private void repairSegment(final int rangeIndex, final long segmentId, RingRange if (potentialCoordinators.isEmpty()) { // This segment has a faulty token range. Abort the entire repair run. synchronized (this) { - RepairRun repairRun = context.storage.getRepairRun(repairRunId).get(); - context.storage.updateRepairRun(repairRun - .with() - .runState(RepairRun.RunState.ERROR) - .lastEvent(String.format("No coordinators for range %s", tokenRange.toString())) - .endTime(DateTime.now()) - .build(repairRunId)); + context.storage.modifyRepairRun(repairRunId, new Function() { + @Override + public RepairRun.Builder apply(RepairRun.Builder original) { + return original + .runState(RepairRun.RunState.ERROR) + .lastEvent(String.format("No coordinators for range %s", tokenRange.toString())) + .endTime(DateTime.now()); + } + }); context.repairManager.removeRunner(this); } return; @@ -344,16 +354,20 @@ private void handleResult(long segmentId) { } } - public void updateLastEvent(String newEvent) { + public void updateLastEvent(final String newEvent) { synchronized (this) { RepairRun repairRun = context.storage.getRepairRun(repairRunId).get(); if (repairRun.getRunState().isTerminated()) { LOG.warn("Will not update lastEvent of run that has already terminated. 
The message was: " + "\"{}\"", newEvent); } else { - context.storage.updateRepairRun(repairRun.with() - .lastEvent(newEvent) - .build(repairRunId)); + context.storage.modifyRepairRun(repairRunId, new Function() { + @Override + public RepairRun.Builder apply(RepairRun.Builder original) { + return original + .lastEvent(newEvent); + } + }); } } } diff --git a/src/main/java/com/spotify/reaper/storage/IStorage.java b/src/main/java/com/spotify/reaper/storage/IStorage.java index 5d389ce6..d3f18688 100644 --- a/src/main/java/com/spotify/reaper/storage/IStorage.java +++ b/src/main/java/com/spotify/reaper/storage/IStorage.java @@ -13,6 +13,7 @@ */ package com.spotify.reaper.storage; +import com.google.common.base.Function; import com.google.common.base.Optional; import com.spotify.reaper.core.Cluster; @@ -55,8 +56,11 @@ public interface IStorage { RepairRun addRepairRun(RepairRun.Builder repairRun); + @Deprecated boolean updateRepairRun(RepairRun repairRun); + boolean modifyRepairRun(long id, Function modification); + Optional getRepairRun(long id); Collection getRepairRunsForCluster(String clusterName); diff --git a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java index ab1ce667..1dc93697 100644 --- a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java +++ b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java @@ -13,6 +13,7 @@ */ package com.spotify.reaper.storage; +import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -105,6 +106,7 @@ public RepairRun addRepairRun(RepairRun.Builder repairRun) { return newRepairRun; } + @Deprecated @Override public boolean updateRepairRun(RepairRun repairRun) { if (!getRepairRun(repairRun.getId()).isPresent()) { @@ -115,6 +117,43 @@ public boolean updateRepairRun(RepairRun repairRun) { } } + private final ConcurrentMap repairRunLocks = 
Maps.newConcurrentMap(); + + private Object getRepairRunLock(long id) { + Object newLock = new Object(); + Object existingLock = repairRunLocks.putIfAbsent(id, newLock); + return existingLock != null ? existingLock : newLock; + } + + @Override + public boolean modifyRepairRun(long id, + Function modification) { + synchronized (getRepairRunLock(id)) { + RepairRun repairRun = repairRuns.get(id); + if (repairRun == null) { + return false; + } else { + repairRuns.put(id, modification.apply(repairRun.with()).build(id)); + return true; + } + } + } + + @Override + public Optional deleteRepairRun(long id) { + synchronized (getRepairRunLock(id)) { + RepairRun deletedRun = repairRuns.remove(id); + if (deletedRun != null) { + if (getSegmentAmountForRepairRunWithState(id, RepairSegment.State.RUNNING) == 0) { + deleteRepairUnit(deletedRun.getRepairUnitId()); + deleteRepairSegmentsForRun(id); + deletedRun = deletedRun.with().runState(RepairRun.RunState.DELETED).build(id); + } + } + return Optional.fromNullable(deletedRun); + } + } + @Override public Optional getRepairRun(long id) { return Optional.fromNullable(repairRuns.get(id)); @@ -193,19 +232,6 @@ private int deleteRepairSegmentsForRun(long runId) { return segmentsMap != null ? 
segmentsMap.size() : 0; } - @Override - public Optional deleteRepairRun(long id) { - RepairRun deletedRun = repairRuns.remove(id); - if (deletedRun != null) { - if (getSegmentAmountForRepairRunWithState(id, RepairSegment.State.RUNNING) == 0) { - deleteRepairUnit(deletedRun.getRepairUnitId()); - deleteRepairSegmentsForRun(id); - deletedRun = deletedRun.with().runState(RepairRun.RunState.DELETED).build(id); - } - } - return Optional.fromNullable(deletedRun); - } - @Override public RepairUnit addRepairUnit(RepairUnit.Builder repairUnit) { Optional existing = diff --git a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java index 50abe25f..1608a514 100644 --- a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java +++ b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java @@ -13,8 +13,10 @@ */ package com.spotify.reaper.storage; +import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.spotify.reaper.ReaperApplicationConfiguration; import com.spotify.reaper.ReaperException; @@ -45,6 +47,7 @@ import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.concurrent.ConcurrentMap; import io.dropwizard.jdbi.DBIFactory; import io.dropwizard.setup.Environment; @@ -190,43 +193,69 @@ public Collection getRepairRunsWithState(RepairRun.RunState runState) return result == null ? Lists.newArrayList() : result; } + + // Bj0rn: I don't quite like using this synchronization approach in the postgres storage. + // Could transactions solve it in a better way? + private final ConcurrentMap repairRunLocks = Maps.newConcurrentMap(); + + private Object getRepairRunLock(long id) { + Object newLock = new Object(); + Object existingLock = repairRunLocks.putIfAbsent(id, newLock); + return existingLock != null ? 
existingLock : newLock; + } + + @Override + public boolean modifyRepairRun(long id, + Function modification) { + synchronized (getRepairRunLock(id)) { + Optional repairRun = getRepairRun(id); + if (!repairRun.isPresent()) { + return false; + } else { + return updateRepairRun(modification.apply(repairRun.get().with()).build(id)); + } + } + } + @Override public Optional deleteRepairRun(long id) { - RepairRun result = null; - Handle h = null; - try { - h = jdbi.open(); - h.begin(); - IStoragePostgreSQL pg = getPostgresStorage(h); - RepairRun runToDelete = pg.getRepairRun(id); - if (runToDelete != null) { - int segmentsRunning = pg.getSegmentAmountForRepairRunWithState(id, - RepairSegment.State.RUNNING); - if (segmentsRunning == 0) { - pg.deleteRepairSegmentsForRun(runToDelete.getId()); - pg.deleteRepairRun(id); - result = runToDelete.with().runState(RepairRun.RunState.DELETED).build(id); - } else { - LOG.warn("not deleting RepairRun \"{}\" as it has segments running: {}", - id, segmentsRunning); + synchronized (getRepairRunLock(id)) { + RepairRun result = null; + Handle h = null; + try { + h = jdbi.open(); + h.begin(); + IStoragePostgreSQL pg = getPostgresStorage(h); + RepairRun runToDelete = pg.getRepairRun(id); + if (runToDelete != null) { + int segmentsRunning = pg.getSegmentAmountForRepairRunWithState(id, + RepairSegment.State.RUNNING); + if (segmentsRunning == 0) { + pg.deleteRepairSegmentsForRun(runToDelete.getId()); + pg.deleteRepairRun(id); + result = runToDelete.with().runState(RepairRun.RunState.DELETED).build(id); + } else { + LOG.warn("not deleting RepairRun \"{}\" as it has segments running: {}", + id, segmentsRunning); + } + } + h.commit(); + } catch (DBIException ex) { + LOG.warn("DELETE failed", ex); + ex.printStackTrace(); + if (h != null) { + h.rollback(); + } + } finally { + if (h != null) { + h.close(); } } - h.commit(); - } catch (DBIException ex) { - LOG.warn("DELETE failed", ex); - ex.printStackTrace(); - if (h != null) { - h.rollback(); - } - } 
finally { - if (h != null) { - h.close(); + if (result != null) { + tryDeletingRepairUnit(result.getRepairUnitId()); } + return Optional.fromNullable(result); } - if (result != null) { - tryDeletingRepairUnit(result.getRepairUnitId()); - } - return Optional.fromNullable(result); } private void tryDeletingRepairUnit(long id) { @@ -251,6 +280,7 @@ public RepairRun addRepairRun(RepairRun.Builder newRepairRun) { return result; } + @Deprecated @Override public boolean updateRepairRun(RepairRun repairRun) { boolean result = false; diff --git a/src/test/java/com/spotify/reaper/unit/service/RepairRunnerTest.java b/src/test/java/com/spotify/reaper/unit/service/RepairRunnerTest.java index db33be8b..1aaa6a3e 100644 --- a/src/test/java/com/spotify/reaper/unit/service/RepairRunnerTest.java +++ b/src/test/java/com/spotify/reaper/unit/service/RepairRunnerTest.java @@ -191,19 +191,19 @@ public void testResumeRepair() throws InterruptedException { long cf = storage.addRepairUnit( new RepairUnit.Builder(CLUSTER_NAME, KS_NAME, CF_NAMES)).getId(); DateTimeUtils.setCurrentMillisFixed(TIME_RUN); - RepairRun run = storage.addRepairRun( + long runId = storage.addRepairRun( new RepairRun.Builder(CLUSTER_NAME, cf, DateTime.now(), INTENSITY, 1, - RepairParallelism.PARALLEL)); + RepairParallelism.PARALLEL)).getId(); storage.addRepairSegments(Lists.newArrayList( - new RepairSegment.Builder(run.getId(), new RingRange(BigInteger.ZERO, BigInteger.ONE), cf) + new RepairSegment.Builder(runId, new RingRange(BigInteger.ZERO, BigInteger.ONE), cf) .state(RepairSegment.State.RUNNING).startTime(DateTime.now()).coordinatorHost("reaper") .repairCommandId(1337), - new RepairSegment.Builder(run.getId(), new RingRange(BigInteger.ONE, BigInteger.ZERO), cf) - ), run.getId()); - final long RUN_ID = run.getId(); - final long SEGMENT_ID = storage.getNextFreeSegment(run.getId()).get().getId(); + new RepairSegment.Builder(runId, new RingRange(BigInteger.ONE, BigInteger.ZERO), cf) + ), runId); + final long SEGMENT_ID 
= storage.getNextFreeSegment(runId).get().getId(); - context.repairManager.initializeThreadPool(1, 500, TimeUnit.MILLISECONDS, 1, TimeUnit.MILLISECONDS); + context.repairManager.initializeThreadPool(1, 500, TimeUnit.MILLISECONDS, 1, + TimeUnit.MILLISECONDS); assertEquals(storage.getRepairSegment(SEGMENT_ID).get().getState(), RepairSegment.State.NOT_STARTED); @@ -240,13 +240,18 @@ public void run() { } }; - assertEquals(RepairRun.RunState.NOT_STARTED, storage.getRepairRun(RUN_ID).get().getRunState()); + assertEquals(RepairRun.RunState.NOT_STARTED, storage.getRepairRun(runId).get().getRunState()); context.repairManager.resumeRunningRepairRuns(context); - assertEquals(RepairRun.RunState.NOT_STARTED, storage.getRepairRun(RUN_ID).get().getRunState()); - storage.updateRepairRun(run.with().runState(RepairRun.RunState.RUNNING).build(RUN_ID)); + assertEquals(RepairRun.RunState.NOT_STARTED, storage.getRepairRun(runId).get().getRunState()); + storage.modifyRepairRun(runId, new Function() { + @Override + public RepairRun.Builder apply(RepairRun.Builder original) { + return original.runState(RepairRun.RunState.RUNNING); + } + }); context.repairManager.resumeRunningRepairRuns(context); Thread.sleep(100); - assertEquals(RepairRun.RunState.DONE, storage.getRepairRun(RUN_ID).get().getRunState()); + assertEquals(RepairRun.RunState.DONE, storage.getRepairRun(runId).get().getRunState()); } @Test