C++ asio学习五
网络结构的更新
- asio网络层,会使用io_context进行数据封装,底层的话,
在linux中就是epoll模型,在windows就是iocp模型。
- 当服务器的接受数据较多时,又要处理接收到的信息的逻辑处理,逻辑处理一般会放到一个
逻辑处理队列中进行处理。
因为有时候逻辑比较复杂。
- 通过一个队列,单独线程从队列中取出逻辑函数。从而实现网络线程和逻辑线程分开,由一个队列进行连接。极大地提升网络线程的收发能力,并且可以用多线程的方式管理网络层。
- asio的多线程模式:
- 启动n个线程,每个线程都有一个iocontext,每个线程负责一部分的socket。
- 一个ioconext由多个线程共享。也可以一定程度上减轻readhandler的负担。
- 逻辑处理一般都是单线程的,因为大量的用户同时处理一个逻辑过程的时候,频繁地加锁取消锁,还不如就单线程的来做。
完善消息结构:
消息 = 消息id + 消息长度 + 消息内容。 前两部分统一封装到消息头里,tlv格式。消息id占2个字节,消息长度占2个字节,消息头共4个字节。
更新消息节点:将收取消息的类和发送消息的类,继承自消息基类。
1 2 3 4 5 6 7 8 9 10 11 12 13
| SendNode::SendNode(const char* msg, short max_len, short msg_id):MsgNode(max_len + HEAD_TOTAL_LEN) , _msg_id(msg_id){ short msg_id_host = boost::asio::detail::socket_ops::host_to_network_short(msg_id); memcpy(_data, &msg_id_host, HEAD_ID_LEN); short max_len_host = boost::asio::detail::socket_ops::host_to_network_short(max_len); memcpy(_data + HEAD_ID_LEN, &max_len_host, HEAD_DATA_LEN); memcpy(_data + HEAD_ID_LEN + HEAD_DATA_LEN, msg, max_len); }
|
单例模式
懒汉单例模式
- 通过静态成员变量实现单例。
存在隐患,对于多线程方式生成的实例可能是多个。
1 2 3 4 5 6 7 8 9 10 11 12
| class Single2 { private: Single2() {} Single2(const Single2&) = delete; Single2& operator=(const Single2&) = delete; public: static Single2& GetInst() { static Single2 single; return single; } };
|
饿汉单例模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| class Single2Hungry { private: Single2Hungry() { } Single2Hungry(const Single2Hungry &) = delete; Single2Hungry &operator=(const Single2Hungry &) = delete; public: static Single2Hungry *GetInst() { if (single == nullptr) { single = new Single2Hungry(); } return single; } private: static Single2Hungry *single; };
Single2Hungry *Single2Hungry::single = Single2Hungry::GetInst();
|
饿汉式是在程序启动时就进行单例的初始化,这种方式也可以通过懒汉式调用,无论饿汉式还是懒汉式都存在一个问题,就是什么时候释放内存?多线程情况下,释放内存就很难了,还有二次释放内存的风险。
懒汉式指针
单例模式的单例由指针存在,创建单例的时候,用加锁的方式进行判断。防止在加锁的过程中,出现单例类被创建的情况。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| class SinglePointer { private: SinglePointer() { } SinglePointer(const SinglePointer &) = delete; SinglePointer &operator=(const SinglePointer &) = delete; public: static SinglePointer *GetInst() { if (single != nullptr) { return single; } s_mutex.lock(); if (single != nullptr) { s_mutex.unlock(); return single; } single = new SinglePointer(); s_mutex.unlock(); return single; } private: static SinglePointer *single; static mutex s_mutex; };
|
智能指针设计单例类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| class SingleAuto { private: SingleAuto() { } SingleAuto(const SingleAuto&) = delete; SingleAuto& operator=(const SingleAuto&) = delete; public: ~SingleAuto() { cout << "single auto delete success " << endl; } static std::shared_ptr<SingleAuto> GetInst() { if (single != nullptr) { return single; } s_mutex.lock(); if (single != nullptr) { s_mutex.unlock(); return single; } single = std::shared_ptr<SingleAuto>(new SingleAuto); s_mutex.unlock(); return single; } private: static std::shared_ptr<SingleAuto> single; static mutex s_mutex; };
|
服务器优雅退出
- 服务器退出之前,要把服务器逻辑队列中的服务执行完成。
- asio提供的信号建立的方式,
利用signal_set 定义了一系列信号合集,并且绑定了一个匿名函数,匿名函数捕获了io_context的引用
,并且函数中设置了停止操作,也就是说当捕获到SIGINT,SIGTERM等信号时,会调用io_context.stop
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| int main() { try { boost::asio::io_context io_context; boost::asio::signal_set signals(io_context, SIGINT, SIGTERM); signals.async_wait([&io_context](auto, auto) { io_context.stop(); }); CServer s(io_context, 10086); io_context.run(); } catch (std::exception& e) { std::cerr << "Exception: " << e.what() << endl; } }
|
asio的多线程模型IOService
第一个是启动多个线程,每个线程管理一个iocontext。第二种是只启动一个iocontext,被多个线程共享。
启动线程的个数,不要超过核数。
每个线程独立调用io_context,一个socket会被注册在同一个io_context里
,它的回调函数也会被单独的一个线程回调:
- 那么对于同一个socket,他的回调函数每次触发都是在同一个线程里,
就不会有线程安全问题,网络io层面上的并发是线程安全的。
- 如果两个socket对应的上层逻辑处理,如果有交互或者访问共享区,会存在线程安全问题。可以通过
加锁
或者逻辑队列
的方式解决安全问题。
多线程的优势:提升了并发能力, 单线程仅有一个io_context服务用来监听读写事件,就绪后回调函数在一个线程里串行调用。如果一个回调函数的调用时间较长肯定会影响后续的函数调用。
通过逻辑队列的方式将网络线程和逻辑线程解耦合,不会出现前一个调用时间影响下一个回调触发的问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| class AsioIOServicePool :public Singleton<AsioIOServicePool> { friend Singleton<AsioIOServicePool>; public: using IOService = boost::asio::io_context; using Work = boost::asio::io_context::work; using WorkPtr = std::unique_ptr<Work>;
~AsioIOServicePool(); AsioIOServicePool(const AsioIOServicePool&) = delete; AsioIOServicePool& operator=(const AsioIOServicePool&) = delete; boost::asio::io_context& GetIOService(); void Stop(); private: AsioIOServicePool(std::size_t size = std::thread::hardware_concurrency()); std::vector<IOService> _ioServices; std::vector<WorkPtr> _works; std::vector<std::thread> _threads; std::size_t _nextIOService; };
AsioIOServicePool::AsioIOServicePool(std::size_t size) :_ioServices(size), _works(size), _nextIOService(0) { for (std::size_t i = 0; i < size; ++i) { _works[i] = std::unique_ptr<Work>(new Work(_ioServices[i])); } for (std::size_t i = 0; i < _ioServices.size(); ++i) { _threads.emplace_back([this, i]() { _ioServices[i].run(); }); } }
AsioIOServicePool::~AsioIOServicePool() { std::cout << "destruct" << std::endl; }
boost::asio::io_context& AsioIOServicePool::GetIOService() { auto& service = _ioServices[_nextIOService++]; if (_nextIOService == _ioServices.size()) { _nextIOService = 0; } return service; }
void AsioIOServicePool::Stop() { for (auto& work : _works) { work.reset(); } for (auto& t : _threads) { t.join(); } }
|
asio多线程IOThreadPool
一个IOServicePool开启n个线程和n个iocontext,每个线程内独立运行iocontext, 各个iocontext监听各自绑定的socket是否就绪,如果就绪就在各自线程里触发回调函数。
- IOThreadPool:
初始化一个iocontext用来监听服务器的读写事件,包括新连接到来的监听也用这个iocontext。只是我们让iocontext.run在多个线程中调用
,这样回调函数就会被不同的线程触发,从这个角度看回调函数被并发调用了。
- 线程池统一管理一个io_context,每个线程调用一个io_context,会话session都注册到一个,哪个线程调用了io_context.run,哪个线程去就绪队列取出回调函数。
- 回调函数对同一个session来说就是不安全的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| class AsioThreadPool :public Singleton<AsioThreadPool> { public: friend class Singleton<AsioThreadPool>; ~AsioThreadPool() {} AsioThreadPool& operator=(const AsioThreadPool&) = delete; AsioThreadPool(const AsioThreadPool&) = delete; boost::asio::io_context& GetIOService(); void Stop(); private: AsioThreadPool(int threadNum = std::thread::hardware_concurrency()); boost::asio::io_context _service; std::unique_ptr<boost::asio::io_context::work> _work; std::vector<std::thread> _threads; };
AsioThreadPool::AsioThreadPool(int threadNum) :_work(new boost::asio::io_context::work(_service)) { for (int i = 0; i < threadNum; ++i) { _threads.emplace_back([this]() { _service.run(); }); } }
|
线程池里每个线程都会运行_service.run函数
,这就是多线程调用一个io_context的逻辑。
- 因为回调函数是在不同的线程里调用的,所以会存在不同的线程调用同一个socket的回调函数的情况。
_service.run 内部在Linux环境下调用的是epoll_wait返回所有就绪的描述符列表
,在windows上会循环调用GetQueuedCompletionStatus函数返回就绪的描述符
,二者原理类似,进而通过描述符找到对应的注册的回调函数,然后调用回调函数。
epoll 和 iocp的一些知识点
1 2 3 4 5 6 7 8 9 10 11 12
| IOCP的使用主要分为以下几步: 1 创建完成端口(iocp)对象。 2 创建一个或多个工作线程,在完成端口上执行并处理投递到完成端口上的I/O请求。 3 Socket关联iocp对象,在Socket上投递网络事件。 4 工作线程调用GetQueuedCompletionStatus函数获取完成通知封包,取得事件信息并进行处理。
epoll_wait的工作方式: 1 调用epoll_creat在内核中创建一张epoll表。 2 开辟一片包含n个epoll_event大小的连续空间。 3 将要监听的socket注册到epoll表里。 4 调用epoll_wait,传入之前我们开辟的连续空间,epoll_wait返回就绪的epoll_event列表, epoll会将就绪的socket信息写入我们之前开辟的连续空间。
|
- 使用这种方式,有可能会存在隐患,不同的线程有可能处理同一块Read回调处理函数,存在网络上的并行。
改进方法:再添加一个strand管理的队列,asio的strand是一个安全队列,里面进行独立的单线程访问。
- 回调处理放在_strand中进行执行。
1 2 3 4 5 6
| void CSession::Start(){ ::memset(_data, 0, MAX_LENGTH); _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), boost::asio::bind_executor(_strand, std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, SharedSelf()))); }
|
- IOThreadPool相比于IOServicePool,速度慢一些。
参考列表
https://www.bilibili.com/video/BV1FV4y1U7oo/
https://llfc.club/category?catid=225RaiVNI8pFDD5L4m807g7ZwmF#!aid/2Qld2hoFIu8ycYBJXQdxwyWEBfy