iter queue." (let/ec return (let* ((suspend (lambda (cont future-to-wait) ;; FUTURE wishes to wait for the completion of FUTURE-TO-WAIT. ;; At this point, FUTURE is unlocked and in `started' state, ;; and FUTURE-TO-WAIT is unlocked. (with-mutex %futures-mutex (with-mutex (future-mutex future) (set-future-thunk! future cont) (set-future-state! future 'queued)) (with-mutex (future-mutex future-to-wait) ;; If FUTURE-TO-WAIT completed in the meantime, then ;; reschedule FUTURE directly; otherwise, add it to the ;; waiter queue. (if (eq? 'done (future-state future-to-wait)) (begin (enq! %futures future) (signal-condition-variable %futures-available)) (set! %futures-waiting (alist-cons future-to-wait future %futures-waiting)))) (return #f)))) (thunk (lambda () (call-with-prompt %future-prompt (lambda () (parameterize ((%within-future? #t)) ((future-thunk future)))) suspend)))) (set-future-result! future (catch #t (lambda () (call-with-values thunk (lambda results (lambda () (apply values results))))) (lambda args (lambda () (apply throw args))))) #t))) (define (process-one-future) "Attempt to pick one future from the queue and process it." ;; %FUTURES-MUTEX must be locked on entry, and is locked on exit. (or (q-empty? %futures) (let ((future (deq! %futures))) (lock-mutex (future-mutex future)) (case (future-state future) ((done started) ;; Nothing to do. (unlock-mutex (future-mutex future))) (else ;; Do the actual work. ;; We want to release %FUTURES-MUTEX so that other workers can ;; progress. However, to avoid deadlocks, we have to unlock ;; FUTURE as well, to preserve lock ordering. (unlock-mutex (future-mutex future)) (unlock-mutex %futures-mutex) (lock-mutex (future-mutex future)) (if (eq? (future-state future) 'queued) ; lost the race? (begin ; no, so let's process it (set-future-state! future 'started) (unlock-mutex (future-mutex future)) (let ((done? (process-future! future))) (when done? (with-mutex %futures-mutex (with-mutex (future-mutex future) (set-future-state! future 'done) (notify-completion future)))))) (unlock-mutex (future-mutex future))) ; yes (lock-mutex %futures-mutex)))))) (define (process-futures) "Continuously process futures from the queue." (lock-mutex %futures-mutex) (let loop () (when (q-empty? %futures) (wait-condition-variable %futures-available %futures-mutex)) (process-one-future) (loop))) (define (notify-completion future) "Notify futures and callers waiting that FUTURE completed." ;; FUTURE and %FUTURES-MUTEX are locked. (broadcast-condition-variable (future-completion future)) (let-values (((waiting remaining) (partition (match-lambda ; TODO: optimize ((waitee . _) (eq? waitee future))) %futures-waiting))) (set! %futures-waiting remaining) (for-each (match-lambda ((_ . waiter) (enq! %futures waiter))) waiting))) (define (touch future) "Return the result of FUTURE, computing it if not already done." (define (work) ;; Do some work while waiting for FUTURE to complete. (lock-mutex %futures-mutex) (if (q-empty? %futures) (begin (unlock-mutex %futures-mutex) (with-mutex (future-mutex future) (unless (eq? 'done (future-state future)) (wait-condition-variable (future-completion future) (future-mutex future))))) (begin (process-one-future) (unlock-mutex %futures-mutex)))) (let loop () (lock-mutex (future-mutex future)) (case (future-state future) ((done) (unlock-mutex (future-mutex future))) ((started) (unlock-mutex (future-mutex future)) (if (%within-future?) (abort-to-prompt %future-prompt future) (begin (work) (loop)))) (else (unlock-mutex (future-mutex future)) (work) (loop)))) ((future-result future)))