Mercurial > libjeffpc
view socksvc.c @ 810:cb643e3ad672
sexpr: use size_t in sexpr lexer input function
Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author | Josef 'Jeff' Sipek <jeffpc@josefsipek.net> |
---|---|
date | Sat, 18 Jul 2020 10:07:34 -0400 |
parents | 09d5cc5c7f94 |
children | a3965185623f |
line wrap: on
line source
/* * Copyright (c) 2015-2017 Josef 'Jeff' Sipek <jeffpc@josefsipek.net> * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal * in the Software without restriction, including without limitation the rights * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. */ #include <sys/socket.h> #include <netdb.h> #include <netinet/in.h> #include <arpa/inet.h> #include <signal.h> #include <stdio.h> #include <unistd.h> #include <sys/select.h> #include <jeffpc/taskq.h> #include <jeffpc/atomic.h> #include <jeffpc/error.h> #include <jeffpc/types.h> #include <jeffpc/time.h> #include <jeffpc/io.h> #include <jeffpc/mem.h> #include <jeffpc/socksvc.h> /* * This part of the communication library is a bit different. It is meant * to abstract away the setup and teardown of a listening server. * Regardless of what the server is listening for, the same exact steps are * needed to start listening on a socket and accept connections. The * difference from the rest of the common library is the fact that this code * is not shy about printing errors. */ #define MAX_SOCK_FDS 8 #define CONN_BACKLOG 64 union sockaddr_union { struct sockaddr_in inet; struct sockaddr_in6 inet6; }; struct state { void *private; struct taskq *taskq; void (*func)(int, struct socksvc_stats *, void *); int fds[MAX_SOCK_FDS]; int nfds; }; struct socksvc { struct socksvc_stats stats; struct state *state; int fd; }; static struct mem_cache *socksvc_cache; static atomic_t server_shutdown; static void __attribute__((constructor)) init_socksvc_subsys(void) { socksvc_cache = mem_cache_create("socksvc-cache", sizeof(struct socksvc), 0); ASSERT(!IS_ERR(socksvc_cache)); } static void sigterm_handler(int signum, siginfo_t *info, void *unused) { cmn_err(CE_INFO, "SIGTERM received"); atomic_set(&server_shutdown, 1); } static void handle_signals(void) { struct sigaction action; int ret; sigemptyset(&action.sa_mask); action.sa_handler = SIG_IGN; action.sa_flags = 0; ret = sigaction(SIGPIPE, &action, NULL); if (ret) cmn_err(CE_INFO, "Failed to ignore SIGPIPE: %s", strerror(errno)); action.sa_sigaction = sigterm_handler; action.sa_flags = SA_SIGINFO; ret = sigaction(SIGTERM, &action, NULL); if (ret) cmn_err(CE_INFO, "Failed to set SIGTERM handler: %s", strerror(errno)); } static int bind_sock(struct state *state, int family, struct sockaddr *addr, int addrlen) { const int on = 1; int ret; int fd; if (state->nfds >= MAX_SOCK_FDS) return -EMFILE; fd = socket(family, SOCK_STREAM, 0); if (fd == -1) return -errno; setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on)); if (bind(fd, addr, addrlen)) goto err; if (listen(fd, CONN_BACKLOG)) goto err; state->fds[state->nfds++] = fd; return 0; err: ret = -errno; close(fd); return ret; } static int start_listening(struct state *state, const char *host, uint16_t port) { struct addrinfo hints, *res, *p; char strport[10]; memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; hints.ai_flags = AI_PASSIVE; snprintf(strport, sizeof(strport), "%d", port); if (getaddrinfo(host, strport, &hints, &res)) return -EINVAL; /* XXX: this should likely be something else */ for (p = res; p; p = p->ai_next) { struct sockaddr_in *ipv4; struct sockaddr_in6 *ipv6; char str[64]; void *addr; int ret; switch (p->ai_family) { case AF_INET: ipv4 = (struct sockaddr_in *) p->ai_addr; addr = &ipv4->sin_addr; break; case AF_INET6: ipv6 = (struct sockaddr_in6 *) p->ai_addr; addr = &ipv6->sin6_addr; break; default: cmn_err(CE_INFO, "Unsupparted address family: %d", p->ai_family); addr = NULL; break; } if (!addr) continue; inet_ntop(p->ai_family, addr, str, sizeof(str)); ret = bind_sock(state, p->ai_family, p->ai_addr, p->ai_addrlen); if (ret && ret != -EAFNOSUPPORT) return ret; else if (!ret) cmn_err(CE_INFO, "Bound to: %s %s", str, strport); } freeaddrinfo(res); return 0; } static void stop_listening(struct state *state) { int i; for (i = 0; i < state->nfds; i++) close(state->fds[i]); } static void wrap_taskq_callback(void *arg) { struct socksvc *cb = arg; cb->stats.dequeued_time = gettime(); cb->state->func(cb->fd, &cb->stats, cb->state->private); close(cb->fd); mem_cache_free(socksvc_cache, cb); } static void accept_conns(struct state *state) { fd_set set; int maxfd; int ret; int i; for (;;) { union sockaddr_union addr; unsigned len; uint64_t select_time; uint64_t accept_time; int fd; FD_ZERO(&set); maxfd = 0; for (i = 0; i < state->nfds; i++) { FD_SET(state->fds[i], &set); maxfd = MAX(maxfd, state->fds[i]); } ret = select(maxfd + 1, &set, NULL, NULL, NULL); select_time = gettime(); if (atomic_read(&server_shutdown)) break; if ((ret < 0) && (errno != EINTR)) cmn_err(CE_ERROR, "Error on select: %s", strerror(errno)); for (i = 0; (i < state->nfds) && (ret > 0); i++) { struct socksvc *cb; if (!FD_ISSET(state->fds[i], &set)) continue; len = sizeof(addr); fd = accept(state->fds[i], (struct sockaddr *) &addr, &len); if (fd == -1) { cmn_err(CE_INFO, "Failed to accept from fd %d: %s", state->fds[i], strerror(errno)); continue; } accept_time = gettime(); cb = mem_cache_alloc(socksvc_cache); if (!cb) { cmn_err(CE_INFO, "Failed to allocate cb data"); xclose(fd); continue; } cb->stats.selected_time = select_time; cb->stats.accepted_time = accept_time; cb->state = state; cb->fd = fd; ret = taskq_dispatch(state->taskq, wrap_taskq_callback, cb); if (ret) { cmn_err(CE_ERROR, "Failed to dispatch conn: %s", xstrerror(ret)); free(cb); close(fd); } /* XXX: should this be executed every time we loop? */ ret--; } } } int socksvc(const char *host, uint16_t port, int nthreads, void (*func)(int fd, struct socksvc_stats *, void *), void *private) { char name[128]; struct state state; int ret; memset(&state, 0, sizeof(state)); state.func = func; state.private = private; snprintf(name, sizeof(name), "socksvc: %s:%d", host ? host : "<any>", port); state.taskq = taskq_create_fixed(name, nthreads); if (IS_ERR(state.taskq)) { ret = PTR_ERR(state.taskq); goto err; } handle_signals(); ret = start_listening(&state, host, port); if (ret) goto err_taskq; accept_conns(&state); stop_listening(&state); taskq_wait(state.taskq); ret = 0; err_taskq: taskq_destroy(state.taskq); err: return ret; }