Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserve routed messages if closed actor is supposed to restart #132

Open
loyd opened this issue Jul 3, 2024 · 4 comments
Open

Preserve routed messages if closed actor is supposed to restart #132

loyd opened this issue Jul 3, 2024 · 4 comments
Labels
A-core Area: The elfo-core crate. C-enhancement Category: A PR with an enhancement or a proposed on in an issue.
Milestone

Comments

@loyd
Copy link
Collaborator

loyd commented Jul 3, 2024

No description provided.

@loyd loyd added C-enhancement Category: A PR with an enhancement or a proposed on in an issue. A-core Area: The elfo-core crate. labels Jul 3, 2024
@loyd loyd added this to the v0.2 milestone Jul 3, 2024
@qwfy
Copy link

qwfy commented Jan 16, 2025

Hi, thanks for the library, I tried several other actor systems (ractor and coerce), and found this one to be the best suited for my purpose.

I'm building a system with this library, and found that I need this feature for at-least-once delivery semantics, can you share some insights on this? (eg. what it is, what are the design considerations, how will it be implemented, will the mailbox survive operating system process restarts, things like that).

Curiously enough I used to work with Erlang and don't remember that I needed such feature.

@loyd
Copy link
Collaborator Author

loyd commented Jan 18, 2025

found this one to be the best suited for my purpose

Thank you for your interest in the project! Feel free to ask anything; I know that the documentation is the weak point of this project for now. And I hope you use the recent alpha releases because v0.1.0 is very outdated and limited by functionality.

The issue name is confusing. I'll try to explain what this issue is about. It's probably not what you are looking for, so please feel free to describe your use case, and I'll try to help you.


Let's consider the following example: GRPC clients want to subscribe to some data from a database with parameters. To reduce data transfer from the database, all subscriptions with the same parameters are handled by one specific streamer actor (for these parameters).
Image

However, it's pointless to transfer data if nobody is interested in data, so if all GRPC clients with specific parameters are disconnected, the specific streamer should be terminated. It can occur immediately if reWATCHing is cheap or after some time if not (for instance, if the streamer reads data not from some specific DB but from some distributed system with some long process of initializing distributed subscriptions).

So, every new GRPC call with specific parameters leads to sending SubscribeToX to the streamer group. A new one is started inside the group if no actor handles streams with such parameters.

The problem is some race conditions if the streamer has decided to terminate, but a new SubscribeToX is sent while the termination process has not ended. More specifically, an actor no longer reads incoming messages but does not complete the termination process, thus the actor key is occupied and a new actor cannot be started. In such a case, the grpc actor will get an error (if SubscribeToX is a request) and should retry a subscription request after some time (an additional boilerplate) or return an error to a client (an unnecessary error that is not the problem of the client).

The simplest solution is simply to detach the current actor on ctx.close() and allow to spawn a new one under the same key. However, it has two significant cons: it requires more boilerplate code in the actor (to handle remaining messages before closing), and it means that the telemetry for this key is produced by two actors at the same time, what's usually ok (because it's typical to aggregate metrics for very large actor groups), but it cannot be a default policy because of GAUGE metrics.

Another solution requiring no boilerplate code and much less error-prone is to "schedule" a new actor under this key right after the termination of the currently active one (which has decided to terminate) and let it handle that new SubscribeToX. It requires no additional code from the crate's users, which is pure QoL. This is what's I'm planning to do because don't see more interesting solution. Technically, it means that all envelopes get a special one-bit flag marking that it's "routed" message and then transferring them into the mailbox of a new actor.

@qwfy
Copy link

qwfy commented Jan 24, 2025

I know that the documentation is the weak point of this project for now.

Have nice documents would obviously be better, but generally a one sentence summary is enough if the function and data type names describes what they do or encode. We already have the example directory to get started, and already have a good idea on how the system work. Documentation is needed more where the function does something unexpected.

I picked the 0.2 version from the start, in a functional language, the code tends to be reliable even for alpha versions in my experience.


