diff options
author | Sergey Poznyakoff <gray@gnu.org.ua> | 2013-01-01 19:48:33 +0200 |
---|---|---|
committer | Sergey Poznyakoff <gray@gnu.org.ua> | 2013-01-01 20:53:41 +0200 |
commit | 66775034166ac35bda383ef41213c95021b37b64 (patch) | |
tree | 5a64f2e45955acc5758977c6de0740ba34707d65 /src | |
parent | fad678ec27c7f37da83ddf3ba3a96196a962c238 (diff) | |
download | wydawca-66775034166ac35bda383ef41213c95021b37b64.tar.gz wydawca-66775034166ac35bda383ef41213c95021b37b64.tar.bz2 |
Avoid processing same triplet twice. [release-2.2]
* src/wydawca.h (file_triplet) <jq_prev,jq_next,job>: New members.
(spool_commit_triplets,schedule_job): Update signature.
(count_processable_triplets): Remove.
(link_processable_triplets): New proto.
(remove_triplet): New proto.
(triplet_jq_unlink): New proto.
* src/job.c (job) <triplet_list>: New member.
(triplet_jq_unlink,jq_clear): New functions.
(procspool): Create and stop spool timers.
(wydawca_scanner): Call spool_create_timers.
(schedule_job): Additional argument specifies triplets
to process.
Initialize job->triplet_list.
(job_queue_runner): Call jq_clear.
* src/net.c (handle_connection): Update call to
schedule_job.
* src/process.c (spool_cwd_add_new_file): Update call to
spool_commit_triplets.
* src/triplet.c (remove_triplet): Now extern. Takes two args.
All uses updated.
(spool_commit_triplets): Additional argument specifies triplets
to process.
(count_processable_triplets): Remove.
(link_processable_triplets): New function.
* src/watcher.c: Use link_processable_triplets to obtain a
doubly-linked list of triplets to process. Pass it to
schedule_job.
Diffstat (limited to 'src')
-rw-r--r-- | src/job.c | 57 | ||||
-rw-r--r-- | src/net.c | 2 | ||||
-rw-r--r-- | src/process.c | 6 | ||||
-rw-r--r-- | src/triplet.c | 85 | ||||
-rw-r--r-- | src/watcher.c | 7 | ||||
-rw-r--r-- | src/wydawca.h | 24 |
6 files changed, 137 insertions, 44 deletions
@@ -18,16 +18,19 @@ #include "mail.h" #define STATE_FINISHED 0x01 #define STATE_QUEUED 0x02 #define STATE_ACTIVE 0x04 +#define STATE_ANY (STATE_FINISHED|STATE_QUEUED|STATE_ACTIVE) + struct job { struct job *next, *prev; int state; struct spool *spool; + struct file_triplet *triplet_list; uid_t uid; pid_t pid; time_t timestamp; int exit_status; }; @@ -51,18 +54,49 @@ set_timer(time_t interval) { wakeup = 0; if (interval) alarm(interval); } +void +triplet_jq_unlink(struct file_triplet *tp) +{ + if (!tp || !tp->job) + return; + if (tp->prev) + tp->jq_prev->jq_next = tp->jq_next; + else + tp->job->triplet_list = tp->jq_next; + + if (tp->jq_next) + tp->jq_next->jq_prev = tp->jq_prev; + + tp->jq_next = tp->jq_prev = NULL; +} + +void +jq_clear(struct job *job) +{ + struct file_triplet *tp; + + if (!job->triplet_list) + return; + for (tp = job->triplet_list; tp; ) { + struct file_triplet *next = tp->jq_next; + remove_triplet(tp, 1); + tp = next; + } + job->triplet_list = NULL; +} + struct job * -job_locate(const struct spool *spool, uid_t uid) +job_locate(const struct spool *spool, uid_t uid, int mask) { struct job *p; for (p = queue; p; p = p->next) - if (p->spool == spool && p->uid == uid) + if ((p->state & mask) && p->spool == spool && p->uid == uid) break; return p; } size_t job_active_count() @@ -75,28 +109,32 @@ job_active_count() return count; } static int procspool(struct spool *spool, void *data) { - spool_commit_triplets(spool); + timer_start("spool"); + timer_start(spool->tag); + spool_commit_triplets(spool, (struct file_triplet *)data); + timer_stop(spool->tag); + timer_stop("spool"); return 0; } int wydawca_scanner(struct job *job) { int rc; initstats(); timer_start("wydawca"); + spool_create_timers(); if (job->spool == &inotify_spool) - rc = for_each_spool(procspool, NULL); + rc = for_each_spool(procspool, job->triplet_list); else if (job->spool == &fake_spool) rc = scan_all_spools(1, &job->uid); else { - spool_create_timers(); rc = scan_spool(job->spool, 1, 
&job->uid); } timer_stop("wydawca"); logstats(); mail_finish(); return rc; @@ -187,29 +225,32 @@ job_insert(struct job *job, struct job *elt) if (p) p->prev = job; } void -schedule_job(struct spool *spool, uid_t uid) +schedule_job(struct spool *spool, uid_t uid, struct file_triplet *tp) { struct job *job; if (!spool) spool = &fake_spool; if (debug_level) logmsg(LOG_DEBUG, _("scheduling job: %s, %lu"), spool->tag, (unsigned long)uid); - job = job_locate(spool, uid); + job = job_locate(spool, uid, tp ? STATE_QUEUED : STATE_ANY); if (!job) { job = grecs_zalloc(sizeof(*job)); job->spool = spool; job->uid = uid; job->pid = -1; + job->triplet_list = tp; + for (; tp; tp = tp->jq_next) + tp->job = job; time(&job->timestamp); job_insert(job, NULL); } job->state |= STATE_QUEUED; job_start(job); @@ -288,12 +329,13 @@ job_queue_runner(time_t min_interval) break; for (job = queue; job; job = job->next) { if ((job->state & STATE_ACTIVE) && job->pid == pid) { job->state &= ~STATE_ACTIVE; job->state |= STATE_FINISHED; + jq_clear(job); job->exit_status = status; jobcnt--; } } } @@ -349,6 +391,7 @@ job_queue_runner(time_t min_interval) void job_init() { signal(SIGCHLD, queue_signal); signal(SIGALRM, queue_signal); } + @@ -129,13 +129,13 @@ handle_connection(FILE * in, FILE * out) p++; } else p = ""; pw = getpwnam(buf); if (pw) - schedule_job(spool, pw->pw_uid); + schedule_job(spool, pw->pw_uid, NULL); else logmsg(LOG_ERR, "no such user: %s", buf); free(buf); } int reconfigure; diff --git a/src/process.c b/src/process.c index 1226ed2..0b4fd1b 100644 --- a/src/process.c +++ b/src/process.c @@ -146,13 +146,13 @@ match_uid_p(uid_t uid, int uc, uid_t * uv) return 1; return 0; } int spool_cwd_add_new_file(const struct spool *spool, const char *name, - int uc, uid_t * uv) + int uc, uid_t *uv) { struct stat st; struct file_info finfo; if (stat(name, &st)) { logmsg(LOG_ERR, _("cannot stat file %s/%s: %s"), @@ -200,13 +200,13 @@ spool_add_new_file(const struct spool *spool, const char *name, 
return spool_cwd_add_new_file(spool, name, uc, uv); } /* Scan upload directory from the DPAIR and register all files found there, forming triplets when possible */ void -scan_spool_unlocked(struct spool *spool, int uc, uid_t * uv) +scan_spool_unlocked(struct spool *spool, int uc, uid_t *uv) { DIR *dir; struct dirent *ent; if (debug_level) logmsg(LOG_DEBUG, "%s -> %s", spool->source_dir, @@ -235,13 +235,13 @@ scan_spool_unlocked(struct spool *spool, int uc, uid_t * uv) spool_cwd_add_new_file(spool, ent->d_name, uc, uv); } closedir(dir); if (count_collected_triplets() > 0) - spool_commit_triplets(spool); + spool_commit_triplets(spool, NULL); timer_stop(spool->tag); timer_stop("spool"); } int diff --git a/src/triplet.c b/src/triplet.c index c4dc576..2df9151 100644 --- a/src/triplet.c +++ b/src/triplet.c @@ -291,35 +291,48 @@ check_triplet_state(struct file_triplet *trp, int noauth) } return triplet_incomplete; } /* Unlink all parts of the triplet TRP */ -static void -remove_triplet(struct file_triplet *trp) +int +remove_triplet(struct file_triplet *trp, int check) { int i; for (i = 0; i < FILE_TYPE_COUNT; i++) { if (trp->file[i].name) { - logmsg(LOG_NOTICE, _("removing %s"), - trp->file[i].name); - if (!dry_run_mode && unlink(trp->file[i].name)) - logmsg(LOG_ERR, _("cannot remove %s: %s"), - trp->file[i].name, strerror(errno)); + if (check) { + if (access(trp->file[i].name, F_OK) == 0) { + logmsg(LOG_NOTICE, + _("%s still exists"), + trp->file[i].name); + return 1; + } + } else { + logmsg(LOG_NOTICE, _("removing %s"), + trp->file[i].name); + if (!dry_run_mode && unlink(trp->file[i].name)) + logmsg(LOG_ERR, + _("cannot remove %s: %s"), + trp->file[i].name, + strerror(errno)); + } } } triplet_list_unlink(trp); + triplet_jq_unlink(trp); grecs_symtab_remove(triplet_table, trp); + return 0; } time_t triplet_sweep(void) { while (triplet_list && triplet_expired_p(triplet_list)) - remove_triplet(triplet_list); + remove_triplet(triplet_list, 0); return 
triplet_ttl(triplet_list); } /* Process a single triplet from the table */ static int @@ -338,95 +351,115 @@ triplet_processor(void *data, void *proc_data) case triplet_directive: case triplet_complete: if (debug_level) logmsg(LOG_DEBUG, _("processing triplet `%s'"), trp->name); if (process_directives(trp)) - remove_triplet(trp); + remove_triplet(trp, 0); return 0; case triplet_incomplete: if (debug_level) logmsg(LOG_DEBUG, _("%s: incomplete triplet"), trp->name); /* ignore unless expired (see below); */ UPDATE_STATS(STAT_INCOMPLETE_TRIPLETS); break; case triplet_bad: UPDATE_STATS(STAT_BAD_TRIPLETS); - remove_triplet(trp); + remove_triplet(trp, 0); return 0; } if (triplet_expired_p(trp)) { UPDATE_STATS(STAT_EXPIRED_TRIPLETS); - remove_triplet(trp); + remove_triplet(trp, 0); } return 0; } /* Process all triplets from the table according to the SPOOL */ void -spool_commit_triplets(struct spool *spool) +spool_commit_triplets(struct spool *spool, struct file_triplet *tplist) { if (debug_level) logmsg(LOG_DEBUG, _("processing spool %s (%s)"), spool->tag, mu_url_to_string(spool->dest_url)); if (spool_open_dictionaries(spool)) return; - if (triplet_table) { + if (tplist) { + while (tplist) { + struct file_triplet *next = tplist->jq_next; + if (tplist->spool == spool) + triplet_processor(tplist, NULL); + tplist = next; + } + } else if (triplet_table) { grecs_symtab_enumerate(triplet_table, triplet_processor, NULL); grecs_symtab_clear(triplet_table); } } size_t count_collected_triplets() { return triplet_table ? 
grecs_symtab_count_entries(triplet_table) : 0; } +struct count_closure { + struct file_triplet *tp; +}; + static int -triplet_counter(void *data, void *proc_data) +triplet_linker(void *data, void *proc_data) { struct file_triplet *trp = data; - size_t *cp = proc_data; + struct count_closure *cp = proc_data; + if (trp->job) + return 0; if (debug_level) logmsg(LOG_DEBUG, "FILE %s, DIST=%s, SIG=%s, DIRECTIVE=%s", trp->name, SP(trp->file[file_dist].name), SP(trp->file[file_signature].name), SP(trp->file[file_directive].name)); - + switch (check_triplet_state(trp, 1)) { case triplet_directive: case triplet_complete: if (triplet_expired_p(trp)) - remove_triplet(trp); - else - ++*cp; + remove_triplet(trp, 0); + else { + trp->jq_prev = NULL; + if (cp->tp) + cp->tp->jq_prev = trp; + trp->jq_next = cp->tp; + cp->tp = trp; + } break; case triplet_incomplete: break; case triplet_bad: - remove_triplet(trp); + remove_triplet(trp, 0); } return 0; } -size_t -count_processable_triplets() +struct file_triplet * +link_processable_triplets() { - size_t count = 0; + struct count_closure clos; + + clos.tp = NULL; if (triplet_table) - grecs_symtab_enumerate(triplet_table, triplet_counter, &count); - return count; + grecs_symtab_enumerate(triplet_table, triplet_linker, &clos); + return clos.tp; } void triplet_remove_file(struct spool *spool, const char *name) { struct file_triplet *tp = triplet_lookup(spool, name); @@ -445,13 +478,13 @@ triplet_remove_file(struct spool *spool, const char *name) } if (!n) { if (debug_level > 0) logmsg(LOG_DEBUG, "deleting empty triplet (%s/%s)", spool->source_dir, name); - remove_triplet(tp); + remove_triplet(tp, 0); } } static const char * expand_project_base(struct metadef *def, void *data) { diff --git a/src/watcher.c b/src/watcher.c index 0c587f1..c18beab 100644 --- a/src/watcher.c +++ b/src/watcher.c @@ -152,13 +152,14 @@ watcher_init() static void process_event(struct inotify_event *ep) { static struct dirwatcher *dwp; dwp = 
dirwatcher_find_wd(ep->wd); - + struct file_triplet *tp; + if (ep->mask & IN_IGNORED) /* nothing */ ; else if (ep->mask & IN_Q_OVERFLOW) logmsg(LOG_NOTICE, "event queue overflow"); else if (ep->mask & IN_UNMOUNT) /* FIXME: not sure if there's @@ -184,14 +185,14 @@ process_event(struct inotify_event *ep) triplet_remove_file(dwp->spool, ep->name); } else if (ep->mask & (IN_CLOSE_WRITE | IN_MOVED_TO)) { if (debug_level > 0) logmsg(LOG_DEBUG, "%s/%s written", dwp->spool->source_dir, ep->name); if (spool_add_new_file(dwp->spool, ep->name, 0, NULL) == 0 - && count_processable_triplets()) - schedule_job(&inotify_spool, getuid()); + && (tp = link_processable_triplets())) + schedule_job(&inotify_spool, getuid(), tp); } else logmsg(LOG_NOTICE, "%s/%s: unexpected event %x", dwp->spool->source_dir, ep->name, ep->mask); } diff --git a/src/wydawca.h b/src/wydawca.h index 3a90ea1..9e4b2be 100644 --- a/src/wydawca.h +++ b/src/wydawca.h @@ -167,17 +167,31 @@ struct file_triplet { char *name; /* Triplet base name */ struct file_info file[FILE_TYPE_COUNT]; /* Components */ unsigned version; /* Protocol version */ const struct spool *spool; /* Owning spool */ char *relative_dir; /* Directory relative to spool->dest_dir */ char **directive; /* Decoded directive pairs (key: value\0) */ - char *blurb; /* Block of directives: directive[i] points here */ + char *blurb; /* Block of directives: directive[i] points + here */ char *tmp; /* Temporary storage */ size_t tmpsize; /* Size of memory allocated in tmp */ struct txtacc *acc; /* Text accumulator for string allocation */ + /* Triplets are joined in two doubly-linked lists: + 1) a cronological list, with prev pointing to a triplet older + than this one, and next pointing to a triplet newer than it: */ struct file_triplet *prev, *next; + /* 2) "job queue", a list of triplets processed by the same job: */ + struct file_triplet *jq_prev, *jq_next; + /* The job queue is used only when triplets are processed by + the inotify watcher. 
In that case, the job member points to + the job processing this request. When the job terminates, all + requests belonging to it are removed from the table to avoid them + being processed by subsequent jobs. + If started as a cron job, or awoken by a TCP listener, both + job and jq_prev, jq_next are NULL. */ + struct job *job; /* User data */ size_t uploader_count; struct uploader_info *uploader_list; struct uploader_info *uploader; /* Special data for template formatting */ char *project; /* Triplet project name (if known) */ @@ -398,23 +412,24 @@ void spool_close_dictionaries(struct spool *spool); void parse_file_name(const char *name, struct file_info *finfo); void file_info_cleanup(struct file_info *finfo); int for_each_spool(int (*fun) (struct spool *, void *), void *data); void register_spool(struct spool *spool); struct spool *wydawca_find_spool(const char *name); void register_file(struct file_info *finfo, const struct spool *spool); -void spool_commit_triplets(struct spool *); +void spool_commit_triplets(struct spool *, struct file_triplet *); +struct file_triplet *link_processable_triplets(void); size_t count_collected_triplets(void); -size_t count_processable_triplets(void); char *triplet_expand_param(const char *tmpl, struct file_triplet *trp); char *triplet_expand_dictionary_query(struct dictionary *dict, void *handle, struct file_triplet *trp); void triplet_remove_file(struct spool *spool, const char *name); time_t triplet_sweep(void); +int remove_triplet(struct file_triplet *trp, int check); /* General-purpose dictionary support */ struct dictionary *dictionary_new(enum dictionary_id id, enum dictionary_type type); int dictionary_init(struct dictionary *dict); int dictionary_done(struct dictionary *dict); @@ -514,15 +529,16 @@ void timer_free_meta(struct metadef *def, size_t num); void report_init(void); void report_add(const char *fmt, ...); void report_finish(void); extern char *report_string; /* job.c */ -void schedule_job(struct spool *spool, 
uid_t uid); +void schedule_job(struct spool *spool, uid_t uid, struct file_triplet *tp); void job_init(void); void job_queue_runner(time_t min_timeout); +void triplet_jq_unlink(struct file_triplet *tp); /* profile.c */ void check_pidfile(void); void remove_pidfile(void); /* net.c */ |