;;; mumi -- Mediocre, uh, mail interface ;;; Copyright © 2020, 2022 Ricardo Wurmus ;;; ;;; This program is free software: you can redistribute it and/or ;;; modify it under the terms of the GNU Affero General Public License ;;; as published by the Free Software Foundation, either version 3 of ;;; the License, or (at your option) any later version. ;;; ;;; This program is distributed in the hope that it will be useful, ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ;;; Affero General Public License for more details. ;;; ;;; You should have received a copy of the GNU Affero General Public ;;; License along with this program. If not, see ;;; . (define-module (mumi jobs) #:use-module (mumi config) #:use-module (mumi send-email) #:use-module (redis) #:use-module (ice-9 match) #:use-module (rnrs bytevectors) #:export (enqueue worker-loop)) ;; This variable is looked up by 'mu-message-send', uh! (define-public mu-debug 0) (define (flatten lst) (cond ((null? lst) '()) ((pair? lst) ((@@ (guile) append) (flatten (car lst)) (flatten (cdr lst)))) (else (list lst)))) ;; Take care of opening and closing a connection to redis. (define-syntax with-redis (lambda (x) (syntax-case x () ((_ exp ...) (with-syntax (($conn (datum->syntax x '$conn))) #'(let (($conn (redis-connect))) (dynamic-wind (lambda () #t) (lambda () (redis-send $conn (flatten (list exp ...)))) (lambda () (redis-close $conn))))))))) (define-syntax-rule (transaction exps ...) (list (multi) exps ... (exec))) (define %prefix "mumi:") (define %waiting (string-append %prefix "waiting")) (define %processing (string-append %prefix "processing")) (define (make-getter field) (lambda (item) (car (with-redis (get (list (string-append %prefix field ":" item))))))) (define get-status (make-getter "status")) (define (set-status! item status) (let ((key (string-append %prefix "status:" item))) (with-redis (transaction (set (list key status)) (expire (list key (* 60 60 24 30))))))) ;delete after a month (define (next-waiting) "Wait for an item to appear on the waiting list, then move it to the processing list and return the name." (car (with-redis (brpoplpush (list %waiting %processing 0))))) (define (enqueue job-type options) "Append the job to the waiting queue and store the OPTIONS." (let* ((timestamp (match (gettimeofday) ((s . ms) (format #f "~a~a" s ms)))) (job-name (format #f "~a:~a" job-type timestamp))) (with-redis (rpush (list %waiting (format #f "~s" (cons `(id . ,job-name) options))))) job-name)) (define (process-next processor) "Get an item from the waiting queue and run PROCESSOR on it. This procedure blocks until processor exits. Once PROCESSOR exits, the job status is updated." (let* ((item (next-waiting))) (with-redis (set-status! item "processing")) ;; Process the item! (let ((result (processor item))) (with-redis (transaction ;; TODO: store error message in result if job failed (if result (set-status! item "success") (set-status! item "failed")) ;; We're done processing this. (lrem (list %processing 0 item))))) item)) (define (mail-job global job) "Process the job JOB." (define args (call-with-input-string job read)) (format (current-error-port) "processing~% job: ~a~% options: ~a~%" job args) (match (string-split job #\:) (("mail" timestamp) (let ((message (compose-message (format #f "~a via web <~a>" (assoc-ref args 'from) (assoc-ref global 'sender)) (assoc-ref args 'to) #:reply-to (assoc-ref args 'reply-to) #:subject (assoc-ref args 'subject) #:text (assoc-ref args 'text)))) (send-message message (assoc-ref global 'smtp)))) (_ #f))) (define (worker-loop global-options) "Process jobs forever. Blocks if there are no jobs." (let ((processed (process-next (lambda (job args) (mail-job global-options job))))) (format (current-error-port) "[~a] ~a: ~a~%" (current-time) (get-status processed) processed)) (worker-loop global-options))