/* This file is part of vmod-binlog
Copyright (C) 2013, 2018 Sergey Poznyakoff
Vmod-binlog is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3, or (at your option)
any later version.
Vmod-binlog is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with vmod-binlog. If not, see <http://www.gnu.org/licenses/>.
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "vmod-binlog.h"
#include "pack.h"
#include "err.h"
#include "xalloc.h"
#include "parse-datetime.h"
/* strftime(3) format used when printing record timestamps; replaced
   by -t, and set to "%s" by -d. */
char *timefmt = "%c";
/* Set by -N: print the record index before each record. */
int number_option;
/* Set by -v: print a summary line for each file processed. */
int verbose_option;
/* Set by -d: timestamps are taken relative to the first selected record. */
int timediff_option;
/* Log storage directory (-D); NULL if not given. */
char *directory;
/* File name pattern (-p); NULL if not given. */
char *pattern;
/* Directory indexing level (-i). */
enum binlog_index_type index_type = index_year;
/* Timestamp of the last record that matched an interval; saved to the
   status file. */
time_t last_ts;
/* Name and argument string of the loadable module (-m). */
char *module_name;
char *module_args;
/* Entry points resolved from the loaded module by loader_init.  Only
   module_record is mandatory; the others may be NULL. */
void (*module_init)(int, char *, void (*)(const char *, const char *, const char *));
int (*module_open)(const char *, size_t, const char *);
void (*module_close)(void);
void (*module_done)(void);
void (*module_record)(const char *, time_t, void *);
/* Time mask flags indicate which timestamp is set. */
#define START_TIME 0x01 /* start time is set */
#define STOP_TIME 0x02 /* stop time is set */
/* The two flags below are valid only for global timemask. They
are set when building interval list and are then used in main to
reset the START_TIME and STOP_TIME bits. */
#define CLEAR_START_TIME 0x04 /* Reset start time */
#define CLEAR_STOP_TIME 0x08 /* Reset stop time */
/* Global time mask and the overall time window covering all intervals. */
int timemask;
time_t start_time, to_time;
/* Forward declarations for functions used before their definitions. */
static int matchnames(const char *dir, const char *pat, glob_t *gl);
void selglob(const char *dir, const char *pattern);
/* A single time interval requested by the user or by a module.
   Intervals form a singly-linked list in the order they were added. */
struct interval {
struct interval *next;
char *name; /* interval tag; selmem treats NULL as "no tag" */
char *descr; /* textual description, compared against the status file */
int timemask; /* START_TIME and/or STOP_TIME: which bounds are valid */
time_t start; /* lower bound, valid if START_TIME is set */
time_t end; /* upper bound, valid if STOP_TIME is set */
};
/* Head and tail of the interval list, and the number of its elements. */
static struct interval *interval_head, *interval_tail;
size_t interval_count;
/* Append a new interval to the interval list and update the global
   time window.

   NAME is the interval tag (may be NULL: selmem checks for that, so it
   is stored as NULL instead of being passed to xstrdup).  DESCR is the
   textual description; both strings are copied.  TMASK tells which of
   START and END are valid.

   The global start_time is lowered and to_time is raised so that they
   cover the new interval.  If the interval lacks a bound, the
   corresponding CLEAR_* flag is set in the global timemask, so that
   main can drop that bound from the global window. */
void
interval_add(const char *name, const char *descr,
	     int tmask, time_t start, time_t end)
{
	struct interval *p = xmalloc(sizeof(*p));

	p->next = NULL;
	p->name = name ? xstrdup(name) : NULL;
	p->descr = xstrdup(descr);
	p->timemask = tmask;
	p->start = start;
	p->end = end;

	/* Link the new element at the tail of the list. */
	if (interval_tail)
		interval_tail->next = p;
	else
		interval_head = p;
	interval_tail = p;
	++interval_count;

	if (tmask & START_TIME) {
		if (!(timemask & START_TIME) || start_time > start) {
			start_time = start;
			timemask |= START_TIME;
		}
	} else
		timemask |= CLEAR_START_TIME;

	if (tmask & STOP_TIME) {
		if (!(timemask & STOP_TIME) || to_time < end) {
			to_time = end;
			timemask |= STOP_TIME;
		}
	} else
		timemask |= CLEAR_STOP_TIME;
}
/* Parse the textual interval bounds FROM and TO (either may be NULL)
   and register the resulting interval under the tag ID.

   An unparsable bound is a fatal error.  If both bounds are NULL,
   nothing is added.  The interval description has the form
   "FROM:TO", with a missing bound represented by an empty string. */
