asio与序列化


#1

文章本来是发在自己的blog上的,不过因为去的人太少,可能会有很多bug就一直这么留着,而且microcai那会说我私藏不发论坛上,所以复制一份发到论坛上,有空把asio的东西都搬过来。另外论坛的标题怎么还有限制,竟然说我title无效…

asio与序列化——被忽略的神器asio::streambuf

如果不是为了寻找一种跟简便的序列化的方法,我肯定会忽略掉streambuf,因为他在asio的example和介绍中都是那么的平淡无奇,以至于让我以为他只能跟async_read_until一起用才有意义,让人完全没有想用的的欲望…后来问了microcai和jackarain才知道有transfer_exactly这种东西,我才意识到了这货是把boost::serialization和asio联系到一起的神器。 最开始用boost的serialization,总觉得不顺手,因为标准库中并没有提供一种streambuf可以直接用来存放二进制数据,所以我只能用text archive来序列化和反序列化结构体。本来打算自己继承std::streambuf写一个,忽然想起来asio里面有个streambuf,不知道合不合适,拿过来一看,诶嘿,完全可用,那就不自己造轮子了。 选择了asio::streambuf之后,就遇到了新问题,直接这么写是不行的:

boost::asio::write(sock,streambuf);
boost::asio::read(sock,streambuf);

这样会一直阻塞在函数里面不会返回,除非发生错误或者连接中断。异步的版本也不行,一直不会调用回调函数。这就郁闷了啊,到底该怎样写入确定字节数的数据,写完就返回呢?这时候就轮到asio::transfer_exactly上场了,给read和write加个参数,指定要写入的字节数就可以了,于是这样boost::serialization就和asio完美结合起来了!!示例如下:

boost::asio::write(sock,streambuf,boost::asio::transfer_exactly(streambuf.size()));
boost::asio::read(sock,streambuf,boost::asio::transfer_exactly(streambuf.size()));

异步版本也一样,我就不写了,直接上一个示例,功能很简单,一个客户端一个服务器,客户端连上服务器之后要求输入一个数字和一个字符串,组合成一个结构体并序列化,然后发送到服务器,服务器返回这个结构体,不过并不是只返回一次,而是随机返回1到10次,客户端接收后显示出来。

protlcal.h:

#pragma once          #include <boost/serialization/access.hpp>     #include <boost/archive/binary_oarchive.hpp>     #include <boost/archive/binary_iarchive.hpp>          struct message_t     {         int num;         std::string str;              friend class boost::serialization::access;         template         void serialize(Archive &ar, const unsigned int /version/)         {             ar & num;             ar & str;         }     };

client:

#include <cstdlib>
#include <cstring>
#include <iostream>
#include <boost/asio.hpp>
#include "protocal.h"

void read_buf(boost::asio::ip::tcp::socket& s,
	boost::asio::streambuf& buf)
{
	int size = 0;
	boost::asio::read(s, boost::asio::buffer(&size, sizeof(int)));
	if (size == 0)
	{
		std::cout << "No data...." << std::endl;
		return;
	}
	boost::asio::read(s, buf, boost::asio::transfer_exactly(size));
}

void write_buf(boost::asio::ip::tcp::socket& s, 
	boost::asio::streambuf& buf)
{
	int size = buf.size();
	boost::asio::write(s, boost::asio::buffer(&size, sizeof(int)));
	boost::asio::write(s, buf, boost::asio::transfer_exactly(size));
}

int main(int argc, char* argv[])
{
  try
  {
    if (argc != 3)
    {
      std::cerr << "Usage: blocking_tcp_echo_client <host> <port>\n";
      return 1;
    }

    boost::asio::io_service io_service;

	boost::asio::ip::tcp::resolver resolver(io_service);
	boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), argv[1], argv[2]);
	boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve(query);

	boost::asio::ip::tcp::socket s(io_service);
    boost::asio::connect(s, iterator);

	message_t message;
	std::cout << "Enter a number:";
	std::cin >> message.num;
	std::cout << "Enter a message:";
	std::cin >> message.str;

	boost::asio::streambuf buf1;
	boost::archive::binary_oarchive oa(buf1);
	oa << message;
	write_buf(s, buf1);
	
	boost::asio::streambuf buf2;
	read_buf(s, buf2);

	boost::archive::binary_iarchive ia(buf2);
	int rep_num = 0;
	ia >> rep_num;
	for (int i = 0; i < rep_num; ++i)
	{
		message_t recv_msg;
		ia >> recv_msg;
		std::cout << "Number:" << recv_msg.num;
		std::cout << "Message:" << recv_msg.str << std::endl;
	}

  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
  }

  return 0;
}

server:

#include <cstdlib>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <functional>
#include "../client/protocal.h"
#include <ctime>

// using boost::asio::ip::tcp;

