Skip to content

Commit ca875e8

Browse files
committed
net/proxy support close in stream and datagram
1 parent 8ec2174 commit ca875e8

11 files changed

+35
-58
lines changed

net/proxy/datagram.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ class Datagram {
1717
using executor_type = any_io_executor;
1818

1919
virtual ~Datagram() = default;
20-
virtual any_io_executor get_executor() = 0;
2120

2221
virtual void async_receive_from(
2322
absl::Span<mutable_buffer const> buffers,
@@ -29,6 +28,9 @@ class Datagram {
2928
const udp::endpoint &endpoint,
3029
absl::AnyInvocable<void(std::error_code, size_t) &&> callback) = 0;
3130

31+
virtual any_io_executor get_executor() = 0;
32+
virtual void close() = 0;
33+
3234
template <typename BuffersT>
3335
void async_receive_from(
3436
const BuffersT &buffers,

net/proxy/misc/random-handler.cc

+1-7
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class StreamConnection : public boost::intrusive_ref_counter<
2525
private:
2626
void read();
2727
void write();
28-
void close() { stream_.reset(); }
28+
void close() { stream_->close(); }
2929

3030
std::unique_ptr<Stream> stream_;
3131
absl::FixedArray<uint8_t, 0> read_buffer_;
@@ -58,9 +58,6 @@ StreamConnection::StreamConnection(std::unique_ptr<Stream> stream)
5858
}
5959

6060
void StreamConnection::read() {
61-
if (!stream_) {
62-
return;
63-
}
6461
stream_->async_read_some(
6562
buffer(read_buffer_.data(), read_buffer_.size()),
6663
[connection = boost::intrusive_ptr<StreamConnection>(this)](
@@ -74,9 +71,6 @@ void StreamConnection::read() {
7471
}
7572

7673
void StreamConnection::write() {
77-
if (!stream_) {
78-
return;
79-
}
8074
stream_->async_write_some(
8175
const_buffer(write_buffer_.data(), write_buffer_.size()),
8276
[connection = boost::intrusive_ptr<StreamConnection>(this)](

net/proxy/misc/zero-handler.cc

+1-7
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class StreamConnection : public boost::intrusive_ref_counter<
2424
private:
2525
void read();
2626
void write();
27-
void close() { stream_.reset(); }
27+
void close() { stream_->close(); }
2828

2929
std::unique_ptr<Stream> stream_;
3030
absl::FixedArray<uint8_t, 0> read_buffer_;
@@ -55,9 +55,6 @@ StreamConnection::StreamConnection(std::unique_ptr<Stream> stream)
5555
write_buffer_(8192) {}
5656

5757
void StreamConnection::read() {
58-
if (!stream_) {
59-
return;
60-
}
6158
stream_->async_read_some(
6259
buffer(read_buffer_.data(), read_buffer_.size()),
6360
[connection = boost::intrusive_ptr<StreamConnection>(this)](
@@ -71,9 +68,6 @@ void StreamConnection::read() {
7168
}
7269

7370
void StreamConnection::write() {
74-
if (!stream_) {
75-
return;
76-
}
7771
stream_->async_write_some(
7872
const_buffer(write_buffer_.data(), write_buffer_.size()),
7973
[connection = boost::intrusive_ptr<StreamConnection>(this)](

net/proxy/shadowsocks/connector.cc

+2-5
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ class Connector::TcpStream : public proxy::Stream {
4343
absl::Span<const_buffer const> buffers,
4444
absl::AnyInvocable<void(std::error_code, size_t) &&> callback) override;
4545

46-
any_io_executor get_executor() override;
46+
any_io_executor get_executor() override { return connector_.executor_; }
47+
void close() override { base_stream_->close(); }
4748

4849
private:
4950
void connect(absl::AnyInvocable<void(std::error_code) &&> callback);
@@ -455,10 +456,6 @@ void Connector::TcpStream::async_write_some(
455456
});
456457
}
457458

458-
any_io_executor Connector::TcpStream::get_executor() {
459-
return connector_.executor_;
460-
}
461-
462459
} // namespace shadowsocks
463460
} // namespace proxy
464461
} // namespace net

net/proxy/shadowsocks/handler.cc

+4-14
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,6 @@ Handler::TcpConnection::TcpConnection(
8585
write_header_(handler_.pre_shared_key_.method().is_spec_2022()) {}
8686

8787
void Handler::TcpConnection::forward_read() {
88-
if (!stream_) {
89-
return;
90-
}
9188
BufferSpan read_buffer = decryptor_.buffer();
9289
stream_->async_read_some(
9390
buffer(read_buffer.data(), read_buffer.size()),
@@ -342,9 +339,6 @@ void Handler::TcpConnection::forward_parse_host(size_t header_length) {
342339
}
343340

344341
void Handler::TcpConnection::forward_write() {
345-
if (!remote_stream_) {
346-
return;
347-
}
348342
async_write(
349343
*remote_stream_,
350344
buffer(decryptor_.pop_buffer(read_length_), read_length_),
@@ -361,9 +355,6 @@ void Handler::TcpConnection::forward_write() {
361355
}
362356

363357
void Handler::TcpConnection::backward_read() {
364-
if (!remote_stream_) {
365-
return;
366-
}
367358
remote_stream_->async_read_some(
368359
buffer(backward_read_buffer_.data(), backward_read_buffer_.size()),
369360
[connection = boost::intrusive_ptr<TcpConnection>(this)](
@@ -378,9 +369,6 @@ void Handler::TcpConnection::backward_read() {
378369
}
379370

380371
void Handler::TcpConnection::backward_write() {
381-
if (!stream_) {
382-
return;
383-
}
384372
ConstBufferSpan read_buffer(
385373
backward_read_buffer_.data(), backward_read_size_);
386374
do {
@@ -420,8 +408,10 @@ void Handler::TcpConnection::backward_write() {
420408
}
421409

422410
void Handler::TcpConnection::close() {
423-
remote_stream_.reset();
424-
stream_.reset();
411+
if (remote_stream_) {
412+
remote_stream_->close();
413+
}
414+
stream_->close();
425415
}
426416

427417
} // namespace shadowsocks

net/proxy/socks/handler.cc

+4-14
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,6 @@ Handler::TcpConnection::TcpConnection(
7676
backward_buffer_(4096) {}
7777

7878
void Handler::TcpConnection::forward_read() {
79-
if (!stream_) {
80-
return;
81-
}
8279
stream_->async_read_some(
8380
buffer(
8481
&forward_buffer_[forward_size_],
@@ -246,9 +243,6 @@ void Handler::TcpConnection::connect_host(ConstBufferSpan buffer) {
246243
}
247244

248245
void Handler::TcpConnection::forward_write() {
249-
if (!remote_stream_) {
250-
return;
251-
}
252246
async_write(
253247
*remote_stream_,
254248
buffer(forward_buffer_.data(), forward_size_),
@@ -279,9 +273,6 @@ void Handler::TcpConnection::reply() {
279273
}
280274

281275
void Handler::TcpConnection::backward_write() {
282-
if (!stream_) {
283-
return;
284-
}
285276
async_write(
286277
*stream_,
287278
buffer(backward_buffer_.data(), backward_size_),
@@ -309,9 +300,6 @@ void Handler::TcpConnection::backward_dispatch() {
309300
}
310301

311302
void Handler::TcpConnection::backward_read() {
312-
if (!remote_stream_) {
313-
return;
314-
}
315303
remote_stream_->async_read_some(
316304
buffer(
317305
&backward_buffer_[backward_size_],
@@ -328,8 +316,10 @@ void Handler::TcpConnection::backward_read() {
328316
}
329317

330318
void Handler::TcpConnection::close() {
331-
remote_stream_.reset();
332-
stream_.reset();
319+
if (remote_stream_) {
320+
remote_stream_->close();
321+
}
322+
stream_->close();
333323
}
334324

335325
} // namespace socks

net/proxy/stream.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ class Stream {
1717
using executor_type = any_io_executor;
1818

1919
virtual ~Stream() = default;
20-
virtual any_io_executor get_executor() = 0;
2120

2221
virtual void async_read_some(
2322
absl::Span<mutable_buffer const> buffers,
@@ -27,6 +26,9 @@ class Stream {
2726
absl::Span<const_buffer const> buffers,
2827
absl::AnyInvocable<void(std::error_code, size_t) &&> callback) = 0;
2928

29+
virtual any_io_executor get_executor() = 0;
30+
virtual void close() = 0;
31+
3032
template <typename BuffersT>
3133
void async_read_some(
3234
const BuffersT &buffers,

net/proxy/system/tcp-socket-stream.cc

+6-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ namespace system {
88

99
TcpSocketStream::TcpSocketStream(tcp::socket socket, TimerList &timer_list)
1010
: socket_(std::move(socket)),
11-
timer_(timer_list, [this]() { socket_.close(); }) {}
11+
timer_(timer_list, [this]() { close(); }) {}
1212

1313
void TcpSocketStream::async_read_some(
1414
absl::Span<mutable_buffer const> buffers,
@@ -28,6 +28,11 @@ void TcpSocketStream::async_write_some(
2828
timer_.update();
2929
}
3030

31+
void TcpSocketStream::close() {
32+
boost::system::error_code ec;
33+
socket_.close(ec);
34+
}
35+
3136
} // namespace proxy
3237
} // namespace system
3338
} // namespace net

net/proxy/system/tcp-socket-stream.h

+3-4
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,6 @@ class TcpSocketStream : public Stream {
1616
TcpSocketStream(const TcpSocketStream &) = delete;
1717
TcpSocketStream &operator=(const TcpSocketStream &) = delete;
1818

19-
any_io_executor get_executor() override {
20-
return socket_.get_executor();
21-
}
22-
2319
void async_read_some(
2420
absl::Span<mutable_buffer const> buffers,
2521
absl::AnyInvocable<void(std::error_code, size_t) &&> callback) override;
@@ -28,6 +24,9 @@ class TcpSocketStream : public Stream {
2824
absl::Span<const_buffer const> buffers,
2925
absl::AnyInvocable<void(std::error_code, size_t) &&> callback) override;
3026

27+
any_io_executor get_executor() override { return socket_.get_executor(); }
28+
void close() override;
29+
3130
using Stream::async_read_some;
3231
using Stream::async_write_some;
3332

net/proxy/system/udp-socket-datagram.cc

+5
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ void UdpSocketDatagram::async_send_to(
2929
std::move(callback));
3030
}
3131

32+
void UdpSocketDatagram::close() {
33+
boost::system::error_code ec;
34+
socket_.close(ec);
35+
}
36+
3237
} // namespace proxy
3338
} // namespace system
3439
} // namespace net

net/proxy/system/udp-socket-datagram.h

+3-4
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,6 @@ class UdpSocketDatagram : public Datagram {
1515
UdpSocketDatagram(const UdpSocketDatagram &) = delete;
1616
UdpSocketDatagram &operator=(const UdpSocketDatagram &) = delete;
1717

18-
any_io_executor get_executor() override {
19-
return socket_.get_executor();
20-
}
21-
2218
void async_receive_from(
2319
absl::Span<mutable_buffer const> buffers,
2420
udp::endpoint &endpoint,
@@ -29,6 +25,9 @@ class UdpSocketDatagram : public Datagram {
2925
const udp::endpoint &endpoint,
3026
absl::AnyInvocable<void(std::error_code, size_t) &&> callback) override;
3127

28+
any_io_executor get_executor() override { return socket_.get_executor(); }
29+
void close() override;
30+
3231
using Datagram::async_receive_from;
3332
using Datagram::async_send_to;
3433

0 commit comments

Comments
 (0)