C++ asio学习一

C++ asio学习一

TCP通信基本流程

在这里插入图片描述
      单线程流程中,服务器创建用于监听的套接字,绑定本地的ip和端口,listen函数去监听绑定的端口。
      如果有客户端进行连接,服务器端就可以和发起连接的客户端建立连接,连接建立成功会生成一个用于通信的套接字。用于监听的套接字和用于通信的套接字是不一样的。监听的套接字用于建立连接,通信的套接字用于数据交互。用于数据交互的read和write都是阻塞函数,在单线程下面,一个服务器想和多客户端进行通信,肯定是做不到的,因为accept,read,write都是阻塞的。


Boost网络库节点创建

1
2
3
4
5
6
7
8
9
10
#pragma once
extern int client_end_point();// 创建客户端端点
extern int server_end_point();// 创建服务端节点
extern int create_tcp_socket();
extern int create_acceptor_socket(); // 创建服务器接收socket,生成一个acceptor的socket,用来接收新的连接。
extern int bind_acceptor_socket(); // 绑定socket到端口,对于acceptor类型的socket,服务器要将其绑定到指定的端点,所有连接这个端点的连接都可以被接收到。
extern int connnect_to_end();//客户端连接 作为客户端可以连接服务器指定的端点进行连接
extern int dns_connect_to_end();//通过dns域名解析的方式,去连接服务器
extern int accept_new_connection(); // 服务器接收连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#include"endpoint.h"
#include <boost/asio.hpp>
#include <iostream>
using namespace boost;

int client_end_point() {
std::string raw_ip_address = "127.4.8.1";
unsigned short port_num = 3333;

boost::system::error_code ec; //错误码,用来判断错误类型
asio::ip::address ip_address = asio::ip::address::from_string(raw_ip_address, ec);
if (ec.value() != 0) {
// ip访问失败
std::cout << "Failed to parse the IP address. Error code = "<< ec.value() << ". Message: " << ec.message();
return ec.value();
}

// 转换成需要通信的地址,绑定ip和端口
asio::ip::tcp::endpoint ep(ip_address, port_num);
return 0;
}

int server_end_point() {
unsigned short port_num = 3333;
asio::ip::address ip_address = asio::ip::address_v6::any();//任何地址都可以与我通信
asio::ip::tcp::endpoint ep(ip_address, port_num);
return 0;
}

int create_tcp_socket() {
//创建socket分为4步::
// 1. 创建上下文iocontext
// 2. 选择协议,
// 3. 生成socket
// 4. 打开socket。

// 创建上下文
asio::io_context ioc;
// 创建协议
asio::ip::tcp protocol = asio::ip::tcp::v4();
// 生成socket
asio::ip::tcp::socket sock(ioc);
// 打开socket
boost::system::error_code ec; //错误码,用来判断错误类型
sock.open(protocol, ec);
if (ec.value() != 0) {
// socket打开失败
std::cout << "Failed to parse the socket. Error code = " << ec.value() << ". Message: " << ec.message();
return ec.value();
}
return 0;
}

int create_acceptor_socket() {
asio::io_context ioc;
// 旧版写法
//asio::ip::tcp::acceptor acceptor(ioc);
//asio::ip::tcp protocol = asio::ip::tcp::v4();
//boost::system::error_code ec; //错误码,用来判断错误类型
//acceptor.open(protocol, ec);
//if (ec.value() != 0) {
// // acceptor打开失败
// std::cout << "Failed to parse the acceptor. Error code = " << ec.value() << ". Message: " << ec.message();
// return ec.value();
//}

// 新版写法
asio::ip::tcp::acceptor acceptor(ioc, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), 3333)); //比较简单,直接创建acceptor,默认实现绑定
return 0;
}

int bind_acceptor_socket() {
unsigned short port_num = 3333;
asio::ip::tcp::endpoint ep(asio::ip::address_v4::any(),port_num);
//创建服务,绑定服务
asio::io_context ios;
asio::ip::tcp::acceptor acceptor(ios, ep.protocol());//需要手动绑定
boost::system::error_code ec;
acceptor.bind(ep, ec);
if (ec.value() != 0) {
// socket打开失败
std::cout << "Failed to bind the socket. Error code = " << ec.value() << ". Message: " << ec.message();
return ec.value();
}
return 0;
}

