Add Streaming API

This commit is contained in:
sha512sum 2024-03-14 18:47:37 +00:00
parent 57903bd14e
commit f3e8ff5d73
7 changed files with 208 additions and 47 deletions

View file

@ -1,9 +1,10 @@
#pragma once #pragma once
#include <cserver/clients/http/request.hpp> #include <cserver/clients/http/request.hpp>
#include <cserver/server/http/http_response.hpp> #include <cserver/clients/http/response.hpp>
#include <cserver/engine/components.hpp> #include <cserver/engine/components.hpp>
#include <cserver/engine/coroutine.hpp> #include <cserver/engine/coroutine.hpp>
#include <cserver/utils/boost_error_wrapper.hpp> #include <cserver/utils/boost_error_wrapper.hpp>
#include <cserver/engine/use_streaming.hpp>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <boost/asio/ssl.hpp> #include <boost/asio/ssl.hpp>
#include <utempl/constexpr_string.hpp> #include <utempl/constexpr_string.hpp>
@ -13,7 +14,7 @@ namespace cserver::server::http {
inline constexpr auto ParseHttpHeader(std::string header) -> std::pair<std::string, std::string> { inline constexpr auto ParseHttpHeader(std::string header) -> std::pair<std::string, std::string> {
size_t pos = header.find(":"); size_t pos = header.find(":");
std::string key = header.substr(0, pos); std::string key = header.substr(0, pos);
std::string value = header.substr(pos + 2); std::string value = header.substr(pos + 2, header.size() - 1);
return std::make_pair(key, value); return std::make_pair(key, value);
}; };
@ -41,24 +42,24 @@ struct HttpClient {
return {}; return {};
}); });
}; };
template <typename T>
inline auto PerformRequest(T&& request) -> cserver::Task<server::http::HTTPResponse> { private:
boost::asio::ssl::stream<boost::asio::ip::tcp::socket> socket(this->taskProcessor.ioContext, this->ctx); template <typename Socket, typename... Flags>
boost::asio::ip::tcp::resolver::iterator endpoint = static consteval auto GetPerformReturnType(Flags...) {
co_await this->resolver.async_resolve({request.request.url.host(), request.request.url.has_port() ? request.request.url.port() : request.request.url.scheme()}, boost::asio::use_awaitable); constexpr auto kUseStreaming = utempl::Find<UseStreaming>(utempl::kTypeList<Flags...>) != sizeof...(Flags);
auto [ec, _] = co_await boost::asio::async_connect(socket.lowest_layer(), endpoint, boost::asio::as_tuple(boost::asio::use_awaitable)); if constexpr(kUseStreaming) {
if(ec) { return [] -> clients::http::Response<HttpClient<TaskProcessor>, Socket> {
throw BoostErrorWrapper{ec}; std::unreachable();
}();
} else {
return [] -> server::http::HTTPResponse {
std::unreachable();
}();
}; };
co_await socket.async_handshake(boost::asio::ssl::stream_base::client, boost::asio::redirect_error(boost::asio::use_awaitable, ec));
if(ec) {
throw BoostErrorWrapper{ec};
};
std::string req(request.request.ToString());
co_await boost::asio::async_write(socket, boost::asio::buffer(req.data(), req.size()), boost::asio::redirect_error(boost::asio::use_awaitable, ec));
if(ec) {
throw BoostErrorWrapper{ec};
}; };
template <typename Socket>
inline auto ReadHeaders(Socket&& socket, auto&&...) const -> cserver::Task<server::http::HTTPResponse> {
boost::system::error_code ec;
std::string serverResponse; std::string serverResponse;
co_await boost::asio::async_read_until(socket, boost::asio::dynamic_buffer(serverResponse), "\r\n\r\n", boost::asio::redirect_error(boost::asio::use_awaitable, ec)); co_await boost::asio::async_read_until(socket, boost::asio::dynamic_buffer(serverResponse), "\r\n\r\n", boost::asio::redirect_error(boost::asio::use_awaitable, ec));
if(ec) { if(ec) {
@ -73,20 +74,24 @@ struct HttpClient {
while(std::getline(responseStream, header) && header.find(":") != std::string::npos) { while(std::getline(responseStream, header) && header.find(":") != std::string::npos) {
response.headers.insert(server::http::ParseHttpHeader(std::move(header))); response.headers.insert(server::http::ParseHttpHeader(std::move(header)));
}; };
if(response.statusCode != 200) {
co_return response; co_return response;
}; };
if(response.headers.contains("Content-Length")) { template <typename Socket>
auto size = std::stoi(response.headers["Content-Length"]); inline auto ReadBody(Socket&& socket, std::size_t length, auto&&...) const -> cserver::Task<std::string> {
response.body.reserve(size); std::string response;
co_await boost::asio::async_read(socket, boost::asio::dynamic_buffer(response.body), boost::asio::transfer_at_least(size), boost::asio::redirect_error(boost::asio::use_awaitable, ec)); response.reserve(length);
boost::system::error_code ec;
co_await boost::asio::async_read(socket, boost::asio::dynamic_buffer(response), boost::asio::transfer_at_least(length), boost::asio::redirect_error(boost::asio::use_awaitable, ec));
if(ec) { if(ec) {
throw BoostErrorWrapper{ec}; throw BoostErrorWrapper{ec};
}; };
co_return response; co_return response;
}; };
template <typename Socket>
inline auto ReadBody(Socket&& socket, auto&&...) const -> cserver::Task<std::string> {
std::string response;
for(;;) { for(;;) {
auto [ec, n] = co_await boost::asio::async_read(socket, boost::asio::dynamic_buffer(response.body), boost::asio::transfer_at_least(1), boost::asio::as_tuple(boost::asio::use_awaitable)); auto [ec, n] = co_await boost::asio::async_read(socket, boost::asio::dynamic_buffer(response), boost::asio::transfer_at_least(1), boost::asio::as_tuple(boost::asio::use_awaitable));
if(ec && ec == boost::asio::error::eof) { if(ec && ec == boost::asio::error::eof) {
break; break;
}; };
@ -96,6 +101,41 @@ struct HttpClient {
}; };
co_return response; co_return response;
}; };
public:
template <typename... Flags, typename T>
inline auto PerformRequest(T&& request, Flags... flags) -> cserver::Task<decltype(this->GetPerformReturnType<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>(flags...))> {
constexpr bool kUseStreaming = utempl::Find<UseStreaming>(utempl::kTypeList<Flags...>) != sizeof...(Flags);
boost::asio::ssl::stream<boost::asio::ip::tcp::socket> socket(this->taskProcessor.ioContext, this->ctx);
boost::asio::ip::tcp::resolver::iterator endpoint =
co_await this->resolver.async_resolve({request.request.url.host(), request.request.url.has_port() ? request.request.url.port() : request.request.url.scheme()}, boost::asio::use_awaitable);
auto [ec, _] = co_await boost::asio::async_connect(socket.lowest_layer(), endpoint, boost::asio::as_tuple(boost::asio::use_awaitable));
if(ec) {
throw BoostErrorWrapper{ec};
};
co_await socket.async_handshake(boost::asio::ssl::stream_base::client, boost::asio::redirect_error(boost::asio::use_awaitable, ec));
if(ec) {
throw BoostErrorWrapper{ec};
};
std::string req(request.request.ToString());
co_await boost::asio::async_write(socket, boost::asio::buffer(req.data(), req.size()), boost::asio::transfer_all(), boost::asio::redirect_error(boost::asio::use_awaitable, ec));
if(ec) {
throw BoostErrorWrapper{ec};
};
auto response = co_await this->ReadHeaders(socket, flags...);
if constexpr(kUseStreaming) {
co_return clients::http::Response<HttpClient<TaskProcessor>, decltype(socket)>{*this, std::move(socket), std::move(response)};
} else {
if(response.statusCode != 200) {
co_return response;
};
if(response.headers.contains("Content-Length")) {
auto size = std::stoi(response.headers["Content-Length"]);
response.body = co_await ReadBody(std::move(socket), size, flags...);
};
response.body = co_await ReadBody(std::move(socket), flags...);
co_return response;
};
};
}; };
} // namespace cserver::clients::http } // namespace cserver::clients::http

View file

@ -81,11 +81,15 @@ struct Request {
inline constexpr auto ToString() const -> std::string { inline constexpr auto ToString() const -> std::string {
return this->request.ToString(); return this->request.ToString();
}; };
inline auto Perform() && -> cserver::Task<server::http::HTTPResponse> { template <typename... Flags>
co_return co_await client.PerformRequest(std::move(*this).AddHeaderIfNotExists("Transfer-Encoding", "Content-Length", std::to_string(this->request.body.size()))); inline auto Perform(Flags&&... flags) &&
-> decltype(this->client.PerformRequest(std::move(*this).AddHeaderIfNotExists("Transfer-Encoding", "Content-Length", std::to_string(this->request.body.size())), std::forward<Flags>(flags)...)) {
co_return co_await this->client.PerformRequest(std::move(*this).AddHeaderIfNotExists("Transfer-Encoding", "Content-Length", std::to_string(this->request.body.size())), std::forward<Flags>(flags)...);
}; };
inline auto Perform() & -> cserver::Task<server::http::HTTPResponse> { template <typename... Flags>
co_return co_await client.PerformRequest(this->AddHeaderIfNotExists("Transfer-Encoding", "Content-Length", std::to_string(this->request.body.size()))); inline auto Perform(Flags&&... flags) &
-> decltype(this->client.PerformRequest(this->AddHeaderIfNotExists("Transfer-Encoding", "Content-Length", std::to_string(this->request.body.size())), std::forward<Flags>(flags)...)) {
co_return co_await this->client.PerformRequest(this->AddHeaderIfNotExists("Transfer-Encoding", "Content-Length", std::to_string(this->request.body.size())), std::forward<Flags>(flags)...);
}; };
}; };

