From d78a58be5fa2385987a3141ccbe942107332e677 Mon Sep 17 00:00:00 2001 From: Sergey Poznyakoff Date: Thu, 1 Aug 2013 18:09:42 +0300 Subject: Serialize read/write accesses. * src/tbf.c (keylock): New struct. (keylock_head, keylock_avail): New statics. (keylock_find, keylock_find_safe) (keylock_remove_safe): New functions. (tbf_open): Rename to tbf_open_safe. (tbf_open_internal): Rename to tbf_open. (vmod_rate): Rewrite using keylocks to ensure the record is not modified by another thread between calls to get and push. --- src/tbf.c | 147 ++++++++++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 119 insertions(+), 28 deletions(-) diff --git a/src/tbf.c b/src/tbf.c index 02c26ec..1dfb6c6 100644 --- a/src/tbf.c +++ b/src/tbf.c @@ -50,6 +50,77 @@ static int tbf_disabled; static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; #define DBFILEMODE 0640 + +/* The keylock structure serializes accesses to each db record, ensuring + that no other thread could modify the data between calls to get and + put */ + +struct keylock { + char *key; /* Key string */ + unsigned refcnt; /* Reference count */ + pthread_mutex_t mutex; + VTAILQ_ENTRY(keylock) list; +}; + +/* Keylock_head keeps a list of active (i.e. used by at least one thread) + keylocks. Keylock_avail keeps a list of available threads, to avoid + unnecessary memory allocations/frees. */ +static VTAILQ_HEAD(, keylock) keylock_head, keylock_avail; + +/* Find and return a keylock corresponding to the given key. If not found, + create it, either by getting an unused entry from keylock_avail or by + allocating a new one. */ +static struct keylock * +keylock_find(const char *key) +{ + struct keylock *kp; + + VTAILQ_FOREACH(kp, &keylock_head, list) { + if (strcmp(kp->key, key) == 0) { + kp->refcnt++; + return kp; + } + } + + if (VTAILQ_FIRST(&keylock_avail)) { + kp = VTAILQ_FIRST(&keylock_avail); + VTAILQ_REMOVE(&keylock_avail, kp, list); + } else { + kp = malloc(sizeof(*kp)); + AN(kp); + pthread_mutex_init(&kp->mutex, NULL); + } + kp->key = strdup(key); + AN(kp->key); + kp->refcnt = 1; + VTAILQ_INSERT_TAIL(&keylock_head, kp, list); + return kp; +} + +/* Thread-safe version of the above. */ +static struct keylock * +keylock_find_safe(const char *key) +{ + struct keylock *kp; + pthread_mutex_lock(&mutex); + kp = keylock_find(key); + pthread_mutex_unlock(&mutex); + return kp; +} + +/* Remove keylock from keylock_head and attach it to keylock_avail for + eventual future use. */ +static void +keylock_remove_safe(struct keylock *kp) +{ + pthread_mutex_lock(&mutex); + free(kp->key); + kp->key = NULL; + VTAILQ_REMOVE(&keylock_head, kp, list); + VTAILQ_INSERT_TAIL(&keylock_avail, kp, list); + pthread_mutex_unlock(&mutex); +} + static void tbf_set_db_name(const char *file_name) { @@ -85,7 +156,7 @@ static struct mode_kw mode_kw_tab[] = { }; static void -tbf_open_internal(const char *mode) +tbf_open(const char *mode) { int rc; int flags = DB_CREATE|DB_THREAD; @@ -185,13 +256,13 @@ tbf_open_internal(const char *mode) } static DB * -tbf_open(const char *mode) +tbf_open_safe(const char *mode) { if (tbf_disabled) - return 0; + return NULL; pthread_mutex_lock(&mutex); if (!db) - tbf_open_internal(mode ? mode : "truncate"); + tbf_open(mode ? mode : "truncate"); pthread_mutex_unlock(&mutex); return db; } @@ -199,6 +270,8 @@ tbf_open(const char *mode) int tbf_init(struct vmod_priv *priv, const struct VCL_conf *vclconf) { + VTAILQ_INIT(&keylock_head); + VTAILQ_INIT(&keylock_avail); } void @@ -209,7 +282,7 @@ vmod_open(struct sess *sp, const char *file_name, const char *mode) return; } tbf_set_db_name(file_name); - tbf_open(mode); + tbf_open_safe(mode); } void @@ -256,10 +329,10 @@ struct tbf_bucket { size_t tokens; /* tokens available */ }; -unsigned -vmod_rate(struct sess *sp, const char *key, int cost, double t, int burst_size) +int +tbf_proc(struct sess *sp, DB *db, const char *key, int cost, + unsigned long interval, int burst_size) { - DB *db; DBT keydat, content; struct timeval tv; uint64_t now; @@ -267,27 +340,7 @@ vmod_rate(struct sess *sp, const char *key, int cost, double t, int burst_size) uint64_t tokens; struct tbf_bucket *bkt, init_bkt; int rc, res; - unsigned long interval = t * USEC_PER_SEC; - - debug(2, ("entering rate(%s,%d,%g,%d)", key, cost, t, burst_size)); - - if (interval == 0 || burst_size == 0) - return false; - - if (!cost) { - /* cost free, so don't waste time on database access */ - return true; - } - if (cost > burst_size) { - /* impossibly expensive, so don't waste time on - database access */ - return false; - } - db = tbf_open(NULL); - if (!db) - return false; - memset(&keydat, 0, sizeof keydat); keydat.data = (void*) key; keydat.size = strlen(key); @@ -366,6 +419,44 @@ vmod_rate(struct sess *sp, const char *key, int cost, double t, int burst_size) return res; } +unsigned +vmod_rate(struct sess *sp, const char *key, int cost, double t, int burst_size) +{ + unsigned long interval = t * USEC_PER_SEC; + int rc; + + debug(2, ("entering rate(%s,%d,%g,%d)", key, cost, t, burst_size)); + + if (interval == 0 || burst_size == 0) + return false; + + if (!cost) { + /* cost free, so don't waste time on database access */ + return true; + } + if (cost > burst_size) { + /* impossibly expensive, so don't waste time on + database access */ + return false; + } + + db = tbf_open_safe(NULL); + if (db) { + struct keylock *kp; + + kp = keylock_find_safe(key); + debug(2, ("found key %s, ref %u", key, kp->refcnt)); + AZ(pthread_mutex_lock(&kp->mutex)); + rc = tbf_proc(sp, db, key, cost, interval, burst_size); + if (--kp->refcnt == 0) + keylock_remove_safe(kp); + AZ(pthread_mutex_unlock(&kp->mutex)); + } else + rc = false; + + return rc; +} + #define ISWS(c) ((c)==' '||(c)=='\t') unsigned -- cgit v1.2.1