class session
{
public:
	session(boost::asio::io_service& io_service)
		: socket_(io_service)
	{
	}

	boost::asio::ip::tcp::socket& socket()
	{
		return socket_;
	}

	void start()
	{
		async_read_buf(buf_, boost::bind(&session::handle_read_message, this, _1));
	}

private:
	void handle_read_message(boost::system::error_code ec)
	{
		if (ec)
		{
			delete this;
			return;
		}

		boost::archive::binary_iarchive ia(buf_);
		message_t msg;
		ia >> msg;	//得到message结构体

		std::cout << msg.str << std::endl;

		buf_.consume(buf_.size());//清空buf
		boost::archive::binary_oarchive oa(buf_);
		srand(time(NULL));
		int num = rand() % 9 + 1; //生成返回的message结构体的数量
		oa << num;	//加上数量
		for (int i = 0; i < num; ++i)
		{
			oa << msg;
		}
		async_write_buf(buf_, [this](boost::system::error_code){delete this; });
	}

private:
	//回调函数
	typedef std::function<void(boost::system::error_code ec)> callback_t;
	//读取buf
	void async_read_buf(
		boost::asio::streambuf& buf,
		callback_t callback)
	{
		boost::asio::async_read(socket_,
			boost::asio::buffer(&size_, sizeof(int)),
			boost::bind(
			&session::handle_read_size,
			this, boost::ref(buf), callback,
			boost::asio::placeholders::error));
	}
	void handle_read_size(
		boost::asio::streambuf& buf,
		callback_t callback,
		boost::system::error_code ec)
	{
		if (ec)
		{
			delete this;
			return;
		}

		boost::asio::async_read(socket_, buf,
			boost::asio::transfer_exactly(size_),
			boost::bind(callback,
			boost::asio::placeholders::error));
	}

	//写入buf
	void async_write_buf(
		boost::asio::streambuf& buf,
		callback_t callback)
	{
		size_ = buf.size();
		boost::asio::async_write(socket_,
			boost::asio::buffer(&size_, sizeof(int)),
			boost::bind(
			&session::handle_write_size,
			this, boost::ref(buf), callback,
			boost::asio::placeholders::error));
	}

	void handle_write_size(
		boost::asio::streambuf& buf,
		callback_t callback,
		boost::system::error_code ec)
	{
		if (ec)
		{
			delete this;
			return;
		}

		boost::asio::async_write(socket_, buf,
			boost::asio::transfer_exactly(size_),
			boost::bind(callback,
			boost::asio::placeholders::error));
	}

	boost::asio::ip::tcp::socket socket_;

	int size_;

	boost::asio::streambuf buf_;
};

class server
{
public:
	server(boost::asio::io_service& io_service, short port)
		: io_service_(io_service),
		acceptor_(io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port))
	{
		start_accept();
	}

private:
	void start_accept()
	{
		session* new_session = new session(io_service_);
		acceptor_.async_accept(new_session->socket(),
			boost::bind(&server::handle_accept, this, new_session,
			boost::asio::placeholders::error));
	}

	void handle_accept(session* new_session,
		const boost::system::error_code& error)
	{
		if (!error)
		{
			new_session->start();
		}
		else
		{
			delete new_session;
		}

		start_accept();
	}

	boost::asio::io_service& io_service_;
	boost::asio::ip::tcp::acceptor acceptor_;
};

int main(int argc, char* argv[])
{
	try
	{
		if (argc != 2)
		{
			std::cerr << "Usage: async_tcp_echo_server <port>\n";
			return 1;
		}

		boost::asio::io_service io_service;

		server s(io_service, std::atoi(argv[1]));

		io_service.run();
	}
	catch (std::exception& e)
	{
		std::cerr << "Exception: " << e.what() << "\n";
	}

	return 0;
}

有了序列化就是好,是不是有种把异步写成了同步的赶脚?只管把数据往streambuf里面塞,然后一起发出去,收的时候一下全部接受,然后挨个解析,说C++的stream是垃圾的,我咋觉得这么好用呢?另外asio的streambuf完全可以配合std::istream和std::ostream来用,可以直接将int或者std::string直接输入到streambuf而不用boost的序列化库。 不过有个缺点就是如果数据量大肯定不行,内存放不下,但是绝大多数的时候完全够用了。我之前担心过某个session读写的数据稍大一点会导致其他的读写无法进行,后来microcai说没这回事,asio内部会分成小段发送,那唯一要担心的就是太大的数据会撑爆内存了。。


#2

#3

又来黑我?

靠,字数!


#4

一天不黑你就不爽 请给正文内容再输入至少 1 个字符


#5

原来忘了贴protocal.h的内容,好有钱提醒后刚加上,郁闷的是为啥不会变成代码格式…


#7

然后搭配protobuf更好用,iostream绑定streambuf