From da4f9073102fdbddc78acca0329de6740eb32064 Mon Sep 17 00:00:00 2001 From: Sergey Poznyakoff Date: Sun, 13 Jun 2021 23:19:06 +0300 Subject: Multithread implementation --- mockmta.c | 365 ++++++++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 308 insertions(+), 57 deletions(-) diff --git a/mockmta.c b/mockmta.c index 8c276d6..3a68053 100644 --- a/mockmta.c +++ b/mockmta.c @@ -3,7 +3,7 @@ mockmta - mock MTA server for use in test suites SYNOPSIS - mockmta [-d] [-c CERT] [-f CA] [-k KEY] [-p PORT] [-t SEC] MAILBOX + mockmta [-d] [-c CERT] [-a CA] [-k KEY] [-p PORT] [-t SEC] MAILBOX DESCRIPTION Starts a mock MTA, which behaves almost identically to the real one, @@ -25,26 +25,25 @@ } When run as a daemon, mockmta starts listening on localhost port - PORT (default 25). Incoming SMTP sessions are processed sequentially. - Listening for incoming requests is blocked while an SMTP session is - active. + PORT (default 25). To support TLS, the program must be compiled with the GnuTLS library. To do so, make sure the library and its headers are properly installed and: - cc -omockmta -DWITH_TLS mockmta.c -lgnutls + cc -omockmta -opthread -DWITH_TLS mockmta.c -lgnutls To enable the STARTTLS ESMTP command, supply the names of the certificate (-c CERT) and certificate key (-k KEY) files. OPTIONS + -a CA Name of certificate authority file. -c CERT Name of the certificate file. - -d Daemon mode - -f CA Name of certificate authority file. + -d Daemon mode. + -f Remain in foreground (implies -d). -k KEY Name of the certificate key file. -p PORT Listen on this port. - -t SEC Reserved for future use. + -t SEC Set SMTP timeout. EXIT CODES 0 Success. @@ -89,13 +88,31 @@ #include #include #include +#include +#include +#include char *progname; char *mailbox_name; int daemon_opt; -int idle_timeout; +int smtp_timeout = 5*60; int port = 25; + int msgid = 1; +pthread_mutex_t msgid_mutex = PTHREAD_MUTEX_INITIALIZER; + +#ifndef _PATH_DEVNULL +# define _PATH_DEVNULL "/dev/null" +#endif + +enum + { + SMTP_IN, + SMTP_OUT + }; + +void smtp_timer_enqueue (pthread_t tid, int state); +void smtp_timer_dequeue (pthread_t tid); enum { @@ -173,14 +190,6 @@ nomemory (void) exit (EX_FAILURE); } -static volatile int io_timed_out; - -static void -sigalrm (int sig) -{ - io_timed_out = 1; -} - struct iodrv { int (*drv_read) (void *, char *, size_t, size_t *); @@ -374,8 +383,7 @@ iobase_readln (struct iobase *bp, char *buf, size_t size) int cr_seen = 0; size--; - if (idle_timeout) - alarm (idle_timeout); + smtp_timer_enqueue (pthread_self (), SMTP_IN); while (len < size) { int c = iobase_data_getc (bp); @@ -383,7 +391,7 @@ iobase_readln (struct iobase *bp, char *buf, size_t size) { if (bp->iob_eof) break; - if (iobase_fill (bp) || io_timed_out) + if (iobase_fill (bp)) break; continue; } @@ -407,13 +415,7 @@ iobase_readln (struct iobase *bp, char *buf, size_t size) if (c == '\n') break; } - if (idle_timeout) - alarm (0); - if (io_timed_out) - { - bp->iob_errno = ETIMEDOUT; - return -1; - } + smtp_timer_dequeue (pthread_self ()); if (len == 0 && bp->iob_errno) return -1; buf[len] = 0; @@ -450,7 +452,8 @@ static ssize_t iobase_writeln (struct iobase *bp, char *buf, size_t size) { size_t len = 0; - + + smtp_timer_enqueue (pthread_self (), SMTP_OUT); while (size) { char *p = memchr (buf, '\n', size); @@ -464,6 +467,7 @@ iobase_writeln (struct iobase *bp, char *buf, size_t size) len += rc; size -= rc; } + smtp_timer_dequeue (pthread_self ()); if (len == 0 && bp->iob_errno) return -1; return len; @@ -556,6 +560,10 @@ set_tls_opt (int c) { switch (c) { + case 'a': + tls_cafile = optarg; + break; + case 'c': tls_cert = optarg; break; @@ -564,10 +572,6 @@ set_tls_opt (int c) tls_key = optarg; break; - case 'f': - tls_cafile = optarg; - break; - default: return 1; } @@ -859,6 +863,7 @@ struct smtp { enum smtp_state state; struct iobase *iob; + unsigned sid; char buf[IOBUFSIZE]; char *arg; int capa_mask; @@ -898,7 +903,7 @@ smtp_io_mlsend (struct iobase *iob, int code, char const **av) char buf[IOBUFSIZE]; size_t n; int i; - + snprintf (buf, sizeof buf, "%3d", code); for (i = 0; av[i]; i++) { @@ -1328,8 +1333,7 @@ smtp_data (struct smtp *smtp) terror ("can't unlock %s: %s", mailbox_name, strerror (errno)); fclose (fp); - smtp_io_send (smtp->iob, 250, "%04d Message accepted for delivery", msgid); - msgid++; + smtp_io_send (smtp->iob, 250, "%x Message accepted for delivery", smtp->sid); return res; } @@ -1403,6 +1407,10 @@ do_smtp (int ifd, int ofd) smtp.data_buf = NULL; smtp.data_len = 0; smtp.data_size = 0; + + pthread_mutex_lock (&msgid_mutex); + smtp.sid = msgid++; + pthread_mutex_unlock (&msgid_mutex); smtp_io_send (smtp.iob, 220, "Ready"); while (smtp.state != STATE_QUIT) @@ -1452,6 +1460,151 @@ do_smtp (int ifd, int ofd) smtp_end (&smtp); } +struct smtp_timer +{ + pthread_t tid; + int state; + struct timespec wakeup_time; + struct smtp_timer *prev, *next; +}; + +struct smtp_timer *smtp_timer_head, *smtp_timer_tail; +static pthread_mutex_t smtp_timer_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t smtp_timer_cond = PTHREAD_COND_INITIALIZER; + +static inline int +timespec_cmp (struct timespec const *a, struct timespec const *b) +{ + if (a->tv_sec < b->tv_sec) + return -1; + if (a->tv_sec > b->tv_sec) + return 1; + if (a->tv_nsec < b->tv_nsec) + return -1; + if (a->tv_nsec > b->tv_nsec) + return 1; + return 0; +} + +void +smtp_timer_enqueue (pthread_t tid, int state) +{ + struct smtp_timer *timer, *p; + + timer = malloc (sizeof (timer[0])); + if (!timer) + nomemory (); + timer->tid = tid; + timer->state = state; + clock_gettime (CLOCK_REALTIME, &timer->wakeup_time); + timer->wakeup_time.tv_sec += smtp_timeout; + + pthread_mutex_lock (&smtp_timer_mutex); + for (p = smtp_timer_head; p; p = p->next) + { + if (timespec_cmp (&timer->wakeup_time, &p->wakeup_time) < 0) + break; + } + if (p) + { + timer->prev = p->prev; + if (p == smtp_timer_head) + smtp_timer_head = timer; + else + p->prev->next = timer; + + p->prev = timer; + timer->next = p; + } + else + { + timer->next = NULL; + timer->prev = smtp_timer_tail; + if (smtp_timer_tail) + smtp_timer_tail->next = timer; + else + smtp_timer_head = timer; + smtp_timer_tail = timer; + } + + if (timer == smtp_timer_head) + pthread_cond_broadcast (&smtp_timer_cond); + + pthread_mutex_unlock (&smtp_timer_mutex); +} + +void +smtp_timer_unlink (struct smtp_timer *timer) +{ + if (timer->prev) + timer->prev->next = timer->next; + else + smtp_timer_head = timer->next; + + if (timer->next) + timer->next->prev = timer->prev; + else + smtp_timer_tail = timer->prev; +} + +void +smtp_timer_dequeue (pthread_t tid) +{ + struct smtp_timer *p; + pthread_mutex_lock (&smtp_timer_mutex); + for (p = smtp_timer_head; p; p = p->next) + { + if (p->tid == tid) + { + smtp_timer_unlink (p); + free (p); + break; + } + } + pthread_mutex_unlock (&smtp_timer_mutex); +} + +void * +thr_watcher (void *ptr) +{ + pthread_mutex_lock (&smtp_timer_mutex); + while (1) + { + struct smtp_timer *timer = smtp_timer_head; + + if (!timer) + { + pthread_cond_wait (&smtp_timer_cond, &smtp_timer_mutex); + continue; + } + + switch (pthread_cond_timedwait (&smtp_timer_cond, + &smtp_timer_mutex, + &timer->wakeup_time)) + { + case 0: + /* Condition signalled: timer list has been updated. Restart. */ + continue; + + case ETIMEDOUT: + /* Wakeup time is reached */ + break; + + default: + /* Should not happen */ + terror ("unexpected error from pthread_cond_timedwait: %m"); + exit (EX_FAILURE); + } + + /* Thread I/O timed out. Terminate the thread. */ + pthread_cancel (timer->tid); + smtp_timer_unlink (timer); + free (timer); + } + pthread_mutex_unlock (&smtp_timer_mutex); + return NULL; +} + static int mta_open (int port) { @@ -1489,36 +1642,73 @@ mta_open (int port) return fd; } -void -mta_run (int fd) +static void +smtp_cleanup (void *ptr) { + int fd = *(int*) ptr; + close (fd); +} + +void * +thr_smtp (void *ptr) +{ + int fd = *(int*) ptr; + pthread_cleanup_push (smtp_cleanup, ptr); + do_smtp (fd, fd); + pthread_cleanup_pop (1); + return NULL; +} + +void * +thr_mta_listener (void *ptr) +{ + int fd = *(int*) ptr; + pthread_attr_t attr; + + pthread_attr_init (&attr); + pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); + while (1) { int sfd; struct sockaddr_in remote_addr; socklen_t len = sizeof (remote_addr); - pid_t pid; + pthread_t tid; if ((sfd = accept (fd, (struct sockaddr *) &remote_addr, &len)) < 0) { terror ("accept: %m"); exit (EX_FAILURE); } - - do_smtp (sfd, sfd); - close (sfd); + + pthread_create (&tid, &attr, thr_smtp, &sfd); } + return NULL; } +static int fatal_signals[] = { + SIGHUP, + SIGINT, + SIGQUIT, + SIGTERM, + 0 +}; + +static void +signull(int sig) +{ +} + int main (int argc, char **argv) { int c; int fd; + int foreground = 0; progname = argv[0]; - while ((c = getopt (argc, argv, "dc:f:k:p:t:")) != EOF) + while ((c = getopt (argc, argv, "a:dc:fk:p:t:")) != EOF) { switch (c) { @@ -1526,11 +1716,22 @@ main (int argc, char **argv) daemon_opt = 1; break; + case 'f': + daemon_opt = 1; + foreground = 1; + break; + case 'p': port = atoi (optarg); break; case 't': + smtp_timeout = atoi (optarg); + if (smtp_timeout <= 0) + { + terror ("invalid timeout value"); + exit (EX_USAGE); + } break; default: @@ -1557,27 +1758,77 @@ main (int argc, char **argv) if (daemon_opt) { - pid_t pid; - + struct sigaction act; + sigset_t sigs; + int i; + pthread_t tid; + fd = mta_open (port); - pid = fork (); - if (pid == -1) + if (!foreground) { - terror ("fork: %m"); - exit (EX_FAILURE); - } - if (pid) - { - return 0; + switch (fork()) + { + case -1: + terror ("daemon: %m"); + exit (EX_FAILURE); + + case 0: + break; + + default: + _exit (0); + } + + if (setsid() == -1) + { + terror ("setsid: %m"); + exit (EX_FAILURE); + } + + chdir("/"); + + close(0); + close(1); + close(2); + open(_PATH_DEVNULL, O_RDONLY); + open(_PATH_DEVNULL, O_WRONLY); + dup(1); + + /* Set up logging */ + openlog (progname, LOG_PID, LOG_MAIL); + terror = terror_syslog; } - else + + /* Set up signal handling */ + sigemptyset (&sigs); + + act.sa_flags = 0; + sigemptyset (&act.sa_mask); + act.sa_handler = signull; + + for (i = 0; fatal_signals[i]; i++) { - /* child */ - openlog (progname, LOG_PID, LOG_MAIL); - terror = terror_syslog; - mta_run (fd); + sigaddset (&sigs, fatal_signals[i]); + sigaction (fatal_signals[i], &act, NULL); } + sigaddset (&sigs, SIGPIPE); + sigaddset (&sigs, SIGALRM); + sigaddset (&sigs, SIGCHLD); + pthread_sigmask (SIG_BLOCK, &sigs, NULL); + + pthread_create (&tid, NULL, thr_mta_listener, &fd); + pthread_create (&tid, NULL, thr_watcher, NULL); + + /* Unblock only the fatal signals */ + sigemptyset (&sigs); + for (i = 0; fatal_signals[i]; i++) + sigaddset (&sigs, fatal_signals[i]); + + pthread_sigmask (SIG_UNBLOCK, &sigs, NULL); + + /* Wait for signal to arrive */ + sigwait (&sigs, &i); } else do_smtp (0, 1); -- cgit v1.2.1