c++ - 在 boost::asio 中使用具有可调整大小的 streambuf 的相同 istream

标签 c++ boost boost-asio istream

我正在使用 boost asio 的 async_read() 从套接字读取数据。我的读取器(reader)类有以下成员,它们在程序的整个生命周期中都存在:

// Buffer object that is handed to boost::asio::async_read as the read target.
boost::asio::streambuf recv_data;
// Input stream constructed from the address of recv_data, so it reads from that buffer object.
std::istream stream_is(&recv_data);

async_read 调用如下所示:

// Start an asynchronous read from the socket into recv_data using the
// transfer_exactly(n) completion condition; on completion,
// IProtocol::handle_data_recv is invoked on `protocol` with the error code.
boost::asio::async_read(ep_ptr->get_sock(), recv_data, boost::asio::transfer_exactly(n),
                              boost::bind(&IProtocol::handle_data_recv, &protocol,
                                          boost::asio::placeholders::error));

我的问题是,如果我从套接字读取“n”个字节并且 streambuf 的大小小于“n”,那么它会自行调整大小,会发生什么情况。我是否需要重新创建 std::istream,因为 std::istream 持有的内部 streambuf 缓冲区现在可能已被释放/解除分配?

最佳答案

不,谢天谢地,绑定(bind)不关心 recv_data 的内部可能会被重新分配,而是绑定(bind)到 recv_data 对象本身。这是我编写的下载程序的工作示例,您可以看到我没有在读取之间重新分配缓冲区。

同样地,你可以安全地共享一个 vector 的引用,而不必关心 vector 内部的存储是否被重新分配(除非你直接持有指向 vector 元素的内存地址,或者在迭代器失效后继续使用它们)。vector 的句柄仍然有效;同理,istream 所持有的 streambuf 句柄也仍然有效,可以正常工作。

download.h

#ifndef _MV_DOWNLOAD_H_
#define _MV_DOWNLOAD_H_

#include <algorithm>
#include <cstddef>
#include <fstream>
#include <functional>
#include <iostream>
#include <istream>
#include <map>
#include <memory>
#include <ostream>
#include <string>
#include <vector>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include "Network/url.h"
#include "Utility/generalUtility.h"

namespace MV {
    /// Parsed status line and header fields of one HTTP response.
    ///
    /// All members are plain data; `read` (defined out of line) fills
    /// `version`, `status`, `message`, `values` and `contentLength` from a stream.
    struct HttpHeader {
        std::string version;                        ///< First token of the status line.
        int status = 0;                             ///< Numeric status code; 0 when not parsed.
        std::string message;                        ///< Remainder of the status line.
        std::map<std::string, std::string> values;  ///< Header fields, keyed by lower-cased name.

        std::vector<std::string> bounces;           ///< URLs visited before each redirect was followed.

        bool success = false;                       ///< Set by the download handlers, not by `read`.
        std::string errorMessage;                   ///< Description of the last failure, if any.

        // Explicitly zero-initialized: the download code reads this value even
        // when the response carried no Content-Length header, so leaving it
        // indeterminate would make that read undefined behavior.
        size_t contentLength = 0;

        HttpHeader() = default;

        /// Constructs the header by parsing `response_stream` with `read`.
        HttpHeader(std::istream& response_stream) {
            read(response_stream);
        }

        /// Parses the status line and header fields from `response_stream`.
        void read(std::istream& response_stream);
    };


    /// Writes a framed, human-readable dump of `obj` to `os`: the status line
    /// fields, every header key/value pair, and the numbered list of bounces.
    /// The final line is terminated with std::endl, so the stream is flushed.
    inline std::ostream& operator<<(std::ostream& os, const HttpHeader& obj) {
        os << "\\/______HTTP_HEADER______\\/\nVersion [" << obj.version
           << "] Status [" << obj.status
           << "] Message [" << obj.message << "]\n"
           << "||-----------------------||\n";

        for (auto field = obj.values.begin(); field != obj.values.end(); ++field) {
            os << "[" << field->first << "]: " << field->second << "\n";
        }

        os << "\n||--------Bounces--------||\n";
        size_t position = 0;
        for (const auto& bounce : obj.bounces) {
            os << position << ": " << bounce << "\n";
            ++position;
        }

        os << "/\\_______________________/\\" << std::endl;
        return os;
    }
    /// Stream-extraction form of HttpHeader::read: parses `a_obj` from `a_is`
    /// and returns the same stream so extractions can be chained.
    inline std::istream& operator>>(std::istream& a_is, HttpHeader& a_obj) {
        std::istream& source = a_is;
        a_obj.read(source);
        return source;
    }