int connnect_to_end() {
std::string raw_ip_address = "127.0.0.1";
unsigned short port_num = 3333;

try
{
asio::ip::tcp::endpoint ep(asio::ip::address::from_string(raw_ip_address), port_num);
asio::io_context ios;

asio::ip::tcp::socket sock(ios, ep.protocol()); //创建socket

sock.connect(ep); // 连接
}
catch (system::system_error& e) {
std::cout << "Error occured! Error code = " << e.code()
<< ". Message: " << e.what();
return e.code().value();
}
return 0;
}

int dns_connect_to_end() {
std::string host = "samplehost";
std::string port_num = "3333";
asio::io_context ios;

// resolver_query:提供的查询服务
asio::ip::tcp::resolver::query resolver_query(host, port_num, asio::ip::tcp::resolver::query::numeric_service);//客户端连接的时候做域名解析
asio::ip::tcp::resolver resolver(ios);//域名解析器
try
{
asio::ip::tcp::resolver::iterator it = resolver.resolve(resolver_query); //返回解析的迭代器
asio::ip::tcp::socket sock(ios); //创建socket
asio::connect(sock,it);//一个全局的连接器
}
catch (system::system_error& e)
{
std::cout << "Error occured! Error code = " << e.code()
<< ". Message: " << e.what();
return e.code().value();
}
return 0;
}


int accept_new_connection() {
const int BACKLOG_SIZE = 30;//缓冲区大小

unsigned short port_num = 3333;
asio::ip::tcp::endpoint ep(asio::ip::address_v4::any(),port_num);//生成端点

asio::io_context ios;
try
{
// 生成服务接收器
asio::ip::tcp::acceptor acceptor(ios, ep.protocol());//需要手动绑定
acceptor.bind(ep);
acceptor.listen(BACKLOG_SIZE);
asio::ip::tcp::socket sock(ios);
acceptor.accept(sock);// 新的连接通信由sock处理
}
catch (system::system_error& e)
{
std::cout << "Error occured! Error code = " << e.code()
<< ". Message: " << e.what();
return e.code().value();
}
return 0;
}

buffer

  1. 服务器与客户端建立连接之后,需要进行数据传输,需要用到buffer。
  2. boost::asio提供了asio::mutable_bufferasio::const_buffer这两个结构,他们是一段连续的空间,首字节存储了后续数据的长度。
  3. asio::mutable_buffer用于写服务,asio::const_buffer用于读服务。但是这两个结构都没有被asio的api直接使用。对于api的buffer参数,asio提出了MutableBufferSequence和ConstBufferSequence概念,他们是由多个asio::mutable_buffer和asio::const_buffer组成的。也就是说boost::asio为了节省空间,将一部分连续的空间组合起来,作为参数交给api使用。
  4. 越过复杂的组合概念,asio提出了buffer()函数,该函数接收多种形式的字节流,该函数返回asio::mutable_buffers_1 o或者asio::const_buffers_1结构的对象。
  5. 最终,可以用buffer()函数生成我们要用的缓存存储数据。
  • boost的发送接口send要求的参数为ConstBufferSequence类型
1
2
asio::const_buffers_1 output_buf = asio::buffer("hello world");
asio.send(output_buf);
  • output_buf可以直接传递给该send接口。也可以将数组转化为send接受的类型
1
2
3
4
5
void use_buffer_array(){
const size_t BUF_SIZE_BYTES = 20;
std::unique_ptr<char[] > buf(new char[BUF_SIZE_BYTES]);
auto input_buf = asio::buffer(static_cast<void*>(buf.get()), BUF_SIZE_BYTES);
}

asio tcp同步写

  • write,向socket中写入数据:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    void write_to_socket(asio::ip::tcp::socket& sock) {
    std::string buf = "Hello world!";
    std::size_t total_bytes_written = 0;
    // 循环发送
    // write_some返回每次写入的字节数
    // 粘包的时候,或者缓冲区不够,这种时候,具体读了多少,都要通过返回值判断
    // 发送的时候要用buffer构造
    while (total_bytes_written != buf.length()) {
    total_bytes_written += sock.write_some(asio::buffer(buf.c_str() + total_bytes_written, buf.length() - total_bytes_written));
    }
    }
  • 同步send:保证send的数据,一次性发送完成,就是想要的数据长度和大小。调用send()函数即可。send会一直等待,如果tcp没有发完,就会一直阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int send_data_by_write_some() {
std::string raw_ip_address = "127.0.0.1";
unsigned short port_num = 3333;
try
{
asio::ip::tcp::endpoint ep(asio::ip::address::from_string(raw_ip_address), port_num);
asio::io_context ioc;
asio::ip::tcp::socket sock(ioc, ep.protocol());
sock.connect(ep);

std::string buf = "Hello World!";
// send会一直等待,如果tcp没有发完,就会一直阻塞。直到数据全部发送完毕
int send_length = sock.send(asio::buffer(buf.c_str(), buf.length()));
if (send_length <= 0) {
std::cout << "send failed" << std::endl;
return 0;
}
}
catch (system::system_error& e)
{
std::cout << "Error occured! Error code = " << e.code()
<< ". Message: " << e.what();
return e.code().value();
}
return 0;
}
  • 同步write:类似send方法,asio还提供了一个write函数,可以一次性将所有数据发送给对端,如果发送缓冲区满了则阻塞,直到发送缓冲区可用,将数据发送完成。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