View file

@ -0,0 +1,36 @@
#pragma once
#include <cserver/server/http/http_response.hpp>
#include <cserver/engine/coroutine.hpp>
#include <cserver/utils/boost_error_wrapper.hpp>
#include <boost/asio/ssl.hpp>
namespace cserver::clients::http {
template <typename HttpClient, typename Socket>
class Response : public server::http::HTTPResponse {
HttpClient& client;
Socket socket;
public:
inline auto ReadChunk() -> cserver::Task<bool> {
this->body.resize(4479);
auto [ec, n] = co_await this->socket.async_read_some(boost::asio::buffer(this->body.data(), 4479), boost::asio::as_tuple(boost::asio::use_awaitable));
if(ec == boost::asio::error::eof || ec == boost::asio::error::operation_aborted || ec == boost::asio::ssl::error::stream_truncated) {
co_return false;
};
if(ec) {
throw BoostErrorWrapper{ec};
};
this->body.resize(n);
co_return true;
};
inline constexpr Response(Response&&) = default;
inline constexpr Response(const Response&) = default;
inline constexpr Response(HttpClient& client, Socket socket, server::http::HTTPResponse response) :
client(client),
socket(std::move(socket)),
HTTPResponse(std::move(response)) {};
};
} // namespace cserver::clients::http