void
interval_add_str(const char *id, const char *from, const char *to)
{
	struct timespec ts;
	time_t start = 0, end = 0;
	int tmask = 0;
	size_t fromlen = from ? strlen(from) : 0;
	size_t tolen = to ? strlen(to) : 0;
	char *descr, *q;

	if (from) {
		if (!parse_datetime(&ts, from, NULL)) {
			error("invalid timespec: %s", from);
			exit(1);
		}
		start = ts.tv_sec;
		tmask |= START_TIME;
	}

	if (to) {
		if (!parse_datetime(&ts, to, NULL)) {
			error("invalid timespec: %s", to);
			exit(1);
		}
		end = ts.tv_sec;
		tmask |= STOP_TIME;
	}

	if (!tmask)
		return;

	/* Room for both bounds, the ':' separator and the terminating NUL. */
	descr = xmalloc(fromlen + 1 + tolen + 1);
	q = descr;
	if (from) {
		memcpy(q, from, fromlen);
		q += fromlen;
	}
	*q++ = ':';
	if (to) {
		memcpy(q, to, tolen);
		q += tolen;
	}
	*q = 0;

	interval_add(id, descr, tmask, start, end);
	free(descr);
}
/* Print a usage summary on the standard output.  The option list
   mirrors the getopt string used in main. */
void
help(void)
{
	printf("usage: %s [OPTIONS] [FILE...]\n", progname);
	printf("Select records from binary logs\n");
	printf("\nOPTIONS are:\n\n");
	printf("File selection:\n");
	printf(" -D DIR log file storage directory\n");
	printf(" -i 0|1|2 select directory indexing level\n");
	printf(" -p PATTERN select files matching PATTERN\n");
	printf("\n");
	printf("Time intervals:\n");
	printf(" -I TAG set tag for the subsequent -F and -T option pair\n");
	printf(" -F TIME print records starting from TIME\n");
	printf(" -T TIME print records starting up to TIME\n");
	printf("\n");
	printf("Incremental operation:\n");
	printf(" -s FILE read the start time from the status FILE and save the\n");
	printf(" time of the last selected record in it\n");
	printf(" -n dry run: do not update the status file\n");
	printf("\n");
	printf("Loadable module control\n");
	printf(" -L DIR append DIR to the loadable module search path\n");
	printf(" -P DIR add DIR to the loadable module search path before\n");
	printf(" the default module directory\n");
	printf(" -m 'MODULE[ ARGS]'\n");
	printf(" load given MODULE\n");
	printf("\n");
	printf("Output control:\n");
	printf(" -d print timestamps relative to first record in the file\n");
	printf(" -N output record numbers\n");
	printf(" -v print information about each file\n");
	printf(" -t FMT format timestamps according to FMT\n");
	printf("\n");
	printf("Informational options:\n");
	printf(" -h print this help summary\n");
	printf(" -V show program version\n");
	printf("\n");
	printf("Report bugs and suggestions to <%s>\n", PACKAGE_BUGREPORT);
	/* Print the home page line only if PACKAGE_URL is not empty. */
	if (sizeof(PACKAGE_URL) > 1)
		printf("%s home page: <%s>\n", PACKAGE_NAME, PACKAGE_URL);
}
/* Convert the strftime-like global `pattern' into a globbing pattern.

   Each conversion specifier ('%' followed by one character) is
   replaced by '*'; adjacent conversions produce a single '*'.  A lone
   '%' at the end of the pattern is treated as a conversion too.
   If DIR is not NULL, the result is prefixed with DIR and a '/'.

   Returns a newly allocated string; the caller is responsible for
   freeing it. */
char *
convpattern(const char *dir)
{
	const char *q;
	char *p;
	char *newpat;
	/* The result is never longer than the input: every conversion
	   consumes at least one character and emits at most one. */
	size_t size = strlen(pattern) + 1;

	if (dir)
		size += strlen(dir) + 1;

	newpat = xmalloc(size);

	p = newpat;
	if (dir) {
		strcpy(p, dir);
		p += strlen(dir);
		*p++ = '/';
	}

	for (q = pattern; *q; ) {
		if (*q == '%') {
			/* Emit '*' unless the previous output character
			   already is one.  The start of the buffer counts
			   as "no previous character". */
			if (p == newpat || p[-1] != '*')
				*p++ = '*';
			/* Skip the conversion character, if there is one;
			   never step over the terminating NUL. */
			q += q[1] ? 2 : 1;
		} else
			*p++ = *q++;
	}
	*p = 0;
	return newpat;
}
/* Address of the Nth record in an array of RECSIZE-byte records
   starting at BASE. */
#define getrec(base, recsize, n) \
	((struct binlog_record*)((char *)(base) + (n) * (recsize)))

