asio协程的思维转变


#1

ASIO协程的思维转变


avbot 是一个纯粹的单线程程序。在设计 avbot 最初的时候,我就给自己下了一个明确的目标:必须单线程。

但是,它的逻辑可不简单。它需要处理 XMPP 协议,处理 IRC 协议,处理 WebQQ 协议,处理pop3协议,处理 SMTP 协议。 所有的处理都必须异步。绝对不能因为 IO 阻塞。

可能很多人会不以为然,这有什么, select() 一下就好了。但是你知道这意味着多少代码么?

你需要编写大量的函数,大量的回调函数。别说复杂的 WebQQ 协议了,就是一个简单的 IRC , 你知道要多少代码完成异步处理么?

而且除了 WebQQ 以外(WebQQ的代理支持也在添加计划中), XMPP IRC POP3 SMTP 都是支持代理的。你知道要多少代码完成代理么?

想必很多吧?

所幸的是,avbot 没有使用C开发,也没有选择 ACE MFC 这样的不合格的类库开发。而是选择了 Boost。尤其是 ASIO, ASIO让异步过程大大简化。

而让异步过程更加简化的,就是 ASIO 作者发明的 stackless coroutine!

avbot 发布了许久了, 最近突然有个用户跑来说,希望能增加个调用 “外部脚本” 的功能,方便扩展。

我一向对设计一个 plugin 机制极力的避免,不喜欢动态载入的模块扩展程序本身的功能。何况 avbot 是 c++开发的,调用脚本并不是容易的事情。(好吧,真实的原因是我被 mingw (VC 不支持 utf8源码,我已经抛弃了) 折腾怕了,不想再搞个 python 。windows实在是恐怖的平台,写点程序麻烦的要死,编译麻烦的要死。可是 avbot 又必须跨平台,结果是我一天写好的东西要在 windows (虚拟机) 里折腾好几天,累死人 )

于是我决定提供一个 JSON 接口,内置一个简单的 HTTP Server, 用脚本(python应该 HTTP JSON 模块有的是,对吧)连接到 avbot ,然后 avbot 将发生的每条消息以 json 的形式返回给 外部脚本。

另外,默认使用 HTTP 的connection: keep-alive 模式,所以保持一个长连接即可。

那么,avbot 需要支持不确定数目的消息接收方了。

对于链接到 avbot 的客户端而言, avbot 并不保留之前的所有消息,而是从连接上的那一刻开始,后续的消息才能通知到。 一个很明显的思路就是,将链接上的客户端做成一个链表/列队, avbot 收到消息后,遍历这个列队执行消息发送。

这个思路很简单,可是如果要求 : 必须单线程异步呢?

avbot 是一个纯粹的单线程程序,绝对不允许多线程化。所有的逻辑必须使用异步处理。

那么,这个问题就复杂化了, “avbot 收到消息后,遍历这个列队执行消息发送” 这个做法,不可避免的带来了阻塞。好吧,异步遍历吧。

要是异步遍历还没遍历完,又来一个消息呢? 考虑这个问题,你会发疯的。因为异步,太多的细节需要考虑了。真的。

好吧,又有个好主意了,为每个客户端建立一个列队,每次遍历就是把要发送的消息挂入列队即可。这样也不需要异步遍历了,同步就可以。解决了异步遍历的时候又来一个消息导致的痛苦的调度。

然后细分,考虑每个客户端,就是等待 “发送列队” 不为空!等等,一直这么等待也不行,如果客户断开了链接呢? 所以要 “同时等待发送列队不为空&&客户正常在线,并且已经发送了 HTTP 请求头部”

好绕口,不过也只能如此了。

avbot 因为默认使用了 keep-alive , 所以发送是一个死循环,知道客户端主动断开链接或者网络发生错误。如果 客户端死了,那么,发送列队兴许会出现 爆队 的情况。所以要限制发送列队的大小。不是满了就不发送,而是满了后就把早的消息踢掉,也就是让 客户端发生“暂时性卡死”后,还能继续处理最后的几条信息。

诶,复杂的逻辑终于理清了,代码呢?!

啊累?

靠,这么复杂的 逻辑,得写一长段代码,调试几百年了吧?

错,我只花了几个小时,不到 100 行的代码就轻松实现了全部要求。

!!! WHAT !!!

这种功能不可能不用个千把行代码的吧?!

