diff options
-rw-r--r-- | mumi/jobs.scm | 29 |
1 files changed, 10 insertions, 19 deletions
diff --git a/mumi/jobs.scm b/mumi/jobs.scm index dfb44f9..69ef2b2 100644 --- a/mumi/jobs.scm +++ b/mumi/jobs.scm @@ -1,5 +1,5 @@ ;;; mumi -- Mediocre, uh, mail interface -;;; Copyright © 2020 Ricardo Wurmus <rekado@elephly.net> +;;; Copyright © 2020, 2022 Ricardo Wurmus <rekado@elephly.net> ;;; ;;; This program is free software: you can redistribute it and/or ;;; modify it under the terms of the GNU Affero General Public License @@ -62,17 +62,11 @@ (get (list (string-append %prefix field ":" item))))))) (define get-status (make-getter "status")) -(define get-options (make-getter "options")) - (define (set-status! item status) (set (list (string-append %prefix "status:" item) (string-append status ":" (number->string (current-time)))))) -(define (set-options! item options) - (set (list (string-append %prefix "options:" item) - (format #f "~s" options)))) - (define (next-waiting) "Wait for an item to appear on the waiting list, then move it to the processing list and return the name." @@ -84,22 +78,19 @@ the processing list and return the name." ((s . ms) (format #f "~a~a" s ms)))) (job-name (format #f "~a:~a" job-type timestamp))) (with-redis - (transaction - (rpush (list %waiting job-name)) - (set-status! job-name "waiting") - (set-options! job-name options))) + (rpush (list %waiting + (format #f "~s" (cons `(id . ,job-name) options))))) job-name)) (define (process-next processor) "Get an item from the waiting queue and run PROCESSOR on it. This procedure blocks until processor exits. Once PROCESSOR exits, the job status is updated." - (let* ((item (next-waiting)) - (options (get-options item))) + (let* ((item (next-waiting))) (with-redis (set-status! item "processing")) ;; Process the item! - (let ((result (processor item options))) + (let ((result (processor item))) (with-redis (transaction ;; TODO: store error message in result if job failed @@ -111,12 +102,12 @@ the job status is updated." item)) -(define (mail-job global job raw-args) - "Process the job JOB with RAW-ARGS." - (define args (call-with-input-string raw-args read)) +(define (mail-job global job) + "Process the job JOB." + (define args (call-with-input-string job read)) (format (current-error-port) "processing~% job: ~a~% options: ~a~%" - job raw-args) + job args) (match (string-split job #\:) (("mail" timestamp) (let ((message (compose-message (format #f "~a via web <~a>" @@ -132,7 +123,7 @@ the job status is updated." (define (worker-loop global-options) "Process jobs forever. Blocks if there are no jobs." (let ((processed (process-next (lambda (job args) - (mail-job global-options job args))))) + (mail-job global-options job))))) (format (current-error-port) "[~a] ~a: ~a~%" (current-time) (get-status processed) processed)) (worker-loop global-options)) |