C++ asio学习二

C++ asio学习二

asio异步写操作

  • async_write_some是异步写的函数:传入buffer和回调函数以及参数以后,发送后会调用回调函数。
1
2
3
4
5
6
7
8
9
10
11
12
void Session::WriteToSocketErr(const std::string& buf) {
// make_shared 延长_send_node 的生命周期。
_send_node = make_shared<MsgNode>(buf.c_str(), buf.length());
//异步发送数据,因为异步所以不会一下发送完
//async_write_some的回调函数要求是两个参数的:发送多少,以及可能返回的错误码,都作为参数传递给回调函数。
/*但是自己定义的函数参数为3个,通过bind将三个参数转换为两个参数的普通函数。*/

this->_socket->async_write_some(asio::buffer(_send_node->_msg,
_send_node->_total_len),
std::bind(&Session::WriteCallBackErr,
this, std::placeholders::_1, std::placeholders::_2, _send_node));
}
  • 回调函数需要判断是否所有数据都发送完成了,如果没有,继续调用回调函数进行发送。
1
2
3
4
5
6
7
8
9
10
11
12
void Session::WriteCallBackErr(const boost::system::error_code& ec, 
std::size_t bytes_transferred,
std::shared_ptr<MsgNode> msg_node)
{
if (bytes_transferred + msg_node->_cur_len < msg_node->_total_len) {
_send_node->_cur_len += bytes_transferred;
this->_socket->async_write_some(asio::buffer(_send_node->_msg+_send_node->_cur_len,
_send_node->_total_len-_send_node->_cur_len),
std::bind(&Session::WriteCallBackErr,
this, std::placeholders::_1, std::placeholders::_2, _send_node));
}
}
  • 上面的代码中存在的问题:异步发送的过程是无序的,用户想发送数据的时候就调用WriteToSocketErr,或者循环调用WriteToSocketErr,很可能在一次没发送完数据还未调用回调函数时再次调用WriteToSocketErr。那么有可能出现发送数据的顺序和想要的顺序不同。
  • 为了确保发送数据的顺序正确性,在应用层使用一个队列,保证发送顺序。并用一个标志位用来判断当前是否还有数据正在发送过程中。
1
2
3
4
5
6
7
8
9
10
class Session{
public:
// 函数参数和上面的参数并无差别
void WriteCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);
void WriteToSocket(const std::string &buf);
private:
std::queue<std::shared_ptr<MsgNode>> _send_queue; // 发送数据顺序队列
std::shared_ptr<asio::ip::tcp::socket> _socket;
bool _send_pending;// 判断当前是否有数据正在发送,为true表示一个节点还未发送完。
};
  • 发送函数:发送的时候先把数据插到队列中,回调后,将正在发送标志位置为true。
1
2
3
4
5
6
7
8
9
10
11
12
13
void Session::WriteToSocket(const std::string& buf) {
// 将要发送的数据插入队列
_send_queue.emplace(new MsgNode(buf.c_str(), buf.length()));
if (_send_pending) { //还有数据在发送的话,就先return
return;
}

//异步发送数据,因为异步所以不会一下发送完
this->_socket->async_write_some(
asio::buffer(buf),
std::bind(&Session::WriteCallBack,this, std::placeholders::_1, std::placeholders::_2));
_send_pending = true; // 调用一次async_write_some,肯定会触发WriteCallBack,这个时候将标志位置为true
}
  • 回调函数:回调函数在执行时,会首先判断队首元素是否全部发送出去了,如果没有就继续发送队首元素。队首元素发送完毕后,继续取出队列中的元素。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void Session::WriteCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred, std::shared_ptr<MsgNode>) {
if (ec.value() != 0) {//出错了
std::cout << "Error , code is " << ec.value() << " . Message is " << ec.message();
return;
}
// 取出队首元素
auto& send_data = _send_queue.front();
send_data->_cur_len += bytes_transferred;
//数据未发送完, 则继续发送
if (send_data->_cur_len < send_data->_total_len) {
this->_socket->async_write_some(
asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len - send_data->_cur_len),
std::bind(&Session::WriteCallBack,this, std::placeholders::_1, std::placeholders::_2)
);
return;
}