    /// One HTTP GET download whose body is written to a caller-supplied stream.
    ///
    /// Instances are created only through make(), which places the object in a
    /// std::shared_ptr before perform() runs; the asynchronous handlers depend
    /// on that because they bind shared_from_this().
    class DownloadRequest : public std::enable_shared_from_this<DownloadRequest> {
    public:
        /// Creates a request for `a_url` and performs it without a caller-supplied
        /// io_service. perform() then creates its own service and calls run() on
        /// it before returning.
        static std::shared_ptr<DownloadRequest> make(const MV::Url& a_url, const std::shared_ptr<std::ostream> &a_streamOutput) {
            auto result = std::shared_ptr<DownloadRequest>(new DownloadRequest(a_streamOutput));
            result->perform(a_url);
            return result;
        }

        /// Creates a request that uses the caller's `a_ioService`; perform() does
        /// not call run() in this case, so the caller drives the service.
        //onComplete is called on success or error at the end of the download.
        static std::shared_ptr<DownloadRequest> make(const std::shared_ptr<boost::asio::io_service> &a_ioService, const MV::Url& a_url, const std::shared_ptr<std::ostream> &a_streamOutput, std::function<void (std::shared_ptr<DownloadRequest>)> a_onComplete) {
            auto result = std::shared_ptr<DownloadRequest>(new DownloadRequest(a_streamOutput));
            result->onComplete = a_onComplete;
            result->ioService = a_ioService;
            result->perform(a_url);
            return result;
        }

        /// Header state of the most recently parsed response, including the
        /// success flag and error message set by the handlers.
        HttpHeader& header() {
            return headerData;
        }

        /// Read-only access to the header state for const requests.
        const HttpHeader& header() const {
            return headerData;
        }

        /// URL of the most recently initiated request (changes when a redirect is followed).
        MV::Url finalUrl() const {
            return currentUrl;
        }

        /// URL originally passed to perform().
        MV::Url inputUrl() const {
            return originalUrl;
        }

    private:
        DownloadRequest(const std::shared_ptr<std::ostream> &a_streamOutput) :
            streamOutput(a_streamOutput) {
        }

        void perform(const MV::Url& a_url);

        bool initializeSocket();

        void initiateRequest(const MV::Url& a_url);

        void handleResolve(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator);
        void handleConnect(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator);
        void handleWriteRequest(const boost::system::error_code& err);
        void handleReadHeaders(const boost::system::error_code& err);
        void handleReadContent(const boost::system::error_code& err);

        /// Moves everything currently buffered in `response` to the output stream.
        void readResponseToStream() {
            // Inserting a streambuf that yields no characters sets failbit on
            // the destination stream, which would make every later write fail.
            // Skip the insertion when nothing is buffered or a pointer is unset.
            if (streamOutput && response && response->size() > 0) {
                (*streamOutput) << &(*response);
            }
        }

        std::shared_ptr<boost::asio::io_service> ioService;
        std::unique_ptr<boost::asio::ip::tcp::resolver> resolver;
        std::unique_ptr<boost::asio::ip::tcp::socket> socket;

        std::unique_ptr<std::istream> responseStream;

        // Recreated for every request by initiateRequest().
        std::unique_ptr<boost::asio::streambuf> request;
        std::unique_ptr<boost::asio::streambuf> response;

        std::shared_ptr<std::ostream> streamOutput;

        HttpHeader headerData;

