summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRicardo Wurmus <rekado@elephly.net>2020-04-05 15:53:00 +0200
committerRicardo Wurmus <rekado@elephly.net>2020-04-05 15:53:00 +0200
commit85db2c23baac71a7b5e9bec339ef4ba231046675 (patch)
treeec66aefa8a503c73a301edc310dfba5aed863ea5
parent2725c7079bb85a78660149170f1d45dbeb5714ef (diff)
mumi: Add jobs.
-rw-r--r--Makefile.am1
-rw-r--r--mumi/jobs.scm149
2 files changed, 150 insertions, 0 deletions
diff --git a/Makefile.am b/Makefile.am
index fd34ea5..47343ab 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -46,5 +46,6 @@ SOURCES = \
mumi/web/view/utils.scm \
mumi/bugs.scm \
mumi/messages.scm \
+ mumi/jobs.scm \
mumi/send-email.scm \
mumi/config.scm
diff --git a/mumi/jobs.scm b/mumi/jobs.scm
new file mode 100644
index 0000000..addeaae
--- /dev/null
+++ b/mumi/jobs.scm
@@ -0,0 +1,149 @@
+;;; mumi -- Mediocre, uh, mail interface
+;;; Copyright © 2020 Ricardo Wurmus <rekado@elephly.net>
+;;;
+;;; This program is free software: you can redistribute it and/or
+;;; modify it under the terms of the GNU Affero General Public License
+;;; as published by the Free Software Foundation, either version 3 of
+;;; the License, or (at your option) any later version.
+;;;
+;;; This program is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;;; Affero General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU Affero General Public
+;;; License along with this program. If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+(define-module (mumi jobs)
+ #:use-module (mumi config)
+ #:use-module (mumi send-email)
+ #:use-module (redis)
+ #:use-module (ice-9 match)
+ #:use-module (rnrs bytevectors)
+ #:export (enqueue
+ worker-loop))
+
+;; This variable is looked up by 'mu-message-send', uh!
+(define-public mu-debug 0)
+
+(define (flatten lst)
+ (cond ((null? lst) '())
+ ((pair? lst)
+ ((@@ (guile) append) (flatten (car lst))
+ (flatten (cdr lst))))
+ (else (list lst))))
+
+;; Take care of opening and closing a connection to redis.
+(define-syntax with-redis
+ (lambda (x)
+ (syntax-case x ()
+ ((_ exp ...)
+ (with-syntax (($conn (datum->syntax x '$conn)))
+ #'(let (($conn (redis-connect)))
+ (dynamic-wind
+ (lambda () #t)
+ (lambda () (redis-send $conn
+ (flatten (list exp ...))))
+ (lambda () (redis-close $conn)))))))))
+
+(define-syntax-rule (transaction exps ...)
+ (list (multi)
+ exps ...
+ (exec)))
+
+(define %prefix "mumi:")
+(define %waiting (string-append %prefix "waiting"))
+(define %processing (string-append %prefix "processing"))
+
+(define (make-getter field)
+ (lambda (item)
+ (car (with-redis
+ (get (list (string-append %prefix field ":" item)))))))
+
+(define get-status (make-getter "status"))
+(define get-options (make-getter "options"))
+(define get-result (make-getter "result"))
+
+(define (set-status! item status)
+ (set (list (string-append %prefix "status:" item)
+ (string-append status ":"
+ (number->string (current-time))))))
+
+(define (set-options! item options)
+ (set (list (string-append %prefix "options:" item)
+ (format #f "~s" options))))
+
+(define (set-result! item result)
+ (set (list (string-append %prefix "result:" item)
+ (format #f "~a" result))))
+
+(define (next-waiting)
+ "Wait for an item to appear on the waiting list, then move it to
+the processing list and return the name."
+ (car (with-redis (brpoplpush (list %waiting %processing 0)))))
+
+(define (enqueue job-type options)
+ "Append the job to the waiting queue and store the OPTIONS."
+ (let* ((timestamp (match (gettimeofday)
+ ((s . ms) (format #f "~a~a" s ms))))
+ (job-name (format #f "~a:~a" job-type timestamp)))
+ (with-redis
+ (transaction
+ (rpush (list %waiting job-name))
+ (set-status! job-name "waiting")
+ (set-options! job-name options)))
+ job-name))
+
+(define (done? item)
+ "Return the processing result if the ITEM has been processed or #f
+if it has not."
+ (let ((status (get-status item))
+ (result (get-result item)))
+ (string-prefix? "success" status)))
+
+(define (process-next processor)
+ "Get an item from the waiting queue and run PROCESSOR on it.
+This procedure blocks until processor exits. Once PROCESSOR exits,
+the job status and result are updated."
+ (let* ((item (next-waiting))
+ (options (get-options item)))
+ (with-redis
+ (set-status! item "processing"))
+ ;; Process the item!
+ (let ((result (processor item options)))
+ (with-redis
+ (transaction
+ (set-result! item result)
+ ;; TODO: store error message in result if job failed
+ (if result
+ (set-status! item "success")
+ (set-status! item "failed"))
+ ;; We're done processing this.
+ (lrem (list %processing 0 item)))))
+ item))
+
+
+(define (mail-job global job raw-args)
+ "Process the job JOB with RAW-ARGS."
+ (define args (call-with-input-string raw-args read))
+ (format (current-error-port)
+ "processing~% job: ~a~% options: ~a~%"
+ job raw-args)
+ (match (string-split job #\:)
+ (("mail" timestamp)
+ (let ((message (compose-message (assoc-ref global 'sender)
+ (assoc-ref args 'to)
+ #:reply-to (assoc-ref args 'reply-to)
+ #:subject (assoc-ref args 'subject)
+ #:text (assoc-ref args 'text))))
+ (send-message message (assoc-ref global 'smtp))))
+ (_ #f)))
+
+(define (worker-loop global-options)
+ "Process jobs forever. Blocks if there are no jobs."
+ (let ((processed (process-next (lambda (job args)
+ (mail-job global-options job args)))))
+ (format (current-error-port) "[~a] ~a: ~a~%"
+ (current-time) (get-status processed) processed))
+ (worker-loop global-options))