summaryrefslogtreecommitdiffabout
path: root/src
authorSergey Poznyakoff <gray@gnu.org.ua>2012-12-24 06:50:18 (GMT)
committer Sergey Poznyakoff <gray@gnu.org.ua>2012-12-24 06:50:18 (GMT)
commit633512c033c129a80a11a86ec85037fb2d2babf5 (patch) (side-by-side diff)
tree7f1c0810110518f530237636dda7d9f2f39ba818 /src
parent0077bc419d34a0341f978fc0e2d5cba8d35d4b28 (diff)
downloadwydawca-633512c033c129a80a11a86ec85037fb2d2babf5.tar.gz
wydawca-633512c033c129a80a11a86ec85037fb2d2babf5.tar.bz2
Improve triplet queue handling.
Remove the expired and invalid triplets in the master process. * src/job.c (job_queue_runner): Take estimated minimal interval as an argument. Set timer even if no wake-up is scheduled. * src/net.c: Call triplet_sweep to sweep off expired triplets and estimate minimal timeout for job_queue_runner. * src/triplet.c: Link all registered triplets into a doubly- linked list ordered by the triplet age, in descending order. Use that list to estimate minimal timeout. (triplet_list): New variable. (triplet_timestamp, triplet_ttl) (triplet_list_unlink,triplet_list_insert_before) (triplet_list_ordered_insert): New static functions. (register_file): Insert triplet into the list. (triplet_expired_p): Take one argument. Obtain the ttl from the associated spool. Fix condition. (remove_triplet): Remove triplet from the list and symtab. (triplet_sweep): New function. (triplet_counter): Remove bad and expired triplets. (triplet_remove_file): Call remove_triplet. * src/wydawca.h (file_triplet) <next,prev>: New members. (triplet_sweep): New proto. (job_queue_runner): Change signature.
Diffstat (limited to 'src') (more/less context) (ignore whitespace changes)
-rw-r--r--src/job.c117
-rw-r--r--src/net.c3
-rw-r--r--src/triplet.c138
-rw-r--r--src/watcher.c1
-rw-r--r--src/wydawca.h5
5 files changed, 194 insertions, 70 deletions
diff --git a/src/job.c b/src/job.c
index 315303f..ea1085e 100644
--- a/src/job.c
+++ b/src/job.c
@@ -291,84 +291,87 @@ print_status (struct job *job, int expect_term)
}
void
-job_queue_runner ()
+job_queue_runner (time_t min_interval)
{
int status;
struct job *job;
time_t now = time (NULL);
- time_t min_interval = 0;
- if (!wakeup)
- return;
- wakeup = 0;
-
- for (;;)
+ if (wakeup)
{
- pid_t pid = waitpid ((pid_t)-1, &status, WNOHANG);
- if (pid <= 0)
- break;
- for (job = queue; job; job = job->next)
+ wakeup = 0;
+
+ for (;;)
{
- if ((job->state & STATE_ACTIVE) && job->pid == pid)
+ pid_t pid = waitpid ((pid_t)-1, &status, WNOHANG);
+ if (pid <= 0)
+ break;
+ for (job = queue; job; job = job->next)
{
- job->state &= ~STATE_ACTIVE;
- job->state |= STATE_FINISHED;
- job->exit_status = status;
- jobcnt--;
+ if ((job->state & STATE_ACTIVE) && job->pid == pid)
+ {
+ job->state &= ~STATE_ACTIVE;
+ job->state |= STATE_FINISHED;
+ job->exit_status = status;
+ jobcnt--;
+ }
}
}
- }
-
- for (job = queue; job;)
- {
- struct job *next = job->next;
- if (job->state & STATE_FINISHED)
+
+ for (job = queue; job;)
{
- print_status (job, 0);
- if ((job->state &= ~STATE_FINISHED) == 0)
+ struct job *next = job->next;
+ if (job->state & STATE_FINISHED)
{
- if (WIFEXITED (job->exit_status)
- && WEXITSTATUS (job->exit_status) == WYDAWCA_EX_AGAIN)
+ print_status (job, 0);
+ if ((job->state &= ~STATE_FINISHED) == 0)
{
- time_t interval = lock_timeout;
- if (interval == 0)
- interval = lock_expire_time;
- /* Re-queue the job */
- job->state = STATE_QUEUED;
- job->timestamp = now + interval;
+ if (WIFEXITED (job->exit_status)
+ && WEXITSTATUS (job->exit_status) == WYDAWCA_EX_AGAIN)
+ {
+ time_t interval = lock_timeout;
+ if (interval == 0)
+ interval = lock_expire_time;
+ /* Re-queue the job */
+ job->state = STATE_QUEUED;
+ job->timestamp = now + interval;
+ }
+ else
+ {
+ job_remove (job);
+ free (job);
+ job = next;
+ continue;
+ }
+ }
+ }
+
+ if (job->state == STATE_QUEUED)
+ {
+ if (job->timestamp >= now)
+ {
+ if (job_start (job))
+ pause (); /* FIXME */
+ now = time (NULL);
}
else
{
- job_remove (job);
- free (job);
- job = next;
- continue;
+ time_t interval = job->timestamp - now;
+ if (min_interval == 0 || interval < min_interval)
+ min_interval = interval;
}
}
+
+ job = next;
}
-
- if (job->state == STATE_QUEUED)
- {
- if (job->timestamp >= now)
- {
- if (job_start (job))
- pause (); /* FIXME */
- now = time (NULL);
- }
- else
- {
- time_t interval = job->timestamp - now;
- if (min_interval == 0 || interval < min_interval)
- min_interval = interval;
- }
- }
-
- job = next;
}
- if (debug_level > 1)
- logmsg (LOG_DEBUG, _("computed interval: %lu"), min_interval);
- set_timer (min_interval);
+ if (min_interval)
+ {
+ if (debug_level > 1)
+ logmsg (LOG_DEBUG, _("computed interval: %lu"), min_interval);
+ set_timer (min_interval);
+ }
}
void
diff --git a/src/net.c b/src/net.c
index 3157fe8..9874da0 100644
--- a/src/net.c
+++ b/src/net.c
@@ -198,7 +198,8 @@ wydawca_listener ()
fd_set rset;
struct timeval to, *pto;
- job_queue_runner ();
+ job_queue_runner (triplet_sweep ());
+
FD_ZERO (&rset);
if (ctlfd != -1)
FD_SET (ctlfd, &rset);
diff --git a/src/triplet.c b/src/triplet.c
index aa74de0..14182be 100644
--- a/src/triplet.c
+++ b/src/triplet.c
@@ -17,8 +17,106 @@
#include "wydawca.h"
#include <time.h>
+/* Triplets are stored in a symtab: */
static struct grecs_symtab *triplet_table;
+/* ... and are organized into a doubly-linked list, using the prev and
+ next members of struct file_triplet. The list is ordered so that
+ prev points to a triplet older than this one, and next points to a
+ newer triplet. The triplet_list variable points to the root of the
+ list, i.e. the oldest triplet available. */
+static struct file_triplet *triplet_list;
+
+/* Functions for building the ordered doubly-linked list of triplets */
+static time_t
+triplet_timestamp (struct file_triplet *tp)
+{
+ int i;
+ time_t t = 0;
+
+ if (!tp)
+ return 0;
+
+ for (i = 0; i < FILE_TYPE_COUNT; i++)
+ {
+ if (tp->file[i].name &&
+ (t == 0 || t > tp->file[i].sb.st_mtime))
+ t = tp->file[i].sb.st_mtime;
+ }
+ return t;
+}
+
+static time_t
+triplet_ttl (struct file_triplet *tp)
+{
+ time_t t;
+ if (!tp)
+ return 0;
+ t = time (NULL) - triplet_timestamp (tp);
+ if (t < tp->spool->file_sweep_time)
+ return tp->spool->file_sweep_time - t;
+ return 0;
+}
+
+void
+triplet_list_unlink (struct file_triplet *tp)
+{
+ if (tp->prev)
+ tp->prev->next = tp->next;
+ else
+ triplet_list = tp->next;
+
+ if (tp->next)
+ tp->next->prev = tp->prev;
+
+ tp->next = tp->prev = NULL;
+}
+
+static void
+triplet_list_insert_before (struct file_triplet *newp,
+ struct file_triplet *anchor)
+{
+ if (!anchor)
+ {
+ triplet_list = newp;
+ return;
+ }
+
+ if (anchor->prev)
+ anchor->prev->next = newp;
+ else
+ triplet_list = newp;
+ newp->prev = anchor->prev;
+
+ anchor->prev = newp;
+ newp->next = anchor;
+}
+
+void
+triplet_list_ordered_insert (struct file_triplet *tp)
+{
+ time_t t = triplet_timestamp (tp);
+ struct file_triplet *p, *prev = NULL;
+
+ for (p = triplet_list; p && triplet_timestamp (p) < t; prev = p, p = p->next)
+ ;
+ if (p)
+ triplet_list_insert_before (tp, p);
+ else if (prev)
+ {
+ prev->next = tp;
+ tp->prev = prev;
+ tp->next = NULL;
+ }
+ else
+ {
+ tp->next = tp->prev = NULL;
+ triplet_list = tp;
+ }
+}
+
+
+/* Functions for operation on a symtab of triplets. */
static unsigned
hash_triplet_hasher (void *data, unsigned long n_buckets)
{
@@ -110,6 +208,8 @@ register_file (struct file_info *finfo, const struct spool *spool)
ret->acc = txtacc_create ();
}
ret->file[finfo->type] = *finfo;
+ triplet_list_unlink (ret);
+ triplet_list_ordered_insert (ret);
}
struct file_triplet *
@@ -138,18 +238,19 @@ triplet_lookup (struct spool *spool, const char *name)
/* Return true if any part of the triplet TRP was modified more than
TTL seconds ago */
static int
-triplet_expired_p (struct file_triplet *trp, time_t ttl)
+triplet_expired_p (struct file_triplet *trp)
{
int i;
time_t now = time (NULL);
-
+ time_t ttl = trp->spool->file_sweep_time;
+
if (ttl == 0)
return 0;
for (i = 0; i < FILE_TYPE_COUNT; i++)
{
if (trp->file[i].name
- && (now - trp->file[i].sb.st_mtime) > ttl)
+ && (now - trp->file[i].sb.st_mtime) >= ttl)
{
if (debug_level)
logmsg (LOG_DEBUG, _("file %s expired"), trp->file[i].name);
@@ -220,6 +321,19 @@ remove_triplet (struct file_triplet *trp)
trp->file[i].name, strerror (errno));
}
}
+ triplet_list_unlink (trp);
+ grecs_symtab_remove (triplet_table, trp);
+}
+
+time_t
+triplet_sweep (void)
+{
+ struct file_triplet const *tp;
+
+ while (triplet_list && triplet_expired_p (triplet_list))
+ remove_triplet (triplet_list);
+
+ return triplet_ttl (triplet_list);
}
/* Process a single triplet from the table */
@@ -258,7 +372,7 @@ triplet_processor (void *data, void *proc_data)
return 0;
}
- if (triplet_expired_p (trp, trp->spool->file_sweep_time))
+ if (triplet_expired_p (trp))
{
UPDATE_STATS (STAT_EXPIRED_TRIPLETS);
remove_triplet (trp);
@@ -306,15 +420,17 @@ triplet_counter (void *data, void *proc_data)
{
case triplet_directive:
case triplet_complete:
- case triplet_bad:
- ++*cp;
+ if (triplet_expired_p (trp))
+ remove_triplet (trp);
+ else
+ ++*cp;
+ break;
case triplet_incomplete:
- return 0;
+ break;
+ case triplet_bad:
+ remove_triplet (trp);
}
- if (triplet_expired_p (trp, trp->spool->file_sweep_time))
- ++*cp;//FIXME
-
return 0;
}
@@ -351,7 +467,7 @@ triplet_remove_file (struct spool *spool, const char *name)
if (debug_level > 0)
logmsg (LOG_DEBUG, "deleting empty triplet (%s/%s)",
spool->source_dir, name);
- grecs_symtab_remove (triplet_table, tp);
+ remove_triplet (tp);
}
}
diff --git a/src/watcher.c b/src/watcher.c
index c52e3e6..1c67294 100644
--- a/src/watcher.c
+++ b/src/watcher.c
@@ -200,6 +200,7 @@ process_event (struct inotify_event *ep)
else
logmsg (LOG_NOTICE, "%s/%s: unexpected event %x",
dwp->spool->source_dir, ep->name, ep->mask);
+
}
static char buffer[4096];
diff --git a/src/wydawca.h b/src/wydawca.h
index 323d403..4021a56 100644
--- a/src/wydawca.h
+++ b/src/wydawca.h
@@ -185,6 +185,7 @@ struct file_triplet
char *tmp; /* Temporary storage */
size_t tmpsize; /* Size of memory allocated in tmp */
struct txtacc *acc; /* Text accumulator for string allocation */
+ struct file_triplet *prev, *next;
/* User data */
size_t uploader_count;
struct uploader_info *uploader_list;
@@ -442,6 +443,8 @@ char *triplet_expand_dictionary_query (struct dictionary *dict, void *handle,
void triplet_remove_file (struct spool *spool, const char *name);
+time_t triplet_sweep (void);
+
/* General-purpose dictionary support */
struct dictionary *dictionary_new (enum dictionary_id id,
enum dictionary_type type);
@@ -558,7 +561,7 @@ extern char *report_string;
/* job.c */
void schedule_job (struct spool *spool, uid_t uid);
void job_init (void);
-void job_queue_runner (void);
+void job_queue_runner (time_t min_timeout);
/* profile.c */

Return to:

Send suggestions and report system problems to the System administrator.