GenServer #4
Continuing over here since it is purely GenServer-related... How do I register a process name given that I have a SendPort? Is there an easy way to extract the ProcessId from it? I also could not find a timeout version of receiveChan. |
Yes - this is available in HEAD. |
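For illustration, a minimal usage sketch of a channel receive with a timeout, assuming the HEAD API exposes a receiveChanTimeout taking a delay in microseconds and returning Nothing on expiry:

import Control.Distributed.Process

-- Wait up to one second for a message on the port, then give up.
demoTimeout :: ReceivePort String -> Process ()
demoTimeout rp = do
  mmsg <- receiveChanTimeout 1000000 rp
  case mmsg of
    Just msg -> say $ "got: " ++ msg
    Nothing  -> say "timed out waiting on the channel"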
I am not 100% sure if this is an issue with the prototype code I have or an issue with spawnChannelLocal. Basically, spawnChannelLocal returns a SendPort a, but spawnChannelLocal is called from GenServer, which is a step removed from the concrete types in Counter. Is this something we will have to defer to the Server callbacks too, to make sure it is fully typed? This is the compilation error on line 132 of GenServer.hs (https://github.com/rodlogic/distributed-process-platform/blob/master/src/Control/Distributed/Platform/GenServer.hs).
@hyperthunk I am committing my experiments in the above fork, btw. Still getting my head around the different areas of distributed-process. |
@edsko Is there a way to register the SendPort by name instead? If the server is using channels, the pid is a bit useless. |
No. You could add your own registry for SendPorts if you wish. |
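Such a registry could be as small as a name-to-port Map behind an MVar; a hedged sketch with hypothetical names, fixed to one message type per registry:

import Control.Concurrent.MVar
import Control.Distributed.Process (SendPort)
import qualified Data.Map as Map

type PortRegistry a = MVar (Map.Map String (SendPort a))

newPortRegistry :: IO (PortRegistry a)
newPortRegistry = newMVar Map.empty

-- Register (or overwrite) a port under a name.
registerPort :: PortRegistry a -> String -> SendPort a -> IO ()
registerPort reg name port = modifyMVar_ reg (return . Map.insert name port)

-- Look up a port by name.
lookupPort :: PortRegistry a -> String -> IO (Maybe (SendPort a))
lookupPort reg name = fmap (Map.lookup name) (readMVar reg)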
No, that's not the issue; the logic in your code doesn't quite add up. On line 123 you ask for a message from the channel that contains both the request and a process ID, yet you claim in the type of the function that the channel only carries requests. |
@rodlogic - I like these ideas and think we should start to collaboratively work on GenServer now. I have some feedback I'd like to make, so once you've got this compiling, please send me a pull request so I can comment on individual lines of code in the diff. Once we're merged I'll start writing some HUnit tests for it and we can split up some more fine-grained tasks to work on individually. I quite like the setup you've got now, bar some stylistic questions, especially the use of temporary channels, which is really nice. Most of the things I'd like to suggest are renamings, and a bit of splitting up 'where' into separate functions. Some things we can add quite quickly once we've merged this (apart from tests!) include:

Tagging internal messages

OTP's gen_server differentiates between messages sent via call/cast and 'info' messages by sending the former in a tagged tuple:

type ReplyTo = ReceivePort

data CallRequest = CallReq Serialisable ReplyTo
data CastRequest = CastReq Serialisable

and instead of [...]:

receiveWait [ match (\(CallReq m replyTo) -> handleCall server m replyTo)
            , match (\(CastReq m) -> handleCast server m)
            , match (\(Serializable m) -> handleInfo server m)
            ]

When the callbacks return a [...]

An explicit asynchronous call API

As well as providing a [...] |
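The CallReq/CastReq snippet above won't compile as written (Serialisable is a class, not a type), but the tagging idea goes through with a type parameter; a hedged sketch of one dispatch step, not the committed design:

{-# LANGUAGE DeriveDataTypeable #-}
import Control.Distributed.Process
import Control.Distributed.Process.Serializable (Serializable)
import Data.Binary (Binary (..))
import Data.Typeable (Typeable)

-- Tagged envelopes: a type parameter in place of the (invalid) Serialisable field.
data CallReq a = CallReq a ProcessId deriving (Typeable) -- request plus reply address
data CastReq a = CastReq a deriving (Typeable)           -- one-way request

instance Binary a => Binary (CallReq a) where
  put (CallReq m pid) = put m >> put pid
  get = do { m <- get; pid <- get; return (CallReq m pid) }

instance Binary a => Binary (CastReq a) where
  put (CastReq m) = put m
  get = fmap CastReq get

-- One receive step that dispatches on the tag, in the spirit of the snippet above.
serveStep :: Serializable a
          => (a -> ProcessId -> Process ()) -- call handler
          -> (a -> Process ())              -- cast handler
          -> Process ()
serveStep onCall onCast =
  receiveWait [ match (\(CallReq m replyTo) -> onCall m replyTo)
              , match (\(CastReq m)         -> onCast m)
              ]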
@hyperthunk Sounds good. Let me get it into a running state again and then I'll send a pull request. I have paid no attention to the code structure so far and was more focused on the actual call interaction with expect/channels/etc. I am/was expecting we'll iterate on this and refactor the code as necessary.

Tagging internal messages

If we go the typed-channels route, which imo we should unless we have a specific reason not to, we will have to use receiveChan instead of expect/receiveWait, and to handle multiple types of messages we will have to "merge" the different ports with mergePort{RR,Biased}. Aside from the order in which the ports are selected, I am assuming that the underlying semantics are the same as receiveWait's with multiple matches. Is that so?

Now, what about info messages? They seem to be used for 'all the other kinds of messages', but what kind of messages are expected there in Erlang? gen_server.erl has simple examples like:

handle_info(Info, State)

where Info is e.g. {'EXIT', P, R}, {nodedown, N}, ... And from the documentation: [...]
Are there other specific examples of how info messages are used in Erlang?

An explicit asynchronous call API

Definitely. In the Java world it is common to see APIs like: [...]

And the Future interface, which I think is what you are more or less referring to, with the difference that it hides the internals from the client process.

I am sure Haskell has similar patterns. In its simplest form, this Future value (the reply) will completely hide the reply's receive port from the caller. The only limitation here is that it won't be possible to send future values to other processes, but unless we have a clear and valuable use case, it is not worth solving that. Another point for us to consider, somewhat related to the async API for calls, is the handling of deferred replies by the server process, which in Erlang I think happens when you return {noreply, State} from the handle_call callback. It would be nice to have a dual of Future that would represent the promise to send a typed reply back - by any process, and only once, since there is a caller waiting for this reply. The server process's implementation could decide to send this promise to another process, which would then finally reply to the caller. For instance, instead of something like: [...]

it would look like: [...]

where Promise would be a simple function/API such as: [...]

The server implementation would have the option of simply calling serverReply and returning CallOk, or sending the Promise to another process and returning CallDeferred. The promise wrapper will guarantee that serverReply can be called only once. |
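A write-once Promise along those lines might be sketched with a one-shot MVar latch guarding the reply port. Hypothetical names throughout; note this local version can't itself be serialized, so a variant that can be sent to another process would need to enforce single use differently:

import Control.Concurrent.MVar
import Control.Distributed.Process
import Control.Distributed.Process.Serializable (Serializable)
import Control.Monad.IO.Class (liftIO)

-- The reply port plus a latch that is emptied on first use.
data Promise a = Promise (SendPort a) (MVar ())

newPromise :: SendPort a -> Process (Promise a)
newPromise sp = do
  latch <- liftIO $ newMVar ()
  return (Promise sp latch)

-- Send the reply the first time this is called; later calls are no-ops.
serverReply :: Serializable a => Promise a -> a -> Process ()
serverReply (Promise sp latch) x = do
  first <- liftIO $ tryTakeMVar latch
  case first of
    Just () -> sendChan sp x
    Nothing -> return () -- already replied once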
Created a pull request: https://github.com/hyperthunk/distributed-process-platform/pull/5 |
@rodlogic good, I think we're on the same track here. Some thoughts...
I do like the typed channels route, but it kind of kills off the handle_info concept, which is for handling 'unexpected' messages or (in other words) messages that aren't a primary part of this particular server's protocol, but are possible, such as system messages (for dynamic debugging) and monitor signals and so on. Also, we can't monitor other processes very easily in this way, because monitor signals come via expect/receiveWait rather than on channels. Basically, the typical use of handle_info in OTP is to deal with processes that are interested in other processes, though perhaps not supervising them. Consider this example from RabbitMQ:

handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason},
            State = #state{monitors = Monitors}) ->
    rabbit_log:info("rabbit on node ~p down~n", [Node]),
    {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
    write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}),
    ok = handle_dead_rabbit(Node),
    {noreply, State#state{monitors = pmon:erase({rabbit, Node}, Monitors)}};

This is a classic example of receiving a monitor notification and doing some hidden internal state change in response, without replying. Personally, I think we should actually have two kinds of gen-server: one using channels and one using bare messages. They can probably share a lot of infrastructure, and you'll pick the bare-messages one only if you need to do things like monitor and/or link handling.

Killing Processes (also needed for supervision)

Unless I'm really missing something, there doesn't appear to be a corollary to [...]. One way to do this is to pass some [...]. The point is partly that we should prioritize 'shutdown' messages. If the channel-based GenServer is going to support this, then we need to check we understand the semantics of [...].

Futures

I like the things you've said about this. There is a similar set of concepts in the .NET world (BeginInvoke, EndInvoke).
Yes, the 'reply later' option is very useful. I do wonder whether this will work more cleanly for a channels-based server, API-wise, though. Another thing Promises/Futures enable is the ability to construct higher-order channels, dealing with things like delegation and proxying, which is very, very useful (and important!) in building fault-tolerant systems. One of the key things about OTP's gen_server is that the implementation is no longer in control of its own mailbox. This can be vitally important. Consider this wrapping code, for example, which deals with the common situation where you need fine-grained control over a process that might get into a blocking state very easily - in this case we're managing an external resource (like an OS process) that could take forever to respond, but we must be able to interact with the wrapper in a timely fashion if we need to shut it down: https://github.com/nebularis/systest/blob/resource2/src/systest_resource.erl Insulating yourself and providing middle-man processes to make sure that you can avoid deadlocks is vital in a complex actor-based system, and I suspect doing that purely with channels might be hard, though I'm willing to be persuaded otherwise. |
@rodlogic thanks that's perfect - I'll try and give it proper attention over the next day or so and hopefully we can get it merged over the weekend. |
Now that we have some sort of a starting point, I was pondering a bit about the direction we are taking here and wondering if the GenServer design could be even simpler by leveraging more of Haskell's strengths. Barring some of the limitations of the current design, which we will fix over many iterations, can we simplify even more what someone building a server has to implement? Can we make this 'design surface area' even smaller? I think it should be dead simple to create a server, and the type system should keep me honest even if that means a bit more complexity under the covers. What is a server, really? A server is nothing more than a Process with an id and one or more channels of interaction (in a more abstract sense). These channels are either a call (request/response) or a cast (one-way); a client may choose to use a call channel synchronously or asynchronously, and a server may choose to 'reply later', delegate the reply to another server/process, or just reply right there. I am ignoring info messages here. So what could be better in the "Counter" example from the point of view of who is designing and implementing it? Having to implement CounterRequest and CounterResponse smells like cruft to me and is somewhat error-prone. It would be nice if I could design Counter with mostly functions and have the rest inferred automatically somehow. For instance, the count service in the Counter server could be just:

count :: () -> Int

This is already specifying that there is a Request () and a Response Int. Could this Request and Response message be automatically generated for us? Isn't this what Closure in Cloud Haskell is proposing/implementing? In addition, it would be great if the following could also be automatically derived from the above definition (and implementation):

countAsync :: () -> Future Int

And something like the following for the handler type:

countHandler :: () -> Promise Int -> Maybe Int |
@rodlogic I like these ideas in general, though obviously we'll need to understand what the plumbing looks like before we can generate any of it. :) As the input to any GenServer callback function must be an instance of [...]. And yes, the return type should tell us whether we're dealing with a future/promise or an immediate reply. Of course there are servers that never reply, dealing only with casts, and casts in general would need to be handled a little differently, but some indication that we're not replying shouldn't be too hard to dream up. Of course we always need to deal with timeouts as well, so even if we do avoid making the implementation return a data type indicating whether they're replying or not, we will need to somehow deal with timeouts, hibernation and so on. Ideally we should be able to do this with a type, though I'm struggling to see how we'd wrap the return type (whether it is Int, Future Int or Promise Int) without forcing the author to do something in their code. I suppose, and I shudder to suggest this, that we could have timeout/hibernate throw an exception containing the required span, but that feels really clunky and I'm sure there's a better way. One thing though: [...]
Yes, and I am holding to my point about info messages, which is that we probably need a non-channel based option for people who do care about info messages coming from monitors. |
Guys, I'm getting a bit stuck with this. I do not want to specify the exact input type for my gen server, as this means that it can only deal with one kind of input message. Honestly, what use is that for writing a supervisor, which needs to handle the following instructions: [...]
So how can we write a generic process that accepts all these different types of input messages and handles them uniformly by evaluating a callback function? The callback can be written in terms of generic inputs, for example:

handleCall :: (Serializable m) => m -> ProcessId -> Process ()

We need to handle these 5 different input types now, so how does a record with 1 input type help us at all? As I've said in distributed-process issue 71, this seems to completely defeat the purpose of having [...]. Now I completely understand why this is the case - how on earth is the type system supposed to guess what we mean if we don't specify the types? I don't know if opening up [...]. Having a gen server that accepts just one type of input is fine, if all you want to do is centralize the error handling, timeouts and so on. If that's what we want to do, then the gen server infrastructure might be overkill.

How I arrived at this....

What I tried (over the weekend) to get this working was several things. First of all, I tried to stop limiting the input domain of the handleX functions to a specific type. As long as we don't mind not being able to use record accessors, we can match out the handleX functions we need...

-- we can *try* using existential types for this....
data Server s = forall m. (Serializable m) => Server {
    init       :: .....
  , handleCall :: m -> state -> ProcessAction s
  , state      :: s
  }

handleRequest :: (Serializable m) => Server s -> m -> ProcessAction s
handleRequest Server{ handleCall = hc } = hc
But is [...]? So @edsko, my question is: taking the following, which compiles (therefore it must work!) ;) - are we on the right track, and is there some type trickery I'm missing to use the record instead of the type class (because that completely bombed out for me), and what's the neat way of dealing with the [...]?

{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}
module Control.Distributed.Platform.GenProcess where

import Prelude hiding (catch, init)
import Control.Distributed.Process
import Control.Distributed.Process.Serializable
import Control.Monad (forever)
import Control.Concurrent
import Data.Typeable (Typeable)
import Data.Binary
import Data.DeriveTH

type Name = String

data TimeUnit = Hours | Minutes | Seconds | Millis
data Timeout = Timeout TimeUnit Int | Infinity

data ProcessAction =
    ProcessContinue
  | ProcessTimeout Timeout
  | ProcessStop String

data GenMessage m where
  Message :: (Typeable m) => GenMessage (m, ReplyTo)
  deriving (Typeable)

data ReplyTo = ReplyTo ProcessId | None
  deriving (Typeable, Show)
data Gen = Call | Cast
  deriving (Typeable, Show)

$(derive makeBinary ''ReplyTo)
$(derive makeBinary ''Gen)

class (Typeable s) => GenProcess s where
  init :: Process () -> a -> s
  state :: Process (s) -> s
  handleCall :: (Serializable m) => s -> m -> ReplyTo -> (ProcessAction, s)

serverContinue :: (GenProcess s) => s -> Timeout -> Process (s)
serverContinue s t = do
    case t of
      (Infinity) -> receiveWait handlers
      -- (Timeout u v) -> receiveTimeout (timeToMs u v) handlers
  where handlers = [ (match (\(Call, m, r) -> handleRequest s m r)) ]

handleRequest :: (GenProcess s, Serializable m) =>
                 s -> m -> ReplyTo -> Process (s)
handleRequest s m r = do
    let (action, s2) = handleCall s m r
    case action of
      ProcessContinue    -> serverContinue s2 Infinity
      (ProcessTimeout t) -> serverContinue s2 t

timeToMs :: TimeUnit -> Int -> Int
timeToMs Millis  ms  = ms
timeToMs Seconds sec = sec * 1000
timeToMs Minutes min = (min * 60) * 1000

reply :: (Serializable m) => ReplyTo -> m -> Process ()
reply (ReplyTo pid) m = send pid m
reply _ _ = return ()

replyVia :: (Serializable m) => SendPort m -> m -> Process ()
replyVia p m = sendChan p m
|
Urgh - I take that back, I just hadn't reconfigured recently enough.
So how do I handle messages in the general case? I can be more specific by using an existential type, but it's still too ambiguous:

type GenMessage = forall m. (Typeable m, Serializable m) => m

-- snip

serverContinue :: (GenProcess s) => s -> Timeout -> Process (s)
serverContinue s t = do
    case t of
      (Infinity) -> receiveWait handlers
      -- (Timeout u v) -> receiveTimeout (timeToMs u v) handlers
  where handlers = [ (match (\(Call, m, r) -> handleRequest s m r)) ]

handleRequest :: (GenProcess s) =>
                 s -> GenMessage -> ReplyTo -> Process (s)
handleRequest s m r = do
    let (action, s2) = handleCall s m r
    case action of
      ProcessContinue    -> serverContinue s2 Infinity
      (ProcessTimeout t) -> serverContinue s2 t

will still yield the same error. So do we actually need an API on [...]? Interestingly, I wonder if, instead of trying to define [...] |
@hyperthunk I have been hitting similar issues trying to improve GenServer in different ways:
The simplest answer is to force servers to use a sum type, e.g.:

data SupervisionRequest
  = AddChild
  | DeleteChild
  | RestartChild
  | StopChildren

However, this will get cumbersome very quickly and is not really a solution, considering the different types of messages a single server may need to handle. Besides, this approach would mean that there would be no way to reuse protocols across servers without writing wrapper types. I think we are on the same page here: GenServer has to support dispatching any number of different message types.
I am also struggling with getting the dynamic dispatching right. However, I don't think this has anything to do with the Haskell type system, but with our knowledge (or lack thereof) of it. I can vaguely conceive a solution for this but I just can't get it to work properly. We need to construct a list of handlers that encapsulate the message type using an existential, and then have a function associated with each handler that returns a Match, which is what we need for receiveWait/receiveTimeout (note that the match primitive is also using an existential and hiding the original type from receiveWait/Timeout). The key thing is that each handler in the list must be the one to return this Match, since only the individual handler knows its type hidden by the existential. That is the gist of what I am pursuing right now.
Serializable is also Binary and Typeable. Afaik, Cloud Haskell's marshalling generates a fingerprint of the message type using Typeable, and this fingerprint is then compared when unmarshalling the message on the other end. So, as long as Cloud Haskell can match the fingerprint of the receiving type against the fingerprint that was unmarshalled, the receive/expect should work fine. I am close to a stable set of changes after tinkering with this for a few days. I will share what I have as soon as I clean it up a bit. |
Well....
Maybe I'm wrong but I suspect this will require either some clever redesign on our part or opening up of AbstractMessage in CH.
That doesn't seem so hard if we can solve the need for handling types we know nothing about.
In fact, I don't see how that solves the problem at all. You still have to write the code to either expect that specific type (which means it's not a general-purpose server) or pass in the input type(s) to the record or type class so that the code which uses them can be type-checked properly.
Oh no, there's nothing wrong with Haskell at all. I'm very familiar with the OCaml type system, though less so with Haskell's, but this constraint seems perfectly sensible to me. We just need to figure out what type to use to open up the API we want. The list of matchers is close to what I had in mind too, but you still can't write a function that operates on the existential. What this means is that each handler has to return the ProcessAction (state), and we write the handleRequest function (which is called after we've matched something) to decide on the looping and timeout.
Great, but remember that the decision about being able to use a type in expect/receiveX is based on the clauses the handlers provide. We shouldn't need to care about that.
Great. I'm going to experiment with the same idea (a list of handlers that produces [Match ProcessAction]) in a separate module so as to minimise merge conflicts. |
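For reference, the shape both of you seem to be converging on might look like this: each dispatcher hides its message type behind an existential and knows how to produce a Match, so the generic loop never sees the concrete types. A sketch with hypothetical names, not the committed code:

{-# LANGUAGE ExistentialQuantification #-}
import Control.Distributed.Process
import Control.Distributed.Process.Serializable (Serializable)

-- Each Dispatcher hides one Serializable message type.
data Dispatcher s = forall a. (Serializable a) => Dispatch (s -> a -> Process s)

-- Only the dispatcher can build the Match, because only it knows the hidden type.
toMatch :: s -> Dispatcher s -> Match s
toMatch s (Dispatch f) = match (f s)

-- The generic loop works purely with [Match s].
serverLoop :: s -> [Dispatcher s] -> Process s
serverLoop s ds = do
  s' <- receiveWait (map (toMatch s) ds)
  serverLoop s' ds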
@hyperthunk Just as a quick side note re: "..., but with our knowledge (or lack) of it" - change that 'our' to a 'my', as I don't have a lot of practical experience with Haskell (nor ML) and was speaking with myself in mind.

2nd GenServer iteration

I have committed another iteration of the GenServer based on the experiments from the past few days. At least we are back in a stable state after much tinkering, so I can take a bit more space to consider the code you sent above. This commit (the past 2, actually) reverts to using process messages as opposed to channel messages. I am assuming the implementation could be changed to support typed channels by merging the channels, but considering that it may not be possible to receive process and channel messages at the same time, I am keeping it simple for right now. It also abstracts the two basic protocols of gen_server, i.e. calls and casts, and assumes that specific variations will be based on either one of those (e.g. AbstractMessage support would be implemented using the cast protocol, iow just simple, one-way messaging; the same for info messages, i.e. they come in and conform to the cast protocol). See their types:

type CallHandler a b = a -> Process (CallResult b)
type CastHandler a = a -> Process CastResult

data CallResult a
  = CallOk a
  | CallForward ServerId
  | CallStop a String
  deriving (Show, Typeable)

data CastResult
  = CastOk
  | CastForward ServerId
  | CastStop String

I think this is in line with your code snippets above. I.e., by separating the cast and call result types we can enforce certain invariants: there is no way to send a reply in a CastHandler (I am assuming we can wrap these handlers in a monad to better control what can go on inside them); you can only process the message, stop the server, or forward it. Or, in the case of the CallHandler, you are forced to either generate a CallOk with the reply, or forward the request to another server with a CallForward, or stop the server with a CallStop:

handleReset ResetCount = do
  return $ CastForward mySlaveServerId

The code is working for the simple Counter example and shows how the GenServer can be used with a synchronous CallHandler, an async CastHandler, and two separate data types: CounterRequest and ResetCount. For example:

-- Handler for the CounterRequest call
handleCounter IncrementCounter = return $ CallOk (CounterIncremented)
handleCounter GetCount = return $ CallOk (Count 0)

-- Handler for the ResetCount cast
handleReset ResetCount = return $ CastOk

Support for AbstractMessage?

I also added an experimental handleAny to deal with untyped messages based on Cloud Haskell's AbstractMessage. Not sure if it works, but it shows another flexibility of the design (or so it seems).

Threading state through the handlers

More refinement is needed, but the next todo seems to be figuring out how to thread the server state through the handlers/callbacks. The approach I am pursuing now is to define a Server monad that is really a StateT monad wrapping the Process monad. This is immediately useful for managing the server state, but could also possibly be used to create additional DSLs on top of it (not sure):

type Server s = StateT s Process

type InitHandler s = Server s InitResult
type TerminateHandler s = TerminateReason -> Server s ()
type CallHandler s a b = a -> Server s (CallResult b)
type CastHandler s a = a -> Server s CastResult

And a sample CastHandler:

handleReset ResetCount = do
  state <- get
  -- do something with the state
  put state
  return $ CastOk

I am stuck now trying to figure out where to store this state s so that the MessageDispatcher can access it. The first iteration of GenServer was using a closure to do that, but now I have N MessageDispatchers that share the same state and no closure around them. |
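One way to sidestep that, sketched here with hypothetical names: keep the state out of the dispatchers entirely and let the receive loop own it, running each StateT handler with runStateT to obtain the next state:

import Control.Distributed.Process
import Control.Monad.State (StateT, runStateT)

type Server s = StateT s Process

-- Run one handler against the current state; the loop, not the
-- dispatcher, carries the state from step to step.
runHandler :: s -> Server s a -> Process (a, s)
runHandler st h = runStateT h st

-- A generic loop that threads the state through successive steps.
loopWith :: s -> (s -> Process s) -> Process a
loopWith st step = step st >>= \st' -> loopWith st' step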
I didn't mean to be touchy though - I've been struggling with some of the finer details here too. :) So..... looking at your code, I think you're generally going in the right direction. Feel free to send a pull request when you're ready - I'm doing some experiments in parallel in a different module, so we shouldn't have any clashes. |
@hyperthunk Ok, the pull request is in. I will not have time for this until the weekend. |
That's absolutely fine, I'm doing this in my spare time too and your input is most welcome and appreciated! :) |
Sorry guys, the Parallel Haskell project has come to an end, and so I will only be able to look at this in my spare time, of which I have very little. You already seem to have made progress with this, but let me just explain something; perhaps it will help, because the 'expect' matching (or 'receiveTimeout' and co) is indeed a little confusing. You should think of

expect :: Serializable a => Process a

which is really

expect :: (Typeable a, Binary a) => Process a

as having an (implicit) argument:

expect :: Binary a => Fingerprint -> Process a

That [...]. Introducing an existential type along the lines of

data Foo :: * where
  mkFoo :: forall e. Serializable e => ... -> Foo

and then matching on [...]

data Foo :: * where
  mkFoo :: forall e. Serializable e => Fingerprint -> ... -> Foo

So Cloud Haskell's [...]

wrap :: Serializable a => a -> AbstractMessage
matchAgainst :: [Match a] -> AbstractMessage -> Process (Maybe b)

and moreover make [...]. Note by the way that making [...] |
I had a play with this too and found it pretty awkward. The dispatchers need to take [...]
Yes, exactly. One approach I tried was along the lines of:

data Dispatcher s =
    forall a . (Serializable a) =>
      Dispatch { handler :: s -> Message a -> Process () }
  | forall a . (Serializable a) =>
      DispatchIf { handler :: s -> Message a -> Process (),
                   predicate :: s -> Message a -> Bool }
  | DispatchAny { abstractMessagehandler :: s -> AbstractMessage -> Process () }

data GenProcess s = GenProcess {
    procInit        :: InitHandler s,   -- ^ initialization handler
    procDispatchers :: [Dispatcher s],  -- ^ request dispatchers
    procTerminate   :: TerminateHandler -- ^ termination handler
  }

But as soon as you want to initialize the state, your [...] |
@edsko thanks for the additional info and details. @hyperthunk there is another pull request with another iterative improvement to GenServer: now threading server state through the handlers (init, handle, terminate). The code could probably be much better, but at least it compiles and runs and it keeps us moving forward. |
@edsko @hyperthunk Committed a few additional changes to support the handling of CallStop and CastStop (this is when the server can instruct a termination after handling the call/cast). I am a bit unsure about the following:

callServer :: (Serializable rq, Serializable rs) => ServerId -> Timeout -> rq -> Process rs

One option would be to change the result type to Process (Maybe rs), and another would be to throw an exception instead. How is this usually handled in Erlang?

-- | Sync call (in terms of callAsync + waitReply)
call :: ServerId -> a -> Timeout -> Process (Either SomeException a)
-- | Asynchronous call to server
callAsync :: ServerId -> a -> Process (Future b)
-- | Wait for a reply blocking if necessary
waitReply :: Future a -> Timeout -> Process (Either SomeException a)
-- | Poll a future to see if there is a reply without blocking
pollReply :: Future a -> Process (Maybe (Either SomeException a))
-- | Cancel a future
cancelReply :: Future a -> Process () |
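In the spirit of that sketch, a Future can be little more than a private typed channel: callAsync creates the channel, ships the SendPort along with the request, and keeps the ReceivePort as the handle. A hedged sketch - the (request, port) envelope here is an assumption, not the committed wire format:

import Control.Distributed.Process
import Control.Distributed.Process.Serializable (Serializable)

newtype Future a = Future (ReceivePort a)

-- Fire the request; the private reply port becomes the Future.
callAsync :: (Serializable rq, Serializable rs) => ProcessId -> rq -> Process (Future rs)
callAsync sid rq = do
  (sp, rp) <- newChan
  send sid (rq, sp) -- assumes the server replies via the enclosed SendPort
  return (Future rp)

-- Block until the reply arrives.
waitReply :: Serializable a => Future a -> Process a
waitReply (Future rp) = receiveChan rp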
data Dispatcher s =
    forall a . (Serializable a) =>
      Dispatch { handler :: s -> Message a -> Process () }
  | forall a . (Serializable a) =>
      DispatchIf { handler :: s -> Message a -> Process (),
                   predicate :: s -> Message a -> Bool }
  | DispatchAny { abstractMessagehandler :: s -> AbstractMessage -> Process () }

data GenProcess s = GenProcess {
    procInit        :: InitHandler s,   -- ^ initialization handler
    procDispatchers :: [Dispatcher s],  -- ^ request dispatchers
    procTerminate   :: TerminateHandler -- ^ termination handler
  }

Yes, threading the state explicitly adds noise to the dispatcher/handler functions, and is another reason to go for a StateT monad, imo, to thread that state in and out of the handlers.
What do you think of the following as the contract between GenServer/Process and the user-defined handlers?

type InitHandler s = Server s InitResult
type TerminateHandler s = TerminateReason -> Server s ()
type CallHandler s a b = a -> Server s (CallResult b)
type CastHandler s a = a -> Server s CastResult

The Server monad is just a StateT type alias that wraps the Process monad. The only part that deserves a bit more attention is where we have to call receiveWait/Timeout, since that is in the Process monad and not in the Server monad. Apart from that it is quite clean, and the best I can come up with so far. For example, implementing the handler that returns the current count in the counter server would look like:

handleCounter GetCount = do
  count <- getState
  return $ CallOk (Count count)

Then the startCounter API would hook the handlers up with the following:

startCounter :: Int -> Process ServerId
startCounter count = startServer count defaultServer {
  msgHandlers = [
    handleCall handleCounter,
    handleCast handleReset
  ]}

And handleCall and handleCast would wrap the handler in a Dispatcher data type and set up the whole server process. There may be opportunities to simplify this a bit further with some helper functions or smart constructors, but that would be something minor at this point. I also added an additional example, very similar to the Counter server, just as a 2nd exercise.

Next steps?

Now, where should we go from here? Unless you have very different ideas on how the GenProcess/Server should look, it seems that we should give a first shot at the Supervisor module using this GenServer and fill any gaps we may find along the way. Any thoughts? |
@rodlogic - first of all, thank you so much for your ongoing contributions; you've really pushed this forward and I think you've pretty much cracked the gen server API for the most part! For my own part, I've been trying to reduce the amount of boilerplate required by the implementation code, and the reason I've not committed any of those changes is so as to avoid merge conflicts with your work (which is generally more complete and compiling properly than mine!) :)
I think the API is just fine. One of the things I'm attempting to do is to reduce the amount of indirection between the server process and the handlers API, by returning an 'Action' (which is basically your init :: Behaviour s -> Server s InitResult
loop :: Behaviour s -> Timeout -> Server s TerminateReason
processReceive :: [Dispatcher s] -> Timeout -> Server s (Either ProcessAction TerminateReason) I'm also interested in whether the call/cast handlers can be made into pure functions and wrapped (by the API entry points) in the state monad, so that we get something more like data CastResult s =
CastOk s
| CastForward s ServerId
| CastStop s String
type CastHandler s a = a -> CastResult s Keeping the Anyway, these are minor (and mainly cosmetic, from the API perspective) things and in general I think we will move forward with the gen server implementation you've come up with and if I manage to do these simplifications - and if they do turn out to be simpler, rather than more complicated - then we'll do some refactoring then.
Yes, I agree we should move on to looking at supervisor now. As I said above, I'll bring those simplifications to gen server in on a branch and discuss them with you there before merging anything. I think that before we can actually implement supervisor properly, however, we need to solve distributed-process issue 69, as without a way to kill processes we're going to struggle to implement supervision properly. I noticed that you've created a [...]. So what I propose is that we take a look at the Cloud Haskell issue and see if we can solve it and contribute a pull request. This seems to involve having the node controller listen for a new kind of message ([...]) |
@hyperthunk CH issue #69 is behind us and a good base for us to continue. It was a great move, btw, to raise this as a CH issue. Regarding the API, I am running out of ideas now on how to improve it and imo we are close enough. I also think that the implementation needs a few iterations to remove more cruft and simplify, but maybe not a good idea to get stuck there now. So giving the Supervisor a first shot seems a great next step. I have also incorporated some additional folders/files/content to deal with tests, code coverage and benchmarking. There is nothing there at this point, but a small step forward. Please take a look when you have a chance and I can send a pull request if it makes sense. |
Just a very minor comment on an earlier question:
I would suggest sticking to the conventions used elsewhere in the Cloud Haskell API (in particular, [...]). |
@edsko - thanks for the clarification there; I agree that going for consistency is the thing to do. @rodlogic - glad you can see where I'm coming from, and you've got exactly the right idea with what you said above. That's precisely how I'd like to layer the APIs going forward.
Indeed. I'll try and find some time during this week to push the parallel layering track forward. If you could roll up the catch/exit handling, then I'll start writing some test cases for gen server as well. |
PS: I'm moving some of the shared infrastructure around a bit as part of this effort. Take a look at https://github.com/hyperthunk/distributed-process-platform/tree/timer to get a feel for what has moved where. |
It is a good time to restructure as we are starting to add more modules. I'll ping if I have any comments re: these moves etc and we can discuss. |
@hyperthunk I am going to spend the next few hours integrating catchExit/exit into GenServer. I am, however, a bit unsure about the best way to proceed. The current GenServer has an InitHandler, a TerminateHandler and a list of message handlers (call and/or cast). The server's life cycle today calls the InitHandler once, loops over the message handlers, and finally calls the TerminateHandler when one of the handlers returns a stop. Any async exceptions thrown today will terminate the server without calling the TerminateHandler. The first goal is to make sure that the InitHandler and TerminateHandler are always called, irrespective of whether a stop was issued or an exit async exception is thrown. The second case will involve using catchExit to trigger the TerminateHandler, but what about the typed message in catchExit?
Do you have any recommendations? |
Looks good. I'll get rid of the Timeout types in GenServer once this is in master. |
@hyperthunk I sent a pull request (see the pull's description for more info): https://github.com/hyperthunk/distributed-process-platform/pull/20 I am also seeing a behavior that I am not entirely sure is a problem with CH or with the platform code (more likely). Basically, the Counter.hs example is stopping the server when the count is greater than 10, and the subsequent incCount is hanging, waiting for a reply that never comes. A Timeout would help here as a possible solution, but shouldn't CH throw an exception if the underlying process has already terminated? I will take a second look later tonight, but if you have any insights let me know. |
Hi @rodlogic - thanks for the pull request. I responded asking if you could rebase against the recent changes in master, as I'm unable to merge it cleanly and I suspect you'll do a better job of resolving conflicts in your own code. I had already updated the GenServer in master to use the [...]
I must confess I've not looked in much depth at the counter example, but from this comment, I'm fairly sure this is doing exactly what it should do!
This is an incredibly common source of confusion for new Erlang programmers too. In both CH and Erlang, [...]:

inc_counter(Pid) ->
    if erlang:is_process_alive(Pid) == true -> Pid ! increment;
       true -> {error, noproc}
    end.

Cloud Haskell doesn't have an [...]:

incCount :: ServerId -> Process ()
incCount sid = do
  CounterIncremented <- callServer sid NoTimeout IncrementCounter
  return ()

-- snip

callServer :: (Serializable rq, Serializable rs) => ServerId -> Timeout -> rq -> Process rs
callServer sid timeout rq = do
  cid <- getSelfPid
  --say $ "Calling server " ++ show cid
  send sid (CallMessage cid rq)
  case timeout of
    -- this next line will block infinitely if `sid` dies
    -- NB: sid can die in several places
    -- 1. before we call `send' as sending never fails
    -- 2. after `send' but before we get sent a reply
    NoTimeout -> expect
    Timeout time -> do
      mayResp <- expectTimeout time
      case mayResp of
        Just msg -> return msg
        Nothing  -> error $ "timeout! value = " ++ show time

The other way to handle this is to use a timeout, but a timeout is not the right general solution to this problem of making an rpc-style call to a running process, because it is entirely possible that the process did receive and handle the message; if we time out before we get the reply, there is no way for us to indicate that to our caller. This is bad news on many levels, as I'm sure you can imagine. The correct solution to this conundrum, in both Erlang and CH, is to use a monitor to check that the process we're calling is alive at the time we make our call. This works quite neatly in Erlang because when you call [...]:

call(Pid, Expected) ->
    MRef = setup_monitor(Pid),
    receive
        {'DOWN', MRef, process, Pid, noproc} ->
            {error, {not_sent, 'DOWN'}};
        {'DOWN', MRef, process, Pid, Reason} ->
            receive
                %% we *might* still have the reply in our mailbox, so let's check
                {Expected, Reply} -> {ok, Reply}
            after 0 ->
                {error, Reason}
            end;
        {Expected, Return} ->
            demonitor(Pid, [flush]),
            {ok, Return}
    end.

According to my conversation with @edsko at the end of this thread, we will get exactly the same behaviour with CH's monitors. If the process is already dead, we will get a monitor notification with [...]. So, to summarise: [...]

This last point is vitally important, as setting up a monitor between processes is kind of a big deal. You do not want to pollute the caller's mailbox with unexpected deliveries, which is why that last line in the Erlang 'demo' function calls demonitor(Pid, [flush]). This is, BTW, almost exactly how the OTP gen server handles this situation, so we're going down a well-trodden path here. Hope that's useful! :) |
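Translated into Cloud Haskell terms, the monitor-bracketed call above might look like the following sketch. All names here are illustrative, and the "reply might already be in the mailbox" re-check from the Erlang version is omitted for brevity:

{-# LANGUAGE ScopedTypeVariables #-}
import Control.Distributed.Process
import Control.Distributed.Process.Serializable (Serializable)

safeCall :: forall rq rs. (Serializable rq, Serializable rs)
         => ProcessId -> rq -> Process (Either DiedReason rs)
safeCall pid rq = do
  mref <- monitor pid
  send pid rq
  result <- receiveWait
    [ match (\(reply :: rs) -> return (Right reply))
    , matchIf (\(ProcessMonitorNotification ref _ _) -> ref == mref)
              (\(ProcessMonitorNotification _ _ reason) -> return (Left reason))
    ]
  unmonitor mref -- the CH analogue of demonitor
  return result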
@hyperthunk Yes, indeed, I was putting additional empty lines in to separate definitions, based on what seemed like a good style I saw in the async package. However, either way works for me as long as we are all following the same standard, so a style guide would be great. Thanks for the details above about async sends and how monitoring is a good solution for a robust call implementation. I have modified the [...]
I am also currently throwing an exception when there is a timeout or a ProcessMonitorNotification with no call response in the mailbox. The alternative is to make this explicit in the return type of the callServer API with an Either, for instance, but I am not sure what is the most idiomatic Haskell here (@edsko?). This seems like a small change and we can easily address it at any time. On a side note, even though I moved the Kitty and Counter examples out of the main platform library and Main.hs into simple test cases in TestGenServer, I have also moved the old src/Main.hs into test/Main.hs. I am having a hard time debugging/tracing test cases when there are problems (there are no logs unless the test cases finish), and this Main.hs is quite convenient for experimenting in ghci (fyi: just type ghci in the root folder and it should automatically load Main and its dependencies; a subsequent [...]). I also noticed that some of the functions related to Timeouts and TimeIntervals are located in the Timer.hs module. Shouldn't they reside in Types.hs, since they are also useful in GenProcess/GenServer? |
I took a look at the latest Async.hs and saw a typical 1 blank line between most definitions, with a 2 line space between conceptually different blocks of definitions. Perhaps Simon has tidied this up more recently though! I will write up a style guide, as that could help future contributors as well as us.
I suspect the matching implementation in CH prevents matching on fields in data constructors, as it has only the type fingerprint to work with. You'd need to implement deeper matches using matchIf:

_ <- receiveWait [matchIf (\(ProcessMonitorNotification _ _ reason) -> reason == DiedUnknownId)
                          (\(ProcessMonitorNotification ref pid _) -> flush ref pid)]
This depends on whether or not the monitor infrastructure is removed after the monitored process dies. In erlang this happens automatically, but it doesn't hurt to call demonitor anyway. I suspect the same is true for CH, so this is probably fine.
I'm willing to be corrected by @edsko here, but personally I think it's better to return Either (or Maybe) here and let the caller throw an exception if they consider it appropriate. Adding tests/Main.hs is a bit heavyweight, as it means another executable in the cabal file. Can we not just add another project that imports the platform library (such as distributed-process-playground or whatever) and then create a top-level wrapper to pull both repositories down, or some such? I don't think we should include executable definitions in a library build just for developer convenience, though I completely agree that tracing et al is a bit onerous without one.
That's a good point, although I don't think they should live in Types.hs. Let me chew on that a little. |
I agree 100% with @hyperthunk's reply "send is an asynchronous operations that never fails and so, if the process you're communicating with is dead, you just get a silent failure." and following. Couldn't have said it better myself :) |
I would agree with that -- if only because that's consistent with what we do in the core CH infrastructure. However, I think this is only appropriate if the user can specify the timeout explicitly. For instance, that's why we have this comment in the CH core library:

-- | Query a remote process registry (asynchronous)
--
-- Reply will come in the form of a 'WhereIsReply' message.
--
-- There is currently no synchronous version of 'whereisRemoteAsync': if
-- you implement one yourself, be sure to take into account that the remote
-- node might die or get disconnect before it can respond (i.e. you should
-- use 'monitorNode' and take appropriate action when you receive a
-- 'NodeMonitorNotification').
whereisRemoteAsync :: NodeId -> String -> Process ()

Of course, we could provide a [...] |
Indeed. I also checked the older versions but they all show 1 blank line so I am not sure what I looked at. In any case, it is irrelevant.
This explains why, clearly. Out of curiosity, do you know if this is something that will be expanded in future releases? matchIf is already powerful, but it would be awesome to be able to use nested pattern matching in the receive calls.
I need to read a bit more on the test framework to see what kind of execution options exist. If there were a way to run test cases interactively, so that trace messages were logged as they happened, that would be ideal. I will remove Main.hs from the repository and keep it local in the meantime. |
As @hyperthunk says, [...]. Instead of

data Request = Add | Subtract

and using [...], define

data Add = Add
data Subtract = Subtract

and use [...] |
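The gist of that suggestion, as a hedged sketch (hypothetical loop, using the derive package as elsewhere in this thread): with one data type per request, each message carries its own Typeable fingerprint, so receiveWait can dispatch on them as separate matches, with no wrapper sum type needed:

{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE TemplateHaskell #-}
import Control.Distributed.Process
import Data.Binary
import Data.DeriveTH
import Data.Typeable (Typeable)

data Add = Add deriving (Typeable)
data Subtract = Subtract deriving (Typeable)
$(derive makeBinary ''Add)
$(derive makeBinary ''Subtract)

-- Each request type gets its own match clause.
counterLoop :: Int -> Process Int
counterLoop n = receiveWait
  [ match (\Add      -> counterLoop (n + 1))
  , match (\Subtract -> counterLoop (n - 1))
  ]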
Argh - @rodlogic, I realise that I haven't responded to your query about [...]
Hmn, this makes life a bit more complicated, doesn't it?
That would be easiest.
This sounds better, but I can see it causing all kinds of trouble in practice. What if the exit reason isn't supported by one of the terminate handlers that the implementor registered?
To borrow a phrase from an American friend, I kinda-sorta like this one. Here's what I think would work best: [...]
How does that sound? |
Sorry for the delay, time has been limited for the past few days.
Sounds good. I will look into this this weekend or next week. |
Ok no rush though, as I'm picking up some tasks over in distributed-process too, so I'll probably just do some monkey stuff (build changes, CI and documentation/admin) over the weekend myself. |
@hyperthunk Just a quick note to say that the next 2-3 weeks I'll be for the most part offline (vacation, traveling, etc), but I will be back to D.P./D.P.P right after that. Finishing GenServer based on our recent discussions is first in the list of TODOs, but, in any case, I'll touch base then. |
Hi @rodlogic cool, thanks for letting me know. I hope you have a good vacation and look forward to working with you again when you're able to rejoin us here. Thanks again for your contributions! :) |
Hey @rodlogic - when you get back to this, please take a good look at https://github.com/jepst/distributed-process-global/blob/master/src/Control/Distributed/Process/Global/Call.hs - I think we can take some good ideas from there. I'm probably going to ask Jeff if we can move [...]. BTW, I opened an issue for rolling server upgrades at haskell-distributed/distributed-process#106 - if we do something like that, then the gen-server and supervisor APIs will be the primary entry points for application state migration between failing/taking-over nodes, so the state monad stuff you've introduced is going to pay for itself very nicely. :) |
@hyperthunk Will do. However, I am going to be with zero free time for the next 2 weeks. I will be back to this after that and pick it from there. |
@rodlogic that's perfectly fine - thanks for letting me know! :) |
NB: I'm afraid that the API which [...] |
Please see haskell-distributed/distributed-process#30 (comment) for details on the proposal I've made for supporting |
So.... The gen process API is actually more feature-complete than gen-server now, though I've yet to go through and write a decent test suite for it. There are some differences: [...]

For timeouts, the delay can be changed after each call. Also, there is just one [...]. Finally, the handle-info support is still semi-typed, and therefore requires the user to specify a policy for handling unmatched messages, which can either drop them, send them on to a dead-letter queue, re-queue them (not yet implemented) or terminate the process. I might add support for changing this policy on the fly. The API requires this branch of d-process to get merged, and I'm waiting on feedback from @edsko et al about that merge in haskell-distributed/distributed-process#30. We discussed merging the approaches for gen-server a while back, and once the d-p merge has taken place and I've introduced a working test suite, I'd say that the gen-process API is almost feature-complete. Do we want to push forward with both these APIs or is it time to unify them? If so, which bits do we keep from which implementation? I'm open to opinions here, especially from @rodlogic, as he did all the heavy lifting on the original implementation, without which I would not have been able to progress with gen-process anywhere near as fast. I do think that it would be better if users only had one [...]. Here's what a gen-process implementation looks like in practice...

-- demo
data Reset = Reset
  deriving (Typeable)
$(derive makeBinary ''Reset)

type MyState = [String]

startMyServer :: Process ProcessId
startMyServer =
    let srv = myServer in do
      spawnLocal $ start () startup srv >> return ()
  where startup :: InitHandler () MyState
        startup _ = return $ InitOk [] Infinity

myServer :: Behaviour MyState
myServer = Behaviour {
    dispatchers = [
        handleCall handleAdd
      , handleCast handleReset
      ]
  , infoHandlers = [handleInfo handleMonitorSignal]
  , timeoutHandler = onTimeout
  , terminateHandler = undefined
  , unhandledMessagePolicy = Drop
  }

handleAdd :: MyState -> String -> Process (ProcessReply MyState String)
handleAdd s x =
  let s' = (x:s)
  in reply "ok" s'

handleReset :: MyState -> Reset -> Process (ProcessAction MyState)
handleReset _ Reset = continue []

handleMonitorSignal :: MyState -> ProcessMonitorNotification -> Process (ProcessAction MyState)
handleMonitorSignal s (ProcessMonitorNotification _ _ _) = continue s

onTimeout :: TimeoutHandler MyState
onTimeout _ _ = stop $ TerminateOther "timeout" |
One option here, and I like this idea, would be to let [...]. What would be nice is if we had a builder-style API for creating servers and the server monad for threading state. Then you could write something like:

demo :: [String] -> Server s
demo names = do
  init $ do -- doesn't *have* to be inline
    putState names
    ok
  handleCall [add, divide]
  handleCast [ -- let's have another inline
      -- if 'ok' returns () then we can just skip it unless we
      -- want to make an explicit change, as below
      (\Reset -> modifyState [])
    ]
  handleInfo $ (\(Subtract pid _ _) -> send pid ApiError >> stop ApiError)

It is probably quite trivial to support all those illustrative feature ideas on top of the existing [...] |
I just re-introduced the Counter example using [...]:

{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Counter
  ( startCounter,
    getCount,
    getCountAsync,
    incCount,
    resetCount,
    wait,
    waitTimeout
  ) where

import Control.Distributed.Process hiding (call)
import Control.Distributed.Process.Platform.Async
import Control.Distributed.Process.Platform.GenProcess
import Control.Distributed.Process.Platform.Time
import Data.Binary
import Data.DeriveTH
import Data.Typeable (Typeable)

--------------------------------------------------------------------------------
-- Types --
--------------------------------------------------------------------------------

-- Call and Cast request types. Response types are unnecessary as the GenProcess
-- API uses the Async API, which in turn guarantees that an async handle can
-- /only/ give back a reply for that *specific* request through the use of an
-- anonymous middle-man (as the sender and receiver in our case).

data Increment = Increment
  deriving (Show, Typeable)
$(derive makeBinary ''Increment)

data Fetch = Fetch
  deriving (Show, Typeable)
$(derive makeBinary ''Fetch)

data Reset = Reset deriving (Show, Typeable)
$(derive makeBinary ''Reset)

type State = Int

--------------------------------------------------------------------------------
-- API --
--------------------------------------------------------------------------------

-- | Increment count
incCount :: ProcessId -> Process Int
incCount sid = call sid Increment

-- | Get the current count - this is replicating what 'call' actually does
getCount :: ProcessId -> Process Int
getCount sid = getCountAsync sid >>= wait >>= unpack
  where unpack :: AsyncResult Int -> Process Int
        unpack (AsyncDone i) = return i
        unpack asyncOther    = die asyncOther

-- | Get the current count asynchronously
getCountAsync :: ProcessId -> Process (Async Int)
getCountAsync sid = callAsync sid Fetch

-- | Reset the current count
resetCount :: ProcessId -> Process ()
resetCount sid = cast sid Reset

--------------------------------------------------------------------------------
-- Implementation --
--------------------------------------------------------------------------------

-- | Start a counter server
startCounter :: Int -> Process ProcessId
startCounter startCount =
  let server = defaultProcess {
      dispatchers = [
          handleCallIf (state (\count -> count <= 10)) -- invariant
                       (\_ (_ :: Increment) ->
                            noReply_ (TerminateOther "Count > 10"))
        , handleCall handleIncrement
        , handleCall (\count (_ :: Fetch) -> reply count count)
        , handleCast (\_ Fetch -> continue 0)
        ]
      } :: ProcessDefinition State
  in spawnLocal $ start startCount init' server >> return ()
  where init' :: InitHandler Int Int
        init' count = return $ InitOk count Infinity

handleIncrement :: State -> Increment -> Process (ProcessReply State Int)
handleIncrement count _ =
  let newCount = count + 1 in do
    next <- continue newCount
    replyWith newCount next

I'll re-introduce the tests and bring Kitty.hs back for that purpose too. Here's a more complete example from the tests:
mkServer :: UnhandledMessagePolicy
         -> Process (ProcessId, MVar (Either (InitResult ()) TerminateReason))
mkServer policy =
  let s = statelessProcess {
      dispatchers = [
          -- note: state is passed here, as a 'stateless' process is
          -- in fact a process definition whose state is ()
          handleCall (\s' (m :: String) -> reply m s')
        , handleCall_ (\(n :: Int) -> return (n * 2)) -- "stateless"
        , handleCast (\s' ("ping", pid :: ProcessId) ->
                         send pid "pong" >> continue s')
        , handleCastIf_ (input (\(c :: String, _ :: Delay) -> c == "timeout"))
                        (\("timeout", Delay d) -> timeoutAfter_ d)
        , handleCast_ (\("stop") -> stop_ TerminateNormal)
        , handleCast_ (\("hibernate", d :: TimeInterval) -> hibernate_ d)
        ]
    , unhandledMessagePolicy = policy
    , timeoutHandler = \_ _ -> stop $ TerminateOther "timeout"
    }
  in do
    exitReason <- liftIO $ newEmptyMVar
    pid <- spawnLocal $ do
      catch (start () (statelessInit Infinity) s >>= stash exitReason)
            (\(e :: SomeException) -> stash exitReason $ Right (TerminateOther (show e)))
    return (pid, exitReason) |
Right. We've moved over to Jira and I'll be closing all GitHub issues later on today, working only on the corresponding Jira issues. This issue is all but closed now. @rodlogic really kick-started this work and I'd like to say a big thank you for doing so. The current [...] |
See issue #1 and https://gist.github.com/4025934