/* This file is part of tagr. Copyright (C) 2000, 2005, 2006, 2009 Max Bouglacoff, Sergey Poznyakoff This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . */ #ifdef HAVE_CONFIG_H # include #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include unsigned stream_idle_timeout = 10; struct tagr_server { struct tagr_server *next; char *id; enum tagr_server_type type; int fd; struct grecs_sockaddr addr; }; static struct tagr_server *server_head, *server_tail; struct server_class { int (*srv_open) (struct tagr_server *); void (*srv_run) (struct tagr_server *); void (*srv_close) (struct tagr_server *); }; static void common_close (struct tagr_server *srv) { close (srv->fd); } static int setup_socket (struct tagr_server *srv) { int rc; if ((rc = fcntl (srv->fd, F_GETFD, 0)) == -1 || fcntl (srv->fd, F_SETFD, rc | FD_CLOEXEC) == -1) logmsg (L_ERR, _("cannot set close-on-exec: %s"), strerror (errno)); switch (srv->addr.sa->sa_family) { case PF_INET: { int yes = 1; if (setsockopt (srv->fd, SOL_SOCKET, SO_REUSEADDR, (void *) &yes, sizeof(yes)) == -1) logmsg (L_ERR, _("setting reuseaddr failed: %s"), strerror (errno)); } break; case PF_UNIX: { struct sockaddr_un *s_un = (struct sockaddr_un *) srv->addr.sa; if (unlink (s_un->sun_path)) logmsg (L_ERR, _("cannot remove socket %s: %s"), s_un->sun_path, strerror (errno)); } } if (bind (srv->fd, srv->addr.sa, srv->addr.len) < 0) { logmsg (L_ERR, "bind: %s", strerror (errno)); close (srv->fd); srv->fd = -1; return 1; } return 0; } static int udp_open (struct tagr_server *); static void udp_run (struct tagr_server *); static int tcp_open (struct tagr_server *); static void tcp_run (struct tagr_server *); struct server_class server_tab[] = { { udp_open, udp_run, common_close }, { tcp_open, tcp_run, common_close } }; void register_server (const char *id, enum tagr_server_type type, struct grecs_sockaddr addr) { struct tagr_server *srv = xmalloc (sizeof (*srv)); srv->id = xstrdup (id); srv->type = type; srv->fd = -1; srv->addr = addr; srv->next = NULL; if (server_tail) server_tail->next =srv; else server_head = srv; server_tail = srv; } void open_servers () { struct tagr_server *prev = NULL; struct tagr_server *srv = server_head; while (srv) { struct tagr_server *next = srv->next; if (server_tab[srv->type].srv_open (srv)) { free (srv->id); free (srv->addr.sa); free (srv); if (prev) prev->next = next; else server_head = next; } else prev = srv; srv = next; } if (!server_head) logmsg (L_CRIT, _("no servers configured")); server_tail = prev; } void close_servers () { struct tagr_server *srv = server_head; while (srv) { struct tagr_server *next = srv->next; server_tab[srv->type].srv_close (srv); free (srv->id); free (srv->addr.sa); free (srv); srv = next; } server_head = server_tail = NULL; } void subprocess (void (*fun) (void *data), void *data, int fd) { int child = 0; if (!single_process_option) { pid_t pid = fork (); if (pid > 0) return; else if (pid < 0) logmsg (L_ERR, _("cannot fork: %s"), strerror (errno)); else { signal (SIGHUP, SIG_DFL); signal (SIGCHLD, SIG_DFL); signal (SIGALRM, SIG_DFL); child = 1; if (fd != -1) close (fd); } } fun (data); if (child) exit (0); } void decode_buffer (void *recv_buffer) { int i; Stat_reply *reply; Stat *sp; int child = 0; reply = (Stat_reply *) recv_buffer; reply->n_addr = ntohl (reply->n_addr); if (reply->n_addr > MAXADDR) { logmsg (L_NOTICE, _("got invalid packet: n_addr = %d"), reply->n_addr); return; } reply->timestamp = ntohl (reply->timestamp); if (verbose_level) { char tbuf[sizeof("2009-04-01 00:00:00")]; strftime (tbuf, sizeof tbuf, "%Y-%m-%d %H:%M:%S", gmtime (&reply->timestamp)); logmsg (L_INFO, _("Received packet: %d %lu - %s"), reply->n_addr, (unsigned long) reply->timestamp, tbuf); } sp = reply->stat; if (open_db (TAGR_DB_WR) == 0) { for (i = 0; i < reply->n_addr; i++, sp++) { sp->in = ntohl (sp->in); sp->out = ntohl (sp->out); verbose (1, _("Monitor %s: %lu %lu"), sp->name, sp->in, sp->out); report (sp, reply->timestamp); } close_db (); } } static int udp_open (struct tagr_server *srv) { srv->fd = socket (srv->addr.sa->sa_family, SOCK_DGRAM, 0); if (srv->fd < 0) { logmsg (L_ERR, "socket: %s", strerror (errno)); return 1; } return setup_socket (srv); } static void udp_run (struct tagr_server *srv) { char recv_buffer[reply_size (MAXADDR)]; struct sockaddr saremote; int salen = sizeof saremote; int rc = recvfrom (srv->fd, recv_buffer, sizeof (recv_buffer), 0, &saremote, &salen); if (rc > 0) subprocess (decode_buffer, recv_buffer, -1); else if (rc < 0 && errno != EINTR) logmsg (L_ERR, "recvfrom: %s", strerror (errno)); } static int tcp_open (struct tagr_server *srv) { srv->fd = socket (srv->addr.sa->sa_family, SOCK_STREAM, 0); if (srv->fd < 0) { logmsg (L_ERR, "socket: %s", strerror (errno)); return 1; } if (setup_socket (srv)) return 1; if (listen (srv->fd, 8)) { logmsg (L_ERR, "listen: %s", strerror (errno)); close (srv->fd); return 1; } return 0; } void trim_crlf (char *buf) { size_t len = strlen (buf); if (len > 0 && buf[len-1] == '\n') { buf[--len] = 0; if (len > 0 && buf[len-1] == '\r') buf[--len] = 0; } } static int convert_ts (const char *str, time_t *pt) { uintmax_t val; char *p; val = strtoumax (str, &p, 10); if (*p) return 1; *pt = val; if (*pt != val) return 1; return 0; } static int convert_ulong (const char *str, unsigned long *pt) { uintmax_t val; char *p; val = strtoumax (str, &p, 10); if (*p) return 1; *pt = val; if (*pt != val) return 1; return 0; } static RETSIGTYPE sig_child_alrm (int sig) { logmsg (L_NOTICE, _("child timed out")); exit (EX_TEMPFAIL); } enum tagr_dialog_state { initial_dialog_state, auth_dialog_state, quit_dialog_state }; struct tagr_dialog { enum tagr_dialog_state state; FILE *in; FILE *out; }; #define STATEMASK(n) (1u<<(n)) #define ANYSTATE ((unsigned)~0) struct command { const char *name; unsigned statemask; int minargs; int maxargs; void (*handler) (struct tagr_dialog *, struct command *, struct wordsplit *); }; static void cmd_auth (struct tagr_dialog *dlg, struct command *cmd, struct wordsplit *ws) { if (tagr_auth (ws->ws_wordv[1], ws->ws_wordv[2]) == 0) { dlg->state = auth_dialog_state; fprintf (dlg->out, "+OK welcome, %s\r\n", ws->ws_wordv[1]); } else fprintf (dlg->out, "-ERR authorization failed\r\n"); } static void cmd_sample (struct tagr_dialog *dlg, struct command *cmd, struct wordsplit *ws) { Stat st; time_t t; if (strlen (ws->ws_wordv[1]) > MAX_NAME_LENGTH) { fprintf (dlg->out, "-ERR id too long\r\n"); return; } strcpy (st.name, ws->ws_wordv[1]); if (convert_ts (ws->ws_wordv[2], &t) || convert_ulong (ws->ws_wordv[3], &st.in) || convert_ulong (ws->ws_wordv[4], &st.out)) { fprintf (dlg->out, "-ERR invalid input\r\n"); return; } if (open_db (TAGR_DB_WR)) fprintf (dlg->out, "-TEMP database not available\r\n"); else { report (&st, t); close_db (); fprintf (dlg->out, "+OK thank you\r\n"); } } static void cmd_quit (struct tagr_dialog *dlg, struct command *cmd, struct wordsplit *ws) { dlg->state = quit_dialog_state; fprintf (dlg->out, "+OK bye\r\n"); } static struct command command_tab[] = { { "AUTH", STATEMASK (initial_dialog_state), 3, 3, cmd_auth }, { "SAMPLE", STATEMASK (auth_dialog_state), 5, 5, cmd_sample }, { "QUIT", ANYSTATE, 1, 1, cmd_quit }, { NULL } }; static struct command * find_command (const char *name) { struct command *cmd; for (cmd = command_tab; cmd->name; cmd++) if (c_strcasecmp (cmd->name, name) == 0) return cmd; return NULL; } static void tcp_server_loop (void *data) { int fd = *(int*)data; struct tagr_dialog dlg; size_t bufsize = 0; char *buf = NULL; const char *authid; signal (SIGALRM, sig_child_alrm); dlg.state = initial_dialog_state; dlg.in = fdopen (fd, "r"); dlg.out = fdopen (fd, "w"); setvbuf (dlg.in, NULL, _IOLBF, 0); setvbuf (dlg.out, NULL, _IOLBF, 0); authid = tagr_auth_init (); fprintf (dlg.out, "+OK tagr experimental stream service %s\r\n", authid); while (dlg.state != quit_dialog_state && getline (&buf, &bufsize, dlg.in) > 0) { struct wordsplit ws; struct command *cmd; alarm (stream_idle_timeout); trim_crlf (buf); if (wordsplit (buf, &ws, WRDSF_DEFFLAGS)) { logmsg (L_ERR, _("failed to parse input line")); fprintf (dlg.out, "-ERR parsing failed\r\n"); continue; } if (ws.ws_wordc == 0) fprintf (dlg.out, "-ERR invalid input\r\n"); else if (cmd = find_command (ws.ws_wordv[0])) { if (!(STATEMASK (dlg.state) & cmd->statemask)) fprintf (dlg.out, "-ERR wrong state\r\n"); else if (ws.ws_wordc < cmd->minargs || (cmd->maxargs > 0 && ws.ws_wordc > cmd->maxargs)) fprintf (dlg.out, "-ERR invalid number of arguments\r\n"); else cmd->handler (&dlg, cmd, &ws); } else fprintf (dlg.out, "-ERR invalid input\r\n"); wordsplit_free (&ws); } free (buf); fclose (dlg.in); fclose (dlg.out); close (fd); } static void tcp_run (struct tagr_server *srv) { union { struct sockaddr sa; struct sockaddr_in s_in; struct sockaddr_un s_un; } addr; socklen_t len = sizeof (addr); int fd = accept (srv->fd, &addr.sa, &len); if (fd == -1) { logmsg (L_ERR, "accept: %s", strerror (errno)); return; } subprocess (tcp_server_loop, &fd, srv->fd); close (fd); } void server_loop () { fd_set read_fds; int max_fd = 0; struct tagr_server *srv; FD_ZERO (&read_fds); for (srv = server_head; srv; srv = srv->next) { FD_SET (srv->fd, &read_fds); if (srv->fd > max_fd) max_fd = srv->fd; } while (tagr_idle () == 0) { fd_set rfds = read_fds; int rc = select (max_fd + 1, &rfds, NULL, NULL, NULL); if (rc < 0) { if (errno != EINTR) logmsg (L_ERR, "select: %s", strerror (errno)); continue; } for (srv = server_head; srv; srv = srv->next) if (FD_ISSET (srv->fd, &rfds)) server_tab[srv->type].srv_run (srv); } }