_send_queue.pop();
if (_send_queue.empty()) {
_send_pending = false;
}
else {// 队列不为空,继续发送。
auto& send_data = _send_queue.front();
this->_socket->async_write_some(
asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len - send_data->_cur_len),
std::bind(&Session::WriteCallBack,this, std::placeholders::_1, std::placeholders::_2)
);
}

}
  • async_write_some函数不能保证每次回调函数触发时发送的长度为要总长度,每次都要在回调函数判断发送数据是否完成,asio提供了一个更简单的发送函数async_send,这个函数在发送的长度未达到我们要求的长度时就不会触发回调,所以触发回调函数时要么时发送出错了要么是发送完成了,其内部的实现原理就是帮我们不断的调用async_write_some直到完成发送,所以async_send不能和async_write_some混合使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void Session::WriteAllToSocket(const std::string& buf) {
// 将要发送的数据插入队列
_send_queue.emplace(new MsgNode(buf.c_str(), buf.length()));
if (_send_pending) return;

// 异步发送数据,数据不会一次发送完,但是只会触发一次回调
this->_socket->async_send(asio::buffer(buf), std::bind(&Session::WriteAllCallBack, this, std::placeholders::_1, std::placeholders::_2));
_send_pending = true;
}
void Session::WriteAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred) {
if (ec.value() != 0) {
std::cout << "Error occured! Error code = "
<< ec.value()
<< ". Message: " << ec.message();
return;
}

//如果发送完,则pop出队首元素
_send_queue.pop();
//如果队列为空,则说明所有数据都发送完,将pending设置为false
if (_send_queue.empty()) {
_send_pending = false;
}
//如果队列不是空,则继续将队首元素发送
if (!_send_queue.empty()) {
auto& send_data = _send_queue.front();
this->_socket->async_send(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len - send_data->_cur_len),
std::bind(&Session::WriteAllCallBack,
this, std::placeholders::_1, std::placeholders::_2));
}
}

asio异步读操作

  • 异步读操作和异步的写操作类似同样有async_read_someasync_receive函数,前者触发的回调函数获取的读数据的长度可能会小于要求读取的总长度,后者触发的回调函数读取的数据长度等于读取的总长度。
  • 同样的async_read_someasync_receive不能混用,因为async_receive的底层就是循环调用async_read_some。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void Session::ReadFromSocket() {
if (_recv_pending) return;
_recv_node = std::make_shared<MsgNode>(RECVSIZE);
// 一块一块接收
_socket->async_read_some(asio::buffer(_recv_node->_msg, _recv_node->_total_len), std::bind(&Session::ReadCallBack, this,
std::placeholders::_1, std::placeholders::_2));
_recv_pending = true;
}
void Session::ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred) {
_recv_node->_cur_len += bytes_transferred;
// 如果没有接受完,继续接收
if (_recv_node->_cur_len < _recv_node->_total_len) {
_socket->async_read_some(asio::buffer(_recv_node->_msg + _recv_node->_cur_len,
_recv_node->_total_len - _recv_node->_cur_len), std::bind(&Session::ReadCallBack, this,
std::placeholders::_1, std::placeholders::_2));
return;
}

_recv_pending = false;
//指针置空
_recv_node = nullptr;
}
  • async_receive:接收完数据后才会调用一次回调。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void Session::ReadAllFromSocket() {
if (_recv_pending) {
return;
}
_recv_node = std::make_shared<MsgNode>(RECVSIZE);
_socket->async_receive(asio::buffer(_recv_node->_msg, _recv_node->_total_len), std::bind(&Session::ReadAllCallBack, this,
std::placeholders::_1, std::placeholders::_2));
_recv_pending = true;
}
void Session::ReadAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred) {
if (ec.value() != 0) {
std::cout << "Error occured! Error code = "
<< ec.value()
<< ". Message: " << ec.message();
return;
}
_recv_node->_cur_len += bytes_transferred;
//将数据投递到队列里交给逻辑线程处理,此处略去
//如果读完了则将标记置为false
_recv_pending = false;
//指针置空
_recv_node = nullptr;
}

asio 异步echo服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#pragma once
#include<memory>
#include<boost/asio.hpp>
#include<iostream>
#include<queue>
using namespace std;
using namespace boost;
using boost::asio::ip::tcp;

// Session类主要是处理客户端消息收发的会话类,简单起见,不考虑粘包问题,也不考虑支持手动调用发送的接口,只以应答的方式发送和接收固定长度(1024字节长度)的数据。
class Session {
public:
// 上下文初始化Session,socket绑定上下文
Session(boost::asio::io_context& ioc) :_socket(ioc) {
}
tcp::socket& Socket() {
return _socket;
}
void Start();// 在start中监听客户端的读写
private:
// handle_read和handle_write分别为服务器的读回调函数和写回调函数。
// 当服务器读数据时,会调用handle_read,在handle_read过程中,要把数据回传给客户端,要写时,会调用handle_write。
void handle_read(const boost::system::error_code& error, size_t bytes_transfered);
void handle_write(const boost::system::error_code& error);

tcp::socket _socket; //_socket为单独处理客户端读写的socket。
enum { max_length = 1024 };
char _data[max_length]; //_data用来接收客户端传递的数据

};

