diff options
author | Sergey Poznyakoff <gray@gnu.org.ua> | 2013-01-01 19:48:33 +0200 |
---|---|---|
committer | Sergey Poznyakoff <gray@gnu.org.ua> | 2013-01-01 20:53:41 +0200 |
commit | 66775034166ac35bda383ef41213c95021b37b64 (patch) | |
tree | 5a64f2e45955acc5758977c6de0740ba34707d65 | |
parent | fad678ec27c7f37da83ddf3ba3a96196a962c238 (diff) | |
download | wydawca-66775034166ac35bda383ef41213c95021b37b64.tar.gz wydawca-66775034166ac35bda383ef41213c95021b37b64.tar.bz2 |
Avoid processing same triplet twice.release-2.2
* src/wydawca.h (file_triplet) <jq_prev,jq_next,job>: New members.
(spool_commit_triplets,schedule_job): Update signature.
(count_processable_triplets): Remove.
(link_processable_triplets): New proto.
(remove_triplet): New proto.
(triplet_jq_unlink): New proto.
* src/job.c (job) <triplet_list>: New member.
(triplet_jq_unlink,jq_clear): New functions.
(procspool): Create and stop spool timers.
(wydawca_scanner): Call spool_create_timers.
(schedule_job): Additional argument specifies triplets
to process.
Initialize job->triplet_list;
(job_queue_runner): Call jq_clear.
* src/net.c (handle_connection): Update call to
schedule_job.
* src/process.c (spool_cwd_add_new_file): Update call to
spool_commit_triplets.
* src/triplet.c (remove_triplet): Now extern. Takes two args.
All uses updated.
(spool_commit_triplets): Additional argument specifies triplets
to process.
(count_processable_triplets): Remove.
(link_processable_triplets): New function.
* src/watcher.c: Use link_processable_triplets to obtain a
doubly-linked list of triplets to process. Pass it to
schedule_job.
-rw-r--r-- | src/job.c | 57 | ||||
-rw-r--r-- | src/net.c | 2 | ||||
-rw-r--r-- | src/process.c | 6 | ||||
-rw-r--r-- | src/triplet.c | 85 | ||||
-rw-r--r-- | src/watcher.c | 7 | ||||
-rw-r--r-- | src/wydawca.h | 24 |
6 files changed, 137 insertions, 44 deletions
@@ -23,2 +23,4 @@ +#define STATE_ANY (STATE_FINISHED|STATE_QUEUED|STATE_ACTIVE) + struct job { @@ -27,2 +29,3 @@ struct job { struct spool *spool; + struct file_triplet *triplet_list; uid_t uid; @@ -56,4 +59,35 @@ set_timer(time_t interval) +void +triplet_jq_unlink(struct file_triplet *tp) +{ + if (!tp || !tp->job) + return; + if (tp->prev) + tp->jq_prev->jq_next = tp->jq_next; + else + tp->job->triplet_list = tp->jq_next; + + if (tp->jq_next) + tp->jq_next->jq_prev = tp->jq_prev; + + tp->jq_next = tp->jq_prev = NULL; +} + +void +jq_clear(struct job *job) +{ + struct file_triplet *tp; + + if (!job->triplet_list) + return; + for (tp = job->triplet_list; tp; ) { + struct file_triplet *next = tp->jq_next; + remove_triplet(tp, 1); + tp = next; + } + job->triplet_list = NULL; +} + struct job * -job_locate(const struct spool *spool, uid_t uid) +job_locate(const struct spool *spool, uid_t uid, int mask) { @@ -61,3 +95,3 @@ job_locate(const struct spool *spool, uid_t uid) for (p = queue; p; p = p->next) - if (p->spool == spool && p->uid == uid) + if ((p->state & mask) && p->spool == spool && p->uid == uid) break; @@ -80,3 +114,7 @@ procspool(struct spool *spool, void *data) { - spool_commit_triplets(spool); + timer_start("spool"); + timer_start(spool->tag); + spool_commit_triplets(spool, (struct file_triplet *)data); + timer_stop(spool->tag); + timer_stop("spool"); return 0; @@ -90,4 +128,5 @@ wydawca_scanner(struct job *job) timer_start("wydawca"); + spool_create_timers(); if (job->spool == &inotify_spool) - rc = for_each_spool(procspool, NULL); + rc = for_each_spool(procspool, job->triplet_list); else if (job->spool == &fake_spool) @@ -95,3 +134,2 @@ wydawca_scanner(struct job *job) else { - spool_create_timers(); rc = scan_spool(job->spool, 1, &job->uid); @@ -192,3 +230,3 @@ job_insert(struct job *job, struct job *elt) void -schedule_job(struct spool *spool, uid_t uid) +schedule_job(struct spool *spool, uid_t uid, struct file_triplet *tp) { @@ -203,3 +241,3 @@ schedule_job(struct spool *spool, uid_t uid) - job = job_locate(spool, uid); + job = job_locate(spool, uid, tp ? STATE_QUEUED : STATE_ANY); if (!job) { @@ -209,2 +247,5 @@ schedule_job(struct spool *spool, uid_t uid) job->pid = -1; + job->triplet_list = tp; + for (; tp; tp = tp->jq_next) + tp->job = job; time(&job->timestamp); @@ -293,2 +334,3 @@ job_queue_runner(time_t min_interval) job->state |= STATE_FINISHED; + jq_clear(job); job->exit_status = status; @@ -354 +396,2 @@ job_init() } + @@ -134,3 +134,3 @@ handle_connection(FILE * in, FILE * out) if (pw) - schedule_job(spool, pw->pw_uid); + schedule_job(spool, pw->pw_uid, NULL); else diff --git a/src/process.c b/src/process.c index 1226ed2..0b4fd1b 100644 --- a/src/process.c +++ b/src/process.c @@ -151,3 +151,3 @@ int spool_cwd_add_new_file(const struct spool *spool, const char *name, - int uc, uid_t * uv) + int uc, uid_t *uv) { @@ -205,3 +205,3 @@ spool_add_new_file(const struct spool *spool, const char *name, void -scan_spool_unlocked(struct spool *spool, int uc, uid_t * uv) +scan_spool_unlocked(struct spool *spool, int uc, uid_t *uv) { @@ -240,3 +240,3 @@ scan_spool_unlocked(struct spool *spool, int uc, uid_t * uv) if (count_collected_triplets() > 0) - spool_commit_triplets(spool); + spool_commit_triplets(spool, NULL); diff --git a/src/triplet.c b/src/triplet.c index c4dc576..2df9151 100644 --- a/src/triplet.c +++ b/src/triplet.c @@ -296,4 +296,4 @@ check_triplet_state(struct file_triplet *trp, int noauth) /* Unlink all parts of the triplet TRP */ -static void -remove_triplet(struct file_triplet *trp) +int +remove_triplet(struct file_triplet *trp, int check) { @@ -303,7 +303,18 @@ remove_triplet(struct file_triplet *trp) if (trp->file[i].name) { - logmsg(LOG_NOTICE, _("removing %s"), - trp->file[i].name); - if (!dry_run_mode && unlink(trp->file[i].name)) - logmsg(LOG_ERR, _("cannot remove %s: %s"), - trp->file[i].name, strerror(errno)); + if (check) { + if (access(trp->file[i].name, F_OK) == 0) { + logmsg(LOG_NOTICE, + _("%s still exists"), + trp->file[i].name); + return 1; + } + } else { + logmsg(LOG_NOTICE, _("removing %s"), + trp->file[i].name); + if (!dry_run_mode && unlink(trp->file[i].name)) + logmsg(LOG_ERR, + _("cannot remove %s: %s"), + trp->file[i].name, + strerror(errno)); + } } @@ -311,3 +322,5 @@ remove_triplet(struct file_triplet *trp) triplet_list_unlink(trp); + triplet_jq_unlink(trp); grecs_symtab_remove(triplet_table, trp); + return 0; } @@ -318,3 +331,3 @@ triplet_sweep(void) while (triplet_list && triplet_expired_p(triplet_list)) - remove_triplet(triplet_list); + remove_triplet(triplet_list, 0); @@ -343,3 +356,3 @@ triplet_processor(void *data, void *proc_data) if (process_directives(trp)) - remove_triplet(trp); + remove_triplet(trp, 0); return 0; @@ -356,3 +369,3 @@ triplet_processor(void *data, void *proc_data) UPDATE_STATS(STAT_BAD_TRIPLETS); - remove_triplet(trp); + remove_triplet(trp, 0); return 0; @@ -362,3 +375,3 @@ triplet_processor(void *data, void *proc_data) UPDATE_STATS(STAT_EXPIRED_TRIPLETS); - remove_triplet(trp); + remove_triplet(trp, 0); } @@ -370,3 +383,3 @@ triplet_processor(void *data, void *proc_data) void -spool_commit_triplets(struct spool *spool) +spool_commit_triplets(struct spool *spool, struct file_triplet *tplist) { @@ -377,3 +390,10 @@ spool_commit_triplets(struct spool *spool) return; - if (triplet_table) { + if (tplist) { + while (tplist) { + struct file_triplet *next = tplist->jq_next; + if (tplist->spool == spool) + triplet_processor(tplist, NULL); + tplist = next; + } + } else if (triplet_table) { grecs_symtab_enumerate(triplet_table, triplet_processor, NULL); @@ -389,8 +409,14 @@ count_collected_triplets() +struct count_closure { + struct file_triplet *tp; +}; + static int -triplet_counter(void *data, void *proc_data) +triplet_linker(void *data, void *proc_data) { struct file_triplet *trp = data; - size_t *cp = proc_data; + struct count_closure *cp = proc_data; + if (trp->job) + return 0; if (debug_level) @@ -401,3 +427,3 @@ triplet_counter(void *data, void *proc_data) SP(trp->file[file_directive].name)); - + switch (check_triplet_state(trp, 1)) { @@ -406,5 +432,10 @@ triplet_counter(void *data, void *proc_data) if (triplet_expired_p(trp)) - remove_triplet(trp); - else - ++*cp; + remove_triplet(trp, 0); + else { + trp->jq_prev = NULL; + if (cp->tp) + cp->tp->jq_prev = trp; + trp->jq_next = cp->tp; + cp->tp = trp; + } break; @@ -413,3 +444,3 @@ triplet_counter(void *data, void *proc_data) case triplet_bad: - remove_triplet(trp); + remove_triplet(trp, 0); } @@ -419,9 +450,11 @@ triplet_counter(void *data, void *proc_data) -size_t -count_processable_triplets() +struct file_triplet * +link_processable_triplets() { - size_t count = 0; + struct count_closure clos; + + clos.tp = NULL; if (triplet_table) - grecs_symtab_enumerate(triplet_table, triplet_counter, &count); - return count; + grecs_symtab_enumerate(triplet_table, triplet_linker, &clos); + return clos.tp; } @@ -450,3 +483,3 @@ triplet_remove_file(struct spool *spool, const char *name) spool->source_dir, name); - remove_triplet(tp); + remove_triplet(tp, 0); } diff --git a/src/watcher.c b/src/watcher.c index 0c587f1..c18beab 100644 --- a/src/watcher.c +++ b/src/watcher.c @@ -157,3 +157,4 @@ process_event(struct inotify_event *ep) dwp = dirwatcher_find_wd(ep->wd); - + struct file_triplet *tp; + if (ep->mask & IN_IGNORED) @@ -189,4 +190,4 @@ process_event(struct inotify_event *ep) if (spool_add_new_file(dwp->spool, ep->name, 0, NULL) == 0 - && count_processable_triplets()) - schedule_job(&inotify_spool, getuid()); + && (tp = link_processable_triplets())) + schedule_job(&inotify_spool, getuid(), tp); } else diff --git a/src/wydawca.h b/src/wydawca.h index 3a90ea1..9e4b2be 100644 --- a/src/wydawca.h +++ b/src/wydawca.h @@ -172,3 +172,4 @@ struct file_triplet { char **directive; /* Decoded directive pairs (key: value\0) */ - char *blurb; /* Block of directives: directive[i] points here */ + char *blurb; /* Block of directives: directive[i] points + here */ char *tmp; /* Temporary storage */ @@ -176,3 +177,16 @@ struct file_triplet { struct txtacc *acc; /* Text accumulator for string allocation */ + /* Triplets are joined in two doubly-linked lists: + 1) a cronological list, with prev pointing to a triplet older + than this one, and next pointing to a triplet newer than it: */ struct file_triplet *prev, *next; + /* 2) "job queue", a list of triplets processed by the same job: */ + struct file_triplet *jq_prev, *jq_next; + /* The job queue is used only when triplets are processed by + the inotify watcher. In that case, the job member points to + the job processing this request. When the job terminates, all + requests belonging to it are removed from the table to avoid them + being processed by subsequent jobs. + If started as a cron job, or awoken by a TCP listener, both + job and jq_prev, jq_next are NULL. */ + struct job *job; /* User data */ @@ -403,5 +417,5 @@ struct spool *wydawca_find_spool(const char *name); void register_file(struct file_info *finfo, const struct spool *spool); -void spool_commit_triplets(struct spool *); +void spool_commit_triplets(struct spool *, struct file_triplet *); +struct file_triplet *link_processable_triplets(void); size_t count_collected_triplets(void); -size_t count_processable_triplets(void); @@ -414,2 +428,3 @@ void triplet_remove_file(struct spool *spool, const char *name); time_t triplet_sweep(void); +int remove_triplet(struct file_triplet *trp, int check); @@ -519,5 +534,6 @@ extern char *report_string; /* job.c */ -void schedule_job(struct spool *spool, uid_t uid); +void schedule_job(struct spool *spool, uid_t uid, struct file_triplet *tp); void job_init(void); void job_queue_runner(time_t min_timeout); +void triplet_jq_unlink(struct file_triplet *tp); |