        MV::Url currentUrl;
        MV::Url originalUrl;

        // May be empty (the two-argument make() never sets it).
        std::function<void(std::shared_ptr<DownloadRequest>)> onComplete;
    };

    /// Downloads `a_url` into an in-memory stream and returns its contents, or an
    /// empty string when the request's header reports no success.
    std::string DownloadString(const MV::Url& a_url);

    /// Downloads `a_url` into the file `a_path` (parent directories are created
    /// first) and returns the response header. The file is removed again when
    /// the header reports no success.
    HttpHeader DownloadFile(const MV::Url& a_url, const std::string &a_path);
    /// Same as above but on the caller's `a_ioService`; the implementation does
    /// not call run() on it. `a_onComplete`, when set, is invoked after the
    /// failed-download cleanup has been attempted.
    void DownloadFile(const std::shared_ptr<boost::asio::io_service> &a_ioService, const MV::Url& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete = std::function<void(std::shared_ptr<DownloadRequest>)>());

    /// Queues one download per URL on an internally created io_service and then
    /// runs it. Each target path is `a_path` concatenated with the file name
    /// taken from the URL's path (no separator is inserted).
    void DownloadFiles(const std::vector<MV::Url>& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete = std::function<void(std::shared_ptr<DownloadRequest>)>());
    /// Queues one download per URL on the caller's `a_ioService`. The
    /// implementation calls `a_onAllComplete` when its completion counter reaches
    /// the number of URLs.
    void DownloadFiles(const std::shared_ptr<boost::asio::io_service> &a_ioService, const std::vector<MV::Url>& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete = std::function<void(std::shared_ptr<DownloadRequest>)>(), std::function<void()> a_onAllComplete = std::function<void()>());
}
#endif

download.cpp

#include "download.h"
#include <boost/filesystem.hpp>
#include <atomic>

namespace MV{
    /// Parses an HTTP status line and the header fields that follow it.
    ///
    /// Reads `version`, `status` (0 when the code is not a number) and `message`
    /// from the first line, then one "Name: value" field per line until an empty
    /// line or the end of the stream. Field names are stored lower-cased in
    /// `values`; a "content-length" field also updates `contentLength`.
    ///
    /// @param response_stream stream positioned at the start of the status line.
    void HttpHeader::read(std::istream& response_stream) {
        // The same object is re-read after a redirect, so drop everything that
        // belonged to the previous response, including its Content-Length.
        values.clear();
        contentLength = 0;

        response_stream >> version;
        std::string status_code;
        response_stream >> status_code;
        try {
            status = std::stoi(status_code);
        } catch (const std::exception&) {
            status = 0;
        }

        getline_platform_agnostic(response_stream, message);

        // The reason phrase is separated from the status code by one space.
        if (!message.empty() && message[0] == ' ') { message.erase(0, 1); }

        std::string header;
        while (getline_platform_agnostic(response_stream, header) && !header.empty()) {
            const auto colon = header.find(':');
            if (colon == std::string::npos || colon == 0) {
                continue; // not a "Name: value" line
            }

            auto key = header.substr(0, colon);
            // std::tolower requires a value representable as unsigned char.
            std::transform(key.begin(), key.end(), key.begin(), [](unsigned char c) { return static_cast<char>(std::tolower(c)); });

            // Whitespace around the value is optional and not part of it, so
            // trim it instead of assuming exactly one space after the colon.
            std::string value;
            const auto first = header.find_first_not_of(" \t", colon + 1);
            if (first != std::string::npos) {
                const auto last = header.find_last_not_of(" \t");
                value = header.substr(first, last - first + 1);
            }

            values[key] = value;
            if (key == "content-length") {
                try {
                    // Parse as signed so a negative length is rejected rather
                    // than wrapping to a huge size_t.
                    const long long parsed = std::stoll(value);
                    contentLength = parsed > 0 ? static_cast<size_t>(parsed) : 0;
                } catch (const std::exception &e) {
                    std::cerr << e.what() << std::endl;
                    contentLength = 0;
                }
            }
        }
    }