About the problem you described and the proposed solution, If I remember correctly, In erlangPid ! foo would still success if Pid is already dead. On the other hand some_register_name ! foo would fail if some_register_name is not registered. So there is a semantic difference between different ways of sending the message. So another solution I guess would be best-effort for cast and reply with error for call (the caller needs to handle error anyway - but in Erlang I think the error handling effort is mitigated by relying on the "let it crash" philosophy).


My problem is two folds, the first one is to provide a at least once delivery semantics to the consumers A: A polls messages from a data source (in my case, redis stream), and distribute it to the consumers B, the distribution is done using cast since the processing can take a long time (seconds) and I do not want to prevent A from distributing messages to other consumers, however this introduces a problem - B might fail (B also send the message to other actors C and C can also fail, transiently or permanently ), when that fails, I want to B to receive that message again, and if the previous error is a transient error, then hopefully the second time it would succeed. And if failed too much time, maybe there is an error counter associated with the message and after a few retries, the message is dropped (by A).

The second problem is to ack the message to A somehow, since redis does not delete the message from memory upon message delivery, so A has to delete it somehow, this requires some coordination between the consumers of the message (A, B, C).

I'm thinking starting a timer in A, and redeliver the message if a ack is not received in time, and drop the message if errors too much (so there is a counter associated with the message). The only PIA is I have to coordinate the ack in my code (maybe the library can send back an ack to A automatically upon the handler function returns Ok(_)? just brainstorming, not sure this is a good idea, e.g. what if the message if fan-out to multiple receivers?), and the only thing prevent me from losing track of it is carefulness.

Another approach is to implement some kind of persistence (Coerce has this - persistent mailbox), and restart the whole supervision tree if there is an error (maybe not this much brutal, but something like this)

I imagine the two problems would be somehow common to a system using asynchronous message passing, but maybe too much to implement it in a library, and best left it to the application layer. Include it as a design pattern in the book would be well-received by the community I suppose.

Anyhow, the reason I opened this issue is just looking for inspirations, so feel free to ignore it :)


I sincerely wish this project would succeed and have some serious adoptions. I miss the Erlang approach of doing things but the lack of static typing is a pain point and the ecosystem is not that good.

PS. your approach on defining the route in a topology.rs is really interesting, the libraries I see all use the send-to-an-address approach I think, it's very much like defining a static routing table, it makes me wonder do you have background in networking. It also solves the problem of "have no way of knowing the path the message took to get here, or no idea where it is originated. (when the application get complicated)".

@loyd
Copy link
Collaborator Author

loyd commented Feb 2, 2025

@qwfy Thanks for your thoughts! You mentioned common and essential issues in message-passing systems. On the rights of "just brainstorming" (so feel free to ignore it), I would like to share my opinion and the principles I use to design such systems, how elfo helps now, and what should be added to improve the situation further.

Firstly, I'm convinced that reliable systems shouldn't use regular messaging at all, except in some rare cases. Instead, any communication should use either request-response or subscriptions.

Requests

Everything is evident with the requests. Most popular cases are fetching historical data or asking the data owner to change it. Elfo provides first-class support for requests and guarantees that either the request will be successfully delivered and handled or an error will be returned. Of course, the responder actor can freeze (e.g., an infinite loop), but it's another topic. Note that if a failed request is repeated, it gives at-least-once semantic because failure can happen after responding, but before getting a response (e.g. the network connection fails before delivering a response).

However, elfo doesn't support "non-waiting" requests directly, so the actor should call ctx.request(..).resolve().await, which "blocks" the actor, meaning it cannot handle other messages. It's possible to use ctx.attach(Stream::once(async { ctx.request(..).resolve().await })) to emulate "non-waiting" requests but it's not convenient and error-prone. We're going to provide first-class support #91 (btw, many preparatory works have already been done for this, but some questions according to API design left. Also, the example in the issue is slightly outdated).

Subscriptions

A subscription is a long-term virtual connection between actors. Both peers should know when this connection is broken so they can recover (e.g., renew a subscription).

