diff options
author | Sergey Poznyakoff <gray@gnu.org> | 2020-04-15 16:26:34 +0300 |
---|---|---|
committer | Sergey Poznyakoff <gray@gnu.org> | 2020-04-15 20:25:49 +0300 |
commit | 2cc5b63b5f90d2dd9b05f50ac28e379d417f1a8c (patch) | |
tree | 38238377b042a159fb9bf633e7b8e3853f0fd980 | |
parent | 44ad5cfe84f1cd1d7b6de35ae251f1caf69450d8 (diff) | |
download | wydawca-2cc5b63b5f90d2dd9b05f50ac28e379d417f1a8c.tar.gz wydawca-2cc5b63b5f90d2dd9b05f50ac28e379d417f1a8c.tar.bz2 |
Use job scheduler when processing all spools.alpha-3.1.90
* src/job.c (job_printable): Improve job identification.
(wydawca_scanner): Don't log statistics in single process mode.
(job_queue_wait): New function.
* src/net.c (wydawca_listener): Move the call to job_init to main.
* src/process.c (scan_spool_unlocked): Enqueue the triplet.
Don't call spool_commit_triplets
* src/triplet.c (remove_triplet): Omit checks in dry-run mode.
(triplet_processor,spool_commit_triplets)
(count_collected_triplets): Remove.
* src/wydawca.c (main): Call job_queue_wait if not daemon.
* tests/inotify-unatt01.at: Update expected output.
* tests/upload-dry.at: Likewise.
-rw-r--r-- | src/job.c | 68 | ||||
-rw-r--r-- | src/net.c | 3 | ||||
-rw-r--r-- | src/process.c | 5 | ||||
-rw-r--r-- | src/triplet.c | 62 | ||||
-rw-r--r-- | src/wydawca.c | 6 | ||||
-rw-r--r-- | src/wydawca.h | 9 | ||||
-rw-r--r-- | tests/inotify-unatt01.at | 14 | ||||
-rw-r--r-- | tests/upload-dry.at | 7 |
8 files changed, 68 insertions, 106 deletions
@@ -69,21 +69,23 @@ char const * job_printable(struct job *job) { if (!job->printable) { + size_t s = 0; + switch (job->type) { case JOB_TRIPLET: - job->printable = grecs_strdup("triplet"); + if (grecs_asprintf(&job->printable, &s, + "triplet(%s,%s)", + job->v.triplet->spool->tag, + job->v.triplet->name)) + grecs_alloc_die(); break; - case JOB_SPOOL:{ - size_t s = 0; - int rc; - rc = grecs_asprintf(&job->printable, &s, - "spool(%s)", - job->v.spool->tag); - if (rc) - grecs_alloc_die(); - break; - } + case JOB_SPOOL: + if (grecs_asprintf(&job->printable, &s, + "spool(%s)", + job->v.spool->tag)) + grecs_alloc_die(); + break; case JOB_ALL_SPOOLS: job->printable = grecs_strdup("all spools"); @@ -149,7 +151,8 @@ wydawca_scanner(struct job *job) notify_flush(job->v.spool); } timer_stop("wydawca"); - logstats(); + if (!single_process) + logstats(); return rc; } @@ -329,7 +332,6 @@ print_status(struct job *job, int expect_term) void job_queue_runner(time_t min_interval) { - int status; struct job *job; time_t now = time(NULL); @@ -337,6 +339,8 @@ job_queue_runner(time_t min_interval) wakeup = 0; for (;;) { + int status; + pid_t pid = waitpid((pid_t) - 1, &status, WNOHANG); if (pid <= 0) break; @@ -394,7 +398,43 @@ job_queue_runner(time_t min_interval) } void -job_init() +job_queue_wait(void) +{ + while (queue) { + int status; + struct job *job; + + pid_t pid = waitpid((pid_t) - 1, &status, 0); + if (pid <= 0) + break; + for (job = queue; job; job = job->next) { + if (job->state == STATE_ACTIVE && job->pid == pid) { + remove_triplet(job->v.triplet, 1); + job->v.triplet = NULL; + job->exit_status = status; + job->state = STATE_FINISHED; + jobcnt--; + } + } + + for (job = queue; job;) { + struct job *next = job->next; + if (job->state == STATE_FINISHED) { + print_status(job, 0); + job_remove(job); + job_destroy(job); + } else if (job->state == STATE_QUEUED) { + job_remove(job); + job_destroy(job); + } + job = next; + } + } + logstats(); +} + +void +job_init(void) { signal(SIGCHLD, queue_signal); signal(SIGALRM, queue_signal); @@ -159,7 +159,7 @@ notify_parent(void) } void -wydawca_listener() +wydawca_listener(void) { int ctlfd = open_listener(); int wfd = watcher_init(); @@ -181,7 +181,6 @@ wydawca_listener() if (notify_parent()) kill(getppid(), SIGUSR1); - job_init(); signal(SIGHUP, sig_hup); signal(SIGTERM, sig_term); signal(SIGQUIT, sig_term); diff --git a/src/process.c b/src/process.c index 9fdaa5f..4c14fd5 100644 --- a/src/process.c +++ b/src/process.c @@ -187,14 +187,11 @@ scan_spool_unlocked(struct spool *spool) if (strcmp(ent->d_name, ".") == 0 || strcmp(ent->d_name, "..") == 0) continue; - spool_add_new_file(spool, ent->d_name); + triplet_enqueue(spool_add_new_file(spool, ent->d_name)); } closedir(dir); - if (count_collected_triplets() > 0) - spool_commit_triplets(spool); - timer_stop(spool->tag); timer_stop("spool"); } diff --git a/src/triplet.c b/src/triplet.c index 92adf34..e41da07 100644 --- a/src/triplet.c +++ b/src/triplet.c @@ -336,6 +336,8 @@ remove_triplet(struct wy_triplet *trp, int check) if (check) { struct stat st; + if (wy_dry_run) + continue; if (fstatat(trp->spool->source_fd, trp->file[i].name, &st, 0)) { if (errno == ENOENT) @@ -397,66 +399,6 @@ triplet_commit(struct wy_triplet *trp) timer_stop("spool"); } -/* Process a single triplet from the table */ -static int -triplet_processor(void *data, void *proc_data) -{ - struct wy_triplet *trp = data; - struct spool *spool = proc_data; - - if (trp->spool == spool) { - wy_debug(1, ("FILE %s, DIST=%s, SIG=%s, DIRECTIVE=%s", - trp->name, - SP(trp->file[file_dist].name), - SP(trp->file[file_signature].name), - SP(trp->file[file_directive].name))); - - switch (check_triplet_state(trp)) { - case triplet_directive: - case triplet_complete: - wy_debug(1, (_("processing triplet `%s'"), trp->name)); - if (process_directives(trp)) - remove_triplet(trp, 0); - return 0; - - case triplet_incomplete: - wy_debug(1, (_("%s: incomplete triplet"), trp->name)); - /* ignore unless expired (see below); */ - increase_stat_counter(WY_STAT_INCOMPLETE_TRIPLETS); - break; - - case triplet_bad: - increase_stat_counter(WY_STAT_BAD_TRIPLETS); - remove_triplet(trp, 0); - return 0; - } - - if (triplet_expired_p(trp)) { - increase_stat_counter(WY_STAT_EXPIRED_TRIPLETS); - remove_triplet(trp, 0); - } - } - return 0; -} - -/* Process all triplets from the table according to the SPOOL */ -void -spool_commit_triplets(struct spool *spool) -{ - wy_debug(1, (_("processing spool %s (%s)"), - spool->tag, wy_url_printable(spool->dest_url))); - if (triplet_table) { - if (spool_open_dictionaries(spool) == 0) - grecs_symtab_foreach(triplet_table, triplet_processor, spool); - } -} - -size_t -count_collected_triplets() -{ - return triplet_table ? grecs_symtab_count(triplet_table) : 0; -} - void triplet_remove_file(struct spool *spool, const char *name) { diff --git a/src/wydawca.c b/src/wydawca.c index eb35111..d948429 100644 --- a/src/wydawca.c +++ b/src/wydawca.c @@ -400,14 +400,16 @@ main(int argc, char **argv) } wydawca_lock_init(); + job_init(); wy_log(LOG_NOTICE, _("wydawca (%s) started"), PACKAGE_STRING); scan_all_spools(); - logstats(); if (wy_mode == WY_MODE_DAEMON) wydawca_daemon(); - + else + job_queue_wait(); + dictionaries_close(); modules_close(); diff --git a/src/wydawca.h b/src/wydawca.h index 1f64623..6cc71f1 100644 --- a/src/wydawca.h +++ b/src/wydawca.h @@ -387,9 +387,6 @@ int for_each_spool(int (*fun) (struct spool *, void *), void *data); void register_spool(struct spool *spool); struct spool *wydawca_find_spool(const char *name); struct wy_triplet *register_file(struct file_info *finfo, struct spool *spool); -void spool_commit_triplets(struct spool *); -struct wy_triplet *link_processable_triplets(void); -size_t count_collected_triplets(void); char *triplet_expand_dictionary_query(struct dictionary *dict, void *handle, @@ -546,6 +543,8 @@ job_schedule_spool(struct spool *spool) void job_init(void); void job_queue_runner(time_t min_timeout); +void job_queue_wait(void); + /* profile.c */ void check_pidfile(void); @@ -573,10 +572,6 @@ int tcpwrap_access(int fd); /* userprivs.c */ int wydawca_userprivs(uid_t uid, gid_t gid, gid_t * grplist, size_t ngrp); -int push_dir(const char *dirname); -int pop_dir(void); -char *getcwd_alloc(void); - #ifdef WITH_INOTIFY int watcher_init(void); int watcher_run(int); diff --git a/tests/inotify-unatt01.at b/tests/inotify-unatt01.at index 75b14c4..c8c6da6 100644 --- a/tests/inotify-unatt01.at +++ b/tests/inotify-unatt01.at @@ -19,20 +19,6 @@ AT_KEYWORDS([inotify unattended pickup inotify-unatt01]) AT_DATA([experr], [wydawca: [[NOTICE]] AT_PACKAGE_TARNAME (AT_PACKAGE_NAME AT_PACKAGE_VERSION) started -wydawca: [[INFO]] errors: 0 -wydawca: [[INFO]] warnings: 0 -wydawca: [[INFO]] bad signatures: 0 -wydawca: [[INFO]] access violation attempts: 0 -wydawca: [[INFO]] complete triplets: 0 -wydawca: [[INFO]] incomplete triplets: 1 -wydawca: [[INFO]] bad triplets: 0 -wydawca: [[INFO]] expired triplets: 0 -wydawca: [[INFO]] triplet successes: 0 -wydawca: [[INFO]] files uploaded: 0 -wydawca: [[INFO]] files archived: 0 -wydawca: [[INFO]] symlinks created: 0 -wydawca: [[INFO]] symlinks removed: 0 -wydawca: [[INFO]] check failures: 0 wydawca: [[NOTICE]] file.directive.asc: VERSION: 1.1 wydawca: [[NOTICE]] file.directive.asc: COMMENT: Gnupload for Wydawca testsuite wydawca: [[INFO]] errors: 0 diff --git a/tests/upload-dry.at b/tests/upload-dry.at index 8245552..7a53e4f 100644 --- a/tests/upload-dry.at +++ b/tests/upload-dry.at @@ -22,11 +22,11 @@ wydawca: [[NOTICE]] AT_PACKAGE_TARNAME (AT_PACKAGE_NAME AT_PACKAGE_VERSION) star wydawca: [[DEBUG]] ./source/ok -> ./dest wydawca: [[DEBUG]] ./source/fail -> ./dest wydawca: [[DEBUG]] ./source/test -> ./dest -wydawca: [[DEBUG]] processing spool test (./dest) -wydawca: [[DEBUG]] FILE file, DIST=file, SIG=file.sig, DIRECTIVE=file.directive.asc wydawca: [[DEBUG]] Good signature from Wydawca (Testsuite) <bug-wydawca@gnu.org.ua> wydawca: [[DEBUG]] file: directive file signature OK wydawca: [[NOTICE]] file.directive.asc: VERSION: 1.1 +wydawca: [[DEBUG]] scheduling job triplet(test,file) +wydawca: [[DEBUG]] starting job: triplet(test,file) wydawca: [[DEBUG]] processing triplet `file' wydawca: [[NOTICE]] file.directive.asc: COMMENT: Gnupload for Wydawca testsuite wydawca: [[DEBUG]] good detached signature for file @@ -46,6 +46,7 @@ wydawca: [[INFO]] files archived: 0 wydawca: [[INFO]] symlinks created: 0 wydawca: [[INFO]] symlinks removed: 0 wydawca: [[INFO]] check failures: 0 +wydawca: [[DEBUG]] removing job: triplet(test,file) wydawca: [[NOTICE]] AT_PACKAGE_TARNAME (AT_PACKAGE_NAME AT_PACKAGE_VERSION) finished ]) @@ -57,7 +58,7 @@ set -e wydawca $WY_FORCE --no-preproc --dry-run -c wydawca.cf 2>err set +e grep "found file" err | sort -grep -v "found file" err | cwdrepl >&2 +sed -e '/found file/d' -e '/exited successfully/d' err | cwdrepl >&2 ], [0], [wydawca: [[DEBUG]] found file file.directive.asc: signed upload directive, stem: file |