/* Binary search for the timestamp TS among records FROM through TO
   (inclusive) of the array at BASE.

   Return value and *RET:
     -1  TS precedes the record at the current lower bound; *RET is
         that bound;
      0  a record with timestamp TS was hit, or the range shrank to a
         single record; *RET is its index;
      1  TS is greater than the timestamp of the record at the upper
         bound; *RET is the current lower bound. */
int
searchts(void *base, size_t recsize, size_t from, size_t to, time_t ts,
	 size_t *ret)
{
	for (;;) {
		struct binlog_record *probe;
		size_t mid;

		probe = getrec(base, recsize, from);
		if (ts < probe->ts) {
			*ret = from;
			return -1;
		}

		if (from == to) {
			*ret = from;
			return 0;
		}

		probe = getrec(base, recsize, to);
		if (ts > probe->ts) {
			*ret = from;
			return 1;
		}

		mid = (to + from) / 2;
		probe = getrec(base, recsize, mid);
		if (ts == probe->ts) {
			*ret = mid;
			return 0;
		}

		/* Narrow the range to the half that may contain TS. */
		if (ts > probe->ts)
			from = mid + 1;
		else
			to = mid;
	}
}
/* Select and output records from the binlog image mapped at BASE.
   NAME is the file name, used in diagnostics and passed to the module.

   The function validates the file header, compiles the record format,
   locates the first record not older than the global start_time (if
   START_TIME is set) and then walks the records up to to_time (if
   STOP_TIME is set).  Each record is reported once for every interval
   it falls into: either via module_record, if a module is loaded, or
   by printing it on the standard output.

   NOTE(review): the objects returned by packcomp and packenv_create
   are not released here; confirm whether pack.h provides destructors
   for them. */
void
selmem(const char *name, void *base)
{
	struct binlog_file_header const *hdr = base;
	struct binlog_record const *rec;
	struct packenv *env;
	struct packinst *inst;
	char *p;
	size_t i, start;
	time_t start_ts = 0;
	char timebuf[128];
	char *format;
	struct interval *ip;
	size_t datasize;

	if (memcmp(hdr->magic, BINLOG_MAGIC_STR, BINLOG_MAGIC_LEN)) {
		error("%s is not a binlog file", name);
		return;
	}

	if (hdr->version != BINLOG_VERSION) {
		error("%s: unknown version", name);
		return;
	}

	/* The format string immediately follows the fixed header. */
	format = (char*)(hdr + 1);

	if (verbose_option)
		printf("# %s; format=%s; recsize=%lu; recnum=%lu\n",
		       name, format, (unsigned long) hdr->recsize,
		       (unsigned long) hdr->recnum);

	/* Guard against underflow in the payload size computation. */
	if (hdr->recsize < offsetof(struct binlog_record,data)) {
		error("%s: invalid record size", name);
		return;
	}
	datasize = hdr->recsize - offsetof(struct binlog_record,data);

	if (module_open && module_open(name, datasize, format)) {
		error("%s: rejected by module %s", name, module_name);
		return;
	}

	inst = packcomp(format, &p);
	if (!inst) {
		if (errno == EINVAL) {
			error("%s: %s: bad format near %s", name,
			      format, p);
			return;
		}
		error("%s", strerror(errno));
		return;
	}
	env = packenv_create(datasize);
	env->fp = stdout;

	base = (char*)base + hdr->hdrsize;

	/* Find the index of the first record to examine. */
	start = 0;
	if ((timemask & START_TIME) && hdr->recnum > 0) {
		if (searchts(base, hdr->recsize, 0, hdr->recnum - 1,
			     start_time, &start) == 1) {
			/* start_time is past the last record: nothing
			   to select from this file. */
			start = hdr->recnum;
		} else {
			/* The search may stop in the middle of a run of
			   equal timestamps, or on a single record older
			   than start_time.  Settle on the first record
			   whose timestamp is not less than start_time. */
			while (start > 0 &&
			       getrec(base, hdr->recsize, start - 1)->ts
			         >= start_time)
				start--;
			while (start < hdr->recnum &&
			       getrec(base, hdr->recsize, start)->ts
			         < start_time)
				start++;
		}
	}

	for (i = 0; start < hdr->recnum; i++, start++) {
		time_t ts;
		struct tm *tm;

		rec = getrec(base, hdr->recsize, start);
		if ((timemask & STOP_TIME) && rec->ts > to_time)
			break;

		ts = rec->ts;
		if (timediff_option) {
			/* Timestamps are relative to the first record
			   selected from this file. */
			if (i == 0)
				start_ts = ts;
			ts -= start_ts;
		}

		/* Format the (possibly relative) timestamp.  An empty
		   string is used if it cannot be represented. */
		tm = localtime(&ts);
		if (!tm || strftime(timebuf, sizeof timebuf, timefmt, tm) == 0)
			timebuf[0] = 0;

		memcpy(env->buf_base, rec->data, env->buf_size);

		for (ip = interval_head; ip; ip = ip->next) {
			if ((ip->timemask & START_TIME) && ip->start > rec->ts)
				continue;
			if ((ip->timemask & STOP_TIME) && ip->end < rec->ts)
				continue;

			last_ts = rec->ts;

			if (module_record)
				module_record(ip->name, ts, env->buf_base);
			else {
				if (ip->name)
					printf("%s ", ip->name);
				else if (ip == interval_head && ip->next)
					printf("default ");
				if (number_option)
					printf("%lu ", (unsigned long) start);
				printf("%s ", timebuf);
				env->buf_pos = 0;
				packout(inst, env);
				fputc('\n', stdout);
			}
		}
	}

	if (module_close)
		module_close();
}
/* Check whether the binlog file open on FP (named FNAME) can contain
   records from the global time window.

   Reads the file header and the first and last records.  Returns:
      0  the file is a candidate; *TS is set to the timestamp of its
         first record;
      1  the file is empty or lies outside the time window;
     -1  the file is not a valid binlog or an I/O error occurred (a
         diagnostic has been issued). */