Initially, we implemented subscriptions by making an opening request (e.g., SubscribeToX { some params } if a subscriber initializes a subscription, but it can also be a "push" subscription when a producer initializes a subscription). Then, a producer sends messages back to the subscriber using the envelope.sender() of the initial request. The producer eventually knows about failure if the next ctx.(try_)send_to(addr, msg) fails. To notify subscribers about the producer's failures, we used a special guard that performs ctx.unbounded_send_to(subscriber_addr, Unsubscribed) in impl Drop. This approach works relatively well inside one node, but if the subscriber and producer are located on different nodes, there is a problem: it's possible that Unsubscribed won't be delivered (e.g., if the TCP/UDS connection is down).

Now, we have a dedicated (internal) library called "elfo-subs", which implements such virtual connections by using never-ending requests (so, the response token is held until an actor dies or explicitly unsubscribed) and provides API to make some sort of ctx.subscribe(|sub_id| SubscribeToX { sub_id, .. }) where closure is called, and the created requests are routed when the subscription is failed (obviously with backoff). Thus, this is a more reliable way to detect when the virtual connection is broken.

However, never-ending requests feel like a hack, so we're going to introduce linking mechanics #113 (see a sketch there; that's not the final API but very close to it). Note: "link" differs from the similar term in erlang because links must be attached to some requests (to perform routing and to avoid some races). This is actually the most wanted feature for elfo v0.2.

Ideally, elfo should provide not only links to implement subscriptions but also provide them as a first-class feature. However, it's unclear now which API should be provided, so I will show (as examples and descriptions in the actoromicon) subscriptions as a pattern based on links. And later (v0.3), think about providing more high-level mechanics.

Types of subscriptions

I like to separate two types of subscriptions: log subs and snapshot subs.

Snapshot subs

Here, the subscriber is only interested in the newest state.

Such subscriptions are implemented by sending a snapshot in the first response and then sending incremental updates:

#[message(ret = BalancesReport)]
struct SubscribeToBalances;

#[message]
struct BalancesReport {
    entities: Vec<Balance>,
    is_snapshot: bool,
}

This is the simplest type of subscription in terms of reliability. If the producer detects an error, he drops subscriptions (because a subscriber will re-request again later). If the subscriber detects an error, he drops his state and tries to resubscribe (obviously with some backoff).

Log subs

Here the subscriber wants to observe all events, so the recovery process should retransmit events on failures.
As you noted, it means that events should be stored either in the persistent storage or in RAM with some limits (btw, akka provides "topic" actors partially for this purpose). And, yes, consumers should send acks to the producer. Either on the timer's tick or in case of some logical event (for instance, on successful INSERT into the destination database).

Usually, this pattern requires adding some sequence_no to messages and some sort of ack:

#[message(ret = TradesReport)] // consumer -> producer
struct SubscribeToTrades {
    sequence_no: SequenceNo,
}

#[message] // producer -> consumer
struct TradesReport {
    entities: Vec<Balance>,
    sequence_no: SequenceNo,
}

#[message] // consumer -> producer
struct TradesAck(SequenceNo);

We have both cases: when we want to persistent events (where the destination is the database) on disks and when we want to store them in memory until some threshold is reached, and we'll decide this is an emergency and have another way to repair subscribers.

For the persistent case, we have the "keepers" and "collectors" actor groups where keepers are located on essential nodes and implement persistence logic and such a subscription with collectors located on different nodes and inserting events into the database.

Routing

The topology's main point is route requests (one-shot requests and subscription requests). We don't know the exact address (it can be some local actor, remote, or both of them in some cases). So, we perform inter-group routing. Because it can often occur in the case of regular requests (resubscriptions are rarer), the routing should be fast, so it has first-class support for stateless routers in elfo.

So, the routing allows the implementation of loosely coupling groups. Another solution is, obviously, actor locators, but I think that it's a less flexible approach and more error-prone. However, this comment is already too long to explain my points. Anyway, loose coupling is essential to recover after failures by locating (or starting) a new producer.

The current implementation is fine for local nodes, but if groups are located on different nodes, it's not because, in this case, providing "NodeNo" is required (or Outcome::Broadcast). To improve the situation here, I'm going to introduce "tags" to choose nodes more precisely based on their tags (either populated via configuration globally or in runtime) #134.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-core Area: The elfo-core crate. C-enhancement Category: A PR with an enhancement or a proposed on in an issue.
Projects
None yet
Development

No branches or pull requests

2 participants