Add threadId to coroutines

This commit is contained in:
sha512sum 2024-03-19 15:20:41 +00:00
parent 84c562f45d
commit 773b393d96

View file

@ -3,7 +3,433 @@
namespace cserver { namespace cserver {
template <typename T> namespace this_coro {
struct ThreadIdGetter : std::suspend_never {
std::size_t threadId;
inline constexpr auto await_resume() -> std::size_t {
return this->threadId;
};
};
inline constexpr auto kGetThreadId = ThreadIdGetter{};
struct SetThreadId : public std::suspend_never {
std::size_t threadId;
};
} // namespace this_coro
template <typename T = void>
using Task = boost::asio::awaitable<T>; using Task = boost::asio::awaitable<T>;
template <typename...>
struct TaskAwaitable {};
} // namespace cserver } // namespace cserver
namespace boost::asio::detail {
/*
Part of this code is taken from boost asio
Boost Software License - Version 1.0 - August 17th, 2003
Permission is hereby granted, free of charge, to any person or organization
obtaining a copy of the software and accompanying documentation covered by
this license (the "Software") to use, reproduce, display, distribute,
execute, and transmit the Software, and to prepare derivative works of the
Software, and to permit third-parties to whom the Software is furnished to
do so, all subject to the following:
The copyright notices in the Software and this entire statement, including
the above license grant, this restriction and the following disclaimer,
must be included in all copies of the Software, in whole or in part, and
all derivative works of the Software, unless such copies or derivative
works are solely in the form of machine-executable object code generated by
a source language processor.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
*/
template <>
struct awaitable_frame_base<any_io_executor> {
public:
using Executor = any_io_executor;
#if !defined(BOOST_ASIO_DISABLE_AWAITABLE_FRAME_RECYCLING)
auto constexpr operator new(std::size_t size) -> void* {
return boost::asio::detail::thread_info_base::allocate(
boost::asio::detail::thread_info_base::awaitable_frame_tag(),
boost::asio::detail::thread_context::top_of_thread_call_stack(),
size);
};
inline constexpr auto operator delete(void* pointer, std::size_t size) -> void {
boost::asio::detail::thread_info_base::deallocate(
boost::asio::detail::thread_info_base::awaitable_frame_tag(),
boost::asio::detail::thread_context::top_of_thread_call_stack(),
pointer, size);
};
#endif // !defined(BOOST_ASIO_DISABLE_AWAITABLE_FRAME_RECYCLING)
// The frame starts in a suspended state until the awaitable_thread object
// pumps the stack.
inline constexpr auto initial_suspend() noexcept {
return suspend_always();
};
// On final suspension the frame is popped from the top of the stack.
inline constexpr auto final_suspend() noexcept {
struct Result {
awaitable_frame_base* this_;
inline constexpr auto await_ready() const noexcept -> bool {
return false;
};
inline constexpr auto await_suspend(coroutine_handle<void>) noexcept -> void {
this->this_->pop_frame();
};
inline constexpr auto await_resume() const noexcept -> void {};
};
return Result{this};
};
inline constexpr auto set_except(std::exception_ptr e) noexcept -> void {
pending_exception_ = e;
};
inline constexpr auto set_error(const boost::system::error_code& ec) -> void {
this->set_except(std::make_exception_ptr(boost::system::system_error(ec)));
};
inline constexpr auto unhandled_exception() -> void {
set_except(std::current_exception());
};
inline constexpr auto rethrow_exception() -> void {
if(pending_exception_) {
std::exception_ptr ex = std::exchange(pending_exception_, nullptr);
std::rethrow_exception(ex);
};
};
inline constexpr auto clear_cancellation_slot() -> void {
this->attached_thread_->entry_point()->cancellation_state_.slot().clear();
};
template <typename T>
inline constexpr auto await_transform(awaitable<T, Executor> a) const -> awaitable<T, Executor> {
if (attached_thread_->entry_point()->throw_if_cancelled_) {
if (!!attached_thread_->get_cancellation_state().cancelled()) {
throw_error(boost::asio::error::operation_aborted, "co_await");
};
};
return a;
};
template <typename Op>
inline constexpr auto await_transform(Op&& op,
constraint_t<is_async_operation<Op>::value> = 0
#if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
# if defined(BOOST_ASIO_HAS_SOURCE_LOCATION)
, detail::source_location location = detail::source_location::current()
# endif // defined(BOOST_ASIO_HAS_SOURCE_LOCATION)
#endif // defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
) {
if (attached_thread_->entry_point()->throw_if_cancelled_) {
if (!!attached_thread_->get_cancellation_state().cancelled()) {
throw_error(boost::asio::error::operation_aborted, "co_await");
};
};
return awaitable_async_op<
completion_signature_of_t<Op>, decay_t<Op>, Executor>{
std::forward<Op>(op), this
#if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
# if defined(BOOST_ASIO_HAS_SOURCE_LOCATION)
, location
# endif // defined(BOOST_ASIO_HAS_SOURCE_LOCATION)
#endif // defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
};
};
// This await transformation obtains the associated executor of the thread of
// execution.
inline constexpr auto await_transform(this_coro::executor_t) noexcept {
struct Result {
awaitable_frame_base* this_;
inline constexpr auto await_ready() const noexcept -> bool {
return true;
};
inline constexpr auto await_suspend(coroutine_handle<void>) noexcept -> void {};
inline constexpr auto await_resume() const noexcept {
return this_->attached_thread_->get_executor();
};
};
return Result{this};
};
// This await transformation obtains the associated cancellation state of the
// thread of execution.
inline constexpr auto await_transform(this_coro::cancellation_state_t) noexcept {
struct Result {
awaitable_frame_base* this_;
inline constexpr auto await_ready() const noexcept -> bool {
return true;
};
inline constexpr auto await_suspend(coroutine_handle<void>) noexcept -> void {};
auto await_resume() const noexcept {
return this_->attached_thread_->get_cancellation_state();
};
};
return Result{this};
};
// This await transformation resets the associated cancellation state.
inline constexpr auto await_transform(this_coro::reset_cancellation_state_0_t) noexcept {
struct Result {
awaitable_frame_base* this_;
inline constexpr auto await_ready() const noexcept -> bool {
return true;
};
inline constexpr auto await_suspend(coroutine_handle<void>) noexcept -> void {};
inline constexpr auto await_resume() const {
return this_->attached_thread_->reset_cancellation_state();
};
};
return Result{this};
};
// This await transformation resets the associated cancellation state.
template <typename Filter>
inline constexpr auto await_transform(
this_coro::reset_cancellation_state_1_t<Filter> reset) noexcept {
struct Result {
awaitable_frame_base* this_;
Filter filter_;
inline constexpr auto await_ready() const noexcept -> bool {
return true;
};
inline constexpr auto await_suspend(coroutine_handle<void>) noexcept -> void {};
inline constexpr auto await_resume() {
return this_->attached_thread_->reset_cancellation_state(
static_cast<Filter&&>(filter_));
};
};
return Result{this, static_cast<Filter&&>(reset.filter)};
}
template <typename T>
inline constexpr auto await_transform(cserver::TaskAwaitable<T> awaitable) -> cserver::TaskAwaitable<T> {
return awaitable;
};
// This await transformation resets the associated cancellation state.
template <typename InFilter, typename OutFilter>
inline constexpr auto await_transform(
this_coro::reset_cancellation_state_2_t<InFilter, OutFilter> reset) noexcept {
struct Result {
awaitable_frame_base* this_;
InFilter in_filter_;
OutFilter out_filter_;
inline constexpr auto await_ready() const noexcept -> bool {
return true;
};
inline constexpr auto await_suspend(coroutine_handle<void>) noexcept -> void {};
inline constexpr auto await_resume() {
return this_->attached_thread_->reset_cancellation_state(
static_cast<InFilter&&>(in_filter_),
static_cast<OutFilter&&>(out_filter_));
};
};
return Result{this,
static_cast<InFilter&&>(reset.in_filter),
static_cast<OutFilter&&>(reset.out_filter)};
};
// This await transformation determines whether cancellation is propagated as
// an exception.
inline constexpr auto await_transform(this_coro::throw_if_cancelled_0_t) noexcept {
struct Result {
awaitable_frame_base* this_;
inline constexpr auto await_ready() const noexcept -> bool {
return true;
};
inline constexpr auto await_suspend(coroutine_handle<void>) noexcept -> void {};
inline constexpr auto await_resume() {
return this_->attached_thread_->throw_if_cancelled();
};
};
return Result{this};
}
// This await transformation sets whether cancellation is propagated as an
// exception.
inline constexpr auto await_transform(this_coro::throw_if_cancelled_1_t throw_if_cancelled) noexcept {
struct Result {
awaitable_frame_base* this_;
bool value_;
inline constexpr auto await_ready() const noexcept -> bool {
return true;
};
inline constexpr auto await_suspend(coroutine_handle<void>) noexcept -> void {};
inline constexpr auto await_resume() {
this_->attached_thread_->throw_if_cancelled(value_);
};
};
return Result{this, throw_if_cancelled.value};
}
// This await transformation is used to run an async operation's initiation
// function object after the coroutine has been suspended. This ensures that
// immediate resumption of the coroutine in another thread does not cause a
// race condition.
template <typename Function>
inline constexpr auto await_transform(Function f,
enable_if_t<
is_convertible<
result_of_t<Function(awaitable_frame_base*)>,
awaitable_thread<Executor>*
>::value
>* = nullptr) {
struct Result {
Function function_;
awaitable_frame_base* this_;
inline constexpr auto await_ready() const noexcept -> bool {
return false;
};
inline constexpr auto await_suspend(coroutine_handle<void>) noexcept -> void {
this_->after_suspend(
[](void* arg)
{
Result* r = static_cast<Result*>(arg);
r->function_(r->this_);
}, this);
};
inline constexpr auto await_resume() const noexcept -> void {};
};
return Result{std::move(f), this};
};
// Access the awaitable thread's has_context_switched_ flag.
inline constexpr auto await_transform(detail::awaitable_thread_has_context_switched) noexcept {
struct Result {
awaitable_frame_base* this_;
inline constexpr auto await_ready() const noexcept -> bool {
return true;
};
inline constexpr auto await_suspend(coroutine_handle<void>) noexcept -> void {};
inline constexpr auto await_resume() const noexcept -> bool& {
return this_->attached_thread_->entry_point()->has_context_switched_;
};
};
return Result{this};
};
inline constexpr auto attach_thread(awaitable_thread<Executor>* handler) noexcept -> void {
attached_thread_ = handler;
};
inline constexpr auto detach_thread() noexcept ->awaitable_thread<Executor>* {
attached_thread_->entry_point()->has_context_switched_ = true;
return std::exchange(attached_thread_, nullptr);
};
inline constexpr auto push_frame(awaitable_frame_base<Executor>* caller) noexcept -> void {
this->threadId = caller->threadId;
this->caller_ = caller;
this->attached_thread_ = caller_->attached_thread_;
this->attached_thread_->entry_point()->top_of_stack_ = this;
this->caller_->attached_thread_ = nullptr;
};
inline constexpr auto pop_frame() noexcept -> void {
if (caller_) {
caller_->attached_thread_ = attached_thread_;
};
attached_thread_->entry_point()->top_of_stack_ = caller_;
attached_thread_ = nullptr;
caller_ = nullptr;
};
struct resume_context {
void (*after_suspend_fn_)(void*) = nullptr;
void *after_suspend_arg_ = nullptr;
};
inline constexpr auto resume() -> void {
resume_context context;
resume_context_ = &context;
coro_.resume();
if (context.after_suspend_fn_) {
context.after_suspend_fn_(context.after_suspend_arg_);
};
};
inline constexpr auto after_suspend(void (*fn)(void*), void* arg) -> void {
resume_context_->after_suspend_fn_ = fn;
resume_context_->after_suspend_arg_ = arg;
};
inline constexpr auto destroy() -> void {
coro_.destroy();
};
inline constexpr auto await_transform(cserver::this_coro::ThreadIdGetter awaitable) const noexcept -> cserver::this_coro::ThreadIdGetter {
awaitable.threadId = this->threadId;
return awaitable;
};
inline constexpr auto await_transform(cserver::this_coro::SetThreadId awaitable) noexcept -> cserver::this_coro::SetThreadId {
this->threadId = awaitable.threadId;
return awaitable;
};
std::size_t threadId{};
protected:
coroutine_handle<void> coro_ = nullptr;
awaitable_thread<Executor>* attached_thread_ = nullptr;
awaitable_frame_base<Executor>* caller_ = nullptr;
std::exception_ptr pending_exception_ = nullptr;
resume_context* resume_context_ = nullptr;
};
} // namespace boost::asio::detail