#include "connection.h" #include #include #include #include #include #include #include #include #include #include "event_loop.h" #include "ring_buffer.h" #include "utils.h" /* * Macros */ #define SEND_BUF_SIZE (1024 * 200) #define RECV_BUF_SIZE (1024 * 200) #define MAX_CONNECTIONS (100) /* * Type definitions */ typedef struct { uint32_t id; uint32_t ev_mask; int fd; rb_handle_t send_buf; bool hangup; } conn_t; /* * Data */ static uint32_t _free_id = 1; static conn_t *_conns[MAX_CONNECTIONS]; static uint32_t _slots_available = MAX_CONNECTIONS; static uint32_t _recv_buf[RECV_BUF_SIZE]; /* * Private API */ // Get index of the connection with fd. // Parameters: // - fd - file descriptor // Returns: // - FOUND: index of the connection in _conns // - NOT FOUND: -1 static int _get_conn_index_by_fd(int fd) { for (int i = 0; i < MAX_CONNECTIONS; ++i) { if (_conns[i] && _conns[i]->fd == fd) { return i; } } return -1; } // Get index of the connection by ID. // Parameters: // - fd - file descriptor // Returns: // - FOUND: index of the connection in _conns // - NOT FOUND: -1 static int _get_conn_index_by_id(uint32_t id) { for (int i = 0; i < MAX_CONNECTIONS; ++i) { if (_conns[i] && _conns[i]->id == id) { return i; } } return -1; } // Free connection resources. // Parameters: // - index - index of the connection // Remarks: // - calls close() // - frees memory // - does NOT call shutdown() // - does NOT check if index is in bounds // - does NOT check if memory is valid static void _free_conn(int index) { conn_t *c = _conns[index]; if (flag_verbose) { fprintf( stdout, "[I] CONN: #%"PRIu32" Local delete\n", c->id ); } close(c->fd); rb_free(c->send_buf); free(c); _conns[index] = NULL; _slots_available++; } /* * Event handlers. * These functions return false if they have deleted the connection. */ static bool _event_epollin(int index) { conn_t *c = _conns[index]; int res = 0; do { res = recv( c->fd, _recv_buf, RECV_BUF_SIZE, 0 ); if (res == -1) { fprintf(stderr, "[!] recv failed: %d\n", errno); _free_conn(index); return false; } if (res == 0) { _free_conn(index); return false; } if (flag_verbose) { fprintf( stdout, "[I] CONN: #%"PRIu32" Local -> Remote (%"PRIu32" B)\n", c->id, res ); } // TODO: handle data in _recv_buf } while (c->hangup); return true; } static bool _event_epollout(int index) { conn_t *c = _conns[index]; int res; uint32_t to_send = rb_raw_read_size(c->send_buf); res = send( c->fd, rb_raw_read_ptr(c->send_buf), to_send, MSG_NOSIGNAL ); if (res == -1) { fprintf(stderr, "[!] send failed: %d\n", errno); _free_conn(index); return false; } if (flag_verbose) { fprintf( stdout, "[I] CONN: #%"PRIu32" Remote -> Local (%"PRIu32" B)\n", c->id, res ); } if (!rb_raw_read_advance(c->send_buf, res)) { // no more data can be read from the ring buffer c->ev_mask &= ~(uint32_t)EPOLLOUT; struct epoll_event ev; ev.data.fd = c->fd; ev.events = c->ev_mask; loop_ctl( EPOLL_CTL_MOD, c->fd, &ev ); } return true; } static bool _event_epollhup(int index) { if (flag_verbose) { fprintf( stdout, "[I] CONN: #%"PRIu32" Local hangup\n", _conns[index]->id ); } _conns[index]->hangup = true; return true; } static bool _event_epollerr(int index) { shutdown(_conns[index]->fd, SHUT_RDWR); _free_conn(index); return false; } /* * Public API */ uint32_t connection_add(int fd) { if (_slots_available == 0) { return 0; } conn_t *conn = malloc(sizeof(conn_t)); if (!conn) { return 0; } conn->id = _free_id++; conn->fd = fd; conn->send_buf = rb_allocate(SEND_BUF_SIZE); conn->hangup = false; if (!conn->send_buf) { free(conn); } for (int i = 0; i < MAX_CONNECTIONS; ++i) { if (!_conns[i]) { _conns[i] = conn; break; } } _slots_available--; conn->ev_mask = EPOLLIN | EPOLLRDHUP; struct epoll_event ev; ev.data.fd = fd; ev.events = conn->ev_mask; loop_ctl( EPOLL_CTL_ADD, fd, &ev ); return conn->id; } void connection_cleanup() { for (int i = 0; i < MAX_CONNECTIONS; ++i) { if (_conns[i]) { shutdown(_conns[i]->fd, SHUT_RDWR); _free_conn(i); } } } bool connection_is_socket_managed(int fd) { for (int i = 0; i < MAX_CONNECTIONS; ++i) { if (_conns[i] && _conns[i]->fd == fd) { return true; } } return false; } void connection_event(int fd, uint32_t events) { int index = _get_conn_index_by_fd(fd); if ((events & EPOLLHUP) && !_event_epollhup(index)) { return; } if ((events & EPOLLRDHUP) && !_event_epollhup(index)) { return; } if ((events & EPOLLERR) && !_event_epollerr(index)) { return; } if ((events & EPOLLIN) && !_event_epollin(index)) { return; } if ((events & EPOLLOUT) && !_event_epollout(index)) { return; } } uint32_t connection_send(uint32_t id, const void *data_void, uint32_t size) { const char *data = data_void; int index = _get_conn_index_by_id(id); if (index == -1) return 0; conn_t *c = _conns[index]; uint32_t wrote = 0; uint32_t can_write, will_write; // write to ring buffer can_write = rb_raw_write_size(c->send_buf); while (can_write && size) { will_write = size < can_write ? size : can_write; memcpy( rb_raw_write_ptr(c->send_buf), data, will_write ); can_write = rb_raw_write_advance( c->send_buf, will_write ); size -= will_write; data += will_write; write += will_write; } // add to epoll if not added if (!(c->ev_mask & EPOLLOUT)) { c->ev_mask |= EPOLLOUT; struct epoll_event ev; ev.data.fd = c->fd; ev.events = c->ev_mask; loop_ctl( EPOLL_CTL_MOD, c->fd, &ev ); } return wrote; }