Sponge
CS144's user-space TCP library
tcp_sponge_socket.cc
Go to the documentation of this file.
1 #include "tcp_sponge_socket.hh"
2 
3 #include "parser.hh"
4 #include "tun.hh"
5 #include "util.hh"
6 
7 #include <cstddef>
8 #include <exception>
9 #include <iostream>
10 #include <stdexcept>
11 #include <string>
12 #include <sys/socket.h>
13 #include <sys/syscall.h>
14 #include <sys/types.h>
15 #include <unistd.h>
16 #include <utility>
17 
18 using namespace std;
19 
20 static constexpr size_t TCP_TICK_MS = 10;
21 
23 template <typename AdaptT>
24 void TCPSpongeSocket<AdaptT>::_tcp_loop(const function<bool()> &condition) {
25  auto base_time = timestamp_ms();
26  while (condition()) {
27  auto ret = _eventloop.wait_next_event(TCP_TICK_MS);
28  if (ret == EventLoop::Result::Exit or _abort) {
29  break;
30  }
31 
32  if (_tcp.value().active()) {
33  const auto next_time = timestamp_ms();
34  _tcp.value().tick(next_time - base_time);
35  _datagram_adapter.tick(next_time - base_time);
36  base_time = next_time;
37  }
38  }
39 }
40 
43 template <typename AdaptT>
45  AdaptT &&datagram_interface)
46  : LocalStreamSocket(move(data_socket_pair.first))
47  , _thread_data(move(data_socket_pair.second))
48  , _datagram_adapter(move(datagram_interface)) {
50 }
51 
52 template <typename AdaptT>
54  _tcp.emplace(config);
55 
56  // Set up the event loop
57 
58  // There are four possible events to handle:
59  //
60  // 1) Incoming datagram received (needs to be given to
61  // TCPConnection::segment_received method)
62  //
63  // 2) Outbound bytes received from local application via a write()
64  // call (needs to be read from the local stream socket and
65  // given to TCPConnection::data_written method)
66  //
67  // 3) Incoming bytes reassembled by the TCPConnection
68  // (needs to be read from the inbound_stream and written
69  // to the local stream socket back to the application)
70  //
71  // 4) Outbound segment generated by TCP (needs to be
72  // given to underlying datagram socket)
73 
74  // rule 1: read from filtered packet stream and dump into TCPConnection
75  _eventloop.add_rule(_datagram_adapter,
77  [&] {
78  auto seg = _datagram_adapter.read();
79  if (seg) {
80  _tcp->segment_received(move(seg.value()));
81  }
82 
83  // debugging output:
84  if (_thread_data.eof() and _tcp.value().bytes_in_flight() == 0 and not _fully_acked) {
85  cerr << "DEBUG: Outbound stream to "
86  << _datagram_adapter.config().destination.to_string()
87  << " has been fully acknowledged.\n";
88  _fully_acked = true;
89  }
90  },
91  [&] { return _tcp->active(); });
92 
93  // rule 2: read from pipe into outbound buffer
94  _eventloop.add_rule(
95  _thread_data,
97  [&] {
98  const auto data = _thread_data.read(_tcp->remaining_outbound_capacity());
99  const auto len = data.size();
100  const auto amount_written = _tcp->write(move(data));
101  if (amount_written != len) {
102  throw runtime_error("TCPConnection::write() accepted less than advertised length");
103  }
104 
105  if (_thread_data.eof()) {
106  _tcp->end_input_stream();
107  _outbound_shutdown = true;
108 
109  // debugging output:
110  cerr << "DEBUG: Outbound stream to " << _datagram_adapter.config().destination.to_string()
111  << " finished (" << _tcp.value().bytes_in_flight() << " byte"
112  << (_tcp.value().bytes_in_flight() == 1 ? "" : "s") << " still in flight).\n";
113  }
114  },
115  [&] { return (_tcp->active()) and (not _outbound_shutdown) and (_tcp->remaining_outbound_capacity() > 0); },
116  [&] {
117  _tcp->end_input_stream();
118  _outbound_shutdown = true;
119  });
120 
121  // rule 3: read from inbound buffer into pipe
122  _eventloop.add_rule(
123  _thread_data,
125  [&] {
126  ByteStream &inbound = _tcp->inbound_stream();
127  // Write from the inbound_stream into
128  // the pipe, handling the possibility of a partial
129  // write (i.e., only pop what was actually written).
130  const size_t amount_to_write = min(size_t(65536), inbound.buffer_size());
131  const std::string buffer = inbound.peek_output(amount_to_write);
132  const auto bytes_written = _thread_data.write(move(buffer), false);
133  inbound.pop_output(bytes_written);
134 
135  if (inbound.eof() or inbound.error()) {
136  _thread_data.shutdown(SHUT_WR);
137  _inbound_shutdown = true;
138 
139  // debugging output:
140  cerr << "DEBUG: Inbound stream from " << _datagram_adapter.config().destination.to_string()
141  << " finished " << (inbound.error() ? "with an error/reset.\n" : "cleanly.\n");
142  if (_tcp.value().state() == TCPState::State::TIME_WAIT) {
143  cerr << "DEBUG: Waiting for lingering segments (e.g. retransmissions of FIN) from peer...\n";
144  }
145  }
146  },
147  [&] {
148  return (not _tcp->inbound_stream().buffer_empty()) or
149  ((_tcp->inbound_stream().eof() or _tcp->inbound_stream().error()) and not _inbound_shutdown);
150  });
151 
152  // rule 4: read outbound segments from TCPConnection and send as datagrams
153  _eventloop.add_rule(_datagram_adapter,
155  [&] {
156  while (not _tcp->segments_out().empty()) {
157  _datagram_adapter.write(_tcp->segments_out().front());
158  _tcp->segments_out().pop();
159  }
160  },
161  [&] { return not _tcp->segments_out().empty(); });
162 }
163 
168  int fds[2];
169  SystemCall("socketpair", ::socketpair(AF_UNIX, type, 0, static_cast<int *>(fds)));
170  return {FileDescriptor(fds[0]), FileDescriptor(fds[1])};
171 }
172 
174 template <typename AdaptT>
175 TCPSpongeSocket<AdaptT>::TCPSpongeSocket(AdaptT &&datagram_interface)
176  : TCPSpongeSocket(socket_pair_helper(SOCK_STREAM), move(datagram_interface)) {}
177 
178 template <typename AdaptT>
180  try {
181  if (_tcp_thread.joinable()) {
182  cerr << "Warning: unclean shutdown of TCPSpongeSocket\n";
183  // force the other side to exit
184  _abort.store(true);
185  _tcp_thread.join();
186  }
187  } catch (const exception &e) {
188  cerr << "Exception destructing TCPSpongeSocket: " << e.what() << endl;
189  }
190 }
191 
192 template <typename AdaptT>
194  shutdown(SHUT_RDWR);
195  if (_tcp_thread.joinable()) {
196  cerr << "DEBUG: Waiting for clean shutdown... ";
197  _tcp_thread.join();
198  cerr << "done.\n";
199  }
200 }
201 
204 template <typename AdaptT>
206  if (_tcp) {
207  throw runtime_error("connect() with TCPConnection already initialized");
208  }
209 
210  _initialize_TCP(c_tcp);
211 
212  _datagram_adapter.config_mut() = c_ad;
213 
214  cerr << "DEBUG: Connecting to " << c_ad.destination.to_string() << "... ";
215  _tcp->connect();
216 
217  const TCPState expected_state = TCPState::State::SYN_SENT;
218 
219  if (_tcp->state() != expected_state) {
220  throw runtime_error("After TCPConnection::connect(), state was " + _tcp->state().name() + " but expected " +
221  expected_state.name());
222  }
223 
224  _tcp_loop([&] { return _tcp->state() == TCPState::State::SYN_SENT; });
225  cerr << "done.\n";
226 
227  _tcp_thread = thread(&TCPSpongeSocket::_tcp_main, this);
228 }
229 
232 template <typename AdaptT>
234  if (_tcp) {
235  throw runtime_error("listen_and_accept() with TCPConnection already initialized");
236  }
237 
238  _initialize_TCP(c_tcp);
239 
240  _datagram_adapter.config_mut() = c_ad;
241  _datagram_adapter.set_listening(true);
242 
243  cerr << "DEBUG: Listening for incoming connection... ";
244  _tcp_loop([&] {
245  const auto s = _tcp->state();
247  });
248  cerr << "new connection from " << _datagram_adapter.config().destination.to_string() << ".\n";
249 
250  _tcp_thread = thread(&TCPSpongeSocket::_tcp_main, this);
251 }
252 
253 template <typename AdaptT>
255  try {
256  if (not _tcp.has_value()) {
257  throw runtime_error("no TCP");
258  }
259  _tcp_loop([] { return true; });
260  shutdown(SHUT_RDWR);
261  if (not _tcp.value().active()) {
262  cerr << "DEBUG: TCP connection finished "
263  << (_tcp.value().state() == TCPState::State::RESET ? "uncleanly" : "cleanly.\n");
264  }
265  _tcp.reset();
266  } catch (const exception &e) {
267  cerr << "Exception in TCPConnection runner thread: " << e.what() << "\n";
268  throw e;
269  }
270 }
271 
274 
277 
280 
283 
285 
286 void CS144TCPSocket::connect(const Address &address) {
287  TCPConfig tcp_config;
288  tcp_config.rt_timeout = 100;
289 
290  FdAdapterConfig multiplexer_config;
291  multiplexer_config.source = {"169.254.144.9", to_string(uint16_t(random_device()()))};
292  multiplexer_config.destination = address;
293 
294  TCPOverIPv4SpongeSocket::connect(tcp_config, multiplexer_config);
295 }
TCPSpongeSocket::_thread_data
LocalStreamSocket _thread_data
Stream socket for reads and writes between owner and TCP thread.
Definition: tcp_sponge_socket.hh:24
util.hh
TCPState::State::SYN_RCVD
@ SYN_RCVD
Got the peer's SYN.
TCPSpongeSocket::wait_until_closed
void wait_until_closed()
Definition: tcp_sponge_socket.cc:193
std::string
std::exception
ByteStream::eof
bool eof() const
Definition: byte_stream.cc:47
len
constexpr size_t len
Definition: tcp_benchmark.cc:12
std::move
T move(T... args)
ByteStream::pop_output
void pop_output(const size_t len)
Remove bytes from the buffer.
Definition: byte_stream.cc:29
std::pair
EventLoop::Direction::Out
@ Out
Callback will be triggered when Rule::fd is writable.
SystemCall
SystemCall("socketpair", ::socketpair(AF_UNIX, SOCK_STREAM, 0, fds.data()))
shutdown
sock3 shutdown(SHUT_RDWR)
CS144TCPSocket::CS144TCPSocket
CS144TCPSocket()
Definition: tcp_sponge_socket.cc:284
ByteStream::buffer_size
size_t buffer_size() const
Definition: byte_stream.cc:43
FileDescriptor
A reference-counted handle to a file descriptor.
Definition: file_descriptor.hh:12
socket_pair_helper
static pair< FileDescriptor, FileDescriptor > socket_pair_helper(const int type)
Call socketpair and return connected Unix-domain sockets of specified type.
Definition: tcp_sponge_socket.cc:167
CS144TCPSocket::connect
void connect(const Address &address)
Definition: tcp_sponge_socket.cc:286
timestamp_ms
uint64_t timestamp_ms()
Get the time in milliseconds since the program began.
Definition: util.cc:14
TCPState::State::RESET
@ RESET
A connection that terminated abnormally.
Address
Wrapper around IPv4 addresses and DNS operations.
Definition: address.hh:13
fds
std::array< int, 2 > fds
Definition: socket_example_3.cc:2
TCPState::State::TIME_WAIT
@ TIME_WAIT
Both sides have sent FIN and ACK'd, waiting for 2 MSL.
buffer
std::string buffer
Definition: parser_example.cc:7
TCP_TICK_MS
static constexpr size_t TCP_TICK_MS
Definition: tcp_sponge_socket.cc:20
FdAdapterConfig
Config for classes derived from FdAdapter.
Definition: tcp_config.hh:26
ByteStream
An in-order byte stream.
Definition: byte_stream.hh:11
TCPState::State::LISTEN
@ LISTEN
Listening for a peer to connect.
std::thread
ByteStream::peek_output
std::string peek_output(const size_t len) const
Definition: byte_stream.cc:23
std::random_device
std::cerr
tun.hh
LocalStreamSocket
A wrapper around Unix-domain stream sockets.
Definition: socket.hh:113
std::to_string
T to_string(T... args)
FileDescriptor::set_blocking
void set_blocking(const bool blocking_state)
Set blocking(true) or non-blocking(false)
Definition: file_descriptor.cc:101
TCPSpongeSocket::connect
void connect(const TCPConfig &c_tcp, const FdAdapterConfig &c_ad)
Connect using the specified configurations; blocks until connect succeeds or fails.
Definition: tcp_sponge_socket.cc:205
std::runtime_error
FdAdapterConfig::source
Address source
Source address and port.
Definition: tcp_config.hh:28
std::uint16_t
TCPState
Summary of a TCPConnection's internal state.
Definition: tcp_state.hh:23
TCPConfig::rt_timeout
uint16_t rt_timeout
Initial value of the retransmission timeout, in milliseconds.
Definition: tcp_config.hh:19
TCPSpongeSocket::TCPSpongeSocket
TCPSpongeSocket(std::pair< FileDescriptor, FileDescriptor > data_socket_pair, AdaptT &&datagram_interface)
Construct LocalStreamSocket fds from socket pair, initialize eventloop.
Definition: tcp_sponge_socket.cc:44
TCPConfig
Config for TCP sender and receiver.
Definition: tcp_config.hh:12
TCPSpongeSocket::_tcp_loop
void _tcp_loop(const std::function< bool()> &condition)
Process events while specified condition is true.
Definition: tcp_sponge_socket.cc:24
TCPSpongeSocket::listen_and_accept
void listen_and_accept(const TCPConfig &c_tcp, const FdAdapterConfig &c_ad)
Listen and accept using the specified configurations; blocks until accept succeeds or fails.
Definition: tcp_sponge_socket.cc:233
TCPOverIPv4OverTunFdAdapter
A FD adapter for IPv4 datagrams read from and written to a TUN device.
Definition: tuntap_adapter.hh:12
std::min
T min(T... args)
TCPSpongeSocket::~TCPSpongeSocket
~TCPSpongeSocket()
When a connected socket is destructed, it will send a RST.
Definition: tcp_sponge_socket.cc:179
std::endl
T endl(T... args)
ByteStream::error
bool error() const
Definition: byte_stream.hh:62
std
FdAdapterConfig::destination
Address destination
Destination address and port.
Definition: tcp_config.hh:29
EventLoop::Result::Exit
@ Exit
All rules have been canceled or were uninterested; make no further calls to EventLoop::wait_next_even...
Address::to_string
std::string to_string() const
Human-readable string, e.g., "8.8.8.8:53".
Definition: address.cc:103
TCPState::name
std::string name() const
Summarize the TCPState in a string.
Definition: tcp_state.cc:12
TunFD
A FileDescriptor to a Linux TUN device.
Definition: tun.hh:16
EventLoop::Direction::In
@ In
Callback will be triggered when Rule::fd is readable.
TCPState::State::SYN_SENT
@ SYN_SENT
Sent a SYN to initiate a connection.
tcp_sponge_socket.hh
std::exception::what
T what(T... args)
parser.hh
TCPSpongeSocket::_initialize_TCP
void _initialize_TCP(const TCPConfig &config)
Set up the TCPConnection and the event loop.
Definition: tcp_sponge_socket.cc:53
TCPSpongeSocket
Multithreaded wrapper around TCPConnection that approximates the Unix sockets API.
Definition: tcp_sponge_socket.hh:21
TCPSpongeSocket::_tcp_main
void _tcp_main()
Main loop of TCPConnection thread.
Definition: tcp_sponge_socket.cc:254