|
Sponge
CS144's user-space TCP library
|
Go to the documentation of this file.
12 #include <sys/socket.h>
13 #include <sys/syscall.h>
14 #include <sys/types.h>
23 template <
typename AdaptT>
32 if (_tcp.value().active()) {
34 _tcp.value().tick(next_time - base_time);
35 _datagram_adapter.tick(next_time - base_time);
36 base_time = next_time;
43 template <
typename AdaptT>
45 AdaptT &&datagram_interface)
47 , _thread_data(
move(data_socket_pair.second))
48 , _datagram_adapter(
move(datagram_interface)) {
52 template <
typename AdaptT>
75 _eventloop.add_rule(_datagram_adapter,
78 auto seg = _datagram_adapter.read();
80 _tcp->segment_received(
move(seg.value()));
84 if (_thread_data.eof() and _tcp.value().bytes_in_flight() == 0 and not _fully_acked) {
85 cerr <<
"DEBUG: Outbound stream to "
86 << _datagram_adapter.config().destination.to_string()
87 <<
" has been fully acknowledged.\n";
91 [&] {
return _tcp->active(); });
98 const auto data = _thread_data.read(_tcp->remaining_outbound_capacity());
99 const auto len = data.size();
100 const auto amount_written = _tcp->write(
move(data));
101 if (amount_written !=
len) {
102 throw runtime_error(
"TCPConnection::write() accepted less than advertised length");
105 if (_thread_data.eof()) {
106 _tcp->end_input_stream();
107 _outbound_shutdown =
true;
110 cerr <<
"DEBUG: Outbound stream to " << _datagram_adapter.config().destination.to_string()
111 <<
" finished (" << _tcp.value().bytes_in_flight() <<
" byte"
112 << (_tcp.value().bytes_in_flight() == 1 ?
"" :
"s") <<
" still in flight).\n";
115 [&] {
return (_tcp->active()) and (not _outbound_shutdown) and (_tcp->remaining_outbound_capacity() > 0); },
117 _tcp->end_input_stream();
118 _outbound_shutdown =
true;
130 const size_t amount_to_write =
min(
size_t(65536), inbound.
buffer_size());
132 const auto bytes_written = _thread_data.write(
move(
buffer),
false);
135 if (inbound.
eof() or inbound.
error()) {
136 _thread_data.shutdown(SHUT_WR);
137 _inbound_shutdown =
true;
140 cerr <<
"DEBUG: Inbound stream from " << _datagram_adapter.config().destination.to_string()
141 <<
" finished " << (inbound.
error() ?
"with an error/reset.\n" :
"cleanly.\n");
143 cerr <<
"DEBUG: Waiting for lingering segments (e.g. retransmissions of FIN) from peer...\n";
148 return (not _tcp->inbound_stream().buffer_empty()) or
149 ((_tcp->inbound_stream().eof() or _tcp->inbound_stream().error()) and not _inbound_shutdown);
153 _eventloop.add_rule(_datagram_adapter,
156 while (not _tcp->segments_out().empty()) {
157 _datagram_adapter.write(_tcp->segments_out().front());
158 _tcp->segments_out().pop();
161 [&] {
return not _tcp->segments_out().empty(); });
169 SystemCall(
"socketpair", ::socketpair(AF_UNIX, type, 0,
static_cast<int *
>(
fds)));
174 template <
typename AdaptT>
178 template <
typename AdaptT>
181 if (_tcp_thread.joinable()) {
182 cerr <<
"Warning: unclean shutdown of TCPSpongeSocket\n";
188 cerr <<
"Exception destructing TCPSpongeSocket: " << e.
what() <<
endl;
192 template <
typename AdaptT>
195 if (_tcp_thread.joinable()) {
196 cerr <<
"DEBUG: Waiting for clean shutdown... ";
204 template <
typename AdaptT>
207 throw runtime_error(
"connect() with TCPConnection already initialized");
210 _initialize_TCP(c_tcp);
212 _datagram_adapter.config_mut() = c_ad;
219 if (_tcp->state() != expected_state) {
220 throw runtime_error(
"After TCPConnection::connect(), state was " + _tcp->state().name() +
" but expected " +
221 expected_state.
name());
232 template <
typename AdaptT>
235 throw runtime_error(
"listen_and_accept() with TCPConnection already initialized");
238 _initialize_TCP(c_tcp);
240 _datagram_adapter.config_mut() = c_ad;
241 _datagram_adapter.set_listening(
true);
243 cerr <<
"DEBUG: Listening for incoming connection... ";
245 const auto s = _tcp->state();
248 cerr <<
"new connection from " << _datagram_adapter.config().destination.to_string() <<
".\n";
253 template <
typename AdaptT>
256 if (not _tcp.has_value()) {
259 _tcp_loop([] {
return true; });
261 if (not _tcp.value().active()) {
262 cerr <<
"DEBUG: TCP connection finished "
267 cerr <<
"Exception in TCPConnection runner thread: " << e.
what() <<
"\n";
LocalStreamSocket _thread_data
Stream socket for reads and writes between owner and TCP thread.
@ SYN_RCVD
Got the peer's SYN.
void pop_output(const size_t len)
Remove bytes from the buffer.
@ Out
Callback will be triggered when Rule::fd is writable.
SystemCall("socketpair", ::socketpair(AF_UNIX, SOCK_STREAM, 0, fds.data()))
sock3 shutdown(SHUT_RDWR)
size_t buffer_size() const
A reference-counted handle to a file descriptor.
static pair< FileDescriptor, FileDescriptor > socket_pair_helper(const int type)
Call socketpair and return connected Unix-domain sockets of specified type.
void connect(const Address &address)
uint64_t timestamp_ms()
Get the time in milliseconds since the program began.
@ RESET
A connection that terminated abnormally.
Wrapper around IPv4 addresses and DNS operations.
@ TIME_WAIT
Both sides have sent FIN and ACK'd, waiting for 2 MSL.
static constexpr size_t TCP_TICK_MS
Config for classes derived from FdAdapter.
@ LISTEN
Listening for a peer to connect.
std::string peek_output(const size_t len) const
A wrapper around Unix-domain stream sockets.
void set_blocking(const bool blocking_state)
Set blocking(true) or non-blocking(false)
void connect(const TCPConfig &c_tcp, const FdAdapterConfig &c_ad)
Connect using the specified configurations; blocks until connect succeeds or fails.
Address source
Source address and port.
Summary of a TCPConnection's internal state.
uint16_t rt_timeout
Initial value of the retransmission timeout, in milliseconds.
TCPSpongeSocket(std::pair< FileDescriptor, FileDescriptor > data_socket_pair, AdaptT &&datagram_interface)
Construct LocalStreamSocket fds from socket pair, initialize eventloop.
Config for TCP sender and receiver.
void _tcp_loop(const std::function< bool()> &condition)
Process events while specified condition is true.
void listen_and_accept(const TCPConfig &c_tcp, const FdAdapterConfig &c_ad)
Listen and accept using the specified configurations; blocks until accept succeeds or fails.
A FD adapter for IPv4 datagrams read from and written to a TUN device.
~TCPSpongeSocket()
When a connected socket is destructed, it will send a RST.
Address destination
Destination address and port.
@ Exit
All rules have been canceled or were uninterested; make no further calls to EventLoop::wait_next_event.
std::string to_string() const
Human-readable string, e.g., "8.8.8.8:53".
std::string name() const
Summarize the TCPState in a string.
A FileDescriptor to a Linux TUN device.
@ In
Callback will be triggered when Rule::fd is readable.
@ SYN_SENT
Sent a SYN to initiate a connection.
void _initialize_TCP(const TCPConfig &config)
Set up the TCPConnection and the event loop.
Multithreaded wrapper around TCPConnection that approximates the Unix sockets API.
void _tcp_main()
Main loop of TCPConnection thread.