Sponge
CS144's user-space TCP library
tcp_sponge_socket.cc
Go to the documentation of this file.
1 #include "tcp_sponge_socket.hh"
2 
3 #include "network_interface.hh"
4 #include "parser.hh"
5 #include "tun.hh"
6 #include "util.hh"
7 
8 #include <cstddef>
9 #include <exception>
10 #include <iostream>
11 #include <stdexcept>
12 #include <string>
13 #include <sys/socket.h>
14 #include <sys/syscall.h>
15 #include <sys/types.h>
16 #include <unistd.h>
17 #include <utility>
18 
19 using namespace std;
20 
21 static constexpr size_t TCP_TICK_MS = 10;
22 
24 template <typename AdaptT>
25 void TCPSpongeSocket<AdaptT>::_tcp_loop(const function<bool()> &condition) {
26  auto base_time = timestamp_ms();
27  while (condition()) {
28  auto ret = _eventloop.wait_next_event(TCP_TICK_MS);
29  if (ret == EventLoop::Result::Exit or _abort) {
30  break;
31  }
32 
33  if (_tcp.value().active()) {
34  const auto next_time = timestamp_ms();
35  _tcp.value().tick(next_time - base_time);
36  _datagram_adapter.tick(next_time - base_time);
37  base_time = next_time;
38  }
39  }
40 }
41 
44 template <typename AdaptT>
46  AdaptT &&datagram_interface)
47  : LocalStreamSocket(move(data_socket_pair.first))
48  , _thread_data(move(data_socket_pair.second))
49  , _datagram_adapter(move(datagram_interface)) {
51 }
52 
53 template <typename AdaptT>
55  _tcp.emplace(config);
56 
57  // Set up the event loop
58 
59  // There are four possible events to handle:
60  //
61  // 1) Incoming datagram received (needs to be given to
62  // TCPConnection::segment_received method)
63  //
64  // 2) Outbound bytes received from local application via a write()
65  // call (needs to be read from the local stream socket and
66  // given to TCPConnection::data_written method)
67  //
68  // 3) Incoming bytes reassembled by the TCPConnection
69  // (needs to be read from the inbound_stream and written
70  // to the local stream socket back to the application)
71  //
72  // 4) Outbound segment generated by TCP (needs to be
73  // given to underlying datagram socket)
74 
75  // rule 1: read from filtered packet stream and dump into TCPConnection
76  _eventloop.add_rule(_datagram_adapter,
78  [&] {
79  auto seg = _datagram_adapter.read();
80  if (seg) {
81  _tcp->segment_received(move(seg.value()));
82  }
83 
84  // debugging output:
85  if (_thread_data.eof() and _tcp.value().bytes_in_flight() == 0 and not _fully_acked) {
86  cerr << "DEBUG: Outbound stream to "
87  << _datagram_adapter.config().destination.to_string()
88  << " has been fully acknowledged.\n";
89  _fully_acked = true;
90  }
91  },
92  [&] { return _tcp->active(); });
93 
94  // rule 2: read from pipe into outbound buffer
95  _eventloop.add_rule(
96  _thread_data,
98  [&] {
99  const auto data = _thread_data.read(_tcp->remaining_outbound_capacity());
100  const auto len = data.size();
101  const auto amount_written = _tcp->write(move(data));
102  if (amount_written != len) {
103  throw runtime_error("TCPConnection::write() accepted less than advertised length");
104  }
105 
106  if (_thread_data.eof()) {
107  _tcp->end_input_stream();
108  _outbound_shutdown = true;
109 
110  // debugging output:
111  cerr << "DEBUG: Outbound stream to " << _datagram_adapter.config().destination.to_string()
112  << " finished (" << _tcp.value().bytes_in_flight() << " byte"
113  << (_tcp.value().bytes_in_flight() == 1 ? "" : "s") << " still in flight).\n";
114  }
115  },
116  [&] { return (_tcp->active()) and (not _outbound_shutdown) and (_tcp->remaining_outbound_capacity() > 0); },
117  [&] {
118  _tcp->end_input_stream();
119  _outbound_shutdown = true;
120  });
121 
122  // rule 3: read from inbound buffer into pipe
123  _eventloop.add_rule(
124  _thread_data,
126  [&] {
127  ByteStream &inbound = _tcp->inbound_stream();
128  // Write from the inbound_stream into
129  // the pipe, handling the possibility of a partial
130  // write (i.e., only pop what was actually written).
131  const size_t amount_to_write = min(size_t(65536), inbound.buffer_size());
132  const std::string buffer = inbound.peek_output(amount_to_write);
133  const auto bytes_written = _thread_data.write(move(buffer), false);
134  inbound.pop_output(bytes_written);
135 
136  if (inbound.eof() or inbound.error()) {
137  _thread_data.shutdown(SHUT_WR);
138  _inbound_shutdown = true;
139 
140  // debugging output:
141  cerr << "DEBUG: Inbound stream from " << _datagram_adapter.config().destination.to_string()
142  << " finished " << (inbound.error() ? "with an error/reset.\n" : "cleanly.\n");
143  if (_tcp.value().state() == TCPState::State::TIME_WAIT) {
144  cerr << "DEBUG: Waiting for lingering segments (e.g. retransmissions of FIN) from peer...\n";
145  }
146  }
147  },
148  [&] {
149  return (not _tcp->inbound_stream().buffer_empty()) or
150  ((_tcp->inbound_stream().eof() or _tcp->inbound_stream().error()) and not _inbound_shutdown);
151  });
152 
153  // rule 4: read outbound segments from TCPConnection and send as datagrams
154  _eventloop.add_rule(_datagram_adapter,
156  [&] {
157  while (not _tcp->segments_out().empty()) {
158  _datagram_adapter.write(_tcp->segments_out().front());
159  _tcp->segments_out().pop();
160  }
161  },
162  [&] { return not _tcp->segments_out().empty(); });
163 }
164 
169  int fds[2];
170  SystemCall("socketpair", ::socketpair(AF_UNIX, type, 0, static_cast<int *>(fds)));
171  return {FileDescriptor(fds[0]), FileDescriptor(fds[1])};
172 }
173 
175 template <typename AdaptT>
176 TCPSpongeSocket<AdaptT>::TCPSpongeSocket(AdaptT &&datagram_interface)
177  : TCPSpongeSocket(socket_pair_helper(SOCK_STREAM), move(datagram_interface)) {}
178 
179 template <typename AdaptT>
181  try {
182  if (_tcp_thread.joinable()) {
183  cerr << "Warning: unclean shutdown of TCPSpongeSocket\n";
184  // force the other side to exit
185  _abort.store(true);
186  _tcp_thread.join();
187  }
188  } catch (const exception &e) {
189  cerr << "Exception destructing TCPSpongeSocket: " << e.what() << endl;
190  }
191 }
192 
193 template <typename AdaptT>
195  shutdown(SHUT_RDWR);
196  if (_tcp_thread.joinable()) {
197  cerr << "DEBUG: Waiting for clean shutdown... ";
198  _tcp_thread.join();
199  cerr << "done.\n";
200  }
201 }
202 
205 template <typename AdaptT>
207  if (_tcp) {
208  throw runtime_error("connect() with TCPConnection already initialized");
209  }
210 
211  _initialize_TCP(c_tcp);
212 
213  _datagram_adapter.config_mut() = c_ad;
214 
215  cerr << "DEBUG: Connecting to " << c_ad.destination.to_string() << "... ";
216  _tcp->connect();
217 
218  const TCPState expected_state = TCPState::State::SYN_SENT;
219 
220  if (_tcp->state() != expected_state) {
221  throw runtime_error("After TCPConnection::connect(), state was " + _tcp->state().name() + " but expected " +
222  expected_state.name());
223  }
224 
225  _tcp_loop([&] { return _tcp->state() == TCPState::State::SYN_SENT; });
226  cerr << "done.\n";
227 
228  _tcp_thread = thread(&TCPSpongeSocket::_tcp_main, this);
229 }
230 
233 template <typename AdaptT>
235  if (_tcp) {
236  throw runtime_error("listen_and_accept() with TCPConnection already initialized");
237  }
238 
239  _initialize_TCP(c_tcp);
240 
241  _datagram_adapter.config_mut() = c_ad;
242  _datagram_adapter.set_listening(true);
243 
244  cerr << "DEBUG: Listening for incoming connection... ";
245  _tcp_loop([&] {
246  const auto s = _tcp->state();
248  });
249  cerr << "new connection from " << _datagram_adapter.config().destination.to_string() << ".\n";
250 
251  _tcp_thread = thread(&TCPSpongeSocket::_tcp_main, this);
252 }
253 
254 template <typename AdaptT>
256  try {
257  if (not _tcp.has_value()) {
258  throw runtime_error("no TCP");
259  }
260  _tcp_loop([] { return true; });
261  shutdown(SHUT_RDWR);
262  if (not _tcp.value().active()) {
263  cerr << "DEBUG: TCP connection finished "
264  << (_tcp.value().state() == TCPState::State::RESET ? "uncleanly" : "cleanly.\n");
265  }
266  _tcp.reset();
267  } catch (const exception &e) {
268  cerr << "Exception in TCPConnection runner thread: " << e.what() << "\n";
269  throw e;
270  }
271 }
272 
275 
278 
281 
284 
287 
289 
290 void CS144TCPSocket::connect(const Address &address) {
291  TCPConfig tcp_config;
292  tcp_config.rt_timeout = 100;
293 
294  FdAdapterConfig multiplexer_config;
295  multiplexer_config.source = {"169.254.144.9", to_string(uint16_t(random_device()()))};
296  multiplexer_config.destination = address;
297 
298  TCPOverIPv4SpongeSocket::connect(tcp_config, multiplexer_config);
299 }
300 
301 static const string LOCAL_TAP_IP_ADDRESS = "169.254.10.9";
302 static const string LOCAL_TAP_NEXT_HOP_ADDRESS = "169.254.10.1";
303 
305  EthernetAddress addr;
306  for (auto &byte : addr) {
307  byte = random_device()(); // use a random local Ethernet address
308  }
309  addr.at(0) |= 0x02; // "10" in last two binary digits marks a private Ethernet address
310  addr.at(0) &= 0xfe;
311 
312  return addr;
313 }
314 
320 
321 void FullStackSocket::connect(const Address &address) {
322  TCPConfig tcp_config;
323  tcp_config.rt_timeout = 100;
324 
325  FdAdapterConfig multiplexer_config;
326  multiplexer_config.source = {LOCAL_TAP_IP_ADDRESS, to_string(uint16_t(random_device()()))};
327  multiplexer_config.destination = address;
328 
329  TCPOverIPv4OverEthernetSpongeSocket::connect(tcp_config, multiplexer_config);
330 }
TCPSpongeSocket::_thread_data
LocalStreamSocket _thread_data
Stream socket for reads and writes between owner and TCP thread.
Definition: tcp_sponge_socket.hh:24
util.hh
TCPState::State::SYN_RCVD
@ SYN_RCVD
Got the peer's SYN.
TCPSpongeSocket::wait_until_closed
void wait_until_closed()
Definition: tcp_sponge_socket.cc:194
std::string
std::exception
ByteStream::eof
bool eof() const
Definition: byte_stream.cc:47
len
constexpr size_t len
Definition: tcp_benchmark.cc:12
std::move
T move(T... args)
ByteStream::pop_output
void pop_output(const size_t len)
Remove bytes from the buffer.
Definition: byte_stream.cc:29
std::pair
EventLoop::Direction::Out
@ Out
Callback will be triggered when Rule::fd is writable.
SystemCall
SystemCall("socketpair", ::socketpair(AF_UNIX, SOCK_STREAM, 0, fds.data()))
FullStackSocket::connect
void connect(const Address &address)
Definition: tcp_sponge_socket.cc:321
shutdown
sock3 shutdown(SHUT_RDWR)
CS144TCPSocket::CS144TCPSocket
CS144TCPSocket()
Definition: tcp_sponge_socket.cc:288
ByteStream::buffer_size
size_t buffer_size() const
Definition: byte_stream.cc:43
FileDescriptor
A reference-counted handle to a file descriptor.
Definition: file_descriptor.hh:12
LOCAL_TAP_NEXT_HOP_ADDRESS
static const string LOCAL_TAP_NEXT_HOP_ADDRESS
Definition: tcp_sponge_socket.cc:302
socket_pair_helper
static pair< FileDescriptor, FileDescriptor > socket_pair_helper(const int type)
Call socketpair and return connected Unix-domain sockets of specified type.
Definition: tcp_sponge_socket.cc:168
CS144TCPSocket::connect
void connect(const Address &address)
Definition: tcp_sponge_socket.cc:290
timestamp_ms
uint64_t timestamp_ms()
Get the time in milliseconds since the program began.
Definition: util.cc:14
TCPState::State::RESET
@ RESET
A connection that terminated abnormally.
Address
Wrapper around IPv4 addresses and DNS operations.
Definition: address.hh:13
fds
std::array< int, 2 > fds
Definition: socket_example_3.cc:2
TCPState::State::TIME_WAIT
@ TIME_WAIT
Both sides have sent FIN and ACK'd, waiting for 2 MSL.
buffer
std::string buffer
Definition: parser_example.cc:7
TCP_TICK_MS
static constexpr size_t TCP_TICK_MS
Definition: tcp_sponge_socket.cc:21
FdAdapterConfig
Config for classes derived from FdAdapter.
Definition: tcp_config.hh:26
ByteStream
An in-order byte stream.
Definition: byte_stream.hh:11
TCPState::State::LISTEN
@ LISTEN
Listening for a peer to connect.
std::thread
ByteStream::peek_output
std::string peek_output(const size_t len) const
Definition: byte_stream.cc:23
std::random_device
std::cerr
tun.hh
LocalStreamSocket
A wrapper around Unix-domain stream sockets.
Definition: socket.hh:113
std::array
FileDescriptor::set_blocking
void set_blocking(const bool blocking_state)
Set blocking(true) or non-blocking(false)
Definition: file_descriptor.cc:101
TCPSpongeSocket::connect
void connect(const TCPConfig &c_tcp, const FdAdapterConfig &c_ad)
Connect using the specified configurations; blocks until connect succeeds or fails.
Definition: tcp_sponge_socket.cc:206
std::runtime_error
TCPOverIPv4OverEthernetAdapter
A FD adapter for IPv4 datagrams read from and written to a TAP device.
Definition: tuntap_adapter.hh:44
FdAdapterConfig::source
Address source
Source address and port.
Definition: tcp_config.hh:28
std::uint16_t
TCPState
Summary of a TCPConnection's internal state.
Definition: tcp_state.hh:23
TCPConfig::rt_timeout
uint16_t rt_timeout
Initial value of the retransmission timeout, in milliseconds.
Definition: tcp_config.hh:19
TCPSpongeSocket::TCPSpongeSocket
TCPSpongeSocket(std::pair< FileDescriptor, FileDescriptor > data_socket_pair, AdaptT &&datagram_interface)
Construct LocalStreamSocket fds from socket pair, initialize eventloop.
Definition: tcp_sponge_socket.cc:45
TCPConfig
Config for TCP sender and receiver.
Definition: tcp_config.hh:12
TCPSpongeSocket::_tcp_loop
void _tcp_loop(const std::function< bool()> &condition)
Process events while specified condition is true.
Definition: tcp_sponge_socket.cc:25
TCPSpongeSocket::listen_and_accept
void listen_and_accept(const TCPConfig &c_tcp, const FdAdapterConfig &c_ad)
Listen and accept using the specified configurations; blocks until accept succeeds or fails.
Definition: tcp_sponge_socket.cc:234
LOCAL_TAP_IP_ADDRESS
static const string LOCAL_TAP_IP_ADDRESS
Definition: tcp_sponge_socket.cc:301
TCPOverIPv4OverTunFdAdapter
A FD adapter for IPv4 datagrams read from and written to a TUN device.
Definition: tuntap_adapter.hh:13
std::min
T min(T... args)
TCPSpongeSocket::~TCPSpongeSocket
~TCPSpongeSocket()
When a connected socket is destructed, it will send a RST.
Definition: tcp_sponge_socket.cc:180
std::endl
T endl(T... args)
ByteStream::error
bool error() const
Definition: byte_stream.hh:62
std
random_private_ethernet_address
EthernetAddress random_private_ethernet_address()
Definition: tcp_sponge_socket.cc:304
FdAdapterConfig::destination
Address destination
Destination address and port.
Definition: tcp_config.hh:29
EventLoop::Result::Exit
@ Exit
All rules have been canceled or were uninterested; make no further calls to EventLoop::wait_next_even...
Address::to_string
std::string to_string() const
Human-readable string, e.g., "8.8.8.8:53".
Definition: address.cc:103
TapFD
A FileDescriptor to a Linux TAP device.
Definition: tun.hh:23
TCPState::name
std::string name() const
Summarize the TCPState in a string.
Definition: tcp_state.cc:12
to_string
string to_string(const EthernetAddress address)
Printable representation of an EthernetAddress.
Definition: ethernet_header.cc:52
TunFD
A FileDescriptor to a Linux TUN device.
Definition: tun.hh:16
EventLoop::Direction::In
@ In
Callback will be triggered when Rule::fd is readable.
FullStackSocket::FullStackSocket
FullStackSocket()
Definition: tcp_sponge_socket.cc:315
TCPState::State::SYN_SENT
@ SYN_SENT
Sent a SYN to initiate a connection.
tcp_sponge_socket.hh
std::exception::what
T what(T... args)
network_interface.hh
parser.hh
TCPSpongeSocket::_initialize_TCP
void _initialize_TCP(const TCPConfig &config)
Set up the TCPConnection and the event loop.
Definition: tcp_sponge_socket.cc:54
TCPSpongeSocket
Multithreaded wrapper around TCPConnection that approximates the Unix sockets API.
Definition: tcp_sponge_socket.hh:21
TCPSpongeSocket::_tcp_main
void _tcp_main()
Main loop of TCPConnection thread.
Definition: tcp_sponge_socket.cc:255