From 7a1985a0f05564fd7c381e828ff134836f3e3253 Mon Sep 17 00:00:00 2001 From: Sergey Poznyakoff Date: Fri, 17 Apr 2020 13:05:42 +0300 Subject: Limit number of simultaneous tcpmux connections. Impose idle connection timeouts. * src/config.c: New statements: max-connections and idle-timeout * src/net.c (max_connections,idle_timeout): New globals. (handle_connection): Take single FILE* as argument. (connection_start,connection_stop): New functions. (wy_thr_tcpmux,wy_thr_connection_watcher): New functions. (wy_thr_listen): Start wy_thr_connection_watcher thread if tcpmux listener is enabled. Start a separate thread to process each incoming connection. * src/wydawca.h (max_connections,idle_timeout): New externs. (wy_thr_tcpmux,wy_thr_connection_watcher): New protos. --- src/config.c | 8 ++- src/net.c | 195 +++++++++++++++++++++++++++++++++++++++++++++++++++------- src/triplet.c | 1 - src/wydawca.c | 1 - src/wydawca.h | 4 ++ 5 files changed, 185 insertions(+), 24 deletions(-) diff --git a/src/config.c b/src/config.c index 7d0e760..a7d4312 100644 --- a/src/config.c +++ b/src/config.c @@ -1489,7 +1489,13 @@ static struct grecs_keyword wydawca_kw[] = { { "listen", N_("socket"), N_("Listen on this address"), grecs_type_sockaddr, GRECS_DFLT, &listen_sockaddr, }, - + { "max-connections", NULL, + N_("maximum allowed number of simultaneous connections"), + grecs_type_size, GRECS_DFLT, &max_connections }, + { "idle-timeout", N_("interval"), + N_("Idle timeout for a TXPMUX connection"), + grecs_type_string, GRECS_CONST, &idle_timeout, 0, cb_interval }, + #ifdef WITH_LIBWRAP { "tcp-wrapper", NULL, N_("Configure TCP wrappers"), diff --git a/src/net.c b/src/net.c index 3f8d291..f0869dc 100644 --- a/src/net.c +++ b/src/net.c @@ -16,6 +16,10 @@ #include "wydawca.h" +struct grecs_sockaddr listen_sockaddr; +size_t max_connections = 16; +time_t idle_timeout = 5; + static int open_listener() { @@ -80,39 +84,156 @@ trim_crlf(char *s) s[--len] = 0; } } + +struct wydawca_connection { + struct timespec ts; + pthread_t tid; + FILE *fp; + struct wydawca_connection *next, *prev; +}; -void -handle_connection(FILE * in, FILE * out) +struct wydawca_connection_queue { + struct wydawca_connection *head, *tail; +}; + +static inline void +wydawca_connection_enqueue(struct wydawca_connection_queue *q, + struct wydawca_connection *conn) +{ + conn->next = NULL; + conn->prev = q->tail; + if (q->tail) + q->tail->next = conn; + else + q->head = conn; + q->tail = conn; +} + +static inline void +wydawca_connection_dequeue(struct wydawca_connection_queue *q, + struct wydawca_connection *conn) +{ + if (conn->prev) + conn->prev->next = conn->next; + else + q->head = conn->next; + if (conn->next) + conn->next->prev = conn->prev; + else + q->tail = conn->prev; + conn->prev = conn->next = NULL; +} + +static struct wydawca_connection *conn_table; +static struct wydawca_connection_queue conn_avail, conn_idle; +static pthread_mutex_t conn_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t conn_cond = PTHREAD_COND_INITIALIZER; + +static void +conn_table_init(void) +{ + size_t i; + + conn_table = grecs_calloc(max_connections, sizeof(conn_table[0])); + for (i = 1; i < max_connections; i++) { + wydawca_connection_enqueue(&conn_avail, &conn_table[i]); + } +} + +static void +connection_start(int fd) +{ + struct wydawca_connection *conn; + + pthread_mutex_lock(&conn_mutex); + if (conn_avail.head) { + conn = conn_avail.head; + wydawca_connection_dequeue(&conn_avail, conn); + clock_gettime(CLOCK_REALTIME, &conn->ts); + conn->ts.tv_sec += idle_timeout; + conn->fp = fdopen(fd, "w+"); + pthread_create(&conn->tid, NULL, wy_thr_tcpmux, conn); + wydawca_connection_enqueue(&conn_idle, conn); + pthread_cond_broadcast(&conn_cond); + } else { + wy_log(LOG_ERR, "connection table is full"); + close(fd); + } + pthread_mutex_unlock(&conn_mutex); +} + +static void +connection_unidle(struct wydawca_connection *conn) +{ + pthread_mutex_lock(&conn_mutex); + fclose(conn->fp); + conn->fp = NULL; + wydawca_connection_dequeue(&conn_idle, conn); + pthread_cond_broadcast(&conn_cond); + pthread_mutex_unlock(&conn_mutex); +} + +static void +connection_stop(struct wydawca_connection *conn) +{ + pthread_mutex_lock(&conn_mutex); + if (conn->fp) { + fclose(conn->fp); + conn->fp = NULL; + wydawca_connection_dequeue(&conn_idle, conn); + pthread_cond_broadcast(&conn_cond); + } + wydawca_connection_enqueue(&conn_avail, conn); + pthread_mutex_unlock(&conn_mutex); +} + +static inline int +timespec_cmp(struct timespec const *a, struct timespec const *b) +{ + if (a->tv_sec > b->tv_sec) + return 1; + if (a->tv_sec < b->tv_sec) + return -1; + if (a->tv_nsec > b->tv_nsec) + return 1; + if (a->tv_nsec < b->tv_nsec) + return -1; + return 0; +} + +static void +handle_connection(struct wydawca_connection *conn) { char *buf = NULL; size_t buflen = 0; struct spool *spool; char *p; - if (grecs_getline(&buf, &buflen, in) <= 0) + if (grecs_getline(&buf, &buflen, conn->fp) <= 0) return; trim_crlf(buf); wy_debug(1, ("recv: %s", buf)); spool = wydawca_find_spool(buf); if (!spool) { if (all_spool_aliases && grecs_list_locate(all_spool_aliases, buf)) - fprintf(out, "+ OK, all spools\r\n"); + fprintf(conn->fp, "+ OK, all spools\r\n"); else { - fprintf(out, "- Unknown service name\r\n"); + fprintf(conn->fp, "- Unknown service name\r\n"); free(buf); return; } } else if (spool->url) - fprintf(out, "+ OK, URL %s\r\n", spool->url); + fprintf(conn->fp, "+ OK, URL %s\r\n", spool->url); else - fprintf(out, "+ OK, spool %s\r\n", spool->tag); + fprintf(conn->fp, "+ OK, spool %s\r\n", spool->tag); - if (grecs_getline(&buf, &buflen, in) < 0) { + if (grecs_getline(&buf, &buflen, conn->fp) < 0) { wy_log(LOG_ERR, "protocol error"); free(buf); return; } - + connection_unidle(conn); + trim_crlf(buf); wy_debug(1, ("recv: %s", buf)); @@ -135,6 +256,42 @@ handle_connection(FILE * in, FILE * out) free(buf); } +void * +wy_thr_tcpmux(void *ptr) +{ + struct wydawca_connection *conn = ptr; + pthread_cleanup_push((void (*)(void*))connection_stop, conn); + setlinebuf(conn->fp); + handle_connection(conn); + pthread_cleanup_pop(1); + return NULL; +} + +void * +wy_thr_connection_watcher(void *ptr) +{ + pthread_mutex_lock(&conn_mutex); + while (1) { + if (conn_idle.head) { + struct timespec ts; + + pthread_cond_timedwait(&conn_cond, &conn_mutex, + &conn_idle.head->ts); + if (conn_idle.head) { + clock_gettime(CLOCK_REALTIME, &ts); + if (timespec_cmp(&ts, &conn_idle.head->ts) >= 0) { + void *ret; + pthread_cancel(conn_idle.head->tid); + pthread_mutex_unlock(&conn_mutex); + pthread_join(conn_idle.head->tid, &ret); + pthread_mutex_lock(&conn_mutex); + } + } + } else + pthread_cond_wait(&conn_cond, &conn_mutex); + } +} + static inline int notify_parent(void) { @@ -149,9 +306,12 @@ wy_thr_listen(void *ptr) int wfd = watcher_init(); int maxfd = 0; - if (ctlfd != -1) + if (ctlfd != -1) { + pthread_t tid; + pthread_create(&tid, NULL, wy_thr_connection_watcher, NULL); maxfd = ctlfd; - + } + if (wfd != -1 && wfd > maxfd) maxfd = wfd; @@ -162,6 +322,7 @@ wy_thr_listen(void *ptr) exit(EX_CONFIG); } + conn_table_init(); if (notify_parent()) kill(getppid(), SIGUSR1); @@ -191,14 +352,13 @@ wy_thr_listen(void *ptr) if (ctlfd != -1 && FD_ISSET(ctlfd, &rset)) { int fd; - FILE *in, *out; union { struct sockaddr sa; struct sockaddr_in s_in; struct sockaddr_un s_un; } addr; socklen_t len; - + len = sizeof(addr); fd = accept(ctlfd, (struct sockaddr *) &addr, &len); if (fd == -1) @@ -210,14 +370,7 @@ wy_thr_listen(void *ptr) continue; } #endif - - in = fdopen(fd, "r"); - setlinebuf(in); - out = fdopen(fd, "w"); - setlinebuf(out); - handle_connection(in, out); - fclose(in); - fclose(out); + connection_start(fd); } } return NULL; diff --git a/src/triplet.c b/src/triplet.c index 797a09e..a82ac88 100644 --- a/src/triplet.c +++ b/src/triplet.c @@ -556,7 +556,6 @@ void * wy_thr_triplet(void *ptr) { struct wy_triplet *trp = ptr; - timer_start("wydawca"); spool_create_timers(); triplet_commit(trp); diff --git a/src/wydawca.c b/src/wydawca.c index 240eca1..5c7d5d7 100644 --- a/src/wydawca.c +++ b/src/wydawca.c @@ -52,7 +52,6 @@ time_t wakeup_interval; struct grecs_list *all_spool_aliases; char *wy_gpg_homedir; char *default_check_script; -struct grecs_sockaddr listen_sockaddr; unsigned min_directive_version = MIN_DIRECTIVE_VERSION; unsigned max_directive_version = MAX_DIRECTIVE_VERSION; diff --git a/src/wydawca.h b/src/wydawca.h index 476f77c..c671429 100644 --- a/src/wydawca.h +++ b/src/wydawca.h @@ -327,6 +327,8 @@ extern int wy_mode; extern int foreground; extern int single_process; extern struct grecs_sockaddr listen_sockaddr; +extern size_t max_connections; +extern time_t idle_timeout; extern struct grecs_list *all_spool_aliases; @@ -566,3 +568,5 @@ FILE *fopenat_ro(int dirfd, char const *name); void *wy_thr_triplet(void *ptr); void *wy_thr_cleaner(void *ptr); void wy_triplet_wait(void); +void *wy_thr_tcpmux(void *ptr); +void *wy_thr_connection_watcher(void *ptr); -- cgit v1.2.1