#include <assert.h>
#include <functional>
- #include <optional>
+ #include <kj/function.h>
#include <map>
#include <memory>
+ #include <optional>
#include <sstream>
#include <string>
@@ -129,6 +130,28 @@ std::string LongThreadName(const char* exe_name);

//! Event loop implementation.
//!
+ //! The Cap'n Proto threading model is very simple: all I/O operations are
+ //! asynchronous and must be performed on a single thread. This includes:
+ //!
+ //! - Code starting an asynchronous operation (calling a function that returns a
+ //!   promise object)
+ //! - Code notifying that an asynchronous operation is complete (code using a
+ //!   fulfiller object)
+ //! - Code handling a completed operation (code chaining or waiting for a promise)
+ //!
+ //! All of this code needs to access shared state, and there is no mutex that
+ //! can be acquired to lock this state, because Cap'n Proto assumes it will only
+ //! be accessed from one thread. So all of this code needs to actually run on one
+ //! thread, and the EventLoop::loop() method is the entry point for that thread.
+ //! ProxyClient and ProxyServer objects that use other threads and need to
+ //! perform I/O operations post work to this thread using the EventLoop::post()
+ //! and EventLoop::sync() methods.
+ //!
+ //! Specifically, because ProxyClient methods can be called from arbitrary
+ //! threads, and ProxyServer methods can run on arbitrary threads, ProxyClient
+ //! methods use the EventLoop thread to send requests, and ProxyServer methods
+ //! use it to return results.
+ //!
//! Based on https://groups.google.com/d/msg/capnproto/TuQFF1eH2-M/g81sHaTAAQAJ
class EventLoop
{
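For readers new to this model, here is a rough usage sketch (not part of the patch; the `mp/proxy-io.h` header path and the helper function are assumed for illustration) of how code on an arbitrary thread hands its capnp I/O to the event loop thread and waits for the result:

```cpp
#include <mp/proxy-io.h> // assumed header declaring mp::EventLoop

// Called from any thread. The lambda runs on the EventLoop thread, the only
// thread allowed to touch capnp RPC state; sync() blocks the caller until the
// lambda has finished, so reading 'result' afterwards is safe.
int queryRemote(mp::EventLoop& loop)
{
    int result = 0;
    loop.sync([&] {
        // ...start the request and chain or wait on promises here...
        result = 42; // placeholder for the value produced by the RPC
    });
    return result;
}
```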
@@ -144,17 +167,21 @@ class EventLoop

    //! Run function on event loop thread. Does not return until function completes.
    //! Must be called while the loop() function is active.
-   void post(const std::function<void()>& fn);
+   void post(kj::Function<void()> fn);

    //! Wrapper around EventLoop::post that takes advantage of the
    //! fact that callable will not go out of scope to avoid requirement that it
    //! be copyable.
    template <typename Callable>
    void sync(Callable&& callable)
    {
-       post(std::ref(callable));
+       post(std::forward<Callable>(callable));
    }

+   //! Register cleanup function to run on asynchronous worker thread without
+   //! blocking the event loop thread.
+   void addAsyncCleanup(std::function<void()> fn);
+
    //! Start asynchronous worker thread if necessary. This is only done if
    //! there are ProxyServerBase::m_impl objects that need to be destroyed
    //! asynchronously, without tying up the event loop thread. This can happen
@@ -166,13 +193,10 @@ class EventLoop
    //! is important that ProxyServer::m_impl destructors do not run on the
    //! eventloop thread because they may need it to do I/O if they perform
    //! other IPC calls.
-   void startAsyncThread(std::unique_lock<std::mutex>& lock);
+   void startAsyncThread() MP_REQUIRES(m_mutex);

-   //! Add/remove remote client reference counts.
-   void addClient(std::unique_lock<std::mutex>& lock);
-   bool removeClient(std::unique_lock<std::mutex>& lock);
    //! Check if loop should exit.
-   bool done(std::unique_lock<std::mutex>& lock) const;
+   bool done() const MP_REQUIRES(m_mutex);

    Logger log()
    {
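The `MP_REQUIRES(m_mutex)` annotations replace the old convention of passing a `std::unique_lock` parameter to prove the caller holds the lock. Macros like these conventionally expand to Clang `-Wthread-safety` attributes; the sketch below shows that common pattern only, not this project's actual definitions:

```cpp
// Common shape of thread-safety annotation macros (assumption: the real
// MP_REQUIRES/MP_GUARDED_BY are defined elsewhere in this repository and may
// differ). With -Wthread-safety enabled, Clang then warns about calls to
// startAsyncThread() or done() made without m_mutex held.
#if defined(__clang__)
#define MP_REQUIRES(...) __attribute__((requires_capability(__VA_ARGS__)))
#define MP_GUARDED_BY(x) __attribute__((guarded_by(x)))
#else
#define MP_REQUIRES(...)
#define MP_GUARDED_BY(x)
#endif
```

For such warnings to fire, the mutex type itself also needs a capability annotation, which is presumably what distinguishes the `Mutex` type used below from a plain `std::mutex`.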
@@ -195,10 +219,10 @@ class EventLoop
    std::thread m_async_thread;

    //! Callback function to run on event loop thread during post() or sync() call.
-   const std::function<void()>* m_post_fn = nullptr;
+   kj::Function<void()>* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr;

    //! Callback functions to run on async thread.
-   CleanupList m_async_fns;
+   std::optional<CleanupList> m_async_fns MP_GUARDED_BY(m_mutex);

    //! Pipe read handle used to wake up the event loop thread.
    int m_wait_fd = -1;
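Changing `m_post_fn` from `const std::function<void()>*` to `kj::Function<void()>*` matches the new `post(kj::Function<void()>)` signature above: `std::function` requires a copyable callable, while `kj::Function` only needs a movable one. A small illustration (names invented for the example):

```cpp
#include <kj/function.h>
#include <memory>
#include <utility>

// A lambda that owns a unique_ptr is move-only, which kj::Function accepts
// but std::function rejects. This is what lets post()/sync() carry callables
// that capture non-copyable state.
void example()
{
    auto state = std::make_unique<int>(7);
    auto task = [s = std::move(state)]() { ++*s; };

    kj::Function<void()> ok{std::move(task)};      // compiles: move is enough
    // std::function<void()> bad{std::move(task)}; // error: copyable callable required
    ok();
}
```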
@@ -208,11 +232,11 @@ class EventLoop

    //! Number of clients holding references to ProxyServerBase objects that
    //! reference this event loop.
-   int m_num_clients = 0;
+   int m_num_clients MP_GUARDED_BY(m_mutex) = 0;

    //! Mutex and condition variable used to post tasks to event loop and async
    //! thread.
-   std::mutex m_mutex;
+   Mutex m_mutex;
    std::condition_variable m_cv;

    //! Capnp IO context.
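Taken together, these members describe the post() hand-off: the caller publishes its callback through the guarded pointer, wakes the loop via the pipe, and sleeps on the condition variable until the loop thread has run it. A self-contained toy model of that flow (illustrative only; the real implementation lives in the .cpp file and additionally handles shutdown, the async thread, and error cases):

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <unistd.h>

// Toy model of the hand-off described above. All names are invented; only the
// shape of the mechanism (guarded pointer + wake-up pipe + condition variable)
// mirrors the EventLoop members.
struct PostSketch {
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::function<void()>* m_post_fn = nullptr;
    int m_wake_write_fd = -1; // write end of the pipe the loop thread reads from

    void post(std::function<void()>& fn)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_post_fn = &fn;                          // publish callback to the loop thread
        char byte = 0;
        (void)write(m_wake_write_fd, &byte, 1);   // wake the thread blocked in loop()
        m_cv.wait(lock, [this] { return m_post_fn == nullptr; }); // loop ran and cleared it
    }
};
```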
@@ -263,11 +287,9 @@ struct Waiter
            // in the case where a capnp response is sent and a brand new
            // request is immediately received.
            while (m_fn) {
-               auto fn = std::move(m_fn);
-               m_fn = nullptr;
-               lock.unlock();
-               fn();
-               lock.lock();
+               auto fn = std::move(*m_fn);
+               m_fn.reset();
+               Unlock(lock, fn);
            }
            const bool done = pred();
            return done;
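The `Unlock()` call collapses the previous unlock/run/relock sequence into one helper. A sketch of what such a helper does (assumed for illustration; the project's actual definition may differ, for example by using an RAII guard):

```cpp
// Release the lock while running the callback, then reacquire it, preserving
// the caller's locking invariant even if the callback throws.
template <typename Lock, typename Callback>
void UnlockSketch(Lock& lock, Callback&& callback)
{
    lock.unlock();
    try {
        callback();
    } catch (...) {
        lock.lock();
        throw;
    }
    lock.lock();
}
```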
@@ -276,7 +298,7 @@ struct Waiter

    std::mutex m_mutex;
    std::condition_variable m_cv;
-   std::function<void()> m_fn;
+   std::optional<kj::Function<void()>> m_fn;
};

//! Object holding network & rpc state associated with either an incoming server
@@ -290,21 +312,13 @@ class Connection
    Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
        : m_loop(loop), m_stream(kj::mv(stream_)),
          m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
-         m_rpc_system(::capnp::makeRpcClient(m_network))
-   {
-       std::unique_lock<std::mutex> lock(m_loop.m_mutex);
-       m_loop.addClient(lock);
-   }
+         m_rpc_system(::capnp::makeRpcClient(m_network)) {}
    Connection(EventLoop& loop,
               kj::Own<kj::AsyncIoStream>&& stream_,
               const std::function<::capnp::Capability::Client(Connection&)>& make_client)
        : m_loop(loop), m_stream(kj::mv(stream_)),
          m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
-         m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this)))
-   {
-       std::unique_lock<std::mutex> lock(m_loop.m_mutex);
-       m_loop.addClient(lock);
-   }
+         m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}

    //! Run cleanup functions. Must be called from the event loop thread. First
    //! calls synchronous cleanup functions while blocked (to free capnp
@@ -319,10 +333,6 @@ class Connection
    CleanupIt addSyncCleanup(std::function<void()> fn);
    void removeSyncCleanup(CleanupIt it);

-   //! Register asynchronous cleanup function to run on worker thread when
-   //! disconnect() is called.
-   void addAsyncCleanup(std::function<void()> fn);
-
    //! Add disconnect handler.
    template <typename F>
    void onDisconnect(F&& f)
@@ -333,12 +343,12 @@ class Connection
        // to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
        // error in cases where f deletes this Connection object.
        m_on_disconnect.add(m_network.onDisconnect().then(
-           [f = std::forward<F>(f), this]() mutable { m_loop.m_task_set->add(kj::evalLater(kj::mv(f))); }));
+           [f = std::forward<F>(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
    }

-   EventLoop& m_loop;
+   EventLoopRef m_loop;
    kj::Own<kj::AsyncIoStream> m_stream;
-   LoggingErrorHandler m_error_handler{m_loop};
+   LoggingErrorHandler m_error_handler{*m_loop};
    kj::TaskSet m_on_disconnect{m_error_handler};
    ::capnp::TwoPartyVatNetwork m_network;
    std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
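Replacing `EventLoop& m_loop` with `EventLoopRef m_loop` (note the `->` and `*` dereferences above) is what lets the constructors earlier in this class drop their manual `addClient()` calls: the client reference counting moves into an RAII handle. A rough idea of such a handle (assumed for illustration; the real `EventLoopRef` is defined elsewhere in the library):

```cpp
// Illustrative RAII handle: registers a client with the loop on construction
// and unregisters it on destruction, replacing paired addClient()/removeClient()
// calls scattered across constructors and destructors. Details invented.
class EventLoopRefSketch
{
public:
    explicit EventLoopRefSketch(EventLoop& loop) : m_loop(&loop)
    {
        // lock m_loop->m_mutex and increment the loop's client count
    }
    ~EventLoopRefSketch()
    {
        // lock m_loop->m_mutex, decrement the count, and wake loop() so it can
        // exit once no clients remain
    }
    EventLoop& operator*() const { return *m_loop; }
    EventLoop* operator->() const { return m_loop; }

private:
    EventLoop* m_loop;
};
```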
@@ -351,11 +361,10 @@ class Connection
    //! ThreadMap.makeThread) used to service requests to clients.
    ::capnp::CapabilityServerSet<Thread> m_threads;

-   //! Cleanup functions to run if connection is broken unexpectedly.
-   //! Lists will be empty if all ProxyClient and ProxyServer objects are
-   //! destroyed cleanly before the connection is destroyed.
+   //! Cleanup functions to run if connection is broken unexpectedly. The list
+   //! will be empty if all ProxyClient objects are destroyed cleanly before
+   //! the connection is destroyed.
    CleanupList m_sync_cleanup_fns;
-   CleanupList m_async_cleanup_fns;
};

//! Vat id for server side of connection. Required argument to RpcSystem::bootstrap()
@@ -381,21 +390,12 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
    : m_client(std::move(client)), m_context(connection)

{
-   {
-       std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-       m_context.connection->m_loop.addClient(lock);
-   }
-
    // Handler for the connection getting destroyed before this client object.
    auto cleanup_it = m_context.connection->addSyncCleanup([this]() {
        // Release client capability by move-assigning to temporary.
        {
            typename Interface::Client(std::move(m_client));
        }
-       {
-           std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-           m_context.connection->m_loop.removeClient(lock);
-       }
        m_context.connection = nullptr;
    });

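The `typename Interface::Client(std::move(m_client));` statement above relies on a small idiom: an unnamed temporary is constructed from the moved client and dies at the end of the statement, dropping the remote capability reference while leaving `m_client` in an empty but valid moved-from state. The same pattern in miniature (generic illustration, not library code):

```cpp
#include <memory>

// 'Handle' stands in for any reference-holding type such as a capnp client.
using Handle = std::shared_ptr<int>;

void releaseEarly(Handle& member)
{
    // The temporary steals member's reference and releases it at the end of
    // this full expression; member is left empty but still usable.
    Handle(std::move(member));
}
```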
@@ -423,16 +423,11 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
        Sub::destroy(*this);

        // FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
-       m_context.connection->m_loop.sync([&]() {
+       m_context.loop->sync([&]() {
            // Release client capability by move-assigning to temporary.
            {
                typename Interface::Client(std::move(m_client));
            }
-           {
-               std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-               m_context.connection->m_loop.removeClient(lock);
-           }
-
            if (destroy_connection) {
                delete m_context.connection;
                m_context.connection = nullptr;
@@ -454,12 +449,20 @@ ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Co
    : m_impl(std::move(impl)), m_context(&connection)
{
    assert(m_impl);
-   std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-   m_context.connection->m_loop.addClient(lock);
}

//! ProxyServer destructor, called from the EventLoop thread by Cap'n Proto
//! garbage collection code after there are no more references to this object.
+ //! This will typically happen when the corresponding ProxyClient object on the
+ //! other side of the connection is destroyed. It can also happen earlier if the
+ //! connection is broken or destroyed. In the latter case this destructor will
+ //! typically be called inside the m_rpc_system.reset() call in the ~Connection
+ //! destructor while the Connection object still exists. However, because
+ //! ProxyServer objects are refcounted, and the Connection object could be
+ //! destroyed while asynchronous IPC calls are still in-flight, it's possible
+ //! for this destructor to be called after the Connection object no longer
+ //! exists, so it is NOT valid to dereference the m_context.connection pointer
+ //! from this function.
template <typename Interface, typename Impl>
ProxyServerBase<Interface, Impl>::~ProxyServerBase()
{
@@ -483,14 +486,12 @@ ProxyServerBase<Interface, Impl>::~ProxyServerBase()
        // connection is broken). Probably some refactoring of the destructor
        // and invokeDestroy function is possible to make this cleaner and more
        // consistent.
-       m_context.connection->addAsyncCleanup([impl = std::move(m_impl), fns = std::move(m_context.cleanup_fns)]() mutable {
+       m_context.loop->addAsyncCleanup([impl = std::move(m_impl), fns = std::move(m_context.cleanup_fns)]() mutable {
            impl.reset();
            CleanupRun(fns);
        });
    }
    assert(m_context.cleanup_fns.empty());
-   std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-   m_context.connection->m_loop.removeClient(lock);
}

//! If the capnp interface defined a special "destroy" method, as described the
0 commit comments