blob: 69ef2b22f31d2a22335cbaab27b244a69257e9a2 (
about) (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
;;; mumi -- Mediocre, uh, mail interface
;;; Copyright © 2020, 2022 Ricardo Wurmus <rekado@elephly.net>
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
;;; as published by the Free Software Foundation, either version 3 of
;;; the License, or (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;; Affero General Public License for more details.
;;;
;;; You should have received a copy of the GNU Affero General Public
;;; License along with this program. If not, see
;;; <http://www.gnu.org/licenses/>.
(define-module (mumi jobs)
#:use-module (mumi config)
#:use-module (mumi send-email)
#:use-module (redis)
#:use-module (ice-9 match)
#:use-module (rnrs bytevectors)
#:export (enqueue
worker-loop))
;; This variable is looked up by 'mu-message-send', uh!
(define-public mu-debug 0)
(define (flatten lst)
(cond ((null? lst) '())
((pair? lst)
((@@ (guile) append) (flatten (car lst))
(flatten (cdr lst))))
(else (list lst))))
;; Take care of opening and closing a connection to redis.
(define-syntax with-redis
(lambda (x)
(syntax-case x ()
((_ exp ...)
(with-syntax (($conn (datum->syntax x '$conn)))
#'(let (($conn (redis-connect)))
(dynamic-wind
(lambda () #t)
(lambda () (redis-send $conn
(flatten (list exp ...))))
(lambda () (redis-close $conn)))))))))
(define-syntax-rule (transaction exps ...)
(list (multi)
exps ...
(exec)))
(define %prefix "mumi:")
(define %waiting (string-append %prefix "waiting"))
(define %processing (string-append %prefix "processing"))
(define (make-getter field)
(lambda (item)
(car (with-redis
(get (list (string-append %prefix field ":" item)))))))
(define get-status (make-getter "status"))
(define (set-status! item status)
(set (list (string-append %prefix "status:" item)
(string-append status ":"
(number->string (current-time))))))
(define (next-waiting)
"Wait for an item to appear on the waiting list, then move it to
the processing list and return the name."
(car (with-redis (brpoplpush (list %waiting %processing 0)))))
(define (enqueue job-type options)
"Append the job to the waiting queue and store the OPTIONS."
(let* ((timestamp (match (gettimeofday)
((s . ms) (format #f "~a~a" s ms))))
(job-name (format #f "~a:~a" job-type timestamp)))
(with-redis
(rpush (list %waiting
(format #f "~s" (cons `(id . ,job-name) options)))))
job-name))
(define (process-next processor)
"Get an item from the waiting queue and run PROCESSOR on it.
This procedure blocks until processor exits. Once PROCESSOR exits,
the job status is updated."
(let* ((item (next-waiting)))
(with-redis
(set-status! item "processing"))
;; Process the item!
(let ((result (processor item)))
(with-redis
(transaction
;; TODO: store error message in result if job failed
(if result
(set-status! item "success")
(set-status! item "failed"))
;; We're done processing this.
(lrem (list %processing 0 item)))))
item))
(define (mail-job global job)
"Process the job JOB."
(define args (call-with-input-string job read))
(format (current-error-port)
"processing~% job: ~a~% options: ~a~%"
job args)
(match (string-split job #\:)
(("mail" timestamp)
(let ((message (compose-message (format #f "~a via web <~a>"
(assoc-ref args 'from)
(assoc-ref global 'sender))
(assoc-ref args 'to)
#:reply-to (assoc-ref args 'reply-to)
#:subject (assoc-ref args 'subject)
#:text (assoc-ref args 'text))))
(send-message message (assoc-ref global 'smtp))))
(_ #f)))
(define (worker-loop global-options)
"Process jobs forever. Blocks if there are no jobs."
(let ((processed (process-next (lambda (job args)
(mail-job global-options job)))))
(format (current-error-port) "[~a] ~a: ~a~%"
(current-time) (get-status processed) processed))
(worker-loop global-options))
|