summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mumi/jobs.scm29
1 files changed, 10 insertions, 19 deletions
diff --git a/mumi/jobs.scm b/mumi/jobs.scm
index dfb44f9..69ef2b2 100644
--- a/mumi/jobs.scm
+++ b/mumi/jobs.scm
@@ -1,5 +1,5 @@
;;; mumi -- Mediocre, uh, mail interface
-;;; Copyright © 2020 Ricardo Wurmus <rekado@elephly.net>
+;;; Copyright © 2020, 2022 Ricardo Wurmus <rekado@elephly.net>
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
@@ -62,17 +62,11 @@
(get (list (string-append %prefix field ":" item)))))))
(define get-status (make-getter "status"))
-(define get-options (make-getter "options"))
-
(define (set-status! item status)
(set (list (string-append %prefix "status:" item)
(string-append status ":"
(number->string (current-time))))))
-(define (set-options! item options)
- (set (list (string-append %prefix "options:" item)
- (format #f "~s" options))))
-
(define (next-waiting)
"Wait for an item to appear on the waiting list, then move it to
the processing list and return the name."
@@ -84,22 +78,19 @@ the processing list and return the name."
((s . ms) (format #f "~a~a" s ms))))
(job-name (format #f "~a:~a" job-type timestamp)))
(with-redis
- (transaction
- (rpush (list %waiting job-name))
- (set-status! job-name "waiting")
- (set-options! job-name options)))
+ (rpush (list %waiting
+ (format #f "~s" (cons `(id . ,job-name) options)))))
job-name))
(define (process-next processor)
"Get an item from the waiting queue and run PROCESSOR on it.
This procedure blocks until processor exits. Once PROCESSOR exits,
the job status is updated."
- (let* ((item (next-waiting))
- (options (get-options item)))
+ (let* ((item (next-waiting)))
(with-redis
(set-status! item "processing"))
;; Process the item!
- (let ((result (processor item options)))
+ (let ((result (processor item)))
(with-redis
(transaction
;; TODO: store error message in result if job failed
@@ -111,12 +102,12 @@ the job status is updated."
item))
-(define (mail-job global job raw-args)
- "Process the job JOB with RAW-ARGS."
- (define args (call-with-input-string raw-args read))
+(define (mail-job global job)
+ "Process the job JOB."
+ (define args (call-with-input-string job read))
(format (current-error-port)
"processing~% job: ~a~% options: ~a~%"
- job raw-args)
+ job args)
(match (string-split job #\:)
(("mail" timestamp)
(let ((message (compose-message (format #f "~a via web <~a>"
@@ -132,7 +123,7 @@ the job status is updated."
(define (worker-loop global-options)
"Process jobs forever. Blocks if there are no jobs."
(let ((processed (process-next (lambda (job args)
- (mail-job global-options job args)))))
+ (mail-job global-options job)))))
(format (current-error-port) "[~a] ~a: ~a~%"
(current-time) (get-status processed) processed))
(worker-loop global-options))