mumi: Add jobs.
authorRicardo Wurmus <rekado@elephly.net>
Sun, 5 Apr 2020 13:53:00 +0000 (15:53 +0200)
committerRicardo Wurmus <rekado@elephly.net>
Sun, 5 Apr 2020 13:53:00 +0000 (15:53 +0200)
Makefile.am
mumi/jobs.scm [new file with mode: 0644]

index fd34ea5..47343ab 100644 (file)
@@ -46,5 +46,6 @@ SOURCES =                                                     \
   mumi/web/view/utils.scm                      \
   mumi/bugs.scm                                                \
   mumi/messages.scm                                    \
+  mumi/jobs.scm                                                \
   mumi/send-email.scm                          \
   mumi/config.scm
diff --git a/mumi/jobs.scm b/mumi/jobs.scm
new file mode 100644 (file)
index 0000000..addeaae
--- /dev/null
@@ -0,0 +1,149 @@
+;;; mumi -- Mediocre, uh, mail interface
+;;; Copyright © 2020 Ricardo Wurmus <rekado@elephly.net>
+;;;
+;;; This program is free software: you can redistribute it and/or
+;;; modify it under the terms of the GNU Affero General Public License
+;;; as published by the Free Software Foundation, either version 3 of
+;;; the License, or (at your option) any later version.
+;;;
+;;; This program is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+;;; Affero General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU Affero General Public
+;;; License along with this program.  If not, see
+;;; <http://www.gnu.org/licenses/>.
+
+(define-module (mumi jobs)
+  #:use-module (mumi config)
+  #:use-module (mumi send-email)
+  #:use-module (redis)
+  #:use-module (ice-9 match)
+  #:use-module (rnrs bytevectors)
+  #:export (enqueue
+            worker-loop))
+
+;; This variable is looked up by 'mu-message-send', uh!
+(define-public mu-debug 0)
+
+(define (flatten lst)
+  (cond ((null? lst) '())
+        ((pair? lst)
+         ((@@ (guile) append) (flatten (car lst))
+                 (flatten (cdr lst))))
+        (else (list lst))))
+
+;; Take care of opening and closing a connection to redis.
+(define-syntax with-redis
+  (lambda (x)
+    (syntax-case x ()
+      ((_ exp ...)
+       (with-syntax (($conn (datum->syntax x '$conn)))
+         #'(let (($conn (redis-connect)))
+             (dynamic-wind
+               (lambda () #t)
+               (lambda () (redis-send $conn
+                                 (flatten (list exp ...))))
+               (lambda () (redis-close $conn)))))))))
+
+(define-syntax-rule (transaction exps ...)
+  (list (multi)
+        exps ...
+        (exec)))
+
+(define %prefix     "mumi:")
+(define %waiting    (string-append %prefix "waiting"))
+(define %processing (string-append %prefix "processing"))
+
+(define (make-getter field)
+  (lambda (item)
+    (car (with-redis
+          (get (list (string-append %prefix field ":" item)))))))
+
+(define get-status  (make-getter "status"))
+(define get-options (make-getter "options"))
+(define get-result  (make-getter "result"))
+
+(define (set-status! item status)
+  (set (list (string-append %prefix "status:" item)
+             (string-append status ":"
+                            (number->string (current-time))))))
+
+(define (set-options! item options)
+  (set (list (string-append %prefix "options:" item)
+             (format #f "~s" options))))
+
+(define (set-result! item result)
+  (set (list (string-append %prefix "result:" item)
+             (format #f "~a" result))))
+
+(define (next-waiting)
+  "Wait for an item to appear on the waiting list, then move it to
+the processing list and return the name."
+  (car (with-redis (brpoplpush (list %waiting %processing 0)))))
+
+(define (enqueue job-type options)
+  "Append the job to the waiting queue and store the OPTIONS."
+  (let* ((timestamp (match (gettimeofday)
+                      ((s . ms) (format #f "~a~a" s ms))))
+         (job-name (format #f "~a:~a" job-type timestamp)))
+    (with-redis
+     (transaction
+      (rpush (list %waiting job-name))
+      (set-status! job-name "waiting")
+      (set-options! job-name options)))
+    job-name))
+
+(define (done? item)
+  "Return the processing result if the ITEM has been processed or #f
+if it has not."
+  (let ((status (get-status item))
+        (result (get-result item)))
+    (string-prefix? "success" status)))
+
+(define (process-next processor)
+  "Get an item from the waiting queue and run PROCESSOR on it.
+This procedure blocks until processor exits.  Once PROCESSOR exits,
+the job status and result are updated."
+  (let* ((item    (next-waiting))
+         (options (get-options item)))
+    (with-redis
+     (set-status! item "processing"))
+    ;; Process the item!
+    (let ((result (processor item options)))
+      (with-redis
+       (transaction
+        (set-result! item result)
+        ;; TODO: store error message in result if job failed
+        (if result
+            (set-status! item "success")
+            (set-status! item "failed"))
+        ;; We're done processing this.
+        (lrem (list %processing 0 item)))))
+    item))
+
+\f
+(define (mail-job global job raw-args)
+  "Process the job JOB with RAW-ARGS."
+  (define args (call-with-input-string raw-args read))
+  (format (current-error-port)
+          "processing~%  job: ~a~%  options: ~a~%"
+          job raw-args)
+  (match (string-split job #\:)
+    (("mail" timestamp)
+     (let ((message (compose-message (assoc-ref global 'sender)
+                                     (assoc-ref args 'to)
+                                     #:reply-to (assoc-ref args 'reply-to)
+                                     #:subject (assoc-ref args 'subject)
+                                     #:text (assoc-ref args 'text))))
+       (send-message message (assoc-ref global 'smtp))))
+    (_ #f)))
+
+(define (worker-loop global-options)
+  "Process jobs forever.  Blocks if there are no jobs."
+  (let ((processed (process-next (lambda (job args)
+                                   (mail-job global-options job args)))))
+    (format (current-error-port) "[~a] ~a: ~a~%"
+            (current-time) (get-status processed) processed))
+  (worker-loop global-options))