Optimize ScyllaDB's batch writes #4047
Conversation
This stack of pull requests is managed by Graphite.
self.multi_keys
    .insert(num_markers, prepared_statement.clone());
Why switch back to the construction with two searches, `get` and then `insert`?
`DashMap`'s `entry` will actually deadlock if you call it and there's anyone else using the map :)
So it's not a good idea here, and it was actually a bug to use it. We got "lucky" that CI didn't trigger it much in the PR that was merged, but it will be triggered increasingly often with these PRs, especially the new partitions one.
CI in this PR was already deadlocking sometimes.
Ok, but then I would add a comment on that.
I think the most appropriate is: "We are not using `dashmap::DashMap::entry` in order to reduce contention and avoid creating deadlocks".
I guess it's generally never safe to hold `DashMap` references over an `await` point. (Or is it?)
@MathieuDutSik The `DashMap` documentation is pretty clear about the deadlock risk; I think adding another comment here is redundant.
@afck Exactly, it's not safe. Which is why it's best to avoid things that hold a reference to the map for too long, like using `entry`, for example. It is designed for concurrent access, though; there are just these nuances to be careful about.
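For illustration, here is a minimal sketch of the two-search pattern being discussed, with a hypothetical `DashMap<usize, Arc<String>>` standing in for the real prepared-statement cache; each `get`/`insert` call only holds a shard lock for the duration of that single call, unlike an `entry` guard:

```rust
use std::sync::Arc;

use dashmap::DashMap;

// Hypothetical cache: number of bind markers -> prepared statement text.
fn get_or_insert(cache: &DashMap<usize, Arc<String>>, num_markers: usize) -> Arc<String> {
    if let Some(statement) = cache.get(&num_markers) {
        // The read guard is dropped at the end of this block, before we return.
        return statement.value().clone();
    }
    // Two tasks may race here and both prepare a statement; the second insert
    // simply overwrites the first, which is fine for an idempotent cache.
    let prepared = Arc::new(format!("INSERT ... ({num_markers} markers)"));
    cache.insert(num_markers, prepared.clone());
    prepared
}
```

With `entry`, the write guard would instead be held for the whole preparation step, and holding any of these guards across an `.await` point is what can lead to the deadlocks mentioned above.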
Do we need this if we never have batches that affect multiple shards? Can we enforce that each view is on a single shard, and then just not batch anything else?
Yes, even if all queries in the batch are for the same partition key, the Rust driver will still just send the batch to a random node by default.
Yes, but that's a much bigger change, I think 😅 both enforcing that each view is on a single shard, and not batching anything else.
@@ -98,13 +113,38 @@ const MAX_BATCH_SIZE: usize = 5000;
/// The keyspace to use for the ScyllaDB database.
const KEYSPACE: &str = "kv";

/// The default size of the cache for the load balancing policies.
const DEFAULT_LOAD_BALANCING_POLICY_CACHE_SIZE: usize = 50_000;
Does this work now, as of Rust 1.83.0?
- const DEFAULT_LOAD_BALANCING_POLICY_CACHE_SIZE: usize = 50_000;
+ const DEFAULT_LOAD_BALANCING_POLICY_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(50_000).unwrap();
Seems like it does!
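For reference, a minimal sketch of why this now compiles: `Option::unwrap` became callable in const context as of Rust 1.83.0, so a zero value would be rejected at compile time rather than at runtime. (What the surrounding cache type actually expects as a capacity is not shown here.)

```rust
use std::num::NonZeroUsize;

// Const-evaluated: if the literal were 0, this would fail to compile.
const DEFAULT_LOAD_BALANCING_POLICY_CACHE_SIZE: NonZeroUsize =
    NonZeroUsize::new(50_000).unwrap();

fn main() {
    println!("capacity = {}", DEFAULT_LOAD_BALANCING_POLICY_CACHE_SIZE);
}
```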
    policy
}
Err(error) => {
    // Cache that the policy creation failed, so we don't try again too soon, and don't
Is this expected? Should we log if that happens?
I can do a `WARN` here, I think, but it shouldn't happen too many times AFAIU.
match policy {
    LoadBalancingPolicyCacheEntry::Ready(policy) => policy.clone(),
    LoadBalancingPolicyCacheEntry::NotReady(timestamp, token) => {
        if Timestamp::now().delta_since(*timestamp)
Maybe we should use `Instance` here instead of `Timestamp`? Our `linera_base` timestamp type is just there to define and serialize timestamps in the protocol as `u64`s. Since this is only used locally, I'd go with the standard library. Also, this expression could then be `timestamp.elapsed()`.
Sorry, I meant `Instant`.
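A minimal sketch of what the suggestion amounts to, with a hypothetical `should_retry` helper and the 2-second delay mentioned in the PR description:

```rust
use std::time::{Duration, Instant};

// Hypothetical retry gate: only retry building the sticky policy once the
// configured delay has elapsed since the last failed attempt.
const RETRY_DELAY: Duration = Duration::from_secs(2);

fn should_retry(last_attempt: Instant) -> bool {
    // `last_attempt.elapsed()` replaces `Timestamp::now().delta_since(*timestamp)`.
    last_attempt.elapsed() >= RETRY_DELAY
}

fn main() {
    let last_attempt = Instant::now();
    assert!(!should_retry(last_attempt));
}
```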
pub struct ScyllaDbClientConfig {
    /// The delay before the sticky load balancing policy creation is retried.
    pub delay_before_sticky_load_balancing_policy_retry_ms: u64,
}
Now that we have a `ScyllaDbClientConfig`, why not put the `DEFAULT_LOAD_BALANCING_POLICY_CACHE_SIZE` in it?
Also, the replication_factor could have its place here. It does not have to be in this PR, though.
I could, sure! And on the `replication_factor`, yeah, that's on my TODO list 😅 I noticed a while back that it doesn't belong in the common config, as it's actually specific to ScyllaDb.
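A sketch of what the suggestion could look like; the new field name and the defaults below are assumptions (the 2-second delay and the 50,000 cache size are taken from the description and the constant above):

```rust
pub struct ScyllaDbClientConfig {
    /// The delay before the sticky load balancing policy creation is retried.
    pub delay_before_sticky_load_balancing_policy_retry_ms: u64,
    /// The size of the cache for the load balancing policies.
    pub load_balancing_policy_cache_size: usize,
}

impl Default for ScyllaDbClientConfig {
    fn default() -> Self {
        Self {
            delay_before_sticky_load_balancing_policy_retry_ms: 2_000,
            load_balancing_policy_cache_size: 50_000,
        }
    }
}
```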
enum LoadBalancingPolicyCacheEntry {
    Ready(Arc<dyn LoadBalancingPolicy>),
    // The timestamp of the last time the policy creation was attempted.
    NotReady(Instant, Option<Token>),
}
I have a maybe stupid concern, but I think that the sharding policy in ScyllaDb is dynamic. So I wonder if this affects the caching.
Good question. For a given `partition_key`, the `Token` is always unique, as it's always calculated by the same hash function, so the `Token` won't change. As for the endpoints, AFAIU we'll only reshuffle token ranges (causing the nodes that hold a given token to change) when we scale ScyllaDB up or down, as in adding or removing VMs in the NodePool, or when we run `ALTER TABLE` commands, stuff like that.
We currently don't do that, and it would complicate the code a bit to add support for it now, so I would rather leave it for when it's needed.
Wait, we don't support changes in the sharding assignment on the ScyllaDb side?
This sticky load balancing policy part currently doesn't, but we can add support for that in a follow up PR, I think :)
No obstacle on my side, but I would like to see benchmarks confirming the improvements before merging.
Benchmarks can be added to the PR description.
if let Some(policy) = cache.get(partition_key) {
    match policy {
        LoadBalancingPolicyCacheEntry::Ready(policy) => policy.clone(),
No need for nesting `if let` and `match`:
match cache.get(partition_key) {
    Some(LoadBalancingPolicyCacheEntry::Ready(policy)) => ...
    ...
    None => ...
}
Let's experiment with this on a separate branch first. The fact that this doesn't support changes in the shard assignments by ScyllaDb is a problem.
I can add support for that in this PR, or in a following one, so as not to inflate the size of this one even more (I won't merge this without that; I'll probably merge the full stack at once). I was thinking of creating a stack that we know works, then merging the full stack once we agree it gets things to a good state (instead of doing a separate branch).
Motivation
It is a known issue that batches are not token aware in ScyllaDB's Rust driver: with the default load balancing policies, a batch is sent to a random node, which then forwards the statements to the proper nodes. That also means there's an extra network hop for most batch requests.
So currently, if someone using the Rust driver needs atomicity, they can use batches, but they take a bit of a performance hit because the batch won't be token aware.
For us to get the best performance out of batches, while keeping the per-partition atomicity they guarantee, we would need shard-aware batching, but that's not yet supported in the Rust driver.
There is some work being attempted on "shard aware batching", but one of the reviewers argues that there are ways of solving this problem that don't involve user code: namely, creating a custom load balancing policy, which is what this PR does.
Proposal
Build a custom "Sticky" load balancing policy. This policy is specific to a given partition: it remembers the `(node, shard)` pairs for all the replicas containing that partition's data, and then sends every batch to one of those replicas, in a round-robin fashion to spread load across them.

We'll have an LRU cache keyed on the partition key that contains either a `Ready` value or a `NotReady` value. The reason for this is that there are cases where you try to get the endpoint information for a token and the Rust driver hasn't updated its metadata about the table yet, so that information isn't filled in. If we have a `Ready` value, we already have the actual "sticky" policy with the `(node, shard)` endpoints, and we're good to go. If we have a `NotReady` value, we have a timestamp of when we last attempted to get the endpoints. We always wait at least 2 seconds before trying again, to give the driver time to update itself and to avoid overloading it with these endpoint requests. Until then we use the default policy and take a bit of a performance hit, but that should only last a very limited time.

The `NotReady` state can also contain the `Token` for that partition, in case we managed to calculate it in the last attempt. The `Token` is calculated by doing a `Murmur3` hash of the table's specs and the partition key. If the table doesn't change, that `Token` will never change for this partition key. Since there's hashing involved, we cache it to avoid repeating that work.

If we ever decide to auto-scale our ScyllaDB deployment based on load, we'll need to add a mechanism here to invalidate these cache entries when that happens.
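To make the scheme concrete, here is a self-contained sketch of the cache entries and the decision logic. `Token`, `StickyPolicy`, and the helper below are hypothetical stand-ins: the real policy implements the driver's `LoadBalancingPolicy` trait and the cache is an LRU keyed on the partition key.

```rust
use std::{
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    time::{Duration, Instant},
};

// Hypothetical stand-in for the driver's token type.
type Token = i64;

struct StickyPolicy {
    /// `(node address, shard)` pairs of the replicas owning this partition's data.
    replicas: Vec<(String, u32)>,
    /// Round-robin cursor used to spread batches across the replicas.
    next: AtomicUsize,
}

impl StickyPolicy {
    /// Pick the next replica in round-robin order.
    fn pick_replica(&self) -> &(String, u32) {
        let i = self.next.fetch_add(1, Ordering::Relaxed) % self.replicas.len();
        &self.replicas[i]
    }
}

enum CacheEntry {
    /// The sticky policy is built: batches go straight to one of the replicas.
    Ready(Arc<StickyPolicy>),
    /// Building failed (driver metadata not ready yet): when we last tried, plus the
    /// token if we already computed it, so the Murmur3 hash isn't redone next time.
    NotReady(Instant, Option<Token>),
}

const RETRY_DELAY: Duration = Duration::from_secs(2);

/// Decide what to do for a batch on a given partition.
fn next_step(entry: Option<&CacheEntry>) -> &'static str {
    match entry {
        Some(CacheEntry::Ready(_)) => "send via the sticky policy",
        Some(CacheEntry::NotReady(at, _)) if at.elapsed() < RETRY_DELAY => {
            "use the default policy, retry building later"
        }
        _ => "try to build the sticky policy now",
    }
}
```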
Test Plan
CI + I won't merge before I benchmark this code together with the new key space partitioning PR, to make sure the performance is what we expect.
Release Plan