View file

@ -0,0 +1,9 @@
#pragma once
namespace cserver {
struct UseStreaming {};
inline constexpr auto kUseStreaming = UseStreaming{};
} // namespace cserver

View file

@ -2,6 +2,7 @@
#include <cserver/engine/components.hpp> #include <cserver/engine/components.hpp>
#include <cserver/server/http/http_request.hpp> #include <cserver/server/http/http_request.hpp>
#include <cserver/server/http/http_response.hpp> #include <cserver/server/http/http_response.hpp>
#include <cserver/server/http/http_stream.hpp>
#include <cserver/engine/coroutine.hpp> #include <cserver/engine/coroutine.hpp>
namespace cserver::server::handlers { namespace cserver::server::handlers {
@ -15,18 +16,29 @@ struct HTTPHandlerBase {
}); });
}; };
inline auto HandleRequest(cserver::server::http::HTTPRequest&& request, inline auto HandleRequest(cserver::server::http::HTTPRequest&& request,
cserver::server::http::HTTPResponse& response) -> cserver::Task<std::string> { cserver::server::http::HTTPResponse& response) -> Task<std::string> requires requires(T t) {t.HandleRequestThrow(std::move(request), response);} {
try { try {
co_return co_await static_cast<T&>(*this).HandleRequestThrow(std::move(request), response); co_return co_await static_cast<T&>(*this).HandleRequestThrow(std::move(request), response);
} catch(const std::exception& err) { } catch(const std::exception& err) {
fmt::println("Error in {}: {}", __PRETTY_FUNCTION__, err.what()); fmt::println("Error in handler with default name {}: {}", T::kName, err.what());
} catch(...) { } catch(...) {
fmt::println("Error in {}: Unknown Error", __PRETTY_FUNCTION__); fmt::println("Error in handler with default name {}: Unknown Error", T::kName);
}; };
response.statusCode = 500; response.statusCode = 500;
response.statusMessage = "Internal Server Error"; response.statusMessage = "Internal Server Error";
co_return "Internal Server Error"; co_return "Internal Server Error";
}; };
inline auto HandleRequestStream(cserver::server::http::HTTPRequest&& request,
cserver::server::http::HTTPStream& stream) -> Task<void> requires requires(T t) {t.HandleRequestStreamThrow(std::move(request), stream);} {
try {
co_await static_cast<T&>(*this).HandleRequestStreamThrow(std::move(request), stream);
} catch(const std::exception& err) {
fmt::println("Error in handler with default name {}: {}", T::kName, err.what());
} catch(...) {
fmt::println("Error in handler with default name {}: Unknown Error", T::kName);
};
co_await stream.Close();
};
inline constexpr HTTPHandlerBase(auto, auto&) {}; inline constexpr HTTPHandlerBase(auto, auto&) {};
}; };

