larra/library/include/larra/xml_stream.hpp

320 lines
11 KiB
C++
Raw Normal View History

2024-09-28 19:15:31 +00:00
#pragma once
#include <libxml++/libxml++.h>
#include <spdlog/spdlog.h>
#include <boost/asio/as_tuple.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/read.hpp>
#include <boost/asio/streambuf.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/write.hpp>
#include <boost/system/result.hpp>
2024-10-06 18:56:10 +00:00
#include <larra/serialization.hpp>
2024-10-08 08:36:08 +00:00
#include <larra/stream_error.hpp>
2024-09-28 19:15:31 +00:00
#include <larra/utils.hpp>
#include <stack>
#include <utempl/utils.hpp>
struct _xmlError;
namespace larra::xmpp {
struct XmlGroup : xmlpp::Element {
using Element::Element;
};
struct XmlPath : public xmlpp::Element {
public:
using Element::Element;
[[nodiscard]] auto GetData() const -> std::string {
return get_attribute("d")->get_value();
}
};
namespace impl {
2024-10-06 12:25:42 +00:00
constexpr std::size_t kXmlStreamReadChunkSize = 4096;
2024-09-28 19:15:31 +00:00
constexpr auto BufferToStringView(const boost::asio::const_buffer& buffer, size_t size) -> std::string_view {
assert(size <= buffer.size());
return {boost::asio::buffer_cast<const char*>(buffer), size};
};
constexpr auto BufferToStringView(const boost::asio::const_buffer& buffer) -> std::string_view {
return BufferToStringView(buffer, buffer.size());
};
class Parser : private xmlpp::SaxParser {
public:
inline explicit Parser(xmlpp::Document& document) : doc(document) {};
~Parser() override = default;
auto ParseChunk(std::string_view str) -> const _xmlError*;
2024-10-21 11:21:26 +00:00
std::stack<xmlpp::Element*> context{};
2024-09-28 19:15:31 +00:00
xmlpp::Document& doc;
private:
inline auto on_start_document() -> void override {
}
inline auto on_end_document() -> void override {
}
auto on_start_element(const std::string& name, const AttributeList& properties) -> void override;
auto on_end_element(const std::string& name) -> void override;
auto on_characters(const std::string& characters) -> void override;
auto on_cdata_block(const std::string& characters) -> void override;
};
constexpr auto GetCharsRangeFromBuf(auto&& buf) {
return buf.data() //
| std::views::transform([](const auto& buf) -> std::string_view { //
return ::larra::xmpp::impl::BufferToStringView(buf); //
}) //
| std::views::join;
};
constexpr auto SplitStreamBuf(auto&& buf, char delim) {
return GetCharsRangeFromBuf(buf) //
| std::views::lazy_split(delim); //
};
auto GetIndex(const boost::asio::streambuf&, const _xmlError* error, std::size_t alreadyCountedLines = 1) -> std::size_t;
auto CountLines(const boost::asio::streambuf&) -> std::size_t;
auto CountLines(std::string_view) -> std::size_t;
auto IsExtraContentAtTheDocument(const _xmlError* error) -> bool;
} // namespace impl
template <typename Stream, typename BufferType = boost::asio::streambuf>
2024-10-08 08:36:08 +00:00
struct XmlStream : Stream {
constexpr XmlStream(Stream stream, std::unique_ptr<BufferType> buff = std::make_unique<BufferType>()) :
2024-09-28 19:15:31 +00:00
Stream(std::forward<Stream>(stream)), streambuf(std::move(buff)) {};
using Stream::Stream;
auto next_layer() -> Stream& {
return *this;
}
[[nodiscard]] auto next_layer() const -> const Stream& {
2024-09-28 19:15:31 +00:00
return *this;
}
2024-10-08 08:36:08 +00:00
auto ReadOneRaw(auto& socket) -> boost::asio::awaitable<std::unique_ptr<xmlpp::Document>> {
2024-10-06 12:25:42 +00:00
auto doc = std::make_unique<xmlpp::Document>();
impl::Parser parser(*doc);
for(;;) {
auto enumerated = std::views::zip(std::views::iota(std::size_t{}, this->streambuf->size()),
::larra::xmpp::impl::GetCharsRangeFromBuf(*this->streambuf));
auto it = std::ranges::find(enumerated, '>', [](auto v) {
auto [_, c] = v;
return c;
});
if(it == std::ranges::end(enumerated)) {
for(const auto& buf : this->streambuf->data()) {
auto error = parser.ParseChunk(impl::BufferToStringView(buf));
if(error) {
throw std::runtime_error(std::format("Bad xml object: {}", xmlpp::format_xml_error(error)));
}
}
this->streambuf->consume(this->streambuf->size());
auto buff = this->streambuf->prepare(impl::kXmlStreamReadChunkSize);
auto n = co_await socket.async_read_some(buff, boost::asio::use_awaitable);
this->streambuf->commit(n);
continue;
}
auto [i, _] = *it;
auto toRead = i + 1;
for(const auto& buf : this->streambuf->data()) {
if(toRead == 0) {
break;
}
auto toReadCurrent = std::min(buf.size(), toRead);
auto error = parser.ParseChunk(impl::BufferToStringView(buf, toReadCurrent));
if(error) {
throw std::runtime_error(std::format("Bad xml object: {}", xmlpp::format_xml_error(error)));
}
toRead -= toReadCurrent;
}
this->streambuf->consume(i + 1);
co_return doc;
}
}
2024-10-08 08:36:08 +00:00
template <typename T>
auto ReadOneRaw(auto& stream) -> boost::asio::awaitable<T> {
auto doc = co_await this->ReadOneRaw(stream);
co_return Serialization<T>::Parse(doc->get_root_node());
}
template <typename T>
auto ReadOneRaw(auto& stream) -> boost::asio::awaitable<T>
requires requires(std::unique_ptr<xmlpp::Document> ptr) {
{ Serialization<T>::Parse(std::move(ptr)) } -> std::same_as<T>;
}
{
co_return Serialization<T>::Parse(co_await this->ReadOneRaw(stream));
2024-10-06 12:25:42 +00:00
}
2024-10-08 08:36:08 +00:00
inline auto ReadRaw(auto& socket) -> boost::asio::awaitable<std::unique_ptr<xmlpp::Document>> {
2024-09-28 19:15:31 +00:00
auto doc = std::make_unique<xmlpp::Document>(); // Not movable :(
impl::Parser parser(*doc);
std::size_t lines = 1;
std::size_t size{};
for(auto elem : this->streambuf->data()) {
auto error = parser.ParseChunk(impl::BufferToStringView(elem));
if(!error) {
auto linesAdd = impl::CountLines(impl::BufferToStringView(elem));
lines += linesAdd;
if(linesAdd == 0) {
size += elem.size();
}
if(parser.context.empty() && parser.doc.get_root_node() != nullptr) {
SPDLOG_DEBUG("Object already transferred");
co_return doc;
}
continue;
}
if(!impl::IsExtraContentAtTheDocument(error)) {
throw std::runtime_error(std::format("Bad xml object: {}", xmlpp::format_xml_error(error)));
}
std::size_t size = impl::GetIndex(*this->streambuf, error, lines) - size;
this->streambuf->consume(size);
SPDLOG_DEBUG("Object already transferred");
co_return doc;
}
this->streambuf->consume(this->streambuf->size());
for(;;) {
2024-10-06 12:25:42 +00:00
auto buff = this->streambuf->prepare(impl::kXmlStreamReadChunkSize);
2024-09-28 19:15:31 +00:00
auto [e, n] = co_await socket.async_read_some(buff, boost::asio::as_tuple(boost::asio::use_awaitable));
if(e) {
boost::system::throw_exception_from_error(e, boost::source_location());
}
this->streambuf->commit(n);
auto error = parser.ParseChunk(impl::BufferToStringView(buff, n));
if(!error) {
auto linesAdd = impl::CountLines(impl::BufferToStringView(buff, n));
2024-10-08 08:36:08 +00:00
SPDLOG_DEBUG("Readed {} bytes for XmlStream with {} lines", n, linesAdd);
2024-09-28 19:15:31 +00:00
lines += linesAdd;
if(linesAdd == 0) {
size += n;
}
this->streambuf->consume(this->streambuf->size());
if(parser.context.empty() && parser.doc.get_root_node() != nullptr) {
co_return doc;
}
SPDLOG_DEBUG(
"Object not transferred. context size: {}, isValidRootNode: {}", parser.context.size(), parser.doc.get_root_node() != nullptr);
continue;
}
if(!impl::IsExtraContentAtTheDocument(error)) {
throw std::runtime_error(std::format("Bad xml object: {}", xmlpp::format_xml_error(error)));
}
auto toConsume = impl::GetIndex(*this->streambuf, error, lines) - size;
this->streambuf->consume(toConsume);
co_return doc;
}
}
template <typename T>
2024-10-08 08:36:08 +00:00
auto ReadRaw(auto& stream) -> boost::asio::awaitable<T> {
auto doc = co_await this->ReadRaw(stream);
2024-10-06 18:56:10 +00:00
co_return Serialization<T>::Parse(doc->get_root_node());
2024-09-28 19:15:31 +00:00
}
template <typename T>
2024-10-08 08:36:08 +00:00
auto ReadRaw(auto& stream) -> boost::asio::awaitable<T>
2024-09-28 19:15:31 +00:00
requires requires(std::unique_ptr<xmlpp::Document> ptr) {
2024-10-06 18:56:10 +00:00
{ Serialization<T>::Parse(std::move(ptr)) } -> std::same_as<T>;
2024-09-28 19:15:31 +00:00
}
{
2024-10-08 08:36:08 +00:00
co_return Serialization<T>::Parse(co_await this->ReadRaw(stream));
2024-09-28 19:15:31 +00:00
}
2024-10-08 08:36:08 +00:00
private:
2024-09-28 19:15:31 +00:00
template <typename T>
2024-10-08 08:36:08 +00:00
auto ReadImpl(boost::asio::awaitable<std::variant<StreamError, T>> awaitable) -> boost::asio::awaitable<T> {
co_return std::visit(utempl::Overloaded(
[](T value) -> T {
return std::move(value);
},
[](StreamError error) -> T {
std::visit(
[](auto error) {
throw error;
},
error);
std::unreachable();
}),
co_await std::move(awaitable));
}
public:
template <typename T = std::monostate>
auto Read(auto& stream) {
return this->ReadImpl(this->ReadRaw<std::variant<StreamError, T>>(stream));
}
template <typename T = std::monostate>
auto Read() {
return this->Read<T>(this->next_layer());
}
template <typename T = std::monostate>
auto ReadOne(auto& stream) {
return this->ReadImpl(this->ReadOneRaw<std::variant<StreamError, T>>(stream));
}
template <typename T = std::monostate>
auto ReadOne() {
return this->ReadOne<T>(this->next_layer());
2024-09-28 19:15:31 +00:00
}
auto Send(xmlpp::Document& doc, auto& stream, bool bAddXmlDecl, bool removeEnd) const -> boost::asio::awaitable<void> {
constexpr auto beginSize = sizeof("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n") - 1;
auto str = doc.write_to_string();
auto view = std::string_view{str}.substr(beginSize, str.size() - beginSize - 1);
if(bAddXmlDecl) {
if(removeEnd) {
std::string data = "<?xml version=\"1.0\"?>" + static_cast<std::string>(view.substr(0, view.size() - 2)) + ">";
co_await boost::asio::async_write(stream, boost::asio::buffer(data), boost::asio::use_awaitable);
co_return;
}
std::string data = "<?xml version=\"1.0\"?>" + static_cast<std::string>(view);
co_await boost::asio::async_write(stream, boost::asio::buffer(data), boost::asio::use_awaitable);
co_return;
}
if(removeEnd) {
std::string data = static_cast<std::string>(view.substr(0, view.size() - 2)) + ">";
co_await boost::asio::async_write(stream, boost::asio::buffer(data), boost::asio::use_awaitable);
} else {
co_await boost::asio::async_write(stream, boost::asio::buffer(view), boost::asio::use_awaitable);
}
}
auto Send(xmlpp::Document& doc, bool bAddXmlDecl = false) -> boost::asio::awaitable<void> {
co_await this->Send(doc, this->next_layer(), bAddXmlDecl);
}
template <AsXml T>
auto Send(const T& xso, auto& stream) const -> boost::asio::awaitable<void> {
xmlpp::Document doc;
2024-10-06 18:56:10 +00:00
using S = Serialization<std::decay_t<T>>;
S::Serialize(doc.create_root_node(S::kDefaultName, S::kDefaultNamespace, S::kPrefix), xso);
co_await this->Send(doc, stream, S::kAddXmlDecl, S::kRemoveEnd);
2024-09-28 19:15:31 +00:00
}
auto Send(const AsXml auto& xso) -> boost::asio::awaitable<void> {
co_await this->Send(xso, this->next_layer());
}
2024-10-08 08:36:08 +00:00
XmlStream(XmlStream&& other) = default;
2024-09-28 19:15:31 +00:00
2024-10-21 11:21:26 +00:00
std::unique_ptr<BufferType> streambuf{}; // Not movable :(
2024-09-28 19:15:31 +00:00
};
} // namespace larra::xmpp