int send_data_by_wirte() {
std::string raw_ip_address = "127.0.0.1";
unsigned short port_num = 3333;
try {
asio::ip::tcp::endpoint
ep(asio::ip::address::from_string(raw_ip_address),
port_num);
asio::io_service ios;
// Step 1. Allocating and opening the socket.
asio::ip::tcp::socket sock(ios, ep.protocol());
sock.connect(ep);
std::string buf = "Hello World!";
int send_length = asio::write(sock, asio::buffer(buf.c_str(), buf.length()));
if (send_length <= 0) {
std::cout << "send failed" << std::endl;
return 0;
}
}
catch (system::system_error& e) {
std::cout << "Error occured! Error code = " << e.code()
<< ". Message: " << e.what();
return e.code().value();
}
return 0;
}

asio tcp同步读

  • 同步读和同步写类似,提供了读取指定字节数的接口read_some。不断地轮询去读
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
std::string read_from_socket(asio::ip::tcp::socket& sock) {
// 存消息的数组结构
const unsigned char MESSAGE_SIZE = 7;
char buf[MESSAGE_SIZE];
std::size_t total_bytes_read = 0;// 表示已经读了多少字节

//按照偏移量进行读,不断轮询,不够的话就继续读
while (total_bytes_read != MESSAGE_SIZE) {
total_bytes_read += sock.read_some(
asio::buffer(buf + total_bytes_read,
MESSAGE_SIZE - total_bytes_read));
}

return std::string(buf, total_bytes_read); //返回给调用者
}
  • 一次读完,用receive
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int read_data_by_receive() {
std::string raw_ip_address = "127.0.0.1";
unsigned short port_num = 3333;
try {
asio::ip::tcp::endpoint ep(asio::ip::address::from_string(raw_ip_address),port_num);
asio::io_service ios;
asio::ip::tcp::socket sock(ios, ep.protocol());
sock.connect(ep);

const unsigned char BUFF_SIZE = 7;
char buffer_receive[BUFF_SIZE];
// receive函数一次性只返回定义的size大小的数据,没有读满,函数不会返回。
// receive_length<0:系统级的错误
// receive_length=0:对端关闭了。
// receive_length>0:肯定等于7,等于BUFF_SIZE大小,读不完,函数不会返回,函数会阻塞。
int receive_length = sock.receive(asio::buffer(buffer_receive,BUFF_SIZE));
if (receive_length <= 0) {
std::cout << "receive failed" << std::endl;
}
}
catch (system::system_error& e) {
std::cout << "Error occured! Error code = " << e.code()
<< ". Message: " << e.what();
return e.code().value();
}
return 0;
}
  • read:也可以一次性同步读取对方发送的数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
int read_data_by_read() {
std::string raw_ip_address = "127.0.0.1";
unsigned short port_num = 3333;
try {
asio::ip::tcp::endpoint ep(asio::ip::address::from_string(raw_ip_address), port_num);
asio::io_service ios;
asio::ip::tcp::socket sock(ios, ep.protocol());
sock.connect(ep);

const unsigned char BUFF_SIZE = 7;
char buffer_receive[BUFF_SIZE];
// asio给的接口
int receive_length = asio::read(sock, asio::buffer(buffer_receive, BUFF_SIZE));
if (receive_length <= 0) {
std::cout << "receive failed" << std::endl;
}
}
catch (system::system_error& e) {
std::cout << "Error occured! Error code = " << e.code()
<< ". Message: " << e.what();
return e.code().value();
}
return 0;
}

同步阻塞方式的读写demo

  • 客户端需要根据服务器的ip和port创建一个endpoint,然后连接,创建socket,传输数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <iostream>
#include <boost/asio.hpp>
//#include"endpoint.h"