//最大报文接收大小
const int RECVSIZE = 1024;


// Server类是为服务器接收连接的管理类。
class Server {
public:
Server(boost::asio::io_context& ioc, short port);
private:
void start_accept();//启动连接描述符,初始化一个acceptor,将要接收连接的acceptor绑定到服务上
// 内部就是将accpeptor对应的socket描述符绑定到epoll或iocp模型上,实现事件驱动。
void handle_accept(Session* new_session, const boost::system::error_code& error); // 有连接过来的时候,触发回调函数,回调session的数据。
boost::asio::io_context& _ioc;// 上下文,不允许被复制被构造。
tcp::acceptor _acceptor;
};

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
void Session::Start() {
memset(_data, 0, max_length);
// async_read_some 在boost asio底层用的是epoll,把_socket的读事件添加到epoll表里。
// 当_socket有读事件就绪的时候,就会触发handle_read,读:对端发送数据,_socket底层的读tcp缓冲区由空变成有数据。
_socket.async_read_some(
boost::asio::buffer(_data, max_length),
std::bind(&Session::handle_read, this, placeholders::_1, placeholders::_2)
);

}

// 读的回调函数:客户端发送数据过来,就会调用这个函数
void Session::handle_read(const boost::system::error_code& error, size_t bytes_transfered) {
if (!error) {
cout << "server receive data is " << _data << endl; // 收到数据
// 将收到的数据发送回客户端,就行了。
boost::asio::async_write(_socket, boost::asio::buffer(_data, bytes_transfered),
std::bind(&Session::handle_write, this, placeholders::_1)); // 当发送完成后触发handle_write回调函数。
}
else {
cout << "read error" << endl;
delete this;
}
}

// 写回调函数,写回调之后,就要开始读了。
// 触发写回调,是因为tcp有空闲空间,能够把用户态的数据拷贝到tcp缓冲区。
void Session::handle_write(const boost::system::error_code& error) {
if (!error) {
memset(_data, 0, max_length);
// 继续读去吧。
_socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&Session::handle_read,
this, placeholders::_1, placeholders::_2));
}
else {
cout << "write error"<<error.value() << endl;
delete this;
}
}

// 构造函数:_ioc是引用变量,要用初始化列表的方式进行赋值,_acceptor专门捕获连接
Server::Server(boost::asio::io_context& ioc, short port):_ioc(ioc),_acceptor(ioc,tcp::endpoint(tcp::v4(),port)) {
cout << "Server start success, on port: " << port << endl;
start_accept();
}

void Server::start_accept() {
Session* new_session = new Session(_ioc); // 定义新的session
// 新的连接到来以后,调用handle_accept
// placeholders::_1 占位符,是asio api要求的错误码。
// asio就会通过placeholders::_1 这个占位符,把错误码发送给handle_accept函数。
_acceptor.async_accept(new_session->Socket(),std::bind(&Server::handle_accept, this, new_session, placeholders::_1));
}

void Server::handle_accept(Session* new_session, const boost::system::error_code& error) {
if (!error) {
new_session->Start();
}
else {
delete new_session;
}
start_accept();//再次调用start_accept(),继续接收,不能接收完新的连接之后,就什么都不干了。
}

客户端不用写成异步的,因为客户端并不是以并发为主。

asio异步服务器中存在的隐患

  • 服务器即将发送数据前(调用async_write前),此刻客户端中断
  • 服务器此时调用async_write会触发发送回调函数,判断ec为非0进而执行delete this逻辑回收session
  • 但要注意的是客户端关闭后,在tcp层面会触发读就绪事件,服务器会触发读事件回调函数。在读事件回调函数中判断错误码ec为非0,进而再次执行delete操作,从而造成二次析构,这是极度危险的。
  • 参考列表
    https://www.bilibili.com/video/BV15P411S7fp/?p=7&spm_id_from=pageDriver

C++ asio学习二
https://cauccliu.github.io/2024/03/26/C++ asio学习二/
Author
Liuchang
Posted on
March 26, 2024
Licensed under