From f3e8ff5d73d19ca43a4d2605f7d01d02147d0676 Mon Sep 17 00:00:00 2001 From: sha512sum Date: Thu, 14 Mar 2024 18:47:37 +0000 Subject: [PATCH] Add Streaming API --- include/cserver/clients/http/component.hpp | 102 ++++++++++++------ include/cserver/clients/http/request.hpp | 12 ++- include/cserver/clients/http/response.hpp | 36 +++++++ include/cserver/engine/use_streaming.hpp | 9 ++ .../server/handlers/http_handler_base.hpp | 18 +++- include/cserver/server/http/http_stream.hpp | 51 +++++++++ include/cserver/server/server/server.hpp | 27 +++-- 7 files changed, 208 insertions(+), 47 deletions(-) create mode 100644 include/cserver/clients/http/response.hpp create mode 100644 include/cserver/engine/use_streaming.hpp create mode 100644 include/cserver/server/http/http_stream.hpp diff --git a/include/cserver/clients/http/component.hpp b/include/cserver/clients/http/component.hpp index 79432dc..10d4b6e 100644 --- a/include/cserver/clients/http/component.hpp +++ b/include/cserver/clients/http/component.hpp @@ -1,9 +1,10 @@ #pragma once #include -#include +#include #include #include #include +#include #include #include #include @@ -13,7 +14,7 @@ namespace cserver::server::http { inline constexpr auto ParseHttpHeader(std::string header) -> std::pair { size_t pos = header.find(":"); std::string key = header.substr(0, pos); - std::string value = header.substr(pos + 2); + std::string value = header.substr(pos + 2, header.size() - 1); return std::make_pair(key, value); }; @@ -41,24 +42,24 @@ struct HttpClient { return {}; }); }; - template - inline auto PerformRequest(T&& request) -> cserver::Task { - boost::asio::ssl::stream socket(this->taskProcessor.ioContext, this->ctx); - boost::asio::ip::tcp::resolver::iterator endpoint = - co_await this->resolver.async_resolve({request.request.url.host(), request.request.url.has_port() ? request.request.url.port() : request.request.url.scheme()}, boost::asio::use_awaitable); - auto [ec, _] = co_await boost::asio::async_connect(socket.lowest_layer(), endpoint, boost::asio::as_tuple(boost::asio::use_awaitable)); - if(ec) { - throw BoostErrorWrapper{ec}; - }; - co_await socket.async_handshake(boost::asio::ssl::stream_base::client, boost::asio::redirect_error(boost::asio::use_awaitable, ec)); - if(ec) { - throw BoostErrorWrapper{ec}; - }; - std::string req(request.request.ToString()); - co_await boost::asio::async_write(socket, boost::asio::buffer(req.data(), req.size()), boost::asio::redirect_error(boost::asio::use_awaitable, ec)); - if(ec) { - throw BoostErrorWrapper{ec}; + +private: + template + static consteval auto GetPerformReturnType(Flags...) { + constexpr auto kUseStreaming = utempl::Find(utempl::kTypeList) != sizeof...(Flags); + if constexpr(kUseStreaming) { + return [] -> clients::http::Response, Socket> { + std::unreachable(); + }(); + } else { + return [] -> server::http::HTTPResponse { + std::unreachable(); + }(); }; + }; + template + inline auto ReadHeaders(Socket&& socket, auto&&...) const -> cserver::Task { + boost::system::error_code ec; std::string serverResponse; co_await boost::asio::async_read_until(socket, boost::asio::dynamic_buffer(serverResponse), "\r\n\r\n", boost::asio::redirect_error(boost::asio::use_awaitable, ec)); if(ec) { @@ -73,20 +74,24 @@ struct HttpClient { while(std::getline(responseStream, header) && header.find(":") != std::string::npos) { response.headers.insert(server::http::ParseHttpHeader(std::move(header))); }; - if(response.statusCode != 200) { - co_return response; - }; - if(response.headers.contains("Content-Length")) { - auto size = std::stoi(response.headers["Content-Length"]); - response.body.reserve(size); - co_await boost::asio::async_read(socket, boost::asio::dynamic_buffer(response.body), boost::asio::transfer_at_least(size), boost::asio::redirect_error(boost::asio::use_awaitable, ec)); - if(ec) { - throw BoostErrorWrapper{ec}; - }; - co_return response; + co_return response; + }; + template + inline auto ReadBody(Socket&& socket, std::size_t length, auto&&...) const -> cserver::Task { + std::string response; + response.reserve(length); + boost::system::error_code ec; + co_await boost::asio::async_read(socket, boost::asio::dynamic_buffer(response), boost::asio::transfer_at_least(length), boost::asio::redirect_error(boost::asio::use_awaitable, ec)); + if(ec) { + throw BoostErrorWrapper{ec}; }; + co_return response; + }; + template + inline auto ReadBody(Socket&& socket, auto&&...) const -> cserver::Task { + std::string response; for(;;) { - auto [ec, n] = co_await boost::asio::async_read(socket, boost::asio::dynamic_buffer(response.body), boost::asio::transfer_at_least(1), boost::asio::as_tuple(boost::asio::use_awaitable)); + auto [ec, n] = co_await boost::asio::async_read(socket, boost::asio::dynamic_buffer(response), boost::asio::transfer_at_least(1), boost::asio::as_tuple(boost::asio::use_awaitable)); if(ec && ec == boost::asio::error::eof) { break; }; @@ -96,6 +101,41 @@ struct HttpClient { }; co_return response; }; +public: + template + inline auto PerformRequest(T&& request, Flags... flags) -> cserver::TaskGetPerformReturnType>(flags...))> { + constexpr bool kUseStreaming = utempl::Find(utempl::kTypeList) != sizeof...(Flags); + boost::asio::ssl::stream socket(this->taskProcessor.ioContext, this->ctx); + boost::asio::ip::tcp::resolver::iterator endpoint = + co_await this->resolver.async_resolve({request.request.url.host(), request.request.url.has_port() ? request.request.url.port() : request.request.url.scheme()}, boost::asio::use_awaitable); + auto [ec, _] = co_await boost::asio::async_connect(socket.lowest_layer(), endpoint, boost::asio::as_tuple(boost::asio::use_awaitable)); + if(ec) { + throw BoostErrorWrapper{ec}; + }; + co_await socket.async_handshake(boost::asio::ssl::stream_base::client, boost::asio::redirect_error(boost::asio::use_awaitable, ec)); + if(ec) { + throw BoostErrorWrapper{ec}; + }; + std::string req(request.request.ToString()); + co_await boost::asio::async_write(socket, boost::asio::buffer(req.data(), req.size()), boost::asio::transfer_all(), boost::asio::redirect_error(boost::asio::use_awaitable, ec)); + if(ec) { + throw BoostErrorWrapper{ec}; + }; + auto response = co_await this->ReadHeaders(socket, flags...); + if constexpr(kUseStreaming) { + co_return clients::http::Response, decltype(socket)>{*this, std::move(socket), std::move(response)}; + } else { + if(response.statusCode != 200) { + co_return response; + }; + if(response.headers.contains("Content-Length")) { + auto size = std::stoi(response.headers["Content-Length"]); + response.body = co_await ReadBody(std::move(socket), size, flags...); + }; + response.body = co_await ReadBody(std::move(socket), flags...); + co_return response; + }; + }; }; } // namespace cserver::clients::http diff --git a/include/cserver/clients/http/request.hpp b/include/cserver/clients/http/request.hpp index 68cdb8b..485dc53 100644 --- a/include/cserver/clients/http/request.hpp +++ b/include/cserver/clients/http/request.hpp @@ -81,11 +81,15 @@ struct Request { inline constexpr auto ToString() const -> std::string { return this->request.ToString(); }; - inline auto Perform() && -> cserver::Task { - co_return co_await client.PerformRequest(std::move(*this).AddHeaderIfNotExists("Transfer-Encoding", "Content-Length", std::to_string(this->request.body.size()))); + template + inline auto Perform(Flags&&... flags) && + -> decltype(this->client.PerformRequest(std::move(*this).AddHeaderIfNotExists("Transfer-Encoding", "Content-Length", std::to_string(this->request.body.size())), std::forward(flags)...)) { + co_return co_await this->client.PerformRequest(std::move(*this).AddHeaderIfNotExists("Transfer-Encoding", "Content-Length", std::to_string(this->request.body.size())), std::forward(flags)...); }; - inline auto Perform() & -> cserver::Task { - co_return co_await client.PerformRequest(this->AddHeaderIfNotExists("Transfer-Encoding", "Content-Length", std::to_string(this->request.body.size()))); + template + inline auto Perform(Flags&&... flags) & + -> decltype(this->client.PerformRequest(this->AddHeaderIfNotExists("Transfer-Encoding", "Content-Length", std::to_string(this->request.body.size())), std::forward(flags)...)) { + co_return co_await this->client.PerformRequest(this->AddHeaderIfNotExists("Transfer-Encoding", "Content-Length", std::to_string(this->request.body.size())), std::forward(flags)...); }; }; diff --git a/include/cserver/clients/http/response.hpp b/include/cserver/clients/http/response.hpp new file mode 100644 index 0000000..da9f324 --- /dev/null +++ b/include/cserver/clients/http/response.hpp @@ -0,0 +1,36 @@ +#pragma once +#include +#include +#include +#include + +namespace cserver::clients::http { + +template +class Response : public server::http::HTTPResponse { + HttpClient& client; + Socket socket; + +public: + inline auto ReadChunk() -> cserver::Task { + this->body.resize(4479); + auto [ec, n] = co_await this->socket.async_read_some(boost::asio::buffer(this->body.data(), 4479), boost::asio::as_tuple(boost::asio::use_awaitable)); + if(ec == boost::asio::error::eof || ec == boost::asio::error::operation_aborted || ec == boost::asio::ssl::error::stream_truncated) { + co_return false; + }; + if(ec) { + throw BoostErrorWrapper{ec}; + }; + + this->body.resize(n); + co_return true; + }; + inline constexpr Response(Response&&) = default; + inline constexpr Response(const Response&) = default; + inline constexpr Response(HttpClient& client, Socket socket, server::http::HTTPResponse response) : + client(client), + socket(std::move(socket)), + HTTPResponse(std::move(response)) {}; +}; + +} // namespace cserver::clients::http diff --git a/include/cserver/engine/use_streaming.hpp b/include/cserver/engine/use_streaming.hpp new file mode 100644 index 0000000..f881dd5 --- /dev/null +++ b/include/cserver/engine/use_streaming.hpp @@ -0,0 +1,9 @@ +#pragma once + +namespace cserver { + +struct UseStreaming {}; + +inline constexpr auto kUseStreaming = UseStreaming{}; + +} // namespace cserver diff --git a/include/cserver/server/handlers/http_handler_base.hpp b/include/cserver/server/handlers/http_handler_base.hpp index 2e88002..14fdf7f 100644 --- a/include/cserver/server/handlers/http_handler_base.hpp +++ b/include/cserver/server/handlers/http_handler_base.hpp @@ -2,6 +2,7 @@ #include #include #include +#include #include namespace cserver::server::handlers { @@ -15,18 +16,29 @@ struct HTTPHandlerBase { }); }; inline auto HandleRequest(cserver::server::http::HTTPRequest&& request, - cserver::server::http::HTTPResponse& response) -> cserver::Task { + cserver::server::http::HTTPResponse& response) -> Task requires requires(T t) {t.HandleRequestThrow(std::move(request), response);} { try { co_return co_await static_cast(*this).HandleRequestThrow(std::move(request), response); } catch(const std::exception& err) { - fmt::println("Error in {}: {}", __PRETTY_FUNCTION__, err.what()); + fmt::println("Error in handler with default name {}: {}", T::kName, err.what()); } catch(...) { - fmt::println("Error in {}: Unknown Error", __PRETTY_FUNCTION__); + fmt::println("Error in handler with default name {}: Unknown Error", T::kName); }; response.statusCode = 500; response.statusMessage = "Internal Server Error"; co_return "Internal Server Error"; }; + inline auto HandleRequestStream(cserver::server::http::HTTPRequest&& request, + cserver::server::http::HTTPStream& stream) -> Task requires requires(T t) {t.HandleRequestStreamThrow(std::move(request), stream);} { + try { + co_await static_cast(*this).HandleRequestStreamThrow(std::move(request), stream); + } catch(const std::exception& err) { + fmt::println("Error in handler with default name {}: {}", T::kName, err.what()); + } catch(...) { + fmt::println("Error in handler with default name {}: Unknown Error", T::kName); + }; + co_await stream.Close(); + }; inline constexpr HTTPHandlerBase(auto, auto&) {}; }; diff --git a/include/cserver/server/http/http_stream.hpp b/include/cserver/server/http/http_stream.hpp new file mode 100644 index 0000000..8bec9ca --- /dev/null +++ b/include/cserver/server/http/http_stream.hpp @@ -0,0 +1,51 @@ +#pragma once +#include +#include +#include +#include + +namespace cserver::server::http { + +struct HTTPStream { + boost::asio::ip::tcp::socket socket; + std::stringstream stream = {}; + inline auto SetMethod(std::string method) -> Task { + method += " "; + auto [ec, n] = co_await boost::asio::async_write(this->socket, boost::asio::buffer(method.data(), method.size()), boost::asio::transfer_all(), boost::asio::as_tuple(boost::asio::use_awaitable)); + if(ec) { + throw BoostErrorWrapper{ec}; + }; + }; + inline auto SetStatus(std::string status) -> Task { + status = fmt::format("HTTP/1.1 {}\r\n", std::move(status)); + auto [ec, n] = co_await boost::asio::async_write(this->socket, boost::asio::buffer(status.data(), status.size()), boost::asio::transfer_all(), boost::asio::as_tuple(boost::asio::use_awaitable)); + if(ec) { + throw BoostErrorWrapper{ec}; + }; + }; + + inline auto SetHeader(std::string first, std::string second) -> Task { + this->stream << fmt::format("{}: {}\r\n", std::move(first), std::move(second)); + co_return; + }; + inline auto SetEndOfHeaders() -> Task { + this->stream << "\r\n"; + auto str = this->stream.str(); + auto [ec, n] = co_await boost::asio::async_write(this->socket, boost::asio::buffer(str.data(), str.size()), boost::asio::transfer_all(), boost::asio::as_tuple(boost::asio::use_awaitable)); + if(ec) { + throw BoostErrorWrapper{ec}; + }; + }; + inline auto PushBodyChunk(std::string_view chunk) -> Task { + auto [ec, n] = co_await boost::asio::async_write(this->socket, boost::asio::buffer(chunk.data(), chunk.size()), boost::asio::transfer_all(), boost::asio::as_tuple(boost::asio::use_awaitable)); + if(ec) { + throw BoostErrorWrapper{ec}; + }; + }; + inline auto Close() -> Task { + this->socket.close(); + co_return; + }; +}; + +} // namespace cserver::server::http diff --git a/include/cserver/server/server/server.hpp b/include/cserver/server/server/server.hpp index 3a4bb6a..13cfe25 100644 --- a/include/cserver/server/server/server.hpp +++ b/include/cserver/server/server/server.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include @@ -51,6 +52,22 @@ struct Server { }, port(T::kConfig.template Get().template Get<"port">()) { }; + template + auto ProcessHandler(Socket&& socket, http::HTTPRequest request) -> Task { + if constexpr(requires(http::HTTPStream& stream){Get(this->handlers).HandleRequestStream(std::move(request), stream);}) { + http::HTTPStream stream{std::move(socket)}; + co_await Get(this->handlers).HandleRequestStream(std::move(request), stream); + co_return; + } else { + http::HTTPResponse response{}; + response.body = co_await Get(this->handlers).HandleRequest(std::move(request), response); + response.headers["Content-Length"] = std::to_string(response.body.size()); + response.headers["Server"] = "cserver/1"; + auto data = response.ToString(); + co_await boost::asio::async_write(socket, boost::asio::buffer(data.data(), data.size()), boost::asio::use_awaitable); + socket.close(); + }; + }; auto Reader(boost::asio::ip::tcp::socket socket) -> Task { std::string buffer; buffer.reserve(socket.available()); @@ -60,15 +77,7 @@ struct Server { co_await [&](std::index_sequence) -> cserver::Task { (co_await [&](utempl::Wrapper) -> cserver::Task { if(request.url.path().substr(0, Get(kPaths).size()) == Get(kPaths)) { - flag = true; - http::HTTPResponse response{}; - response.body = co_await Get(this->handlers).HandleRequest(std::move(request), response); - - response.headers["Content-Length"] = std::to_string(response.body.size()); - response.headers["Server"] = "cserver/1"; - auto data = response.ToString(); - co_await boost::asio::async_write(socket, boost::asio::buffer(data.data(), data.size()), boost::asio::use_awaitable); - socket.close(); + co_await this->ProcessHandler(std::move(socket), std::move(request)); }; }(utempl::Wrapper{}), ...); }(std::index_sequence_for());