View file

@ -0,0 +1,51 @@
#pragma once
#include <cserver/engine/coroutine.hpp>
#include <cserver/utils/boost_error_wrapper.hpp>
#include <fmt/format.h>
#include <boost/asio.hpp>
namespace cserver::server::http {
struct HTTPStream {
boost::asio::ip::tcp::socket socket;
std::stringstream stream = {};
inline auto SetMethod(std::string method) -> Task<void> {
method += " ";
auto [ec, n] = co_await boost::asio::async_write(this->socket, boost::asio::buffer(method.data(), method.size()), boost::asio::transfer_all(), boost::asio::as_tuple(boost::asio::use_awaitable));
if(ec) {
throw BoostErrorWrapper{ec};
};
};
inline auto SetStatus(std::string status) -> Task<void> {
status = fmt::format("HTTP/1.1 {}\r\n", std::move(status));
auto [ec, n] = co_await boost::asio::async_write(this->socket, boost::asio::buffer(status.data(), status.size()), boost::asio::transfer_all(), boost::asio::as_tuple(boost::asio::use_awaitable));
if(ec) {
throw BoostErrorWrapper{ec};
};
};
inline auto SetHeader(std::string first, std::string second) -> Task<void> {
this->stream << fmt::format("{}: {}\r\n", std::move(first), std::move(second));
co_return;
};
inline auto SetEndOfHeaders() -> Task<void> {
this->stream << "\r\n";
auto str = this->stream.str();
auto [ec, n] = co_await boost::asio::async_write(this->socket, boost::asio::buffer(str.data(), str.size()), boost::asio::transfer_all(), boost::asio::as_tuple(boost::asio::use_awaitable));
if(ec) {
throw BoostErrorWrapper{ec};
};
};
inline auto PushBodyChunk(std::string_view chunk) -> Task<void> {
auto [ec, n] = co_await boost::asio::async_write(this->socket, boost::asio::buffer(chunk.data(), chunk.size()), boost::asio::transfer_all(), boost::asio::as_tuple(boost::asio::use_awaitable));
if(ec) {
throw BoostErrorWrapper{ec};
};
};
inline auto Close() -> Task<void> {
this->socket.close();
co_return;
};
};
} // namespace cserver::server::http

