aboutsummaryrefslogtreecommitdiff
path: root/src/triplet.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/triplet.c')
-rw-r--r--src/triplet.c393
1 files changed, 267 insertions, 126 deletions
diff --git a/src/triplet.c b/src/triplet.c
index e41da07..797a09e 100644
--- a/src/triplet.c
+++ b/src/triplet.c
@@ -19,14 +19,106 @@
/* Triplets are stored in a symtab: */
static struct grecs_symtab *triplet_table;
+static pthread_mutex_t triplet_table_mutex = PTHREAD_MUTEX_INITIALIZER;
+
/* ... and are organized into a doubly-linked list, using the prev and
- next members of struct wy_triplet. The list is ordered so that
+ next members of struct wy_triplet: */
+struct triplet_list {
+ struct wy_triplet *head, *tail;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+};
+
+#define TRIPLET_LIST_INITIALIZER \
+ { NULL, NULL, PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER }
+
+/* Two such lists are maintained. The pending list is ordered so that
prev points to a triplet older than this one, and next points to a
- newer triplet. The triplet_list variable points to the root of the
- list, i.e. the oldest triplet available. */
-static struct wy_triplet *triplet_list;
+ newer triplet. Its head member points to the oldest triplet available. */
+static struct triplet_list triplet_pending_list = TRIPLET_LIST_INITIALIZER;
+
+/* The running list contains triplets which are processed by running
+ threads */
+static struct triplet_list triplet_running_list = TRIPLET_LIST_INITIALIZER;
/* Functions for building the ordered doubly-linked list of triplets */
+
+static inline void
+triplet_list_lock(struct triplet_list *list)
+{
+ pthread_mutex_lock(&list->mutex);
+}
+
+static inline void
+triplet_list_unlock(struct triplet_list *list)
+{
+ pthread_mutex_unlock(&list->mutex);
+}
+
+void
+triplet_list_unlink(struct triplet_list *list, struct wy_triplet *tp)
+{
+ int head_changed = 0;
+
+ if (tp->prev)
+ tp->prev->next = tp->next;
+ else if (tp == list->head) {
+ list->head = tp->next;
+ head_changed = 1;
+ }
+
+ if (tp->next)
+ tp->next->prev = tp->prev;
+ else
+ list->tail = tp->prev;
+
+ tp->next = tp->prev = NULL;
+ if (head_changed)
+ pthread_cond_broadcast(&list->cond);
+ tp->list = NULL;
+}
+
+static void
+triplet_list_insert(struct triplet_list *list,
+ struct wy_triplet *newp, struct wy_triplet *anchor,
+ int after)
+{
+ int head_changed = 0;
+
+ if (!anchor) {
+ newp->prev = NULL;
+ newp->next = list->head;
+ if (list->head)
+ list->head->prev = newp;
+ else
+ list->tail = newp;
+ list->head = newp;
+ head_changed = 1;
+ } else if (after) {
+ if (anchor->next)
+ anchor->next->prev = newp;
+ else
+ list->tail = newp;
+ newp->prev = anchor;
+ newp->next = anchor->next;
+ anchor->next = newp;
+ } else {
+ if (anchor->prev)
+ anchor->prev->next = newp;
+ else {
+ list->head = newp;
+ head_changed = 1;
+ }
+ newp->prev = anchor->prev;
+ newp->next = anchor;
+ anchor->prev = newp;
+ }
+ newp->list = list;
+ if (head_changed)
+ pthread_cond_broadcast(&list->cond);
+}
+
+
static time_t
triplet_timestamp(struct wy_triplet *tp)
{
@@ -43,69 +135,35 @@ triplet_timestamp(struct wy_triplet *tp)
return t;
}
-static time_t
-triplet_ttl(struct wy_triplet *tp)
+static struct timespec *
+triplet_ttl(struct wy_triplet *tp, struct timespec *ts)
{
time_t t;
if (!tp)
- return 0;
+ return NULL;
t = time(NULL) - triplet_timestamp(tp);
if (t < tp->spool->file_sweep_time)
- return tp->spool->file_sweep_time - t;
- return 0;
-}
-
-void
-triplet_list_unlink(struct wy_triplet *tp)
-{
- if (tp->prev)
- tp->prev->next = tp->next;
- else if (tp == triplet_list)
- triplet_list = tp->next;
-
- if (tp->next)
- tp->next->prev = tp->prev;
-
- tp->next = tp->prev = NULL;
-}
-
-static void
-triplet_list_insert_before(struct wy_triplet *newp, struct wy_triplet *anchor)
-{
- if (!anchor) {
- triplet_list = newp;
- return;
- }
-
- if (anchor->prev)
- anchor->prev->next = newp;
+ ts->tv_sec += tp->spool->file_sweep_time - t;
else
- triplet_list = newp;
- newp->prev = anchor->prev;
-
- anchor->prev = newp;
- newp->next = anchor;
+ ts->tv_sec = 0;
+ ts->tv_nsec = 0;
+ return ts;
}
+
void
-triplet_list_ordered_insert(struct wy_triplet *tp)
+triplet_list_ordered_insert(struct triplet_list *list, struct wy_triplet *tp)
{
time_t t = triplet_timestamp(tp);
struct wy_triplet *p, *prev = NULL;
- for (p = triplet_list; p && triplet_timestamp(p) < t;
+ for (p = list->head; p && triplet_timestamp(p) < t;
prev = p, p = p->next);
if (p)
- triplet_list_insert_before(tp, p);
- else if (prev) {
- prev->next = tp;
- tp->prev = prev;
- tp->next = NULL;
- } else {
- tp->next = tp->prev = NULL;
- triplet_list = tp;
- }
+ triplet_list_insert(list, tp, p, 0);
+ else
+ triplet_list_insert(list, tp, prev ? prev->next : NULL, 1);
}
static struct wy_user *
@@ -190,6 +248,7 @@ register_file(struct file_info *finfo, struct spool *spool)
struct wy_triplet key, *ret;
int install = 1;
+ pthread_mutex_lock(&triplet_table_mutex);
if (!triplet_table) {
triplet_table =
grecs_symtab_create(sizeof(struct wy_triplet),
@@ -206,16 +265,21 @@ register_file(struct file_info *finfo, struct spool *spool)
key.spool = spool;
ret = grecs_symtab_lookup_or_install(triplet_table, &key, &install);
+ pthread_mutex_unlock(&triplet_table_mutex);
+
if (!ret)
grecs_alloc_die();
free(key.name);
ret->file[finfo->type] = *finfo;
+
+ triplet_list_lock(&triplet_pending_list);
if (install) {
ret->spool = spool;
ret->acc = grecs_txtacc_create();
- } else
- triplet_list_unlink(ret);
- triplet_list_ordered_insert(ret);
+ } else if (ret->list)
+ triplet_list_unlink(ret->list, ret);
+ triplet_list_ordered_insert(&triplet_pending_list, ret);
+ triplet_list_unlock(&triplet_pending_list);
return ret;
}
@@ -236,33 +300,14 @@ triplet_lookup(struct spool *spool, const char *name)
key.spool = spool;
file_info_cleanup(&finfo);
+ pthread_mutex_lock(&triplet_table_mutex);
ret = grecs_symtab_lookup_or_install(triplet_table, &key, NULL);
+ pthread_mutex_unlock(&triplet_table_mutex);
free(key.name);
return ret;
}
-/* Return true if any part of the triplet TRP was modified more than
- TTL seconds ago */
-static int
-triplet_expired_p(struct wy_triplet *trp)
-{
- int i;
- time_t now = time(NULL);
- time_t ttl = trp->spool->file_sweep_time;
-
- if (ttl == 0)
- return 0;
-
- for (i = 0; i < FILE_TYPE_COUNT; i++) {
- if (trp->file[i].name && (now - trp->file[i].sb.st_mtime) >= ttl) {
- wy_debug(1, (_("file %s expired"), trp->file[i].name));
- return 1;
- }
- }
- return 0;
-}
-
enum triplet_state {
triplet_directive, /* Short triplet: only a directive is present,
but nothing more is required */
@@ -306,6 +351,8 @@ check_triplet_state(struct wy_triplet *trp)
void
triplet_enqueue(struct wy_triplet *trp)
{
+ pthread_t tid;
+
if (!trp)
return;
if (spool_open_dictionaries(trp->spool))
@@ -313,76 +360,62 @@ triplet_enqueue(struct wy_triplet *trp)
switch (check_triplet_state(trp)) {
case triplet_directive:
case triplet_complete:
- if (triplet_expired_p(trp))
- remove_triplet(trp, 0);
- else
- job_schedule_triplet(trp);
+ triplet_list_lock(&triplet_pending_list);
+ triplet_list_unlink(&triplet_pending_list, trp);
+ triplet_list_unlock(&triplet_pending_list);
+
+ triplet_list_lock(&triplet_running_list);
+ triplet_list_insert(&triplet_running_list, trp,
+ triplet_running_list.tail, 1);
+ triplet_list_unlock(&triplet_running_list);
+
+ pthread_create(&tid, NULL, wy_thr_triplet, trp);
break;
case triplet_incomplete:
break;
case triplet_bad:
- remove_triplet(trp, 0);
+ remove_triplet(trp);
}
}
/* Unlink all parts of the triplet TRP */
-int
-remove_triplet(struct wy_triplet *trp, int check)
+static void
+remove_triplet_unlocked(struct wy_triplet *trp)
{
int i;
for (i = 0; i < FILE_TYPE_COUNT; i++) {
if (trp->file[i].name) {
- if (check) {
- struct stat st;
-
- if (wy_dry_run)
- continue;
- if (fstatat(trp->spool->source_fd,
- trp->file[i].name, &st, 0)) {
- if (errno == ENOENT)
- continue;
- else {
+ if (!wy_dry_run) {
+ if (unlinkat(trp->spool->source_fd, trp->file[i].name, 0)) {
+ if (errno != ENOENT)
wy_log(LOG_ERR,
- _("can't stat %s/%s: %s"),
+ _("cannot remove %s/%s: %s"),
trp->spool->source_dir,
trp->file[i].name, strerror(errno));
- return 1;
- }
- } else {
- wy_log(LOG_NOTICE,
- _("%s/%s still exists"),
+ } else
+ wy_log(LOG_NOTICE, _("removing %s/%s"),
trp->spool->source_dir, trp->file[i].name);
- return 1;
- }
- } else {
- wy_log(LOG_NOTICE, _("removing %s/%s"),
- trp->spool->source_dir, trp->file[i].name);
- if (!wy_dry_run
- && unlinkat(trp->spool->source_fd,
- trp->file[i].name, 0))
- wy_log(LOG_ERR,
- _("cannot remove %s/%s: %s"),
- trp->spool->source_dir,
- trp->file[i].name, strerror(errno));
}
}
}
- triplet_list_unlink(trp);
+ triplet_list_unlink(trp->list, trp);
triplet_gpgme_ctx_release(trp);
- grecs_symtab_remove(triplet_table, trp);
- return 0;
+ pthread_mutex_lock(&triplet_table_mutex);
+ grecs_symtab_remove(triplet_table, trp);
+ pthread_mutex_unlock(&triplet_table_mutex);
}
-time_t
-triplet_sweep(void)
+void
+remove_triplet(struct wy_triplet *trp)
{
- while (triplet_list && triplet_expired_p(triplet_list))
- remove_triplet(triplet_list, 0);
+ struct triplet_list *list = trp->list;
- return triplet_ttl(triplet_list);
+ triplet_list_lock(list);
+ remove_triplet_unlocked(trp);
+ triplet_list_unlock(list);
}
void
@@ -392,8 +425,7 @@ triplet_commit(struct wy_triplet *trp)
if (spool_open_dictionaries(trp->spool) == 0) {
timer_start(trp->spool->tag);
wy_debug(1, (_("processing triplet `%s'"), trp->name));
- if (process_directives(trp))
- remove_triplet(trp, 0);
+ process_directives(trp);
timer_stop(trp->spool->tag);
}
timer_stop("spool");
@@ -405,7 +437,7 @@ triplet_remove_file(struct spool *spool, const char *name)
struct wy_triplet *tp = triplet_lookup(spool, name);
int i, n = 0;
- if (!tp)
+ if (!tp || tp->list == &triplet_running_list)
return;
for (i = 0; i < FILE_TYPE_COUNT; i++) {
@@ -420,9 +452,119 @@ triplet_remove_file(struct spool *spool, const char *name)
if (!n) {
wy_debug(1, ("deleting empty triplet (%s/%s)",
spool->source_dir, name));
- remove_triplet(tp, 0);
+ remove_triplet(tp);
}
}
+
+/* Return true if any part of the triplet TRP was modified more than
+ TTL seconds ago */
+static int
+triplet_expired_p(struct wy_triplet *trp)
+{
+ int i;
+ time_t now;
+ time_t ttl;
+
+ if (!trp)
+ return 0;
+
+ now = time(NULL);
+ ttl = trp->spool->file_sweep_time;
+
+ if (ttl == 0)
+ return 0;
+
+ for (i = 0; i < FILE_TYPE_COUNT; i++) {
+ if (trp->file[i].name && (now - trp->file[i].sb.st_mtime) >= ttl) {
+ wy_debug(1, (_("file %s expired"), trp->file[i].name));
+ return 1;
+ }
+ }
+ return 0;
+}
+
+void *
+wy_thr_cleaner(void *ptr)
+{
+ triplet_list_lock(&triplet_pending_list);
+ while (1) {
+ if (triplet_pending_list.head) {
+ struct timespec ts;
+ clock_gettime(CLOCK_REALTIME, &ts);
+ triplet_ttl(triplet_pending_list.head, &ts);
+ pthread_cond_timedwait(&triplet_pending_list.cond,
+ &triplet_pending_list.mutex, &ts);
+ } else
+ pthread_cond_wait(&triplet_pending_list.cond,
+ &triplet_pending_list.mutex);
+ if (triplet_expired_p(triplet_pending_list.head))
+ remove_triplet_unlocked(triplet_pending_list.head);
+ }
+}
+
+void
+wy_triplet_wait(void)
+{
+ triplet_list_lock(&triplet_running_list);
+ while (triplet_running_list.head) {
+ pthread_cond_wait(&triplet_running_list.cond,
+ &triplet_running_list.mutex);
+ }
+ triplet_list_unlock(&triplet_running_list);
+}
+
+static pthread_key_t key;
+static pthread_once_t key_once = PTHREAD_ONCE_INIT;
+
+static void
+stat_free(void *f)
+{
+ free(f);
+}
+
+static void
+make_key(void)
+{
+ pthread_key_create(&key, stat_free);
+}
+
+WY_STAT_COUNTER *
+wy_get_stat_array(void)
+{
+ WY_STAT_COUNTER *stat;
+ pthread_once(&key_once, make_key);
+ if ((stat = pthread_getspecific(key)) == NULL) {
+ stat = grecs_calloc(WY_MAX_STAT, sizeof(stat[0]));
+ pthread_setspecific(key, stat);
+ }
+ return stat;
+}
+
+WY_STAT_COUNTER *
+wy_get_stat_ptr(int what)
+{
+ WY_STAT_COUNTER *stat;
+ pthread_once(&key_once, make_key);
+ if ((stat = pthread_getspecific(key)) == NULL) {
+ stat = grecs_calloc(WY_MAX_STAT, sizeof(stat[0]));
+ pthread_setspecific(key, stat);
+ }
+ return stat + what;
+}
+
+void *
+wy_thr_triplet(void *ptr)
+{
+ struct wy_triplet *trp = ptr;
+
+ timer_start("wydawca");
+ spool_create_timers();
+ triplet_commit(trp);
+ timer_stop("wydawca");
+ logstats();
+ remove_triplet(trp);
+ return NULL;
+}
/* FIXME */
#define WY_EXP_DFL 0
@@ -844,10 +986,10 @@ expand_triplet_ls_dir(char **ret, struct wy_triplet *trp)
}
DECL_TRIPLET_EXP(real_name, uploader->realname)
- DECL_TRIPLET_EXP(expand_user_email, uploader->email)
+DECL_TRIPLET_EXP(expand_user_email, uploader->email)
- static int
- expand_email_user(char **ret, struct wy_triplet *trp)
+static int
+expand_email_user(char **ret, struct wy_triplet *trp)
{
if (trp && trp->uploader) {
size_t size = 0;
@@ -860,7 +1002,6 @@ DECL_TRIPLET_EXP(real_name, uploader->realname)
return WRDSE_UNDEF;
}
-//FIXME: static env?
static int
expand_report(char **ret, struct wy_triplet *trp)
{

Return to:

Send suggestions and report system problems to the System administrator.