/* This file is part of vmod-tbf
Copyright (C) 2013-2014 Sergey Poznyakoff
Vmod-tbf is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3, or (at your option)
any later version.
Vmod-tbf is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with vmod-tbf. If not, see .
*/
#include "tbf.h"
#include
#include
#include
#include
#include
static int debug_level;
static void
debugprt(const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
vsyslog(LOG_DAEMON|LOG_DEBUG, fmt, ap);
va_end(ap);
}
#define debug(n,c) do { if (debug_level>=(n)) debugprt c; } while (0)
#ifndef USEC_PER_SEC
# define USEC_PER_SEC 1000000L
#endif
#define DEFDBNAME "tbf.bdb"
#define DEFOPENPARAMS "truncate"
#define DBFILEMODE 0640
static char *dbdir;
static char *dbname;
static DB_ENV *dbenv;
static DB *db;
static uint64_t autosync_max;
static uint64_t autosync_count;
static int tbf_disabled;
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/* The keylock structure serializes accesses to each db record, ensuring
that no other thread could modify the data between calls to get and
put */
struct keylock {
char *key; /* Key string */
unsigned refcnt; /* Reference count */
pthread_mutex_t mutex;
VTAILQ_ENTRY(keylock) list;
};
/* Keylock_head keeps a list of active (i.e. used by at least one thread)
keylocks. Keylock_avail keeps a list of available threads, to avoid
unnecessary memory allocations/frees. */
static VTAILQ_HEAD(, keylock) keylock_head, keylock_avail;
/* Find and return a keylock corresponding to the given key. If not found,
create it, either by getting an unused entry from keylock_avail or by
allocating a new one. */
static struct keylock *
keylock_find(const char *key)
{
struct keylock *kp;
VTAILQ_FOREACH(kp, &keylock_head, list) {
if (strcmp(kp->key, key) == 0) {
kp->refcnt++;
return kp;
}
}
if (VTAILQ_FIRST(&keylock_avail)) {
kp = VTAILQ_FIRST(&keylock_avail);
VTAILQ_REMOVE(&keylock_avail, kp, list);
} else {
kp = malloc(sizeof(*kp));
AN(kp);
pthread_mutex_init(&kp->mutex, NULL);
}
kp->key = strdup(key);
AN(kp->key);
kp->refcnt = 1;
VTAILQ_INSERT_TAIL(&keylock_head, kp, list);
return kp;
}
/* Thread-safe version of the above. */
static struct keylock *
keylock_find_safe(const char *key)
{
struct keylock *kp;
pthread_mutex_lock(&mutex);
kp = keylock_find(key);
pthread_mutex_unlock(&mutex);
return kp;
}
/* Remove keylock from keylock_head and attach it to keylock_avail for
eventual future use. */
static void
keylock_remove_safe(struct keylock *kp)
{
pthread_mutex_lock(&mutex);
free(kp->key);
kp->key = NULL;
VTAILQ_REMOVE(&keylock_head, kp, list);
VTAILQ_INSERT_TAIL(&keylock_avail, kp, list);
pthread_mutex_unlock(&mutex);
}
static void
tbf_set_db_dir(const char *dir)
{
if (dbdir)
free(dbdir);
dbdir = strdup(dir);
AN(dbdir);
}
struct param_kw {
char *pkw_str;
int pkw_len;
int pkw_tok;
};
enum {
PKW_TRUNCATE,
PKW_MODE,
PKW_SYNC,
PKW_DEBUG,
PKW_DBNAME
};
static struct param_kw param_kw_tab[] = {
#define S(s) #s, sizeof(#s)-1
{ S(truncate), PKW_TRUNCATE },
{ S(trunc), PKW_TRUNCATE },
{ S(mode=), PKW_MODE },
{ S(sync=), PKW_SYNC },
{ S(debug=), PKW_DEBUG },
{ S(dbname=), PKW_DBNAME },
{ NULL }
#undef S
};
static void
tbf_open(const char *params)
{
int rc;
int filemode = DBFILEMODE;
uint64_t n;
char *p;
struct stat st;
int truncate = 0;
if (!dbdir) {
dbdir = strdup(LOCALSTATEDIR "/vmod-tbf");
AN(dbdir);
}
if (!dbname) {
dbname = strdup(DEFDBNAME);
AN(dbname);
}
while (*params) {
struct param_kw *pkw;
for (pkw = param_kw_tab; pkw->pkw_str; pkw++) {
if (strncmp(params, pkw->pkw_str, pkw->pkw_len) == 0)
break;
}
if (!pkw->pkw_str) {
syslog(LOG_DAEMON|LOG_ERR, "invalid keyword %s", params);
break;
}
params += pkw->pkw_len;
switch (pkw->pkw_tok) {
case PKW_TRUNCATE:
truncate = 1;
break;
case PKW_MODE:
errno = 0;
n = strtoul(params, &p, 8);
if (errno || (n & ~0777) || !(*p == 0 || *p == ';')) {
syslog(LOG_DAEMON|LOG_ERR,
"invalid file mode near %s", p);
params += strlen(params);
} else {
filemode = n;
params = p;
}
break;
case PKW_SYNC:
errno = 0;
n = strtoul(params, &p, 10);
if (errno || !(*p == 0 || *p == ';')) {
syslog(LOG_DAEMON|LOG_ERR,
"invalid count near %s", p);
params += strlen(params);
} else {
autosync_max = n;
autosync_count = 0;
params = p;
}
break;
case PKW_DEBUG:
errno = 0;
n = strtoul(params, &p, 10);
if (errno || !(*p == 0 || *p == ';')) {
syslog(LOG_DAEMON|LOG_ERR,
"invalid debug level near %s", p);
params += strlen(params);
} else {
debug_level = n;
params = p;
}
break;
case PKW_DBNAME:
if (dbname)
free(dbname);
n = strcspn(params, ";");
dbname = malloc(n + 1);
AN(dbname);
memcpy(dbname, params, n);
dbname[n] = 0;
params += n;
break;
}
if (*params == 0)
break;
else if (*params == ';')
params++;
else {
syslog(LOG_DAEMON|LOG_ERR,
"expected ';' near %s", params);
break;
}
}
debug(1, ("opening database %s/%s", dbdir, dbname));
if (rc = db_env_create(&dbenv, 0)) {
syslog(LOG_DAEMON|LOG_ERR, "cannot create db environment: %s",
db_strerror(rc));
return;
}
if (stat(dbdir, &st)) {
if (errno == ENOENT) {
if (mkdir(dbdir,
filemode | 0100 |
((filemode & 0060) ? 0010 : 0) |
((filemode & 0006) ? 0001 : 0))) {
syslog(LOG_DAEMON|LOG_ERR,
"cannot create db environment directory %s: %m",
dbdir);
}
} else {
syslog(LOG_DAEMON|LOG_ERR,
"cannot stat db environment directory %s: %m",
dbdir);
return;
}
} else if (!S_ISDIR(st.st_mode)) {
syslog(LOG_DAEMON|LOG_ERR, "%s is not a directory",
dbdir);
return;
}
rc = dbenv->open(dbenv, dbdir,
DB_THREAD | DB_CREATE | DB_INIT_MPOOL | DB_INIT_CDB,
0);
if (rc) {
syslog(LOG_DAEMON|LOG_ERR, "cannot open db environment %s: %s",
dbdir, db_strerror(rc));
tbf_disabled = 1;
return;
}
rc = db_create(&db, dbenv, 0);
if (rc) {
syslog(LOG_DAEMON|LOG_ERR, "cannot create db struct");
return;
}
rc = db->open(db, NULL, dbname, NULL, DB_HASH,
DB_THREAD | DB_CREATE, filemode);
if (rc) {
syslog(LOG_DAEMON|LOG_ERR, "cannot open database %s: %s",
dbname, db_strerror (rc));
db->close(db, 0);
db = NULL;
dbenv->close(dbenv, 0);
dbenv = NULL;
tbf_disabled = 1;
}
if (truncate) {
rc = db->truncate(db, NULL, NULL, 0);
if (rc)
syslog(LOG_DAEMON|LOG_WARNING,
"failed to truncate database %s: %s",
dbname, db_strerror(rc));
}
}
static DB *
tbf_open_safe(const char *params)
{
if (tbf_disabled)
return NULL;
pthread_mutex_lock(&mutex);
if (!db)
tbf_open(params ? params : DEFOPENPARAMS);
pthread_mutex_unlock(&mutex);
return db;
}
int
tbf_init(struct vmod_priv *priv, const struct VCL_conf *vclconf)
{
VTAILQ_INIT(&keylock_head);
VTAILQ_INIT(&keylock_avail);
}
void
vmod_open(MOD_CTX ctx, const char *dir, const char *params)
{
if (db) {
syslog(LOG_DAEMON|LOG_ERR, "tbf.open called twice");
return;
}
tbf_set_db_dir(dir);
tbf_open_safe(params);
}
void
vmod_close(MOD_CTX ctx)
{
pthread_mutex_lock(&mutex);
if (db) {
debug(1, ("closing database %s", dbname));
db->close(db, 0);
db = NULL;
dbenv->close(dbenv, 0);
dbenv = NULL;
tbf_disabled = 0;
}
pthread_mutex_unlock(&mutex);
}
void
vmod_sync(MOD_CTX ctx)
{
if (db) {
debug(1, ("synchronizing database"));
db->sync(db, 0);
}
}
/* Algorithm:
* A token is added to the bucket at a constant rate of 1 token per INTERVAL
microseconds.
* A bucket can hold at most BURST_SIZE tokens. If a token arrives when the
bucket is full, that token is discarded.
* When COST items of data arrive, COST tokens are removed
from the bucket and the data are accepted.
* If fewer than COST tokens are available, no tokens are removed from
the bucket and the data are not accepted.
This keeps the data traffic at a constant rate INTERVAL with bursts of
up to BURST_SIZE data items. Such bursts occur when no data was being
arrived for BURST_SIZE*INTERVAL or more microseconds.
*/
struct tbf_bucket {
uint64_t timestamp; /* microseconds since epoch */
size_t tokens; /* tokens available */
};
int
tbf_proc(MOD_CTX ctx, DB *db, const char *key, int cost,
unsigned long interval, int burst_size)
{
DBT keydat, content;
struct timeval tv;
uint64_t now;
uint64_t elapsed;
uint64_t tokens;
struct tbf_bucket *bkt, init_bkt;
int rc, res;
memset(&keydat, 0, sizeof keydat);
keydat.data = (void*) key;
keydat.size = strlen(key);
gettimeofday(&tv, NULL);
now = (uint64_t) tv.tv_sec * USEC_PER_SEC + (uint64_t)tv.tv_usec;
memset(&content, 0, sizeof content);
content.flags = DB_DBT_MALLOC;
rc = db->get(db, NULL, &keydat, &content, 0);
switch (rc) {
case 0:
bkt = (struct tbf_bucket *) content.data;
/* calculate elapsed time and number of new tokens since
last add */;
elapsed = now - bkt->timestamp;
tokens = elapsed / interval; /* partial tokens ignored */
/* timestamp set to time of most recent token */
bkt->timestamp += tokens * interval;
/* add existing tokens to 64bit counter to prevent overflow
in range check */
tokens += bkt->tokens;
if (tokens >= burst_size)
bkt->tokens = burst_size;
else
bkt->tokens = (size_t)tokens;
debug(2, ("found, elapsed time: %"PRIu64" us, "
"new tokens: %"PRIu64", total: %lu ",
elapsed, tokens, (unsigned long) bkt->tokens));
break;
case DB_NOTFOUND:
/* Initialize the structure */
init_bkt.timestamp = now;
init_bkt.tokens = burst_size;
bkt = &init_bkt;
break;
default:
syslog(LOG_DAEMON|LOG_ERR, "cannot fetch data %s: %s",
key, db_strerror(rc));
return false;
}
if (cost <= bkt->tokens) {
res = 1;
bkt->tokens -= cost;
debug(2, ("tbf_rate matched %s, tokens left %lu", key,
(unsigned long)bkt->tokens));
} else {
res = 0;
debug(1, ("tbf_rate overlimit on %s", key));
}
/* Update the db */
content.data = (void*) bkt;
content.size = sizeof(*bkt);
rc = db->put(db, NULL, &keydat, &content, 0);
if (rc) {
syslog(LOG_DAEMON|LOG_ERR, "error updating key %s: %s",
key, db_strerror(rc));
}
if (bkt != &init_bkt)
free(bkt);
if (autosync_max && ++autosync_count >= autosync_max) {
debug(1, ("synchronizing database"));
db->sync(db, 0);
autosync_count = 0;
}
return res;
}
VCL_BOOL
vmod_rate(MOD_CTX ctx, VCL_STRING key, VCL_INT cost, VCL_REAL t,
VCL_INT burst_size)
{
unsigned long interval = t * USEC_PER_SEC;
int rc;
debug(2, ("entering rate(%s,%d,%g,%d)", key, cost, t, burst_size));
if (interval == 0 || burst_size == 0)
return false;
if (!cost) {
/* cost free, so don't waste time on database access */
return true;
}
if (cost > burst_size) {
/* impossibly expensive, so don't waste time on
database access */
return false;
}
db = tbf_open_safe(NULL);
if (db) {
struct keylock *kp;
kp = keylock_find_safe(key);
debug(2, ("found key %s, ref %u", key, kp->refcnt));
AZ(pthread_mutex_lock(&kp->mutex));
rc = tbf_proc(ctx, db, key, cost, interval, burst_size);
if (--kp->refcnt == 0)
keylock_remove_safe(kp);
AZ(pthread_mutex_unlock(&kp->mutex));
} else
rc = false;
return rc;
}
#define ISWS(c) ((c)==' '||(c)=='\t')
VCL_BOOL
vmod_check(MOD_CTX ctx, VCL_STRING key, VCL_STRING spec)
{
double t, v, n;
char *p;
#define SKIPWS(init) for (init; *spec && ISWS(*spec); spec++)
int burst;
errno = 0;
v = strtod(spec, &p);
if (errno || v < 0) {
syslog(LOG_DAEMON|LOG_ERR, "bad rate: %s", spec);
return false;
}
SKIPWS(spec = p);
if (strncmp(spec, "req", 3)) {
syslog(LOG_DAEMON|LOG_ERR,
"bad rate: expected \"req\", but found \"%s\"", spec);
return false;
}
SKIPWS(spec += 3);
if (*spec != '/') {
syslog(LOG_DAEMON|LOG_ERR,
"bad rate: expected \"/\", but found \"%c\"", *spec);
return false;
}
SKIPWS(++spec);
if (*spec >= '0' && *spec <= '9') {
errno = 0;
n = strtod(spec, &p);
if (errno || n < 0) {
syslog(LOG_DAEMON|LOG_ERR, "bad interval: %s", spec);
return false;
}
spec = p;
} else
n = 1;
SKIPWS();
switch (*spec) {
case 0:
case 's':
break;
case 'd':
n *= 24;
case 'h':
n *= 60;
case 'm':
n *= 60;
break;
default:
syslog(LOG_DAEMON|LOG_ERR, "invalid interval specifier: %s",
spec);
return false;
}
SKIPWS(++spec);
if (*spec)
syslog(LOG_DAEMON|LOG_WARNING, "garbage after rate spec: %s",
spec);
return vmod_rate(ctx, key, 1, n/v, v/n+1);
}