static int
fchecktime(FILE *fp, const char *fname, time_t *ts)
{
	struct binlog_file_header header;
	struct binlog_record rec;
	time_t start, end;

	if (fread(&header, sizeof(header), 1, fp) != 1) {
		error("error reading header of %s: %s",
		      fname,
		      feof(fp) ? "unexpected end of file" : strerror(errno));
		return -1;
	}

	if (memcmp(header.magic, BINLOG_MAGIC_STR, BINLOG_MAGIC_LEN)) {
		error("%s is not a binlog file", fname);
		return -1;
	}

	if (header.version != BINLOG_VERSION) {
		error("%s: unknown version", fname);
		return -1;
	}

	/* A file without records has nothing to offer; this also keeps
	   the computation of the last record offset from underflowing. */
	if (header.recnum == 0)
		return 1;

	/* Timestamp of the first record. */
	if (fseek(fp, header.hdrsize, SEEK_SET)) {
		error("%s: seek error: %s", fname, strerror(errno));
		return -1;
	}

	if (fread(&rec, sizeof(rec), 1, fp) != 1) {
		error("%s: %s", fname,
		      feof(fp) ? "unexpected end of file" : strerror(errno));
		return -1;
	}
	start = rec.ts;

	/* Timestamp of the last record. */
	if (fseek(fp, header.hdrsize +
		      header.recsize * (header.recnum - 1),
		  SEEK_SET)) {
		error("%s: seek error: %s", fname, strerror(errno));
		return -1;
	}

	if (fread(&rec, sizeof(rec), 1, fp) != 1) {
		error("%s: %s", fname,
		      feof(fp) ? "unexpected end of file" : strerror(errno));
		return -1;
	}
	end = rec.ts;

	if ((timemask & STOP_TIME) && header.recnum > 1) {
		if (to_time < start)
			return 1;
		if (timemask & START_TIME) {
			if (start_time > end)
				return 1;
		}
	} else if ((timemask & START_TIME) && start_time > end)
		return 1;

	*ts = start;
	return 0;
}
/* Open the file NAME and run fchecktime on it.  Returns the result of
   fchecktime, or -1 (after issuing a diagnostic) if the file cannot
   be opened. */
static int
checktime(const char *name, time_t *ts)
{
	int result = -1;
	FILE *stream;

	stream = fopen(name, "r");
	if (stream) {
		result = fchecktime(stream, name, ts);
		fclose(stream);
	} else
		error("can't open %s: %s", name, strerror(errno));
	return result;
}
void
selfile(char *name)
{
int fd;
struct stat st;
void *base;
fd = open(name, O_RDONLY);
if (fd == -1) {
error("can't open %s: %s", name, strerror(errno));
exit(1);
}
if (fstat(fd, &st)) {
error("can't stat %s: %s", name, strerror(errno));
exit(1);
}
base = mmap((caddr_t)0, st.st_size,
PROT_READ, MAP_SHARED,
fd, 0);
if (base == MAP_FAILED) {
error("mmap(%s): %s", name, strerror(errno));
exit(1);
}
selmem(name, base);
munmap(base, st.st_size);
close(fd);
}
/* Process each file named in the NULL-terminated array ARGV, in order. */
void
selfilelist(char **argv)
{
	char **namep = argv;

	while (*namep) {
		selfile(*namep);
		namep++;
	}
}
/* Build the path name "DIR/FILE" in newly allocated memory.  Trailing
   slashes are removed from DIR before joining.  If DIR is NULL, a
   copy of FILE is returned. */
