From 29e4b3da0990aca9db232dc3ce0a57722ccf1cac Mon Sep 17 00:00:00 2001 From: Sergey Poznyakoff Date: Tue, 28 Apr 2009 11:08:13 +0300 Subject: Move network code into a separate file. Introduce a proof-of-concept stream server. * src/server.c: New file * src/Makefile.am (tagr_SOURCES): Add server.c * TODO: Update. * etc/upgrade.awk: Update. * gnulib.modules: Add strtoumax. * src/main.c (listen_sockaddr, recv_buffer): Remove. (decode_buffer): Remove (see server.c) (get_port): Remove. (tagr_idle): New function. (main): Remove select loop. Use server functions instead. * src/readconfig.c: New configuration file statement "server". * src/tagr.h (listen_sockaddr, rateup) (port, progname): Remove. (tagr_server_type): New enum. (register_server, open_servers, close_servers) (server_loop, tagr_idle): New protos. --- TODO | 10 +- etc/upgrade.awk | 6 +- gnulib.modules | 1 + src/Makefile.am | 1 + src/main.c | 202 ++++------------------- src/readconfig.c | 93 ++++++++++- src/server.c | 475 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/tagr.h | 23 ++- 8 files changed, 632 insertions(+), 179 deletions(-) create mode 100644 src/server.c diff --git a/TODO b/TODO index 0871b7e..9a8e27f 100644 --- a/TODO +++ b/TODO @@ -1,6 +1,14 @@ -Tagr TODO list. 2009-04-27 +Tagr TODO list. 2009-04-28 Copyright (C) 2009 Sergey Poznyakoff +* Stream-based protocol + +S: +OK tagr ready +C: SAMPLE id 1240903636 12345 3456 +S: +OK accepted +C: QUIT +S: +OK bye + * Zero-unknown option and interrupted data feeds Decide what to do if a data feed gets interrupted. diff --git a/etc/upgrade.awk b/etc/upgrade.awk index 2278758..e3bfe52 100644 --- a/etc/upgrade.awk +++ b/etc/upgrade.awk @@ -20,7 +20,11 @@ $1 == "user" || $1 == "template" || $1 == "basedir" { next } $1 == "port" { - print "listen 127.0.0.1:" $2 ";" + print "server default { + print " type udp;" + print " listen 127.0.0.1:" $2 ";" + print "}" + print "" next } $1 == "router" { diff --git a/gnulib.modules b/gnulib.modules index 7eecc00..d4887b3 100644 --- a/gnulib.modules +++ b/gnulib.modules @@ -11,6 +11,7 @@ obstack strtod strtol strtoul +strtoumax sysexits realloc version-etc diff --git a/src/Makefile.am b/src/Makefile.am index f9dcc5f..0db1eb6 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -28,6 +28,7 @@ tagr_SOURCES=\ readconfig.c\ report.c\ report.h\ + server.c\ stat.c\ tagr.h diff --git a/src/main.c b/src/main.c index 51c3b1b..181e9d2 100644 --- a/src/main.c +++ b/src/main.c @@ -26,11 +26,7 @@ #include #include #include -#include -#include -#include #include -#include #include #include #include @@ -50,7 +46,6 @@ static RETSIGTYPE sig_fatal (int); static RETSIGTYPE sig_hup (int); static RETSIGTYPE sig_child (int); -struct grecs_sockaddr listen_sockaddr; int preprocess_only = 0; int log_to_stderr = -1; int test_template_option; @@ -58,8 +53,6 @@ int lint_option; char *pidfile = TAGR_PIDFILE; unsigned update_interval = 5*60; -char i_recv_buffer[reply_size (MAXADDR)]; -char *recv_buffer = i_recv_buffer; char *configfile = TAGR_CONFIGFILE; char *html_template = TAGR_TEMPLATE; char *user; @@ -450,67 +443,6 @@ change_user () } -void -decode_buffer () -{ - int i; - Stat_reply *reply; - Stat *sp; - int child = 0; - - reply = (Stat_reply *) recv_buffer; - reply->n_addr = ntohl (reply->n_addr); - - if (reply->n_addr > MAXADDR) - { - logmsg (L_NOTICE, _("got invalid packet: n_addr = %d"), - reply->n_addr); - return; - } - reply->timestamp = ntohl (reply->timestamp); - if (verbose_level) - { - char tbuf[sizeof("2009-04-01 00:00:00")]; - strftime (tbuf, sizeof tbuf, "%Y-%m-%d %H:%M:%S", - gmtime (&reply->timestamp)); - logmsg (L_INFO, _("Received packet: %d %lu - %s"), reply->n_addr, - (unsigned long) reply->timestamp, tbuf); - } - - sp = reply->stat; - - if (!single_process_option) - { - pid_t pid = fork (); - if (pid > 0) - return; - else if (pid < 0) - logmsg (L_ERR, _("cannot fork: %s"), strerror (errno)); - else - { - signal (SIGHUP, SIG_IGN); - signal (SIGCHLD, SIG_IGN); - child = 1; - } - } - - if (open_db (TAGR_DB_WR) == 0) - { - for (i = 0; i < reply->n_addr; i++, sp++) - { - sp->in = ntohl (sp->in); - sp->out = ntohl (sp->out); - verbose (1, _("Monitor %s: %lu %lu"), sp->name, sp->in, sp->out); - report (sp, reply->timestamp); - } - close_db (); - } - - if (child) - exit (0); -} - - void read_input (const char *name) { @@ -566,27 +498,6 @@ read_input (const char *name) verbose (2, _("Finished reading `%s'"), name); } -int -get_port (char *str) -{ - int pn; - - if (isdigit (str[0])) - pn = htons (atoi (str)); - else - { - struct servent *s = getservbyname (str, "udp"); - if (s) - pn = s->s_port; - else - { - logmsg (L_ERR, _("no such service: %s"), str); - return 0; - } - } - return pn; -} - enum command { command_none, @@ -725,13 +636,36 @@ tagr_version (FILE *stream, struct argp_state *state) } +static char **save_argv; + +int +tagr_idle () +{ + switch (command) + { + case command_none: + break; + + case command_update: + rebuild (0); + command = command_none; + alarm (update_interval); + break; + + case command_reconfig: + close_servers (); + tagr_restart (save_argv); + } + return 0; +} + int main (int argc, char **argv) { int index, rc; fd_set read_fds; - char **save_argv = argv; - + + save_argv = argv; argp_program_bug_address = "<" PACKAGE_BUGREPORT ">"; argp_program_version_hook = tagr_version; @@ -816,47 +750,6 @@ main (int argc, char **argv) if (rebuild_option || import_option || list_option || read_option) exit (0); - if (listen_sockaddr.sa == NULL) - { - logmsg (L_CRIT, _("listener address is not configured")); - exit (EX_CONFIG); - } - - sockfd = socket (listen_sockaddr.sa->sa_family, - SOCK_DGRAM, 0); - if (sockfd < 0) - die (EX_OSERR, "socket: %s", strerror (errno)); - - if ((rc = fcntl (sockfd, F_GETFD, 0)) == -1 - || fcntl (sockfd, F_SETFD, rc | FD_CLOEXEC) == -1) - logmsg (L_ERR, _("cannot set close-on-exec: %s"), - strerror (errno)); - - switch (listen_sockaddr.sa->sa_family) - { - case PF_INET: - { - int yes = 1; - if (setsockopt (sockfd, SOL_SOCKET, SO_REUSEADDR, - (void *) &yes, sizeof(yes)) == -1) - logmsg (L_ERR, _("setting reuseaddr failed: %s"), - strerror (errno)); - } - break; - - case PF_UNIX: - { - struct sockaddr_un *s_un = - (struct sockaddr_un *) listen_sockaddr.sa; - if (unlink (s_un->sun_path)) - logmsg (L_ERR, _("cannot remove socket %s: %s"), - s_un->sun_path, strerror (errno)); - } - } - - if (bind (sockfd, listen_sockaddr.sa, listen_sockaddr.len) < 0) - die (EX_OSERR, "bind: %s", strerror (errno)); - if (save_argv[0][0] != '/') { logmsg (L_ERR, @@ -895,46 +788,13 @@ main (int argc, char **argv) logmsg (L_INFO, _("%s started"), program_version); + open_servers (); + command = command_none; alarm (update_interval); - for (;;) - { - switch (command) - { - case command_none: - break; - - case command_update: - rebuild (0); - command = command_none; - alarm (update_interval); - break; - - case command_reconfig: - tagr_restart (save_argv); - } - - FD_ZERO (&read_fds); - FD_SET (sockfd, &read_fds); - - rc = select (sockfd + 1, &read_fds, NULL, NULL, NULL); - if (rc < 0) - { - if (errno != EINTR) - logmsg (L_ERR, "select: %s", strerror (errno)); - continue; - } - if (rc == 1) - { - struct sockaddr saremote; - int salen = sizeof saremote; - rc = recvfrom (sockfd, (char *) recv_buffer, - sizeof (i_recv_buffer), 0, &saremote, &salen); - if (rc > 0) - decode_buffer (); - else if (rc < 0 && errno != EINTR) - logmsg (L_ERR, "recvfrom: %s", strerror (errno)); - } - } + server_loop (); + + close_servers (); + exit (0); } diff --git a/src/readconfig.c b/src/readconfig.c index e4e4e83..9eff4c7 100644 --- a/src/readconfig.c +++ b/src/readconfig.c @@ -340,7 +340,93 @@ cb_facility (enum grecs_callback_command cmd, grecs_error (locus, 0, _("unknown syslog facility")); return 0; } + + +struct cfg_server +{ + const char *id; + enum tagr_server_type type; + struct grecs_sockaddr addr; +}; + +static int +cb_server_type (enum grecs_callback_command cmd, + grecs_locus_t *locus, + void *varptr, + grecs_value_t *value, + void *cb_data) +{ + enum tagr_server_type *typeptr = varptr; + if (cmd != grecs_callback_set_value) + { + grecs_error (locus, 0, _("unexpected block statement")); + return 1; + } + if (value->type != GCONF_TYPE_STRING) + { + grecs_error (locus, 0, _("expected scalar value but found list")); + return 1; + } + if (strcmp (value->v.string, "udp") == 0) + *typeptr = tagr_udp_server; + else if (strcmp (value->v.string, "tcp") == 0) + *typeptr = tagr_tcp_server; + else + grecs_error (locus, 0, _("unknown server type")); + return 0; +} + +static struct grecs_keyword server_kw[] = { + { "type", N_("{udp|tcp}"), N_("Server type"), + grecs_type_string, NULL, offsetof (struct cfg_server, type), + cb_server_type }, + { "listen", N_("socket"), N_("Listen on this address"), + grecs_type_sockaddr, NULL, offsetof (struct cfg_server, addr), }, + { NULL } +}; + +static int +cb_server (enum grecs_callback_command cmd, + grecs_locus_t *locus, + void *varptr, + grecs_value_t *value, + void *cb_data) +{ + int rc; + struct cfg_server *cfg; + void **pdata = cb_data; + switch (cmd) + { + case grecs_callback_section_begin: + if (!value || value->type != GCONF_TYPE_STRING) + { + grecs_error (locus, 0, _("tag must be a string")); + return 0; + } + cfg = xzalloc (sizeof (*cfg)); + cfg->id = value->v.string; + *pdata = cfg; + break; + + case grecs_callback_section_end: + cfg = *pdata; + if (cfg->addr.sa == NULL) + grecs_error (locus, 0, _("listener address is not configured")); + else + register_server (cfg->id, cfg->type, cfg->addr); + free (cfg); + *pdata = NULL; + break; + + case grecs_callback_set_value: + grecs_error (locus, 0, _("invalid use of block statement")); + } + return 0; +} + + + static struct grecs_keyword log_kw[] = { { "tag", N_("arg"), N_("Tag syslog diagnostics with this tag."), grecs_type_string, &log_tag, 0 }, @@ -366,8 +452,11 @@ static struct grecs_keyword tagr_kw[] = { grecs_type_string, &html_template }, { "pidfile", NULL, N_("Set pid file name"), grecs_type_string, &pidfile }, - { "listen", N_("socket"), N_("Listen on this address"), - grecs_type_sockaddr, &listen_sockaddr, }, + + { "server", N_("id: string"), N_("Configure server"), + grecs_type_section, NULL, 0, + cb_server, NULL, server_kw }, + { "update-interval", NULL, N_("Set graph update interval"), grecs_type_uint, &update_interval }, diff --git a/src/server.c b/src/server.c new file mode 100644 index 0000000..82aae0e --- /dev/null +++ b/src/server.c @@ -0,0 +1,475 @@ +/* This file is part of tagr. + Copyright (C) 2000, 2005, 2006, 2009 Max Bouglacoff, Sergey Poznyakoff + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . */ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +struct tagr_server +{ + struct tagr_server *next; + char *id; + enum tagr_server_type type; + int fd; + struct grecs_sockaddr addr; +}; + +struct tagr_server *server_head, *server_tail; + +struct server_class +{ + int (*srv_open) (struct tagr_server *); + void (*srv_run) (struct tagr_server *); + void (*srv_close) (struct tagr_server *); +}; + +static void +common_close (struct tagr_server *srv) +{ + close (srv->fd); +} + +static int +setup_socket (struct tagr_server *srv) +{ + int rc; + + if ((rc = fcntl (srv->fd, F_GETFD, 0)) == -1 + || fcntl (srv->fd, F_SETFD, rc | FD_CLOEXEC) == -1) + logmsg (L_ERR, _("cannot set close-on-exec: %s"), + strerror (errno)); + + switch (srv->addr.sa->sa_family) + { + case PF_INET: + { + int yes = 1; + if (setsockopt (srv->fd, SOL_SOCKET, SO_REUSEADDR, + (void *) &yes, sizeof(yes)) == -1) + logmsg (L_ERR, _("setting reuseaddr failed: %s"), + strerror (errno)); + } + break; + + case PF_UNIX: + { + struct sockaddr_un *s_un = + (struct sockaddr_un *) srv->addr.sa; + if (unlink (s_un->sun_path)) + logmsg (L_ERR, _("cannot remove socket %s: %s"), + s_un->sun_path, strerror (errno)); + } + } + + if (bind (srv->fd, srv->addr.sa, srv->addr.len) < 0) + { + logmsg (L_ERR, "bind: %s", strerror (errno)); + close (srv->fd); + srv->fd = -1; + return 1; + } + return 0; +} + +static int udp_open (struct tagr_server *); +static void udp_run (struct tagr_server *); +static int tcp_open (struct tagr_server *); +static void tcp_run (struct tagr_server *); + +struct server_class server_tab[] = { + { udp_open, udp_run, common_close }, + { tcp_open, tcp_run, common_close } +}; + + +void +register_server (const char *id, + enum tagr_server_type type, struct grecs_sockaddr addr) +{ + struct tagr_server *srv = xmalloc (sizeof (*srv)); + srv->id = xstrdup (id); + srv->type = type; + srv->fd = -1; + srv->addr = addr; + + srv->next = NULL; + if (server_tail) + server_tail->next =srv; + else + server_head = srv; + server_tail = srv; +} + +void +open_servers () +{ + struct tagr_server *prev = NULL; + struct tagr_server *srv = server_head; + + while (srv) + { + struct tagr_server *next = srv->next; + if (server_tab[srv->type].srv_open (srv)) + { + free (srv->id); + free (srv->addr.sa); + free (srv); + if (prev) + prev->next = next; + else + server_head = next; + } + else + prev = srv; + srv = next; + } + + if (!server_head) + logmsg (L_CRIT, _("no servers configured")); + + server_tail = prev; +} + +void +close_servers () +{ + struct tagr_server *srv = server_head; + + while (srv) + { + struct tagr_server *next = srv->next; + server_tab[srv->type].srv_close (srv); + free (srv->id); + free (srv->addr.sa); + free (srv); + srv = next; + } + server_head = server_tail = NULL; +} + +void +subprocess (void (*fun) (void *data), void *data, int fd) +{ + int child = 0; + if (!single_process_option) + { + pid_t pid = fork (); + if (pid > 0) + return; + else if (pid < 0) + logmsg (L_ERR, _("cannot fork: %s"), strerror (errno)); + else + { + signal (SIGHUP, SIG_IGN); + signal (SIGCHLD, SIG_IGN); + child = 1; + if (fd != -1) + close (fd); + } + } + fun (data); + if (child) + exit (0); +} + + +void +decode_buffer (void *recv_buffer) +{ + int i; + Stat_reply *reply; + Stat *sp; + int child = 0; + + reply = (Stat_reply *) recv_buffer; + reply->n_addr = ntohl (reply->n_addr); + + if (reply->n_addr > MAXADDR) + { + logmsg (L_NOTICE, _("got invalid packet: n_addr = %d"), + reply->n_addr); + return; + } + reply->timestamp = ntohl (reply->timestamp); + if (verbose_level) + { + char tbuf[sizeof("2009-04-01 00:00:00")]; + strftime (tbuf, sizeof tbuf, "%Y-%m-%d %H:%M:%S", + gmtime (&reply->timestamp)); + logmsg (L_INFO, _("Received packet: %d %lu - %s"), reply->n_addr, + (unsigned long) reply->timestamp, tbuf); + } + + sp = reply->stat; + + if (open_db (TAGR_DB_WR) == 0) + { + for (i = 0; i < reply->n_addr; i++, sp++) + { + sp->in = ntohl (sp->in); + sp->out = ntohl (sp->out); + verbose (1, _("Monitor %s: %lu %lu"), sp->name, sp->in, sp->out); + report (sp, reply->timestamp); + } + close_db (); + } +} + +static int +udp_open (struct tagr_server *srv) +{ + srv->fd = socket (srv->addr.sa->sa_family, SOCK_DGRAM, 0); + + if (srv->fd < 0) + { + logmsg (L_ERR, "socket: %s", strerror (errno)); + return 1; + } + + return setup_socket (srv); +} + +static void +udp_run (struct tagr_server *srv) +{ + char recv_buffer[reply_size (MAXADDR)]; + struct sockaddr saremote; + int salen = sizeof saremote; + int rc = recvfrom (srv->fd, recv_buffer, + sizeof (recv_buffer), 0, &saremote, &salen); + if (rc > 0) + subprocess (decode_buffer, recv_buffer, -1); + else if (rc < 0 && errno != EINTR) + logmsg (L_ERR, "recvfrom: %s", strerror (errno)); +} + + + +static int +tcp_open (struct tagr_server *srv) +{ + srv->fd = socket (srv->addr.sa->sa_family, SOCK_STREAM, 0); + + if (srv->fd < 0) + { + logmsg (L_ERR, "socket: %s", strerror (errno)); + return 1; + } + + if (setup_socket (srv)) + return 1; + + if (listen (srv->fd, 8)) + { + logmsg (L_ERR, "listen: %s", strerror (errno)); + close (srv->fd); + return 1; + } + return 0; +} + +static void +trim_crlf (char *buf) +{ + size_t len = strlen (buf); + if (len > 0 && buf[len-1] == '\n') + { + buf[--len] = 0; + if (len > 0 && buf[len-1] == '\r') + buf[--len] = 0; + } +} + +static int +convert_ts (const char *str, time_t *pt) +{ + uintmax_t val; + char *p; + + val = strtoumax (str, &p, 10); + if (*p) + return 1; + *pt = val; + if (*pt != val) + return 1; + return 0; +} + +static int +convert_ulong (const char *str, unsigned long *pt) +{ + uintmax_t val; + char *p; + + val = strtoumax (str, &p, 10); + if (*p) + return 1; + *pt = val; + if (*pt != val) + return 1; + return 0; +} + +static void +tcp_server_loop (void *data) +{ + int fd = *(int*)data; + FILE *in = fdopen (fd, "r"); + FILE *out = fdopen (fd, "w"); + size_t bufsize = 0; + char *buf = NULL; + int quit = 0; + + setvbuf (in, NULL, _IOLBF, 0); + setvbuf (out, NULL, _IOLBF, 0); + + fprintf (out, "+OK tagr experimental stream service\r\n"); + while (!quit && getline (&buf, &bufsize, in) > 0) + { + struct wordsplit ws; + + trim_crlf (buf); + if (wordsplit (buf, &ws, WRDSF_DEFFLAGS)) + { + logmsg (L_ERR, "failed to parse input line"); + fprintf (out, "-ERR parsing failed\r\n"); + continue; + } + + if (ws.ws_wordc == 0) + fprintf (out, "-ERR invalid input\r\n"); + else if (strcasecmp (ws.ws_wordv[0], "SAMPLE") == 0 + && ws.ws_wordc == 5) + { + Stat st; + time_t t; + + if (strlen (ws.ws_wordv[1]) > MAX_NAME_LENGTH) + fprintf (out, "-ERR id too long\r\n"); + else + { + strcpy (st.name, ws.ws_wordv[1]); + if (convert_ts (ws.ws_wordv[2], &t) + || convert_ulong (ws.ws_wordv[3], &st.in) + || convert_ulong (ws.ws_wordv[4], &st.out)) + fprintf (out, "-ERR invalid input\r\n"); + else + { + if (open_db (TAGR_DB_WR)) + fprintf (out, "-ERR database not available\r\n"); + else + { + report (&st, t); + close_db (); + fprintf (out, "+OK thank you\r\n"); + } + } + } + } + else if (strcasecmp (ws.ws_wordv[0], "QUIT") == 0) + { + quit = 1; + fprintf (out, "+OK bye\r\n"); + } + else + fprintf (out, "-ERR invalid input\r\n"); + wordsplit_free (&ws); + } + + free (buf); + fclose (in); + fclose (out); + close (fd); +} + +static void +tcp_run (struct tagr_server *srv) +{ + union + { + struct sockaddr sa; + struct sockaddr_in s_in; + struct sockaddr_un s_un; + } addr; + socklen_t len = sizeof (addr); + + int fd = accept (srv->fd, &addr.sa, &len); + + if (fd == -1) + { + logmsg (L_ERR, "accept: %s", strerror (errno)); + return; + } + + subprocess (tcp_server_loop, &fd, srv->fd); + close (fd); +} + +void +server_loop () +{ + fd_set read_fds; + int max_fd = 0; + struct tagr_server *srv; + + FD_ZERO (&read_fds); + for (srv = server_head; srv; srv = srv->next) + { + FD_SET (srv->fd, &read_fds); + if (srv->fd > max_fd) + max_fd = srv->fd; + } + + while (tagr_idle () == 0) + { + fd_set rfds = read_fds; + int rc = select (max_fd + 1, &rfds, NULL, NULL, NULL); + if (rc < 0) + { + if (errno != EINTR) + logmsg (L_ERR, "select: %s", strerror (errno)); + continue; + } + + for (srv = server_head; srv; srv = srv->next) + if (FD_ISSET (srv->fd, &rfds)) + server_tab[srv->type].srv_run (srv); + } +} + + + diff --git a/src/tagr.h b/src/tagr.h index a66837e..1d8aac1 100644 --- a/src/tagr.h +++ b/src/tagr.h @@ -43,18 +43,15 @@ struct monitor int swap; }; -extern struct grecs_sockaddr listen_sockaddr; extern int preprocess_only; extern int log_to_stderr; extern char *pidfile; extern unsigned update_interval; +extern int single_process_option; -extern char rateup[]; extern char *user; extern char *basedir; extern char *configfile; -extern int port; -extern char *progname; extern int foreground; extern char *html_template; extern char *html_input_file; @@ -276,4 +273,22 @@ int draw_graph (FILE *fp, int growright, struct grid_class *xgrid, struct grid_class *ygrid); + +/* server.c */ +enum tagr_server_type + { + tagr_udp_server, + tagr_tcp_server + }; + +void register_server (const char *id, + enum tagr_server_type type, + struct grecs_sockaddr addr); +void open_servers (void); +void close_servers (void); +void server_loop (void); + +int tagr_idle (void); + + -- cgit v1.2.1