Skip to content

Commit d6ccde6

Browse files
Implement AsioFuture class to genericly async await work
1 parent f42510f commit d6ccde6

File tree

2 files changed

+325
-0
lines changed

2 files changed

+325
-0
lines changed

lib/base/io-future.hpp

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */
2+
3+
#pragma once
4+
5+
#include "base/application.hpp"
6+
#include "base/shared-object.hpp"
7+
#include "base/threadpool.hpp"
8+
#include <boost/asio/dispatch.hpp>
9+
#include <boost/asio/executor_work_guard.hpp>
10+
#include <boost/asio/spawn.hpp>
11+
#include <future>
12+
13+
namespace boost::asio::detail {
14+
15+
struct fixed_throw_tag
16+
{};
17+
18+
/**
19+
* Fixes the issue where operations crash the program that can throw exceptions but don't.
20+
*
21+
* The issues is that in the orginal version of this specialization, the exception_ptr is
22+
* never checked against nullptr, but only the pointer to the exception_ptr, which is likely
23+
* a mistake.
24+
*/
25+
template<typename Executor, typename R, typename T>
26+
class spawn_handler<Executor, R(std::exception_ptr, T, fixed_throw_tag)> : public spawn_handler_base<Executor>
27+
{
28+
public:
29+
using return_type = T;
30+
31+
struct result_type
32+
{
33+
std::exception_ptr ex_;
34+
return_type* value_;
35+
};
36+
37+
spawn_handler(const basic_yield_context<Executor>& yield, result_type& result)
38+
: spawn_handler_base<Executor>(yield), result_(result)
39+
{
40+
}
41+
42+
void operator()(std::exception_ptr ex, T value)
43+
{
44+
result_.ex_ = ex;
45+
result_.value_ = &value;
46+
this->resume();
47+
}
48+
49+
static return_type on_resume(result_type& result)
50+
{
51+
if (result.ex_) {
52+
rethrow_exception(result.ex_);
53+
}
54+
return BOOST_ASIO_MOVE_CAST(return_type)(*result.value_);
55+
}
56+
57+
private:
58+
result_type& result_;
59+
};
60+
61+
} // namespace boost::asio::detail
62+
63+
namespace icinga {
64+
65+
template<typename>
66+
class AsioPromise;
67+
68+
/**
69+
* Implements a generic, asynchronously awaitable future.
70+
*
71+
* This allows to queue an CPU-intensive action on another thread without blocking any
72+
* IO-threads and pass back the result via the @c AsioPromise.
73+
*
74+
* Similar to @c std::future, this is single-use only. Once a value has been set by the
75+
* @c AsioPromise, the job is done.
76+
*/
77+
template<typename ValueType>
78+
class AsioFuture : public SharedObject
79+
{
80+
template<typename>
81+
friend class AsioPromise;
82+
83+
public:
84+
DECLARE_PTR_TYPEDEFS(AsioFuture);
85+
86+
/**
87+
* Returns the value held in the future, or waits for the promise to complete.
88+
*
89+
* If an exception has been stored in the future via AsioPromise::SetException(), it will be
90+
* thrown by this function. Simply passing `yc[ec]` as a token will not change this, even if
91+
* the exception that would be thrown is a @c boost::asio::system::system_error.
92+
*/
93+
template<typename CompletionToken>
94+
auto Get(CompletionToken&& token)
95+
{
96+
using Signature = void(std::exception_ptr, ValueType, boost::asio::detail::fixed_throw_tag);
97+
98+
return boost::asio::async_initiate<CompletionToken, Signature>(
99+
[this](auto&& handler) { InitOperation(std::forward<decltype(handler)>(handler)); },
100+
std::forward<CompletionToken>(token)
101+
);
102+
}
103+
104+
// TODO: Add WaitFor and WaitUntil
105+
106+
private:
107+
template<typename Handler>
108+
void CallHandler(Handler&& handler)
109+
{
110+
if (std::holds_alternative<ValueType>(m_Value)) {
111+
std::forward<Handler>(handler)(nullptr, std::get<ValueType>(m_Value));
112+
} else {
113+
std::forward<Handler>(handler)(std::get<std::exception_ptr>(m_Value), {});
114+
}
115+
}
116+
117+
template<typename Handler>
118+
void InitOperation(Handler&& handler)
119+
{
120+
auto handlerPtr = std::make_shared<std::decay_t<decltype(handler)>>(std::forward<decltype(handler)>(handler));
121+
122+
auto handlerWrapper = [handler = handlerPtr, future = AsioFuture::Ptr{this}]() {
123+
if (std::holds_alternative<ValueType>(future->m_Value)) {
124+
(*handler)({}, std::get<ValueType>(future->m_Value));
125+
} else {
126+
(*handler)(std::get<std::exception_ptr>(future->m_Value), {});
127+
}
128+
};
129+
130+
std::unique_lock lock(m_Mutex);
131+
132+
if (!std::holds_alternative<std::monostate>(m_Value)) {
133+
boost::asio::post(boost::asio::get_associated_executor(handler), handlerWrapper);
134+
return;
135+
}
136+
137+
auto work = boost::asio::make_work_guard(handler);
138+
m_Callback = [handler = std::move(handlerWrapper), work = std::move(work)]() mutable {
139+
boost::asio::dispatch(work.get_executor(), handler);
140+
work.reset();
141+
};
142+
}
143+
144+
std::mutex m_Mutex;
145+
std::variant<std::monostate, std::exception_ptr, ValueType> m_Value;
146+
std::function<void()> m_Callback;
147+
};
148+
149+
/**
150+
* A promise type that can be passed to any other thread or coroutine.
151+
*/
152+
template<typename ValueType>
153+
class AsioPromise
154+
{
155+
public:
156+
AsioPromise() : m_Future(new AsioFuture<ValueType>) {}
157+
158+
template<typename ForwardingType>
159+
void SetValue(ForwardingType&& value) const
160+
{
161+
std::unique_lock lock{m_Future->m_Mutex};
162+
163+
if (!std::holds_alternative<std::monostate>(m_Future->m_Value)) {
164+
BOOST_THROW_EXCEPTION(std::future_error{std::future_errc::promise_already_satisfied});
165+
}
166+
167+
m_Future->m_Value = std::forward<ForwardingType>(value);
168+
if (m_Future->m_Callback) {
169+
m_Future->m_Callback();
170+
}
171+
}
172+
173+
template<typename ExceptionType>
174+
void SetException(ExceptionType&& ex) const
175+
{
176+
std::unique_lock lock{m_Future->m_Mutex};
177+
178+
if (!std::holds_alternative<std::monostate>(m_Future->m_Value)) {
179+
BOOST_THROW_EXCEPTION(std::future_error{std::future_errc::promise_already_satisfied});
180+
}
181+
182+
m_Future->m_Value = std::make_exception_ptr(std::forward<ExceptionType>(ex));
183+
if (m_Future->m_Callback) {
184+
m_Future->m_Callback();
185+
}
186+
}
187+
188+
auto GetFuture() const { return m_Future; }
189+
190+
private:
191+
typename AsioFuture<ValueType>::Ptr m_Future;
192+
};
193+
194+
template<typename Callback>
195+
auto QueueAsioFutureCallback(Callback&& cb)
196+
{
197+
AsioPromise<decltype(cb())> promise;
198+
auto future = promise.GetFuture();
199+
Application::GetTP().Post(
200+
[cb = std::forward<Callback>(cb), promise = std::move(promise)]() {
201+
try {
202+
promise.SetValue(cb());
203+
} catch (const std::exception&) {
204+
promise.SetException(std::current_exception());
205+
}
206+
},
207+
{}
208+
);
209+
return future;
210+
};
211+
212+
} // namespace icinga

