From 773b393d96beec0faeb3d0d9c7d1855db17bf459 Mon Sep 17 00:00:00 2001 From: sha512sum Date: Tue, 19 Mar 2024 15:20:41 +0000 Subject: [PATCH] Add threadId to coroutines --- include/cserver/engine/coroutine.hpp | 428 ++++++++++++++++++++++++++- 1 file changed, 427 insertions(+), 1 deletion(-) diff --git a/include/cserver/engine/coroutine.hpp b/include/cserver/engine/coroutine.hpp index 6329c22..e3949c4 100644 --- a/include/cserver/engine/coroutine.hpp +++ b/include/cserver/engine/coroutine.hpp @@ -3,7 +3,433 @@ namespace cserver { -template +namespace this_coro { + +struct ThreadIdGetter : std::suspend_never { + std::size_t threadId; + inline constexpr auto await_resume() -> std::size_t { + return this->threadId; + }; +}; + +inline constexpr auto kGetThreadId = ThreadIdGetter{}; + +struct SetThreadId : public std::suspend_never { + std::size_t threadId; +}; + +} // namespace this_coro + +template using Task = boost::asio::awaitable; +template +struct TaskAwaitable {}; + } // namespace cserver + +namespace boost::asio::detail { +/* +Part of this code is taken from boost asio + +Boost Software License - Version 1.0 - August 17th, 2003 + +Permission is hereby granted, free of charge, to any person or organization +obtaining a copy of the software and accompanying documentation covered by +this license (the "Software") to use, reproduce, display, distribute, +execute, and transmit the Software, and to prepare derivative works of the +Software, and to permit third-parties to whom the Software is furnished to +do so, all subject to the following: + +The copyright notices in the Software and this entire statement, including +the above license grant, this restriction and the following disclaimer, +must be included in all copies of the Software, in whole or in part, and +all derivative works of the Software, unless such copies or derivative +works are solely in the form of machine-executable object code generated by +a source language processor. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT +SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE +FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE, +ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. +*/ +template <> +struct awaitable_frame_base { +public: + using Executor = any_io_executor; +#if !defined(BOOST_ASIO_DISABLE_AWAITABLE_FRAME_RECYCLING) + auto constexpr operator new(std::size_t size) -> void* { + return boost::asio::detail::thread_info_base::allocate( + boost::asio::detail::thread_info_base::awaitable_frame_tag(), + boost::asio::detail::thread_context::top_of_thread_call_stack(), + size); + }; + + inline constexpr auto operator delete(void* pointer, std::size_t size) -> void { + boost::asio::detail::thread_info_base::deallocate( + boost::asio::detail::thread_info_base::awaitable_frame_tag(), + boost::asio::detail::thread_context::top_of_thread_call_stack(), + pointer, size); + }; +#endif // !defined(BOOST_ASIO_DISABLE_AWAITABLE_FRAME_RECYCLING) + + // The frame starts in a suspended state until the awaitable_thread object + // pumps the stack. + inline constexpr auto initial_suspend() noexcept { + return suspend_always(); + }; + + // On final suspension the frame is popped from the top of the stack. + inline constexpr auto final_suspend() noexcept { + struct Result { + awaitable_frame_base* this_; + inline constexpr auto await_ready() const noexcept -> bool { + return false; + }; + + inline constexpr auto await_suspend(coroutine_handle) noexcept -> void { + this->this_->pop_frame(); + }; + + inline constexpr auto await_resume() const noexcept -> void {}; + }; + return Result{this}; + }; + + inline constexpr auto set_except(std::exception_ptr e) noexcept -> void { + pending_exception_ = e; + }; + + inline constexpr auto set_error(const boost::system::error_code& ec) -> void { + this->set_except(std::make_exception_ptr(boost::system::system_error(ec))); + }; + + inline constexpr auto unhandled_exception() -> void { + set_except(std::current_exception()); + }; + + inline constexpr auto rethrow_exception() -> void { + if(pending_exception_) { + std::exception_ptr ex = std::exchange(pending_exception_, nullptr); + std::rethrow_exception(ex); + }; + }; + + inline constexpr auto clear_cancellation_slot() -> void { + this->attached_thread_->entry_point()->cancellation_state_.slot().clear(); + }; + + template + inline constexpr auto await_transform(awaitable a) const -> awaitable { + if (attached_thread_->entry_point()->throw_if_cancelled_) { + if (!!attached_thread_->get_cancellation_state().cancelled()) { + throw_error(boost::asio::error::operation_aborted, "co_await"); + }; + }; + return a; + }; + + template + inline constexpr auto await_transform(Op&& op, + constraint_t::value> = 0 +#if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING) +# if defined(BOOST_ASIO_HAS_SOURCE_LOCATION) + , detail::source_location location = detail::source_location::current() +# endif // defined(BOOST_ASIO_HAS_SOURCE_LOCATION) +#endif // defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING) + ) { + if (attached_thread_->entry_point()->throw_if_cancelled_) { + if (!!attached_thread_->get_cancellation_state().cancelled()) { + throw_error(boost::asio::error::operation_aborted, "co_await"); + }; + }; + + return awaitable_async_op< + completion_signature_of_t, decay_t, Executor>{ + std::forward(op), this +#if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING) +# if defined(BOOST_ASIO_HAS_SOURCE_LOCATION) + , location +# endif // defined(BOOST_ASIO_HAS_SOURCE_LOCATION) +#endif // defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING) + }; + }; + + // This await transformation obtains the associated executor of the thread of + // execution. + inline constexpr auto await_transform(this_coro::executor_t) noexcept { + struct Result { + awaitable_frame_base* this_; + + inline constexpr auto await_ready() const noexcept -> bool { + return true; + }; + + inline constexpr auto await_suspend(coroutine_handle) noexcept -> void {}; + + inline constexpr auto await_resume() const noexcept { + return this_->attached_thread_->get_executor(); + }; + }; + + return Result{this}; + }; + + // This await transformation obtains the associated cancellation state of the + // thread of execution. + inline constexpr auto await_transform(this_coro::cancellation_state_t) noexcept { + struct Result { + awaitable_frame_base* this_; + + inline constexpr auto await_ready() const noexcept -> bool { + return true; + }; + + inline constexpr auto await_suspend(coroutine_handle) noexcept -> void {}; + + auto await_resume() const noexcept { + return this_->attached_thread_->get_cancellation_state(); + }; + }; + + return Result{this}; + }; + + // This await transformation resets the associated cancellation state. + inline constexpr auto await_transform(this_coro::reset_cancellation_state_0_t) noexcept { + struct Result { + awaitable_frame_base* this_; + + inline constexpr auto await_ready() const noexcept -> bool { + return true; + }; + + inline constexpr auto await_suspend(coroutine_handle) noexcept -> void {}; + + inline constexpr auto await_resume() const { + return this_->attached_thread_->reset_cancellation_state(); + }; + }; + + return Result{this}; + }; + + // This await transformation resets the associated cancellation state. + template + inline constexpr auto await_transform( + this_coro::reset_cancellation_state_1_t reset) noexcept { + struct Result { + awaitable_frame_base* this_; + Filter filter_; + + inline constexpr auto await_ready() const noexcept -> bool { + return true; + }; + + inline constexpr auto await_suspend(coroutine_handle) noexcept -> void {}; + + inline constexpr auto await_resume() { + return this_->attached_thread_->reset_cancellation_state( + static_cast(filter_)); + }; + }; + + return Result{this, static_cast(reset.filter)}; + } + template + inline constexpr auto await_transform(cserver::TaskAwaitable awaitable) -> cserver::TaskAwaitable { + return awaitable; + }; + + // This await transformation resets the associated cancellation state. + template + inline constexpr auto await_transform( + this_coro::reset_cancellation_state_2_t reset) noexcept { + struct Result { + awaitable_frame_base* this_; + InFilter in_filter_; + OutFilter out_filter_; + + inline constexpr auto await_ready() const noexcept -> bool { + return true; + }; + + inline constexpr auto await_suspend(coroutine_handle) noexcept -> void {}; + + inline constexpr auto await_resume() { + return this_->attached_thread_->reset_cancellation_state( + static_cast(in_filter_), + static_cast(out_filter_)); + }; + }; + + return Result{this, + static_cast(reset.in_filter), + static_cast(reset.out_filter)}; + }; + + // This await transformation determines whether cancellation is propagated as + // an exception. + inline constexpr auto await_transform(this_coro::throw_if_cancelled_0_t) noexcept { + struct Result { + awaitable_frame_base* this_; + + inline constexpr auto await_ready() const noexcept -> bool { + return true; + }; + + inline constexpr auto await_suspend(coroutine_handle) noexcept -> void {}; + + inline constexpr auto await_resume() { + return this_->attached_thread_->throw_if_cancelled(); + }; + }; + + return Result{this}; + } + + // This await transformation sets whether cancellation is propagated as an + // exception. + inline constexpr auto await_transform(this_coro::throw_if_cancelled_1_t throw_if_cancelled) noexcept { + struct Result { + awaitable_frame_base* this_; + bool value_; + + inline constexpr auto await_ready() const noexcept -> bool { + return true; + }; + + inline constexpr auto await_suspend(coroutine_handle) noexcept -> void {}; + + inline constexpr auto await_resume() { + this_->attached_thread_->throw_if_cancelled(value_); + }; + }; + + return Result{this, throw_if_cancelled.value}; + } + + // This await transformation is used to run an async operation's initiation + // function object after the coroutine has been suspended. This ensures that + // immediate resumption of the coroutine in another thread does not cause a + // race condition. + template + inline constexpr auto await_transform(Function f, + enable_if_t< + is_convertible< + result_of_t, + awaitable_thread* + >::value + >* = nullptr) { + struct Result { + Function function_; + awaitable_frame_base* this_; + + inline constexpr auto await_ready() const noexcept -> bool { + return false; + }; + + inline constexpr auto await_suspend(coroutine_handle) noexcept -> void { + this_->after_suspend( + [](void* arg) + { + Result* r = static_cast(arg); + r->function_(r->this_); + }, this); + }; + + inline constexpr auto await_resume() const noexcept -> void {}; + }; + + return Result{std::move(f), this}; + }; + + // Access the awaitable thread's has_context_switched_ flag. + inline constexpr auto await_transform(detail::awaitable_thread_has_context_switched) noexcept { + struct Result { + awaitable_frame_base* this_; + + inline constexpr auto await_ready() const noexcept -> bool { + return true; + }; + + inline constexpr auto await_suspend(coroutine_handle) noexcept -> void {}; + + inline constexpr auto await_resume() const noexcept -> bool& { + return this_->attached_thread_->entry_point()->has_context_switched_; + }; + }; + + return Result{this}; + }; + + inline constexpr auto attach_thread(awaitable_thread* handler) noexcept -> void { + attached_thread_ = handler; + }; + + inline constexpr auto detach_thread() noexcept ->awaitable_thread* { + attached_thread_->entry_point()->has_context_switched_ = true; + return std::exchange(attached_thread_, nullptr); + }; + + inline constexpr auto push_frame(awaitable_frame_base* caller) noexcept -> void { + this->threadId = caller->threadId; + this->caller_ = caller; + this->attached_thread_ = caller_->attached_thread_; + this->attached_thread_->entry_point()->top_of_stack_ = this; + this->caller_->attached_thread_ = nullptr; + }; + + inline constexpr auto pop_frame() noexcept -> void { + if (caller_) { + caller_->attached_thread_ = attached_thread_; + }; + attached_thread_->entry_point()->top_of_stack_ = caller_; + attached_thread_ = nullptr; + caller_ = nullptr; + }; + + struct resume_context { + void (*after_suspend_fn_)(void*) = nullptr; + void *after_suspend_arg_ = nullptr; + }; + + inline constexpr auto resume() -> void { + resume_context context; + resume_context_ = &context; + coro_.resume(); + if (context.after_suspend_fn_) { + context.after_suspend_fn_(context.after_suspend_arg_); + }; + }; + + inline constexpr auto after_suspend(void (*fn)(void*), void* arg) -> void { + resume_context_->after_suspend_fn_ = fn; + resume_context_->after_suspend_arg_ = arg; + }; + + inline constexpr auto destroy() -> void { + coro_.destroy(); + }; + + inline constexpr auto await_transform(cserver::this_coro::ThreadIdGetter awaitable) const noexcept -> cserver::this_coro::ThreadIdGetter { + awaitable.threadId = this->threadId; + return awaitable; + }; + inline constexpr auto await_transform(cserver::this_coro::SetThreadId awaitable) noexcept -> cserver::this_coro::SetThreadId { + this->threadId = awaitable.threadId; + return awaitable; + }; + std::size_t threadId{}; +protected: + coroutine_handle coro_ = nullptr; + awaitable_thread* attached_thread_ = nullptr; + awaitable_frame_base* caller_ = nullptr; + std::exception_ptr pending_exception_ = nullptr; + resume_context* resume_context_ = nullptr; +}; + +} // namespace boost::asio::detail