From 85db2c23baac71a7b5e9bec339ef4ba231046675 Mon Sep 17 00:00:00 2001 From: Ricardo Wurmus Date: Sun, 5 Apr 2020 15:53:00 +0200 Subject: mumi: Add jobs. --- Makefile.am | 1 + mumi/jobs.scm | 149 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 150 insertions(+) create mode 100644 mumi/jobs.scm diff --git a/Makefile.am b/Makefile.am index fd34ea5..47343ab 100644 --- a/Makefile.am +++ b/Makefile.am @@ -46,5 +46,6 @@ SOURCES = \ mumi/web/view/utils.scm \ mumi/bugs.scm \ mumi/messages.scm \ + mumi/jobs.scm \ mumi/send-email.scm \ mumi/config.scm diff --git a/mumi/jobs.scm b/mumi/jobs.scm new file mode 100644 index 0000000..addeaae --- /dev/null +++ b/mumi/jobs.scm @@ -0,0 +1,149 @@ +;;; mumi -- Mediocre, uh, mail interface +;;; Copyright © 2020 Ricardo Wurmus +;;; +;;; This program is free software: you can redistribute it and/or +;;; modify it under the terms of the GNU Affero General Public License +;;; as published by the Free Software Foundation, either version 3 of +;;; the License, or (at your option) any later version. +;;; +;;; This program is distributed in the hope that it will be useful, +;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;; Affero General Public License for more details. +;;; +;;; You should have received a copy of the GNU Affero General Public +;;; License along with this program. If not, see +;;; . + +(define-module (mumi jobs) + #:use-module (mumi config) + #:use-module (mumi send-email) + #:use-module (redis) + #:use-module (ice-9 match) + #:use-module (rnrs bytevectors) + #:export (enqueue + worker-loop)) + +;; This variable is looked up by 'mu-message-send', uh! +(define-public mu-debug 0) + +(define (flatten lst) + (cond ((null? lst) '()) + ((pair? lst) + ((@@ (guile) append) (flatten (car lst)) + (flatten (cdr lst)))) + (else (list lst)))) + +;; Take care of opening and closing a connection to redis. +(define-syntax with-redis + (lambda (x) + (syntax-case x () + ((_ exp ...) + (with-syntax (($conn (datum->syntax x '$conn))) + #'(let (($conn (redis-connect))) + (dynamic-wind + (lambda () #t) + (lambda () (redis-send $conn + (flatten (list exp ...)))) + (lambda () (redis-close $conn))))))))) + +(define-syntax-rule (transaction exps ...) + (list (multi) + exps ... + (exec))) + +(define %prefix "mumi:") +(define %waiting (string-append %prefix "waiting")) +(define %processing (string-append %prefix "processing")) + +(define (make-getter field) + (lambda (item) + (car (with-redis + (get (list (string-append %prefix field ":" item))))))) + +(define get-status (make-getter "status")) +(define get-options (make-getter "options")) +(define get-result (make-getter "result")) + +(define (set-status! item status) + (set (list (string-append %prefix "status:" item) + (string-append status ":" + (number->string (current-time)))))) + +(define (set-options! item options) + (set (list (string-append %prefix "options:" item) + (format #f "~s" options)))) + +(define (set-result! item result) + (set (list (string-append %prefix "result:" item) + (format #f "~a" result)))) + +(define (next-waiting) + "Wait for an item to appear on the waiting list, then move it to +the processing list and return the name." + (car (with-redis (brpoplpush (list %waiting %processing 0))))) + +(define (enqueue job-type options) + "Append the job to the waiting queue and store the OPTIONS." + (let* ((timestamp (match (gettimeofday) + ((s . ms) (format #f "~a~a" s ms)))) + (job-name (format #f "~a:~a" job-type timestamp))) + (with-redis + (transaction + (rpush (list %waiting job-name)) + (set-status! job-name "waiting") + (set-options! job-name options))) + job-name)) + +(define (done? item) + "Return the processing result if the ITEM has been processed or #f +if it has not." + (let ((status (get-status item)) + (result (get-result item))) + (string-prefix? "success" status))) + +(define (process-next processor) + "Get an item from the waiting queue and run PROCESSOR on it. +This procedure blocks until processor exits. Once PROCESSOR exits, +the job status and result are updated." + (let* ((item (next-waiting)) + (options (get-options item))) + (with-redis + (set-status! item "processing")) + ;; Process the item! + (let ((result (processor item options))) + (with-redis + (transaction + (set-result! item result) + ;; TODO: store error message in result if job failed + (if result + (set-status! item "success") + (set-status! item "failed")) + ;; We're done processing this. + (lrem (list %processing 0 item))))) + item)) + + +(define (mail-job global job raw-args) + "Process the job JOB with RAW-ARGS." + (define args (call-with-input-string raw-args read)) + (format (current-error-port) + "processing~% job: ~a~% options: ~a~%" + job raw-args) + (match (string-split job #\:) + (("mail" timestamp) + (let ((message (compose-message (assoc-ref global 'sender) + (assoc-ref args 'to) + #:reply-to (assoc-ref args 'reply-to) + #:subject (assoc-ref args 'subject) + #:text (assoc-ref args 'text)))) + (send-message message (assoc-ref global 'smtp)))) + (_ #f))) + +(define (worker-loop global-options) + "Process jobs forever. Blocks if there are no jobs." + (let ((processed (process-next (lambda (job args) + (mail-job global-options job args))))) + (format (current-error-port) "[~a] ~a: ~a~%" + (current-time) (get-status processed) processed)) + (worker-loop global-options)) -- cgit v1.2.3