static char *
mkfilename(const char *dir, const char *file)
{
	size_t dlen, flen;
	char *path;

	if (dir == NULL)
		return xstrdup(file);

	/* Length of DIR without trailing slashes. */
	for (dlen = strlen(dir); dlen > 0 && dir[dlen - 1] == '/'; dlen--)
		;

	flen = strlen(file);
	/* Directory part, '/', file part, terminating NUL. */
	path = xmalloc(dlen + 1 + flen + 1);
	memcpy(path, dir, dlen);
	path[dlen] = '/';
	memcpy(path + dlen + 1, file, flen + 1);
	return path;
}
/* Return the last component of the path NAME converted to an integer
   with atoi.  NAME must contain at least one '/'; otherwise the
   program is aborted. */
int
filename_to_int(char *name)
{
	char *slash = strrchr(name, '/');

	if (slash == NULL)
		abort();
	slash++;
	return atoi(slash);
}
/* Select files from the month directory DIR.

   If DIR does not exist, nothing is done.  With month-level indexing
   the log files are located directly in DIR.  Otherwise DIR is
   expected to hold two-digit day subdirectories, and each day from
   the first to the last one is scanned in turn.  The first and last
   days are taken from the global start_time and to_time when the
   respective bound is set, and from the subdirectories actually
   present in DIR otherwise.

   NOTE(review): the day numbers derived from start_time/to_time are
   applied to whatever month DIR refers to; confirm this is intended
   for windows that span several months. */
void
selidx_day(const char *dir)
{
	glob_t gl;
	int have_list = 0;
	int have_start, have_stop;
	int day, last_day;
	size_t bufsize;
	char *subdir;

	if (access(dir, F_OK) != 0) {
		if (errno == ENOENT)
			return;
		error("can't access %s: %s", dir, strerror(errno));
	}

	if (index_type == index_month) {
		selglob(dir, BINLOG_GLOB_PATTERN);
		return;
	}

	have_start = (timemask & START_TIME) != 0;
	have_stop = (timemask & STOP_TIME) != 0;

	/* The directory listing is needed only if a bound is missing. */
	if (!have_start || !have_stop) {
		have_list = matchnames(dir, "[0-9][0-9]", &gl);
		if (!have_list) {
			error("no matching files");
			return;
		}
	}

	day = have_start
		? gmtime(&start_time)->tm_mday
		: filename_to_int(gl.gl_pathv[0]);
	last_day = have_stop
		? gmtime(&to_time)->tm_mday
		: filename_to_int(gl.gl_pathv[gl.gl_pathc - 1]);

	/* Room for DIR, '/', two digits and the terminating NUL. */
	bufsize = strlen(dir) + 4;
	subdir = xmalloc(bufsize);
	while (day <= last_day) {
		snprintf(subdir, bufsize, "%s/%02d", dir, day);
		selglob(subdir, BINLOG_GLOB_PATTERN);
		day++;
	}
	free(subdir);

	if (have_list)
		globfree(&gl);
}
/* Select files from the year directory DIR.

   If DIR does not exist, nothing is done.  With year-level indexing
   the log files are located directly in DIR.  Otherwise DIR is
   expected to hold two-digit month subdirectories, and selidx_day is
   called for each month from the first to the last one.  The first
   and last months are taken from the global start_time and to_time
   when the respective bound is set, and from the subdirectories
   actually present in DIR otherwise.

   NOTE(review): the month numbers derived from start_time/to_time are
   applied to whatever year DIR refers to; confirm this is intended
   for windows that span several years. */
void
selidx_month(const char *dir)
{
	glob_t gl;
	int have_list = 0;
	int have_start, have_stop;
	int month, last_month;
	size_t bufsize;
	char *subdir;

	if (access(dir, F_OK) != 0) {
		if (errno == ENOENT)
			return;
		error("can't access %s: %s", dir, strerror(errno));
	}

	if (index_type == index_year) {
		selglob(dir, BINLOG_GLOB_PATTERN);
		return;
	}

	have_start = (timemask & START_TIME) != 0;
	have_stop = (timemask & STOP_TIME) != 0;

	/* The directory listing is needed only if a bound is missing. */
	if (!have_start || !have_stop) {
		have_list = matchnames(dir, "[0-9][0-9]", &gl);
		if (!have_list) {
			error("no matching files");
			return;
		}
	}

	/* tm_mon counts from 0, directory names count from 1. */
	month = have_start
		? 1 + gmtime(&start_time)->tm_mon
		: filename_to_int(gl.gl_pathv[0]);
	last_month = have_stop
		? 1 + gmtime(&to_time)->tm_mon
		: filename_to_int(gl.gl_pathv[gl.gl_pathc - 1]);

	/* Room for DIR, '/', two digits and the terminating NUL. */
	bufsize = strlen(dir) + 4;
	subdir = xmalloc(bufsize);
	while (month <= last_month) {
		snprintf(subdir, bufsize, "%s/%02d", dir, month);
		selidx_day(subdir);
		month++;
	}
	free(subdir);

	if (have_list)
		globfree(&gl);
}
/* Select files from the top-level storage directory DIR.

   If DIR does not exist, nothing is done.  DIR is expected to hold
   four-digit year subdirectories; selidx_month is called for each
   year from the first to the last one.  The first and last years are
   taken from the global start_time and to_time when the respective
   bound is set, and from the subdirectories actually present in DIR
   otherwise.  The START_TIME bit of the global timemask is cleared
   once the first year has been processed. */