    /// Downloads `a_url` into an in-memory string stream and returns what was
    /// written to it; returns an empty string when the request's header does
    /// not report success.
    std::string DownloadString(const Url& a_url) {
        const auto body = std::make_shared<std::stringstream>();
        const bool succeeded = DownloadRequest::make(a_url, body)->header().success;
        return succeeded ? body->str() : std::string("");
    }

    /// Downloads `a_url` into the file at `a_path` and returns the response
    /// header. Parent directories are created first; when the header does not
    /// report success the file is removed again.
    MV::HttpHeader DownloadFile(const Url& a_url, const std::string &a_path) {
        HttpHeader result;
        {
            // Inner scope: the output file and the request are both destroyed
            // before the cleanup below touches the file on disk.
            const boost::filesystem::path target(a_path);
            boost::filesystem::create_directories(target.parent_path());

            const auto file = std::make_shared<std::ofstream>(a_path, std::ofstream::out | std::ofstream::binary);
            const auto download = DownloadRequest::make(a_url, file);
            result = download->header();
        }

        const bool failed = !result.success;
        if (failed) {
            std::remove(a_path.c_str());
        }
        return result;
    }

    /// Starts a download of `a_url` into the file at `a_path` on `a_ioService`.
    ///
    /// Parent directories of `a_path` are created first. This function does not
    /// call run() on the service. When the request completes, the output file is
    /// closed, removed if the header does not report success, and then
    /// `a_onComplete` (if set) is invoked with the request.
    void DownloadFile(const std::shared_ptr<boost::asio::io_service> &a_ioService, const MV::Url& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete) {
        boost::filesystem::create_directories(boost::filesystem::path(a_path).parent_path());
        auto outFile = std::make_shared<std::ofstream>(a_path, std::ofstream::out | std::ofstream::binary);
        // The request stores the callback and keeps itself alive through its
        // pending handlers, so the returned pointer is not needed here.
        DownloadRequest::make(a_ioService, a_url, outFile, [a_path, a_onComplete, outFile](std::shared_ptr<DownloadRequest> a_result) {
            // Close first: this flushes buffered data so the user callback sees
            // the complete file, and removing a file that is still open fails
            // on some platforms.
            if (outFile->is_open()) {
                outFile->close();
            }
            if (!a_result->header().success) {
                std::remove(a_path.c_str());
            }
            if (a_onComplete) { a_onComplete(a_result); }
        });
    }

    /// Downloads every URL in `a_urls` using one internally created io_service.
    /// Each file is written to `a_path` concatenated with the file name of the
    /// URL's path; the service is run after all downloads have been queued.
    void DownloadFiles(const std::vector<MV::Url>& a_urls, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete) {
        const auto ioService = std::make_shared<boost::asio::io_service>();
        for (const auto& source : a_urls) {
            const std::string fileName = boost::filesystem::path(source.path()).filename().string();
            DownloadFile(ioService, source, a_path + fileName, a_onComplete);
        }
        ioService->run();
    }
    /// Queues one download per URL on `a_ioService` (which the caller runs).
    ///
    /// Each file is written to `a_path` concatenated with the file name of the
    /// URL's path. `a_onComplete` (if set) is invoked for every finished
    /// request; `a_onAllComplete` (if set) is invoked once, after the last
    /// request has finished, or immediately when `a_urls` is empty.
    void DownloadFiles(const std::shared_ptr<boost::asio::io_service> &a_ioService, const std::vector<MV::Url>& a_urls, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete, std::function<void()> a_onAllComplete) {
        const size_t totalFiles = a_urls.size();
        if (totalFiles == 0) {
            // Nothing will ever complete, so report completion right away
            // instead of leaving the caller waiting forever.
            if (a_onAllComplete) { a_onAllComplete(); }
            return;
        }

        // One counter shared by all downloads. Creating it per URL meant it
        // could only ever reach 1, so the all-complete callback never fired
        // for more than one file.
        auto counter = std::make_shared<std::atomic<size_t>>(0);
        for (auto&& url : a_urls) {
            DownloadFile(a_ioService, url, a_path + boost::filesystem::path(url.path()).filename().string(), [counter, totalFiles, a_onComplete, a_onAllComplete](std::shared_ptr<DownloadRequest> a_request) {
                // Both callbacks default to empty std::function objects, and
                // calling an empty one throws std::bad_function_call.
                if (a_onComplete) { a_onComplete(a_request); }
                if (++(*counter) == totalFiles && a_onAllComplete) {
                    a_onAllComplete();
                }
            });
        }
    }