如果使用以前的老办法,确实如此。

可是,自从发现了 ASIO 后,我被 ASIO 爸爸发明的协程深深的震惊了!

利用 ASIO 爸爸提出的协程思想,我只用了不到 100行代码就全部完成了以上复杂的逻辑,而且,全部都是异步的哦~ 。


我为什么喜欢用协程
#2

#3

貌似没贴代码,也没讲解代码, 恩,有需要的么?

代码在 github , 如果看不懂的,可以 ping 我一下,我找个时间讲解下 里面那个协程


#4

来,讲解代码。 来看 rpc/server.cpp 200 行这里的代码,为方便起见,我摘录出来。

// 数据操作跑这里,嘻嘻.
void avbot_rpc_server::client_loop(boost::system::error_code ec, std::size_t bytestransfered)
{
	std::string uri;

	boost::smatch what;
	//for (;;)
	BOOST_ASIO_CORO_REENTER(this)
	{for (;;){

		m_request.clear();
		m_streambuf = boost::make_shared<boost::asio::streambuf>();

		// 读取用户请求.
		BOOST_ASIO_CORO_YIELD avhttpd::async_read_request(
				*m_socket, *m_streambuf, m_request,
				boost::bind(&avbot_rpc_server::client_loop, shared_from_this(), _1, 0)
		);

		if(ec)
		{
			if (ec == avhttpd::errc::post_without_content)
			{
				BOOST_ASIO_CORO_YIELD avhttpd::async_write_response(*m_socket, avhttpd::errc::no_content,
					boost::bind(&avbot_rpc_server::client_loop, shared_from_this(), _1, 0)
				);
				return;
			}
			else if (ec == avhttpd::errc::header_missing_host)
			{
				BOOST_ASIO_CORO_YIELD avhttpd::async_write_response(*m_socket, avhttpd::errc::bad_request,
					boost::bind(&avbot_rpc_server::client_loop, shared_from_this(), _1, 0)
				);
				return;
			}
			return;
		}

		uri = m_request.find(avhttpd::http_options::request_uri);

		// 解析 HTTP
		if(m_request.find(avhttpd::http_options::request_method) == "GET" )
		{
			if(uri=="/message")
			{
				// 等待消息, 并发送.
				BOOST_ASIO_CORO_YIELD m_responses.async_pop(
					boost::bind(&avbot_rpc_server::on_pop, shared_from_this(), _2)
				);
			}
			else if(
				boost::regex_match(uri, what,
					boost::regex("/search\\?channel=([^&]*)&q=([^&]*)&date=([^&]*).*")
				)
			)
			{
				// 取出这几个参数, 到数据库里查找, 返回结果吧.
				BOOST_ASIO_CORO_YIELD do_search(what[1],what[2],what[3],
					boost::bind(&avbot_rpc_server::done_search, shared_from_this(), _1, _2)
				);
				return;
			}
			else if(boost::regex_match(uri, what,boost::regex("/search(\\?)?")))
			{
				// missing parameter
				BOOST_ASIO_CORO_YIELD avhttpd::async_write_response(
					*m_socket,
					avhttpd::errc::internal_server_error,
					boost::bind(
						&avbot_rpc_server::client_loop,
						shared_from_this(),
						_1, 0
					)
				);
				return;
			}
			else if (boost::regex_match(uri, what,boost::regex("/status(\\?)?")))
			{
				// 获取 avbot 的状态.
				//boost::regex_match();
			}
			else
			{
				BOOST_ASIO_CORO_YIELD avhttpd::async_write_response(
					*m_socket,
					avhttpd::errc::not_found,
					boost::bind(
						&avbot_rpc_server::client_loop,
						shared_from_this(),
						_1, 0
					)
				);
				return;
			}
		}
		else if( m_request.find(avhttpd::http_options::request_method) == "POST")
		{
			// 这里进入 POST 处理.
			// 读取 body
			BOOST_ASIO_CORO_YIELD boost::asio::async_read(
				*m_socket,
				*m_streambuf,
				boost::asio::transfer_exactly(
					boost::lexical_cast<std::size_t>(
						m_request.find(avhttpd::http_options::content_length)
					) - m_streambuf->size()
				),
				boost::bind(&avbot_rpc_server::client_loop, shared_from_this(), _1, _2 )
			);
			// body 必须是合法有效的 JSON 格式
			BOOST_ASIO_CORO_YIELD avhttpd::async_write_response(
					*m_socket,
					process_post(m_streambuf->size()),
					avhttpd::response_opts()
						(avhttpd::http_options::content_length, "4")
						(avhttpd::http_options::content_type, "text/plain")
						("Cache-Control", "no-cache")
						(avhttpd::http_options::http_version,
							m_request.find(avhttpd::http_options::http_version)),
					boost::asio::buffer("done"),
					boost::bind(&avbot_rpc_server::client_loop, shared_from_this(), _1, 0)
			);
			if ( m_request.find(avhttpd::http_options::connection) != "keep-alive" )
				return;
		}

		// 继续
		BOOST_ASIO_CORO_YIELD avloop_idle_post(m_socket->get_io_service(),
			boost::bind(&avbot_rpc_server::client_loop, shared_from_this(), ec, 0)
		);
	}}
}

