diff options
author | Sergey Poznyakoff <gray@gnu.org.ua> | 2009-11-21 23:13:13 +0200 |
---|---|---|
committer | Sergey Poznyakoff <gray@gnu.org.ua> | 2009-11-21 23:13:13 +0200 |
commit | 7f4dfb88f23a14f0b9603e648d4a1a459a6f26a3 (patch) | |
tree | 2beb9dee6a3a93130bd7f783a62afa20ed14e853 | |
parent | d20b648b02aa1f8caf9975a116c54e05d67d1c39 (diff) | |
download | mailfromd-7f4dfb88f23a14f0b9603e648d4a1a459a6f26a3.tar.gz mailfromd-7f4dfb88f23a14f0b9603e648d4a1a459a6f26a3.tar.bz2 |
Use callout resolver to handle timed-out callouts.mtax-cleanup
* mfd/savclt.c: New file.
* mfd/Makefile.am (mailfromd_SOURCES): Add savclt.c
* mfd/callout.c (transcript): Get ID as 1st arg.
Remove static qualifier. All callers updated.
* mfd/engine.c (method_strict)
(method_standard): If callout returned mf_temp_failure,
try to pass the task to the callout server, if one is
defined.
* mfd/mailfromd.h (transcript)
(schedule_callout): New protos.
(callout_server_sa, callout_server_sa_len): New externs.
* mfd/main.c (force_remove): Initialize to 0 (see srvman.c)
(server_config_stmt): New statements single-process and
reuseaddr.
(add_legacy_milter_port, server_section_parser): Pass
flags to mfd_server_new.
(mf_cfg_param): New statement `callout-url'.
* mfd/savsrv.c (MF_SOURCE_NAME): Fixed.
(verify, callout_session_server): Set proctitle.
(callout_session_server): Fix memory leak.
* mfd/srvman.c (struct mfd_server): New member `flags'.
(mfd_server_new): Take flags as 4th argument.
(server_run): Use flags to set single-user and reuseaddr
modes.
* mfd/srvman.h (SRV_SINGLE_PROCESS)
(SRV_KEEP_EXISTING): New defines.
(mfd_server_new): Change signature.
(srvman_url_to_sockaddr): New proto.
-rw-r--r-- | mfd/Makefile.am | 1 | ||||
-rw-r--r-- | mfd/callout.c | 10 | ||||
-rw-r--r-- | mfd/engine.c | 6 | ||||
-rw-r--r-- | mfd/mailfromd.h | 15 | ||||
-rw-r--r-- | mfd/main.c | 55 | ||||
-rw-r--r-- | mfd/savclt.c | 275 | ||||
-rw-r--r-- | mfd/savsrv.c | 14 | ||||
-rw-r--r-- | mfd/srvman.c | 11 | ||||
-rw-r--r-- | mfd/srvman.h | 7 |
9 files changed, 364 insertions, 30 deletions
diff --git a/mfd/Makefile.am b/mfd/Makefile.am index 962c22a0..c68afcff 100644 --- a/mfd/Makefile.am +++ b/mfd/Makefile.am @@ -72,6 +72,7 @@ mailfromd_SOURCES = \ pragma.c\ prog.c\ prog.h\ + savclt.c\ savsrv.c\ srvman.c\ srvman.h\ diff --git a/mfd/callout.c b/mfd/callout.c index c028838e..acecb26f 100644 --- a/mfd/callout.c +++ b/mfd/callout.c @@ -129,8 +129,8 @@ smtp_io_free(struct smtp_io_data *iop) free(iop); } -static void -transcript(struct smtp_io_data *io, char *prefix, const char *msg) +void +transcript(const char *id, char *prefix, const char *msg) { if (do_transcript) { int len = strlen(msg); @@ -141,7 +141,7 @@ transcript(struct smtp_io_data *io, char *prefix, const char *msg) } if (len) logmsg(LOG_INFO, "%s: %s %*.*s", - io->id, + id, prefix, len, len, msg); } } @@ -216,7 +216,7 @@ smtp_send(struct smtp_io_data *iop, const char *command) init_timeout_ctl (&tctl, io_timeout); iop->reply = NULL; /* Clear reply for logging purposes */ - transcript(iop, "SEND:", command); + transcript(iop->id, "SEND:", command); do { size_t nb; int rc; @@ -343,7 +343,7 @@ smtp_recv(struct smtp_io_data *iop, enum smtp_timeout to) int rc = smtp_recvline(iop, to); if (rc) return -1; - transcript(iop, "RECV:", iop->reply); + transcript(iop->id, "RECV:", iop->reply); code = strtoul(iop->reply, &p, 0); if (p - iop->reply != 3 || (*p != '-' && *p != ' ')) { mu_error(_("unexpected reply from server: %s"), diff --git a/mfd/engine.c b/mfd/engine.c index e1f2de06..c9741c1b 100644 --- a/mfd/engine.c +++ b/mfd/engine.c @@ -271,6 +271,7 @@ mfl_smtp_io_callback(void *data, const char *key, const char *value) set_last_poll_recv(env, value); } + /* Method "strict". Verifies whether EMAIL is understood either by host CLIENT_ADDR or one of MX servers of its domain */ mf_status @@ -299,6 +300,8 @@ method_strict(eval_environ_t env, char *email, char *client_addr, if (mf_resolved(rc)) cache_insert2(email, client_addr, rc); + else if (rc == mf_temp_failure) + schedule_callout(email, ehlo, mailfrom, client_addr); } else { set_last_poll_helo(env, ""); set_last_poll_greeting(env, ""); @@ -330,10 +333,13 @@ method_standard(eval_environ_t env, char *email, char *ehlo, char *mailfrom) smtp_timeout_soft, mfl_smtp_io_callback, env); + smtp_io_setup_callout(io, email, ehlo, mailfrom); rc = callout_standard(io); smtp_io_free(io); if (mf_resolved(rc)) cache_insert(email, rc); + else if (rc == mf_temp_failure) + schedule_callout(email, ehlo, mailfrom, NULL); } else { set_last_poll_helo(env, ""); set_last_poll_greeting(env, ""); diff --git a/mfd/mailfromd.h b/mfd/mailfromd.h index 5df54636..74cf1079 100644 --- a/mfd/mailfromd.h +++ b/mfd/mailfromd.h @@ -986,10 +986,6 @@ void milter_settimeout(time_t t); int xeval(eval_environ_t env, enum smtp_state tag); mf_status listens_on (const char *client_addr, int port, time_t timeout); -mf_status check_on_host(eval_environ_t env, char *email, char *client_addr, - char *ehlo, char *mailfrom); -mf_status check_mx_records(eval_environ_t env, char *email, char *client_addr, - char *ehlo, char *mailfrom, int *pcount); mf_status method_strict(eval_environ_t env, char *email, char *client_addr, char *ehlo, char *mailfrom); mf_status method_standard(eval_environ_t env, char *email, char *ehlo, @@ -1147,9 +1143,18 @@ mf_status callout_host(struct smtp_io_data *, const char *); mf_status callout_mx(struct smtp_io_data *, const char *, int *); mf_status callout_strict(struct smtp_io_data *, const char *); mf_status callout_standard(struct smtp_io_data *); - + +void transcript(const char *id, char *prefix, const char *msg); + /* savsrv.c */ int callout_session_server(const char *id, int fd, struct sockaddr const *sa, socklen_t len, void *server_data, void *srvman_data); +/* savclt.c */ +void schedule_callout(const char *email, const char *ehlo, + const char *mailfrom, + const char *client_addr); +extern struct sockaddr *callout_server_sa; +extern socklen_t callout_server_sa_len; + @@ -70,9 +70,9 @@ int do_trace; /* Enable tracing configuration */ int mtasim_option; /* mtasim compatibility mode */ unsigned optimization_level = 1; /* Optimization level */ int log_to_stderr; /* Use stderr for logging */ -int force_remove = 1; /* Remove local communication socket if it already - exists */ int foreground; /* Stay in foreground */ +int force_remove; /* Remove local communication socket if it already + exists */ int single_process_option; /* Run in single process mode. */ unsigned long source_address = INADDR_ANY; /* Source address for TCP connections */ @@ -622,7 +622,8 @@ add_legacy_milter_port(const char *str, mu_debug_t dbg) url = parse_milter_url(dbg, str); if (url) { mfd_server_t srv = mfd_server_new(id, url, - milter_session_server); + milter_session_server, + 0); if (srv) mfd_srvman_attach_server(srv); else @@ -1381,7 +1382,6 @@ parse_opt(int key, char *arg, struct argp_state *state) break; case 'r': - /*FIXME*/ force_remove = 1; break; @@ -2099,6 +2099,8 @@ struct server_config_stmt { mu_acl_t acl; size_t max_children; mfd_server_func_t server; + int single_process; + int reuseaddr; }; static struct server_config_stmt server_config_stmt; @@ -2126,6 +2128,12 @@ struct mu_cfg_param server_section_param[] = { &server_config_stmt.max_children, 0, NULL, N_("Maximum number of instances allowed for this server.") }, + { "single-process", mu_cfg_bool, + &server_config_stmt.single_process, 0, NULL, + N_("Single-process mode.") }, + { "reuseaddr", mu_cfg_bool, + &server_config_stmt.reuseaddr, 0, NULL, + N_("Reuse existing socket (default).") }, { "acl", mu_cfg_section, &server_config_stmt.acl }, { NULL } @@ -2141,6 +2149,7 @@ server_section_parser(enum mu_cfg_section_stage stage, switch (stage) { case mu_cfg_section_start: memset(&server_config_stmt, 0, sizeof(server_config_stmt)); + server_config_stmt.reuseaddr = 1; if (mu_cfg_assert_value_type (node->label, MU_CFG_STRING, tree->debug)) return 1; @@ -2160,10 +2169,17 @@ server_section_parser(enum mu_cfg_section_stage stage, if (!server_config_stmt.id) server_config_stmt.id = next_server_id(); if (server_config_stmt.url && server_config_stmt.server) { - mfd_server_t srv = - mfd_server_new(server_config_stmt.id, - server_config_stmt.url, - server_config_stmt.server); + int flags = 0; + mfd_server_t srv; + + if (server_config_stmt.single_process) + flags |= SRV_SINGLE_PROCESS; + if (!server_config_stmt.reuseaddr) + flags |= SRV_KEEP_EXISTING; + srv = mfd_server_new(server_config_stmt.id, + server_config_stmt.url, + server_config_stmt.server, + flags); if (srv) { mfd_server_set_max_children(srv, server_config_stmt.max_children); @@ -2202,6 +2218,22 @@ cb_milter_listen(mu_debug_t err, void *data, mu_config_value_t *arg) } +static int +cb_callout_url(mu_debug_t err, void *data, mu_config_value_t *arg) +{ + mu_url_t url; + if (mu_cfg_assert_value_type(arg, MU_CFG_STRING, err)) + return 1; + url = parse_milter_url(err, arg->v.string); + if (url) { + callout_server_sa = srvman_url_to_sockaddr(url, + &callout_server_sa_len); + mu_url_destroy(&url); + } + return 0; +} + + struct mu_cfg_param mf_cfg_param[] = { { "debug", mu_cfg_callback, NULL, 0, cb_debug, N_("Set Mailfromd debug verbosity level. Argument is a comma-" @@ -2292,6 +2324,13 @@ struct mu_cfg_param mf_cfg_param[] = { N_("file: string") }, { "database", mu_cfg_section, NULL }, + + { "callout-url", mu_cfg_callback, + NULL, 0, + cb_callout_url, + N_("URL of the callout server"), + N_("url") }, + { NULL } }; diff --git a/mfd/savclt.c b/mfd/savclt.c new file mode 100644 index 00000000..447f6857 --- /dev/null +++ b/mfd/savclt.c @@ -0,0 +1,275 @@ +/* This file is part of Mailfromd. + Copyright (C) 2005, 2006, 2007, 2008, 2009 Sergey Poznyakoff + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. */ + +#define MF_SOURCE_NAME MF_SOURCE_SAVCLT +#ifdef HAVE_CONFIG_H +# include <config.h> +#endif +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> +#include <stdlib.h> +#include <stdio.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <netdb.h> + +#include "mailfromd.h" + +struct sockaddr *callout_server_sa; +socklen_t callout_server_sa_len; +time_t savclt_timeout = 3; + +/* Ideally, I'd want to use smtp_io functions here. But they are + based on Mailutils' TCP streams, which so far do not support + UNIX sockets. So, for the time being, I use the straightforward + I/O approach. */ +int +savclt_getline(int fd, char **pbuf, size_t *pbufsize) +{ + char *buf = *pbuf; + size_t bufsize = *pbufsize; + int off = 0; +#define DELTA 128 + time_t start = time(NULL); + + if (buf == NULL) { + bufsize = DELTA; + buf = xmalloc(bufsize); + } + + do { + struct timeval tv; + int rc, flags; + time_t diff = time(NULL) - start; + + if (diff > savclt_timeout) { + errno = ETIMEDOUT; + return -1; + } + + tv.tv_sec = savclt_timeout - diff; + tv.tv_usec = 0; + flags = MU_STREAM_READY_RD; + rc = mu_fd_wait(fd, &flags, &tv); + if (rc) { + debug1(1, "mu_fd_wait: %s", mu_strerror(rc)); + errno = rc; + return -1; + } + if (!(flags & MU_STREAM_READY_RD)) { + errno = ETIMEDOUT; + return -1; + } + + if (off + 1 == bufsize) { + bufsize += DELTA; + buf = xrealloc(buf, bufsize); + } + + rc = read(fd, buf + off, 1); + if (rc == -1) + return -1; + if (rc == 0) + break; + off++; + } while (buf[off - 1] != '\n'); + + if (off + 1 == bufsize) { + bufsize++; + buf = xrealloc(buf, bufsize); + } + buf[off] = 0; + *pbuf = buf; + *pbufsize = bufsize; + transcript("savclt", "RECV:", buf); + return off; +} + +static int +send_line(int fd, const char *txt, size_t len) +{ + time_t start = time(NULL); + + while (len) { + struct timeval tv; + int rc, flags; + time_t diff = time(NULL) - start; + + if (diff > savclt_timeout) { + errno = ETIMEDOUT; + return -1; + } + + tv.tv_sec = savclt_timeout - diff; + tv.tv_usec = 0; + flags = MU_STREAM_READY_WR; + rc = mu_fd_wait(fd, &flags, &tv); + if (rc) { + debug1(1, "mu_fd_wait: %s", mu_strerror(rc)); + errno = rc; + return -1; + } + if (!(flags & MU_STREAM_READY_WR)) { + errno = ETIMEDOUT; + return -1; + } + + rc = write(fd, txt, len); + if (rc <= 0) { + if (errno == EAGAIN) + continue; + return -1; + } + txt += rc; + len -= rc; + } + return 0; +} + +int +savclt_wrtline(int fd, char **pbuf, size_t *pbufsize, + const char *fmt, ...) +{ + int rc; + va_list ap; + + va_start(ap, fmt); + rc = mu_vasnprintf(pbuf, pbufsize, fmt, ap); + va_end(ap); + if (rc == 0) { + transcript("savclt", "SEND:", *pbuf); + rc = send_line(fd, *pbuf, strlen(*pbuf)); + } + return rc; +} + + +static int +connect_callout_server() +{ + int fd; + int flags; + int rc; + + if (!callout_server_sa) + return -1; + fd = socket(callout_server_sa->sa_family, SOCK_STREAM, 0); + if (fd == -1) { + mu_error("schedule_callout: socket: %s", + mu_strerror(errno)); + return -1; + } + + flags = fcntl(fd, F_GETFL); + flags |= O_NONBLOCK; + fcntl(fd, F_SETFL, flags); + + rc = connect(fd, callout_server_sa, callout_server_sa_len); + if (rc == -1) { + if (errno == EINPROGRESS) { + struct timeval tv; + + tv.tv_sec = savclt_timeout; + tv.tv_usec = 0; + flags = MU_STREAM_READY_WR; + rc = mu_fd_wait(fd, &flags, &tv); + if (rc) { + debug1(1, "mu_fd_wait: %s", mu_strerror(rc)); + close(fd); + return -1; + } + + if (flags & MU_STREAM_READY_WR) { + socklen_t len = sizeof(flags); + rc = getsockopt(fd, SOL_SOCKET, SO_ERROR, + &flags, &len); + if (rc) { + mu_error("getsockopt: %s", + mu_strerror(rc)); + close(fd); + return -1; + } + } else + flags = ETIMEDOUT; + } else + flags = errno; + + if (flags) { + mu_error("connect: %s", mu_strerror(flags)); + close(fd); + return -1; + } + } + + if (rc) { + close(fd); + return -1; + } + + return fd; +} + +enum callout_server_state { + css_init, + css_send, + css_quit, +}; + +void +schedule_callout(const char *email, const char *ehlo, const char *mailfrom, + const char *client_addr) +{ + size_t size = 0; + char *buf = NULL; + int fd = connect_callout_server(); + enum callout_server_state state = css_init; + int rc; + + while (state != css_quit) { + if (savclt_getline(fd, &buf, &size) < 0) + break; + trimcrlf(buf); + if (!(strlen(buf) > 2 && memcmp(buf, "OK", 2) == 0 + && isspace(buf[2]))) + break; + + switch (state) { + case css_init: + if (client_addr) + rc = savclt_wrtline(fd, &buf, &size, + "SVRFY \"%s\" \"%s\" \"%s\" \"%s\"\r\n", + email, ehlo, mailfrom, client_addr); + else + rc = savclt_wrtline(fd, &buf, &size, + "VRFY \"%s\" \"%s\" \"%s\"\r\n", + email, ehlo, mailfrom); + state = (rc < 0) ? css_quit : css_send; + break; + + case css_send: + savclt_wrtline(fd, &buf, &size, "QUIT\r\n"); + break; + + case css_quit: + break; + } + } + close(fd); + free(buf); +} + diff --git a/mfd/savsrv.c b/mfd/savsrv.c index 4e90d364..34f95b43 100644 --- a/mfd/savsrv.c +++ b/mfd/savsrv.c @@ -14,7 +14,7 @@ You should have received a copy of the GNU General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#define MF_SOURCE_NAME MF_SOURCE_CALLOUT +#define MF_SOURCE_NAME MF_SOURCE_SAVSRV #ifdef HAVE_CONFIG_H # include <config.h> #endif @@ -23,11 +23,6 @@ #include <unistd.h> #include <stdlib.h> #include <stdio.h> -#include <stdarg.h> -#include <syslog.h> -#include <signal.h> -#include <pwd.h> -#include <grp.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> @@ -57,6 +52,7 @@ verify(struct vrfy_queue *qp, FILE *fp) { mf_status rc; + mf_proctitle_format("callout server: %s", smtp_io_email(qp->io)); if (qp->hostname) { rc = callout_strict(qp->io, qp->hostname); if (mf_resolved(rc)) @@ -192,6 +188,7 @@ callout_session_server(const char *id, int fd, char *buf = NULL; size_t size = 0; + mf_proctitle_format("callout server"); setvbuf(fp, NULL, _IOLBF, 0); fprintf(fp, "OK mailfromd callout server ready\r\n"); while (getline(&buf, &size, fp) >= 0) { @@ -201,6 +198,8 @@ callout_session_server(const char *id, int fd, int rc = 0; trimcrlf(buf); + debug1(2,"C: %s", buf); + if (mu_argcv_get(buf, "", NULL, &argc, &argv)) { fprintf(fp, "NO cannot parse line\r\n"); continue; @@ -218,7 +217,8 @@ callout_session_server(const char *id, int fd, break; } fclose(fp); - + free(buf); + /* Run queued verifications */ run_queue(NULL); return 0; diff --git a/mfd/srvman.c b/mfd/srvman.c index 47a5b54d..67433858 100644 --- a/mfd/srvman.c +++ b/mfd/srvman.c @@ -45,6 +45,7 @@ struct mfd_server { socklen_t salen; /* Length of the sa */ int backlog; /* Backlog value for listen(2) */ int fd; /* Socket descriptor */ + int flags; /* SRV_* flags */ mfd_server_prefork_hook_t prefork_hook; /* Pre-fork function */ mfd_server_func_t conn; /* Connection handler */ mfd_srvman_hook_t free_hook; @@ -378,7 +379,7 @@ mfd_server_shutdown(struct mfd_server *srv) } struct mfd_server * -mfd_server_new(const char *id, mu_url_t url, mfd_server_func_t conn) +mfd_server_new(const char *id, mu_url_t url, mfd_server_func_t conn, int flags) { struct mfd_server *srv; struct sockaddr *sa; @@ -394,6 +395,7 @@ mfd_server_new(const char *id, mu_url_t url, mfd_server_func_t conn) srv->salen = salen; srv->backlog = 8; srv->conn = conn; + srv->flags = flags; return srv; } @@ -496,7 +498,7 @@ server_run(int connfd, struct mfd_server *srv, if (srv->acl && check_acl(srv->id, srv->acl, sa, salen)) return; - if (single_process_option) { + if (single_process_option || (srv->flags & SRV_SINGLE_PROCESS)) { if ((!srvman_param.prefork_hook || srvman_param.prefork_hook(sa, salen, srvman_param.data) == 0) @@ -707,7 +709,8 @@ server_prep(struct mfd_server *srv, int fd) mu_error(_("%s: file %s is not a socket"), srv->id, s_un->sun_path); return 1; - } else if (force_remove) { + } else if (force_remove + || !(srv->flags & SRV_KEEP_EXISTING)) { if (unlink(s_un->sun_path)) { mu_error(_("%s: cannot unlink file %s: %s"), srv->id, s_un->sun_path, @@ -722,7 +725,7 @@ server_prep(struct mfd_server *srv, int fd) break; case AF_INET: - if (force_remove) { + if (force_remove || !(srv->flags & SRV_KEEP_EXISTING)) { t = 1; setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &t, sizeof(t)); diff --git a/mfd/srvman.h b/mfd/srvman.h index e2e63fa2..dffce48d 100644 --- a/mfd/srvman.h +++ b/mfd/srvman.h @@ -33,6 +33,9 @@ typedef int (*mfd_srvman_prefork_hook_t) (struct sockaddr const *sa, #define DEFAULT_PIDTAB_SIZE 64 #define DEFAULT_SHUTDOWN_TIMEOUT 5 +#define SRV_SINGLE_PROCESS 0x01 +#define SRV_KEEP_EXISTING 0x02 + struct srvman_param { void *data; /* Server manager data */ mu_acl_t acl; /* Global Access Control List */ @@ -48,7 +51,7 @@ extern struct srvman_param srvman_param; void mfd_server_shutdown(mfd_server_t srv); mfd_server_t mfd_server_new(const char *id, mu_url_t url, - mfd_server_func_t conn); + mfd_server_func_t conn, int flags); void mfd_server_free(mfd_server_t srv); void mfd_server_set_prefork_hook(mfd_server_t srv, mfd_server_prefork_hook_t hook); @@ -66,3 +69,5 @@ void mfd_srvman_shutdown(void); void mfd_srvman_free(void); size_t mfd_srvman_count_servers(void); void mfd_srvman_stop(void); + +struct sockaddr *srvman_url_to_sockaddr(mu_url_t url, size_t *psalen); |