aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergey Poznyakoff <gray@gnu.org.ua>2013-01-01 19:48:33 +0200
committerSergey Poznyakoff <gray@gnu.org.ua>2013-01-01 20:53:41 +0200
commit66775034166ac35bda383ef41213c95021b37b64 (patch)
tree5a64f2e45955acc5758977c6de0740ba34707d65
parentfad678ec27c7f37da83ddf3ba3a96196a962c238 (diff)
downloadwydawca-66775034166ac35bda383ef41213c95021b37b64.tar.gz
wydawca-66775034166ac35bda383ef41213c95021b37b64.tar.bz2
Avoid processing same triplet twice.release-2.2
* src/wydawca.h (file_triplet) <jq_prev,jq_next,job>: New members. (spool_commit_triplets,schedule_job): Update signature. (count_processable_triplets): Remove. (link_processable_triplets): New proto. (remove_triplet): New proto. (triplet_jq_unlink): New proto. * src/job.c (job) <triplet_list>: New member. (triplet_jq_unlink,jq_clear): New functions. (procspool): Create and stop spool timers. (wydawca_scanner): Call spool_create_timers. (schedule_job): Additional argument specifies triplets to process. Initialize job->triplet_list; (job_queue_runner): Call jq_clear. * src/net.c (handle_connection): Update call to schedule_job. * src/process.c (spool_cwd_add_new_file): Update call to spool_commit_triplets. * src/triplet.c (remove_triplet): Now extern. Takes two args. All uses updated. (spool_commit_triplets): Additional argument specifies triplets to process. (count_processable_triplets): Remove. (link_processable_triplets): New function. * src/watcher.c: Use link_processable_triplets to obtain a doubly-linked list of triplets to process. Pass it to schedule_job.
-rw-r--r--src/job.c57
-rw-r--r--src/net.c2
-rw-r--r--src/process.c6
-rw-r--r--src/triplet.c85
-rw-r--r--src/watcher.c7
-rw-r--r--src/wydawca.h24
6 files changed, 137 insertions, 44 deletions
diff --git a/src/job.c b/src/job.c
index 7a9316a..b49efe3 100644
--- a/src/job.c
+++ b/src/job.c
@@ -23,2 +23,4 @@
+#define STATE_ANY (STATE_FINISHED|STATE_QUEUED|STATE_ACTIVE)
+
struct job {
@@ -27,2 +29,3 @@ struct job {
struct spool *spool;
+ struct file_triplet *triplet_list;
uid_t uid;
@@ -56,4 +59,35 @@ set_timer(time_t interval)
+void
+triplet_jq_unlink(struct file_triplet *tp)
+{
+ if (!tp || !tp->job)
+ return;
+ if (tp->prev)
+ tp->jq_prev->jq_next = tp->jq_next;
+ else
+ tp->job->triplet_list = tp->jq_next;
+
+ if (tp->jq_next)
+ tp->jq_next->jq_prev = tp->jq_prev;
+
+ tp->jq_next = tp->jq_prev = NULL;
+}
+
+void
+jq_clear(struct job *job)
+{
+ struct file_triplet *tp;
+
+ if (!job->triplet_list)
+ return;
+ for (tp = job->triplet_list; tp; ) {
+ struct file_triplet *next = tp->jq_next;
+ remove_triplet(tp, 1);
+ tp = next;
+ }
+ job->triplet_list = NULL;
+}
+
struct job *
-job_locate(const struct spool *spool, uid_t uid)
+job_locate(const struct spool *spool, uid_t uid, int mask)
{
@@ -61,3 +95,3 @@ job_locate(const struct spool *spool, uid_t uid)
for (p = queue; p; p = p->next)
- if (p->spool == spool && p->uid == uid)
+ if ((p->state & mask) && p->spool == spool && p->uid == uid)
break;
@@ -80,3 +114,7 @@ procspool(struct spool *spool, void *data)
{
- spool_commit_triplets(spool);
+ timer_start("spool");
+ timer_start(spool->tag);
+ spool_commit_triplets(spool, (struct file_triplet *)data);
+ timer_stop(spool->tag);
+ timer_stop("spool");
return 0;
@@ -90,4 +128,5 @@ wydawca_scanner(struct job *job)
timer_start("wydawca");
+ spool_create_timers();
if (job->spool == &inotify_spool)
- rc = for_each_spool(procspool, NULL);
+ rc = for_each_spool(procspool, job->triplet_list);
else if (job->spool == &fake_spool)
@@ -95,3 +134,2 @@ wydawca_scanner(struct job *job)
else {
- spool_create_timers();
rc = scan_spool(job->spool, 1, &job->uid);
@@ -192,3 +230,3 @@ job_insert(struct job *job, struct job *elt)
void
-schedule_job(struct spool *spool, uid_t uid)
+schedule_job(struct spool *spool, uid_t uid, struct file_triplet *tp)
{
@@ -203,3 +241,3 @@ schedule_job(struct spool *spool, uid_t uid)
- job = job_locate(spool, uid);
+ job = job_locate(spool, uid, tp ? STATE_QUEUED : STATE_ANY);
if (!job) {
@@ -209,2 +247,5 @@ schedule_job(struct spool *spool, uid_t uid)
job->pid = -1;
+ job->triplet_list = tp;
+ for (; tp; tp = tp->jq_next)
+ tp->job = job;
time(&job->timestamp);
@@ -293,2 +334,3 @@ job_queue_runner(time_t min_interval)
job->state |= STATE_FINISHED;
+ jq_clear(job);
job->exit_status = status;
@@ -354 +396,2 @@ job_init()
}
+
diff --git a/src/net.c b/src/net.c
index c63bce8..c3ba881 100644
--- a/src/net.c
+++ b/src/net.c
@@ -134,3 +134,3 @@ handle_connection(FILE * in, FILE * out)
if (pw)
- schedule_job(spool, pw->pw_uid);
+ schedule_job(spool, pw->pw_uid, NULL);
else
diff --git a/src/process.c b/src/process.c
index 1226ed2..0b4fd1b 100644
--- a/src/process.c
+++ b/src/process.c
@@ -151,3 +151,3 @@ int
spool_cwd_add_new_file(const struct spool *spool, const char *name,
- int uc, uid_t * uv)
+ int uc, uid_t *uv)
{
@@ -205,3 +205,3 @@ spool_add_new_file(const struct spool *spool, const char *name,
void
-scan_spool_unlocked(struct spool *spool, int uc, uid_t * uv)
+scan_spool_unlocked(struct spool *spool, int uc, uid_t *uv)
{
@@ -240,3 +240,3 @@ scan_spool_unlocked(struct spool *spool, int uc, uid_t * uv)
if (count_collected_triplets() > 0)
- spool_commit_triplets(spool);
+ spool_commit_triplets(spool, NULL);
diff --git a/src/triplet.c b/src/triplet.c
index c4dc576..2df9151 100644
--- a/src/triplet.c
+++ b/src/triplet.c
@@ -296,4 +296,4 @@ check_triplet_state(struct file_triplet *trp, int noauth)
/* Unlink all parts of the triplet TRP */
-static void
-remove_triplet(struct file_triplet *trp)
+int
+remove_triplet(struct file_triplet *trp, int check)
{
@@ -303,7 +303,18 @@ remove_triplet(struct file_triplet *trp)
if (trp->file[i].name) {
- logmsg(LOG_NOTICE, _("removing %s"),
- trp->file[i].name);
- if (!dry_run_mode && unlink(trp->file[i].name))
- logmsg(LOG_ERR, _("cannot remove %s: %s"),
- trp->file[i].name, strerror(errno));
+ if (check) {
+ if (access(trp->file[i].name, F_OK) == 0) {
+ logmsg(LOG_NOTICE,
+ _("%s still exists"),
+ trp->file[i].name);
+ return 1;
+ }
+ } else {
+ logmsg(LOG_NOTICE, _("removing %s"),
+ trp->file[i].name);
+ if (!dry_run_mode && unlink(trp->file[i].name))
+ logmsg(LOG_ERR,
+ _("cannot remove %s: %s"),
+ trp->file[i].name,
+ strerror(errno));
+ }
}
@@ -311,3 +322,5 @@ remove_triplet(struct file_triplet *trp)
triplet_list_unlink(trp);
+ triplet_jq_unlink(trp);
grecs_symtab_remove(triplet_table, trp);
+ return 0;
}
@@ -318,3 +331,3 @@ triplet_sweep(void)
while (triplet_list && triplet_expired_p(triplet_list))
- remove_triplet(triplet_list);
+ remove_triplet(triplet_list, 0);
@@ -343,3 +356,3 @@ triplet_processor(void *data, void *proc_data)
if (process_directives(trp))
- remove_triplet(trp);
+ remove_triplet(trp, 0);
return 0;
@@ -356,3 +369,3 @@ triplet_processor(void *data, void *proc_data)
UPDATE_STATS(STAT_BAD_TRIPLETS);
- remove_triplet(trp);
+ remove_triplet(trp, 0);
return 0;
@@ -362,3 +375,3 @@ triplet_processor(void *data, void *proc_data)
UPDATE_STATS(STAT_EXPIRED_TRIPLETS);
- remove_triplet(trp);
+ remove_triplet(trp, 0);
}
@@ -370,3 +383,3 @@ triplet_processor(void *data, void *proc_data)
void
-spool_commit_triplets(struct spool *spool)
+spool_commit_triplets(struct spool *spool, struct file_triplet *tplist)
{
@@ -377,3 +390,10 @@ spool_commit_triplets(struct spool *spool)
return;
- if (triplet_table) {
+ if (tplist) {
+ while (tplist) {
+ struct file_triplet *next = tplist->jq_next;
+ if (tplist->spool == spool)
+ triplet_processor(tplist, NULL);
+ tplist = next;
+ }
+ } else if (triplet_table) {
grecs_symtab_enumerate(triplet_table, triplet_processor, NULL);
@@ -389,8 +409,14 @@ count_collected_triplets()
+struct count_closure {
+ struct file_triplet *tp;
+};
+
static int
-triplet_counter(void *data, void *proc_data)
+triplet_linker(void *data, void *proc_data)
{
struct file_triplet *trp = data;
- size_t *cp = proc_data;
+ struct count_closure *cp = proc_data;
+ if (trp->job)
+ return 0;
if (debug_level)
@@ -401,3 +427,3 @@ triplet_counter(void *data, void *proc_data)
SP(trp->file[file_directive].name));
-
+
switch (check_triplet_state(trp, 1)) {
@@ -406,5 +432,10 @@ triplet_counter(void *data, void *proc_data)
if (triplet_expired_p(trp))
- remove_triplet(trp);
- else
- ++*cp;
+ remove_triplet(trp, 0);
+ else {
+ trp->jq_prev = NULL;
+ if (cp->tp)
+ cp->tp->jq_prev = trp;
+ trp->jq_next = cp->tp;
+ cp->tp = trp;
+ }
break;
@@ -413,3 +444,3 @@ triplet_counter(void *data, void *proc_data)
case triplet_bad:
- remove_triplet(trp);
+ remove_triplet(trp, 0);
}
@@ -419,9 +450,11 @@ triplet_counter(void *data, void *proc_data)
-size_t
-count_processable_triplets()
+struct file_triplet *
+link_processable_triplets()
{
- size_t count = 0;
+ struct count_closure clos;
+
+ clos.tp = NULL;
if (triplet_table)
- grecs_symtab_enumerate(triplet_table, triplet_counter, &count);
- return count;
+ grecs_symtab_enumerate(triplet_table, triplet_linker, &clos);
+ return clos.tp;
}
@@ -450,3 +483,3 @@ triplet_remove_file(struct spool *spool, const char *name)
spool->source_dir, name);
- remove_triplet(tp);
+ remove_triplet(tp, 0);
}
diff --git a/src/watcher.c b/src/watcher.c
index 0c587f1..c18beab 100644
--- a/src/watcher.c
+++ b/src/watcher.c
@@ -157,3 +157,4 @@ process_event(struct inotify_event *ep)
dwp = dirwatcher_find_wd(ep->wd);
-
+ struct file_triplet *tp;
+
if (ep->mask & IN_IGNORED)
@@ -189,4 +190,4 @@ process_event(struct inotify_event *ep)
if (spool_add_new_file(dwp->spool, ep->name, 0, NULL) == 0
- && count_processable_triplets())
- schedule_job(&inotify_spool, getuid());
+ && (tp = link_processable_triplets()))
+ schedule_job(&inotify_spool, getuid(), tp);
} else
diff --git a/src/wydawca.h b/src/wydawca.h
index 3a90ea1..9e4b2be 100644
--- a/src/wydawca.h
+++ b/src/wydawca.h
@@ -172,3 +172,4 @@ struct file_triplet {
char **directive; /* Decoded directive pairs (key: value\0) */
- char *blurb; /* Block of directives: directive[i] points here */
+ char *blurb; /* Block of directives: directive[i] points
+ here */
char *tmp; /* Temporary storage */
@@ -176,3 +177,16 @@ struct file_triplet {
struct txtacc *acc; /* Text accumulator for string allocation */
+ /* Triplets are joined in two doubly-linked lists:
+ 1) a cronological list, with prev pointing to a triplet older
+ than this one, and next pointing to a triplet newer than it: */
struct file_triplet *prev, *next;
+ /* 2) "job queue", a list of triplets processed by the same job: */
+ struct file_triplet *jq_prev, *jq_next;
+ /* The job queue is used only when triplets are processed by
+ the inotify watcher. In that case, the job member points to
+ the job processing this request. When the job terminates, all
+ requests belonging to it are removed from the table to avoid them
+ being processed by subsequent jobs.
+ If started as a cron job, or awoken by a TCP listener, both
+ job and jq_prev, jq_next are NULL. */
+ struct job *job;
/* User data */
@@ -403,5 +417,5 @@ struct spool *wydawca_find_spool(const char *name);
void register_file(struct file_info *finfo, const struct spool *spool);
-void spool_commit_triplets(struct spool *);
+void spool_commit_triplets(struct spool *, struct file_triplet *);
+struct file_triplet *link_processable_triplets(void);
size_t count_collected_triplets(void);
-size_t count_processable_triplets(void);
@@ -414,2 +428,3 @@ void triplet_remove_file(struct spool *spool, const char *name);
time_t triplet_sweep(void);
+int remove_triplet(struct file_triplet *trp, int check);
@@ -519,5 +534,6 @@ extern char *report_string;
/* job.c */
-void schedule_job(struct spool *spool, uid_t uid);
+void schedule_job(struct spool *spool, uid_t uid, struct file_triplet *tp);
void job_init(void);
void job_queue_runner(time_t min_timeout);
+void triplet_jq_unlink(struct file_triplet *tp);

Return to:

Send suggestions and report system problems to the System administrator.