Move ReadStartStream to RawXmlStream
This commit is contained in:
parent
2e92c59f50
commit
145bf7bd80
3 changed files with 87 additions and 38 deletions
|
@ -313,40 +313,9 @@ struct ClientCreateVisitor {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
auto GetStartStreamIndex(auto& socket, boost::asio::streambuf& streambuf) -> boost::asio::awaitable<std::size_t> {
|
template <typename Socket>
|
||||||
auto splited = Splitter{GetEnumerated(streambuf)};
|
auto ReadStartStream(RawXmlStream<Socket>& stream) -> boost::asio::awaitable<ServerToUserStream> {
|
||||||
using It = decltype(splited.begin());
|
auto doc = (co_await stream.ReadOne(), co_await stream.ReadOne());
|
||||||
// clang-format off
|
|
||||||
co_return co_await
|
|
||||||
[&](this auto&& self, std::size_t n, It it) -> std::optional<It> {
|
|
||||||
return n == 0 ? std::move(it) : it == splited.end() ? std::nullopt : self(n - 1, (++it, std::move(it)));
|
|
||||||
}(2, splited.begin())
|
|
||||||
.transform([](auto value) -> boost::asio::awaitable<std::size_t> {
|
|
||||||
auto [n, _] = *value;
|
|
||||||
co_return n;
|
|
||||||
}) // NOLINTNEXTLINE
|
|
||||||
.or_else([&] { // NOLINTNEXTLINE
|
|
||||||
return std::optional{[](auto self, auto& socket, auto& streambuf) -> boost::asio::awaitable<std::size_t> {
|
|
||||||
auto buf = streambuf.prepare(4096); // NOLINT
|
|
||||||
std::size_t n = co_await socket.async_read_some(buf, boost::asio::use_awaitable);
|
|
||||||
streambuf.commit(n);
|
|
||||||
co_return co_await self->GetStartStreamIndex(socket, streambuf);
|
|
||||||
}(this, socket, streambuf)};
|
|
||||||
})
|
|
||||||
.value();
|
|
||||||
// clang-format on
|
|
||||||
}
|
|
||||||
|
|
||||||
auto ReadStartStream(auto& socket, boost::asio::streambuf& streambuf) -> boost::asio::awaitable<ServerToUserStream> {
|
|
||||||
auto n = co_await this->GetStartStreamIndex(socket, streambuf);
|
|
||||||
xmlpp::DomParser parser;
|
|
||||||
std::string dataToReed =
|
|
||||||
(::larra::xmpp::impl::GetCharsRangeFromBuf(streambuf) | std::views::take(n - 1) | std::ranges::to<std::string>()) + "/>";
|
|
||||||
|
|
||||||
parser.parse_memory(dataToReed);
|
|
||||||
auto doc = parser.get_document();
|
|
||||||
SPDLOG_DEBUG("Stream readed. Consuming {} bytes with stream data {}. Total buffer size: {}", n, dataToReed, streambuf.size());
|
|
||||||
streambuf.consume(n);
|
|
||||||
co_return ServerToUserStream::Parse(doc->get_root_node());
|
co_return ServerToUserStream::Parse(doc->get_root_node());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -357,7 +326,7 @@ struct ClientCreateVisitor {
|
||||||
|
|
||||||
co_await stream.Send(UserStream{.from = account.Jid(), .to = account.Jid().server, .version = "1.0", .xmlLang = "en"});
|
co_await stream.Send(UserStream{.from = account.Jid(), .to = account.Jid().server, .version = "1.0", .xmlLang = "en"});
|
||||||
SPDLOG_DEBUG("UserStream sended");
|
SPDLOG_DEBUG("UserStream sended");
|
||||||
ServerToUserStream sToUStream = co_await ReadStartStream(stream.next_layer(), *stream.streambuf);
|
ServerToUserStream sToUStream = co_await ReadStartStream(stream);
|
||||||
StreamFeatures features = co_await stream.template Read<StreamFeatures>();
|
StreamFeatures features = co_await stream.template Read<StreamFeatures>();
|
||||||
SPDLOG_DEBUG("features parsed");
|
SPDLOG_DEBUG("features parsed");
|
||||||
|
|
||||||
|
@ -376,7 +345,7 @@ struct ClientCreateVisitor {
|
||||||
co_await this->Connect(socket.next_layer(), co_await this->Resolve());
|
co_await this->Connect(socket.next_layer(), co_await this->Resolve());
|
||||||
co_await stream.Send(UserStream{.from = account.Jid().Username("anonymous"), .to = account.Jid().server}, socket.next_layer());
|
co_await stream.Send(UserStream{.from = account.Jid().Username("anonymous"), .to = account.Jid().server}, socket.next_layer());
|
||||||
SPDLOG_DEBUG("UserStream sended");
|
SPDLOG_DEBUG("UserStream sended");
|
||||||
auto streamHeader = co_await this->ReadStartStream(socket, *stream.streambuf);
|
auto streamHeader = co_await this->ReadStartStream(stream);
|
||||||
StreamFeatures features = co_await stream.template Read<StreamFeatures>();
|
StreamFeatures features = co_await stream.template Read<StreamFeatures>();
|
||||||
SPDLOG_DEBUG("features parsed(SSL)");
|
SPDLOG_DEBUG("features parsed(SSL)");
|
||||||
if(!features.startTls) {
|
if(!features.startTls) {
|
||||||
|
@ -388,7 +357,7 @@ struct ClientCreateVisitor {
|
||||||
}
|
}
|
||||||
co_await this->ProcessTls(stream);
|
co_await this->ProcessTls(stream);
|
||||||
co_await stream.Send(UserStream{.from = account.Jid(), .to = account.Jid().server}, socket.next_layer());
|
co_await stream.Send(UserStream{.from = account.Jid(), .to = account.Jid().server}, socket.next_layer());
|
||||||
auto newStreamHeader = co_await this->ReadStartStream(socket, *stream.streambuf);
|
auto newStreamHeader = co_await this->ReadStartStream(stream);
|
||||||
auto newFeatures = co_await stream.template Read<StreamFeatures>();
|
auto newFeatures = co_await stream.template Read<StreamFeatures>();
|
||||||
co_await this->Auth(stream, std::move(newStreamHeader), std::move(newFeatures));
|
co_await this->Auth(stream, std::move(newStreamHeader), std::move(newFeatures));
|
||||||
co_return Client{std::move(this->account).Jid(), RawXmlStream{std::move(socket)}};
|
co_return Client{std::move(this->account).Jid(), RawXmlStream{std::move(socket)}};
|
||||||
|
|
|
@ -58,6 +58,8 @@ struct XmlPath : public xmlpp::Element {
|
||||||
|
|
||||||
namespace impl {
|
namespace impl {
|
||||||
|
|
||||||
|
constexpr std::size_t kXmlStreamReadChunkSize = 4096;
|
||||||
|
|
||||||
constexpr auto BufferToStringView(const boost::asio::const_buffer& buffer, size_t size) -> std::string_view {
|
constexpr auto BufferToStringView(const boost::asio::const_buffer& buffer, size_t size) -> std::string_view {
|
||||||
assert(size <= buffer.size());
|
assert(size <= buffer.size());
|
||||||
return {boost::asio::buffer_cast<const char*>(buffer), size};
|
return {boost::asio::buffer_cast<const char*>(buffer), size};
|
||||||
|
@ -124,6 +126,52 @@ struct RawXmlStream : Stream {
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto ReadOne(auto& socket) -> boost::asio::awaitable<std::unique_ptr<xmlpp::Document>> {
|
||||||
|
auto doc = std::make_unique<xmlpp::Document>();
|
||||||
|
impl::Parser parser(*doc);
|
||||||
|
for(;;) {
|
||||||
|
auto enumerated = std::views::zip(std::views::iota(std::size_t{}, this->streambuf->size()),
|
||||||
|
::larra::xmpp::impl::GetCharsRangeFromBuf(*this->streambuf));
|
||||||
|
|
||||||
|
auto it = std::ranges::find(enumerated, '>', [](auto v) {
|
||||||
|
auto [_, c] = v;
|
||||||
|
return c;
|
||||||
|
});
|
||||||
|
if(it == std::ranges::end(enumerated)) {
|
||||||
|
for(const auto& buf : this->streambuf->data()) {
|
||||||
|
auto error = parser.ParseChunk(impl::BufferToStringView(buf));
|
||||||
|
if(error) {
|
||||||
|
throw std::runtime_error(std::format("Bad xml object: {}", xmlpp::format_xml_error(error)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this->streambuf->consume(this->streambuf->size());
|
||||||
|
auto buff = this->streambuf->prepare(impl::kXmlStreamReadChunkSize);
|
||||||
|
auto n = co_await socket.async_read_some(buff, boost::asio::use_awaitable);
|
||||||
|
this->streambuf->commit(n);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
auto [i, _] = *it;
|
||||||
|
auto toRead = i + 1;
|
||||||
|
for(const auto& buf : this->streambuf->data()) {
|
||||||
|
if(toRead == 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
auto toReadCurrent = std::min(buf.size(), toRead);
|
||||||
|
|
||||||
|
auto error = parser.ParseChunk(impl::BufferToStringView(buf, toReadCurrent));
|
||||||
|
if(error) {
|
||||||
|
throw std::runtime_error(std::format("Bad xml object: {}", xmlpp::format_xml_error(error)));
|
||||||
|
}
|
||||||
|
toRead -= toReadCurrent;
|
||||||
|
}
|
||||||
|
|
||||||
|
this->streambuf->consume(i + 1);
|
||||||
|
co_return doc;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
auto ReadOne() -> boost::asio::awaitable<std::unique_ptr<xmlpp::Document>> {
|
||||||
|
co_return co_await this->ReadOne(this->next_layer());
|
||||||
|
}
|
||||||
inline auto Read(auto& socket) -> boost::asio::awaitable<std::unique_ptr<xmlpp::Document>> {
|
inline auto Read(auto& socket) -> boost::asio::awaitable<std::unique_ptr<xmlpp::Document>> {
|
||||||
auto doc = std::make_unique<xmlpp::Document>(); // Not movable :(
|
auto doc = std::make_unique<xmlpp::Document>(); // Not movable :(
|
||||||
impl::Parser parser(*doc);
|
impl::Parser parser(*doc);
|
||||||
|
@ -153,7 +201,7 @@ struct RawXmlStream : Stream {
|
||||||
}
|
}
|
||||||
this->streambuf->consume(this->streambuf->size());
|
this->streambuf->consume(this->streambuf->size());
|
||||||
for(;;) {
|
for(;;) {
|
||||||
auto buff = this->streambuf->prepare(4096); // NOLINT
|
auto buff = this->streambuf->prepare(impl::kXmlStreamReadChunkSize);
|
||||||
auto [e, n] = co_await socket.async_read_some(buff, boost::asio::as_tuple(boost::asio::use_awaitable));
|
auto [e, n] = co_await socket.async_read_some(buff, boost::asio::as_tuple(boost::asio::use_awaitable));
|
||||||
if(e) {
|
if(e) {
|
||||||
boost::system::throw_exception_from_error(e, boost::source_location());
|
boost::system::throw_exception_from_error(e, boost::source_location());
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
#include <larra/features.hpp>
|
#include <larra/features.hpp>
|
||||||
#include <larra/impl/mock_socket.hpp>
|
#include <larra/impl/mock_socket.hpp>
|
||||||
#include <larra/raw_xml_stream.hpp>
|
#include <larra/raw_xml_stream.hpp>
|
||||||
|
#include <larra/stream.hpp>
|
||||||
#include <utempl/utils.hpp>
|
#include <utempl/utils.hpp>
|
||||||
|
|
||||||
namespace larra::xmpp {
|
namespace larra::xmpp {
|
||||||
|
@ -19,6 +20,9 @@ constexpr std::string_view kDoc3 =
|
||||||
"xmlns='urn:ietf:params:xml:ns:xmpp-sasl'><mechanism>PLAIN</mechanism><mechanism>SCRAM-SHA-256</mechanism><mechanism>X-OAUTH2</"
|
"xmlns='urn:ietf:params:xml:ns:xmpp-sasl'><mechanism>PLAIN</mechanism><mechanism>SCRAM-SHA-256</mechanism><mechanism>X-OAUTH2</"
|
||||||
"mechanism></mechanisms><register xmlns='http://jabber.org/features/iq-register'/></stream:features>";
|
"mechanism></mechanisms><register xmlns='http://jabber.org/features/iq-register'/></stream:features>";
|
||||||
|
|
||||||
|
constexpr std::string_view kDoc4 =
|
||||||
|
R"(<?xml version='1.0'?><stream:stream id='68321991947053239' version='1.0' xml:lang='en' xmlns:stream='http://etherx.jabber.org/streams' to='test1@localhost' from='localhost' xmlns='jabber:client'>)";
|
||||||
|
|
||||||
TEST(RawXmlStream, ReadByOne) {
|
TEST(RawXmlStream, ReadByOne) {
|
||||||
boost::asio::io_context context;
|
boost::asio::io_context context;
|
||||||
bool error{};
|
bool error{};
|
||||||
|
@ -156,4 +160,32 @@ TEST(RawXmlStream, Write) {
|
||||||
EXPECT_FALSE(error);
|
EXPECT_FALSE(error);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(RawXmlStream, ReadOneByOne) {
|
||||||
|
boost::asio::io_context context;
|
||||||
|
bool error{};
|
||||||
|
boost::asio::co_spawn(
|
||||||
|
context,
|
||||||
|
// NOLINTNEXTLINE: Safe
|
||||||
|
[&] -> boost::asio::awaitable<void> {
|
||||||
|
RawXmlStream stream{impl::MockSocket{context.get_executor(), 1}};
|
||||||
|
stream.AddReceivedData(kDoc4);
|
||||||
|
try {
|
||||||
|
auto doc = (co_await stream.ReadOne(), co_await stream.ReadOne());
|
||||||
|
auto node = doc->get_root_node();
|
||||||
|
EXPECT_TRUE(node);
|
||||||
|
if(!node) {
|
||||||
|
co_return;
|
||||||
|
}
|
||||||
|
auto stream = ServerToUserStream::Parse(node);
|
||||||
|
} catch(const std::exception& err) {
|
||||||
|
SPDLOG_ERROR("{}", err.what());
|
||||||
|
error = true;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
boost::asio::detached);
|
||||||
|
|
||||||
|
context.run();
|
||||||
|
EXPECT_FALSE(error);
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace larra::xmpp
|
} // namespace larra::xmpp
|
||||||
|
|
Loading…
Reference in a new issue