-
Notifications
You must be signed in to change notification settings - Fork 93
No-op to idempotency table when table is empty on boot #3011
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
All contributors have signed the CLA ✍️ ✅ |
I have read the CLA Document and I hereby sign the CLA |
recheck |
@slinkydeveloper Hi, How can I trigger the CI? It seems that CI is not triggered automatically |
@zhaohaidao I've done that for you |
Thanks. I have fixed fmt check issue. PLAT. @AhmedSoliman @slinkydeveloper |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot for creating this PR @zhaohaidao. I think it looks really nice. I left a few minor comments and an ask to check whether the configured iterator for spotting empty idempotency tables works with our configured prefix extractor.
// Check if the idempotency table is empty | ||
let is_empty = store.is_idempotency_table_empty()?; | ||
if is_empty { | ||
info!( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could be a debug log statement as it's an internal optimization.
let partition_store = PartitionStore { | ||
rocksdb: self.rocksdb.clone(), | ||
partition_id: self.partition_id, | ||
data_cf_name: self.data_cf_name.clone(), | ||
key_range: self.key_range.clone(), | ||
key_buffer: BytesMut::new(), | ||
value_buffer: BytesMut::new(), | ||
idempotency_table_disabled: idempotency_disabled, | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of creating a new copy of self
, we can give self.idempotency_disabled
to the PartitionStoreTransaction
.
{ | ||
Ok(store) => store, | ||
Err(e) => { | ||
warn!( | ||
"Failed to check idempotency table: {e}, continuing with standard initialization" | ||
); | ||
PartitionStore::new( | ||
self.rocksdb.clone(), | ||
cf_name, | ||
partition_id, | ||
partition_key_range, | ||
) | ||
} | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If new_with_idempotency_check
fails, then we can escalate the error because it points towards something is wrong with RocksDB.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error returned by PartitionStore::new_with_idempotency_check should be a StorageError because the corresponding table does not exist. However, this error should not occur because all places that call open_partition_store ensure that the table must exist. So would it be more appropriate to panic directly here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In addition, I don’t seem to see a way to convert StorageError to RocksError. Do you have any suggestions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could introduce a PartitionStoreManagerError
which has a variant RocksDb(#[from] RocksError)
and IdempotencyTableCheck(StorageError)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your adivce.
let partition_store = match PartitionStore::new_with_idempotency_check( | ||
self.rocksdb.clone(), | ||
cf_name, | ||
cf_name.clone(), | ||
partition_id, | ||
partition_key_range, | ||
); | ||
partition_key_range.clone(), | ||
) | ||
.await | ||
{ | ||
Ok(store) => store, | ||
Err(e) => { | ||
warn!( | ||
"Failed to check idempotency table: {e}, continuing with standard initialization" | ||
); | ||
PartitionStore::new( | ||
self.rocksdb.clone(), | ||
cf_name, | ||
partition_id, | ||
partition_key_range, | ||
) | ||
} | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here. We can escalate the error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense
Some(IdempotencyMetadata { | ||
invocation_id: InvocationId::from_parts(10, FIXTURE_INVOCATION_1), | ||
}) | ||
None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be great to have a test where we have a non-empty idempotency table and an empty one so that we test both code paths.
@@ -150,11 +167,19 @@ impl IdempotencyTable for PartitionStoreTransaction<'_> { | |||
idempotency_id: &IdempotencyId, | |||
metadata: &IdempotencyMetadata, | |||
) -> Result<()> { | |||
if self.is_idempotency_table_disabled() { | |||
return Ok(()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that write operations should fail if the idempotency table is disabled. This establishes the invariant that it is truely disabled. Even better if we made the write path only available for cfg(test)
then no production code will ever depend on it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When IdempotencyTable is empty when initializing PartitionStore, IdempotencyTable will be disabled. So there is no way to enable IdempotencyTable when creating PartitionStore. Do you have any good suggestions? Or should I just keep the test case where IdempotencyTable is disabled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can move the non-readonly traits to cfg(test)
. That way it's only present for tests where they might be needed to populate the table for running the test case. Would this work?
opts.set_prefix_same_as_start(true); | ||
opts.set_async_io(true); | ||
opts.set_total_order_seek(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not 100% sure that this would work. I think a problem is that the configured prefix extractor is the fixed_prefix with a prefix length of 10 bytes (key kind + partition id). We would have loved to use the NewCappedPrefixTransform, but it's not yet exposed via the Rust bindings. Only seeking to the key kind might give wrong results w/o set_total_order_seek(true)
. Worth testing it out.
to resolve #2983