C++ asio学习三
C++ asio学习三
用智能指针延长session的生命周期
问题:
客户端断开后:会触发服务器对应session的写或读事件,由于是异步编程,需要在回调中对读写事件进行处理。
客户端断开, 则应该析构掉该session。但是此时该session在asio底层回调队列中可能还有很多读写函数对象在排队等着执行 。 如果在某个读写回调对象把这个session析构掉了,那之后执行的读写回调函数可能会再次析构这个session。
所以我们需要保证, 在该session对应asio底层回调队列中,还存在将要执行的读写回调函数时,该session不被析构。通过智能指针来实现伪闭包,延长session的生命周期。
智能指针传给函数对象,函数对象不释放,智能指针也就不会被释放掉。
把智能指针传递给session用的回调函数,函数内部再使用智能指针,这个时候智能指针就不被释放。
假如包含智能指针的函数没有调用怎么办?用lambda表达式和bind强制将智能指针中的shared_ptr加1。
构造一个伪闭包:
- 利用智能指针被复制或使用引用计数加一的原理保证内存不被回收
bind操作可以将值绑定在一个函数对象上生成新的函数对象
,如果将智能指针作为参数绑定给函数对象,那么智能指针就以值的方式被新函数对象使用,那么智能指针的生命周期将和新生成的函数对象一致
,从而达到延长生命的效果。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40// 包含智能指针的Server类。
class CServer
{
public:
// 构造函数
CServer(boost::asio::io_context& io_context, short port);
void ClearSession(std::string uuid);
private:
void HandleAccept(std::shared_ptr<CSession>, const boost::system::error_code& error);
void StartAccept();
boost::asio::io_context& _io_context;// 上下文
short _port;// 端口
tcp::acceptor _acceptor;
// 通过智能指针方式管理Session类,将acceptor接收的连接保存在Session类型的智能指针里。
// 在Server类中添加成员变量,该变量为一个map类型,key为Session的uid,value为该Session的智能指针。
std::map<std::string, std::shared_ptr<CSession>> _sessions;
// 通过Server中的_sessions这个map管理链接,可以增加Session智能指针的引用计数,只有当Session从这个map中移除后,Session才会被释放。
};
class CSession :public std::enable_shared_from_this<CSession> {
public:
// 上下文初始化CSession,socket绑定上下文
CSession(boost::asio::io_context& io_context, CServer* server);
tcp::socket& GetSocket() { return _socket; }
std::string& GetUuid() { return _uuid; }
void Start();
void Send(char* msg, int max_length);
private:
enum { MAX_LENGTH = 1024 };
void HandleRead(const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> _self_shared);
void HandleWrite(const boost::system::error_code& error, std::shared_ptr<CSession> _self_shared);
tcp::socket _socket;
std::string _uuid;
char _data[MAX_LENGTH];
CServer* _server;
std::queue<std::shared_ptr<MsgNode> > _send_que;
std::mutex _send_lock;
};
1 |
|
处理粘包问题
粘包问题:当客户端发送多个数据包给服务器时,服务器底层的tcp接收缓冲区收到的数据为粘连在一起的,是服务器的问题,不是客户端的问题。
客户端发送: hello world! hello world!
服务器接收:hello world! hello world!
客户端给服务器发送了两个hello world! 服务其TCP缓冲区接收了两次,但是第一次接收的数据粘包了。
粘包原因:TCP发送数据的时候,数据逻辑性出了问题。
- TCP底层通信是面向字节流的,TCP只保证发送数据的准确性和顺序性,字节流以字节为单位。
- 客户端每次发送N个字节给服务端,N取决于当前客户端的发送缓冲区是否有数据。比如发送缓冲区总大小为10字节,当前有5字节未发送完,那么此时只有5个字节的空闲时间。
- 此时调用接口发送hello world!, 就只能发送hello给服务器,那么服务器这次接收到的数据很可能就是连着其他数据的hello,下次才能收到world!。
还有其他产生粘包问题的原因:
- 客户端的发送频率远高于服务器的接收频率,服务器接收不过来,就会导致数据在服务器的tcp接收缓冲区滞留形成粘连。
- tcp底层的安全和效率机制不允许字节数特别少的小包发送频率过高,tcp会在底层累计数据长度到一定大小才一起发送。
处理粘包的方法
:主要采用应用层定义收发包格式的方式,这个过程俗称切包处理。用消息id+消息长度+消息内容的tlv协议去切割数据包。
在代码中对粘包进行处理:
- 定义新的数据结构体,数据包含两部分:消息长度+消息内容,用额外的2字节去存储当前消息的长度。
- 接收消息数据的CSession类也需要更新。
- 数据初始化的时候,就要初始化头部信息。
完善加上粘包处理后的逻辑:
- 头部未解析:
- 收到的数据不是满足头部的大小:未处理的数据加上头部当前缓存的数据,如果小于2字节,就说明头部数据没有接收完。
- 收到的数据比头部多:头部的信息已经接收完,取出头部信息。定义数据节点,取出数据信息。
- 若数据节点的长度< 头部信息长度:数据还没收完。将数据放到接收节点中,更新信息。
- 若数据节点的长度大于等于头部信息长度:取出首包全部数据,头部节点清楚一下,轮询切包。
- 头部已解析:已经处理完头部,消息体没有接收完。
- 消息体还没有接收全:当前数据拷贝到消息节点里,继续监听对方发送。
- 消息体长度够了,拷贝信息到消息节点,更新变量,把剩下的数据轮询切包。
1 |
|
- 对于客户端:发送和接收数据的时候,也要先发送两个字节的数据长度,再发送数据消息的结构。
1 |
|
asio简易处理粘包问题的方法
- 上面是通过
async_read_some函数监听读事件,绑定回调函数HandleRead。
- async_read_some 这个函数的特点是只要对端发数据,服务器接收到数据,即使没有收全对端发送的数据也会触发HandleRead函数。
- 所以HandleRead函数的处理方式很复杂
简单的方式可以用async_read函数,读取固定字节数。只有完全读完才会触发回调函数
1 |
|
- 这样可以直接在HandleReadHead函数内处理头部信息:
- 解析头部,获取字符串长度,如果长度大于最大长度,数据非法
- 字符串长度正常的话,调用HandleReadMsg函数,解析消息体。
- 打印消息体,继续接收头部消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21void CSession::HandleReadMsg(const boost::system::error_code& error, size_t bytes_transferred,
std::shared_ptr<CSession> shared_self) {
if (!error) {
PrintRecvData(_data, bytes_transferred);
std::chrono::milliseconds dura(2000);
std::this_thread::sleep_for(dura);
_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
cout << "receive data is " << _recv_msg_node->_data << endl;
Send(_recv_msg_node->_data, _recv_msg_node->_total_len);
//再次接收头部数据
_recv_head_node->Clear();
boost::asio::async_read(_socket, boost::asio::buffer(_recv_head_node->_data, HEAD_LENGTH),
std::bind(&CSession::HandleReadHead, this, std::placeholders::_1, std::placeholders::_2,
SharedSelf()));
}
else {
cout << "handle read msg failed, error is " << error.what() << endl;
Close();
_server->ClearSession(_uuid);
}
}
C++ asio学习三
https://cauccliu.github.io/2024/03/26/C++ asio学习三/