Sponge: CS144's user-space TCP library
Go to the documentation of this file.
13 #include <sys/socket.h>
14 #include <sys/syscall.h>
15 #include <sys/types.h>
24 template <
typename AdaptT>
33 if (_tcp.value().active()) {
35 _tcp.value().tick(next_time - base_time);
36 _datagram_adapter.tick(next_time - base_time);
37 base_time = next_time;
44 template <
typename AdaptT>
46 AdaptT &&datagram_interface)
48 , _thread_data(
move(data_socket_pair.second))
49 , _datagram_adapter(
move(datagram_interface)) {
53 template <
typename AdaptT>
76 _eventloop.add_rule(_datagram_adapter,
79 auto seg = _datagram_adapter.read();
81 _tcp->segment_received(
move(seg.value()));
85 if (_thread_data.eof() and _tcp.value().bytes_in_flight() == 0 and not _fully_acked) {
86 cerr <<
"DEBUG: Outbound stream to "
87 << _datagram_adapter.config().destination.to_string()
88 <<
" has been fully acknowledged.\n";
92 [&] {
return _tcp->active(); });
99 const auto data = _thread_data.read(_tcp->remaining_outbound_capacity());
100 const auto len = data.size();
101 const auto amount_written = _tcp->write(
move(data));
102 if (amount_written !=
len) {
103 throw runtime_error(
"TCPConnection::write() accepted less than advertised length");
106 if (_thread_data.eof()) {
107 _tcp->end_input_stream();
108 _outbound_shutdown =
true;
111 cerr <<
"DEBUG: Outbound stream to " << _datagram_adapter.config().destination.to_string()
112 <<
" finished (" << _tcp.value().bytes_in_flight() <<
" byte"
113 << (_tcp.value().bytes_in_flight() == 1 ?
"" :
"s") <<
" still in flight).\n";
116 [&] {
return (_tcp->active()) and (not _outbound_shutdown) and (_tcp->remaining_outbound_capacity() > 0); },
118 _tcp->end_input_stream();
119 _outbound_shutdown =
true;
131 const size_t amount_to_write =
min(
size_t(65536), inbound.
buffer_size());
133 const auto bytes_written = _thread_data.write(
move(
buffer),
false);
136 if (inbound.
eof() or inbound.
error()) {
137 _thread_data.shutdown(SHUT_WR);
138 _inbound_shutdown =
true;
141 cerr <<
"DEBUG: Inbound stream from " << _datagram_adapter.config().destination.to_string()
142 <<
" finished " << (inbound.
error() ?
"with an error/reset.\n" :
"cleanly.\n");
144 cerr <<
"DEBUG: Waiting for lingering segments (e.g. retransmissions of FIN) from peer...\n";
149 return (not _tcp->inbound_stream().buffer_empty()) or
150 ((_tcp->inbound_stream().eof() or _tcp->inbound_stream().error()) and not _inbound_shutdown);
154 _eventloop.add_rule(_datagram_adapter,
157 while (not _tcp->segments_out().empty()) {
158 _datagram_adapter.write(_tcp->segments_out().front());
159 _tcp->segments_out().pop();
162 [&] {
return not _tcp->segments_out().empty(); });
170 SystemCall(
"socketpair", ::socketpair(AF_UNIX, type, 0,
static_cast<int *
>(
fds)));
175 template <
typename AdaptT>
179 template <
typename AdaptT>
182 if (_tcp_thread.joinable()) {
183 cerr <<
"Warning: unclean shutdown of TCPSpongeSocket\n";
189 cerr <<
"Exception destructing TCPSpongeSocket: " << e.
what() <<
endl;
193 template <
typename AdaptT>
196 if (_tcp_thread.joinable()) {
197 cerr <<
"DEBUG: Waiting for clean shutdown... ";
205 template <
typename AdaptT>
208 throw runtime_error(
"connect() with TCPConnection already initialized");
211 _initialize_TCP(c_tcp);
213 _datagram_adapter.config_mut() = c_ad;
220 if (_tcp->state() != expected_state) {
221 throw runtime_error(
"After TCPConnection::connect(), state was " + _tcp->state().name() +
" but expected " +
222 expected_state.
name());
233 template <
typename AdaptT>
236 throw runtime_error(
"listen_and_accept() with TCPConnection already initialized");
239 _initialize_TCP(c_tcp);
241 _datagram_adapter.config_mut() = c_ad;
242 _datagram_adapter.set_listening(
true);
244 cerr <<
"DEBUG: Listening for incoming connection... ";
246 const auto s = _tcp->state();
249 cerr <<
"new connection from " << _datagram_adapter.config().destination.to_string() <<
".\n";
254 template <
typename AdaptT>
257 if (not _tcp.has_value()) {
260 _tcp_loop([] {
return true; });
262 if (not _tcp.value().active()) {
263 cerr <<
"DEBUG: TCP connection finished "
268 cerr <<
"Exception in TCPConnection runner thread: " << e.
what() <<
"\n";
306 for (
auto &
byte : addr) {
LocalStreamSocket _thread_data
Stream socket for reads and writes between owner and TCP thread.
@ SYN_RCVD
Got the peer's SYN.
void pop_output(const size_t len)
Remove bytes from the buffer.
@ Out
Callback will be triggered when Rule::fd is writable.
SystemCall("socketpair", ::socketpair(AF_UNIX, SOCK_STREAM, 0, fds.data()))
void connect(const Address &address)
sock3 shutdown(SHUT_RDWR)
size_t buffer_size() const
A reference-counted handle to a file descriptor.
static const string LOCAL_TAP_NEXT_HOP_ADDRESS
static pair< FileDescriptor, FileDescriptor > socket_pair_helper(const int type)
Call socketpair and return connected Unix-domain sockets of specified type.
void connect(const Address &address)
uint64_t timestamp_ms()
Get the time in milliseconds since the program began.
@ RESET
A connection that terminated abnormally.
Wrapper around IPv4 addresses and DNS operations.
@ TIME_WAIT
Both sides have sent FIN and ACK'd, waiting for 2 MSL.
static constexpr size_t TCP_TICK_MS
Config for classes derived from FdAdapter.
@ LISTEN
Listening for a peer to connect.
std::string peek_output(const size_t len) const
A wrapper around Unix-domain stream sockets.
void set_blocking(const bool blocking_state)
Set blocking (true) or non-blocking (false).
void connect(const TCPConfig &c_tcp, const FdAdapterConfig &c_ad)
Connect using the specified configurations; blocks until connect succeeds or fails.
A FD adapter for IPv4 datagrams read from and written to a TAP device.
Address source
Source address and port.
Summary of a TCPConnection's internal state.
uint16_t rt_timeout
Initial value of the retransmission timeout, in milliseconds.
TCPSpongeSocket(std::pair< FileDescriptor, FileDescriptor > data_socket_pair, AdaptT &&datagram_interface)
Construct LocalStreamSocket fds from socket pair, initialize eventloop.
Config for TCP sender and receiver.
void _tcp_loop(const std::function< bool()> &condition)
Process events while specified condition is true.
void listen_and_accept(const TCPConfig &c_tcp, const FdAdapterConfig &c_ad)
Listen and accept using the specified configurations; blocks until accept succeeds or fails.
static const string LOCAL_TAP_IP_ADDRESS
A FD adapter for IPv4 datagrams read from and written to a TUN device.
~TCPSpongeSocket()
When a connected socket is destructed, it will send a RST.
EthernetAddress random_private_ethernet_address()
Address destination
Destination address and port.
@ Exit
All rules have been canceled or were uninterested; make no further calls to EventLoop::wait_next_event.
std::string to_string() const
Human-readable string, e.g., "8.8.8.8:53".
A FileDescriptor to a Linux TAP device.
std::string name() const
Summarize the TCPState in a string.
A FileDescriptor to a Linux TUN device.
@ In
Callback will be triggered when Rule::fd is readable.
@ SYN_SENT
Sent a SYN to initiate a connection.
void _initialize_TCP(const TCPConfig &config)
Set up the TCPConnection and the event loop.
Multithreaded wrapper around TCPConnection that approximates the Unix sockets API.
void _tcp_main()
Main loop of TCPConnection thread.