View file

@ -3,6 +3,7 @@
#include <cserver/server/http/http_request_parser.hpp> #include <cserver/server/http/http_request_parser.hpp>
#include <cserver/server/http/http_response.hpp> #include <cserver/server/http/http_response.hpp>
#include <cserver/engine/coroutine.hpp> #include <cserver/engine/coroutine.hpp>
#include <cserver/server/http/http_stream.hpp>
#include <boost/asio.hpp> #include <boost/asio.hpp>
@ -51,6 +52,22 @@ struct Server {
}, },
port(T::kConfig.template Get<name>().template Get<"port">()) { port(T::kConfig.template Get<name>().template Get<"port">()) {
}; };
template<auto I, typename Socket>
auto ProcessHandler(Socket&& socket, http::HTTPRequest request) -> Task<void> {
if constexpr(requires(http::HTTPStream& stream){Get<I>(this->handlers).HandleRequestStream(std::move(request), stream);}) {
http::HTTPStream stream{std::move(socket)};
co_await Get<I>(this->handlers).HandleRequestStream(std::move(request), stream);
co_return;
} else {
http::HTTPResponse response{};
response.body = co_await Get<I>(this->handlers).HandleRequest(std::move(request), response);
response.headers["Content-Length"] = std::to_string(response.body.size());
response.headers["Server"] = "cserver/1";
auto data = response.ToString();
co_await boost::asio::async_write(socket, boost::asio::buffer(data.data(), data.size()), boost::asio::use_awaitable);
socket.close();
};
};
auto Reader(boost::asio::ip::tcp::socket socket) -> Task<void> { auto Reader(boost::asio::ip::tcp::socket socket) -> Task<void> {
std::string buffer; std::string buffer;
buffer.reserve(socket.available()); buffer.reserve(socket.available());
@ -60,15 +77,7 @@ struct Server {
co_await [&]<auto... Is>(std::index_sequence<Is...>) -> cserver::Task<void> { co_await [&]<auto... Is>(std::index_sequence<Is...>) -> cserver::Task<void> {
(co_await [&]<auto I>(utempl::Wrapper<I>) -> cserver::Task<void> { (co_await [&]<auto I>(utempl::Wrapper<I>) -> cserver::Task<void> {
if(request.url.path().substr(0, Get<I>(kPaths).size()) == Get<I>(kPaths)) { if(request.url.path().substr(0, Get<I>(kPaths).size()) == Get<I>(kPaths)) {
flag = true; co_await this->ProcessHandler<I>(std::move(socket), std::move(request));
http::HTTPResponse response{};
response.body = co_await Get<I>(this->handlers).HandleRequest(std::move(request), response);
response.headers["Content-Length"] = std::to_string(response.body.size());
response.headers["Server"] = "cserver/1";
auto data = response.ToString();
co_await boost::asio::async_write(socket, boost::asio::buffer(data.data(), data.size()), boost::asio::use_awaitable);
socket.close();
}; };
}(utempl::Wrapper<Is>{}), ...); }(utempl::Wrapper<Is>{}), ...);
}(std::index_sequence_for<Ts...>()); }(std::index_sequence_for<Ts...>());