
Control.Distributed.Platform.Async #8


Closed
hyperthunk opened this issue Nov 9, 2012 · 26 comments

Comments

@hyperthunk
Member

Updated title to reflect name change

@hyperthunk
Member Author

Some of the features here are not dissimilar, so it might be worth looking to them for inspiration.

@edsko
Member

edsko commented Dec 5, 2012

Also have a look at the Haskell async library (the talk about the design of this library is well worth watching).

@rodlogic

I have added preliminary support for async calls:

rodlogic@089681d

In a nutshell:

-- | Async data type
data Async a = Async MonitorRef (MVar a)

-- | Sync call to a server
call :: (Serializable rq, Show rq, Serializable rs, Show rs) => ServerId -> Timeout -> rq -> Process rs
call sid timeout rq = do
  a1 <- callAsync sid rq
  waitTimeout a1 timeout

-- | Async call to a server
callAsync :: (Serializable rq, Show rq, Serializable rs, Show rs) => ServerId -> rq -> Process (Async rs)
callAsync sid rq = do
    cid <- getSelfPid
    ref <- monitor sid
    --say $ "Calling server " ++ show cid ++ " - " ++ show rq
    send sid (CallMessage cid rq)
    respMVar <- liftIO newEmptyMVar
    return $ Async ref respMVar

-- | Wait for the call response
wait :: (Serializable a, Show a) => Async a -> Process a
wait a = waitTimeout a Infinity

-- | Wait for the call response given a timeout
waitTimeout :: (Serializable a, Show a) => Async a -> Timeout -> Process a
waitTimeout (Async ref respMVar) timeout =
  let
    receive to = case to of
        Infinity -> do
          resp <- receiveWait matches
          return $ Just resp
        Timeout t -> receiveTimeout (intervalToMs t) matches
    matches = [
      match return,
      match (\(ProcessMonitorNotification _ _ reason) -> do
        mayResp <- receiveTimeout 0 [match return]
        case mayResp of
          Just resp -> return resp
          Nothing -> error $ "Server died: " ++ show reason)]
  in do
    respM <- liftIO $ tryTakeMVar respMVar
    case respM of
      Just resp -> return resp
      Nothing -> do
        respM <- finally (receive timeout) (unmonitor ref)
        case respM of
          Just resp -> do
            liftIO $ putMVar respMVar resp
            return resp
          Nothing -> error "Response-receive timeout"

There is one pending issue with Async and receiveWait/receiveTimeout: the waitTimeout in call works, but not when calling callAsync and then waitTimeout separately: the matching is not working.

@hyperthunk
Member Author

This is totally cool, I love it. I'll be keen to use this internally (for stuff like gen server) when you've ironed out the kinks. Brilliant.

@hyperthunk
Member Author

@rodlogic - update on this: I would like to get the async functionality generalised and moved into Control.Distributed.Platform.Async so it can be used independently from GenServer (and therefore by the GenServer implementation).

Does that seem reasonable to you?

@rodlogic

Definitely! I had the same thought regarding Async.hs, but decided to get an implementation working first. This has been a bit of my approach here in general: I am taking small incremental steps and then refactoring.

I am also fully aware that the implementations I am cranking out could be way shorter if I were to reuse more of the existing Haskell functions to avoid, for instance, nested case expressions etc.

@rodlogic

fyi: the current Async implementation is not yet complete, beyond the wait/waitTimeout issue mentioned above. Basically, it is not correlating a response with the corresponding request. Instead of implementing this in the Async module, it seems reasonable to leverage CH channels to do the correlation work for us.

@hyperthunk
Member Author

I have done a bit of experimental refactoring that I'd like to discuss, as there are a couple of issues in the current implementation (completeness notwithstanding). I'll push to my personal fork later on so we can talk about them; I still haven't got my access rights back since the repository was transferred to the haskell-distributed organisation.

@rodlogic

@hyperthunk Sure. I know you are pushing for an alpha release, but it seems that we may need quite a few iterations before that, especially with respect to the overall code structure, organization and reuse.

Regarding the use of channels for async responses, there is one caveat: monitoring notifications come in via the process inbox, and the fact that we can't receive on both (unless we resort to spawning threads, with the complexity that arises from that) is a bit problematic, reducing the usefulness of channels in situations like this.

This seems to be a broader question of how to use channels together with process messages and monitoring notifications and a question better answered by CH.

@hyperthunk
Member Author

@rodlogic

I know you are pushing for an alpha release, but it seems that we may need quite a few iterations before that, especially with respect to the overall code structure, organization and reuse.

Yes you're right about that. Probably we'll see another few hundred commits before 0.1.0.

This seems to be a broader question of how to use channels together with process messages and monitoring notifications and a question better answered by CH.

I've had some thoughts about this and will post them shortly along with a link to the experimenting I've been doing.

@hyperthunk
Member Author

Ok. I've pushed a branch with some refactoring ideas. I'll go through and explain them in this thread when I've got a bit more time. There's currently a silly clause in the waitTimeout implementation which will overflow the stack in a hurry - that's quite easy to fix.

The changes, btw, were not stylistic in nature; they were motivated by specific things, viz. API consistency with the other async package but, more importantly, the need to prevent stray (unexpected) messages being sent to unsuspecting code. Anyway, like I said, I'll write up what was going through my mind and we can go through it and see which bits actually make sense.

@hyperthunk
Member Author

Oh and I should probably point out that I deliberately reverted the gen server changes that were using async. That was just a temporary hack to avoid dealing with lots of compilation breakage. In reality this branch is a total experiment (and tool for discussion) so even if we do use some of its code, we will patch that back into a feature branch off development anyway.

@rodlogic

@hyperthunk This is an interesting alternative.

First, you are spawning a child process to receive the call's response and monitoring notifications (a good reminder to me that threads are cheap in Haskell/Erlang and can be used in interesting ways) and solving the correlation problem this way.

Second, you created a richer protocol between the client and the async process with AsyncResult a. So that the the client can react properly based on what happened.

Finally, I see how the async child process is also responsible for applying the SpawnAsync function, instead of applying that in the client process (I guess my initial implementation was trying to solve the whole async problem using the same client process inbox).

I didn't understand, though, why there is the notion of a wpid (worker process id?) in addition to the gpid (gatherer process id?), and also why the SpawnAsync function is returning the wpid. Does the design assume that SpawnAsync will spawn another CH process (e.g. cancel is cancelling wpid)?

@hyperthunk
Member Author

First, you are spawning a child process to receive the call's response and monitoring notifications (a good reminder to me that threads are cheap in Haskell/Erlang and can be used in interesting ways) and solving the correlation problem this way.

Yes. This approach is idiomatic Erlang. :)

Second, you created a richer protocol between the client and the async process with AsyncResult a, so that the client can react properly based on what happened.

That was really inspired by the .NET async APIs, and Java's Future/Promise to some extent.

I didn't understand, though, why there is the notion of a wpid (worker process id?) in addition to the gpid (gatherer process id?) and also why the SpawnAsync function is returning the wpid. Does the design assume that SpawnAsync will spawn another CH process (e.g. cancel is cancelling wpid)?

Yes, that's right: the SpawnAsync function actually goes off and creates a new worker process. I'll change the variable name, as wpid is obviously a bit too pithy to be transparent to the reader.

The advantage of letting the caller provide the spawn function that creates the worker is that you can spawn a worker on any node you like, but the gatherer (which uses the MVar to stash the final result) is localised. The other reason for this is that if we just ran the process then it wouldn't necessarily execute asynchronously, as code encapsulated in the Process monad, when run in the same context doesn't necessarily have to do anything asynchronous. The fact that our first use case was async $ send pid msg made it seem thus, because sending is asynchronous by nature in CH but that could've been some blocking operation. For example, in the original implementation there wasn't anything to prevent a call such as this:

getValue mvar = async $ do
    val <- liftIO $ takeMVar mvar
    return val    

The call to async makes the reader think that the takeMVar call will execute on another thread, but in practice it would have just been evaluated as an IO action (or in our case, an action in the Process monad). So ostensibly, I think that forcing the asynchronous action to take place in another thread, by making the user supply it as a spawn function, makes sense.
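To make the distinction concrete, here is a minimal sketch in plain Haskell (ordinary forkIO threads and an MVar standing in for CH processes; asyncIO is a hypothetical name, not part of any API discussed here): only the explicit fork makes the supplied action run concurrently.

```haskell
import Control.Concurrent (MVar, forkIO, newEmptyMVar, putMVar, takeMVar)

-- Hypothetical sketch: merely *building* an action does nothing concurrent;
-- it is the fork that makes the computation asynchronous.
asyncIO :: IO a -> IO (MVar a)
asyncIO action = do
  result <- newEmptyMVar
  _ <- forkIO (action >>= putMVar result)  -- without this fork, nothing is async
  return result
```

Without the forkIO, asyncIO action would just run action inline, which is exactly the trap described above.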

also why the SpawnAsync function is returning the wpid. Does the design assumes that SpawnAsync will spawn another CH process (e.g. cancel is cancelling wpid)?

This is exactly right. My reasoning worked thus:

  • we need both the worker and the listener/proxy pids so we can cancel
  • the listener needs the worker pid so it can monitor it on demand
  • the worker needs the listener's pid so that it knows where to reply to once it is finished
  • because of the previous point, the SpawnAsync type needs to take the listener's pid as its first input
  • we don't have the listener's pid until after it is spawned, so to avoid writing some complex startup protocol I just spawn the worker from inside the listener!
  • the listener is spawned in the caller's thread, and uses the caller's pid to send back the worker's pid after it is spawned

I avoided using linking as that imposes a policy, which is better described by the user than the library API itself.
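The handshake in the bullets above can be sketched with plain Haskell threads (forkIO and thread ids standing in for CH spawn and pids; all names here are hypothetical): the listener forks the worker itself and reports the worker's id back to the caller.

```haskell
import Control.Concurrent (MVar, ThreadId, forkIO, myThreadId, newEmptyMVar,
                           putMVar, takeMVar)

-- Hypothetical local sketch of the listener/worker startup protocol.
data AsyncHandle a = AsyncHandle
  { workerId :: ThreadId  -- kept so the caller can cancel the worker
  , gatherer :: MVar a    -- where the listener stashes the final result
  }

spawnAsync :: (ThreadId -> IO a) -> IO (AsyncHandle a)
spawnAsync job = do
  result <- newEmptyMVar
  widVar <- newEmptyMVar
  -- the "listener" forks the worker itself, so no separate startup protocol
  -- is needed to exchange pids
  _ <- forkIO $ do
    listener <- myThreadId
    wid <- forkIO (job listener >>= putMVar result)  -- worker gets the listener's id
    putMVar widVar wid                               -- report the worker's id back
  wid <- takeMVar widVar
  return (AsyncHandle wid result)
```

The caller ends up holding both ids it needs for cancellation, without the worker and listener ever having had a chicken-and-egg pid exchange.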

I was thinking of adding to this...

  • some sugar in the API to reduce boilerplate
  • additional utilities a la Simon Marlow's async package (STM support, deliberate races, etc)
  • support for using typed channels instead of MVar to get the response (these support timeouts natively)

But before I do that, it is time to fix the stack overflow in waitTimeout and then write up some serious tests. I do think this API will be useful in gen server and other places. I might refactor the Timer module to use it as well, if doing so improves the code.
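For the "deliberate races" bullet above, a minimal local sketch (plain threads; raceIO is a hypothetical name, loosely after the async package's race) might look like:

```haskell
import Control.Concurrent (MVar, forkIO, killThread, newEmptyMVar, putMVar,
                           takeMVar)

-- Hypothetical sketch of a deliberate race: run two actions concurrently,
-- return whichever finishes first, then kill the loser.
raceIO :: IO a -> IO b -> IO (Either a b)
raceIO left right = do
  done <- newEmptyMVar
  lt <- forkIO (left  >>= putMVar done . Left)
  rt <- forkIO (right >>= putMVar done . Right)
  r  <- takeMVar done
  killThread lt  -- killing an already-finished thread is a no-op
  killThread rt
  return r
```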

@hyperthunk
Member Author

On another note, after reading through Simon M's code in detail, I've noticed that he's inlining in a few select places (that's less of a concern for us, as most of our functions are likely too long to benefit), but he's also forcing strict evaluation in a few places too. I must admit that my knowledge of optimising code for the GHC runtime is somewhat limited, and I do worry that we will have a tough time making sure we're time- and space-efficient enough to be useful in a production environment. Hopefully at some point in the future, the other folks working on CH may have a bit of time to dip into that and point us in the right direction.

@hyperthunk
Member Author

@rodlogic - my latest commit makes a sane implementation of waitTimeout I think. If I put some test cases in place around this, how do you feel about moving this into a feature branch with a view to merging it?

@rodlogic

I like the changes: it generalizes things quite a bit. I was initially considering a single use-case when I extracted the first Async implementation: separating the asynchronous reply from GenServer's synchronous call. This new implementation makes it much more general. The only slight concern is what kind of overhead this adds when thinking about GenServer's call usage, but let's wait until that becomes a clear problem, if it ever does.

getValue mvar = async $ do
    val <- liftIO $ takeMVar mvar
    return val

The call to async makes the reader think that the takeMVar call will execute on another thread...

Yes, I see how the initial async implementation was misleading in this regard. I was reading async above as 'get the reply/response asynchronously' only (very focused on GenServer's call semantics) and not 'execute this computation (locally or remotely) asynchronously and get the reply/response'.

Back to SpawnAsync: although I see it as a very flexible mechanism, I am wondering if it is leaking a bit, especially wrt wpid and gpid. Wouldn't it be tighter to have something like:

async :: (Serializable a) => Process () -> Process (Async a)
async proc = ...
   wpid <- spawnProcess localNode proc
   ...

Where async is responsible for spawning (or not) a local or remote Process passed as an argument? This way wpid and gpid are completely internal concepts and don't leak to the outside. The client code's responsibility is just to provide the computation, which is not tied to Async in any way. The only question then is to spawn or not to spawn, but that could be a Bool parameter or an extra variant of the async function.

@rodlogic - my latest commit makes a sane implementation of waitTimeout I think. If I put some test cases in place around this, how do you feel about moving this into a feature branch with a view to merging it?

Sounds good! I can help integrating this into the GenServer, if you want.

@rodlogic

@hyperthunk I think the improvements to Async are a good step in the right direction and we should move forward with them. I have, though, a few comments for consideration regarding how to better integrate 'client' operations within server handlers. First, to make sure the thinking at least makes sense and, if so, to consider a possible solution, if one is needed.

Right now, a GenServer is a collection of handlers, or Behaviors as you nicely named them in GenProcess, that are served by a single CH receive loop. When these Behaviors need to interact with other servers or processes, they act as a client. Before Async, that was done through a synchronous call, which would block the current Behavior and, in fact, any other Behavior, as the main receive loop isn't in the picture. The Async module improves on this by allowing the current Behavior to perform asynchronous calls and to choose when to block waiting for the response(s), but this is done outside of the main receive loop. This gives the ability to perform calls concurrently instead of sequentially, or even to store the Async data structure somewhere so that the response can be received at a later point in time.

Based on this, here is one observation:

  • Should we have an Async API to wait/poll on a list of Asyncs? I am assuming you also have this in mind for subsequent Async iterations.

Now, the other observation is about the main receive loop vs the Async receive loop:

  • Should the wait/poll receive be integrated with the main receive loop?
    I think the answer could be a yes or a no depending on the Server's implementation. I.e. in some cases, the fact that Behavior 1 is blocked waiting for a response means that a message for Behavior 2 should not execute. In other cases, Behavior 1 and Behavior 2 should be able to execute concurrently, which would/could increase the concurrency. And in another case, Behavior 1 for Id1 is blocked, but Behavior 1 for Id2 should be able to execute concurrently, as the main invariant here is to make sure messages are sequential for state identified by Id1 or Id2.
    There are different ways we can go about this, but I am curious how this is handled in Erlang. The simplest would be to map one ID to one Process so that concurrency across IDs is handled by CH (1 ID x 1 Process). Another extreme would be to handle state within a single Process, complicating the GenServer's implementation but possibly simplifying other parts of the system (M IDs x 1 Process). There could also be a middle ground where a range of IDs is mapped to a single Process (M IDs x N Processes). How is the notion of ID and state handled in Erlang?

@hyperthunk
Member Author

Hi @rodlogic - thanks for taking a look at this.

Back to SpawnAsync, although I see it as a very flexible mechanism, I am wondering if it is leaking a bit, especially wrt wpid and gpid.

Well, the Async constructor isn't exported but yes, you're probably right. Hmn.

Wouldn't it be tighter to have something like:

async :: (Serializable a) => Process () -> Process (Async a)

Yes I can see the argument for doing that, but...

The only question then is to spawn or not to spawn, but that could be a Bool parameter or an extra variant of the async function.

That doesn't work, because if you don't spawn then the computation isn't asynchronous at all. However...

This way wpid and gpid are completely internal concepts and don't leak to the outside. The client code's responsibility is just to provide the computation that is not tied to Async in any way.

That is a very good point and would make for a much nicer API design. I'll add that shortly.

Should we have an Async API to wait/poll for a list of Async's. I am assuming you also have this in mind in subsequent Async iterations.

Absolutely. I'm going to add that in once I've managed to get the tests passing. :)

Now, with regard to the gen server design, I think we should probably move the discussion over to issue #4, but I'll cover the basics here to get us started.

How is the notion of ID and state handled in Erlang?

I'm going to stick to this question for now. Erlang's gen server is implemented as a single process (the server) that receives messages serially and defers to a user-defined behaviour/module to process them. The result of that processing can do one or more of the following, possibly at the same time:

  • alter the state of the process/server
  • cause the process/server to interact with another process (e.g., by message passing)
  • cause the process/server to exit

The call/cast abstraction is just a bit of protocol layering on top of that. Because the gen server is implemented as a single process, it can only take messages from its mailbox one at a time. When the server/process loop calls the user defined behaviours, it does so sequentially. This leads to two important characteristics:

  1. a gen server can only service one client at a time
  2. a gen server can only be executing one behaviour (callback function) at a time

These characteristics are something that Erlang programmers rely heavily upon whilst using the gen server, and they prove quite useful when reasoning about one's code.

Now the gen_server:cast/2 API simply allows the client to avoid dealing with a reply from the server. The server/process can nonetheless end up blocking for an arbitrary period of time whilst handling the cast request, even though the client doesn't know (or care) about this.

So how do I think this should work for us in Haskell? Here's my opinion, in two parts:

1. If in doubt, do what Erlang does ;)

I think that our gen server should work very much the same way as the Erlang/OTP gen server. This is a battle-tested, weather-worn design that has stood the test of time and is easy to program against.

The role of the Async module in our gen server design is probably just to provide a simple way to implement the call :: (Serializable a) => ServerId -> Process a API without having to re-implement the whole 'block until I see a result and maybe time out if I say so' protocol.

I don't think that our async API has much of a role to play in the server process' implementation, although obviously the person writing a gen server can easily use this API to implement non-blocking handling themselves. Here's a typical example provided in both Erlang and Haskell - I think our CH version is much prettier, personally.

%% the client API - this *looks* to the caller as though we're
%% making a blocking call, even though it is non-blocking in the server

run_command(Server, Cmd) ->
    case gen_server:call(Server, {execute, Cmd}) of
        {done, Result} ->
            Result;
        {failed, Reason} ->
            {error, Reason}
    end.

%% and somewhere in the gen server callback module

handle_call({execute, Cmd}, From, State) ->
    Self = self(),
    %% the worker identifies itself with self(), since the monitor ref
    %% isn't bound until spawn_monitor/1 returns
    {Pid, _MRef} = spawn_monitor(
        fun() -> Self ! {finished, self(), command_executor:execute(Cmd)} end),
    %% do *not* block the server loop, nor reply to the client
    {noreply, add_client(Pid, From, State)};
%% .... snip

handle_info({finished, Pid, _Result}=R, State) ->
    %% a task has finished
    reply(R, State),
    {noreply, remove_client(Pid, State)};
handle_info({'DOWN', _MRef, process, Pid, normal}, State) ->
    case lookup_client(Pid, State) of
        undefined ->
            %% we've already replied - just drop this message
            ok;
        Client ->
            %% oh dear, we've not seen a reply for this job!
            receive
                {finished, Pid, _}=R -> reply(R, State)
            after 0 ->
                gen_server:reply(Client, {failed, unknown})
            end
    end,
    {noreply, remove_client(Pid, State)};
handle_info({'DOWN', _MRef, process, Pid, Reason}, State) ->
    Client = lookup_client(Pid, State),
    gen_server:reply(Client, {failed, Reason}),
    {noreply, remove_client(Pid, State)}.

reply({finished, Pid, Result}, State) ->
    Client = lookup_client(Pid, State),
    gen_server:reply(Client, {done, Result}).

Versus a CH version....

type TaskID = Integer  -- hypothetical: some unique job identifier
data TaskFailed = Reason String | Cancelled

submitTask :: ServerId -> Task -> Process TaskID
submitTask server task = call server $ Submit task -- returns jobId

waitForTask :: (Serializable a) => ServerId -> TaskID -> Process (Either TaskFailed a)
waitForTask server taskId = call server $ WaitFor taskId

-- and on the server side, as it were

handleTaskSubmitted task = getState >>= startWorker task >>= reply
  where
    startWorker task state = do
        hAsync <- async task
        (taskId, state') <- addWorker state hAsync
        putState state'
        return taskId

handleWaitForJob jobId = do
    s <- getState
    hAsync <- lookupJob s jobId
    r <- wait hAsync
    case r of
        AsyncFailed reason -> reply $ Left (Reason (show reason))
        AsyncCancelled     -> reply $ Left Cancelled
        AsyncDone a        -> reply $ Right a

2. On the other hand, don't limit ourselves to what Erlang/OTP provides

Just because our gen server follows the pattern of OTP doesn't mean the points you've raised aren't valid. Associating tasks with simultaneous clients is an important issue, and the asynchronous bed-fellow of the reactor pattern is probably a very important abstraction.

There are other important abstractions that we should implement in the platform layer, and I will create tickets for each of these. One classic abstraction, for example, is that of composable asynchronous tasks or higher-order channels. Keep your eyes peeled on the issue tracker and by all means add your own concepts as tasks that we can plan and discuss!

@hyperthunk
Member Author

Another possibility is that Async might be better implemented with typed channels instead of using an MVar to deal with blocking on the result.

This implementation would basically set up a channel and put the SendPort used for replying into the gatherer. The big advantage of this approach is that it would greatly simplify the implementation of wait and waitTimeout, as we could simply defer to receiveChan and receiveChanTimeout instead of looping ourselves. We would not want to dispense with the gatherer process, though, as this would remove our monitoring support, which can be used to detect failures in async tasks.
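The simplification can be sketched locally with a plain Chan and System.Timeout (standing in for a CH typed channel and receiveChan/receiveChanTimeout; the function names here are hypothetical):

```haskell
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import System.Timeout (timeout)

-- Hypothetical sketch: with a channel carrying the result, wait/waitTimeout
-- collapse to a blocking read and a read wrapped in a timeout.
waitResult :: Chan a -> IO a
waitResult = readChan

waitResultTimeout :: Int -> Chan a -> IO (Maybe a)  -- timeout in microseconds
waitResultTimeout micros ch = timeout micros (readChan ch)
```

No hand-rolled receive loop is needed; the timeout machinery does the work.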

@hyperthunk
Member Author

Another possibility is that Async might be better implemented with typed channels instead of using an MVar to deal with blocking on the result.

I have pushed just that change set. It is much shorter and cleaner code; however, it's far from perfect. In particular, it still needs some kind of API for spawning the task on a remote node.

@rodlogic

In particular it still needs some kind of API for spawning the task on a remote node.

Isn't that already supported by CH:

spawn :: NodeId -> Closure (Process ()) -> Process ProcessId

The K-means example again seems a good reference: http://www.well-typed.com/blog/74

@rodlogic

Another possibility is that Async might be better implemented with typed channels instead of using an MVar to deal with blocking on the result.

Makes sense. I did experiment with this in the first Async iteration, but the fact that we couldn't efficiently receive both channel messages and process messages (for notifications) was a bit problematic. Maybe this is no longer the case now that we are spawning a child/worker process, but, for what it is worth, see this commit by Simon Marlow:

haskell-distributed/distributed-process@847abf4

@rodlogic
Copy link

@hyperthunk Thanks for the detailed explanation of how Erlang handles this. It is quite helpful and puts the discussion on firm ground.

The way I am 'reading' the Async module is that it is basically the async package by Simon Marlow but built on top of CH and so fully distributed. Maybe it could even be part of CH directly.

@hyperthunk
Member Author

Hi @rodlogic - if you look at the latest changes to that branch, you'll see that I've got async working using typed channels now. Locally I have passing test cases for all the API calls and will push them once I'm out from behind the firewall. :)

@hyperthunk
Member Author

This is coming along nicely now. There are two variants: channel based and STM based, where the latter is useful if you need to 'wait' or 'poll' on the asyncHandle from outside the process which created it. Neither module supports sending the async handle to another process - that kind of thing will be supported by the 'Task' API though.
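The STM-based variant can be sketched with a TMVar from the stm package (which ships with GHC); the point is that any thread, not just the one that created the handle, can wait or poll. Names here are hypothetical, not the branch's actual API.

```haskell
import Control.Concurrent.STM (TMVar, atomically, newEmptyTMVarIO, putTMVar,
                               readTMVar, tryReadTMVar)

-- Hypothetical sketch: a TMVar holding the eventual result supports both
-- non-blocking polls and blocking waits from any thread.
pollAsync :: TMVar a -> IO (Maybe a)
pollAsync = atomically . tryReadTMVar  -- Nothing while still running

waitAsync :: TMVar a -> IO a
waitAsync = atomically . readTMVar     -- blocks (retries) until filled
```

Because these are STM transactions, they also compose with other transactions, e.g. waiting on several handles at once.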