    /// Completion handler for the body read started by handleReadHeaders.
    ///
    /// Success and end-of-file both finish the download: whatever is buffered is
    /// written to the output stream and `onComplete` (if set) is invoked. Any
    /// other error marks the header as failed before `onComplete` is invoked.
    void DownloadRequest::handleReadContent(const boost::system::error_code& err) {
        if (!err || err == boost::asio::error::eof) {
            // The request is sent with "Connection: close", so the server
            // closing the connection is the normal end of the body. The bytes
            // received before EOF are still in the buffer and must be written.
            if (response->size() > 0) {
                readResponseToStream();
            }
            if (onComplete) { onComplete(shared_from_this()); }
        } else {
            headerData.success = false;
            headerData.errorMessage = "Download Read Content Failure: " + err.message();
            std::cerr << headerData.errorMessage << std::endl;
            if (onComplete) { onComplete(shared_from_this()); }
        }
    }

    /// Completion handler for the header read started by handleWriteRequest.
    ///
    /// Parses the header, follows a redirect (status 3xx with a "location"
    /// field, at most 32 bounces), and otherwise writes the already-buffered
    /// body bytes and schedules a read for the rest of the body.
    void DownloadRequest::handleReadHeaders(const boost::system::error_code& err) {
        if (err) {
            headerData.success = false;
            headerData.errorMessage = "Download Read Header Failure: " + err.message();
            std::cerr << headerData.errorMessage << std::endl;
            if (onComplete) { onComplete(shared_from_this()); }
            return;
        }

        responseStream = std::make_unique<std::istream>(&(*response));

        headerData.read(*responseStream);
        headerData.success = true;
        headerData.errorMessage = "";

        const auto location = headerData.values.find("location");
        const bool isRedirect = headerData.status >= 300 && headerData.status < 400;
        if (isRedirect && headerData.bounces.size() < 32 && location != headerData.values.end()) {
            headerData.bounces.push_back(currentUrl.toString());
            // Copy the target before starting the next request.
            const std::string target = location->second;
            initiateRequest(target);
            return;
        }

        // async_read_until may have buffered body bytes past the header
        // terminator; record how many before they are consumed below.
        const size_t buffered = response->size();
        const bool lengthKnown = headerData.values.find("content-length") != headerData.values.end();
        if (buffered > 0) {
            readResponseToStream();
        }

        // NOTE(review): a chunked Transfer-Encoding body is not decoded here;
        // confirm whether the servers in use can send one.
        if (!lengthKnown) {
            // No Content-Length: the body ends when the server closes the
            // connection, so read until EOF.
            boost::asio::async_read(*socket, *response, boost::asio::transfer_all(), boost::bind(&DownloadRequest::handleReadContent, shared_from_this(), boost::asio::placeholders::error));
        } else if (headerData.contentLength > buffered) {
            // Compare before subtracting: both values are unsigned, so the
            // difference would wrap around if more was buffered than announced.
            const size_t amountLeftToRead = headerData.contentLength - buffered;
            boost::asio::async_read(*socket, *response, boost::asio::transfer_at_least(amountLeftToRead), boost::bind(&DownloadRequest::handleReadContent, shared_from_this(), boost::asio::placeholders::error));
        } else {
            if (onComplete) { onComplete(shared_from_this()); }
        }
    }

