/* This file is part of tagr.
Copyright (C) 2000, 2005, 2006, 2009 Max Bouglacoff, Sergey Poznyakoff
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3, or (at your option)
any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see . */
#ifdef HAVE_CONFIG_H
# include
#endif
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
unsigned stream_idle_timeout = 10;
struct tagr_server
{
struct tagr_server *next;
char *id;
enum tagr_server_type type;
int fd;
struct grecs_sockaddr addr;
};
static struct tagr_server *server_head, *server_tail;
struct server_class
{
int (*srv_open) (struct tagr_server *);
void (*srv_run) (struct tagr_server *);
void (*srv_close) (struct tagr_server *);
};
static void
common_close (struct tagr_server *srv)
{
close (srv->fd);
}
static int
setup_socket (struct tagr_server *srv)
{
int rc;
if ((rc = fcntl (srv->fd, F_GETFD, 0)) == -1
|| fcntl (srv->fd, F_SETFD, rc | FD_CLOEXEC) == -1)
logmsg (L_ERR, _("cannot set close-on-exec: %s"),
strerror (errno));
switch (srv->addr.sa->sa_family)
{
case PF_INET:
{
int yes = 1;
if (setsockopt (srv->fd, SOL_SOCKET, SO_REUSEADDR,
(void *) &yes, sizeof(yes)) == -1)
logmsg (L_ERR, _("setting reuseaddr failed: %s"),
strerror (errno));
}
break;
case PF_UNIX:
{
struct sockaddr_un *s_un =
(struct sockaddr_un *) srv->addr.sa;
if (unlink (s_un->sun_path))
logmsg (L_ERR, _("cannot remove socket %s: %s"),
s_un->sun_path, strerror (errno));
}
}
if (bind (srv->fd, srv->addr.sa, srv->addr.len) < 0)
{
logmsg (L_ERR, "bind: %s", strerror (errno));
close (srv->fd);
srv->fd = -1;
return 1;
}
return 0;
}
static int udp_open (struct tagr_server *);
static void udp_run (struct tagr_server *);
static int tcp_open (struct tagr_server *);
static void tcp_run (struct tagr_server *);
struct server_class server_tab[] = {
{ udp_open, udp_run, common_close },
{ tcp_open, tcp_run, common_close }
};
void
register_server (const char *id,
enum tagr_server_type type, struct grecs_sockaddr addr)
{
struct tagr_server *srv = xmalloc (sizeof (*srv));
srv->id = xstrdup (id);
srv->type = type;
srv->fd = -1;
srv->addr = addr;
srv->next = NULL;
if (server_tail)
server_tail->next =srv;
else
server_head = srv;
server_tail = srv;
}
void
open_servers ()
{
struct tagr_server *prev = NULL;
struct tagr_server *srv = server_head;
while (srv)
{
struct tagr_server *next = srv->next;
if (server_tab[srv->type].srv_open (srv))
{
free (srv->id);
free (srv->addr.sa);
free (srv);
if (prev)
prev->next = next;
else
server_head = next;
}
else
prev = srv;
srv = next;
}
if (!server_head)
logmsg (L_CRIT, _("no servers configured"));
server_tail = prev;
}
void
close_servers ()
{
struct tagr_server *srv = server_head;
while (srv)
{
struct tagr_server *next = srv->next;
server_tab[srv->type].srv_close (srv);
free (srv->id);
free (srv->addr.sa);
free (srv);
srv = next;
}
server_head = server_tail = NULL;
}
void
subprocess (void (*fun) (void *data), void *data, int fd)
{
int child = 0;
if (!single_process_option)
{
pid_t pid = fork ();
if (pid > 0)
return;
else if (pid < 0)
logmsg (L_ERR, _("cannot fork: %s"), strerror (errno));
else
{
signal (SIGHUP, SIG_DFL);
signal (SIGCHLD, SIG_DFL);
signal (SIGALRM, SIG_DFL);
child = 1;
if (fd != -1)
close (fd);
}
}
fun (data);
if (child)
exit (0);
}
void
decode_buffer (void *recv_buffer)
{
int i;
Stat_reply *reply;
Stat *sp;
int child = 0;
reply = (Stat_reply *) recv_buffer;
reply->n_addr = ntohl (reply->n_addr);
if (reply->n_addr > MAXADDR)
{
logmsg (L_NOTICE, _("got invalid packet: n_addr = %d"),
reply->n_addr);
return;
}
reply->timestamp = ntohl (reply->timestamp);
if (verbose_level)
{
char tbuf[sizeof("2009-04-01 00:00:00")];
strftime (tbuf, sizeof tbuf, "%Y-%m-%d %H:%M:%S",
gmtime (&reply->timestamp));
logmsg (L_INFO, _("Received packet: %d %lu - %s"), reply->n_addr,
(unsigned long) reply->timestamp, tbuf);
}
sp = reply->stat;
if (open_db (TAGR_DB_WR) == 0)
{
for (i = 0; i < reply->n_addr; i++, sp++)
{
sp->in = ntohl (sp->in);
sp->out = ntohl (sp->out);
verbose (1, _("Monitor %s: %lu %lu"), sp->name, sp->in, sp->out);
report (sp, reply->timestamp);
}
close_db ();
}
}
static int
udp_open (struct tagr_server *srv)
{
srv->fd = socket (srv->addr.sa->sa_family, SOCK_DGRAM, 0);
if (srv->fd < 0)
{
logmsg (L_ERR, "socket: %s", strerror (errno));
return 1;
}
return setup_socket (srv);
}
static void
udp_run (struct tagr_server *srv)
{
char recv_buffer[reply_size (MAXADDR)];
struct sockaddr saremote;
int salen = sizeof saremote;
int rc = recvfrom (srv->fd, recv_buffer,
sizeof (recv_buffer), 0, &saremote, &salen);
if (rc > 0)
subprocess (decode_buffer, recv_buffer, -1);
else if (rc < 0 && errno != EINTR)
logmsg (L_ERR, "recvfrom: %s", strerror (errno));
}
static int
tcp_open (struct tagr_server *srv)
{
srv->fd = socket (srv->addr.sa->sa_family, SOCK_STREAM, 0);
if (srv->fd < 0)
{
logmsg (L_ERR, "socket: %s", strerror (errno));
return 1;
}
if (setup_socket (srv))
return 1;
if (listen (srv->fd, 8))
{
logmsg (L_ERR, "listen: %s", strerror (errno));
close (srv->fd);
return 1;
}
return 0;
}
void
trim_crlf (char *buf)
{
size_t len = strlen (buf);
if (len > 0 && buf[len-1] == '\n')
{
buf[--len] = 0;
if (len > 0 && buf[len-1] == '\r')
buf[--len] = 0;
}
}
static int
convert_ts (const char *str, time_t *pt)
{
uintmax_t val;
char *p;
val = strtoumax (str, &p, 10);
if (*p)
return 1;
*pt = val;
if (*pt != val)
return 1;
return 0;
}
static int
convert_ulong (const char *str, unsigned long *pt)
{
uintmax_t val;
char *p;
val = strtoumax (str, &p, 10);
if (*p)
return 1;
*pt = val;
if (*pt != val)
return 1;
return 0;
}
static RETSIGTYPE
sig_child_alrm (int sig)
{
logmsg (L_NOTICE, _("child timed out"));
exit (EX_TEMPFAIL);
}
enum tagr_dialog_state
{
initial_dialog_state,
auth_dialog_state,
quit_dialog_state
};
struct tagr_dialog
{
enum tagr_dialog_state state;
FILE *in;
FILE *out;
};
#define STATEMASK(n) (1u<<(n))
#define ANYSTATE ((unsigned)~0)
struct command
{
const char *name;
unsigned statemask;
int minargs;
int maxargs;
void (*handler) (struct tagr_dialog *, struct command *,
struct wordsplit *);
};
static void
cmd_auth (struct tagr_dialog *dlg, struct command *cmd,
struct wordsplit *ws)
{
if (tagr_auth (ws->ws_wordv[1], ws->ws_wordv[2]) == 0)
{
dlg->state = auth_dialog_state;
fprintf (dlg->out, "+OK welcome, %s\r\n", ws->ws_wordv[1]);
}
else
fprintf (dlg->out, "-ERR authorization failed\r\n");
}
static void
cmd_sample (struct tagr_dialog *dlg, struct command *cmd,
struct wordsplit *ws)
{
Stat st;
time_t t;
if (strlen (ws->ws_wordv[1]) > MAX_NAME_LENGTH)
{
fprintf (dlg->out, "-ERR id too long\r\n");
return;
}
strcpy (st.name, ws->ws_wordv[1]);
if (convert_ts (ws->ws_wordv[2], &t)
|| convert_ulong (ws->ws_wordv[3], &st.in)
|| convert_ulong (ws->ws_wordv[4], &st.out))
{
fprintf (dlg->out, "-ERR invalid input\r\n");
return;
}
if (open_db (TAGR_DB_WR))
fprintf (dlg->out, "-TEMP database not available\r\n");
else
{
report (&st, t);
close_db ();
fprintf (dlg->out, "+OK thank you\r\n");
}
}
static void
cmd_quit (struct tagr_dialog *dlg, struct command *cmd,
struct wordsplit *ws)
{
dlg->state = quit_dialog_state;
fprintf (dlg->out, "+OK bye\r\n");
}
static struct command command_tab[] = {
{ "AUTH", STATEMASK (initial_dialog_state), 3, 3, cmd_auth },
{ "SAMPLE", STATEMASK (auth_dialog_state), 5, 5, cmd_sample },
{ "QUIT", ANYSTATE, 1, 1, cmd_quit },
{ NULL }
};
static struct command *
find_command (const char *name)
{
struct command *cmd;
for (cmd = command_tab; cmd->name; cmd++)
if (c_strcasecmp (cmd->name, name) == 0)
return cmd;
return NULL;
}
static void
tcp_server_loop (void *data)
{
int fd = *(int*)data;
struct tagr_dialog dlg;
size_t bufsize = 0;
char *buf = NULL;
const char *authid;
signal (SIGALRM, sig_child_alrm);
dlg.state = initial_dialog_state;
dlg.in = fdopen (fd, "r");
dlg.out = fdopen (fd, "w");
setvbuf (dlg.in, NULL, _IOLBF, 0);
setvbuf (dlg.out, NULL, _IOLBF, 0);
authid = tagr_auth_init ();
fprintf (dlg.out, "+OK tagr experimental stream service %s\r\n", authid);
while (dlg.state != quit_dialog_state
&& getline (&buf, &bufsize, dlg.in) > 0)
{
struct wordsplit ws;
struct command *cmd;
alarm (stream_idle_timeout);
trim_crlf (buf);
if (wordsplit (buf, &ws, WRDSF_DEFFLAGS))
{
logmsg (L_ERR, _("failed to parse input line"));
fprintf (dlg.out, "-ERR parsing failed\r\n");
continue;
}
if (ws.ws_wordc == 0)
fprintf (dlg.out, "-ERR invalid input\r\n");
else if (cmd = find_command (ws.ws_wordv[0]))
{
if (!(STATEMASK (dlg.state) & cmd->statemask))
fprintf (dlg.out, "-ERR wrong state\r\n");
else if (ws.ws_wordc < cmd->minargs
|| (cmd->maxargs > 0 && ws.ws_wordc > cmd->maxargs))
fprintf (dlg.out, "-ERR invalid number of arguments\r\n");
else
cmd->handler (&dlg, cmd, &ws);
}
else
fprintf (dlg.out, "-ERR invalid input\r\n");
wordsplit_free (&ws);
}
free (buf);
fclose (dlg.in);
fclose (dlg.out);
close (fd);
}
static void
tcp_run (struct tagr_server *srv)
{
union
{
struct sockaddr sa;
struct sockaddr_in s_in;
struct sockaddr_un s_un;
} addr;
socklen_t len = sizeof (addr);
int fd = accept (srv->fd, &addr.sa, &len);
if (fd == -1)
{
logmsg (L_ERR, "accept: %s", strerror (errno));
return;
}
subprocess (tcp_server_loop, &fd, srv->fd);
close (fd);
}
void
server_loop ()
{
fd_set read_fds;
int max_fd = 0;
struct tagr_server *srv;
FD_ZERO (&read_fds);
for (srv = server_head; srv; srv = srv->next)
{
FD_SET (srv->fd, &read_fds);
if (srv->fd > max_fd)
max_fd = srv->fd;
}
while (tagr_idle () == 0)
{
fd_set rfds = read_fds;
int rc = select (max_fd + 1, &rfds, NULL, NULL, NULL);
if (rc < 0)
{
if (errno != EINTR)
logmsg (L_ERR, "select: %s", strerror (errno));
continue;
}
for (srv = server_head; srv; srv = srv->next)
if (FD_ISSET (srv->fd, &rfds))
server_tab[srv->type].srv_run (srv);
}
}