Add handler for downloading patch sets.
[software/mumi.git] / mumi / jobs.scm
1 ;;; mumi -- Mediocre, uh, mail interface
2 ;;; Copyright © 2020 Ricardo Wurmus <rekado@elephly.net>
3 ;;;
4 ;;; This program is free software: you can redistribute it and/or
5 ;;; modify it under the terms of the GNU Affero General Public License
6 ;;; as published by the Free Software Foundation, either version 3 of
7 ;;; the License, or (at your option) any later version.
8 ;;;
9 ;;; This program is distributed in the hope that it will be useful,
10 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
11 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 ;;; Affero General Public License for more details.
13 ;;;
14 ;;; You should have received a copy of the GNU Affero General Public
15 ;;; License along with this program. If not, see
16 ;;; <http://www.gnu.org/licenses/>.
17
18 (define-module (mumi jobs)
19 #:use-module (mumi config)
20 #:use-module (mumi send-email)
21 #:use-module (redis)
22 #:use-module (ice-9 match)
23 #:use-module (rnrs bytevectors)
24 #:export (enqueue
25 worker-loop))
26
;; 'mu-message-send' looks this variable up by name, so it has to be
;; defined in and exported from this module even though nothing in this
;; file reads it.
(define mu-debug 0)
(export mu-debug)
29
(define (flatten lst)
  "Return a flat list of all the non-pair, non-null leaves of LST, in
left-to-right order.  The empty list yields the empty list; a non-list
value X yields (X)."
  ;; Walk the tree right-to-left, consing every leaf onto TAIL, so the
  ;; result is built without calling 'append' at all.
  (let walk ((tree lst)
             (tail '()))
    (cond ((null? tree) tail)
          ((pair? tree)
           (walk (car tree)
                 (walk (cdr tree) tail)))
          (else (cons tree tail)))))
36
;; Open a connection with 'redis-connect', evaluate every COMMAND
;; expression, flatten the resulting values into one list and hand that
;; list to 'redis-send'.  The value of the form is whatever 'redis-send'
;; returns.  'redis-close' runs in the exit guard of 'dynamic-wind', so
;; the connection is closed on non-local exits as well.
;;
;; The connection is bound to the identifier '$conn', which is inserted
;; unhygienically and is therefore visible to the COMMAND expressions.
(define-syntax with-redis
  (lambda (stx)
    (syntax-case stx ()
      ((_ command ...)
       (with-syntax ((conn (datum->syntax stx '$conn)))
         #'(let ((conn (redis-connect)))
             (dynamic-wind
               (lambda () #t)
               (lambda ()
                 (redis-send conn (flatten (list command ...))))
               (lambda ()
                 (redis-close conn)))))))))
49
;; Expand to a list that starts with the value of (multi), continues with
;; the values of the COMMAND expressions and ends with the value of (exec).
(define-syntax transaction
  (syntax-rules ()
    ((_ command ...)
     (list (multi)
           command ...
           (exec)))))
54
;; Every redis key used by this module starts with this prefix.
(define %prefix "mumi:")

;; Key of the list that holds the names of jobs waiting to be processed.
(define %waiting
  (string-append %prefix "waiting"))

;; Key of the list that holds the names of jobs being processed.
(define %processing
  (string-append %prefix "processing"))
58
(define (make-getter field)
  "Return a procedure of one argument, ITEM, that sends a 'get' command
for the key made of the common prefix, FIELD, a colon and ITEM, and
returns the first element of the reply list."
  (define (key-for item)
    (string-append %prefix field ":" item))
  (lambda (item)
    (let ((replies (with-redis
                    (get (list (key-for item))))))
      (car replies))))
63
(define (get-status item)
  "Return the value stored under the status key of ITEM."
  ((make-getter "status") item))

(define (get-options item)
  "Return the value stored under the options key of ITEM."
  ((make-getter "options") item))

(define (get-result item)
  "Return the value stored under the result key of ITEM."
  ((make-getter "result") item))
67
(define (set-status! item status)
  "Return the value of a 'set' command that stores STATUS, followed by a
colon and the current time in seconds, under the status key of ITEM.
Callers in this file wrap the returned value in 'with-redis'."
  (let ((status-key (string-append %prefix "status:" item))
        (stamped (string-append status
                                ":"
                                (number->string (current-time)))))
    (set (list status-key stamped))))
72
(define (set-options! item options)
  "Return the value of a 'set' command that stores the written
representation of OPTIONS (as produced by 'write') under the options
key of ITEM."
  (let ((options-key (string-append %prefix "options:" item))
        ;; 'object->string' prints with 'write' by default, which keeps
        ;; the stored text readable by 'read'.
        (serialized (object->string options)))
    (set (list options-key serialized))))
76
(define (set-result! item result)
  "Return the value of a 'set' command that stores the displayed
representation of RESULT (as produced by 'display') under the result
key of ITEM."
  (let ((result-key (string-append %prefix "result:" item))
        (printed (object->string result display)))
    (set (list result-key printed))))
80
(define (next-waiting)
  "Send a 'brpoplpush' command from the waiting list to the processing
list with a timeout argument of 0 and return the first element of the
reply, i.e. the name of the item that was moved."
  (let ((replies (with-redis
                  (brpoplpush (list %waiting %processing 0)))))
    (car replies)))
85
(define (enqueue job-type options)
  "Append a new job of type JOB-TYPE to the waiting queue, set its status
to waiting and store OPTIONS for it, all in one transaction.  Return the
job name, which has the form JOB-TYPE:TIMESTAMP."
  (let* ((timestamp
          (match (gettimeofday)
            ((seconds . microseconds)
             ;; Zero-pad the microseconds to six digits.  Without the
             ;; padding different instants can produce the same string
             ;; (1s + 23us and 12s + 3us would both give "123") and the
             ;; names would not sort chronologically.
             (string-append (number->string seconds)
                            (string-pad (number->string microseconds)
                                        6 #\0)))))
         (job-name (format #f "~a:~a" job-type timestamp)))
    (with-redis
     (transaction
      (rpush (list %waiting job-name))
      (set-status! job-name "waiting")
      (set-options! job-name options)))
    job-name))
97
(define (done? item)
  "Return the processing result of ITEM if its status starts with
success, or #f otherwise.  The return value is true exactly when the
status indicates success, so it can still be used as a predicate."
  (let ((status (get-status item)))
    ;; The status reply is not a string when no status has been stored
    ;; for ITEM; treat that as not done instead of letting
    ;; 'string-prefix?' throw.
    (and (string? status)
         (string-prefix? "success" status)
         ;; Only fetch the result once we know the job succeeded.  Fall
         ;; back to #t so that a successful job never looks unfinished.
         (or (get-result item) #t))))
104
(define (process-next processor)
  "Get an item from the waiting queue and run PROCESSOR on it.
PROCESSOR is called with the item name and the options stored for it.
This procedure blocks until PROCESSOR exits.  Once PROCESSOR exits, the
job status and result are updated, the item is removed from the
processing list and the item name is returned.

If PROCESSOR exits by throwing, the job is marked as failed and a
description of the exception is stored as its result, so that the item
is not left behind on the processing list.  Throws with key 'quit' or
'signal' are re-raised after the job has been updated; all other
exceptions are not propagated."
  (let* ((item (next-waiting))
         (options (get-options item)))
    (with-redis
     (set-status! item "processing"))
    ;; Process the item!  OUTCOME is (#t . RESULT) when PROCESSOR returns
    ;; normally and (#f KEY ARG ...) when it throws.
    (let* ((outcome (catch #t
                      (lambda ()
                        (cons #t (processor item options)))
                      (lambda (key . args)
                        (cons #f (cons key args)))))
           (returned? (car outcome))
           (result (if returned?
                       (cdr outcome)
                       (format #f "error: ~s" (cdr outcome)))))
      (with-redis
       (transaction
        (set-result! item result)
        (if (and returned? result)
            (set-status! item "success")
            (set-status! item "failed"))
        ;; We're done processing this.
        (lrem (list %processing 0 item))))
      ;; Do not swallow requests to terminate the process.
      (when (and (not returned?)
                 (memq (cadr outcome) '(quit signal)))
        (apply throw (cdr outcome))))
    item))
125
126 \f
(define (mail-job global job raw-args)
  "Process the job JOB with RAW-ARGS.  RAW-ARGS is a string; the first
datum read from it is used as an association list with the keys from,
to, reply-to, subject and text.  GLOBAL is an association list with the
keys sender and smtp.

When JOB has the form mail:TIMESTAMP a message is composed and passed
to 'send-message', whose value is returned.  Any other JOB yields #f."
  (let ((args (call-with-input-string raw-args read)))
    (define (option name)
      (assoc-ref args name))

    (format (current-error-port)
            "processing~%  job: ~a~%  options: ~a~%"
            job raw-args)
    ;; Only names made of exactly two colon-separated fields, the first
    ;; of which is "mail", are handled here.
    (let ((fields (string-split job #\:)))
      (if (and (= 2 (length fields))
               (string=? "mail" (car fields)))
          (let ((email (compose-message
                        (format #f "~a via web <~a>"
                                (option 'from)
                                (assoc-ref global 'sender))
                        (option 'to)
                        #:reply-to (option 'reply-to)
                        #:subject (option 'subject)
                        #:text (option 'text))))
            (send-message email (assoc-ref global 'smtp)))
          #f))))
144
(define (worker-loop global-options)
  "Process jobs forever, never returning.  Blocks if there are no jobs.
Every job is handed to 'mail-job' together with GLOBAL-OPTIONS; after
each job a line with the current time, the job status and the job name
is written to the current error port."
  (define (run-job job args)
    (mail-job global-options job args))

  (let loop ()
    (let ((processed (process-next run-job)))
      (format (current-error-port) "[~a] ~a: ~a~%"
              (current-time) (get-status processed) processed))
    (loop)))