using namespace std;
using namespace boost::asio::ip;

const int MAX_LENGTH = 1024; //发送和接收数据的最大长度为1024

int main()
{
try
{
// 创建上下文服务
boost::asio::io_context ioc;
// 构建endpoint
tcp::endpoint remote_ep(address::from_string("127.0.0.1"), 10086);
// 创建socket
tcp::socket sock(ioc);
boost::system::error_code error = boost::asio::error::host_not_found;
sock.connect(remote_ep, error);
if (error) {
cout << "connect failed, code is " << error.value() << " error msg is " << error.message();
return 0;
}

std::cout << "Enter message" << std::endl;
char request[MAX_LENGTH];
std::cin.getline(request, MAX_LENGTH);//输入数据到Request里面
size_t request_length = strlen(request);
// 获取输入数据长度后,一次性发送完所有数据
boost::asio::write(sock, boost::asio::buffer(request, request_length));

// 接收数据
char reply[MAX_LENGTH];
size_t reply_length = boost::asio::read(sock,boost::asio::buffer(reply, request_length));
std::cout << "Reply is: ";
std::cout.write(reply, reply_length);
std::cout << "\n";
}
catch (const std::exception& e)
{
std::cerr << "Exception:" << e.what() << endl;
}
return 0;
}
  • 服务端接收客户端的连接,并在连接后开辟新的线程处理客户端的消息,将客户端发送的消息返回回去。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#include <iostream>
#include <boost/asio.hpp>
#include<set>
#include<memory>

using boost::asio::ip::tcp;
const int max_length = 1024;
typedef std::shared_ptr<tcp::socket> socket_ptr;
std::set<std::shared_ptr<std::thread>> thread_set;
using namespace std;


// session:处理某个连接到来以后的收发数据,是跑在线程里的
void session(socket_ptr sock) {
try
{
for (;;) {
char data[max_length];
memset(data, '\0', max_length);
boost::system::error_code error;
// read来读,一次读完,服务器会在这等,直到都到最大长度为止,可能要等很长时间
// size_t length = boost::asio::read(sock,boost::asio::buffer(data, max_length), error);
size_t length = sock->read_some(boost::asio::buffer(data, max_length), error); // 假设不粘包,服务器缓冲区充足
if (error == boost::asio::error::eof) { //对端关闭
std::cout << "connection closed by peer" << endl;
break;
}
else if (error) {// 其他异常抛出
throw boost::system::system_error(error);
}

// 打印对端地址和收到的数据消息
cout << "receive from " << sock->remote_endpoint().address().to_string() << endl;
cout << "receive message is " << data << endl;
//回传给对方
boost::asio::write(*sock, boost::asio::buffer(data, length));
}
}
catch (const std::exception& e)
{
std::cerr << "Exception:" << e.what() << endl;
}
}

// server用来接收客户端的连接
void server(boost::asio::io_context& io_context, unsigned short port) {
tcp::acceptor a(io_context, tcp::endpoint(tcp::v4(), port));// 绑定本地ip和端口
for (;;) {
socket_ptr socket(new tcp::socket(io_context));// 建立socket
a.accept(*socket); //接收连接
auto t = std::make_shared<std::thread>(session, socket); // 执行session中,独立线程。
thread_set.insert(t); //保证线程不被释放,这次for循环结束后,线程有时间执行工作。
// 子线程退出以后,主线程才能退出
}
}


int main()
{
try {
// 创建上下文服务
boost::asio::io_context ioc;
server(ioc, 10086);
// 子线程退出以后,主线程才能退出
for (auto& t : thread_set) {
t->join();
}
}
catch (const std::exception& e) {
std::cerr << "Exception:" << e.what() << endl;
}
return 0;
}

同步读写的缺陷

  • 阻塞式读写,如果客户端不发送read,服务器端是阻塞的。
  • 通过进程开辟的线程为新连接处理读写,但一个进程开辟的线程是有限的,大约2048个,线程也不能开辟太多。
  • 服务器和客户端为应答式,实际场景为全双工通信模式,发送和接收要独立分开。
  • 该服务器和客户端未考虑粘包处理。

参考列表:
https://llfc.club/category?catid=225RaiVNI8pFDD5L4m807g7ZwmF#!aid/2LfzYBkRCfdEDrtE6hWz8VrCLoS


C++ asio学习一
https://cauccliu.github.io/2024/03/26/C++ asio学习一/
Author
Liuchang
Posted on
March 26, 2024
Licensed under