diff options
Diffstat (limited to 'src/triplet.c')
-rw-r--r-- | src/triplet.c | 393 |
1 files changed, 267 insertions, 126 deletions
diff --git a/src/triplet.c b/src/triplet.c index e41da07..797a09e 100644 --- a/src/triplet.c +++ b/src/triplet.c @@ -19,14 +19,106 @@ /* Triplets are stored in a symtab: */ static struct grecs_symtab *triplet_table; +static pthread_mutex_t triplet_table_mutex = PTHREAD_MUTEX_INITIALIZER; + /* ... and are organized into a doubly-linked list, using the prev and - next members of struct wy_triplet. The list is ordered so that + next members of struct wy_triplet: */ +struct triplet_list { + struct wy_triplet *head, *tail; + pthread_mutex_t mutex; + pthread_cond_t cond; +}; + +#define TRIPLET_LIST_INITIALIZER \ + { NULL, NULL, PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER } + +/* Two such lists are maintained. The pending list is ordered so that prev points to a triplet older than this one, and next points to a - newer triplet. The triplet_list variable points to the root of the - list, i.e. the oldest triplet available. */ -static struct wy_triplet *triplet_list; + newer triplet. Its head member points to the oldest triplet available. */ +static struct triplet_list triplet_pending_list = TRIPLET_LIST_INITIALIZER; + +/* The running list contains triplets which are processed by running + threads */ +static struct triplet_list triplet_running_list = TRIPLET_LIST_INITIALIZER; /* Functions for building the ordered doubly-linked list of triplets */ + +static inline void +triplet_list_lock(struct triplet_list *list) +{ + pthread_mutex_lock(&list->mutex); +} + +static inline void +triplet_list_unlock(struct triplet_list *list) +{ + pthread_mutex_unlock(&list->mutex); +} + +void +triplet_list_unlink(struct triplet_list *list, struct wy_triplet *tp) +{ + int head_changed = 0; + + if (tp->prev) + tp->prev->next = tp->next; + else if (tp == list->head) { + list->head = tp->next; + head_changed = 1; + } + + if (tp->next) + tp->next->prev = tp->prev; + else + list->tail = tp->prev; + + tp->next = tp->prev = NULL; + if (head_changed) + pthread_cond_broadcast(&list->cond); + tp->list = NULL; +} + +static void +triplet_list_insert(struct triplet_list *list, + struct wy_triplet *newp, struct wy_triplet *anchor, + int after) +{ + int head_changed = 0; + + if (!anchor) { + newp->prev = NULL; + newp->next = list->head; + if (list->head) + list->head->prev = newp; + else + list->tail = newp; + list->head = newp; + head_changed = 1; + } else if (after) { + if (anchor->next) + anchor->next->prev = newp; + else + list->tail = newp; + newp->prev = anchor; + newp->next = anchor->next; + anchor->next = newp; + } else { + if (anchor->prev) + anchor->prev->next = newp; + else { + list->head = newp; + head_changed = 1; + } + newp->prev = anchor->prev; + newp->next = anchor; + anchor->prev = newp; + } + newp->list = list; + if (head_changed) + pthread_cond_broadcast(&list->cond); +} + + static time_t triplet_timestamp(struct wy_triplet *tp) { @@ -43,69 +135,35 @@ triplet_timestamp(struct wy_triplet *tp) return t; } -static time_t -triplet_ttl(struct wy_triplet *tp) +static struct timespec * +triplet_ttl(struct wy_triplet *tp, struct timespec *ts) { time_t t; if (!tp) - return 0; + return NULL; t = time(NULL) - triplet_timestamp(tp); if (t < tp->spool->file_sweep_time) - return tp->spool->file_sweep_time - t; - return 0; -} - -void -triplet_list_unlink(struct wy_triplet *tp) -{ - if (tp->prev) - tp->prev->next = tp->next; - else if (tp == triplet_list) - triplet_list = tp->next; - - if (tp->next) - tp->next->prev = tp->prev; - - tp->next = tp->prev = NULL; -} - -static void -triplet_list_insert_before(struct wy_triplet *newp, struct wy_triplet *anchor) -{ - if (!anchor) { - triplet_list = newp; - return; - } - - if (anchor->prev) - anchor->prev->next = newp; + ts->tv_sec += tp->spool->file_sweep_time - t; else - triplet_list = newp; - newp->prev = anchor->prev; - - anchor->prev = newp; - newp->next = anchor; + ts->tv_sec = 0; + ts->tv_nsec = 0; + return ts; } + void -triplet_list_ordered_insert(struct wy_triplet *tp) +triplet_list_ordered_insert(struct triplet_list *list, struct wy_triplet *tp) { time_t t = triplet_timestamp(tp); struct wy_triplet *p, *prev = NULL; - for (p = triplet_list; p && triplet_timestamp(p) < t; + for (p = list->head; p && triplet_timestamp(p) < t; prev = p, p = p->next); if (p) - triplet_list_insert_before(tp, p); - else if (prev) { - prev->next = tp; - tp->prev = prev; - tp->next = NULL; - } else { - tp->next = tp->prev = NULL; - triplet_list = tp; - } + triplet_list_insert(list, tp, p, 0); + else + triplet_list_insert(list, tp, prev ? prev->next : NULL, 1); } static struct wy_user * @@ -190,6 +248,7 @@ register_file(struct file_info *finfo, struct spool *spool) struct wy_triplet key, *ret; int install = 1; + pthread_mutex_lock(&triplet_table_mutex); if (!triplet_table) { triplet_table = grecs_symtab_create(sizeof(struct wy_triplet), @@ -206,16 +265,21 @@ register_file(struct file_info *finfo, struct spool *spool) key.spool = spool; ret = grecs_symtab_lookup_or_install(triplet_table, &key, &install); + pthread_mutex_unlock(&triplet_table_mutex); + if (!ret) grecs_alloc_die(); free(key.name); ret->file[finfo->type] = *finfo; + + triplet_list_lock(&triplet_pending_list); if (install) { ret->spool = spool; ret->acc = grecs_txtacc_create(); - } else - triplet_list_unlink(ret); - triplet_list_ordered_insert(ret); + } else if (ret->list) + triplet_list_unlink(ret->list, ret); + triplet_list_ordered_insert(&triplet_pending_list, ret); + triplet_list_unlock(&triplet_pending_list); return ret; } @@ -236,33 +300,14 @@ triplet_lookup(struct spool *spool, const char *name) key.spool = spool; file_info_cleanup(&finfo); + pthread_mutex_lock(&triplet_table_mutex); ret = grecs_symtab_lookup_or_install(triplet_table, &key, NULL); + pthread_mutex_unlock(&triplet_table_mutex); free(key.name); return ret; } -/* Return true if any part of the triplet TRP was modified more than - TTL seconds ago */ -static int -triplet_expired_p(struct wy_triplet *trp) -{ - int i; - time_t now = time(NULL); - time_t ttl = trp->spool->file_sweep_time; - - if (ttl == 0) - return 0; - - for (i = 0; i < FILE_TYPE_COUNT; i++) { - if (trp->file[i].name && (now - trp->file[i].sb.st_mtime) >= ttl) { - wy_debug(1, (_("file %s expired"), trp->file[i].name)); - return 1; - } - } - return 0; -} - enum triplet_state { triplet_directive, /* Short triplet: only a directive is present, but nothing more is required */ @@ -306,6 +351,8 @@ check_triplet_state(struct wy_triplet *trp) void triplet_enqueue(struct wy_triplet *trp) { + pthread_t tid; + if (!trp) return; if (spool_open_dictionaries(trp->spool)) @@ -313,76 +360,62 @@ triplet_enqueue(struct wy_triplet *trp) switch (check_triplet_state(trp)) { case triplet_directive: case triplet_complete: - if (triplet_expired_p(trp)) - remove_triplet(trp, 0); - else - job_schedule_triplet(trp); + triplet_list_lock(&triplet_pending_list); + triplet_list_unlink(&triplet_pending_list, trp); + triplet_list_unlock(&triplet_pending_list); + + triplet_list_lock(&triplet_running_list); + triplet_list_insert(&triplet_running_list, trp, + triplet_running_list.tail, 1); + triplet_list_unlock(&triplet_running_list); + + pthread_create(&tid, NULL, wy_thr_triplet, trp); break; case triplet_incomplete: break; case triplet_bad: - remove_triplet(trp, 0); + remove_triplet(trp); } } /* Unlink all parts of the triplet TRP */ -int -remove_triplet(struct wy_triplet *trp, int check) +static void +remove_triplet_unlocked(struct wy_triplet *trp) { int i; for (i = 0; i < FILE_TYPE_COUNT; i++) { if (trp->file[i].name) { - if (check) { - struct stat st; - - if (wy_dry_run) - continue; - if (fstatat(trp->spool->source_fd, - trp->file[i].name, &st, 0)) { - if (errno == ENOENT) - continue; - else { + if (!wy_dry_run) { + if (unlinkat(trp->spool->source_fd, trp->file[i].name, 0)) { + if (errno != ENOENT) wy_log(LOG_ERR, - _("can't stat %s/%s: %s"), + _("cannot remove %s/%s: %s"), trp->spool->source_dir, trp->file[i].name, strerror(errno)); - return 1; - } - } else { - wy_log(LOG_NOTICE, - _("%s/%s still exists"), + } else + wy_log(LOG_NOTICE, _("removing %s/%s"), trp->spool->source_dir, trp->file[i].name); - return 1; - } - } else { - wy_log(LOG_NOTICE, _("removing %s/%s"), - trp->spool->source_dir, trp->file[i].name); - if (!wy_dry_run - && unlinkat(trp->spool->source_fd, - trp->file[i].name, 0)) - wy_log(LOG_ERR, - _("cannot remove %s/%s: %s"), - trp->spool->source_dir, - trp->file[i].name, strerror(errno)); } } } - triplet_list_unlink(trp); + triplet_list_unlink(trp->list, trp); triplet_gpgme_ctx_release(trp); - grecs_symtab_remove(triplet_table, trp); - return 0; + pthread_mutex_lock(&triplet_table_mutex); + grecs_symtab_remove(triplet_table, trp); + pthread_mutex_unlock(&triplet_table_mutex); } -time_t -triplet_sweep(void) +void +remove_triplet(struct wy_triplet *trp) { - while (triplet_list && triplet_expired_p(triplet_list)) - remove_triplet(triplet_list, 0); + struct triplet_list *list = trp->list; - return triplet_ttl(triplet_list); + triplet_list_lock(list); + remove_triplet_unlocked(trp); + triplet_list_unlock(list); } void @@ -392,8 +425,7 @@ triplet_commit(struct wy_triplet *trp) if (spool_open_dictionaries(trp->spool) == 0) { timer_start(trp->spool->tag); wy_debug(1, (_("processing triplet `%s'"), trp->name)); - if (process_directives(trp)) - remove_triplet(trp, 0); + process_directives(trp); timer_stop(trp->spool->tag); } timer_stop("spool"); @@ -405,7 +437,7 @@ triplet_remove_file(struct spool *spool, const char *name) struct wy_triplet *tp = triplet_lookup(spool, name); int i, n = 0; - if (!tp) + if (!tp || tp->list == &triplet_running_list) return; for (i = 0; i < FILE_TYPE_COUNT; i++) { @@ -420,9 +452,119 @@ triplet_remove_file(struct spool *spool, const char *name) if (!n) { wy_debug(1, ("deleting empty triplet (%s/%s)", spool->source_dir, name)); - remove_triplet(tp, 0); + remove_triplet(tp); } } + +/* Return true if any part of the triplet TRP was modified more than + TTL seconds ago */ +static int +triplet_expired_p(struct wy_triplet *trp) +{ + int i; + time_t now; + time_t ttl; + + if (!trp) + return 0; + + now = time(NULL); + ttl = trp->spool->file_sweep_time; + + if (ttl == 0) + return 0; + + for (i = 0; i < FILE_TYPE_COUNT; i++) { + if (trp->file[i].name && (now - trp->file[i].sb.st_mtime) >= ttl) { + wy_debug(1, (_("file %s expired"), trp->file[i].name)); + return 1; + } + } + return 0; +} + +void * +wy_thr_cleaner(void *ptr) +{ + triplet_list_lock(&triplet_pending_list); + while (1) { + if (triplet_pending_list.head) { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + triplet_ttl(triplet_pending_list.head, &ts); + pthread_cond_timedwait(&triplet_pending_list.cond, + &triplet_pending_list.mutex, &ts); + } else + pthread_cond_wait(&triplet_pending_list.cond, + &triplet_pending_list.mutex); + if (triplet_expired_p(triplet_pending_list.head)) + remove_triplet_unlocked(triplet_pending_list.head); + } +} + +void +wy_triplet_wait(void) +{ + triplet_list_lock(&triplet_running_list); + while (triplet_running_list.head) { + pthread_cond_wait(&triplet_running_list.cond, + &triplet_running_list.mutex); + } + triplet_list_unlock(&triplet_running_list); +} + +static pthread_key_t key; +static pthread_once_t key_once = PTHREAD_ONCE_INIT; + +static void +stat_free(void *f) +{ + free(f); +} + +static void +make_key(void) +{ + pthread_key_create(&key, stat_free); +} + +WY_STAT_COUNTER * +wy_get_stat_array(void) +{ + WY_STAT_COUNTER *stat; + pthread_once(&key_once, make_key); + if ((stat = pthread_getspecific(key)) == NULL) { + stat = grecs_calloc(WY_MAX_STAT, sizeof(stat[0])); + pthread_setspecific(key, stat); + } + return stat; +} + +WY_STAT_COUNTER * +wy_get_stat_ptr(int what) +{ + WY_STAT_COUNTER *stat; + pthread_once(&key_once, make_key); + if ((stat = pthread_getspecific(key)) == NULL) { + stat = grecs_calloc(WY_MAX_STAT, sizeof(stat[0])); + pthread_setspecific(key, stat); + } + return stat + what; +} + +void * +wy_thr_triplet(void *ptr) +{ + struct wy_triplet *trp = ptr; + + timer_start("wydawca"); + spool_create_timers(); + triplet_commit(trp); + timer_stop("wydawca"); + logstats(); + remove_triplet(trp); + return NULL; +} /* FIXME */ #define WY_EXP_DFL 0 @@ -844,10 +986,10 @@ expand_triplet_ls_dir(char **ret, struct wy_triplet *trp) } DECL_TRIPLET_EXP(real_name, uploader->realname) - DECL_TRIPLET_EXP(expand_user_email, uploader->email) +DECL_TRIPLET_EXP(expand_user_email, uploader->email) - static int - expand_email_user(char **ret, struct wy_triplet *trp) +static int +expand_email_user(char **ret, struct wy_triplet *trp) { if (trp && trp->uploader) { size_t size = 0; @@ -860,7 +1002,6 @@ DECL_TRIPLET_EXP(real_name, uploader->realname) return WRDSE_UNDEF; } -//FIXME: static env? static int expand_report(char **ret, struct wy_triplet *trp) { |