Skip to content

Commit fe4e3aa

Browse files
committed
- Detect and avoid taking two AccordExecutor locks simultaneously
1 parent dcc67a1 commit fe4e3aa

20 files changed

Lines changed: 173 additions & 88 deletions

modules/accord

Submodule accord updated 119 files

src/java/org/apache/cassandra/config/AccordSpec.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030

3131
import static org.apache.cassandra.config.AccordSpec.CatchupMode.NORMAL;
3232
import static org.apache.cassandra.config.AccordSpec.QueueShardModel.THREAD_POOL_PER_SHARD;
33-
import static org.apache.cassandra.config.AccordSpec.QueueSubmissionModel.SYNC;
33+
import static org.apache.cassandra.config.AccordSpec.QueueSubmissionModel.SEMI_SYNC;
3434
import static org.apache.cassandra.config.AccordSpec.RangeIndexMode.in_memory;
3535

3636
// TODO (expected): rename to AccordConf?
@@ -93,6 +93,8 @@ public enum QueueSubmissionModel
9393

9494
/**
9595
* The queue workers only require ownership of the lock, submissions happens fully asynchronously.
96+
*
97+
* NOTE: EXPERIMENTAL
9698
*/
9799
ASYNC,
98100

@@ -106,7 +108,7 @@ public enum QueueSubmissionModel
106108
}
107109

108110
public QueueShardModel queue_shard_model = THREAD_POOL_PER_SHARD;
109-
public QueueSubmissionModel queue_submission_model = SYNC;
111+
public QueueSubmissionModel queue_submission_model = SEMI_SYNC;
110112

111113
/**
112114
* The number of queue (and cache) shards.
@@ -137,16 +139,16 @@ public enum QueueSubmissionModel
137139
public DurationSpec.IntMillisecondsBound repair_timeout = new DurationSpec.IntMillisecondsBound("10m");
138140
public String recover_txn = "5s*attempts <= 60s";
139141
public StringRetryStrategy recover_syncpoint = new StringRetryStrategy("60s <= 30s*attempts...60s*attempts <= 600s");
140-
public String fetch_txn = "1s*attempts";
141-
public String fetch_syncpoint = "5s*attempts";
142-
public String expire_txn = "5s*attempts";
142+
public String fetch_txn = "2s*attempts <= 60s";
143+
public String fetch_syncpoint = "5s*attempts <= 60s";
144+
public String expire_txn = "5s*attempts <= 60s";
143145
public String expire_syncpoint = "60s*attempts<=300s";
144146
public String expire_epoch_wait = "10s";
145147
// we don't want to wait ages for durability as it blocks other durability progress; even this might be too long, as we can always retry
146148
public String expire_durability = "10s*attempts <= 30s";
147149
public String slow_syncpoint_preaccept = "10s";
148-
public String slow_txn_preaccept = "30ms <= p50*2 <= 100ms";
149-
public String slow_read = "30ms <= p50*2 <= 100ms";
150+
public String slow_txn_preaccept = "30ms <= p50*2 <= 1000ms";
151+
public String slow_read = "30ms <= p50*2 <= 1000ms";
150152
public StringRetryStrategy retry_syncpoint = new StringRetryStrategy("10s*attempt <= 600s");
151153
public StringRetryStrategy retry_durability = new StringRetryStrategy("10s*attempt <= 600s");
152154
public StringRetryStrategy retry_bootstrap = new StringRetryStrategy("10s*attempt <= 600s");
@@ -189,8 +191,6 @@ public enum CatchupMode
189191
HARD
190192
}
191193

192-
public RebootstrapMode rebootstrap_mode = RebootstrapMode.full_repair;
193-
194194
/**
195195
* default transactional mode for tables created by this node when no transactional mode has been specified in the DDL
196196
*/
@@ -199,7 +199,8 @@ public enum CatchupMode
199199
public boolean state_cache_listener_jfr_enabled = false;
200200

201201
public float hard_reject_ratio = 0.5f;
202-
public int soft_reject_count = 100;
202+
public int min_soft_reject_count = 10;
203+
public int max_soft_reject_count = 100;
203204
public DurationSpec.LongMicrosecondsBound soft_reject_age = new DurationSpec.LongMicrosecondsBound("10s");
204205
public DurationSpec.LongMicrosecondsBound soft_reject_cumulative_age = new DurationSpec.LongMicrosecondsBound("60s");
205206