test/base-io-engine.cpp

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <boost/date_time/posix_time/posix_time.hpp>
77
#include <BoostTestTargetConfig.h>
88
#include <thread>
9+
#include "base/io-future.hpp"
910

1011
using namespace icinga;
1112

@@ -156,4 +157,116 @@ BOOST_AUTO_TEST_CASE(timeout_due_scope)
156157
BOOST_CHECK_EQUAL(called, 0);
157158
}
158159

160+
BOOST_AUTO_TEST_CASE(future_early_value)
161+
{
162+
boost::asio::io_context io;
163+
164+
AsioPromise<bool> promise;
165+
promise.SetValue(true);
166+
167+
IoEngine::SpawnCoroutine(io, [&, future = promise.GetFuture()](boost::asio::yield_context yc) {
168+
auto val = future->Get(yc);
169+
BOOST_REQUIRE(val);
170+
});
171+
172+
io.run();
173+
}
174+
175+
BOOST_AUTO_TEST_CASE(future_value)
176+
{
177+
boost::asio::io_context io;
178+
179+
AsioPromise<bool> promise;
180+
std::atomic_bool before = false;
181+
std::atomic_bool after = false;
182+
183+
IoEngine::SpawnCoroutine(io, [&, future = promise.GetFuture()](boost::asio::yield_context yc) {
184+
before = true;
185+
auto val = future->Get(yc);
186+
BOOST_REQUIRE(val);
187+
after = true;
188+
});
189+
190+
std::thread testThread
191+
{
192+
[&io]() {
193+
io.run();
194+
}};
195+
196+
while (!before) {
197+
Utility::Sleep(0.01);
198+
}
199+
200+
BOOST_REQUIRE(before);
201+
BOOST_REQUIRE(!after);
202+
203+
promise.SetValue(true);
204+
205+
while (!after) {
206+
Utility::Sleep(0.01);
207+
}
208+
209+
BOOST_REQUIRE(before);
210+
BOOST_REQUIRE(after);
211+
212+
testThread.join();
213+
}
214+
215+
BOOST_AUTO_TEST_CASE(future_early_exception)
216+
{
217+
boost::asio::io_context io;
218+
219+
AsioPromise<bool> promise;
220+
promise.SetException(std::runtime_error{"test"});
221+
222+
IoEngine::SpawnCoroutine(io, [&, future = promise.GetFuture()](boost::asio::yield_context yc) {
223+
BOOST_REQUIRE_EXCEPTION(future->Get(yc), std::runtime_error, [](const std::exception& ex) -> bool {
224+
return std::string_view{ex.what()} == "test";
225+
});
226+
});
227+
228+
io.run();
229+
}
230+
231+
BOOST_AUTO_TEST_CASE(future_exception)
232+
{
233+
boost::asio::io_context io;
234+
235+
AsioPromise<bool> promise;
236+
std::atomic_bool before = false;
237+
std::atomic_bool after = false;
238+
239+
IoEngine::SpawnCoroutine(io, [&, future = promise.GetFuture()](boost::asio::yield_context yc) {
240+
before = true;
241+
BOOST_REQUIRE_EXCEPTION(future->Get(yc), std::runtime_error, [](const std::exception& ex) -> bool {
242+
return std::string_view{ex.what()} == "test";
243+
});
244+
after = true;
245+
});
246+
247+
std::thread testThread
248+
{
249+
[&io]() {
250+
io.run();
251+
}};
252+
253+
while (!before) {
254+
Utility::Sleep(0.01);
255+
}
256+
257+
BOOST_REQUIRE(before);
258+
BOOST_REQUIRE(!after);
259+
260+
promise.SetException(std::runtime_error{"test"});
261+
262+
while (!after) {
263+
Utility::Sleep(0.01);
264+
}
265+
266+
BOOST_REQUIRE(before);
267+
BOOST_REQUIRE(after);
268+
269+
testThread.join();
270+
}
271+
159272
BOOST_AUTO_TEST_SUITE_END()

0 commit comments

Comments
 (0)