void avbot_rpc_server::callback_message(const boost::property_tree::ptree& jsonmessage)
{
	boost::shared_ptr<boost::asio::streambuf> buf(new boost::asio::streambuf);
	std::ostream stream(buf.get());
	std::stringstream teststream;

	js::write_json(stream, jsonmessage);

	m_responses.push(buf);
}



static void accepte_handler(
	boost::shared_ptr<boost::asio::ip::tcp::socket> m_socket,
	avbot & mybot,
	soci::session & db)
{
	void avlog_do_search(boost::asio::io_service & io_service,
		std::string c, std::string q, std::string date,
		boost::function<void (boost::system::error_code, pt::ptree)> handler,
		soci::session & db);

	boost::make_shared<avbot_rpc_server>(
		m_socket,
		boost::ref(mybot.on_message),
		boost::bind(
			avlog_do_search,
			boost::ref(m_socket->get_io_service()),
			_1,_2,_3,_4,
			boost::ref(db)
		)
	)->start();
}

bool avbot_start_rpc(boost::asio::io_service & io_service, int port, avbot & mybot, soci::session & avlogdb)
{
	try
	{
		// 调用 acceptor_server 跑 avbot_rpc_server 。 在端口 6176 上跑哦!
		boost::acceptor_server(
			io_service,
			boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v6(), port),
			boost::bind(accepte_handler, _1, boost::ref(mybot), boost::ref(avlogdb))
		);
	}
	catch (...)
	{
		try
		{
			boost::acceptor_server(
				io_service,
				boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port),
				boost::bind(accepte_handler, _1, boost::ref(mybot), boost::ref(avlogdb))
			);
		}
		catch (...)
		{
			return false;
		}
	}
	return true;
}

#5

这就是协程的起始了。函数被执行到在这里,就开始了跳转,第一次执行的时候,不会跳转。

会继续执行接下来的语句。 但是,第二次回调执行到这里的时候,会立即跳转到上一次退出的地方的下一条语句。


#6

这里,执行力一次 async_read 过程。在这里,前面加了条 BOOST_ASIO_CORO_YIELD , 然后回调函数设定为自己。

执行这条语句后,整个过程就会退出,而不会继续执行下一条命令了。

接着,等异步过程完毕,再次回调的时候,执行到 BOOST_ASIO_CORO_REENTER(this) 就会立即调整到下面

		if(ec)
		{

这里了。


#7

#8

为什么必须单线程?性能会更好?


#9

为啥要多线程呢? 多线程是为了利用多核。如果不是为了利用多核,多线程的意义就是让不会编程的人写程序而已。


#10

现在的PC基本上都是多核,为什么不利用多核呢?


#11

不要这么私自, 系统里并不只有你一个软件。


#12

我不觉得这是自私,相反,我觉得这种架构可以给用户提供充分利用机器性能的可能。做得厚道一些,可以用单线程或者多线程可设置的,让用户选择


#13

如果用多线程设计, 用户一开始就没有选择权了。相反,一开始就使用异步设计, 后来转换为多线程是轻轻松松的事情。只要多开几个线程跑 io_service.run() 就可以了。


#14

协程不适合用强类型语言编写。。


#15

可是我记得 asio 底层是有使用后台线程来实现一些功能的吧?网络接口我还不清楚,但是要做其他类型扩展的时候就要用后台线程来等待事件发生,然后通过 io_service::post 来投递到主事件循环中的吧。