diff options
author | Sergey Poznyakoff <gray@gnu.org.ua> | 2012-12-24 08:50:18 +0200 |
---|---|---|
committer | Sergey Poznyakoff <gray@gnu.org.ua> | 2012-12-24 08:50:18 +0200 |
commit | 633512c033c129a80a11a86ec85037fb2d2babf5 (patch) | |
tree | 7f1c0810110518f530237636dda7d9f2f39ba818 /src | |
parent | 0077bc419d34a0341f978fc0e2d5cba8d35d4b28 (diff) | |
download | wydawca-633512c033c129a80a11a86ec85037fb2d2babf5.tar.gz wydawca-633512c033c129a80a11a86ec85037fb2d2babf5.tar.bz2 |
Improve triplet queue handling.
Remove the expired and invalid triplets in the master process.
* src/job.c (job_queue_runner): Take estimated minimal
interval as an argument. Set timer even if no wake-up
is scheduled.
* src/net.c: Call triplet_sweep to sweep off expired
triplets and estimate minimal timeout for job_queue_runner.
* src/triplet.c: Link all registered triplets into a doubly-
linked list ordered by the triplet age, in descending order.
Use that list to estimate minimal timeout.
(triplet_list): New variable.
(triplet_timestamp, triplet_ttl)
(triplet_list_unlink,triplet_list_insert_before)
(triplet_list_ordered_insert): New static functions.
(register_file): Insert triplet into the list.
(triplet_expired_p): Take one argument. Obtain the ttl
from the associated spool. Fix condition.
(remove_triplet): Remove triplet from the list and symtab.
(triplet_sweep): New function.
(triplet_counter): Remove bad and expired triplets.
(triplet_remove_file): Call remove_triplet.
* src/wydawca.h (file_triplet) <next,prev>: New members.
(triplet_sweep): New proto.
(job_queue_runner): Change signature.
Diffstat (limited to 'src')
-rw-r--r-- | src/job.c | 117 | ||||
-rw-r--r-- | src/net.c | 3 | ||||
-rw-r--r-- | src/triplet.c | 138 | ||||
-rw-r--r-- | src/watcher.c | 1 | ||||
-rw-r--r-- | src/wydawca.h | 5 |
5 files changed, 194 insertions, 70 deletions
@@ -293,3 +293,3 @@ print_status (struct job *job, int expect_term) void -job_queue_runner () +job_queue_runner (time_t min_interval) { @@ -298,42 +298,59 @@ job_queue_runner () time_t now = time (NULL); - time_t min_interval = 0; - if (!wakeup) - return; - wakeup = 0; - - for (;;) + if (wakeup) { - pid_t pid = waitpid ((pid_t)-1, &status, WNOHANG); - if (pid <= 0) - break; - for (job = queue; job; job = job->next) + wakeup = 0; + + for (;;) { - if ((job->state & STATE_ACTIVE) && job->pid == pid) + pid_t pid = waitpid ((pid_t)-1, &status, WNOHANG); + if (pid <= 0) + break; + for (job = queue; job; job = job->next) { - job->state &= ~STATE_ACTIVE; - job->state |= STATE_FINISHED; - job->exit_status = status; - jobcnt--; + if ((job->state & STATE_ACTIVE) && job->pid == pid) + { + job->state &= ~STATE_ACTIVE; + job->state |= STATE_FINISHED; + job->exit_status = status; + jobcnt--; + } } } - } - - for (job = queue; job;) - { - struct job *next = job->next; - if (job->state & STATE_FINISHED) + + for (job = queue; job;) { - print_status (job, 0); - if ((job->state &= ~STATE_FINISHED) == 0) + struct job *next = job->next; + if (job->state & STATE_FINISHED) { - if (WIFEXITED (job->exit_status) - && WEXITSTATUS (job->exit_status) == WYDAWCA_EX_AGAIN) + print_status (job, 0); + if ((job->state &= ~STATE_FINISHED) == 0) { - time_t interval = lock_timeout; - if (interval == 0) - interval = lock_expire_time; - /* Re-queue the job */ - job->state = STATE_QUEUED; - job->timestamp = now + interval; + if (WIFEXITED (job->exit_status) + && WEXITSTATUS (job->exit_status) == WYDAWCA_EX_AGAIN) + { + time_t interval = lock_timeout; + if (interval == 0) + interval = lock_expire_time; + /* Re-queue the job */ + job->state = STATE_QUEUED; + job->timestamp = now + interval; + } + else + { + job_remove (job); + free (job); + job = next; + continue; + } + } + } + + if (job->state == STATE_QUEUED) + { + if (job->timestamp >= now) + { + if (job_start (job)) + pause (); /* FIXME */ + now = time (NULL); } @@ -341,32 +358,18 @@ job_queue_runner () { - job_remove (job); - free (job); - job = next; - continue; + time_t interval = job->timestamp - now; + if (min_interval == 0 || interval < min_interval) + min_interval = interval; } } + + job = next; } - - if (job->state == STATE_QUEUED) - { - if (job->timestamp >= now) - { - if (job_start (job)) - pause (); /* FIXME */ - now = time (NULL); - } - else - { - time_t interval = job->timestamp - now; - if (min_interval == 0 || interval < min_interval) - min_interval = interval; - } - } - - job = next; } - if (debug_level > 1) - logmsg (LOG_DEBUG, _("computed interval: %lu"), min_interval); - set_timer (min_interval); + if (min_interval) + { + if (debug_level > 1) + logmsg (LOG_DEBUG, _("computed interval: %lu"), min_interval); + set_timer (min_interval); + } } @@ -200,3 +200,4 @@ wydawca_listener () - job_queue_runner (); + job_queue_runner (triplet_sweep ()); + FD_ZERO (&rset); diff --git a/src/triplet.c b/src/triplet.c index aa74de0..14182be 100644 --- a/src/triplet.c +++ b/src/triplet.c @@ -19,4 +19,102 @@ +/* Triplets are stored in a symtab: */ static struct grecs_symtab *triplet_table; +/* ... and are organized into a doubly-linked list, using the prev and + next members of struct file_triplet. The list is ordered so that + prev points to a triplet older than this one, and next points to a + newer triplet. The triplet_list variable points to the root of the + list, i.e. the oldest triplet available. */ +static struct file_triplet *triplet_list; + +/* Functions for building the ordered doubly-linked list of triplets */ +static time_t +triplet_timestamp (struct file_triplet *tp) +{ + int i; + time_t t = 0; + + if (!tp) + return 0; + + for (i = 0; i < FILE_TYPE_COUNT; i++) + { + if (tp->file[i].name && + (t == 0 || t > tp->file[i].sb.st_mtime)) + t = tp->file[i].sb.st_mtime; + } + return t; +} + +static time_t +triplet_ttl (struct file_triplet *tp) +{ + time_t t; + if (!tp) + return 0; + t = time (NULL) - triplet_timestamp (tp); + if (t < tp->spool->file_sweep_time) + return tp->spool->file_sweep_time - t; + return 0; +} + +void +triplet_list_unlink (struct file_triplet *tp) +{ + if (tp->prev) + tp->prev->next = tp->next; + else + triplet_list = tp->next; + + if (tp->next) + tp->next->prev = tp->prev; + + tp->next = tp->prev = NULL; +} + +static void +triplet_list_insert_before (struct file_triplet *newp, + struct file_triplet *anchor) +{ + if (!anchor) + { + triplet_list = newp; + return; + } + + if (anchor->prev) + anchor->prev->next = newp; + else + triplet_list = newp; + newp->prev = anchor->prev; + + anchor->prev = newp; + newp->next = anchor; +} + +void +triplet_list_ordered_insert (struct file_triplet *tp) +{ + time_t t = triplet_timestamp (tp); + struct file_triplet *p, *prev = NULL; + + for (p = triplet_list; p && triplet_timestamp (p) < t; prev = p, p = p->next) + ; + if (p) + triplet_list_insert_before (tp, p); + else if (prev) + { + prev->next = tp; + tp->prev = prev; + tp->next = NULL; + } + else + { + tp->next = tp->prev = NULL; + triplet_list = tp; + } +} + + +/* Functions for operation on a symtab of triplets. */ static unsigned @@ -112,2 +210,4 @@ register_file (struct file_info *finfo, const struct spool *spool) ret->file[finfo->type] = *finfo; + triplet_list_unlink (ret); + triplet_list_ordered_insert (ret); } @@ -140,3 +240,3 @@ triplet_lookup (struct spool *spool, const char *name) static int -triplet_expired_p (struct file_triplet *trp, time_t ttl) +triplet_expired_p (struct file_triplet *trp) { @@ -144,3 +244,4 @@ triplet_expired_p (struct file_triplet *trp, time_t ttl) time_t now = time (NULL); - + time_t ttl = trp->spool->file_sweep_time; + if (ttl == 0) @@ -151,3 +252,3 @@ triplet_expired_p (struct file_triplet *trp, time_t ttl) if (trp->file[i].name - && (now - trp->file[i].sb.st_mtime) > ttl) + && (now - trp->file[i].sb.st_mtime) >= ttl) { @@ -222,2 +323,15 @@ remove_triplet (struct file_triplet *trp) } + triplet_list_unlink (trp); + grecs_symtab_remove (triplet_table, trp); +} + +time_t +triplet_sweep (void) +{ + struct file_triplet const *tp; + + while (triplet_list && triplet_expired_p (triplet_list)) + remove_triplet (triplet_list); + + return triplet_ttl (triplet_list); } @@ -260,3 +374,3 @@ triplet_processor (void *data, void *proc_data) - if (triplet_expired_p (trp, trp->spool->file_sweep_time)) + if (triplet_expired_p (trp)) { @@ -308,11 +422,13 @@ triplet_counter (void *data, void *proc_data) case triplet_complete: - case triplet_bad: - ++*cp; + if (triplet_expired_p (trp)) + remove_triplet (trp); + else + ++*cp; + break; case triplet_incomplete: - return 0; + break; + case triplet_bad: + remove_triplet (trp); } - if (triplet_expired_p (trp, trp->spool->file_sweep_time)) - ++*cp;//FIXME - return 0; @@ -353,3 +469,3 @@ triplet_remove_file (struct spool *spool, const char *name) spool->source_dir, name); - grecs_symtab_remove (triplet_table, tp); + remove_triplet (tp); } diff --git a/src/watcher.c b/src/watcher.c index c52e3e6..1c67294 100644 --- a/src/watcher.c +++ b/src/watcher.c @@ -202,2 +202,3 @@ process_event (struct inotify_event *ep) dwp->spool->source_dir, ep->name, ep->mask); + } diff --git a/src/wydawca.h b/src/wydawca.h index 323d403..4021a56 100644 --- a/src/wydawca.h +++ b/src/wydawca.h @@ -187,2 +187,3 @@ struct file_triplet struct txtacc *acc; /* Text accumulator for string allocation */ + struct file_triplet *prev, *next; /* User data */ @@ -444,2 +445,4 @@ void triplet_remove_file (struct spool *spool, const char *name); +time_t triplet_sweep (void); + /* General-purpose dictionary support */ @@ -560,3 +563,3 @@ void schedule_job (struct spool *spool, uid_t uid); void job_init (void); -void job_queue_runner (void); +void job_queue_runner (time_t min_timeout); |