void
selidx_year(const char *dir)
{
	glob_t gl;
	int have_list = 0;
	int have_start, have_stop;
	int year, last_year;
	size_t bufsize;
	char *subdir;

	if (access(dir, F_OK) != 0) {
		if (errno == ENOENT)
			return;
		error("can't access %s: %s", dir, strerror(errno));
	}

	have_start = (timemask & START_TIME) != 0;
	have_stop = (timemask & STOP_TIME) != 0;

	/* The directory listing is needed only if a bound is missing. */
	if (!have_start || !have_stop) {
		have_list = matchnames(dir, "[0-9][0-9][0-9][0-9]", &gl);
		if (!have_list) {
			error("no matching files");
			return;
		}
	}

	/* tm_year counts from 1900. */
	year = have_start
		? 1900 + gmtime(&start_time)->tm_year
		: filename_to_int(gl.gl_pathv[0]);
	last_year = have_stop
		? 1900 + gmtime(&to_time)->tm_year
		: filename_to_int(gl.gl_pathv[gl.gl_pathc - 1]);

	/* Room for DIR, '/', four digits and the terminating NUL. */
	bufsize = strlen(dir) + 6;
	subdir = xmalloc(bufsize);
	while (year <= last_year) {
		snprintf(subdir, bufsize, "%s/%04d", dir, year);
		selidx_month(subdir);
		timemask &= ~START_TIME;
		year++;
	}
	free(subdir);

	if (have_list)
		globfree(&gl);
}
/* Error callback for glob(3): report the failure on EPATH.  Always
   returns 0. */
int
globerrfunc(const char *epath, int eerrno)
{
	const char *reason = strerror(eerrno);

	error("%s: %s", epath, reason);
	return 0;
}
/* Collect in GL the names of directories in DIR that match the
   globbing pattern PAT.

   Returns 1 if GL has been filled in (the caller must release it
   with globfree), and 0 if nothing matched.  Running out of memory
   and read errors are fatal. */
static int
matchnames(const char *dir, const char *pat, glob_t *gl)
{
	char *path = mkfilename(dir, pat);
	int status = glob(path, GLOB_ERR|GLOB_ONLYDIR, globerrfunc, gl);

	free(path);

	if (status == GLOB_NOMATCH)
		return 0;
	if (status == GLOB_NOSPACE) {
		error("out of memory");
		exit(1);
	}
	if (status == GLOB_ABORTED) {
		error("read error");
		exit(1);
	}
	return 1;
}
/* A candidate log file and the timestamp of its first record. */
struct logfile {
	char *name;
	time_t start;
};

/* qsort(3) comparator: order log files by ascending start time.
   Returns -1, 0 or 1. */
static int
tsort(const void *a, const void *b)
{
	time_t ta = ((struct logfile const *) a)->start;
	time_t tb = ((struct logfile const *) b)->start;

	return (ta > tb) - (ta < tb);
}
/* Process all files in DIR that match the globbing pattern PATTERN.

   Files are filtered with checktime; those that pass are sorted by
   the timestamp of their first record and handed to selfile in that
   order.  It is a fatal error if the pattern matches nothing, if
   memory runs out or if a read error occurs. */
void
selglob(const char *dir, const char *pattern)
{
	glob_t gl;
	struct logfile *files;
	size_t nfiles = 0;
	size_t i;
	char *path = mkfilename(dir, pattern);
	int status = glob(path, GLOB_ERR|GLOB_NOSORT, globerrfunc, &gl);

	if (status == GLOB_NOSPACE) {
		error("out of memory");
		exit(1);
	} else if (status == GLOB_ABORTED) {
		error("read error");
		exit(1);
	} else if (status == GLOB_NOMATCH) {
		error("no files matched pattern");
		exit(1);
	}
	free(path);

	/* Keep only the files accepted by checktime. */
	files = xcalloc(gl.gl_pathc, sizeof(*files));
	for (i = 0; i < gl.gl_pathc; i++) {
		time_t first;

		if (checktime(gl.gl_pathv[i], &first) != 0)
			continue;
		files[nfiles].name = gl.gl_pathv[i];
		files[nfiles].start = first;
		nfiles++;
	}

	/* Process the files in chronological order. */
	qsort(files, nfiles, sizeof(*files), tsort);
	for (i = 0; i < nfiles; i++)
		selfile(files[i].name);

	free(files);
	globfree(&gl);
}
/* Values for the WHERE argument of add_load_path. */
#define LP_PREPEND 0
#define LP_APPEND 1

