/* This file is part of vmod-binlog
Copyright (C) 2013 Sergey Poznyakoff
Vmod-binlog is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3, or (at your option)
any later version.
Vmod-binlog is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with vmod-binlog. If not, see .
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "vrt.h"
#include "vcc_if.h"
#include "bin/varnishd/cache.h"
#include "vmod-binlog.h"
#include "pack.h"
#ifndef O_SEARCH
# define O_SEARCH 0
#endif
#define BLF_ROUNDTS 0x01
#define BLF_TRUNCATE 0x02
enum binlog_state {
state_init,
state_start,
state_pack
};
struct binlog_config {
size_t size; /* maximum file size */
unsigned interval; /* file rotation interval */
char *pattern; /* file name pattern */
int umask; /* umask for new files and directories */
char *dir; /* root storage directory */
int dd; /* directory descriptor */
char *fname; /* current file name */
int fd; /* current file descriptor */
struct binlog_file_header *base; /* mmap base */
char *recbase; /* record base */
size_t recnum; /* number of records in recbase */
size_t recsize; /* record size */
time_t stoptime; /* when to rotate the current file */
pthread_mutex_t mutex;
int debug; /* debug level */
int flags; /* flags (see BLF_* defines above) */
char *dataspec; /* data format specification */
struct packinst *inst_head; /* compiled dataspec */
struct packinst *inst_cur; /* current instruction */
struct packenv *env; /* pack environment */
enum binlog_state state; /* binlog machine state */
time_t timestamp; /* timestamp for the entry being built */
};
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void
binlog_error(const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
vsyslog(LOG_DAEMON|LOG_ERR, fmt, ap);
va_end(ap);
}
void
binlog_debug(const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
vsyslog(LOG_DAEMON|LOG_DEBUG, fmt, ap);
va_end(ap);
}
#define debug(c,l,s) do { if ((c)->debug>=(l)) binlog_debug s; } while(0)
void
packerror(const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
vsyslog(LOG_DAEMON|LOG_NOTICE, fmt, ap);
va_end(ap);
}
int
module_init(struct vmod_priv *priv, const struct VCL_conf *vclconf)
{
struct binlog_config *conf = calloc(1, sizeof(*conf));
AN(conf);
priv->priv = conf;
return 0;
}
static char *
findparam(const char *param, char *name)
{
const char *p;
char *q;
while (*param) {
for (p = param, q = name; *p && *q && *p == *q; p++, q++);
for (param = p; *param && *param != ';'; param++);
if (*q == 0 && *p == '=') {
size_t len = param - p - 1;
q = malloc(len + 1);
AN(q);
memcpy(q, p + 1, len);
q[len] = 0;
return q;
}
if (*param)
++param;
}
return NULL;
}
static unsigned long
getinterval(char *p, char **endp)
{
int n;
unsigned long hours = 0, minutes = 0, seconds = 0;
for (;;) {
n = 0;
while (*p && isdigit(*p)) {
n = 10*n + *p - '0';
p++;
}
switch (*p) {
case 'h':
case 'H':
if (hours) {
*endp = p;
return -1;
}
hours = n;
break;
case 'm':
case 'M':
if (minutes) {
*endp = p;
return -1;
}
minutes = n;
break;
case 's':
case 'S':
if (seconds) {
*endp = p;
return -1;
}
seconds = n;
break;
default:
*endp = p;
if (!hours && !minutes && !seconds)
return n;
return (hours*60 + minutes)*60 + seconds;
}
p++;
}
}
static struct indexdef {
char *name;
char *pat;
} indextab[] = {
{ "year", "%Y" },
{ "0", "%Y" },
{ "month", "%Y/%m" },
{ "1", "%Y/%m" },
{ "day", "%Y/%m/%d" },
{ "2", "%Y/%m/%d" },
{ NULL }
};
static char *
getindexpat(const char *name)
{
struct indexdef *p;
for (p = indextab; p->name; p++)
if (strcmp(p->name, name) == 0)
return p->pat;
return NULL;
}
void
vmod_init(struct sess *sp, struct vmod_priv *priv,
const char *dir, const char *dataspec, const char *param)
{
struct binlog_config *conf = priv->priv;
struct stat st;
char *p, *q;
unsigned long n;
int user_pattern = 0;
p = findparam(param, "debug");
if (p) {
conf->debug = atoi(p);
free(p);
}
if (stat(dir, &st)) {
if (errno == ENOENT)
binlog_error("logging directory does not exist");
else
binlog_error("cannot stat logging directory %s: %s",
dir, strerror(errno));
abort();
}
if (!S_ISDIR(st.st_mode)) {
binlog_error("%s exists, but is not a directory");
abort();
}
conf->dd = open(dir, O_SEARCH | O_DIRECTORY);
if (conf->dd == -1) {
binlog_error("cannot open directory %s: %s",
dir, strerror(errno));
abort();
}
conf->dir = strdup(dir);
AN(conf->dir);
conf->inst_head = packcomp(dataspec, &p);
if (!conf->inst_head) {
if (errno == EINVAL)
binlog_error("cannot compile data format near %s", p);
else
binlog_error("%s", strerror(errno));
abort();
}
conf->recsize = packsize(conf->inst_head);
conf->env = packenv_create(conf->recsize);
AN(conf->env);
conf->recsize += offsetof(struct binlog_record,data);
conf->dataspec = strdup(dataspec);
AN(conf->dataspec);
p = findparam(param, "pattern");
if (!p) {
p = strdup(BINLOG_PATTERN);
AN(p);
} else
user_pattern = 1;
conf->pattern = p;
p = findparam(param, "index");
if (p) {
q = getindexpat(p);
if (!q) {
binlog_error("invalid index type");
abort();
}
} else if (!user_pattern) {
q = getindexpat(BINLOG_INDEX);
AN(q);
} else
q = NULL;
if (q) {
p = malloc(strlen(q) + strlen(conf->pattern) + 2);
AN(p);
strcpy(p, q);
strcat(p, "/");
strcat(p, conf->pattern);
free(conf->pattern);
conf->pattern = p;
}
p = findparam(param, "size");
if (p) {
uintmax_t u;
errno = 0;
u = strtoul(p, &q, 10);
if (errno) {
binlog_error("invalid size value");
abort();
}
free(p);
switch (*q) {
case 'g':
case 'G':
u <<= 10;
case 'm':
case 'M':
u <<= 10;
case 'k':
case 'K':
u <<= 10;
}
if (u > SIZE_T_MAX) {
binlog_error("invalid size value");
abort();
}
conf->size = u;
} else
conf->size = BINLOG_SIZE;
p = findparam(param, "interval");
if (p) {
conf->interval = getinterval(p, &q);
free(p);
if (*q) {
binlog_error("invalid interval (near \"%s\")", q);
abort();
}
} else
conf->interval = BINLOG_INTERVAL;
p = findparam(param, "umask");
if (p) {
n = strtoul(p, &q, 8);
free(p);
if (n & ~0777) {
binlog_error("umask out of range");
abort();
}
if (*q) {
binlog_error("invalid umask (near \"%s\")", q);
abort();
}
} else
conf->umask = BINLOG_UMASK;
p = findparam(param, "roundts");
if (p) {
if (atoi(p))
conf->flags |= BLF_ROUNDTS;
else
conf->flags &= ~BLF_ROUNDTS;
free(p);
}
p = findparam(param, "reuselog");
if (p) {
if (atoi(p))
conf->flags &= ~BLF_TRUNCATE;
else
conf->flags |= BLF_TRUNCATE;
free(p);
}
conf->fd = -1;
conf->base = NULL;
conf->stoptime = time(NULL);
pthread_mutex_init(&conf->mutex, NULL);
}
static char *
mkfilename(struct sess *sp, struct binlog_config *conf)
{
time_t ts = time(NULL);
size_t u, n;
char *p, *q;
if (conf->flags & BLF_ROUNDTS)
ts -= ts % conf->interval;
u = WS_Reserve(sp->wrk->ws, 0);
p = sp->wrk->ws->f;
n = strftime(p, u, conf->pattern, gmtime(&ts));
if (n == 0) {
WS_Release(sp->wrk->ws, 0);
return NULL;
}
q = strdup(p);
AN(q);
WS_Release(sp->wrk->ws, 0);
return q;
}
static int
mkdir_p(struct binlog_config *conf, char *dir)
{
int rc = 0;
char *p;
struct stat st;
for (p = dir; rc == 0 && *p; p++) {
if (*p != '/')
continue;
*p = 0;
rc = fstatat(conf->dd, dir, &st, 0);
if (rc) {
if (errno == ENOENT) {
rc = mkdirat(conf->dd, dir,
0777 & ~conf->umask);
if (rc)
binlog_error("cannot create %s: %s",
dir,
strerror(errno));
} else
binlog_error("cannot stat %s: %s", dir,
strerror(errno));
} else if (!S_ISDIR(st.st_mode)) {
binlog_error("component \"%s\" is not a directory",
dir);
rc = -1;
}
*p = '/';
}
return rc;
}
static int
createfile(struct sess *sp, struct binlog_config *conf)
{
char *fname;
int fd;
conf->fname = NULL;
conf->fd = -1;
fname = mkfilename(sp, conf);
if (!fname)
return -1;
if (mkdir_p(conf, fname)) {
free(fname);
return -1;
}
fd = openat(conf->dd, fname, O_CREAT|O_RDWR,
0666 & ~conf->umask);
if (fd == -1) {
binlog_error("cannot create log file %s/%s: %s",
conf->dir, fname, strerror(errno));
free(fname);
}
conf->fname = fname;
conf->fd = fd;
return 0;
}
static void
reset(struct binlog_config *conf)
{
conf->fname = NULL;
conf->fd = -1;
conf->base = NULL;
conf->recbase = NULL;
conf->recnum = 0;
}
static int
setstoptime(struct binlog_config *conf)
{
time_t ts;
ts = time(NULL);
conf->stoptime = ts - ts % conf->interval + conf->interval;
}
#define binlog_recnum(conf) \
(((conf)->size - (conf)->base->hdrsize) / (conf)->base->recsize)
static int
checkheader(struct binlog_config *conf, size_t hdrsize)
{
struct binlog_file_header header;
int c;
ssize_t rc;
char *p;
rc = read(conf->fd, &header, sizeof(header));
if (rc == -1) {
binlog_error("error reading header of %s/%s: %s",
conf->dir, conf->fname, strerror(errno));
return -1;
} else if (rc != sizeof(header)) {
binlog_error("error reading header of %s/%s: %s",
conf->dir, conf->fname, "hit eof");
return -1;
}
if (memcmp(header.magic, BINLOG_MAGIC_STR, BINLOG_MAGIC_LEN)) {
binlog_error("%s/%s is not a binlog file",
conf->dir, conf->fname);
return -1;
}
if (header.version != BINLOG_VERSION) {
binlog_error("%s/%s: unknown version", conf->dir, conf->fname);
return -1;
}
if (header.hdrsize != hdrsize) {
debug(conf,1,("%s/%s: header size mismatch",
conf->dir, conf->fname));
return 1;
}
if (header.recsize != conf->recsize) {
debug(conf,1,("%s/%s: record size mismatch",
conf->dir, conf->fname));
return 1;
}
p = conf->dataspec;
while (*p) {
if (read(conf->fd, &c, 1) != 1 || c != *p) {
debug(conf,1,("%s/%s: dataspec mismatch near %s: %c",
conf->dir, conf->fname, p, c));
return 1;
}
++p;
}
if (read(conf->fd, &c, 1) != 1 || c != 0) {
debug(conf,1,("%s/%s: dataspec mismatch at the end: %c",
conf->dir, conf->fname, c));
return 1;
}
return 0;
}
static int
newfile(struct sess *sp, struct binlog_config *conf)
{
int c;
void *base;
size_t hdrsize;
struct stat st;
int reuse = 0;
setstoptime(conf);
if (createfile(sp, conf))
return -1;
hdrsize = ((sizeof(struct binlog_file_header) +
strlen(conf->dataspec) +
conf->recsize - 1) / conf->recsize) * conf->recsize;
if (fstat(conf->fd, &st) == 0) {
/* File already exists */
if (st.st_size > 0 &&
!(conf->flags & BLF_TRUNCATE) &&
checkheader(conf, hdrsize) == 0) {
reuse = 1;
} else {
binlog_error("truncating existing file %s/%s",
conf->dir, conf->fname);
ftruncate(conf->fd, 0);
}
} else {
binlog_error("can't stat %s/%s: %s",
conf->dir, conf->fname, strerror(errno));
/* try to continue anyway */
}
conf->flags |= BLF_TRUNCATE;
if (lseek(conf->fd, conf->size, SEEK_SET) == -1) {
binlog_error("seek in log file %s/%s failed: %s",
conf->dir, conf->fname, strerror(errno));
if (!reuse)
unlinkat(conf->dd, conf->fname, 0);
close(conf->fd);
free(conf->fname);
reset(conf);
return -1;
}
c = 0;
write(conf->fd, &c, 1);
base = mmap((caddr_t)0, conf->size,
PROT_READ|PROT_WRITE, MAP_SHARED,
conf->fd, 0);
if (base == MAP_FAILED) {
binlog_error("mmap: %s", strerror(errno));
if (!reuse)
unlinkat(conf->dd, conf->fname, 0);
close(conf->fd);
free(conf->fname);
reset(conf);
return -1;
}
conf->base = base;
if (reuse) {
debug(conf,1,("reusing log file %s, recnum=%lu",
conf->fname, (unsigned long)conf->base->recnum));
} else {
debug(conf,1,("created new log file %s",conf->fname));
memcpy(conf->base->magic, BINLOG_MAGIC_STR, BINLOG_MAGIC_LEN);
conf->base->version = BINLOG_VERSION;
conf->base->recsize = conf->recsize;
conf->base->recnum = 0;
strcpy((char*)(conf->base + 1), conf->dataspec);
conf->base->hdrsize = hdrsize;
}
conf->recbase = (char *) conf->base + conf->base->hdrsize;
conf->recnum = binlog_recnum(conf);
return 0;
}
static void
closefile(struct sess *sp, struct binlog_config *conf)
{
size_t size;
if (conf->fd == -1)
return;
debug(conf,1,("closing log file %s",conf->fname));
size = binlog_size(conf->base);
munmap(conf->base, conf->size);
if (ftruncate(conf->fd, size))
binlog_error("error truncating \"%s/%s\": %s",
conf->dir, conf->fname, strerror(errno));
close(conf->fd);
free(conf->fname);
reset(conf);
}
void
vmod_start(struct sess *sp, struct vmod_priv *priv)
{
struct binlog_config *conf = priv->priv;
time_t ts;
if (!conf)
return;
ts = time(NULL);
if (ts >= conf->stoptime) {
AZ(pthread_mutex_lock(&conf->mutex));
closefile(sp, conf);
newfile(sp, conf);
AZ(pthread_mutex_unlock(&conf->mutex));
}
packenv_init(conf->env);
conf->state = state_start;
conf->inst_cur = packinit(conf->inst_head);
conf->timestamp = ts;
}
void
vmod_pack(struct sess *sp, struct vmod_priv *priv, const char *str)
{
struct binlog_config *conf = priv->priv;
char *argv[2];
if (!conf)
return;
switch (conf->state) {
case state_start:
case state_pack:
break;
default:
binlog_error("pack called in wrong state (%d)", conf->state);
return;
}
if (!conf->inst_cur) {
binlog_error("format spec exhausted");
return;
}
argv[0] = (char*) str;
argv[1] = NULL;
conf->env->argv = argv;
conf->env->argc = 2;
conf->env->argi = 0;
conf->inst_cur = packinnext(conf->inst_cur, conf->env);
conf->state = state_pack;
}
void
vmod_commit(struct sess *sp, struct vmod_priv *priv)
{
struct binlog_config *conf = priv->priv;
if (!conf)
return;
if (conf->fd == -1)
return;
switch (conf->state) {
case state_start:
binlog_error("committing empty binlog record");
break;
case state_pack:
if (conf->inst_cur)
binlog_error("committing incomplete binlog record");
break;
default:
binlog_error("pack called in wrong state (%d)", conf->state);
return;
}
AZ(pthread_mutex_lock(&conf->mutex));
if (conf->base->recnum == conf->recnum) {
binlog_error("overflow of %s/%s", conf->dir, conf->fname);
} else {
struct binlog_record *p =
(struct binlog_record *)(conf->recbase +
conf->base->recnum *
conf->recsize);
p->ts = conf->timestamp;
memcpy(p->data, conf->env->buf_base, conf->env->buf_size);
conf->base->recnum++;
}
AZ(pthread_mutex_unlock(&conf->mutex));
conf->state = state_init;
}
void
vmod_sync(struct sess *sp, struct vmod_priv *priv)
{
struct binlog_config *conf = priv->priv;
if (!conf)
return;
AZ(pthread_mutex_lock(&conf->mutex));
if (conf->base)
msync(conf->base, binlog_size(conf->base), 0);
AZ(pthread_mutex_unlock(&conf->mutex));
}
void
vmod_close(struct sess *sp, struct vmod_priv *priv)
{
struct binlog_config *conf = priv->priv;
if (!conf)
return;
mutex = conf->mutex;
AZ(pthread_mutex_lock(&mutex));
closefile(sp, conf);
close(conf->dd);
free(conf->dir);
free(conf->pattern);
free(conf);
AZ(pthread_mutex_unlock(&mutex));
}