src/java/org/apache/cassandra/config/DatabaseDescriptor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5577,8 +5577,7 @@ public static int getAccordQueueShardCount()
55775577
case THREAD_PER_SHARD_SYNC_QUEUE:
55785578
return conf.accord.queue_shard_count.or(DatabaseDescriptor::getAvailableProcessors);
55795579
case THREAD_POOL_PER_SHARD:
5580-
int defaultMax = getAccordQueueSubmissionModel() == AccordSpec.QueueSubmissionModel.SYNC ? 8 : 4;
5581-
return conf.accord.queue_shard_count.or(Math.min(defaultMax, DatabaseDescriptor.getAvailableProcessors()));
5580+
return conf.accord.queue_shard_count.or(DatabaseDescriptor.getAvailableProcessors()/4);
55825581
}
55835582
}
55845583

src/java/org/apache/cassandra/db/compaction/CursorCompactor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,9 @@ public static boolean isSupported(AbstractCompactionStrategy.ScannerList scanner
156156

157157
public static boolean unsupportedMetadata(TableMetadata metadata)
158158
{
159+
if (metadata.keyspace.equals(SchemaConstants.ACCORD_KEYSPACE_NAME))
160+
return false;
161+
159162
if (!metadata.partitioner.supportsReusableKeys())
160163
{
161164
if (LOGGER.isDebugEnabled()) logDebugReason(metadata, "Incompatible partitioner, does not support reusable keys:" + metadata.partitioner.getClass().getSimpleName());

src/java/org/apache/cassandra/service/accord/AccordCache.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,7 @@ public void release(AccordSafeState<K, V> safeRef, AccordTask<?> owner)
534534

535535
AccordCacheEntry<K, V> node = cache.get(key);
536536

537-
require(!safeRef.invalidated());
537+
require(!safeRef.isUnsafe());
538538
require(safeRef.global() != null, "safeRef node is null for %s", key);
539539
require(safeRef.global() == node, "safeRef node not in map: %s != %s", safeRef.global(), node);
540540
require(node.references() > 0, "references (%d) are zero for %s (%s)", node.references(), key, node);
@@ -563,7 +563,7 @@ else if (node.isLoadingOrWaiting())
563563
{
564564
evict = node.is(LOADED) && node.isNull();
565565
}
566-
safeRef.invalidate();
566+
safeRef.markUnsafe();
567567

568568
if (node.decrement() == 0)
569569
{

src/java/org/apache/cassandra/service/accord/AccordCommandStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ public AccordCommandStore(int id,
250250
{
251251
commands = exclusive.commands.newInstance(this);
252252
commandsForKey = exclusive.commandsForKey.newInstance(this);
253-
this.caches = new ExclusiveCaches(sharedExecutor.lock, exclusive.global, commands, commandsForKey);
253+
this.caches = new ExclusiveCaches(sharedExecutor.unsafeLock(), exclusive.global, commands, commandsForKey);
254254
}
255255

256256
this.exclusiveExecutor = sharedExecutor.executor(id);

src/java/org/apache/cassandra/service/accord/AccordExecutor.java

Lines changed: 82 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@
8787
import org.apache.cassandra.utils.concurrent.Condition;
8888
import org.apache.cassandra.utils.concurrent.Future;
8989

90+
import io.netty.util.concurrent.FastThreadLocal;
91+
9092
import static accord.utils.Invariants.createIllegalState;
9193
import static org.apache.cassandra.service.accord.AccordCache.CommandAdapter.COMMAND_ADAPTER;
9294
import static org.apache.cassandra.service.accord.AccordCache.CommandsForKeyAdapter.CFK_ADAPTER;
@@ -117,22 +119,20 @@ public enum Mode { RUN_WITH_LOCK, RUN_WITHOUT_LOCK }
117119
// WARNING: this is a shared object, so close is NOT idempotent
118120
public static final class ExclusiveGlobalCaches extends GlobalCaches implements AutoCloseable
119121
{
120-
final Lock lock;
121122
final AccordExecutor executor;
122123

123124
public ExclusiveGlobalCaches(AccordExecutor executor, AccordCache global, AccordCache.Type<TxnId, Command, AccordSafeCommand> commands, AccordCache.Type<RoutingKey, CommandsForKey, AccordSafeCommandsForKey> commandsForKey)
124125
{
125126
super(global, commands, commandsForKey);
126-
this.lock = executor.lock;
127127
this.executor = executor;
128128
}
129129

130130
@Override
131131
public void close()
132132
{
133133
executor.beforeUnlock();
134-
global.tryShrinkOrEvict(lock);
135-
lock.unlock();
134+
global.tryShrinkOrEvict(executor.lock);
135+
executor.unlock();
136136
}
137137
}
138138

@@ -150,7 +150,7 @@ public GlobalCaches(AccordCache global, AccordCache.Type<TxnId, Command, AccordS
150150
}
151151
}
152152

153-
final Lock lock;
153+
private final Lock lock;
154154
final Agent agent;
155155
final int executorId;
156156
private final AccordCache cache;
@@ -246,9 +246,57 @@ public int executorId()
246246

247247
public ExclusiveGlobalCaches lockCaches()
248248
{
249+
lock();
250+
return caches;
251+
}
252+
253+
private static final FastThreadLocal<Lock> paranoidPriorityInversionCheck = new FastThreadLocal<>();
254+
255+
final Lock unsafeLock()
256+
{
257+
return lock;
258+
}
259+
260+
final void lock()
261+
{
262+
if (Invariants.isParanoid())
263+
{
264+
Lock locked = paranoidPriorityInversionCheck.getAndSet(lock);
265+
Invariants.require(locked == null || locked == lock, "Tried to take multiple AccordExecutor locks on same thread - this is dangerous for progress");
266+
}
249267
//noinspection LockAcquiredButNotSafelyReleased
250268
lock.lock();
251-
return caches;
269+
}
270+
271+
final void unlock()
272+
{
273+
lock.unlock();
274+
paranoidPriorityInversionCheck.set(null);
275+
}
276+
277+
final boolean tryLock()
278+
{
279+
boolean result = lock.tryLock();
280+
if (Invariants.isParanoid())
281+
{
282+
if (result)
283+
{
284+
Lock locked = paranoidPriorityInversionCheck.getAndSet(lock);
285+
if (locked != null && locked != lock)
286+
{
287+
lock.unlock();
288+
paranoidPriorityInversionCheck.set(locked);
289+
Invariants.require(false, "Tried to take multiple AccordExecutor locks on same thread - this is dangerous for progress");
290+
return false;
291+
}
292+
}
293+
else
294+
{
295+
Lock locked = paranoidPriorityInversionCheck.get();
296+
Invariants.require(locked == null || locked == lock, "Tried to take multiple AccordExecutor locks on same thread - this is dangerous for progress");
297+
}
298+
}
299+
return result;
252300
}
253301

254302
public AccordCache cacheExclusive()
@@ -290,7 +338,7 @@ public Stream<? extends DebuggableTaskRunner> active()
290338
public void waitForQuiescence()
291339
{
292340
Condition condition;
293-
lock.lock();
341+
lock();
294342
try
295343
{
296344
if (tasks == 0 && runningThreads == 0)
@@ -303,7 +351,7 @@ public void waitForQuiescence()
303351
}
304352
finally
305353
{
306-
lock.unlock();
354+
unlock();
307355
}
308356
condition.awaitThrowUncheckedOnInterrupt();
309357
}
@@ -325,7 +373,7 @@ protected void notifyQuiescentExclusive()
325373

326374
public void afterSubmittedAndConsequences(Runnable run)
327375
{
328-
lock.lock();
376+
lock();
329377
try
330378
{
331379
if (tasks == 0 && runningThreads == 0)
@@ -344,7 +392,7 @@ public void afterSubmittedAndConsequences(Runnable run)
344392
}
345393
finally
346394
{
347-
lock.unlock();
395+
unlock();
348396
}
349397
}
350398

@@ -814,15 +862,15 @@ private Cancellable submit(Plain task)
814862

815863
public void executeDirectlyWithLock(Runnable command)
816864
{
817-
lock.lock();
865+
lock();
818866
try
819867
{
820868
command.run();
821869
}
822870
finally
823871
{
824872
beforeUnlock();
825-
lock.unlock();
873+
unlock();
826874
}
827875
}
828876

@@ -900,6 +948,7 @@ void clearRunning()
900948
public static abstract class Task extends IntrusivePriorityHeap.Node
901949
{
902950
int queuePosition;
951+
Thread assigned;
903952

904953
protected Task()
905954
{
@@ -999,8 +1048,8 @@ public class SequentialExecutor extends TaskQueue<Task> implements SequentialAsy
9991048
final int commandStoreId;
10001049
final SequentialQueueTask selfTask;
10011050
private Task task;
1051+
private Thread assigned;
10021052
private volatile Thread owner, waiting;
1003-
private boolean running;
10041053
private boolean stopped;
10051054
private volatile boolean visibleStopped;
10061055
private boolean terminated;
@@ -1020,20 +1069,24 @@ public class SequentialExecutor extends TaskQueue<Task> implements SequentialAsy
10201069
void preRunTask()
10211070
{
10221071
Invariants.require(task != null);
1072+
assigned = Thread.currentThread();
10231073
task.preRunExclusive();
1024-
running = true;
10251074
}
10261075

10271076
void runTask()
10281077
{
1029-
Thread self = Thread.currentThread();
1030-
while (!ownerUpdater.compareAndSet(this, null, self))
1078+
outer: while (!ownerUpdater.compareAndSet(this, null, assigned))
10311079
{
1032-
waiting = self;
1033-
while (owner != null)
1080+
waiting = assigned;
1081+
while (true)
1082+
{
1083+
Thread owner = this.owner;
1084+
if (owner == assigned) break outer;
1085+
if (owner == null) continue outer;
10341086
LockSupport.park();
1035-
waiting = null;
1087+
}
10361088
}
1089+
waiting = null;
10371090

10381091
try
10391092
{
@@ -1068,7 +1121,7 @@ void cleanupTask()
10681121
try { task.cleanupExclusive(); }
10691122
finally
10701123
{
1071-
running = false;
1124+
assigned = null;
10721125
task = super.poll();
10731126

10741127
// it should only be possible for this method to be invoked once we're on the running queue
@@ -1087,12 +1140,12 @@ protected void append(Task newTask)
10871140
{
10881141
if (task != null)
10891142
{
1090-
Invariants.require(running || waitingToRun.contains(selfTask));
1143+
Invariants.require(assigned != null || waitingToRun.contains(selfTask));
10911144
super.append(newTask);
10921145
}
10931146
else
10941147
{
1095-
Invariants.require(!running && isEmpty());
1148+
Invariants.require(assigned == null && isEmpty());
10961149
task = newTask;
10971150
selfTask.queuePosition = newTask.queuePosition;
10981151
waitingToRun.append(selfTask);
@@ -1115,7 +1168,7 @@ protected boolean removeIfContains(Task remove)
11151168

11161169
private boolean removeCurrentTask(Node remove)
11171170
{
1118-
if (running)
1171+
if (assigned != null)
11191172
return false;
11201173

11211174
Invariants.require(remove == task);
@@ -1183,7 +1236,7 @@ protected Task peek()
11831236
@Override
11841237
protected boolean contains(Task contains)
11851238
{
1186-
return super.contains(contains) || (task == contains && !running);
1239+
return super.contains(contains) || (task == contains && assigned == null);
11871240
}
11881241

11891242
@Override
@@ -1268,7 +1321,10 @@ public boolean tryExecuteImmediately(Runnable run)
12681321
{
12691322
this.owner = null;
12701323
if (waiting != null)
1324+
{
12711325
LockSupport.unpark(waiting);
1326+
ownerUpdater.compareAndSet(this, null, waiting);
1327+
}
12721328
}
12731329
}
12741330
return true;
@@ -1810,7 +1866,7 @@ public int compareTo(TaskInfo that)
18101866
public List<TaskInfo> taskSnapshot()
18111867
{
18121868
List<TaskInfo> result = new ArrayList<>();
1813-
lock.lock();
1869+
lock();
18141870
try
18151871
{
18161872
addToSnapshot(result, waitingToLoad, TaskInfo.Status.WAITING_TO_LOAD, TaskInfo.Status.WAITING_TO_LOAD);
@@ -1822,7 +1878,7 @@ public List<TaskInfo> taskSnapshot()
18221878
}
18231879
finally
18241880
{
1825-
lock.unlock();
1881+
unlock();
18261882
}
18271883
result.sort(TaskInfo::compareTo);
18281884
return result;

0 commit comments

Comments
 (0)