    /// Completion handler for sending the request. On success it starts reading
    /// the response up to the blank line that ends the header; on failure it
    /// records the error and invokes `onComplete` if one is set.
    void DownloadRequest::handleWriteRequest(const boost::system::error_code& err) {
        if (err) {
            headerData.success = false;
            headerData.errorMessage = "Download Write Failure: " + err.message();
            std::cerr << headerData.errorMessage << std::endl;
            if (onComplete) { onComplete(shared_from_this()); }
            return;
        }

        boost::asio::async_read_until(*socket, *response, "\r\n\r\n", boost::bind(&DownloadRequest::handleReadHeaders, shared_from_this(), boost::asio::placeholders::error));
    }

    /// Completion handler for a connection attempt. On success the request is
    /// written to the socket. On failure the next endpoint is tried, and once
    /// the endpoints are exhausted the error is recorded and `onComplete` is
    /// invoked if one is set.
    void DownloadRequest::handleConnect(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator) {
        if (!err) {
            // Connected: send the request.
            boost::asio::async_write(*socket, *request, boost::bind(&DownloadRequest::handleWriteRequest, shared_from_this(), boost::asio::placeholders::error));
            return;
        }

        // A default-constructed iterator marks the end of the endpoint list.
        const boost::asio::ip::tcp::resolver::iterator noMoreEndpoints;
        if (endpoint_iterator == noMoreEndpoints) {
            headerData.success = false;
            headerData.errorMessage = "Download Connection Failure: " + err.message();
            std::cerr << headerData.errorMessage << std::endl;
            if (onComplete) { onComplete(shared_from_this()); }
            return;
        }

        // This endpoint failed; close the socket and try the next one.
        socket->close();
        const boost::asio::ip::tcp::endpoint candidate = *endpoint_iterator;
        socket->async_connect(candidate, boost::bind(&DownloadRequest::handleConnect, shared_from_this(), boost::asio::placeholders::error, ++endpoint_iterator));
    }

    /// Completion handler for host resolution. On success it starts connecting
    /// to the first endpoint (handleConnect walks the rest on failure); on error
    /// it records the failure and invokes `onComplete` if one is set.
    void DownloadRequest::handleResolve(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator) {
        if (err) {
            headerData.success = false;
            headerData.errorMessage = "Download Resolve Failure: " + err.message();
            std::cerr << headerData.errorMessage << std::endl;
            if (onComplete) { onComplete(shared_from_this()); }
            return;
        }

        // Try the first endpoint; the handler receives the iterator advanced to
        // the following one so it can continue down the list.
        const boost::asio::ip::tcp::endpoint firstEndpoint = *endpoint_iterator;
        socket->async_connect(firstEndpoint, boost::bind(&DownloadRequest::handleConnect, shared_from_this(), boost::asio::placeholders::error, ++endpoint_iterator));
    }

    /// Starts a fresh GET request for `a_url`: closes the socket, replaces the
    /// request and response buffers, writes the request text into the request
    /// buffer, and begins resolving the host for the "http" service.
    void DownloadRequest::initiateRequest(const MV::Url& a_url) {
        using boost::asio::ip::tcp;

        socket->close();
        currentUrl = a_url;

        // New buffers for every request, so nothing from a previous
        // (e.g. redirected) exchange is left over.
        request = std::make_unique<boost::asio::streambuf>();
        response = std::make_unique<boost::asio::streambuf>();

        std::ostream requestStream(request.get());
        requestStream << "GET " << a_url.pathAndQuery() << " HTTP/1.1\r\n"
                      << "Host: " << a_url.host() << "\r\n"
                      << "Accept: */*\r\n"
                      << "Connection: close\r\n\r\n";

        tcp::resolver::query lookup(a_url.host(), "http");
        resolver->async_resolve(lookup, boost::bind(&DownloadRequest::handleResolve, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::iterator));
    }

    /// Creates the resolver and socket on the request's io_service, creating
    /// the service first when none was supplied.
    ///
    /// @return true when this call created the io_service, false when one was
    ///         already set.
    bool DownloadRequest::initializeSocket() {
        const bool createdService = !ioService;
        if (createdService) {
            ioService = std::make_shared<boost::asio::io_service>();
        }

        resolver = std::make_unique<boost::asio::ip::tcp::resolver>(*ioService);
        socket = std::make_unique<boost::asio::ip::tcp::socket>(*ioService);

        return createdService;
    }

