Merge pull request #2 from linuxnyasha/parallel_initialisation

Parallel initialisation components
This commit is contained in:
sha512sum 2024-04-09 13:31:13 +00:00 committed by GitHub
commit 6e79e69a2c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,5 +1,6 @@
#pragma once
#include <cserver/engine/basic/task_processor.hpp>
#include <cserver/engine/coroutine.hpp>
#include <utempl/utils.hpp>
#include <thread>
@ -63,38 +64,105 @@ struct ConstexprConfig {
};
};
template <ConstexprConfig config, auto names, auto Options, typename... Ts>
namespace impl {
template <typename InitFlag, std::size_t>
inline constexpr auto GetInitFlagFor(auto&... args) -> InitFlag& {
static InitFlag flag{args...};
return flag;
};
template <typename InitFlag, utempl::Tuple Tuple>
inline constexpr auto TransformDependencyGraphToInited(auto&&... args) {
return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
return utempl::Tuple{[&]<auto Array>(utempl::Wrapper<Array>){
return [&]<std::size_t... IIs>(std::index_sequence<IIs...>){
return std::array<InitFlag*, Array.size()>{&GetInitFlagFor<InitFlag, Array[IIs]>(args...)...};
}(std::make_index_sequence<Array.size()>());
}(utempl::Wrapper<Get<Is>(Tuple)>{})...};
}(std::make_index_sequence<utempl::kTupleSize<decltype(Tuple)>>());
};
struct AsyncConditionVariable {
inline AsyncConditionVariable(boost::asio::io_context& ioContext) :
mtx{},
deadlineTimer(ioContext),
flag{} {};
template <typename Token = boost::asio::use_awaitable_t<>>
inline auto AsyncWait(Token&& token = {}) -> Task<> {
return boost::asio::async_initiate<
Token,
void(void)>([this]<typename Handler>(Handler&& handler) -> void {
std::unique_lock lc(this->mtx);
if(this->flag) {
return handler();
};
this->deadlineTimer.async_wait([h = std::make_unique<Handler>(std::forward<Handler>(handler))](const boost::system::error_code&){
return (*h)();
});
}, token);
};
inline auto Wait() -> void {
this->deadlineTimer.wait();
};
inline auto NotifyOne() {
std::unique_lock lc(this->mtx);
this->deadlineTimer.cancel_one();
};
inline auto NotifyAll() {
std::unique_lock lc(this->mtx);
this->flag = true;
this->deadlineTimer.cancel();
};
std::mutex mtx;
boost::asio::deadline_timer deadlineTimer;
bool flag;
};
template <utempl::Tuple DependencyGraph, typename T>
inline constexpr auto InitComponents(T& ccontext) -> void {
static auto& context = ccontext;
static auto& ioContext = context.template FindComponent<kBasicTaskProcessorName>().ioContext;
static utempl::Tuple inited = TransformDependencyGraphToInited<AsyncConditionVariable, DependencyGraph>(ioContext);
[]<std::size_t... Is>(std::index_sequence<Is...>){
(boost::asio::co_spawn(ioContext, []<auto I>(utempl::Wrapper<I>) -> cserver::Task<> {
auto& dependencies = Get<I>(inited);
for(auto* flag : dependencies) {
co_await flag->AsyncWait();
};
Get<I>(context.storage).emplace(utempl::Wrapper<Get<I>(T::kNames)>{}, context);
auto& componentInitFlag = GetInitFlagFor<AsyncConditionVariable, I>(ioContext);
componentInitFlag.NotifyAll();
if constexpr(requires{Get<I>(context.storage)->Run();}) {
Get<I>(context.storage)->Run();
};
}(utempl::Wrapper<Is>{}), boost::asio::detached), ...);
}(std::make_index_sequence<utempl::kTupleSize<decltype(DependencyGraph)>>());
};
} // namespace impl
template <ConstexprConfig config, utempl::Tuple DependencyGraph, auto names, auto Options, typename... Ts>
struct ServiceContext {
static constexpr auto kNames = names;
static constexpr auto kConfig = config;
static constexpr auto kList = utempl::TypeList<Ts...>{};
static constexpr auto kThreadsCount = config.template Get<"threads">();
engine::basic::TaskProcessor<kThreadsCount> taskProcessor;
utempl::Tuple<Ts...> storage;
engine::basic::TaskProcessor<kThreadsCount - 1> taskProcessor;
utempl::Tuple<std::optional<Ts>...> storage;
inline constexpr ServiceContext() :
taskProcessor{utempl::Wrapper<utempl::ConstexprString{kBasicTaskProcessorName}>{}, *this},
storage{
[&]<auto... Is>(std::index_sequence<Is...>) -> utempl::Tuple<Ts...> {
return {[&]<auto I>(utempl::Wrapper<I>) {
return [&] -> std::remove_cvref_t<decltype(Get<I>(storage))> {
return {utempl::Wrapper<Get<I>(names)>{}, *this};
};
}(utempl::Wrapper<Is>{})...};
}(std::index_sequence_for<Ts...>())
} {};
taskProcessor{utempl::Wrapper<utempl::ConstexprString{kBasicTaskProcessorName}>{}, *this}
,storage{} {
};
inline constexpr auto Run() {
this->taskProcessor.Run();
[&]<auto... Is>(std::index_sequence<Is...>) {
([](auto& component){
if constexpr(requires{component.Run();}) {
component.Run();
};
}(Get<Is>(this->storage)), ...);
}(std::index_sequence_for<Ts...>());
impl::InitComponents<DependencyGraph>(*this);
};
template <utempl::ConstexprString name>
inline constexpr auto FindComponent() -> auto& {
constexpr auto I = utempl::Find(names, name);
return Get<I>(this->storage);
return *Get<I>(this->storage);
};
template <>
inline constexpr auto FindComponent<kBasicTaskProcessorName>() -> auto& {
@ -103,7 +171,7 @@ struct ServiceContext {
template <typename T>
inline constexpr auto FindComponent() -> T& {
constexpr auto I = utempl::Find<std::remove_cvref_t<T>>(utempl::kTypeList<Ts...>);
return Get<I>(this->storage);
return *Get<I>(this->storage);
};
};
@ -170,8 +238,8 @@ struct DependencyInfoInjector {
private:
template <utempl::ConstexprString name, utempl::ConstexprString... names, typename... TTs, Options... Options>
static consteval auto FindComponentTypeImpl(ComponentConfig<names, TTs, Options>...) {
if constexpr(name == kBasicTaskProcessorName) {
return [] -> engine::basic::TaskProcessor<config.template Get<"threads">()> {}();
if constexpr(static_cast<std::string_view>(name) == kBasicTaskProcessorName) {
return [] -> engine::basic::TaskProcessor<config.template Get<"threads">() - 1> {}();
} else {
constexpr auto I = utempl::Find(utempl::Tuple{std::string_view{names}...}, std::string_view{name});
return [] -> decltype(utempl::Get<I>(utempl::TypeList<TTs...>{})) {}();
@ -232,7 +300,7 @@ public:
};
} else {
constexpr auto name = Magic(loopholes::Getter<DependencyInfoKey<Current, I>{}>{});
if constexpr(name == kBasicTaskProcessorName) {
if constexpr(static_cast<std::string_view>(name) == kBasicTaskProcessorName) {
return utempl::Tuple{names...};
} else {
return utempl::Tuple{names..., name};
@ -366,7 +434,7 @@ struct ServiceContextBuilder {
template <utempl::ConstexprString name>
static consteval auto FindComponent() {
if constexpr(name == kBasicTaskProcessorName) {
return [] -> engine::basic::TaskProcessor<config.template Get<"threads">()> {}();
return [] -> engine::basic::TaskProcessor<config.template Get<"threads">() - 1> {}();
} else {
return []<typename... TTs, utempl::ConstexprString... names, Options... Options>
(const ServiceContextBuilder<config, ComponentConfig<names, TTs, Options>...>&)
@ -392,6 +460,15 @@ struct ServiceContextBuilder {
}(ComponentConfigs{}...);
};
static consteval auto GetIndexDependencyGraph() {
return []<utempl::ConstexprString... Names, utempl::Tuple... Dependencies>
(DependencyGraph<DependencyGraphElement<Names, Dependencies>...>){
return utempl::Tuple{Unpack(Dependencies, [](auto... dependencies) -> std::array<std::size_t, sizeof...(dependencies)> {
return {Find(utempl::Tuple{Names...}, dependencies)...};
})...};
}(GetDependencyGraph());
};
static consteval auto Sort() {
constexpr auto sorted = TopologicalSort(GetDependencyGraph());
return [&]<std::size_t... Is>(std::index_sequence<Is...>){
@ -403,16 +480,14 @@ struct ServiceContextBuilder {
static constexpr auto GetServiceContext() {
return []<utempl::ConstexprString... names, typename... TTs, Options... Options>
(utempl::TypeList<ComponentConfig<names, TTs, Options>...>) {
return ServiceContext<config, utempl::Tuple{names...}, utempl::Tuple{Options...}, TTs...>{};
return ServiceContext<config, GetIndexDependencyGraph(), utempl::Tuple{names...}, utempl::Tuple{Options...}, TTs...>{};
}(utempl::TypeList<ComponentConfigs...>{});
};
static constexpr auto Run() -> void {
static auto context = GetServiceContext();
context.Run();
for(;;) {
std::this_thread::sleep_for(std::chrono::minutes(1));
};
context.template FindComponent<kBasicTaskProcessorName>().ioContext.run();
};
};
} // namespace cserver