diff options
author | Andy Wingo <wingo@pobox.com> | 2016-11-14 21:35:44 +0100 |
---|---|---|
committer | Andy Wingo <wingo@pobox.com> | 2016-11-14 21:50:41 +0100 |
commit | a52144002911f217e03155336ce0980ac8b5b2af (patch) | |
tree | 4328e458705077ed642b8dfd9477906d1936111a | |
parent | e447258c3f204de22c221ec153850db052acc437 (diff) |
join-thread in Scheme
* module/ice-9/threads.scm (join-thread): Implement in Scheme.
(call-with-new-thread): Arrange to record values in a weak table and
signal the join cond.
(with-mutex): Move up definition; call-with-new-thread needs it. (How
was this working before?)
* libguile/threads.c (guilify_self_1, guilify_self_2, do_thread_exit):
Remove join queue management.
* libguile/threads.c (scm_join_thread, scm_join_thread_timed): Call out
to Scheme.
(scm_init_ice_9_threads): Capture join-thread var.
-rw-r--r-- | libguile/threads.c | 87 | ||||
-rw-r--r-- | libguile/threads.h | 2 | ||||
-rw-r--r-- | module/ice-9/threads.scm | 70 |
3 files changed, 70 insertions, 89 deletions
diff --git a/libguile/threads.c b/libguile/threads.c index 97320571b..2798be70f 100644 --- a/libguile/threads.c +++ b/libguile/threads.c @@ -408,7 +408,6 @@ guilify_self_1 (struct GC_stack_base *base) t.pthread = scm_i_pthread_self (); t.handle = SCM_BOOL_F; t.result = SCM_BOOL_F; - t.join_queue = SCM_EOL; t.freelists = NULL; t.pointerless_freelists = NULL; t.dynamic_state = SCM_BOOL_F; @@ -491,7 +490,6 @@ guilify_self_2 (SCM parent) t->dynstack.limit = t->dynstack.base + 16; t->dynstack.top = t->dynstack.base + SCM_DYNSTACK_HEADER_LEN; - t->join_queue = make_queue (); t->block_asyncs = 0; /* See note in finalizers.c:queue_finalizer_async(). */ @@ -509,13 +507,9 @@ do_thread_exit (void *v) scm_i_thread *t = (scm_i_thread *) v; scm_i_scm_pthread_mutex_lock (&t->admin_mutex); - t->exited = 1; close (t->sleep_pipe[0]); close (t->sleep_pipe[1]); - while (scm_is_true (unblock_from_queue (t->join_queue))) - ; - scm_i_pthread_mutex_unlock (&t->admin_mutex); return NULL; @@ -867,9 +861,6 @@ SCM_DEFINE (scm_yield, "yield", 0, 0, 0, } #undef FUNC_NAME -/* Some systems, notably Android, lack 'pthread_cancel'. Don't provide - 'cancel-thread' on these systems. */ - static SCM cancel_thread_var; SCM @@ -879,79 +870,26 @@ scm_cancel_thread (SCM thread) return SCM_UNSPECIFIED; } +static SCM join_thread_var; + SCM scm_join_thread (SCM thread) { - return scm_join_thread_timed (thread, SCM_UNDEFINED, SCM_UNDEFINED); + return scm_call_1 (scm_variable_ref (join_thread_var), thread); } -SCM_DEFINE (scm_join_thread_timed, "join-thread", 1, 2, 0, - (SCM thread, SCM timeout, SCM timeoutval), -"Suspend execution of the calling thread until the target @var{thread} " -"terminates, unless the target @var{thread} has already terminated. ") -#define FUNC_NAME s_scm_join_thread_timed +SCM +scm_join_thread_timed (SCM thread, SCM timeout, SCM timeoutval) { - scm_i_thread *t; - scm_t_timespec ctimeout, *timeout_ptr = NULL; - SCM res = SCM_BOOL_F; - - if (! (SCM_UNBNDP (timeoutval))) - res = timeoutval; - - SCM_VALIDATE_THREAD (1, thread); - if (scm_is_eq (scm_current_thread (), thread)) - SCM_MISC_ERROR ("cannot join the current thread", SCM_EOL); - - t = SCM_I_THREAD_DATA (thread); - scm_i_scm_pthread_mutex_lock (&t->admin_mutex); + SCM join_thread = scm_variable_ref (join_thread_var); - if (! SCM_UNBNDP (timeout)) - { - to_timespec (timeout, &ctimeout); - timeout_ptr = &ctimeout; - } - - if (t->exited) - res = t->result; + if (SCM_UNBNDP (timeout)) + return scm_call_1 (join_thread, thread); + else if (SCM_UNBNDP (timeoutval)) + return scm_call_2 (join_thread, thread, timeout); else - { - while (1) - { - int err = block_self (t->join_queue, &t->admin_mutex, - timeout_ptr); - scm_remember_upto_here_1 (thread); - if (err == 0) - { - if (t->exited) - { - res = t->result; - break; - } - } - else if (err == ETIMEDOUT) - break; - - scm_i_pthread_mutex_unlock (&t->admin_mutex); - SCM_TICK; - scm_i_scm_pthread_mutex_lock (&t->admin_mutex); - - /* Check for exit again, since we just released and - reacquired the admin mutex, before the next block_self - call (which would block forever if t has already - exited). */ - if (t->exited) - { - res = t->result; - break; - } - } - } - - scm_i_pthread_mutex_unlock (&t->admin_mutex); - - return res; + return scm_call_3 (join_thread, thread, timeout, timeoutval); } -#undef FUNC_NAME SCM_DEFINE (scm_thread_p, "thread?", 1, 0, 0, (SCM obj), @@ -1875,6 +1813,9 @@ scm_init_ice_9_threads (void *unused) cancel_thread_var = scm_module_variable (scm_current_module (), scm_from_latin1_symbol ("cancel-thread")); + join_thread_var = + scm_module_variable (scm_current_module (), + scm_from_latin1_symbol ("join-thread")); call_with_new_thread_var = scm_module_variable (scm_current_module (), scm_from_latin1_symbol ("call-with-new-thread")); diff --git a/libguile/threads.h b/libguile/threads.h index db52f16b7..986049c66 100644 --- a/libguile/threads.h +++ b/libguile/threads.h @@ -55,8 +55,6 @@ typedef struct scm_i_thread { SCM handle; scm_i_pthread_t pthread; - SCM join_queue; - scm_i_pthread_mutex_t admin_mutex; SCM result; diff --git a/module/ice-9/threads.scm b/module/ice-9/threads.scm index 119334b46..ae6a97db9 100644 --- a/module/ice-9/threads.scm +++ b/module/ice-9/threads.scm @@ -85,6 +85,13 @@ +(define-syntax-rule (with-mutex m e0 e1 ...) + (let ((x m)) + (dynamic-wind + (lambda () (lock-mutex x)) + (lambda () (begin e0 e1 ...)) + (lambda () (unlock-mutex x))))) + (define cancel-tag (make-prompt-tag "cancel")) (define (cancel-thread thread . values) "Asynchronously interrupt the target @var{thread} and ask it to @@ -101,6 +108,9 @@ no-op." (error "thread cancellation failed, throwing error instead???")))) thread)) +(define thread-join-data (make-object-property)) +(define %thread-results (make-object-property)) + (define* (call-with-new-thread thunk #:optional handler) "Call @code{thunk} in a new thread and with a new dynamic state, returning a new thread object representing the thread. The procedure @@ -121,21 +131,60 @@ Once @var{thunk} or @var{handler} returns, the return value is made the (with-mutex mutex (%call-with-new-thread (lambda () - (call-with-prompt cancel-tag - (lambda () + (call-with-values + (lambda () + (with-continuation-barrier + (lambda () + (call-with-prompt cancel-tag + (lambda () + (lock-mutex mutex) + (set! thread (current-thread)) + (set! (thread-join-data thread) (cons cv mutex)) + (signal-condition-variable cv) + (unlock-mutex mutex) + (thunk)) + (lambda (k . args) + (apply values args)))))) + (lambda vals (lock-mutex mutex) - (set! thread (current-thread)) - (signal-condition-variable cv) + ;; Probably now you're wondering why we are going to use + ;; the cond variable as the key into the thread results + ;; object property. It's because there is a possibility + ;; that the thread object itself ends up as part of the + ;; result, and if that happens we create a cycle whereby + ;; the strong reference to a thread in the value of the + ;; weak-key hash table used by the object property prevents + ;; the thread from ever being collected. So instead we use + ;; the cv as the key. Weak-key hash tables, amirite? + (set! (%thread-results cv) vals) + (broadcast-condition-variable cv) (unlock-mutex mutex) - (thunk)) - (lambda (k . args) - (apply values args))))) + (apply values vals))))) (let lp () (unless thread (wait-condition-variable cv mutex) (lp)))) thread)) +(define* (join-thread thread #:optional timeout timeoutval) + "Suspend execution of the calling thread until the target @var{thread} +terminates, unless the target @var{thread} has already terminated." + (match (thread-join-data thread) + (#f (error "foreign thread cannot be joined" thread)) + ((cv . mutex) + (lock-mutex mutex) + (let lp () + (cond + ((%thread-results cv) + => (lambda (results) + (unlock-mutex mutex) + (apply values results))) + ((if timeout + (wait-condition-variable cv mutex timeout) + (wait-condition-variable cv mutex)) + (lp)) + (else timeoutval)))))) + (define* (try-mutex mutex) "Try to lock @var{mutex}. If the mutex is already locked, return @code{#f}. Otherwise lock the mutex and return @code{#t}." @@ -155,13 +204,6 @@ Once @var{thunk} or @var{handler} returns, the return value is made the (lambda () (proc arg ...)) %thread-handler)) -(define-syntax-rule (with-mutex m e0 e1 ...) - (let ((x m)) - (dynamic-wind - (lambda () (lock-mutex x)) - (lambda () (begin e0 e1 ...)) - (lambda () (unlock-mutex x))))) - (define monitor-mutex-table (make-hash-table)) (define monitor-mutex-table-mutex (make-mutex)) |