Skip to content

Commit a3ac0ee

Browse files
Added is_retriable() to FlussError
1 parent a37c5ba commit a3ac0ee

5 files changed

Lines changed: 110 additions & 0 deletions

File tree

bindings/cpp/include/fluss.hpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,17 @@ struct ErrorCode {
180180
static constexpr int INVALID_ALTER_TABLE_EXCEPTION = 56;
181181
/// Deletion operations are disabled on this table.
182182
static constexpr int DELETION_DISABLED_EXCEPTION = 57;
183+
184+
/// Returns true if retrying the request may succeed. Similar toJava's RetriableException hierarchy.
185+
static constexpr bool IsRetriable(int32_t code) {
186+
return code == NETWORK_EXCEPTION || code == CORRUPT_MESSAGE ||
187+
code == SCHEMA_NOT_EXIST || code == NOT_LEADER_OR_FOLLOWER ||
188+
code == CORRUPT_RECORD_EXCEPTION ||
189+
code == UNKNOWN_TABLE_OR_BUCKET_EXCEPTION || code == REQUEST_TIME_OUT ||
190+
code == STORAGE_EXCEPTION ||
191+
code == NOT_ENOUGH_REPLICAS_AFTER_APPEND_EXCEPTION ||
192+
code == NOT_ENOUGH_REPLICAS_EXCEPTION || code == LEADER_NOT_AVAILABLE_EXCEPTION;
193+
}
183194
};
184195

185196
struct Date {
@@ -326,6 +337,9 @@ struct Result {
326337
std::string error_message;
327338

328339
bool Ok() const { return error_code == 0; }
340+
341+
/// Returns true if retrying the request may succeed. Client-side errors always return false.
342+
bool IsRetriable() const { return ErrorCode::IsRetriable(error_code); }
329343
};
330344

331345
struct TablePath {

bindings/python/fluss/__init__.pyi

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -772,6 +772,8 @@ class FlussError(Exception):
772772
error_code: int
773773
def __init__(self, message: str, error_code: int = -2) -> None: ...
774774
def __str__(self) -> str: ...
775+
@property
776+
def is_retriable(self) -> bool: ...
775777

776778
class LakeSnapshot:
777779
def __init__(self, snapshot_id: int) -> None: ...

bindings/python/src/error.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,16 @@ impl FlussError {
5151
format!("FlussError: {}", self.message)
5252
}
5353
}
54+
55+
/// Returns ``True`` if retrying the request may succeed. Client-side errors always return ``False``.
56+
#[getter]
57+
fn is_retriable(&self) -> bool {
58+
use fluss::rpc::FlussError as CoreFlussError;
59+
if self.error_code == CLIENT_ERROR_CODE {
60+
return false;
61+
}
62+
CoreFlussError::for_code(self.error_code).is_retriable()
63+
}
5464
}
5565

5666
impl FlussError {

crates/fluss/src/error.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,17 @@ impl Error {
165165
None
166166
}
167167
}
168+
169+
/// Returns `true` if retrying the request may succeed.
170+
/// [`Error::RpcError`] is always retriable; [`Error::FlussAPIError`] delegates to
171+
/// [`ApiError::is_retriable`]; all other variants are not.
172+
pub fn is_retriable(&self) -> bool {
173+
match self {
174+
Error::RpcError { .. } => true,
175+
Error::FlussAPIError { api_error } => api_error.is_retriable(),
176+
_ => false,
177+
}
178+
}
168179
}
169180

170181
impl From<ArrowError> for Error {

crates/fluss/src/rpc/fluss_api_error.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,13 @@ impl Display for ApiError {
3939
}
4040
}
4141

42+
impl ApiError {
43+
/// Returns `true` if retrying the request may succeed. Delegates to [`FlussError::is_retriable`].
44+
pub fn is_retriable(&self) -> bool {
45+
FlussError::for_code(self.code).is_retriable()
46+
}
47+
}
48+
4249
/// Fluss protocol errors. These errors are part of the client-server protocol.
4350
/// The error codes cannot be changed, but the names can be.
4451
///
@@ -172,6 +179,23 @@ impl FlussError {
172179
*self as i32
173180
}
174181

182+
pub fn is_retriable(&self) -> bool {
183+
matches!(
184+
self,
185+
FlussError::NetworkException
186+
| FlussError::CorruptMessage
187+
| FlussError::SchemaNotExist
188+
| FlussError::NotLeaderOrFollower
189+
| FlussError::CorruptRecordException
190+
| FlussError::UnknownTableOrBucketException
191+
| FlussError::RequestTimeOut
192+
| FlussError::StorageException
193+
| FlussError::NotEnoughReplicasAfterAppendException
194+
| FlussError::NotEnoughReplicasException
195+
| FlussError::LeaderNotAvailableException
196+
)
197+
}
198+
175199
/// Returns a friendly description of the error.
176200
pub fn message(&self) -> &'static str {
177201
match self {
@@ -403,4 +427,53 @@ mod tests {
403427
let fluss_error = FlussError::from(api_error);
404428
assert_eq!(fluss_error, FlussError::TableNotExist);
405429
}
430+
431+
#[test]
432+
fn is_retriable_known_retriable_errors() {
433+
let retriable = [
434+
FlussError::NetworkException,
435+
FlussError::CorruptMessage,
436+
FlussError::SchemaNotExist,
437+
FlussError::NotLeaderOrFollower,
438+
FlussError::CorruptRecordException,
439+
FlussError::UnknownTableOrBucketException,
440+
FlussError::RequestTimeOut,
441+
FlussError::StorageException,
442+
FlussError::NotEnoughReplicasAfterAppendException,
443+
FlussError::NotEnoughReplicasException,
444+
FlussError::LeaderNotAvailableException,
445+
];
446+
for err in &retriable {
447+
assert!(err.is_retriable(), "{err:?} should be retriable");
448+
}
449+
}
450+
451+
#[test]
452+
fn is_retriable_known_non_retriable_errors() {
453+
let non_retriable = [
454+
FlussError::UnknownServerError,
455+
FlussError::None,
456+
FlussError::TableNotExist,
457+
FlussError::AuthenticateException,
458+
FlussError::AuthorizationException,
459+
FlussError::RecordTooLargeException,
460+
FlussError::DeletionDisabledException,
461+
FlussError::InvalidCoordinatorException,
462+
FlussError::FencedLeaderEpochException,
463+
FlussError::FencedTieringEpochException,
464+
FlussError::RetriableAuthenticateException,
465+
];
466+
for err in &non_retriable {
467+
assert!(!err.is_retriable(), "{err:?} should not be retriable");
468+
}
469+
}
470+
471+
#[test]
472+
fn api_error_is_retriable_delegates_to_fluss_error() {
473+
let retriable_api = FlussError::RequestTimeOut.to_api_error(None);
474+
assert!(retriable_api.is_retriable());
475+
476+
let permanent_api = FlussError::TableNotExist.to_api_error(None);
477+
assert!(!permanent_api.is_retriable());
478+
}
406479
}

0 commit comments

Comments
 (0)