Merged
15 changes: 15 additions & 0 deletions bindings/cpp/include/fluss.hpp
@@ -180,6 +180,18 @@ struct ErrorCode {
static constexpr int INVALID_ALTER_TABLE_EXCEPTION = 56;
/// Deletion operations are disabled on this table.
static constexpr int DELETION_DISABLED_EXCEPTION = 57;

/// Returns true if retrying the request may succeed. Mirrors Java's RetriableException hierarchy.
static constexpr bool IsRetriable(int32_t code) {
return code == NETWORK_EXCEPTION || code == CORRUPT_MESSAGE ||

Member: Shall we update the docs, btw?

Contributor Author: Have a look and let me know.

Member: @toxicteddy00077 I only see the C++ docs updated. What about Rust and Python?

Contributor Author: Yeah, that's why I asked you to have a look. I just wanted to confirm this was fine. I'll make the updates ASAP, thanks.

code == SCHEMA_NOT_EXIST || code == LOG_STORAGE_EXCEPTION ||
code == KV_STORAGE_EXCEPTION || code == NOT_LEADER_OR_FOLLOWER ||
code == CORRUPT_RECORD_EXCEPTION ||
code == UNKNOWN_TABLE_OR_BUCKET_EXCEPTION || code == REQUEST_TIME_OUT ||
code == STORAGE_EXCEPTION ||
code == NOT_ENOUGH_REPLICAS_AFTER_APPEND_EXCEPTION ||
code == NOT_ENOUGH_REPLICAS_EXCEPTION || code == LEADER_NOT_AVAILABLE_EXCEPTION;
}
};

struct Date {
@@ -326,6 +338,9 @@ struct Result {
std::string error_message;

bool Ok() const { return error_code == 0; }

/// Returns true if retrying the request may succeed. Client-side errors always return false.
bool IsRetriable() const { return ErrorCode::IsRetriable(error_code); }
};

struct TablePath {
2 changes: 2 additions & 0 deletions bindings/python/fluss/__init__.pyi
@@ -772,6 +772,8 @@ class FlussError(Exception):
error_code: int
def __init__(self, message: str, error_code: int = -2) -> None: ...
def __str__(self) -> str: ...
@property
def is_retriable(self) -> bool: ...

class LakeSnapshot:
def __init__(self, snapshot_id: int) -> None: ...
10 changes: 10 additions & 0 deletions bindings/python/src/error.rs
@@ -51,6 +51,16 @@ impl FlussError {
format!("FlussError: {}", self.message)
}
}

/// Returns ``True`` if retrying the request may succeed. Client-side errors always return ``False``.
#[getter]
fn is_retriable(&self) -> bool {
use fluss::rpc::FlussError as CoreFlussError;
if self.error_code == CLIENT_ERROR_CODE {
return false;
}
CoreFlussError::for_code(self.error_code).is_retriable()
}
}

impl FlussError {
11 changes: 11 additions & 0 deletions crates/fluss/src/error.rs
@@ -165,6 +165,17 @@ impl Error {
None
}
}

/// Returns `true` if retrying the request may succeed.
/// [`Error::RpcError`] is always retriable; [`Error::FlussAPIError`] delegates to
/// [`ApiError::is_retriable`]; all other variants are not.
pub fn is_retriable(&self) -> bool {
match self {
Error::RpcError { .. } => true,
Error::FlussAPIError { api_error } => api_error.is_retriable(),
_ => false,
}
}
}

impl From<ArrowError> for Error {
77 changes: 77 additions & 0 deletions crates/fluss/src/rpc/fluss_api_error.rs
@@ -39,6 +39,13 @@ impl Display for ApiError {
}
}

impl ApiError {
/// Returns `true` if retrying the request may succeed. Delegates to [`FlussError::is_retriable`].
pub fn is_retriable(&self) -> bool {
FlussError::for_code(self.code).is_retriable()
}
}

/// Fluss protocol errors. These errors are part of the client-server protocol.
/// The error codes cannot be changed, but the names can be.
///
@@ -172,6 +179,25 @@ impl FlussError {
*self as i32
}

pub fn is_retriable(&self) -> bool {
matches!(
self,
FlussError::NetworkException
| FlussError::CorruptMessage
| FlussError::SchemaNotExist
| FlussError::LogStorageException
| FlussError::KvStorageException
| FlussError::NotLeaderOrFollower
| FlussError::CorruptRecordException
| FlussError::UnknownTableOrBucketException
| FlussError::RequestTimeOut
| FlussError::StorageException
| FlussError::NotEnoughReplicasAfterAppendException
| FlussError::NotEnoughReplicasException
| FlussError::LeaderNotAvailableException
)
}

/// Returns a friendly description of the error.
pub fn message(&self) -> &'static str {
match self {
@@ -403,4 +429,55 @@ mod tests {
let fluss_error = FlussError::from(api_error);
assert_eq!(fluss_error, FlussError::TableNotExist);
}

#[test]
fn is_retriable_known_retriable_errors() {
let retriable = [
FlussError::NetworkException,
FlussError::CorruptMessage,
FlussError::SchemaNotExist,
FlussError::LogStorageException,
FlussError::KvStorageException,
FlussError::NotLeaderOrFollower,
FlussError::CorruptRecordException,
FlussError::UnknownTableOrBucketException,
FlussError::RequestTimeOut,
FlussError::StorageException,
FlussError::NotEnoughReplicasAfterAppendException,
FlussError::NotEnoughReplicasException,
FlussError::LeaderNotAvailableException,
];
for err in &retriable {
assert!(err.is_retriable(), "{err:?} should be retriable");
}
}

#[test]
fn is_retriable_known_non_retriable_errors() {
let non_retriable = [
FlussError::UnknownServerError,
FlussError::None,
FlussError::TableNotExist,
FlussError::AuthenticateException,
FlussError::AuthorizationException,
FlussError::RecordTooLargeException,
FlussError::DeletionDisabledException,
FlussError::InvalidCoordinatorException,
FlussError::FencedLeaderEpochException,
FlussError::FencedTieringEpochException,
FlussError::RetriableAuthenticateException,
];
for err in &non_retriable {
assert!(!err.is_retriable(), "{err:?} should not be retriable");
}
}

#[test]
fn api_error_is_retriable_delegates_to_fluss_error() {
let retriable_api = FlussError::RequestTimeOut.to_api_error(None);
assert!(retriable_api.is_retriable());

let permanent_api = FlussError::TableNotExist.to_api_error(None);
assert!(!permanent_api.is_retriable());
}
}
47 changes: 47 additions & 0 deletions website/docs/user-guide/cpp/error-handling.md
@@ -97,6 +97,53 @@ if (!result.Ok()) {

See `fluss::ErrorCode` in `fluss.hpp` for the full list of named constants.

## Retry Logic

Some errors are transient: the server may be temporarily unavailable, mid-election, or under load. Use `IsRetriable()` to decide whether to retry an operation instead of treating the error as permanent.

`Result::IsRetriable()` is available directly on the returned `fluss::Result`:

```cpp
fluss::Result result = writer.Append(row);
if (!result.Ok()) {
if (result.IsRetriable()) {
// Transient failure — safe to retry
} else {
// Permanent failure — log and abort
std::cerr << "Fatal error (code " << result.error_code
<< "): " << result.error_message << std::endl;
}
}
```

`Result::IsRetriable()` delegates to `ErrorCode::IsRetriable()`, so you can also call it directly on the code:

```cpp
if (fluss::ErrorCode::IsRetriable(result.error_code)) {
// retry
}
```

### Retriable Error Codes

| Constant | Code | Reason |
|-------------------------------------------------------------|------|-------------------------------------------|
| `ErrorCode::NETWORK_EXCEPTION` | 1 | Server disconnected |
| `ErrorCode::CORRUPT_MESSAGE` | 3 | CRC or size error |
| `ErrorCode::SCHEMA_NOT_EXIST` | 9 | Schema may not exist |
| `ErrorCode::LOG_STORAGE_EXCEPTION` | 10 | Transient log storage error |
| `ErrorCode::KV_STORAGE_EXCEPTION` | 11 | Transient KV storage error |
| `ErrorCode::NOT_LEADER_OR_FOLLOWER` | 12 | Leader election in progress |
| `ErrorCode::CORRUPT_RECORD_EXCEPTION` | 14 | Corrupt record |
| `ErrorCode::UNKNOWN_TABLE_OR_BUCKET_EXCEPTION` | 21 | Metadata not yet available |
| `ErrorCode::REQUEST_TIME_OUT` | 25 | Request timed out |
| `ErrorCode::STORAGE_EXCEPTION` | 26 | Transient storage error |
| `ErrorCode::NOT_ENOUGH_REPLICAS_AFTER_APPEND_EXCEPTION` | 28 | Wrote to server but with low ISR size |
| `ErrorCode::NOT_ENOUGH_REPLICAS_EXCEPTION` | 29 | Low ISR size at write time |
| `ErrorCode::LEADER_NOT_AVAILABLE_EXCEPTION` | 44 | No leader available for partition |

Client-side errors (`ErrorCode::CLIENT_ERROR`, code -2) always return `false` from `IsRetriable()`.

## Common Error Scenarios

### Connection Refused
40 changes: 40 additions & 0 deletions website/docs/user-guide/python/error-handling.md
@@ -56,6 +56,46 @@ except fluss.FlussError as e:

See `fluss.ErrorCode` for the full list of named constants.

## Retry Logic

Some errors are transient: the server may be temporarily unavailable, mid-election, or under load. Use `is_retriable` to decide whether to retry an operation instead of treating the error as permanent.

`FlussError.is_retriable` is a property available directly on the exception:

```python
import fluss

try:
    await writer.append(row)
except fluss.FlussError as e:
    if e.is_retriable:
        # Transient failure: safe to retry
        pass
    else:
        # Permanent failure: log and abort
        print(f"Fatal error (code {e.error_code}): {e.message}")
```

### Retriable Error Codes

| Constant | Code | Reason |
|--------------------------------------------------------------|------|-------------------------------------------|
| `ErrorCode.NETWORK_EXCEPTION` | 1 | Server disconnected |
| `ErrorCode.CORRUPT_MESSAGE` | 3 | CRC or size error |
| `ErrorCode.SCHEMA_NOT_EXIST` | 9 | Schema may not exist |
| `ErrorCode.LOG_STORAGE_EXCEPTION` | 10 | Transient log storage error |
| `ErrorCode.KV_STORAGE_EXCEPTION` | 11 | Transient KV storage error |
| `ErrorCode.NOT_LEADER_OR_FOLLOWER` | 12 | Leader election in progress |
| `ErrorCode.CORRUPT_RECORD_EXCEPTION` | 14 | Corrupt record |
| `ErrorCode.UNKNOWN_TABLE_OR_BUCKET_EXCEPTION` | 21 | Metadata not yet available |
| `ErrorCode.REQUEST_TIME_OUT` | 25 | Request timed out |
| `ErrorCode.STORAGE_EXCEPTION` | 26 | Transient storage error |
| `ErrorCode.NOT_ENOUGH_REPLICAS_AFTER_APPEND_EXCEPTION` | 28 | Wrote to server but with low ISR size |
| `ErrorCode.NOT_ENOUGH_REPLICAS_EXCEPTION` | 29 | Low ISR size at write time |
| `ErrorCode.LEADER_NOT_AVAILABLE_EXCEPTION` | 44 | No leader available for partition |

Client-side errors (`ErrorCode.CLIENT_ERROR`, code -2) always return `False` from `is_retriable`.
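
For code that only has a raw integer code (for example, from a log line), the table above can be mirrored as a set lookup. This is an illustrative sketch: `RETRIABLE_CODES` and `is_retriable_code` are hypothetical names, the values are copied from the table, and the binding itself delegates to the Rust core rather than using a Python set.

```python
# Codes copied from the "Retriable Error Codes" table above.
RETRIABLE_CODES = frozenset({1, 3, 9, 10, 11, 12, 14, 21, 25, 26, 28, 29, 44})

# Client-side error code, per the note above; deliberately not in the set.
CLIENT_ERROR_CODE = -2


def is_retriable_code(code: int) -> bool:
    """Hypothetical helper: True only for codes listed in the table."""
    return code in RETRIABLE_CODES
```

A hand-copied set like this can drift from the server's definition, so `FlussError.is_retriable` is the better choice whenever the exception object is available.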

## Common Error Scenarios

### Connection Refused
42 changes: 42 additions & 0 deletions website/docs/user-guide/rust/error-handling.md
@@ -75,6 +75,48 @@ match result {
}
```

## Retry Logic

Some errors are transient: the server may be temporarily unavailable, mid-election, or under load. Use `is_retriable()` to decide whether to retry an operation instead of treating the error as permanent.

`Error::is_retriable()` is available directly on any `Error` value. `RpcError` is always retriable; `FlussAPIError` delegates to the server error code; all other variants return `false`.

```rust
use fluss::error::Error;

match writer.append(&row) {
Ok(_) => {}
Err(ref e) if e.is_retriable() => {
// Transient failure — safe to retry
}
Err(e) => {
// Permanent failure — log and abort
eprintln!("Fatal error: {}", e);
}
}
```

### Retriable Variants

| Variant / Error | Code | Reason |
|----------------------------------------------|------|-------------------------------------------|
| `Error::RpcError` | — | Network-level failure, always retriable |
| `FlussError::NetworkException` | 1 | Server disconnected |
| `FlussError::CorruptMessage` | 3 | CRC or size error |
| `FlussError::SchemaNotExist` | 9 | Schema may not exist |
| `FlussError::LogStorageException` | 10 | Transient log storage error |
| `FlussError::KvStorageException` | 11 | Transient KV storage error |
| `FlussError::NotLeaderOrFollower` | 12 | Leader election in progress |
| `FlussError::CorruptRecordException` | 14 | Corrupt record |
| `FlussError::UnknownTableOrBucketException` | 21 | Metadata not yet available |
| `FlussError::RequestTimeOut` | 25 | Request timed out |
| `FlussError::StorageException` | 26 | Transient storage error |
| `FlussError::NotEnoughReplicasAfterAppendException` | 28 | Wrote to server but with low ISR size |
| `FlussError::NotEnoughReplicasException` | 29 | Low ISR size at write time |
| `FlussError::LeaderNotAvailableException` | 44 | No leader available for partition |

All other `Error` variants (e.g. `RowConvertError`, `IllegalArgument`, `UnsupportedOperation`) always return `false` from `is_retriable()`.

## Common Error Scenarios

### Connection Refused