From 633512c033c129a80a11a86ec85037fb2d2babf5 Mon Sep 17 00:00:00 2001 From: Sergey Poznyakoff Date: Mon, 24 Dec 2012 08:50:18 +0200 Subject: Improve triplet queue handling. Remove the expired and invalid triplets in the master process. * src/job.c (job_queue_runner): Take estimated minimal interval as an argument. Set timer even if no wake-up is scheduled. * src/net.c: Call triplet_sweep to sweep off expired triplets and estimate minimal timeout for job_queue_runner. * src/triplet.c: Link all registered triplets into a doubly- linked list ordered by the triplet age, in descending order. Use that list to estimate minimal timeout. (triplet_list): New variable. (triplet_timestamp, triplet_ttl) (triplet_list_unlink,triplet_list_insert_before) (triplet_list_ordered_insert): New static functions. (register_file): Insert triplet into the list. (triplet_expired_p): Take one argument. Obtain the ttl from the associated spool. Fix condition. (remove_triplet): Remove triplet from the list and symtab. (triplet_sweep): New function. (triplet_counter): Remove bad and expired triplets. (triplet_remove_file): Call remove_triplet. * src/wydawca.h (file_triplet) : New members. (triplet_sweep): New proto. (job_queue_runner): Change signature. --- src/job.c | 117 +++++++++++++++++++++++++------------------------ src/net.c | 3 +- src/triplet.c | 138 +++++++++++++++++++++++++++++++++++++++++++++++++++++----- src/watcher.c | 1 + src/wydawca.h | 5 ++- 5 files changed, 194 insertions(+), 70 deletions(-) (limited to 'src') diff --git a/src/job.c b/src/job.c index 315303f..ea1085e 100644 --- a/src/job.c +++ b/src/job.c @@ -291,84 +291,87 @@ print_status (struct job *job, int expect_term) } void -job_queue_runner () +job_queue_runner (time_t min_interval) { int status; struct job *job; time_t now = time (NULL); - time_t min_interval = 0; - if (!wakeup) - return; - wakeup = 0; - - for (;;) + if (wakeup) { - pid_t pid = waitpid ((pid_t)-1, &status, WNOHANG); - if (pid <= 0) - break; - for (job = queue; job; job = job->next) + wakeup = 0; + + for (;;) { - if ((job->state & STATE_ACTIVE) && job->pid == pid) + pid_t pid = waitpid ((pid_t)-1, &status, WNOHANG); + if (pid <= 0) + break; + for (job = queue; job; job = job->next) { - job->state &= ~STATE_ACTIVE; - job->state |= STATE_FINISHED; - job->exit_status = status; - jobcnt--; + if ((job->state & STATE_ACTIVE) && job->pid == pid) + { + job->state &= ~STATE_ACTIVE; + job->state |= STATE_FINISHED; + job->exit_status = status; + jobcnt--; + } } } - } - - for (job = queue; job;) - { - struct job *next = job->next; - if (job->state & STATE_FINISHED) + + for (job = queue; job;) { - print_status (job, 0); - if ((job->state &= ~STATE_FINISHED) == 0) + struct job *next = job->next; + if (job->state & STATE_FINISHED) { - if (WIFEXITED (job->exit_status) - && WEXITSTATUS (job->exit_status) == WYDAWCA_EX_AGAIN) + print_status (job, 0); + if ((job->state &= ~STATE_FINISHED) == 0) { - time_t interval = lock_timeout; - if (interval == 0) - interval = lock_expire_time; - /* Re-queue the job */ - job->state = STATE_QUEUED; - job->timestamp = now + interval; + if (WIFEXITED (job->exit_status) + && WEXITSTATUS (job->exit_status) == WYDAWCA_EX_AGAIN) + { + time_t interval = lock_timeout; + if (interval == 0) + interval = lock_expire_time; + /* Re-queue the job */ + job->state = STATE_QUEUED; + job->timestamp = now + interval; + } + else + { + job_remove (job); + free (job); + job = next; + continue; + } + } + } + + if (job->state == STATE_QUEUED) + { + if (job->timestamp >= now) + { + if (job_start (job)) + pause (); /* FIXME */ + now = time (NULL); } else { - job_remove (job); - free (job); - job = next; - continue; + time_t interval = job->timestamp - now; + if (min_interval == 0 || interval < min_interval) + min_interval = interval; } } + + job = next; } - - if (job->state == STATE_QUEUED) - { - if (job->timestamp >= now) - { - if (job_start (job)) - pause (); /* FIXME */ - now = time (NULL); - } - else - { - time_t interval = job->timestamp - now; - if (min_interval == 0 || interval < min_interval) - min_interval = interval; - } - } - - job = next; } - if (debug_level > 1) - logmsg (LOG_DEBUG, _("computed interval: %lu"), min_interval); - set_timer (min_interval); + if (min_interval) + { + if (debug_level > 1) + logmsg (LOG_DEBUG, _("computed interval: %lu"), min_interval); + set_timer (min_interval); + } } void diff --git a/src/net.c b/src/net.c index 3157fe8..9874da0 100644 --- a/src/net.c +++ b/src/net.c @@ -198,7 +198,8 @@ wydawca_listener () fd_set rset; struct timeval to, *pto; - job_queue_runner (); + job_queue_runner (triplet_sweep ()); + FD_ZERO (&rset); if (ctlfd != -1) FD_SET (ctlfd, &rset); diff --git a/src/triplet.c b/src/triplet.c index aa74de0..14182be 100644 --- a/src/triplet.c +++ b/src/triplet.c @@ -17,8 +17,106 @@ #include "wydawca.h" #include +/* Triplets are stored in a symtab: */ static struct grecs_symtab *triplet_table; +/* ... and are organized into a doubly-linked list, using the prev and + next members of struct file_triplet. The list is ordered so that + prev points to a triplet older than this one, and next points to a + newer triplet. The triplet_list variable points to the root of the + list, i.e. the oldest triplet available. */ +static struct file_triplet *triplet_list; + +/* Functions for building the ordered doubly-linked list of triplets */ +static time_t +triplet_timestamp (struct file_triplet *tp) +{ + int i; + time_t t = 0; + + if (!tp) + return 0; + + for (i = 0; i < FILE_TYPE_COUNT; i++) + { + if (tp->file[i].name && + (t == 0 || t > tp->file[i].sb.st_mtime)) + t = tp->file[i].sb.st_mtime; + } + return t; +} + +static time_t +triplet_ttl (struct file_triplet *tp) +{ + time_t t; + if (!tp) + return 0; + t = time (NULL) - triplet_timestamp (tp); + if (t < tp->spool->file_sweep_time) + return tp->spool->file_sweep_time - t; + return 0; +} + +void +triplet_list_unlink (struct file_triplet *tp) +{ + if (tp->prev) + tp->prev->next = tp->next; + else + triplet_list = tp->next; + + if (tp->next) + tp->next->prev = tp->prev; + + tp->next = tp->prev = NULL; +} + +static void +triplet_list_insert_before (struct file_triplet *newp, + struct file_triplet *anchor) +{ + if (!anchor) + { + triplet_list = newp; + return; + } + + if (anchor->prev) + anchor->prev->next = newp; + else + triplet_list = newp; + newp->prev = anchor->prev; + + anchor->prev = newp; + newp->next = anchor; +} + +void +triplet_list_ordered_insert (struct file_triplet *tp) +{ + time_t t = triplet_timestamp (tp); + struct file_triplet *p, *prev = NULL; + + for (p = triplet_list; p && triplet_timestamp (p) < t; prev = p, p = p->next) + ; + if (p) + triplet_list_insert_before (tp, p); + else if (prev) + { + prev->next = tp; + tp->prev = prev; + tp->next = NULL; + } + else + { + tp->next = tp->prev = NULL; + triplet_list = tp; + } +} + + +/* Functions for operation on a symtab of triplets. */ static unsigned hash_triplet_hasher (void *data, unsigned long n_buckets) { @@ -110,6 +208,8 @@ register_file (struct file_info *finfo, const struct spool *spool) ret->acc = txtacc_create (); } ret->file[finfo->type] = *finfo; + triplet_list_unlink (ret); + triplet_list_ordered_insert (ret); } struct file_triplet * @@ -138,18 +238,19 @@ triplet_lookup (struct spool *spool, const char *name) /* Return true if any part of the triplet TRP was modified more than TTL seconds ago */ static int -triplet_expired_p (struct file_triplet *trp, time_t ttl) +triplet_expired_p (struct file_triplet *trp) { int i; time_t now = time (NULL); - + time_t ttl = trp->spool->file_sweep_time; + if (ttl == 0) return 0; for (i = 0; i < FILE_TYPE_COUNT; i++) { if (trp->file[i].name - && (now - trp->file[i].sb.st_mtime) > ttl) + && (now - trp->file[i].sb.st_mtime) >= ttl) { if (debug_level) logmsg (LOG_DEBUG, _("file %s expired"), trp->file[i].name); @@ -220,6 +321,19 @@ remove_triplet (struct file_triplet *trp) trp->file[i].name, strerror (errno)); } } + triplet_list_unlink (trp); + grecs_symtab_remove (triplet_table, trp); +} + +time_t +triplet_sweep (void) +{ + struct file_triplet const *tp; + + while (triplet_list && triplet_expired_p (triplet_list)) + remove_triplet (triplet_list); + + return triplet_ttl (triplet_list); } /* Process a single triplet from the table */ @@ -258,7 +372,7 @@ triplet_processor (void *data, void *proc_data) return 0; } - if (triplet_expired_p (trp, trp->spool->file_sweep_time)) + if (triplet_expired_p (trp)) { UPDATE_STATS (STAT_EXPIRED_TRIPLETS); remove_triplet (trp); @@ -306,15 +420,17 @@ triplet_counter (void *data, void *proc_data) { case triplet_directive: case triplet_complete: - case triplet_bad: - ++*cp; + if (triplet_expired_p (trp)) + remove_triplet (trp); + else + ++*cp; + break; case triplet_incomplete: - return 0; + break; + case triplet_bad: + remove_triplet (trp); } - if (triplet_expired_p (trp, trp->spool->file_sweep_time)) - ++*cp;//FIXME - return 0; } @@ -351,7 +467,7 @@ triplet_remove_file (struct spool *spool, const char *name) if (debug_level > 0) logmsg (LOG_DEBUG, "deleting empty triplet (%s/%s)", spool->source_dir, name); - grecs_symtab_remove (triplet_table, tp); + remove_triplet (tp); } } diff --git a/src/watcher.c b/src/watcher.c index c52e3e6..1c67294 100644 --- a/src/watcher.c +++ b/src/watcher.c @@ -200,6 +200,7 @@ process_event (struct inotify_event *ep) else logmsg (LOG_NOTICE, "%s/%s: unexpected event %x", dwp->spool->source_dir, ep->name, ep->mask); + } static char buffer[4096]; diff --git a/src/wydawca.h b/src/wydawca.h index 323d403..4021a56 100644 --- a/src/wydawca.h +++ b/src/wydawca.h @@ -185,6 +185,7 @@ struct file_triplet char *tmp; /* Temporary storage */ size_t tmpsize; /* Size of memory allocated in tmp */ struct txtacc *acc; /* Text accumulator for string allocation */ + struct file_triplet *prev, *next; /* User data */ size_t uploader_count; struct uploader_info *uploader_list; @@ -442,6 +443,8 @@ char *triplet_expand_dictionary_query (struct dictionary *dict, void *handle, void triplet_remove_file (struct spool *spool, const char *name); +time_t triplet_sweep (void); + /* General-purpose dictionary support */ struct dictionary *dictionary_new (enum dictionary_id id, enum dictionary_type type); @@ -558,7 +561,7 @@ extern char *report_string; /* job.c */ void schedule_job (struct spool *spool, uid_t uid); void job_init (void); -void job_queue_runner (void); +void job_queue_runner (time_t min_timeout); /* profile.c */ -- cgit v1.2.1