Optimize ScyllaDB usage #3985
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking.
Force-pushed from 903558c to 6ceaa35.
}

impl ScyllaDbClient {
async fn get_multi_stmt(
&self,
selects_value: bool,
nit: Can we remove this parameter and have two separate methods, get_multi_key_values_statement and get_multi_keys_statement, instead?
find_keys_by_prefix_unbounded: PreparedStatement,
find_keys_by_prefix_bounded: PreparedStatement,
find_key_values_by_prefix_unbounded: PreparedStatement,
find_key_values_by_prefix_bounded: PreparedStatement,
multi_kv_ps: DashMap<usize, PreparedStatement>,
multi_key_values and multi_keys
How large is a PreparedStatement in memory, by the way?
Should be pretty small, it's just some metadata, not sure it justifies an LRU cache or something here tbh 🤔
// The schema appears too complicated for non-trivial reasons.
// See TODO(#1069).
#1069 is closed. Let's remove this mysterious comment.
let default_profile = ExecutionProfile::builder()
.load_balancing_policy(lbp)
.retry_policy(Arc::new(DefaultRetryPolicy::new()))
.consistency(Consistency::LocalOne)
This consistency level may be insufficient in the future for dynamic shard migrations.
And more importantly, it could also be a problem for blob storage and certificates, because we occasionally store a blob and then notify another worker (or, indirectly, the proxy) to do something with it.
I see 🤔 makes sense. Let me see how it performs with LOCAL_QUORUM instead. Maybe after we redo the partitioning it'll be good enough.
The performance hit is pretty substantial btw, but hopefully we can make up for it by optimizing things after partitioning the data further.
Force-pushed from 218d2ee to 8493ce6.
Force-pushed from 8493ce6 to d681ce9.
(Nit: The PR description suggests it removes the use of refactoring and tuned page creation.)
async fn read_multi_values_internal(
&self,
root_key: &[u8],
fn get_occurences_map(
Suggested change: fn get_occurences_map( → fn get_occurrences_map(
keys: Vec<Vec<u8>>,
) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
let mut values = vec![None; keys.len()];
let map = Self::get_occurences_map(keys)?;
Suggested change: let map = Self::get_occurences_map(keys)?; → let map = Self::get_occurrences_map(keys)?;
while let Some(row) = rows.next().await {
let (key,) = row?;
for i_key in map.get(&key).unwrap().clone() {
*values.get_mut(i_key).expect("an entry in values") = true;
for i_key in map.get(&key).expect("key is supposed to be in map") {
(Or even just map[&key]?)
let query3 = &self.write_batch_deletion;
try_join_all(futures).await?;

let mut futures: Vec<BoxFuture<'_, Result<(), ScyllaDbStoreInternalError>>> = Vec::new();
I guess the deletions of individual keys could even happen concurrently with the prefix deletions?
'class' : 'NetworkTopologyStrategy', \
'replication_factor' : {} \
}}",
Good catch, missed that 😅
root_key blob, \
k blob, \
v blob, \
PRIMARY KEY (root_key, k) \
) \
WITH compaction = {{ \
'class' : 'SizeTieredCompactionStrategy', \
'min_sstable_size' : 268435456, \
'bucket_low' : 0.5, \
'bucket_high' : 1.5, \
'min_threshold' : 4, \
'max_threshold' : 32 \
}} \
AND compression = {{ 'sstable_compression': 'LZ4Compressor', 'chunk_length_kb':'8' }} \
AND caching = {{ 'enabled': 'true' }}",
Force-pushed from d681ce9 to 2fccd92.
Force-pushed from 2fccd92 to 8702072.
There are some changes I can approve and some I cannot. By the latter I mean the use of futures for writing the batches.
@@ -51,15 +57,15 @@ use crate::{
/// The limit is in reality 100. But we need one entry for the root key.
const MAX_MULTI_KEYS: usize = 99;
Maybe put 100 - 1 here, in the spirit of the other entries in the file.
if let Some(entry) = self.multi_key_values.get(&num_markers) {
return Ok(entry.clone());
}
let markers = std::iter::repeat_n("?", num_markers)
.collect::<Vec<_>>()
.join(",");
let query = format!(
"SELECT k,v FROM kv.{} WHERE root_key = ? AND k IN ({})",
self.namespace, markers
);
let prepared_statement = self.session.prepare(query).await?;
self.multi_key_values
.insert(num_markers, prepared_statement.clone());
A small problem with this approach is that we do two map lookups:
- The first with multi_key_values.get.
- The second with multi_key_values.insert.

This is slightly suboptimal. For BTreeMap we can get an entry and then later fill it in. This seems possible with DashMap as well: https://docs.rs/dashmap/6.1.0/dashmap/mapref/entry/enum.Entry.html
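For illustration, a minimal sketch of that single-lookup pattern, assuming the dashmap crate's Entry API; the String value, the function name, and the kv.table name below are stand-ins for the PR's PreparedStatement cache, and in the real async code the statement would have to be prepared before touching the map, since the entry guard should not be held across an .await:

use dashmap::DashMap;

// `entry` locks the shard once and inserts on a miss, instead of a `get`
// followed by a separate `insert`.
fn get_or_build_query(cache: &DashMap<usize, String>, num_markers: usize) -> String {
    cache
        .entry(num_markers)
        .or_insert_with(|| {
            // Built only on a cache miss.
            let markers = vec!["?"; num_markers].join(",");
            format!("SELECT k,v FROM kv.table WHERE root_key = ? AND k IN ({markers})")
        })
        .value()
        .clone()
}

fn main() {
    let cache = DashMap::new();
    assert!(get_or_build_query(&cache, 3).contains("(?,?,?)"));
}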
write_batch_delete_prefix_unbounded: BatchStatement,
write_batch_delete_prefix_bounded: BatchStatement,
write_batch_deletion: BatchStatement,
write_batch_insertion: BatchStatement,
write_batch_delete_prefix_unbounded: PreparedStatement,
write_batch_delete_prefix_bounded: PreparedStatement,
write_batch_deletion: PreparedStatement,
write_batch_insertion: PreparedStatement,
The switch from BatchStatement to PreparedStatement is done by the removal of the .into()? Nice find.
let query = format!(
"SELECT v FROM kv.{} WHERE root_key = ? AND k = ? ALLOW FILTERING",
namespace
);
let read_value = session.prepare(query).await?;
let read_value = session
.prepare(format!(
"SELECT v FROM kv.{} WHERE root_key = ? AND k = ?",
namespace
))
.await?;
I prefer the old style where we first build the query and then the read_value, but that is only stylistic.
async fn read_multi_values_internal(
&self,
root_key: &[u8],
fn get_occurrences_map(
Good idea.
for i_key in map.get(&key).unwrap().clone() {
*values.get_mut(i_key).expect("an entry in values") = true;
for i_key in &map[&key] {
values[*i_key] = true;
Interesting rewrite.
}
let query4 = &self.write_batch_insertion;

try_join_all(futures).await?;
I have several issues with the design using try_join_all:
- Is it actually faster to use many futures instead of a single batch? I think not.
- What we want is for all the transactions to pass, or none at all. That is what the batch should allow.

So, I am not sure that this rewrite is a good one.
Yeah, you're right, this one is wrong. I wrote this before I realized our need for atomicity in these operations. In the PR altering the partitions we'll either need to use LOGGED batches (and take a huge performance hit), or make it clear that we only guarantee atomicity within the same partition and group the batches by partition key.
Anyway, I'll mostly revert this part for now, thanks for the feedback!
I ended up just writing the batching properly here; it seemed to make more sense than doing it in the next PR.
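As a rough sketch of the "group the batches by partition key" idea (not the code that landed in this PR; it assumes the 0.x scylla crate's Batch API, a prepared INSERT binding (root_key, k, v), that a Vec of bind-value rows is accepted as batch values, and made-up function and parameter names):

use std::collections::HashMap;

use scylla::batch::{Batch, BatchType};
use scylla::prepared_statement::PreparedStatement;
use scylla::Session;

// One UNLOGGED batch per root_key: no batchlog overhead, and each batch is
// still applied atomically because all of its statements hit one partition.
async fn write_rows_grouped_by_partition(
    session: &Session,
    insert_ps: &PreparedStatement,
    rows: Vec<(Vec<u8>, Vec<u8>, Vec<u8>)>, // (root_key, k, v)
) -> Result<(), Box<dyn std::error::Error>> {
    let mut per_partition: HashMap<Vec<u8>, Vec<(Vec<u8>, Vec<u8>, Vec<u8>)>> = HashMap::new();
    for row in rows {
        per_partition.entry(row.0.clone()).or_default().push(row);
    }
    for values in per_partition.into_values() {
        let mut batch = Batch::new(BatchType::Unlogged);
        for _ in &values {
            batch.append_statement(insert_ps.clone());
        }
        // One row of bind values per statement in the batch.
        session.batch(&batch, values).await?;
    }
    Ok(())
}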
let prepared_statement = &self.write_batch_insertion;
let values = (root_key.clone(), key, value);
futures.push(Box::pin(async move {
session
.execute_single_page(prepared_statement, values, PagingState::start())
.await
.map(|_| ())
.map_err(Into::into)
}));
So, n keys being inserted lead to n futures? How could that work well?
We were trading the overhead of extra network calls for the benefit of sending prepared statements directly to the correct shards (which is more performant), but as I mentioned in the other comment, this approach will be rewritten anyway, since we lose the atomicity of batches.
async fn build_session(uri: &str) -> Result<Session, ScyllaDbStoreInternalError> {
let policy = DefaultPolicy::builder().token_aware(true).build();
let default_profile = ExecutionProfile::builder()
.load_balancing_policy(policy)
.retry_policy(Arc::new(DefaultRetryPolicy::new()))
.consistency(Consistency::LocalQuorum)
.build();
let handle = default_profile.into_handle();
SessionBuilder::new()
.known_node(uri)
.default_execution_profile_handle(handle)
.compression(Some(Compression::Lz4))
.build()
.boxed()
.await
.map_err(Into::into)
}
There are a number of choices being made here, and so it would be nice to have some documentation.
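For example, inline comments along these lines would cover it (reusing the items already in the snippet above; the explanations are general driver/ScyllaDB behavior rather than the author's stated rationale):

// Token-aware routing: prefer sending each request to a node that holds a
// replica of the target partition, avoiding an extra coordinator hop.
let policy = DefaultPolicy::builder().token_aware(true).build();
let default_profile = ExecutionProfile::builder()
    .load_balancing_policy(policy)
    // The driver's default retry policy for transient failures such as timeouts.
    .retry_policy(Arc::new(DefaultRetryPolicy::new()))
    // LOCAL_QUORUM: a majority of replicas in the local datacenter must
    // acknowledge each write, so a later LOCAL_QUORUM read observes it
    // (see the consistency discussion earlier in this thread).
    .consistency(Consistency::LocalQuorum)
    .build();

Similarly, Compression::Lz4 compresses the CQL frames between driver and server to reduce bandwidth.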
"CREATE TABLE kv.{} (\ | ||
root_key blob, \ | ||
k blob, \ | ||
v blob, \ | ||
PRIMARY KEY (root_key, k) \ | ||
) \ | ||
WITH compaction = {{ \ | ||
'class' : 'SizeTieredCompactionStrategy', \ | ||
'min_sstable_size' : 52428800, \ | ||
'bucket_low' : 0.8, \ | ||
'bucket_high' : 1.25, \ | ||
'min_threshold' : 4, \ | ||
'max_threshold' : 8 \ | ||
}} \ | ||
AND compression = {{ \ | ||
'sstable_compression': 'LZ4Compressor', \ | ||
'chunk_length_in_kb':'4' \ | ||
}} \ | ||
AND caching = {{ \ | ||
'enabled': 'true' \ | ||
}}", |
Some documentation of this would be nice as well. Why are those parameters chosen that way?
Force-pushed from d91638d to e8c62d5.
Force-pushed from ce533ed to ef194de.
// Returns a batch query with a sticky shard policy, that always tries to route to the same
// ScyllaDB shard.
// Should be used only on batches where all statements are to the same partition key.
async fn get_sticky_batch_query(
What's happening here? Do we need this? This looks pretty non-trivial and the PR summary doesn't mention it.
Explain why default load balancing policies are not enough (if that's the case)
https://java-driver.docs.scylladb.com/stable/manual/core/load_balancing/#built-in-policies
Talked offline and will do the write batch rewrite in a different PR.
Force-pushed from ef194de to 4acb372.
Force-pushed from 4acb372 to 6106db7.
Motivation
There are some optimizations that we can do in how we're using ScyllaDB.
Proposal
This PR removes the use of:
- ALLOW FILTERING (the ones remaining should be fine)
Also:
Test Plan
CI + deployed a network, benchmarked it, and saw an over 10x decrease in ScyllaDB's write latency.
Release Plan