Skip to content

Fix and improve error handling#215

Merged
the-orex merged 8 commits intomainfrom
feature/fix-error-handling-not-resetting-after-success
Mar 9, 2026
Merged

Fix and improve error handling#215
the-orex merged 8 commits intomainfrom
feature/fix-error-handling-not-resetting-after-success

Conversation

@kino-andrew
Copy link
Collaborator

Runtime

  • bug was not returning error with status but server hangup instead

Kafka-sink

  • Adding cancellation token when there is error sending to kafka to avoid silent loss

Block-coordinator

  • Adding idleness check config on block coordinator source (default is no check)
  • Adding source_label to allow customization of logging

@kino-andrew kino-andrew changed the title Feature/fix error handling not resetting after success Fix and improve error handling Mar 5, 2026
Comment on lines +197 to +203
let update = if idle_warn_secs == 0 {
match stream.next().await {
Some(update) => update,
None => break SourceExitStatus::StreamEnded,
}
} else {
match tokio::time::timeout(stream_idle_timeout, stream.next()).await {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't smell normal. The grpc client sends ping requests if you are seeing idle connections that is more likely an issue with the grpc source.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes i agree, i use https://richat.juprpc.com as source.

For the context, I configured it to 2s (since i'm always supposed to receive data pretty quick e.g slot update status is minimum every 400ms at worst) but it happened 100 times last night and the stream resumed only 95 times so it shows a problem not detected by the ping.

So we have some kind of lag highlighted through this. I don't know where in the chain it happens (load balancer or lag from_slot memory fetch etc...) tho, need to investigate properly.

}
}

fn check_send<T>(&self, result: Result<(), tokio::sync::mpsc::error::SendError<T>>) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fn check_send<T>(&self, result: Result<(), tokio::sync::mpsc::error::SendError<T>>) {
fn cancelable_send<T>(&self, result: Result<(), tokio::sync::mpsc::error::SendError<T>>) {

More expressive than check?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes i like it too, will change

}

#[derive(Debug)]
struct MockSourceExitStreamErrorSource;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very cool!

impl SourceTrait for MockSourceExitStreamErrorSource {
type Config = NullConfig;

fn new(_: NullConfig, _: vixen_core::Filters) -> Self { Self }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the new be a trait level method so doesn't need to be defined on each source trait? Something for later but always bugged me having to define the new when its inferable from the Config type declartion.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I check about it

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So actually if we put the new in the trait, we would have to do

impl From<(MyConfig, Filters)> for MySource {
    fn from((config, filters): (MyConfig, Filters)) -> Self {
        Self { config, filters }
    }
}

impl SourceTrait for MySource {
    type Config = MyConfig;

    async fn connect(...) -> Result<(), Error> { ... }
}

instead of now having

impl SourceTrait for MySource {
    type Config = MyConfig;

    fn new(config: Self::Config, filters: Filters) -> Self {
        Self { config, filters }
    }

    async fn connect(...) -> Result<(), Error> { ... }
}

which doesn't seem a big gain.

I guess we could add a derive macro (have to be careful on field order for the tuple conversion though).

use derive_more::From;

#[derive(Debug, From)]
pub struct YellowstoneGrpcSource {
    config: YellowstoneGrpcConfig,
    filters: Filters,
}

#[async_trait]
impl SourceTrait for YellowstoneGrpcSource {
    type Config = YellowstoneGrpcConfig;

    async fn connect(...) -> Result<(), VixenError> { ... }
}

Seems a lot of headache anyway (and breaking change existing impl of SourceTrait)...

@the-orex the-orex merged commit 6e5b6cd into main Mar 9, 2026
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants