summaryrefslogtreecommitdiffabout
authorSergey Poznyakoff <gray@gnu.org.ua>2013-01-01 17:48:33 (GMT)
committer Sergey Poznyakoff <gray@gnu.org.ua>2013-01-01 18:53:41 (GMT)
commit66775034166ac35bda383ef41213c95021b37b64 (patch) (side-by-side diff)
tree5a64f2e45955acc5758977c6de0740ba34707d65
parentfad678ec27c7f37da83ddf3ba3a96196a962c238 (diff)
downloadwydawca-66775034166ac35bda383ef41213c95021b37b64.tar.gz
wydawca-66775034166ac35bda383ef41213c95021b37b64.tar.bz2
Avoid processing same triplet twice.release-2.2
* src/wydawca.h (file_triplet) <jq_prev,jq_next,job>: New members. (spool_commit_triplets,schedule_job): Update signature. (count_processable_triplets): Remove. (link_processable_triplets): New proto. (remove_triplet): New proto. (triplet_jq_unlink): New proto. * src/job.c (job) <triplet_list>: New member. (triplet_jq_unlink,jq_clear): New functions. (procspool): Create and stop spool timers. (wydawca_scanner): Call spool_create_timers. (schedule_job): Additional argument specifies triplets to process. Initialize job->triplet_list; (job_queue_runner): Call jq_clear. * src/net.c (handle_connection): Update call to schedule_job. * src/process.c (spool_cwd_add_new_file): Update call to spool_commit_triplets. * src/triplet.c (remove_triplet): Now extern. Takes two args. All uses updated. (spool_commit_triplets): Additional argument specifies triplets to process. (count_processable_triplets): Remove. (link_processable_triplets): New function. * src/watcher.c: Use link_processable_triplets to obtain a doubly-linked list of triplets to process. Pass it to schedule_job.
Diffstat (more/less context) (ignore whitespace changes)
-rw-r--r--src/job.c57
-rw-r--r--src/net.c2
-rw-r--r--src/process.c6
-rw-r--r--src/triplet.c85
-rw-r--r--src/watcher.c7
-rw-r--r--src/wydawca.h24
6 files changed, 137 insertions, 44 deletions
diff --git a/src/job.c b/src/job.c
index 7a9316a..b49efe3 100644
--- a/src/job.c
+++ b/src/job.c
@@ -21,10 +21,13 @@
#define STATE_QUEUED 0x02
#define STATE_ACTIVE 0x04
+#define STATE_ANY (STATE_FINISHED|STATE_QUEUED|STATE_ACTIVE)
+
struct job {
struct job *next, *prev;
int state;
struct spool *spool;
+ struct file_triplet *triplet_list;
uid_t uid;
pid_t pid;
time_t timestamp;
@@ -54,12 +57,43 @@ set_timer(time_t interval)
alarm(interval);
}
+void
+triplet_jq_unlink(struct file_triplet *tp)
+{
+ if (!tp || !tp->job)
+ return;
+ if (tp->prev)
+ tp->jq_prev->jq_next = tp->jq_next;
+ else
+ tp->job->triplet_list = tp->jq_next;
+
+ if (tp->jq_next)
+ tp->jq_next->jq_prev = tp->jq_prev;
+
+ tp->jq_next = tp->jq_prev = NULL;
+}
+
+void
+jq_clear(struct job *job)
+{
+ struct file_triplet *tp;
+
+ if (!job->triplet_list)
+ return;
+ for (tp = job->triplet_list; tp; ) {
+ struct file_triplet *next = tp->jq_next;
+ remove_triplet(tp, 1);
+ tp = next;
+ }
+ job->triplet_list = NULL;
+}
+
struct job *
-job_locate(const struct spool *spool, uid_t uid)
+job_locate(const struct spool *spool, uid_t uid, int mask)
{
struct job *p;
for (p = queue; p; p = p->next)
- if (p->spool == spool && p->uid == uid)
+ if ((p->state & mask) && p->spool == spool && p->uid == uid)
break;
return p;
}
@@ -78,7 +112,11 @@ job_active_count()
static int
procspool(struct spool *spool, void *data)
{
- spool_commit_triplets(spool);
+ timer_start("spool");
+ timer_start(spool->tag);
+ spool_commit_triplets(spool, (struct file_triplet *)data);
+ timer_stop(spool->tag);
+ timer_stop("spool");
return 0;
}
@@ -88,12 +126,12 @@ wydawca_scanner(struct job *job)
int rc;
initstats();
timer_start("wydawca");
+ spool_create_timers();
if (job->spool == &inotify_spool)
- rc = for_each_spool(procspool, NULL);
+ rc = for_each_spool(procspool, job->triplet_list);
else if (job->spool == &fake_spool)
rc = scan_all_spools(1, &job->uid);
else {
- spool_create_timers();
rc = scan_spool(job->spool, 1, &job->uid);
}
timer_stop("wydawca");
@@ -190,7 +228,7 @@ job_insert(struct job *job, struct job *elt)
}
void
-schedule_job(struct spool *spool, uid_t uid)
+schedule_job(struct spool *spool, uid_t uid, struct file_triplet *tp)
{
struct job *job;
@@ -201,12 +239,15 @@ schedule_job(struct spool *spool, uid_t uid)
logmsg(LOG_DEBUG, _("scheduling job: %s, %lu"),
spool->tag, (unsigned long)uid);
- job = job_locate(spool, uid);
+ job = job_locate(spool, uid, tp ? STATE_QUEUED : STATE_ANY);
if (!job) {
job = grecs_zalloc(sizeof(*job));
job->spool = spool;
job->uid = uid;
job->pid = -1;
+ job->triplet_list = tp;
+ for (; tp; tp = tp->jq_next)
+ tp->job = job;
time(&job->timestamp);
job_insert(job, NULL);
}
@@ -291,6 +332,7 @@ job_queue_runner(time_t min_interval)
job->pid == pid) {
job->state &= ~STATE_ACTIVE;
job->state |= STATE_FINISHED;
+ jq_clear(job);
job->exit_status = status;
jobcnt--;
}
@@ -352,3 +394,4 @@ job_init()
signal(SIGCHLD, queue_signal);
signal(SIGALRM, queue_signal);
}
+
diff --git a/src/net.c b/src/net.c
index c63bce8..c3ba881 100644
--- a/src/net.c
+++ b/src/net.c
@@ -132,7 +132,7 @@ handle_connection(FILE * in, FILE * out)
pw = getpwnam(buf);
if (pw)
- schedule_job(spool, pw->pw_uid);
+ schedule_job(spool, pw->pw_uid, NULL);
else
logmsg(LOG_ERR, "no such user: %s", buf);
free(buf);
diff --git a/src/process.c b/src/process.c
index 1226ed2..0b4fd1b 100644
--- a/src/process.c
+++ b/src/process.c
@@ -149,7 +149,7 @@ match_uid_p(uid_t uid, int uc, uid_t * uv)
int
spool_cwd_add_new_file(const struct spool *spool, const char *name,
- int uc, uid_t * uv)
+ int uc, uid_t *uv)
{
struct stat st;
struct file_info finfo;
@@ -203,7 +203,7 @@ spool_add_new_file(const struct spool *spool, const char *name,
/* Scan upload directory from the DPAIR and register all files found
there, forming triplets when possible */
void
-scan_spool_unlocked(struct spool *spool, int uc, uid_t * uv)
+scan_spool_unlocked(struct spool *spool, int uc, uid_t *uv)
{
DIR *dir;
struct dirent *ent;
@@ -238,7 +238,7 @@ scan_spool_unlocked(struct spool *spool, int uc, uid_t * uv)
closedir(dir);
if (count_collected_triplets() > 0)
- spool_commit_triplets(spool);
+ spool_commit_triplets(spool, NULL);
timer_stop(spool->tag);
timer_stop("spool");
diff --git a/src/triplet.c b/src/triplet.c
index c4dc576..2df9151 100644
--- a/src/triplet.c
+++ b/src/triplet.c
@@ -294,29 +294,42 @@ check_triplet_state(struct file_triplet *trp, int noauth)
}
/* Unlink all parts of the triplet TRP */
-static void
-remove_triplet(struct file_triplet *trp)
+int
+remove_triplet(struct file_triplet *trp, int check)
{
int i;
for (i = 0; i < FILE_TYPE_COUNT; i++) {
if (trp->file[i].name) {
- logmsg(LOG_NOTICE, _("removing %s"),
- trp->file[i].name);
- if (!dry_run_mode && unlink(trp->file[i].name))
- logmsg(LOG_ERR, _("cannot remove %s: %s"),
- trp->file[i].name, strerror(errno));
+ if (check) {
+ if (access(trp->file[i].name, F_OK) == 0) {
+ logmsg(LOG_NOTICE,
+ _("%s still exists"),
+ trp->file[i].name);
+ return 1;
+ }
+ } else {
+ logmsg(LOG_NOTICE, _("removing %s"),
+ trp->file[i].name);
+ if (!dry_run_mode && unlink(trp->file[i].name))
+ logmsg(LOG_ERR,
+ _("cannot remove %s: %s"),
+ trp->file[i].name,
+ strerror(errno));
+ }
}
}
triplet_list_unlink(trp);
+ triplet_jq_unlink(trp);
grecs_symtab_remove(triplet_table, trp);
+ return 0;
}
time_t
triplet_sweep(void)
{
while (triplet_list && triplet_expired_p(triplet_list))
- remove_triplet(triplet_list);
+ remove_triplet(triplet_list, 0);
return triplet_ttl(triplet_list);
}
@@ -341,7 +354,7 @@ triplet_processor(void *data, void *proc_data)
logmsg(LOG_DEBUG, _("processing triplet `%s'"),
trp->name);
if (process_directives(trp))
- remove_triplet(trp);
+ remove_triplet(trp, 0);
return 0;
case triplet_incomplete:
@@ -354,13 +367,13 @@ triplet_processor(void *data, void *proc_data)
case triplet_bad:
UPDATE_STATS(STAT_BAD_TRIPLETS);
- remove_triplet(trp);
+ remove_triplet(trp, 0);
return 0;
}
if (triplet_expired_p(trp)) {
UPDATE_STATS(STAT_EXPIRED_TRIPLETS);
- remove_triplet(trp);
+ remove_triplet(trp, 0);
}
return 0;
@@ -368,14 +381,21 @@ triplet_processor(void *data, void *proc_data)
/* Process all triplets from the table according to the SPOOL */
void
-spool_commit_triplets(struct spool *spool)
+spool_commit_triplets(struct spool *spool, struct file_triplet *tplist)
{
if (debug_level)
logmsg(LOG_DEBUG, _("processing spool %s (%s)"),
spool->tag, mu_url_to_string(spool->dest_url));
if (spool_open_dictionaries(spool))
return;
- if (triplet_table) {
+ if (tplist) {
+ while (tplist) {
+ struct file_triplet *next = tplist->jq_next;
+ if (tplist->spool == spool)
+ triplet_processor(tplist, NULL);
+ tplist = next;
+ }
+ } else if (triplet_table) {
grecs_symtab_enumerate(triplet_table, triplet_processor, NULL);
grecs_symtab_clear(triplet_table);
}
@@ -387,43 +407,56 @@ count_collected_triplets()
return triplet_table ? grecs_symtab_count_entries(triplet_table) : 0;
}
+struct count_closure {
+ struct file_triplet *tp;
+};
+
static int
-triplet_counter(void *data, void *proc_data)
+triplet_linker(void *data, void *proc_data)
{
struct file_triplet *trp = data;
- size_t *cp = proc_data;
+ struct count_closure *cp = proc_data;
+ if (trp->job)
+ return 0;
if (debug_level)
logmsg(LOG_DEBUG, "FILE %s, DIST=%s, SIG=%s, DIRECTIVE=%s",
trp->name,
SP(trp->file[file_dist].name),
SP(trp->file[file_signature].name),
SP(trp->file[file_directive].name));
-
+
switch (check_triplet_state(trp, 1)) {
case triplet_directive:
case triplet_complete:
if (triplet_expired_p(trp))
- remove_triplet(trp);
- else
- ++*cp;
+ remove_triplet(trp, 0);
+ else {
+ trp->jq_prev = NULL;
+ if (cp->tp)
+ cp->tp->jq_prev = trp;
+ trp->jq_next = cp->tp;
+ cp->tp = trp;
+ }
break;
case triplet_incomplete:
break;
case triplet_bad:
- remove_triplet(trp);
+ remove_triplet(trp, 0);
}
return 0;
}
-size_t
-count_processable_triplets()
+struct file_triplet *
+link_processable_triplets()
{
- size_t count = 0;
+ struct count_closure clos;
+
+ clos.tp = NULL;
if (triplet_table)
- grecs_symtab_enumerate(triplet_table, triplet_counter, &count);
- return count;
+ grecs_symtab_enumerate(triplet_table, triplet_linker, &clos);
+ return clos.tp;
}
void
@@ -448,7 +481,7 @@ triplet_remove_file(struct spool *spool, const char *name)
if (debug_level > 0)
logmsg(LOG_DEBUG, "deleting empty triplet (%s/%s)",
spool->source_dir, name);
- remove_triplet(tp);
+ remove_triplet(tp, 0);
}
}
diff --git a/src/watcher.c b/src/watcher.c
index 0c587f1..c18beab 100644
--- a/src/watcher.c
+++ b/src/watcher.c
@@ -155,7 +155,8 @@ process_event(struct inotify_event *ep)
{
static struct dirwatcher *dwp;
dwp = dirwatcher_find_wd(ep->wd);
-
+ struct file_triplet *tp;
+
if (ep->mask & IN_IGNORED)
/* nothing */ ;
else if (ep->mask & IN_Q_OVERFLOW)
@@ -187,8 +188,8 @@ process_event(struct inotify_event *ep)
logmsg(LOG_DEBUG, "%s/%s written",
dwp->spool->source_dir, ep->name);
if (spool_add_new_file(dwp->spool, ep->name, 0, NULL) == 0
- && count_processable_triplets())
- schedule_job(&inotify_spool, getuid());
+ && (tp = link_processable_triplets()))
+ schedule_job(&inotify_spool, getuid(), tp);
} else
logmsg(LOG_NOTICE, "%s/%s: unexpected event %x",
dwp->spool->source_dir, ep->name, ep->mask);
diff --git a/src/wydawca.h b/src/wydawca.h
index 3a90ea1..9e4b2be 100644
--- a/src/wydawca.h
+++ b/src/wydawca.h
@@ -170,11 +170,25 @@ struct file_triplet {
const struct spool *spool; /* Owning spool */
char *relative_dir; /* Directory relative to spool->dest_dir */
char **directive; /* Decoded directive pairs (key: value\0) */
- char *blurb; /* Block of directives: directive[i] points here */
+ char *blurb; /* Block of directives: directive[i] points
+ here */
char *tmp; /* Temporary storage */
size_t tmpsize; /* Size of memory allocated in tmp */
struct txtacc *acc; /* Text accumulator for string allocation */
+ /* Triplets are joined in two doubly-linked lists:
+ 1) a cronological list, with prev pointing to a triplet older
+ than this one, and next pointing to a triplet newer than it: */
struct file_triplet *prev, *next;
+ /* 2) "job queue", a list of triplets processed by the same job: */
+ struct file_triplet *jq_prev, *jq_next;
+ /* The job queue is used only when triplets are processed by
+ the inotify watcher. In that case, the job member points to
+ the job processing this request. When the job terminates, all
+ requests belonging to it are removed from the table to avoid them
+ being processed by subsequent jobs.
+ If started as a cron job, or awoken by a TCP listener, both
+ job and jq_prev, jq_next are NULL. */
+ struct job *job;
/* User data */
size_t uploader_count;
struct uploader_info *uploader_list;
@@ -401,9 +415,9 @@ int for_each_spool(int (*fun) (struct spool *, void *), void *data);
void register_spool(struct spool *spool);
struct spool *wydawca_find_spool(const char *name);
void register_file(struct file_info *finfo, const struct spool *spool);
-void spool_commit_triplets(struct spool *);
+void spool_commit_triplets(struct spool *, struct file_triplet *);
+struct file_triplet *link_processable_triplets(void);
size_t count_collected_triplets(void);
-size_t count_processable_triplets(void);
char *triplet_expand_param(const char *tmpl, struct file_triplet *trp);
char *triplet_expand_dictionary_query(struct dictionary *dict, void *handle,
@@ -412,6 +426,7 @@ char *triplet_expand_dictionary_query(struct dictionary *dict, void *handle,
void triplet_remove_file(struct spool *spool, const char *name);
time_t triplet_sweep(void);
+int remove_triplet(struct file_triplet *trp, int check);
/* General-purpose dictionary support */
struct dictionary *dictionary_new(enum dictionary_id id,
@@ -517,9 +532,10 @@ void report_finish(void);
extern char *report_string;
/* job.c */
-void schedule_job(struct spool *spool, uid_t uid);
+void schedule_job(struct spool *spool, uid_t uid, struct file_triplet *tp);
void job_init(void);
void job_queue_runner(time_t min_timeout);
+void triplet_jq_unlink(struct file_triplet *tp);
/* profile.c */
void check_pidfile(void);

Return to:

Send suggestions and report system problems to the System administrator.