When asio meets resumable functions


#1

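A wrapper built on resumable functions. It makes task both await-able and then-able, which reduces the concepts to a single task type and makes it easy to convert between coroutines and callbacks. That makes it convenient to wrap libraries and to handle and encapsulate complex asynchronous business logic.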

Clone the code above, update the git submodules, and it will run, provided you have a VS2015 environment.

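This post adds a way to wrap asynchronous operations by combining awaitable_tasks with asio. It takes the sync_client and async_client from https://github.com/chriskohlhoff/asio/tree/master/asio/src/examples/cpp03/http/client as examples and converts them into a coroutine version.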

Because asio's architecture already accommodates coroutine adaptation, awaitable_tasks and asio can be combined very easily. Adaptation layer: https://github.com/wangjieest/awaitable_tasks/blob/master/asio_test/asio_use_task.hpp
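To illustrate the kind of callback-to-task conversion such an adaptation layer performs, here is a minimal sketch using only the standard library. It is not the code in asio_use_task.hpp: `task_state`, `simple_task`, `fake_resolver` and `resolve_as_task` are hypothetical names, and `simple_task` only stands in for the then-able side of awaitable_tasks::task.

```cpp
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Hypothetical shared state linking a callback-style operation to a task.
template <typename T>
struct task_state {
    bool ready = false;
    T value{};
    std::function<void(const T&)> continuation;
};

// Hypothetical then-able handle (a stand-in, not awaitable_tasks::task).
template <typename T>
class simple_task {
public:
    explicit simple_task(std::shared_ptr<task_state<T>> state) : state_(std::move(state)) {}

    // Attach a continuation; run it at once if the result already arrived.
    void then(std::function<void(const T&)> next) {
        if (state_->ready)
            next(state_->value);
        else
            state_->continuation = std::move(next);
    }

private:
    std::shared_ptr<task_state<T>> state_;
};

// Hypothetical callback-style operation: it stores the handler so the
// caller decides when the operation "completes".
struct fake_resolver {
    std::function<void(std::string)> pending;

    void async_resolve(std::function<void(std::string)> handler) { pending = std::move(handler); }

    void complete(const std::string& endpoint) {
        if (pending)
            pending(endpoint);
    }
};

// Adapter: start the operation with a handler that fulfils the task.
inline simple_task<std::string> resolve_as_task(fake_resolver& resolver) {
    auto state = std::make_shared<task_state<std::string>>();
    resolver.async_resolve([state](std::string endpoint) {
        state->value = std::move(endpoint);
        state->ready = true;
        if (state->continuation)
            state->continuation(state->value);
    });
    return simple_task<std::string>(state);
}
```

The caller passes no handler of its own. The adapter supplies one and hands back an object that can be continued later, which is roughly the role a completion token such as use_task plays.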

Calling the synchronous version

asio::io_service io_service;
sync_request(io_service, server, path);

Just call it directly; it blocks internally.

The coroutine version is driven in exactly the same way as the asynchronous version.

asio::io_service io_service;
auto t = make_request_task(io_service, server, path);
io_service.run();  // io_service drives the task

It executes asynchronously, but the code logic reads exactly like the synchronous version.
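The point that creating the task only schedules work, and that nothing progresses until run() is called, can be sketched with a toy event loop. `mini_loop` and `start_request` are hypothetical stand-ins, not asio APIs, and the real io_service also waits on sockets and timers, which this sketch does not.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for asio::io_service: a queue of ready handlers.
class mini_loop {
public:
    void post(std::function<void()> handler) { queue_.push_back(std::move(handler)); }

    // Run handlers until none are left; returns how many ran.
    std::size_t run() {
        std::size_t count = 0;
        while (!queue_.empty()) {
            auto handler = std::move(queue_.front());
            queue_.pop_front();
            handler();  // a handler may post further handlers
            ++count;
        }
        return count;
    }

private:
    std::deque<std::function<void()>> queue_;
};

// Hypothetical request: each step schedules the next one, the way chained
// completion handlers (or resumed coroutine steps) would.
inline void start_request(mini_loop& loop, std::vector<std::string>& log) {
    loop.post([&loop, &log] {
        log.push_back("resolve");
        loop.post([&loop, &log] {
            log.push_back("connect");
            loop.post([&log] { log.push_back("read"); });
        });
    });
}
```

Calling start_request leaves the log empty. The three steps only run, in order, once run() is called on the loop.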

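// Exception-based coroutine version: the code reads synchronously but runs
// asynchronously, and needs io_service to drive the asynchronous events.
//   asio::io_service io_service;
//   auto t = make_request_task(io_service, server, path);
//   io_service.run();  // io_service drives the task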

awaitable_tasks::task<asio::error_code> make_request_task(asio::io_service& io_service,
                                            const std::string& server,
                                            const std::string& path) {
    asio::error_code err;
    try {
        // Start an asynchronous resolve to translate the server and service names
        // into a list of endpoints.
        asio::streambuf request_;
        std::ostream request_stream(&request_);
        request_stream << "GET " << path << " HTTP/1.0\r\n";
        request_stream << "Host: " << server << "\r\n";
        request_stream << "Accept: */*\r\n";
        request_stream << "Connection: close\r\n\r\n";

        tcp::resolver::query query(server, "http");

        tcp::socket socket_(io_service);
        tcp::resolver resolver_(io_service);

        auto resolver_ret = co_await resolver_.async_resolve(query, asio::use_task);

        // Attempt a connection to each endpoint in the list until we
        // successfully establish a connection.
        co_await asio::async_connect(socket_, resolver_ret, asio::use_task);

        co_await asio::async_write(socket_, request_, asio::use_task);

        asio::streambuf response_;
        co_await asio::async_read_until(socket_, response_, "\r\n", asio::use_task);

        // Check that response is OK.
        std::istream response_stream(&response_);
        std::string http_version;
        response_stream >> http_version;
        unsigned int status_code;
        response_stream >> status_code;
        std::string status_message;
        std::getline(response_stream, status_message);
        if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
            std::cout << "Invalid response\n";
            // A coroutine must leave via co_return (see N4649), not return.
            co_return asio::error_code();
        }
        if (status_code != 200) {
            std::cout << "Response returned with status code ";
            std::cout << status_code << "\n";
            co_return asio::error_code();
        }

        // Read the response headers, which are terminated by a blank line.
        co_await asio::async_read_until(socket_, response_, "\r\n\r\n", asio::use_task);

        // Process the response headers.
        std::istream response_stream2(&response_);
        std::string header;
        while (std::getline(response_stream2, header) && header != "\r")
            std::cout << header << "\n";
        std::cout << "\n";

        // Write whatever content we already have to output.
        if (response_.size() > 0)
            std::cout << &response_;

        // Read until EOF, writing data to output as we go.
        // With use_task, EOF is reported as an exception rather than an error code.
        try {
            for (;;) {
                auto count = co_await asio::async_read(socket_,
                                                response_,
                                                asio::transfer_at_least(1),
                                                asio::use_task);
                if (count)
                    std::cout << &response_;
            }
        } catch (std::system_error& e) {
            if (e.code() == asio::error::eof) {
                co_return asio::error_code();
            }
            throw asio::system_error(e.code());
        }
    } catch (std::exception& e) {
        std::cout << "Exception: " << e.what() << "\n";
    }

    co_return asio::error_code();
}
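The status-line check in the function above can be isolated and exercised without a network. The following is a minimal sketch of that parsing step, with the hypothetical names `status_line` and `parse_status_line`; it mirrors the stream extraction used in the post and is not part of asio or awaitable_tasks.

```cpp
#include <istream>
#include <sstream>
#include <string>

// Hypothetical result type for the first line of an HTTP response.
struct status_line {
    bool valid;
    unsigned int code;
    std::string message;
};

// Parse "HTTP/x.y <code> <message>" from a stream, using the same
// extraction sequence as the request code in the post.
inline status_line parse_status_line(std::istream& in) {
    status_line result{false, 0, ""};
    std::string http_version;
    in >> http_version;
    in >> result.code;
    std::getline(in, result.message);  // keeps the leading space and any '\r'
    result.valid = static_cast<bool>(in) && http_version.substr(0, 5) == "HTTP/";
    return result;
}
```

Note that the message keeps its leading space and trailing carriage return, just as status_message does in the original code.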

// Synchronous version: a blocking call, no io_service.run() needed.
//   asio::io_service io_service;
//   sync_request(io_service, server, path);

int sync_request(asio::io_service& io_service, const std::string& server, const std::string& path) {
    try {
        // Get a list of endpoints corresponding to the server name.
        tcp::resolver resolver(io_service);
        tcp::resolver::query query(server, "http");
        auto endpoints = resolver.resolve(query);

        // Try each endpoint until we successfully establish a connection.
        tcp::socket socket(io_service);
        asio::connect(socket, endpoints);

        // Form the request. We specify the "Connection: close" header so that the
        // server will close the socket after transmitting the response. This will
        // allow us to treat all data up until the EOF as the content.
        asio::streambuf request;
        std::ostream request_stream(&request);
        request_stream << "GET " << path << " HTTP/1.0\r\n";
        request_stream << "Host: " << server << "\r\n";
        request_stream << "Accept: */*\r\n";
        request_stream << "Connection: close\r\n\r\n";

        // Send the request.
        asio::write(socket, request);

        // Read the response status line. The response streambuf will automatically
        // grow to accommodate the entire line. The growth may be limited by passing
        // a maximum size to the streambuf constructor.
        asio::streambuf response;
        asio::read_until(socket, response, "\r\n");

        // Check that response is OK.
        std::istream response_stream(&response);
        std::string http_version;
        response_stream >> http_version;
        unsigned int status_code;
        response_stream >> status_code;
        std::string status_message;
        std::getline(response_stream, status_message);
        if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
            std::cout << "Invalid response\n";
            return 1;
        }
        if (status_code != 200) {
            std::cout << "Response returned with status code " << status_code << "\n";
            return 1;
        }

        // Read the response headers, which are terminated by a blank line.
        asio::read_until(socket, response, "\r\n\r\n");

        // Process the response headers.
        std::string header;
        while (std::getline(response_stream, header) && header != "\r")
            std::cout << header << "\n";
        std::cout << "\n";

        // Write whatever content we already have to output.
        if (response.size() > 0)
            std::cout << &response;

        // Read until EOF, writing data to output as we go.
        asio::error_code error;
        while (asio::read(socket, response, asio::transfer_at_least(1), error))
            std::cout << &response;
        if (error != asio::error::eof)
            throw asio::system_error(error);
    } catch (std::exception& e) {
        std::cout << "Exception: " << e.what() << "\n";
    }

    return 0;
}
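One visible difference between the two versions is how end-of-file is detected: sync_request passes an error code to asio::read, while make_request_task catches an exception. The sketch below contrasts the two loop styles on a hypothetical in-memory `fake_socket`. `fake_eof` is an arbitrary stand-in for asio::error::eof, and none of these names come from asio.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <system_error>
#include <utility>

// Stand-in for asio::error::eof (assumption: any distinct error value
// is enough for the illustration).
inline std::error_code fake_eof() { return std::make_error_code(std::errc::broken_pipe); }

// Hypothetical data source that hands out its contents in small pieces.
class fake_socket {
public:
    explicit fake_socket(std::string data) : data_(std::move(data)) {}

    // Error-code style: reports end of data through ec and returns 0.
    std::size_t read_some(std::string& out, std::size_t max_bytes, std::error_code& ec) {
        ec.clear();
        if (pos_ == data_.size()) {
            ec = fake_eof();
            return 0;
        }
        std::size_t n = std::min(max_bytes, data_.size() - pos_);
        out.append(data_, pos_, n);
        pos_ += n;
        return n;
    }

    // Throwing style: same operation, failure becomes std::system_error.
    std::size_t read_some(std::string& out, std::size_t max_bytes) {
        std::error_code ec;
        std::size_t n = read_some(out, max_bytes, ec);
        if (ec)
            throw std::system_error(ec);
        return n;
    }

private:
    std::string data_;
    std::size_t pos_ = 0;
};

// Loop in the style of sync_request: a zero-byte read ends the loop.
inline std::string read_all_with_error_code(fake_socket& socket) {
    std::string body;
    std::error_code ec;
    while (socket.read_some(body, 4, ec)) {
    }
    if (ec != fake_eof())
        throw std::system_error(ec);
    return body;
}

// Loop in the style of make_request_task: EOF arrives as an exception.
inline std::string read_all_with_exceptions(fake_socket& socket) {
    std::string body;
    try {
        for (;;)
            socket.read_some(body, 4);
    } catch (const std::system_error& e) {
        if (e.code() != fake_eof())
            throw;  // anything other than EOF is a real error
    }
    return body;
}
```

Both loops collect the same data. The exception style keeps the loop body free of error checks, at the cost of using an exception for the expected end-of-stream case.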

#2

Is this implemented with standard C++? If it depends on the non-standard VS await, it is worthless.


#3

http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2017/n4649.pdf


#4

I don't think I like this very much.