summaryrefslogtreecommitdiff
path: root/mumi/jobs.scm
blob: 69ef2b22f31d2a22335cbaab27b244a69257e9a2 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
;;; mumi -- Mediocre, uh, mail interface
;;; Copyright © 2020, 2022 Ricardo Wurmus <rekado@elephly.net>
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
;;; as published by the Free Software Foundation, either version 3 of
;;; the License, or (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; Affero General Public License for more details.
;;;
;;; You should have received a copy of the GNU Affero General Public
;;; License along with this program.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (mumi jobs)
  #:use-module (mumi config)
  #:use-module (mumi send-email)
  #:use-module (redis)
  #:use-module (ice-9 match)
  #:use-module (rnrs bytevectors)
  #:export (enqueue
            worker-loop))

;; This variable is looked up by 'mu-message-send', uh!
(define-public mu-debug 0)

(define (flatten lst)
  (cond ((null? lst) '())
        ((pair? lst)
         ((@@ (guile) append) (flatten (car lst))
                 (flatten (cdr lst))))
        (else (list lst))))

;; Take care of opening and closing a connection to redis.
(define-syntax with-redis
  (lambda (x)
    (syntax-case x ()
      ((_ exp ...)
       (with-syntax (($conn (datum->syntax x '$conn)))
         #'(let (($conn (redis-connect)))
             (dynamic-wind
               (lambda () #t)
               (lambda () (redis-send $conn
                                 (flatten (list exp ...))))
               (lambda () (redis-close $conn)))))))))

(define-syntax-rule (transaction exps ...)
  (list (multi)
        exps ...
        (exec)))

(define %prefix     "mumi:")
(define %waiting    (string-append %prefix "waiting"))
(define %processing (string-append %prefix "processing"))

(define (make-getter field)
  (lambda (item)
    (car (with-redis
          (get (list (string-append %prefix field ":" item)))))))

(define get-status  (make-getter "status"))
(define (set-status! item status)
  (set (list (string-append %prefix "status:" item)
             (string-append status ":"
                            (number->string (current-time))))))

(define (next-waiting)
  "Wait for an item to appear on the waiting list, then move it to
the processing list and return the name."
  (car (with-redis (brpoplpush (list %waiting %processing 0)))))

(define (enqueue job-type options)
  "Append the job to the waiting queue and store the OPTIONS."
  (let* ((timestamp (match (gettimeofday)
                      ((s . ms) (format #f "~a~a" s ms))))
         (job-name (format #f "~a:~a" job-type timestamp)))
    (with-redis
     (rpush (list %waiting
                  (format #f "~s" (cons `(id . ,job-name) options)))))
    job-name))

(define (process-next processor)
  "Get an item from the waiting queue and run PROCESSOR on it.
This procedure blocks until processor exits.  Once PROCESSOR exits,
the job status is updated."
  (let* ((item (next-waiting)))
    (with-redis
     (set-status! item "processing"))
    ;; Process the item!
    (let ((result (processor item)))
      (with-redis
       (transaction
        ;; TODO: store error message in result if job failed
        (if result
            (set-status! item "success")
            (set-status! item "failed"))
        ;; We're done processing this.
        (lrem (list %processing 0 item)))))
    item))


(define (mail-job global job)
  "Process the job JOB."
  (define args (call-with-input-string job read))
  (format (current-error-port)
          "processing~%  job: ~a~%  options: ~a~%"
          job args)
  (match (string-split job #\:)
    (("mail" timestamp)
     (let ((message (compose-message (format #f "~a via web <~a>"
                                             (assoc-ref args 'from)
                                             (assoc-ref global 'sender))
                                     (assoc-ref args 'to)
                                     #:reply-to (assoc-ref args 'reply-to)
                                     #:subject (assoc-ref args 'subject)
                                     #:text (assoc-ref args 'text))))
       (send-message message (assoc-ref global 'smtp))))
    (_ #f)))

(define (worker-loop global-options)
  "Process jobs forever.  Blocks if there are no jobs."
  (let ((processed (process-next (lambda (job args)
                                   (mail-job global-options job)))))
    (format (current-error-port) "[~a] ~a: ~a~%"
            (current-time) (get-status processed) processed))
  (worker-loop global-options))