/* An element of the doubly-linked list of module search directories. */
struct libdir {
	struct libdir *next;
	struct libdir *prev;
	const char *dir;
};

/* Head and tail of the search path list.  libdir_mid is the first
   directory ever added; LP_PREPEND inserts right before it. */
struct libdir *libdir_head, *libdir_mid, *libdir_tail;

/* Add DIR to the module search path.  DIR is not copied and must
   remain valid.

   The first directory added becomes the reference point
   (libdir_mid).  After that, LP_PREPEND inserts DIR immediately
   before the reference point and any other value of WHERE appends
   DIR to the end of the list. */
void
add_load_path(const char *dir, int where)
{
	struct libdir *ld = xmalloc(sizeof(ld[0]));

	ld->dir = dir;
	if (!libdir_mid) {
		ld->next = ld->prev = NULL;
		libdir_mid = libdir_head = libdir_tail = ld;
	} else if (where == LP_PREPEND) {
		ld->next = libdir_mid;
		ld->prev = libdir_mid->prev;
		/* The former predecessor of the reference point must
		   now lead to the new element. */
		if (ld->prev)
			ld->prev->next = ld;
		else
			libdir_head = ld;
		libdir_mid->prev = ld;
	} else /* if (where == LP_APPEND) */ {
		ld->next = NULL;
		ld->prev = libdir_tail;
		libdir_tail->next = ld;
		libdir_tail = ld;
	}
}
/* Load the module named by module_name, if any, and resolve its entry
   points.

   All directories from the search path list are registered with
   libltdl before loading.  Failure to load the module, or a module
   that lacks the "record" function, is fatal.  The "init", "open",
   "close" and "done" functions are looked up as well; their pointers
   are stored as returned by lt_dlsym. */
void
loader_init(void)
{
	lt_dlhandle handle = NULL;
	lt_dladvise advise = NULL;
	struct libdir *ld;
	int advise_ok;

	if (module_name == NULL)
		return;

	lt_dlinit();

	ld = libdir_head;
	while (ld) {
		lt_dladdsearchdir(ld->dir);
		ld = ld->next;
	}

	/* Each advise call returns zero on success; stop at the first
	   failure. */
	advise_ok = lt_dladvise_init(&advise) == 0
		    && lt_dladvise_ext(&advise) == 0
		    && lt_dladvise_global(&advise) == 0;
	if (advise_ok)
		handle = lt_dlopenadvise(module_name, advise);
	lt_dladvise_destroy(&advise);

	if (handle == NULL) {
		error("cannot load module %s: %s", module_name,
		      lt_dlerror());
		exit(1);
	}

	module_record = lt_dlsym(handle, "record");
	if (module_record == NULL) {
		error("%s: faulty module: function record() not defined",
		      module_name);
		exit(1);
	}

	module_init = lt_dlsym(handle, "init");
	module_open = lt_dlsym(handle, "open");
	module_close = lt_dlsym(handle, "close");
	module_done = lt_dlsym(handle, "done");
}
/* Status file format:
     timestamp LF
     N LF
     INT-1-DESCR LF
     ...
     INT-N-DESCR LF
*/

/* Read the status file from FP and check it against the current
   interval list.

   The status is accepted only if the number of intervals and each
   interval description, in order, are identical to the current ones
   and nothing follows the last description.  In that case the global
   start time is set to one second past the saved timestamp and 0 is
   returned.  Otherwise 1 is returned and no global state is changed. */
int
read_status_fp(FILE *fp)
{
	struct interval *ip;
	unsigned long n;
	time_t ts;

	if (fscanf(fp, "%lu\n", &n) != 1)
		return 1;
	ts = (time_t) n;

	if (fscanf(fp, "%lu\n", &n) != 1)
		return 1;
	if (n != interval_count)
		return 1;

	for (ip = interval_head; ip; ip = ip->next) {
		/* Compare bytes as unsigned char, which is the domain
		   of the values returned by fgetc. */
		const unsigned char *p = (const unsigned char *) ip->descr;
		int c; /* int, so that EOF is distinct from any byte */

		do {
			c = fgetc(fp);
			if (c == EOF)
				return 1;
			/* The newline in the file corresponds to the
			   terminating NUL of the description. */
			if (c == '\n')
				c = 0;
			if (c != *p++)
				return 1;
		} while (c);
	}

	if (ferror(fp))
		return 1;
	/* Trailing data means the file does not match. */
	if (fgetc(fp) != EOF)
		return 1;

	timemask |= START_TIME;
	start_time = ts + 1;
	last_ts = start_time;
	return 0;
}
/* Read the status file NAME.  A missing file is not an error: 0 is
   returned and nothing is changed.  Returns 1 if the file cannot be
   opened for another reason (a diagnostic is issued), and the result
   of read_status_fp otherwise. */