    /// Runs the download for `a_url`.
    ///
    /// Sets up the socket and starts the request. When the io_service was
    /// created by this object (none was supplied), it is also run here, so the
    /// call returns only after run() does. Any exception that escapes is
    /// recorded in the header and reported through `onComplete` if one is set.
    void DownloadRequest::perform(const MV::Url& a_url) {
        originalUrl = a_url;
        try {
            bool needToCallRun = initializeSocket();
            initiateRequest(a_url);
            if (needToCallRun) {
                ioService->run();
            }
        } catch (...) {
            headerData.success = false;
            headerData.errorMessage = "Exception thrown to top level.";
            std::cerr << headerData.errorMessage << std::endl;
            // onComplete is empty for requests made without a callback, and
            // calling an empty std::function throws std::bad_function_call
            // from inside this handler.
            if (onComplete) { onComplete(shared_from_this()); }
        }
    }

}

generalUtility.h(其中的一部分,仅供引用)

/// Reads one line from `is` into `t`, accepting "\n", "\r\n" or a lone "\r"
/// as the line ending. The terminator is consumed but not stored.
///
/// `t` is cleared first. When the end of the stream is hit before any
/// character was stored, eofbit is set on `is`; a final line without a line
/// ending is returned without setting eofbit.
///
/// @return `is`, so the call can be used in a stream-state condition.
inline std::istream& getline_platform_agnostic(std::istream& is, std::string& t) {
    t.clear();

    // Characters are pulled straight from the stream buffer rather than
    // through the istream interface. The sentry is constructed with
    // noskipws = true, so leading whitespace is kept as part of the line.
    std::istream::sentry guard(is, true);
    std::streambuf* const buffer = is.rdbuf();

    while (true) {
        const int next = buffer->sbumpc();

        if (next == '\n') {
            break;
        }
        if (next == '\r') {
            // Swallow the '\n' of a "\r\n" pair; a lone '\r' also ends the line.
            if (buffer->sgetc() == '\n') {
                buffer->sbumpc();
            }
            break;
        }
        if (next == EOF) {
            // A last line without a terminator is still returned as a line;
            // only an empty read at the end of the stream flags EOF.
            if (t.empty()) {
                is.setstate(std::ios::eofbit);
            }
            break;
        }

        t += static_cast<char>(next);
    }
    return is;
}

/// Returns a copy of `s` with every character converted by std::tolower.
inline std::string toLower(std::string s) {
    // std::tolower is only defined for values representable as unsigned char
    // (or EOF). Taking the character as unsigned char keeps bytes >= 0x80 from
    // being passed as negative ints when plain char is signed.
    std::transform(s.begin(), s.end(), s.begin(), [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
    return s;
}

url.h

修改自以下文件(仅稍微调整了部分命名方式):

https://github.com/keyz182/Poco-1.4.3/blob/master/Foundation/include/Poco/URI.h

https://github.com/keyz182/Poco-1.4.3/blob/master/Foundation/src/URI.cpp

关于c++ - 在 boost::asio 中使用具有可调整大小的 streambuf 的相同 istream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38229739/

相关文章:

c++ - cmake 命令找不到 boost 库

c++ - 使用 boost::asio ssl 服务器减少每次连接的内存使用

c++ - 取一个数组引用 `T(&)[n]` 到一个 `std::array<T, n>` 的内容

c++ - 如何在 win32 C++ 应用程序中为具有 HANDLE 作为成员的类编写复制构造函数?

c++ - D3D11 : variable number of lights in HLSL

c++ - 如何对二进制输出使用boost序列化?

c++ - 堆栈分配数组的安全分配

c++ - 将 Boost 适配器与 C++11 lambda 配合使用

c++ - Boost-定期任务计划程序

c++ - Boost::ASIO:如何从 io_service 获取返回值?