Skip to content

Commit bed9036

Browse files
Added is_retriable() to FlussError
1 parent a37c5ba commit bed9036

5 files changed

Lines changed: 115 additions & 0 deletions

File tree

bindings/cpp/include/fluss.hpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,18 @@ struct ErrorCode {
180180
static constexpr int INVALID_ALTER_TABLE_EXCEPTION = 56;
181181
/// Deletion operations are disabled on this table.
182182
static constexpr int DELETION_DISABLED_EXCEPTION = 57;
183+
184+
/// Returns true if retrying the request may succeed. Mirrors Java's RetriableException hierarchy.
185+
static constexpr bool IsRetriable(int32_t code) {
186+
return code == NETWORK_EXCEPTION || code == CORRUPT_MESSAGE ||
187+
code == SCHEMA_NOT_EXIST || code == LOG_STORAGE_EXCEPTION ||
188+
code == KV_STORAGE_EXCEPTION || code == NOT_LEADER_OR_FOLLOWER ||
189+
code == CORRUPT_RECORD_EXCEPTION ||
190+
code == UNKNOWN_TABLE_OR_BUCKET_EXCEPTION || code == REQUEST_TIME_OUT ||
191+
code == STORAGE_EXCEPTION ||
192+
code == NOT_ENOUGH_REPLICAS_AFTER_APPEND_EXCEPTION ||
193+
code == NOT_ENOUGH_REPLICAS_EXCEPTION || code == LEADER_NOT_AVAILABLE_EXCEPTION;
194+
}
183195
};
184196

185197
struct Date {
@@ -326,6 +338,9 @@ struct Result {
326338
std::string error_message;
327339

328340
bool Ok() const { return error_code == 0; }
341+
342+
/// Returns true if retrying the request may succeed. Client-side errors always return false.
343+
bool IsRetriable() const { return ErrorCode::IsRetriable(error_code); }
329344
};
330345

331346
struct TablePath {

bindings/python/fluss/__init__.pyi

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -772,6 +772,8 @@ class FlussError(Exception):
772772
error_code: int
773773
def __init__(self, message: str, error_code: int = -2) -> None: ...
774774
def __str__(self) -> str: ...
775+
@property
776+
def is_retriable(self) -> bool: ...
775777

776778
class LakeSnapshot:
777779
def __init__(self, snapshot_id: int) -> None: ...

bindings/python/src/error.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,16 @@ impl FlussError {
5151
format!("FlussError: {}", self.message)
5252
}
5353
}
54+
55+
/// Returns ``True`` if retrying the request may succeed. Client-side errors always return ``False``.
56+
#[getter]
57+
fn is_retriable(&self) -> bool {
58+
use fluss::rpc::FlussError as CoreFlussError;
59+
if self.error_code == CLIENT_ERROR_CODE {
60+
return false;
61+
}
62+
CoreFlussError::for_code(self.error_code).is_retriable()
63+
}
5464
}
5565

5666
impl FlussError {

crates/fluss/src/error.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,17 @@ impl Error {
165165
None
166166
}
167167
}
168+
169+
/// Returns `true` if retrying the request may succeed.
170+
/// [`Error::RpcError`] is always retriable; [`Error::FlussAPIError`] delegates to
171+
/// [`ApiError::is_retriable`]; all other variants are not.
172+
pub fn is_retriable(&self) -> bool {
173+
match self {
174+
Error::RpcError { .. } => true,
175+
Error::FlussAPIError { api_error } => api_error.is_retriable(),
176+
_ => false,
177+
}
178+
}
168179
}
169180

170181
impl From<ArrowError> for Error {

crates/fluss/src/rpc/fluss_api_error.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,13 @@ impl Display for ApiError {
3939
}
4040
}
4141

42+
impl ApiError {
43+
/// Returns `true` if retrying the request may succeed. Delegates to [`FlussError::is_retriable`].
44+
pub fn is_retriable(&self) -> bool {
45+
FlussError::for_code(self.code).is_retriable()
46+
}
47+
}
48+
4249
/// Fluss protocol errors. These errors are part of the client-server protocol.
4350
/// The error codes cannot be changed, but the names can be.
4451
///
@@ -172,6 +179,25 @@ impl FlussError {
172179
*self as i32
173180
}
174181

182+
pub fn is_retriable(&self) -> bool {
183+
matches!(
184+
self,
185+
FlussError::NetworkException
186+
| FlussError::CorruptMessage
187+
| FlussError::SchemaNotExist
188+
| FlussError::LogStorageException
189+
| FlussError::KvStorageException
190+
| FlussError::NotLeaderOrFollower
191+
| FlussError::CorruptRecordException
192+
| FlussError::UnknownTableOrBucketException
193+
| FlussError::RequestTimeOut
194+
| FlussError::StorageException
195+
| FlussError::NotEnoughReplicasAfterAppendException
196+
| FlussError::NotEnoughReplicasException
197+
| FlussError::LeaderNotAvailableException
198+
)
199+
}
200+
175201
/// Returns a friendly description of the error.
176202
pub fn message(&self) -> &'static str {
177203
match self {
@@ -403,4 +429,55 @@ mod tests {
403429
let fluss_error = FlussError::from(api_error);
404430
assert_eq!(fluss_error, FlussError::TableNotExist);
405431
}
432+
433+
#[test]
434+
fn is_retriable_known_retriable_errors() {
435+
let retriable = [
436+
FlussError::NetworkException,
437+
FlussError::CorruptMessage,
438+
FlussError::SchemaNotExist,
439+
FlussError::LogStorageException,
440+
FlussError::KvStorageException,
441+
FlussError::NotLeaderOrFollower,
442+
FlussError::CorruptRecordException,
443+
FlussError::UnknownTableOrBucketException,
444+
FlussError::RequestTimeOut,
445+
FlussError::StorageException,
446+
FlussError::NotEnoughReplicasAfterAppendException,
447+
FlussError::NotEnoughReplicasException,
448+
FlussError::LeaderNotAvailableException,
449+
];
450+
for err in &retriable {
451+
assert!(err.is_retriable(), "{err:?} should be retriable");
452+
}
453+
}
454+
455+
#[test]
456+
fn is_retriable_known_non_retriable_errors() {
457+
let non_retriable = [
458+
FlussError::UnknownServerError,
459+
FlussError::None,
460+
FlussError::TableNotExist,
461+
FlussError::AuthenticateException,
462+
FlussError::AuthorizationException,
463+
FlussError::RecordTooLargeException,
464+
FlussError::DeletionDisabledException,
465+
FlussError::InvalidCoordinatorException,
466+
FlussError::FencedLeaderEpochException,
467+
FlussError::FencedTieringEpochException,
468+
FlussError::RetriableAuthenticateException,
469+
];
470+
for err in &non_retriable {
471+
assert!(!err.is_retriable(), "{err:?} should not be retriable");
472+
}
473+
}
474+
475+
#[test]
476+
fn api_error_is_retriable_delegates_to_fluss_error() {
477+
let retriable_api = FlussError::RequestTimeOut.to_api_error(None);
478+
assert!(retriable_api.is_retriable());
479+
480+
let permanent_api = FlussError::TableNotExist.to_api_error(None);
481+
assert!(!permanent_api.is_retriable());
482+
}
406483
}

0 commit comments

Comments
 (0)