summaryrefslogtreecommitdiffabout
authorSergey Poznyakoff <gray@gnu.org.ua>2009-04-28 08:08:13 (GMT)
committer Sergey Poznyakoff <gray@gnu.org.ua>2009-04-28 08:08:13 (GMT)
commit29e4b3da0990aca9db232dc3ce0a57722ccf1cac (patch) (side-by-side diff)
tree54dce1039191f354ab364021aeb7028e80578246
parent625a9abc1f7810ed5b4babb6b112add9a1413cf7 (diff)
downloadtagr-29e4b3da0990aca9db232dc3ce0a57722ccf1cac.tar.gz
tagr-29e4b3da0990aca9db232dc3ce0a57722ccf1cac.tar.bz2
Move network code into a separate file. Introduce a proof-of-concept stream server.
* src/server.c: New file * src/Makefile.am (tagr_SOURCES): Add server.c * TODO: Update. * etc/upgrade.awk: Update. * gnulib.modules: Add strtoumax. * src/main.c (listen_sockaddr, recv_buffer): Remove. (decode_buffer): Remove (see server.c) (get_port): Remove. (tagr_idle): New function. (main): Remove select loop. Use server functions instead. * src/readconfig.c: New configuration file statement "server". * src/tagr.h (listen_sockaddr, rateup) (port, progname): Remove. (tagr_server_type): New enum. (register_server, open_servers, close_servers) (server_loop, tagr_idle): New protos.
Diffstat (more/less context) (ignore whitespace changes)
-rw-r--r--TODO10
-rw-r--r--etc/upgrade.awk6
-rw-r--r--gnulib.modules1
-rw-r--r--src/Makefile.am1
-rw-r--r--src/main.c202
-rw-r--r--src/readconfig.c93
-rw-r--r--src/server.c475
-rw-r--r--src/tagr.h23
8 files changed, 632 insertions, 179 deletions
diff --git a/TODO b/TODO
index 0871b7e..9a8e27f 100644
--- a/TODO
+++ b/TODO
@@ -1,6 +1,14 @@
-Tagr TODO list. 2009-04-27
+Tagr TODO list. 2009-04-28
Copyright (C) 2009 Sergey Poznyakoff
+* Stream-based protocol
+
+S: +OK tagr ready
+C: SAMPLE id 1240903636 12345 3456
+S: +OK accepted
+C: QUIT
+S: +OK bye
+
* Zero-unknown option and interrupted data feeds
Decide what to do if a data feed gets interrupted.
diff --git a/etc/upgrade.awk b/etc/upgrade.awk
index 2278758..e3bfe52 100644
--- a/etc/upgrade.awk
+++ b/etc/upgrade.awk
@@ -20,7 +20,11 @@ $1 == "user" || $1 == "template" || $1 == "basedir" {
next
}
$1 == "port" {
- print "listen 127.0.0.1:" $2 ";"
+ print "server default {
+ print " type udp;"
+ print " listen 127.0.0.1:" $2 ";"
+ print "}"
+ print ""
next
}
$1 == "router" {
diff --git a/gnulib.modules b/gnulib.modules
index 7eecc00..d4887b3 100644
--- a/gnulib.modules
+++ b/gnulib.modules
@@ -11,6 +11,7 @@ obstack
strtod
strtol
strtoul
+strtoumax
sysexits
realloc
version-etc
diff --git a/src/Makefile.am b/src/Makefile.am
index f9dcc5f..0db1eb6 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -28,6 +28,7 @@ tagr_SOURCES=\
readconfig.c\
report.c\
report.h\
+ server.c\
stat.c\
tagr.h
diff --git a/src/main.c b/src/main.c
index 51c3b1b..181e9d2 100644
--- a/src/main.c
+++ b/src/main.c
@@ -26,11 +26,7 @@
#include <sys/types.h>
#include <sys/time.h>
#include <sys/wait.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <sys/un.h>
#include <argp.h>
-#include <netdb.h>
#include <signal.h>
#include <errno.h>
#include <string.h>
@@ -50,7 +46,6 @@ static RETSIGTYPE sig_fatal (int);
static RETSIGTYPE sig_hup (int);
static RETSIGTYPE sig_child (int);
-struct grecs_sockaddr listen_sockaddr;
int preprocess_only = 0;
int log_to_stderr = -1;
int test_template_option;
@@ -58,8 +53,6 @@ int lint_option;
char *pidfile = TAGR_PIDFILE;
unsigned update_interval = 5*60;
-char i_recv_buffer[reply_size (MAXADDR)];
-char *recv_buffer = i_recv_buffer;
char *configfile = TAGR_CONFIGFILE;
char *html_template = TAGR_TEMPLATE;
char *user;
@@ -451,67 +444,6 @@ change_user ()
void
-decode_buffer ()
-{
- int i;
- Stat_reply *reply;
- Stat *sp;
- int child = 0;
-
- reply = (Stat_reply *) recv_buffer;
- reply->n_addr = ntohl (reply->n_addr);
-
- if (reply->n_addr > MAXADDR)
- {
- logmsg (L_NOTICE, _("got invalid packet: n_addr = %d"),
- reply->n_addr);
- return;
- }
- reply->timestamp = ntohl (reply->timestamp);
- if (verbose_level)
- {
- char tbuf[sizeof("2009-04-01 00:00:00")];
- strftime (tbuf, sizeof tbuf, "%Y-%m-%d %H:%M:%S",
- gmtime (&reply->timestamp));
- logmsg (L_INFO, _("Received packet: %d %lu - %s"), reply->n_addr,
- (unsigned long) reply->timestamp, tbuf);
- }
-
- sp = reply->stat;
-
- if (!single_process_option)
- {
- pid_t pid = fork ();
- if (pid > 0)
- return;
- else if (pid < 0)
- logmsg (L_ERR, _("cannot fork: %s"), strerror (errno));
- else
- {
- signal (SIGHUP, SIG_IGN);
- signal (SIGCHLD, SIG_IGN);
- child = 1;
- }
- }
-
- if (open_db (TAGR_DB_WR) == 0)
- {
- for (i = 0; i < reply->n_addr; i++, sp++)
- {
- sp->in = ntohl (sp->in);
- sp->out = ntohl (sp->out);
- verbose (1, _("Monitor %s: %lu %lu"), sp->name, sp->in, sp->out);
- report (sp, reply->timestamp);
- }
- close_db ();
- }
-
- if (child)
- exit (0);
-}
-
-
-void
read_input (const char *name)
{
FILE *fp;
@@ -566,27 +498,6 @@ read_input (const char *name)
verbose (2, _("Finished reading `%s'"), name);
}
-int
-get_port (char *str)
-{
- int pn;
-
- if (isdigit (str[0]))
- pn = htons (atoi (str));
- else
- {
- struct servent *s = getservbyname (str, "udp");
- if (s)
- pn = s->s_port;
- else
- {
- logmsg (L_ERR, _("no such service: %s"), str);
- return 0;
- }
- }
- return pn;
-}
-
enum command
{
command_none,
@@ -725,13 +636,36 @@ tagr_version (FILE *stream, struct argp_state *state)
}
+static char **save_argv;
+
+int
+tagr_idle ()
+{
+ switch (command)
+ {
+ case command_none:
+ break;
+
+ case command_update:
+ rebuild (0);
+ command = command_none;
+ alarm (update_interval);
+ break;
+
+ case command_reconfig:
+ close_servers ();
+ tagr_restart (save_argv);
+ }
+ return 0;
+}
+
int
main (int argc, char **argv)
{
int index, rc;
fd_set read_fds;
- char **save_argv = argv;
-
+
+ save_argv = argv;
argp_program_bug_address = "<" PACKAGE_BUGREPORT ">";
argp_program_version_hook = tagr_version;
@@ -816,47 +750,6 @@ main (int argc, char **argv)
if (rebuild_option || import_option || list_option || read_option)
exit (0);
- if (listen_sockaddr.sa == NULL)
- {
- logmsg (L_CRIT, _("listener address is not configured"));
- exit (EX_CONFIG);
- }
-
- sockfd = socket (listen_sockaddr.sa->sa_family,
- SOCK_DGRAM, 0);
- if (sockfd < 0)
- die (EX_OSERR, "socket: %s", strerror (errno));
-
- if ((rc = fcntl (sockfd, F_GETFD, 0)) == -1
- || fcntl (sockfd, F_SETFD, rc | FD_CLOEXEC) == -1)
- logmsg (L_ERR, _("cannot set close-on-exec: %s"),
- strerror (errno));
-
- switch (listen_sockaddr.sa->sa_family)
- {
- case PF_INET:
- {
- int yes = 1;
- if (setsockopt (sockfd, SOL_SOCKET, SO_REUSEADDR,
- (void *) &yes, sizeof(yes)) == -1)
- logmsg (L_ERR, _("setting reuseaddr failed: %s"),
- strerror (errno));
- }
- break;
-
- case PF_UNIX:
- {
- struct sockaddr_un *s_un =
- (struct sockaddr_un *) listen_sockaddr.sa;
- if (unlink (s_un->sun_path))
- logmsg (L_ERR, _("cannot remove socket %s: %s"),
- s_un->sun_path, strerror (errno));
- }
- }
-
- if (bind (sockfd, listen_sockaddr.sa, listen_sockaddr.len) < 0)
- die (EX_OSERR, "bind: %s", strerror (errno));
-
if (save_argv[0][0] != '/')
{
logmsg (L_ERR,
@@ -895,46 +788,13 @@ main (int argc, char **argv)
logmsg (L_INFO, _("%s started"), program_version);
+ open_servers ();
+
command = command_none;
alarm (update_interval);
- for (;;)
- {
- switch (command)
- {
- case command_none:
- break;
-
- case command_update:
- rebuild (0);
- command = command_none;
- alarm (update_interval);
- break;
-
- case command_reconfig:
- tagr_restart (save_argv);
- }
-
- FD_ZERO (&read_fds);
- FD_SET (sockfd, &read_fds);
-
- rc = select (sockfd + 1, &read_fds, NULL, NULL, NULL);
- if (rc < 0)
- {
- if (errno != EINTR)
- logmsg (L_ERR, "select: %s", strerror (errno));
- continue;
- }
- if (rc == 1)
- {
- struct sockaddr saremote;
- int salen = sizeof saremote;
- rc = recvfrom (sockfd, (char *) recv_buffer,
- sizeof (i_recv_buffer), 0, &saremote, &salen);
- if (rc > 0)
- decode_buffer ();
- else if (rc < 0 && errno != EINTR)
- logmsg (L_ERR, "recvfrom: %s", strerror (errno));
- }
- }
+ server_loop ();
+
+ close_servers ();
+ exit (0);
}
diff --git a/src/readconfig.c b/src/readconfig.c
index e4e4e83..9eff4c7 100644
--- a/src/readconfig.c
+++ b/src/readconfig.c
@@ -340,7 +340,93 @@ cb_facility (enum grecs_callback_command cmd,
grecs_error (locus, 0, _("unknown syslog facility"));
return 0;
}
+
+
+struct cfg_server
+{
+ const char *id;
+ enum tagr_server_type type;
+ struct grecs_sockaddr addr;
+};
+
+static int
+cb_server_type (enum grecs_callback_command cmd,
+ grecs_locus_t *locus,
+ void *varptr,
+ grecs_value_t *value,
+ void *cb_data)
+{
+ enum tagr_server_type *typeptr = varptr;
+ if (cmd != grecs_callback_set_value)
+ {
+ grecs_error (locus, 0, _("unexpected block statement"));
+ return 1;
+ }
+ if (value->type != GCONF_TYPE_STRING)
+ {
+ grecs_error (locus, 0, _("expected scalar value but found list"));
+ return 1;
+ }
+ if (strcmp (value->v.string, "udp") == 0)
+ *typeptr = tagr_udp_server;
+ else if (strcmp (value->v.string, "tcp") == 0)
+ *typeptr = tagr_tcp_server;
+ else
+ grecs_error (locus, 0, _("unknown server type"));
+ return 0;
+}
+
+static struct grecs_keyword server_kw[] = {
+ { "type", N_("{udp|tcp}"), N_("Server type"),
+ grecs_type_string, NULL, offsetof (struct cfg_server, type),
+ cb_server_type },
+ { "listen", N_("socket"), N_("Listen on this address"),
+ grecs_type_sockaddr, NULL, offsetof (struct cfg_server, addr), },
+ { NULL }
+};
+
+static int
+cb_server (enum grecs_callback_command cmd,
+ grecs_locus_t *locus,
+ void *varptr,
+ grecs_value_t *value,
+ void *cb_data)
+{
+ int rc;
+ struct cfg_server *cfg;
+ void **pdata = cb_data;
+ switch (cmd)
+ {
+ case grecs_callback_section_begin:
+ if (!value || value->type != GCONF_TYPE_STRING)
+ {
+ grecs_error (locus, 0, _("tag must be a string"));
+ return 0;
+ }
+ cfg = xzalloc (sizeof (*cfg));
+ cfg->id = value->v.string;
+ *pdata = cfg;
+ break;
+
+ case grecs_callback_section_end:
+ cfg = *pdata;
+ if (cfg->addr.sa == NULL)
+ grecs_error (locus, 0, _("listener address is not configured"));
+ else
+ register_server (cfg->id, cfg->type, cfg->addr);
+ free (cfg);
+ *pdata = NULL;
+ break;
+
+ case grecs_callback_set_value:
+ grecs_error (locus, 0, _("invalid use of block statement"));
+ }
+ return 0;
+}
+
+
+
static struct grecs_keyword log_kw[] = {
{ "tag", N_("arg"), N_("Tag syslog diagnostics with this tag."),
grecs_type_string, &log_tag, 0 },
@@ -366,8 +452,11 @@ static struct grecs_keyword tagr_kw[] = {
grecs_type_string, &html_template },
{ "pidfile", NULL, N_("Set pid file name"),
grecs_type_string, &pidfile },
- { "listen", N_("socket"), N_("Listen on this address"),
- grecs_type_sockaddr, &listen_sockaddr, },
+
+ { "server", N_("id: string"), N_("Configure server"),
+ grecs_type_section, NULL, 0,
+ cb_server, NULL, server_kw },
+
{ "update-interval", NULL, N_("Set graph update interval"),
grecs_type_uint, &update_interval },
diff --git a/src/server.c b/src/server.c
new file mode 100644
index 0000000..82aae0e
--- a/dev/null
+++ b/src/server.c
@@ -0,0 +1,475 @@
+/* This file is part of tagr.
+ Copyright (C) 2000, 2005, 2006, 2009 Max Bouglacoff, Sergey Poznyakoff
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>. */
+
+#ifdef HAVE_CONFIG_H
+# include <config.h>
+#endif
+
+#include <unistd.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <ctype.h>
+#include <sys/types.h>
+#include <sys/time.h>
+#include <sys/wait.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <sys/un.h>
+#include <netdb.h>
+#include <signal.h>
+#include <errno.h>
+#include <string.h>
+#include <syslog.h>
+#include <tagr.h>
+#include <report.h>
+#include <inttypes.h>
+#include <wordsplit.h>
+
+struct tagr_server
+{
+ struct tagr_server *next;
+ char *id;
+ enum tagr_server_type type;
+ int fd;
+ struct grecs_sockaddr addr;
+};
+
+struct tagr_server *server_head, *server_tail;
+
+struct server_class
+{
+ int (*srv_open) (struct tagr_server *);
+ void (*srv_run) (struct tagr_server *);
+ void (*srv_close) (struct tagr_server *);
+};
+
+static void
+common_close (struct tagr_server *srv)
+{
+ close (srv->fd);
+}
+
+static int
+setup_socket (struct tagr_server *srv)
+{
+ int rc;
+
+ if ((rc = fcntl (srv->fd, F_GETFD, 0)) == -1
+ || fcntl (srv->fd, F_SETFD, rc | FD_CLOEXEC) == -1)
+ logmsg (L_ERR, _("cannot set close-on-exec: %s"),
+ strerror (errno));
+
+ switch (srv->addr.sa->sa_family)
+ {
+ case PF_INET:
+ {
+ int yes = 1;
+ if (setsockopt (srv->fd, SOL_SOCKET, SO_REUSEADDR,
+ (void *) &yes, sizeof(yes)) == -1)
+ logmsg (L_ERR, _("setting reuseaddr failed: %s"),
+ strerror (errno));
+ }
+ break;
+
+ case PF_UNIX:
+ {
+ struct sockaddr_un *s_un =
+ (struct sockaddr_un *) srv->addr.sa;
+ if (unlink (s_un->sun_path))
+ logmsg (L_ERR, _("cannot remove socket %s: %s"),
+ s_un->sun_path, strerror (errno));
+ }
+ }
+
+ if (bind (srv->fd, srv->addr.sa, srv->addr.len) < 0)
+ {
+ logmsg (L_ERR, "bind: %s", strerror (errno));
+ close (srv->fd);
+ srv->fd = -1;
+ return 1;
+ }
+ return 0;
+}
+
+static int udp_open (struct tagr_server *);
+static void udp_run (struct tagr_server *);
+static int tcp_open (struct tagr_server *);
+static void tcp_run (struct tagr_server *);
+
+struct server_class server_tab[] = {
+ { udp_open, udp_run, common_close },
+ { tcp_open, tcp_run, common_close }
+};
+
+
+void
+register_server (const char *id,
+ enum tagr_server_type type, struct grecs_sockaddr addr)
+{
+ struct tagr_server *srv = xmalloc (sizeof (*srv));
+ srv->id = xstrdup (id);
+ srv->type = type;
+ srv->fd = -1;
+ srv->addr = addr;
+
+ srv->next = NULL;
+ if (server_tail)
+ server_tail->next =srv;
+ else
+ server_head = srv;
+ server_tail = srv;
+}
+
+void
+open_servers ()
+{
+ struct tagr_server *prev = NULL;
+ struct tagr_server *srv = server_head;
+
+ while (srv)
+ {
+ struct tagr_server *next = srv->next;
+ if (server_tab[srv->type].srv_open (srv))
+ {
+ free (srv->id);
+ free (srv->addr.sa);
+ free (srv);
+ if (prev)
+ prev->next = next;
+ else
+ server_head = next;
+ }
+ else
+ prev = srv;
+ srv = next;
+ }
+
+ if (!server_head)
+ logmsg (L_CRIT, _("no servers configured"));
+
+ server_tail = prev;
+}
+
+void
+close_servers ()
+{
+ struct tagr_server *srv = server_head;
+
+ while (srv)
+ {
+ struct tagr_server *next = srv->next;
+ server_tab[srv->type].srv_close (srv);
+ free (srv->id);
+ free (srv->addr.sa);
+ free (srv);
+ srv = next;
+ }
+ server_head = server_tail = NULL;
+}
+
+void
+subprocess (void (*fun) (void *data), void *data, int fd)
+{
+ int child = 0;
+ if (!single_process_option)
+ {
+ pid_t pid = fork ();
+ if (pid > 0)
+ return;
+ else if (pid < 0)
+ logmsg (L_ERR, _("cannot fork: %s"), strerror (errno));
+ else
+ {
+ signal (SIGHUP, SIG_IGN);
+ signal (SIGCHLD, SIG_IGN);
+ child = 1;
+ if (fd != -1)
+ close (fd);
+ }
+ }
+ fun (data);
+ if (child)
+ exit (0);
+}
+
+
+void
+decode_buffer (void *recv_buffer)
+{
+ int i;
+ Stat_reply *reply;
+ Stat *sp;
+ int child = 0;
+
+ reply = (Stat_reply *) recv_buffer;
+ reply->n_addr = ntohl (reply->n_addr);
+
+ if (reply->n_addr > MAXADDR)
+ {
+ logmsg (L_NOTICE, _("got invalid packet: n_addr = %d"),
+ reply->n_addr);
+ return;
+ }
+ reply->timestamp = ntohl (reply->timestamp);
+ if (verbose_level)
+ {
+ char tbuf[sizeof("2009-04-01 00:00:00")];
+ strftime (tbuf, sizeof tbuf, "%Y-%m-%d %H:%M:%S",
+ gmtime (&reply->timestamp));
+ logmsg (L_INFO, _("Received packet: %d %lu - %s"), reply->n_addr,
+ (unsigned long) reply->timestamp, tbuf);
+ }
+
+ sp = reply->stat;
+
+ if (open_db (TAGR_DB_WR) == 0)
+ {
+ for (i = 0; i < reply->n_addr; i++, sp++)
+ {
+ sp->in = ntohl (sp->in);
+ sp->out = ntohl (sp->out);
+ verbose (1, _("Monitor %s: %lu %lu"), sp->name, sp->in, sp->out);
+ report (sp, reply->timestamp);
+ }
+ close_db ();
+ }
+}
+
+static int
+udp_open (struct tagr_server *srv)
+{
+ srv->fd = socket (srv->addr.sa->sa_family, SOCK_DGRAM, 0);
+
+ if (srv->fd < 0)
+ {
+ logmsg (L_ERR, "socket: %s", strerror (errno));
+ return 1;
+ }
+
+ return setup_socket (srv);
+}
+
+static void
+udp_run (struct tagr_server *srv)
+{
+ char recv_buffer[reply_size (MAXADDR)];
+ struct sockaddr saremote;
+ int salen = sizeof saremote;
+ int rc = recvfrom (srv->fd, recv_buffer,
+ sizeof (recv_buffer), 0, &saremote, &salen);
+ if (rc > 0)
+ subprocess (decode_buffer, recv_buffer, -1);
+ else if (rc < 0 && errno != EINTR)
+ logmsg (L_ERR, "recvfrom: %s", strerror (errno));
+}
+
+
+
+static int
+tcp_open (struct tagr_server *srv)
+{
+ srv->fd = socket (srv->addr.sa->sa_family, SOCK_STREAM, 0);
+
+ if (srv->fd < 0)
+ {
+ logmsg (L_ERR, "socket: %s", strerror (errno));
+ return 1;
+ }
+
+ if (setup_socket (srv))
+ return 1;
+
+ if (listen (srv->fd, 8))
+ {
+ logmsg (L_ERR, "listen: %s", strerror (errno));
+ close (srv->fd);
+ return 1;
+ }
+ return 0;
+}
+
+static void
+trim_crlf (char *buf)
+{
+ size_t len = strlen (buf);
+ if (len > 0 && buf[len-1] == '\n')
+ {
+ buf[--len] = 0;
+ if (len > 0 && buf[len-1] == '\r')
+ buf[--len] = 0;
+ }
+}
+
+static int
+convert_ts (const char *str, time_t *pt)
+{
+ uintmax_t val;
+ char *p;
+
+ val = strtoumax (str, &p, 10);
+ if (*p)
+ return 1;
+ *pt = val;
+ if (*pt != val)
+ return 1;
+ return 0;
+}
+
+static int
+convert_ulong (const char *str, unsigned long *pt)
+{
+ uintmax_t val;
+ char *p;
+
+ val = strtoumax (str, &p, 10);
+ if (*p)
+ return 1;
+ *pt = val;
+ if (*pt != val)
+ return 1;
+ return 0;
+}
+
+static void
+tcp_server_loop (void *data)
+{
+ int fd = *(int*)data;
+ FILE *in = fdopen (fd, "r");
+ FILE *out = fdopen (fd, "w");
+ size_t bufsize = 0;
+ char *buf = NULL;
+ int quit = 0;
+
+ setvbuf (in, NULL, _IOLBF, 0);
+ setvbuf (out, NULL, _IOLBF, 0);
+
+ fprintf (out, "+OK tagr experimental stream service\r\n");
+ while (!quit && getline (&buf, &bufsize, in) > 0)
+ {
+ struct wordsplit ws;
+
+ trim_crlf (buf);
+ if (wordsplit (buf, &ws, WRDSF_DEFFLAGS))
+ {
+ logmsg (L_ERR, "failed to parse input line");
+ fprintf (out, "-ERR parsing failed\r\n");
+ continue;
+ }
+
+ if (ws.ws_wordc == 0)
+ fprintf (out, "-ERR invalid input\r\n");
+ else if (strcasecmp (ws.ws_wordv[0], "SAMPLE") == 0
+ && ws.ws_wordc == 5)
+ {
+ Stat st;
+ time_t t;
+
+ if (strlen (ws.ws_wordv[1]) > MAX_NAME_LENGTH)
+ fprintf (out, "-ERR id too long\r\n");
+ else
+ {
+ strcpy (st.name, ws.ws_wordv[1]);
+ if (convert_ts (ws.ws_wordv[2], &t)
+ || convert_ulong (ws.ws_wordv[3], &st.in)
+ || convert_ulong (ws.ws_wordv[4], &st.out))
+ fprintf (out, "-ERR invalid input\r\n");
+ else
+ {
+ if (open_db (TAGR_DB_WR))
+ fprintf (out, "-ERR database not available\r\n");
+ else
+ {
+ report (&st, t);
+ close_db ();
+ fprintf (out, "+OK thank you\r\n");
+ }
+ }
+ }
+ }
+ else if (strcasecmp (ws.ws_wordv[0], "QUIT") == 0)
+ {
+ quit = 1;
+ fprintf (out, "+OK bye\r\n");
+ }
+ else
+ fprintf (out, "-ERR invalid input\r\n");
+ wordsplit_free (&ws);
+ }
+
+ free (buf);
+ fclose (in);
+ fclose (out);
+ close (fd);
+}
+
+static void
+tcp_run (struct tagr_server *srv)
+{
+ union
+ {
+ struct sockaddr sa;
+ struct sockaddr_in s_in;
+ struct sockaddr_un s_un;
+ } addr;
+ socklen_t len = sizeof (addr);
+
+ int fd = accept (srv->fd, &addr.sa, &len);
+
+ if (fd == -1)
+ {
+ logmsg (L_ERR, "accept: %s", strerror (errno));
+ return;
+ }
+
+ subprocess (tcp_server_loop, &fd, srv->fd);
+ close (fd);
+}
+
+void
+server_loop ()
+{
+ fd_set read_fds;
+ int max_fd = 0;
+ struct tagr_server *srv;
+
+ FD_ZERO (&read_fds);
+ for (srv = server_head; srv; srv = srv->next)
+ {
+ FD_SET (srv->fd, &read_fds);
+ if (srv->fd > max_fd)
+ max_fd = srv->fd;
+ }
+
+ while (tagr_idle () == 0)
+ {
+ fd_set rfds = read_fds;
+ int rc = select (max_fd + 1, &rfds, NULL, NULL, NULL);
+ if (rc < 0)
+ {
+ if (errno != EINTR)
+ logmsg (L_ERR, "select: %s", strerror (errno));
+ continue;
+ }
+
+ for (srv = server_head; srv; srv = srv->next)
+ if (FD_ISSET (srv->fd, &rfds))
+ server_tab[srv->type].srv_run (srv);
+ }
+}
+
+
+
diff --git a/src/tagr.h b/src/tagr.h
index a66837e..1d8aac1 100644
--- a/src/tagr.h
+++ b/src/tagr.h
@@ -43,18 +43,15 @@ struct monitor
int swap;
};
-extern struct grecs_sockaddr listen_sockaddr;
extern int preprocess_only;
extern int log_to_stderr;
extern char *pidfile;
extern unsigned update_interval;
+extern int single_process_option;
-extern char rateup[];
extern char *user;
extern char *basedir;
extern char *configfile;
-extern int port;
-extern char *progname;
extern int foreground;
extern char *html_template;
extern char *html_input_file;
@@ -276,4 +273,22 @@ int draw_graph (FILE *fp,
int growright,
struct grid_class *xgrid, struct grid_class *ygrid);
+
+/* server.c */
+enum tagr_server_type
+ {
+ tagr_udp_server,
+ tagr_tcp_server
+ };
+
+void register_server (const char *id,
+ enum tagr_server_type type,
+ struct grecs_sockaddr addr);
+void open_servers (void);
+void close_servers (void);
+void server_loop (void);
+
+int tagr_idle (void);
+
+

Return to:

Send suggestions and report system problems to the System administrator.