summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Wingo <wingo@pobox.com>2016-11-14 21:35:44 +0100
committerAndy Wingo <wingo@pobox.com>2016-11-14 21:50:41 +0100
commita52144002911f217e03155336ce0980ac8b5b2af (patch)
tree4328e458705077ed642b8dfd9477906d1936111a
parente447258c3f204de22c221ec153850db052acc437 (diff)
join-thread in Scheme
* module/ice-9/threads.scm (join-thread): Implement in Scheme. (call-with-new-thread): Arrange to record values in a weak table and signal the join cond. (with-mutex): Move up definition; call-with-new-thread needs it. (How was this working before?) * libguile/threads.c (guilify_self_1, guilify_self_2, do_thread_exit): Remove join queue management. * libguile/threads.c (scm_join_thread, scm_join_thread_timed): Call out to Scheme. (scm_init_ice_9_threads): Capture join-thread var.
-rw-r--r--libguile/threads.c87
-rw-r--r--libguile/threads.h2
-rw-r--r--module/ice-9/threads.scm70
3 files changed, 70 insertions, 89 deletions
diff --git a/libguile/threads.c b/libguile/threads.c
index 97320571b..2798be70f 100644
--- a/libguile/threads.c
+++ b/libguile/threads.c
@@ -408,7 +408,6 @@ guilify_self_1 (struct GC_stack_base *base)
t.pthread = scm_i_pthread_self ();
t.handle = SCM_BOOL_F;
t.result = SCM_BOOL_F;
- t.join_queue = SCM_EOL;
t.freelists = NULL;
t.pointerless_freelists = NULL;
t.dynamic_state = SCM_BOOL_F;
@@ -491,7 +490,6 @@ guilify_self_2 (SCM parent)
t->dynstack.limit = t->dynstack.base + 16;
t->dynstack.top = t->dynstack.base + SCM_DYNSTACK_HEADER_LEN;
- t->join_queue = make_queue ();
t->block_asyncs = 0;
/* See note in finalizers.c:queue_finalizer_async(). */
@@ -509,13 +507,9 @@ do_thread_exit (void *v)
scm_i_thread *t = (scm_i_thread *) v;
scm_i_scm_pthread_mutex_lock (&t->admin_mutex);
-
t->exited = 1;
close (t->sleep_pipe[0]);
close (t->sleep_pipe[1]);
- while (scm_is_true (unblock_from_queue (t->join_queue)))
- ;
-
scm_i_pthread_mutex_unlock (&t->admin_mutex);
return NULL;
@@ -867,9 +861,6 @@ SCM_DEFINE (scm_yield, "yield", 0, 0, 0,
}
#undef FUNC_NAME
-/* Some systems, notably Android, lack 'pthread_cancel'. Don't provide
- 'cancel-thread' on these systems. */
-
static SCM cancel_thread_var;
SCM
@@ -879,79 +870,26 @@ scm_cancel_thread (SCM thread)
return SCM_UNSPECIFIED;
}
+static SCM join_thread_var;
+
SCM
scm_join_thread (SCM thread)
{
- return scm_join_thread_timed (thread, SCM_UNDEFINED, SCM_UNDEFINED);
+ return scm_call_1 (scm_variable_ref (join_thread_var), thread);
}
-SCM_DEFINE (scm_join_thread_timed, "join-thread", 1, 2, 0,
- (SCM thread, SCM timeout, SCM timeoutval),
-"Suspend execution of the calling thread until the target @var{thread} "
-"terminates, unless the target @var{thread} has already terminated. ")
-#define FUNC_NAME s_scm_join_thread_timed
+SCM
+scm_join_thread_timed (SCM thread, SCM timeout, SCM timeoutval)
{
- scm_i_thread *t;
- scm_t_timespec ctimeout, *timeout_ptr = NULL;
- SCM res = SCM_BOOL_F;
-
- if (! (SCM_UNBNDP (timeoutval)))
- res = timeoutval;
-
- SCM_VALIDATE_THREAD (1, thread);
- if (scm_is_eq (scm_current_thread (), thread))
- SCM_MISC_ERROR ("cannot join the current thread", SCM_EOL);
-
- t = SCM_I_THREAD_DATA (thread);
- scm_i_scm_pthread_mutex_lock (&t->admin_mutex);
+ SCM join_thread = scm_variable_ref (join_thread_var);
- if (! SCM_UNBNDP (timeout))
- {
- to_timespec (timeout, &ctimeout);
- timeout_ptr = &ctimeout;
- }
-
- if (t->exited)
- res = t->result;
+ if (SCM_UNBNDP (timeout))
+ return scm_call_1 (join_thread, thread);
+ else if (SCM_UNBNDP (timeoutval))
+ return scm_call_2 (join_thread, thread, timeout);
else
- {
- while (1)
- {
- int err = block_self (t->join_queue, &t->admin_mutex,
- timeout_ptr);
- scm_remember_upto_here_1 (thread);
- if (err == 0)
- {
- if (t->exited)
- {
- res = t->result;
- break;
- }
- }
- else if (err == ETIMEDOUT)
- break;
-
- scm_i_pthread_mutex_unlock (&t->admin_mutex);
- SCM_TICK;
- scm_i_scm_pthread_mutex_lock (&t->admin_mutex);
-
- /* Check for exit again, since we just released and
- reacquired the admin mutex, before the next block_self
- call (which would block forever if t has already
- exited). */
- if (t->exited)
- {
- res = t->result;
- break;
- }
- }
- }
-
- scm_i_pthread_mutex_unlock (&t->admin_mutex);
-
- return res;
+ return scm_call_3 (join_thread, thread, timeout, timeoutval);
}
-#undef FUNC_NAME
SCM_DEFINE (scm_thread_p, "thread?", 1, 0, 0,
(SCM obj),
@@ -1875,6 +1813,9 @@ scm_init_ice_9_threads (void *unused)
cancel_thread_var =
scm_module_variable (scm_current_module (),
scm_from_latin1_symbol ("cancel-thread"));
+ join_thread_var =
+ scm_module_variable (scm_current_module (),
+ scm_from_latin1_symbol ("join-thread"));
call_with_new_thread_var =
scm_module_variable (scm_current_module (),
scm_from_latin1_symbol ("call-with-new-thread"));
diff --git a/libguile/threads.h b/libguile/threads.h
index db52f16b7..986049c66 100644
--- a/libguile/threads.h
+++ b/libguile/threads.h
@@ -55,8 +55,6 @@ typedef struct scm_i_thread {
SCM handle;
scm_i_pthread_t pthread;
- SCM join_queue;
-
scm_i_pthread_mutex_t admin_mutex;
SCM result;
diff --git a/module/ice-9/threads.scm b/module/ice-9/threads.scm
index 119334b46..ae6a97db9 100644
--- a/module/ice-9/threads.scm
+++ b/module/ice-9/threads.scm
@@ -85,6 +85,13 @@
+(define-syntax-rule (with-mutex m e0 e1 ...)
+ (let ((x m))
+ (dynamic-wind
+ (lambda () (lock-mutex x))
+ (lambda () (begin e0 e1 ...))
+ (lambda () (unlock-mutex x)))))
+
(define cancel-tag (make-prompt-tag "cancel"))
(define (cancel-thread thread . values)
"Asynchronously interrupt the target @var{thread} and ask it to
@@ -101,6 +108,9 @@ no-op."
(error "thread cancellation failed, throwing error instead???"))))
thread))
+(define thread-join-data (make-object-property))
+(define %thread-results (make-object-property))
+
(define* (call-with-new-thread thunk #:optional handler)
"Call @code{thunk} in a new thread and with a new dynamic state,
returning a new thread object representing the thread. The procedure
@@ -121,21 +131,60 @@ Once @var{thunk} or @var{handler} returns, the return value is made the
(with-mutex mutex
(%call-with-new-thread
(lambda ()
- (call-with-prompt cancel-tag
- (lambda ()
+ (call-with-values
+ (lambda ()
+ (with-continuation-barrier
+ (lambda ()
+ (call-with-prompt cancel-tag
+ (lambda ()
+ (lock-mutex mutex)
+ (set! thread (current-thread))
+ (set! (thread-join-data thread) (cons cv mutex))
+ (signal-condition-variable cv)
+ (unlock-mutex mutex)
+ (thunk))
+ (lambda (k . args)
+ (apply values args))))))
+ (lambda vals
(lock-mutex mutex)
- (set! thread (current-thread))
- (signal-condition-variable cv)
+ ;; Probably now you're wondering why we are going to use
+ ;; the cond variable as the key into the thread results
+ ;; object property. It's because there is a possibility
+ ;; that the thread object itself ends up as part of the
+ ;; result, and if that happens we create a cycle whereby
+ ;; the strong reference to a thread in the value of the
+ ;; weak-key hash table used by the object property prevents
+ ;; the thread from ever being collected. So instead we use
+ ;; the cv as the key. Weak-key hash tables, amirite?
+ (set! (%thread-results cv) vals)
+ (broadcast-condition-variable cv)
(unlock-mutex mutex)
- (thunk))
- (lambda (k . args)
- (apply values args)))))
+ (apply values vals)))))
(let lp ()
(unless thread
(wait-condition-variable cv mutex)
(lp))))
thread))
+(define* (join-thread thread #:optional timeout timeoutval)
+ "Suspend execution of the calling thread until the target @var{thread}
+terminates, unless the target @var{thread} has already terminated."
+ (match (thread-join-data thread)
+ (#f (error "foreign thread cannot be joined" thread))
+ ((cv . mutex)
+ (lock-mutex mutex)
+ (let lp ()
+ (cond
+ ((%thread-results cv)
+ => (lambda (results)
+ (unlock-mutex mutex)
+ (apply values results)))
+ ((if timeout
+ (wait-condition-variable cv mutex timeout)
+ (wait-condition-variable cv mutex))
+ (lp))
+ (else timeoutval))))))
+
(define* (try-mutex mutex)
"Try to lock @var{mutex}. If the mutex is already locked, return
@code{#f}. Otherwise lock the mutex and return @code{#t}."
@@ -155,13 +204,6 @@ Once @var{thunk} or @var{handler} returns, the return value is made the
(lambda () (proc arg ...))
%thread-handler))
-(define-syntax-rule (with-mutex m e0 e1 ...)
- (let ((x m))
- (dynamic-wind
- (lambda () (lock-mutex x))
- (lambda () (begin e0 e1 ...))
- (lambda () (unlock-mutex x)))))
-
(define monitor-mutex-table (make-hash-table))
(define monitor-mutex-table-mutex (make-mutex))