介绍
IO操作是任何分布式应用的网络基础设施的关键操作。它们直接参与数据交换的过程。输入操作用来接收数据,输出操作用来发送数据。
IO缓冲区
网络编程都是关于通过计算机网络进行进程间通信。像其他类型的IO操作一样,网络IO操作涉及使用内存缓冲区。
同步和异步IO操作
Boost.Asio支持两种类型的IO操作:同步和异步。同步操作阻塞当前线程的执行直到操作完成。异步操作在初始化时关联一个回调函数,当操作完成时,Boost.Asio会调用回调函数。
在一个或多个异步操作初始化后,程序使用一个线程来运行Boost.Asio库。Boost.Asio库使用这个线程运行事件循环并调用回调函数通知异步操作完成,异步操作的结果作为参数传递给回调函数。
附加操作
-
取消异步操作。
取消之前初始化的异步操作的能力很重要。它允许程序声明之前初始化的异步操作不再有效。
-
shutdown套接字。
shutdown套接字很有用,如果需要通知另一个程序整个报文已发送。
-
close套接字。
使用固定长度IO缓冲区
固定长度的IO缓冲区通常用在报文长度已知的情况下。在Boost.Asio里面,固定长度IO缓冲区由asio::mutable_buffer或asio::const_buffer表示。asio::mutable_buffer表示可写缓冲区,asio::const_buffer表示只读缓冲区。
但是asio::mutable_buffer和asio::const_buffer并不能直接在Boost.Asio的IO函数中直接使用。相反MutableBufferSequence和ConstBufferSequence概念被引入。MutableBufferSequence指定一个对象表示asio::mutable_buffer对象的集合。相应地,ConstBufferSequence指定一个对象表示asio::const_buffer对象的集合。
asio::buffer()自由函数拥有28个重载形式,接收多种缓冲区表示形式并返回一个asio::mutable_buffers_1或asio::const_buffers_1对象。如果传给asio::buffer()函数的参数是只读类型,则返回asio::const_buffers_1对象,反之返回asio::mutable_buffers_1对象。asio::mutable_buffers_1和asio::const_buffers_1是asio::mutable_buffer和asio::const_buffer相应的适配。
为输出操作准备缓冲区
- 分配一个缓冲区。
- 将输出数据填入缓冲区。
- 将缓冲区表示为满足ConstBufferSequence需求的对象。
- 缓冲区已经可以用在Boost.Asio的输出函数或方法。
1
2
3
|
std::string buf = "Hello";
asio::const_buffer_1 output_buf = asio::buffer(buf);
// output_buf can be used in Boost.Asio output operations
|
为了更好的理解为什么需要将缓冲区表示成满足ConstBufferSequence需求,可以看一下send()的声明:
1
2
|
template <typename ConstBufferSequence>
std::size_t send(const ConstBufferSequence& buffers);
|
为了使用send()发送string对象,我们可以这样做:
1
2
3
|
asio::const_buffer asio_buf(buf.c_str(), buf.size());
std::vector<asio::const_buffer> buffers_sequence;
buffers_sequence.push_back(asio_buf);
|
但是这样没有使用asio::buffer()方便。
为输入操作准备缓冲区
- 分配一个缓冲区。缓冲区的大小必须足以保存接收到的数据。
- 将缓冲区表示为满足MutableBufferSequence需求的对象。
- 缓冲区已经可以用在Boost.Asio的输入函数或方法。
1
2
3
4
5
|
const size_t BUF_SIZE_BYTES = 20;
std::unique_ptr<char[]> buf(new char[BUF_SIZE_BYTES]);
asio::mutable_buffers_1 input_buf =
asio::buffer(static_cast<void*>(buf.get()), BUF_SIZE_BYTES);
// input_buf can be used in Boost.Asio input operations
|
缓冲区所有权
asio::mutable_buffer,asio::const_buffer,asio::mutable_buffers_1,asio::const_buffers_1等并不拥有原始缓冲区的所有权,它们只提供访问缓冲区的接口,不控制其生命周期。
使用可扩展面向流的IO缓冲区
可扩展缓冲区是当新数据写入时可以动态增长的缓冲区。它们经常用于从套接字读取未知大小的报文。
可扩展面向流的缓冲区在Boost.Asio中由asio::streambuf类表示:
1
|
typedef basic_streambuf<> streambuf;
|
asio::basic_streambuf<>类继承自std::streambuf。
1
2
3
4
5
6
7
8
9
10
11
|
asio::streambuf buf;
std::ostream output(&buf);
// Writing the message to the stream-based buffer.
output << "Message";
// Instantiate an input stream which uses our stream buffer.
std::istream input(&buf);
std::string message;
std::getline(input, message);
|
同步写TCP套接字
写套接字最基本的方法是使用Boost.Asio中的asio::ip::tcp::socket类的write_some()方法:
1
2
|
template<typename ConstBufferSequence>
std::size_t write_some(const ConstBufferSequence& buffers);
|
如果方法成功执行,返回写入的字节数。需要强调的是这个方法可能不会发送参数指定的所有数据,它只保证如果错误不发生至少发送一个字节。这意味着为了发送缓冲区中所有数据需要调用write_some()多次。
- 在客户端程序中,分配,打开并连接一个主动TCP套接字。在服务端程序中,通过接收器套接字接收一个主动TCP套接字的连接请求。
- 分配缓冲区并将要写到套接字的数据填充进缓冲区。
- 在一个循环中调用套接字的write_some()方法多次直到缓冲区中的数据都发送完。
1
2
3
4
5
6
7
8
9
|
void writeToSocket(asio::ip::tcp::socket& sock) {
std::string buf = "Hello";
std::size_t total_bytes_written = 0;
while (total_bytes_written != buf.length()) {
total_bytes_written += sock.write_some(
asio::buffer(buf.c_str() + total_bytes_written,
buf.length() - total_bytes_written));
}
}
|
替代品——send()方法
asio::ip::tcp::socket类包含另外一个同步写数据的方法send()。它有3种重载形式。其中一个和write_some()方法一样,有一样的签名并提供同样的功能。
第二个重载函数接收一个额外的参数:
1
2
3
|
template<typename ConstBufferSequence>
std::size_t send(const ConstBufferSequence& buffers,
socket_base::message_flags flags);
|
第三个重载函数和第二个一样,但是它不抛出异常,而是接收一个boost::system::error_code的参数。
使用write_some()方法向套接字写数据看起来非常复杂,因为必须使用一个循环,并且每一次迭代都需要正确构造缓冲区并跟踪已写的数据。幸运的是,Boost.Asio提供一个自由函数asio::write()简化向套接字写数据。asio::write()有8种重载形式。
1
2
3
4
5
6
7
|
template<typename SyncWriteStream, typename ConstBufferSequence>
std::size_t write(SyncWriteStream& s, const ConstBufferSequence& buffers);
void WriteToSocketEnhanced(asio::ip::tcp::socket& sock) {
std::string buf = "hello";
asio::write(sock, asio::buffer(buf));
}
|
同步读TCP套接字
由Boost.Asio提供的从套接字读取数据最基本的方法是asio::ip::tcp::socket类的read_some()方法。
1
2
|
template<typename MutableBufferSequence>
std::size_t read_some(const MutableBufferSequence& buffers);
|
需要注意的是没有办法控制read_some()将会读取多少数据。它只保证如果错误不发生至少读取一个字节。通常情况下,为了从套接字读取某一数量的数据需要调用read_some()多次。
- 在客户端程序中,分配,打开并连接一个主动TCP套接字。在服务端程序中,通过接收器套接字接收一个主动TCP套接字的连接请求。
- 分配足够大小的缓冲区使得能够装下预期的报文。
- 在一个循环中调用套接字的read_some()方法多次直到读完数据。
1
2
3
4
5
6
7
8
9
10
11
|
std::string readFromSocket(asio::ip::tcp::socket& sock) {
const unsigned char MESSAGE_SIZE = 7;
char buf[MESSAGE_SIZE];
std::size_t total_bytes_read = 0;
while (total_bytes_read != MESSAGE_SIZE) {
total_bytes_read += sock.read_some(
asio::buffer(buf + total_bytes_read,
MESSAGE_SIZE - total_bytes_read));
}
return std::string(buf, total_bytes_read);
}
|
替代品——receive()方法
asio::ip::tcp::socket类包含另外一个同步读数据的方法receive()。它有3种重载形式。其中一个和read_some()方法一样,有一样的签名并提供同样的功能。
第二个重载函数接收一个额外的参数:
1
2
3
|
template<typename MutableBufferSequence>
std::size_t receive(const MutableBufferSequence& buffers,
socket_base::message_flags flags);
|
第三个重载函数和第二个一样,但是它不抛出异常,而是接收一个boost::system::error_code的参数。
使用read_some()方法向套接字写数据看起来非常复杂,因为必须使用一个循环,并且每一次迭代都需要正确构造缓冲区并跟踪已写的数据。幸运的是,Boost.Asio提供一组自由函数简化向套接字写数据。有3种读函数,每个有7种重载形式。
asio::read()函数
1
2
3
4
5
6
7
8
9
|
template<typename SyncReadStream, typename MutableBufferSequence>
std::size_t read(SyncReadStream& s, const MutableBufferSequence& buffers)
std::string readFromSocketEnhanced(asio::ip::tcp::socket& sock) {
const unsigned char MESSAGE_SIZE = 7;
char buf[MESSAGE_SIZE];
asio::read(sock, asio::buffer(buf, MESSAGE_SIZE));
return std::string(buf, MESSAGE_SIZE);
}
|
read_until()函数
asio::read_until()函数从套接字读取数据直到遇到指定的模式,它有8种重载形式。
1
2
3
4
5
|
template<typename SyncReadStream, typename Allocator>
std::size_t read_until(
SyncReadStream& s,
boost::asio::basic_streambuf<Allocator>& b,
char delim);
|
asio::read_until从套接字s读取数据到缓冲区b直到遇到指定的字符delim。需要注意的是asio::read_until内部使用read_some()方法读取数据的。当函数返回时,缓冲区b中可能包含在delim后的数据。也就是说程序员需要处理这种情况。
1
2
3
4
5
6
7
8
|
std::string readFromSocketDelim(asio::ip::tcp::socket& sock) {
asio::streambuf buf;
asio::read_until(sock, buf, '\n');
std::string message;
std::istream input_stream(&buf);
std::getline(input_stream, message);
return message;
}
|
read_at()函数
asio::read_at()函数从特定位置开始读数据,参考Boost文档。
异步写TCP套接字
Boost.Asio提供的异步写数据最基本的工具是asio::ip::tcp::socket类的async_write_some()方法。
1
2
3
|
template<typename ConstBufferSequence, typename WriteHandler>
void async_write_some(const ConstBufferSequence& buffers,
WriteHandler handler);
|
async_write_some()方法初始化一个写操作并立即返回。第一个参数包含要写到套接字的数据,第二个参数是个回调函数,当初始化操作完成后由Boost.Asio调用。
回调应该具有下述的签名:
1
2
|
void write_handler(const boost::system::error_code& ec,
std::size_t bytes_transferred);
|
async_write_some()方法保证如果不发生错误,至少写一个字节数据。这意味着通常情况下,为了将所有数据写到套接字,需要调用async_write_some()多次。
- 定义一个数据结构,包含指向套接字的指针,一个缓冲区和一个用来统计已写数据的变量
- 定义一个回调函数
- 在客户端程序中,分配,打开并连接一个主动TCP套接字。在服务端程序中,通过接收器套接字接收一个主动TCP套接字的连接请求。
- 分配一个缓冲区,并填充要写到套接字的数据
- 调用async_write_some()方法初始化一个写操作,指定步骤2的函数为回调函数
- 调用asio::io_service类的run()方法
- 在回调中,增加已写的数据。如果已写数据小于需要写的总数据,再初始化一个异步写操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
struct Session {
std::shared_ptr<asio::ip::tcp::socket> sock;
std::string buf;
std::size_t total_bytes_written;
};
void callback(const boost::system::error_code& ec,
std::size_t bytes_transferred, std::shared_ptr<Session> s) {
if (ec != 0) {
std::cout << "Error code = " << ec.value()
<< ". Message: " << ec.message();
return;
}
s->total_bytes_written += bytes_transferred;
if (s->total_bytes_written == s->buf.length()) {
return;
}
s->sock->async_write_some(
asio::buffer(s->buf.c_str() + s->total_bytes_written,
s->buf.length() - s->total_bytes_written),
std::bind(callback, std::placeholders::_1,
std::placeholders::_2, s));
}
void writeToSocket(std::shared_ptr<asio::ip::tcp::socket> sock) {
std::shared_ptr<Session> s(new Session);
s->buf = std::string("Hello");
s->total_bytes_written = 0;
s->sock = sock;
s->sock->async_write_some(asio::buffer(s->buf),
std::bind(callback, std::placeholders::_1,
std::placeholders::_2, s));
}
int main() {
std::string raw_ip_address = "127.0.0.1";
unsigned short port_num = 3333;
try {
asio::ip::tcp::endpoint ep(
asio::ip::address::from_string(raw_ip_address), port_num);
asio::io_service ios;
std::shared_ptr<asio::ip::tcp::socket> sock(
new asio::ip::tcp::socket(ios, ep.protocol()));
sock->connect(ep);
writeToSocket(sock);
ios.run();
} catch (system::system_error &e) {
return e.code().value();
}
return 0;
}
|
Boost.Asio提供一个自由函数asio::async_write()方法可以更方便地异步写到套接字:
1
2
3
4
|
template<typename AsyncWriteStream, typename ConstBufferSequence,
typename WriteHandler>
void async_write(AsyncWriteStream& s,
const ConstBufferSequence& buffers, WriteHandler handler);
|
asio::async_write()函数是通过多次调用async_write_some()方法实现的。
异步读TCP套接字