int
read_status_file(char const *name)
{
	FILE *stream = fopen(name, "r");
	int result;

	if (stream != NULL) {
		result = read_status_fp(stream);
		fclose(stream);
		return result;
	}

	if (errno == ENOENT)
		return 0;

	error("cannot open '%s' for reading: %s",
	      name, strerror(errno));
	return 1;
}
/* Save the current status in the file NAME: the timestamp of the last
   selected record, the number of intervals and the description of
   each interval, one per line.

   Returns 0 on success and 1 if the file cannot be created or
   written (a diagnostic is issued). */
int
write_status_file(char const *name)
{
	FILE *fp;
	struct interval *ip;

	fp = fopen(name, "w");
	if (!fp) {
		error("cannot open '%s' for writing: %s",
		      name, strerror(errno));
		return 1;
	}

	fprintf(fp, "%lu\n", (unsigned long) last_ts);
	fprintf(fp, "%lu\n", (unsigned long) interval_count);
	/* Save intervals */
	for (ip = interval_head; ip; ip = ip->next) {
		fprintf(fp, "%s\n", ip->descr);
	}

	if (ferror(fp)) {
		int ec = errno;

		fclose(fp);
		error("error writing '%s': %s", name, strerror(ec));
		return 1;
	}
	/* fclose flushes buffered data, so it can fail as well. */
	if (fclose(fp)) {
		error("error writing '%s': %s", name, strerror(errno));
		return 1;
	}
	return 0;
}
#define F_DRY_RUN 0x01
#define F_INCREMENTAL 0x02
int
main(int argc, char **argv)
{
int c;
struct timespec ts;
const char *id;
char *start = NULL, *stop = NULL;
char *p;
char *status_file_name = NULL;
int flags = 0;
setprogname(argv[0]);
add_load_path(BINLOGSEL_MODDIR, LP_APPEND);
while ((c = getopt(argc, argv, "D:dF:hi:I:L:m:Nns:p:P:T:t:Vv")) != EOF)
switch (c) {
case 'D':
directory = optarg;
break;
case 'd':
timediff_option = 1;
timefmt = "%s";
break;
case 'F':
start = optarg;
break;
case 'h':
help();
return 0;
case 'I':
if (start || stop)
interval_add_str(id, start, stop);
start = stop = NULL;
id = optarg;
break;
case 'i':
index_type = atoi(optarg);
if (index_type < 0 || index_type > index_last) {
error("invalid index type: %s", optarg);
exit(1);
}
break;
case 'L':
add_load_path(optarg, LP_APPEND);
break;
case 'm':
p = strchr(optarg, ' ');
if (p) {
*p++ = 0;
while (*p == ' ')
p++;
if (*p)
module_args = p;
}
module_name = optarg;
break;
case 'n':
flags |= F_DRY_RUN;
break;
case 'P':
add_load_path(optarg, LP_PREPEND);
break;
case 'p':
pattern = optarg;
break;
case 's':
flags |= F_INCREMENTAL;
status_file_name = optarg;
break;
case 'T':
stop = optarg;
break;
case 't':
timefmt = optarg;
break;
case 'N':
number_option = 1;
break;
case 'V':
version();
exit(0);
case 'v':
verbose_option = 1;
break;
default:
exit(1);
}
if (start || stop)
interval_add_str(id, start, stop);
argc -= optind;
argv += optind;
loader_init();
if (module_init)
module_init(flags, module_args, interval_add_str);
if (timemask & CLEAR_START_TIME)
timemask &= ~START_TIME;
if (timemask & CLEAR_STOP_TIME)
timemask &= ~STOP_TIME;
if (status_file_name)
read_status_file(status_file_name);
if (argc) {
if (pattern) {
error("either files or pattern (-p) must be given, "
"but not both");
exit(1);
}
selfilelist(argv);
} else if (pattern) {
selglob(directory, convpattern(pattern));
} else {
if (!directory)
directory = ".";
selidx_year(directory);
}
if (module_done)
module_done();
if (status_file_name && !(flags & F_DRY_RUN))
write_status_file(status_file_name);
exit(0);
}