changelog shortlog graph tags branches changeset file revisions annotate raw help

Mercurial > core / lisp/lib/net/proto/crew.lisp

revision 162: cc74c0054bc1
parent 117: d0b235557fab
child 267: f3d814fb136a
     1.1--- a/lisp/lib/net/proto/crew.lisp	Tue Jan 02 17:43:11 2024 -0500
     1.2+++ b/lisp/lib/net/proto/crew.lisp	Wed Jan 03 17:26:27 2024 -0500
     1.3@@ -1,45 +1,690 @@
     1.4 (in-package :net/proto/crew)
     1.5 
     1.6 (defclass crew-connection-info ()
     1.7-  ((host-name :type string
     1.8-              :documentation "Host this worker is running on.")
     1.9-   (port :type port
    1.10-         :documentation "Port on which the worker's swank server is listening for connections.")))
    1.11+  ((host-name :initarg :host-name
    1.12+              :initform "localhost"
    1.13+              :type string
    1.14+              :documentation "Host this worker is running on."
    1.15+              :accessor host-name)
    1.16+   (port :initarg :port
    1.17+         :type port
    1.18+         :documentation "Port on which the worker's swank server is listening for connections."
    1.19+         :accessor port)))
    1.20 
    1.21 (defclass crew-worker ()
    1.22-  ((connection-info :type crew-connection-info)
    1.23-   (lock :initform (make-mutex :name "worker"))
    1.24+  ((connection-info :type crew-connection-info :accessor connection-info :initarg :connection-info)
    1.25+   (lock :initform (make-mutex :name "worker") :accessor lock)
    1.26+   (set-worker :accessor set-worker
    1.27+               :type function
    1.28+               :initform #'identity
    1.29+               :documentation "Function called to record which worker is evaluating a form.")
    1.30    (connection :type (or null swank-connection)
    1.31-               :initform nil))
    1.32+               :initform nil :accessor connection))
    1.33   (:documentation "A remote Lisp running a Swank server."))
    1.34 
    1.35 (defclass crew-worker-pool (id)
    1.36-  ((connect-info :type crew-connection-info)
    1.37-   (workers :type vector :initform (vector))
    1.38-   (lock :initform (make-mutex :name "worker-pool"))
    1.39-   (idle-workers :type list :initform nil)
    1.40-   (worker-available :initform (make-gate))
    1.41-   (disconnecting :initform nil)
    1.42-   (replay-forms-lock :initform (make-mutex :name "replay-forms-lock"))
    1.43-   (replay-forms :type list :initform nil)))
    1.44+  ((connection-info :type crew-connection-info :accessor connection-info)
    1.45+   (leader :reader leader
    1.46+           :type crew-connection-info
    1.47+           :initarg :leader
    1.48+           :initform (required-argument))
    1.49+   (workers :type vector :initform (vector) :accessor workers)
    1.50+   (lock :initform (make-mutex :name "worker-pool") :accessor lock)
    1.51+   (idle-workers :type list :initform nil :accessor idle-workers)
    1.52+   (worker-ready :initform (make-gate :name "worker-ready" :open nil) :accessor worker-ready)
    1.53+   (disconnecting :initform nil :accessor disconnecting)
    1.54+   (replay-forms-lock :initform (make-mutex :name "replay-forms-lock") :accessor replay-forms-lock)
    1.55+   (replay-forms :type list :initform nil :accessor replay-forms)))
    1.56 
    1.57 (defvar *crew-worker-pools-lock* (make-mutex :name "worker-pools-lock")
    1.58   "Lock protecting access to *WORKER-POOLS*.")
    1.59 
    1.60+;; could be chashtable
    1.61 (defvar *crew-worker-pools* (make-hash-table) "Mapping from worker pool IDs to active worker pools.")
    1.62 
    1.63+(defun worker-counts (worker-pool)
    1.64+  "Returns the number of idle, busy, and disconnected workers in WORKER-POOL.
    1.65+This function executes without locking WORKER-POOL, so it may return
    1.66+inconsistent information."
    1.67+  ;; We copy the idle workers list without holding a lock on WORKER-POOL.  Other threads may be
    1.68+  ;; simultaneously popping workers off the head of the list, so we may get stale data.
    1.69+  (let ((idlers (copy-seq (idle-workers worker-pool)))
    1.70+        (idle-hash (make-hash-table :test 'eq)))
    1.71+    (dolist (idle-worker idlers)
    1.72+      (setf (gethash idle-worker idle-hash) t))
    1.73+    (let ((idle 0)
    1.74+          (busy 0)
    1.75+          (disconnected 0))
    1.76+      (loop for worker across (workers worker-pool) do
    1.77+        ;; We call (CONNECTION WORKER) without holding the worker's lock, so we may get stale data
    1.78+        ;; here as well.
    1.79+        (cond ((null (connection worker)) (incf disconnected))
    1.80+              ((gethash worker idle-hash) (incf idle))
    1.81+              (t (incf busy))))
    1.82+      (values idle busy disconnected))))
    1.83+
    1.84+(defun worker-count (worker-pool)
    1.85+  "Returns the total number of workers in WORKER-POOL."
    1.86+  (length (workers worker-pool)))
    1.87+
    1.88+(defmethod print-object ((worker-pool crew-worker-pool) stream)
    1.89+  "Prints WORKER-POOL to STREAM.  This function runs without locking
    1.90+WORKER-POOL, so it may output inconsistent information."
    1.91+  (print-unreadable-object (worker-pool stream :type t :identity t)
    1.92+    (multiple-value-bind (idle busy disconnected)
    1.93+        (worker-counts worker-pool)
    1.94+      (format stream "id: ~S workers: ~D idle: ~D busy: ~D disconnected: ~D"
    1.95+              (id worker-pool) (worker-count worker-pool) idle busy disconnected))))
    1.96+
    1.97 (defmethod initialize-instance :after ((self crew-worker-pool) &key)
    1.98   (with-mutex (*crew-worker-pools-lock*)
    1.99     (setf (gethash (id self) *crew-worker-pools*) self)))
   1.100 
   1.101+(defun find-worker-pool (worker-pool-id)
   1.102+  "Returns the worker pool identified by WORKER-POOL-ID."
   1.103+  (with-mutex (*crew-worker-pools-lock*)
   1.104+    (gethash worker-pool-id *crew-worker-pools*)))
   1.105+
   1.106+(defun make-worker-pool (leader connect-infos connect-worker)
   1.107+  (let* ((worker-pool (make-instance 'crew-worker-pool
   1.108+                        :leader leader))
   1.109+         (workers
   1.110+           (loop for connect-info in connect-infos
   1.111+                 for worker = (make-instance 'crew-worker :connection-info connect-info)
   1.112+                 do (let ((worker worker))
   1.113+                      (setf (connection worker)
   1.114+                            (funcall connect-worker
   1.115+                                     connect-info
   1.116+                                     (lambda () (handle-connection-closed worker worker-pool)))))
   1.117+                 collect worker)))
   1.118+    (setf (workers worker-pool) (coerce workers 'vector)
   1.119+          (idle-workers worker-pool) workers)
   1.120+    (make-thread (lambda () (reconnect-workers worker-pool)) :name "reconnector")
   1.121+    worker-pool))
   1.122+
   1.123 (defgeneric connect-worker (info hook)
   1.124   (:documentation
   1.125    "Creates a connection a worker's Swank server using INFO. Passes
   1.126    thunk HOOK to SWANK-CLIENT:SLIME-CONNECT so that it is
   1.127    evoked when the connection closes."))
   1.128 
   1.129+(defmethod connect-worker ((connect-info crew-connection-info) close-handler)
   1.130+  (slime-connect (host-name connect-info) (port connect-info) close-handler))
   1.131+
   1.132+(defun connect-workers (host/port-alist leader)
   1.133+  "Makes Swank connections to all the workers in HOST/PORT-ALIST and returns a
   1.134+WORKER-POOL containing them.  HOST/PORT-ALIST is a list of (host-name . port)
   1.135+pairs.  LEADER is the CREW-CONNECTION-INFO stored as the pool's leader; its host
   1.136+name and port are embedded in the forms sent to workers for evaluation."
   1.137+  (let ((connect-infos
   1.138+          (loop for (host-name . port) in host/port-alist
   1.139+                collect (make-instance 'crew-connection-info :host-name host-name :port port))))
   1.140+    (make-worker-pool leader connect-infos #'connect-worker)))
   1.141+
   1.142+(defun disconnect-workers (worker-pool)
   1.143+  "Closes the Swank connections of all connected workers in WORKER-POOL."
   1.144+  (with-mutex ((lock worker-pool))
   1.145+    (when (disconnecting worker-pool)
   1.146+      (return-from disconnect-workers))
   1.147+    ;; Signal that the worker pool is being torn down and make sure there are no idle workers
   1.148+    ;; available.  After the pool is marked as disconnecting, no workers will transition to idle.
   1.149+    (setf (disconnecting worker-pool) t)
   1.150+    (setf (idle-workers worker-pool) '()))
   1.151+  (flet ((disconnect (worker)
   1.152+           (with-mutex ((lock worker))
   1.153+             (when (connection worker)
   1.154+               (slime-close (connection worker))))))
   1.155+    ;; Disconnect all workers.
   1.156+    (loop for worker across (workers worker-pool) do (disconnect worker)))
   1.157+  (values))
   1.158+
   1.159 (defgeneric reconnect-worker (info hook)
   1.160   (:documentation
   1.161    "Reconnects to a Swank server using information in INFO.  Passes the
   1.162 thunk HOOK to SWANK-CLIENT:SLIME-CONNECT, so that it is invoked when
   1.163 the connection closes."))
   1.164+
   1.165+(defmethod reconnect-worker ((connect-info crew-connection-info) close-handler)
   1.166+  (connect-worker connect-info close-handler))
   1.167+
   1.168+(defconstant +worker-reconnect-interval+ 10 "Seconds between attempts to reconnect workers.")
   1.169+
   1.170+(defun reconnect-workers (worker-pool)
   1.171+  "Reconnects disconnected workers in WORKER-POOL."
   1.172+  (loop
   1.173+    (sleep +worker-reconnect-interval+)
   1.174+    (with-mutex ((lock worker-pool))
   1.175+      (when (disconnecting worker-pool)
   1.176+        (return-from reconnect-workers)))
   1.177+    (loop for worker across (workers worker-pool) do
   1.178+      (let ((worker worker))
   1.179+        (when (with-mutex ((lock worker))
   1.180+                (unless (connection worker)
   1.181+                  (let ((close-handler (lambda () (handle-connection-closed worker worker-pool))))
   1.182+                    (setf (connection worker)
   1.183+                          (reconnect-worker (connection-info worker) close-handler)))))
   1.184+          (with-mutex ((lock worker-pool))
   1.185+            (when (member worker (idle-workers worker-pool))
   1.186+              ;; We reconnected an idle worker, so notify the pool that a worker is available.
   1.187+              (condition-notify (worker-ready worker-pool)))))))))
   1.188+
   1.189+(defun allocate-worker (worker-pool &key worker-to-avoid)
   1.190+  "Allocates and returns a connected idle worker from WORKER-POOL other than WORKER-TO-AVOID,
   1.191+waiting on the pool's WORKER-READY gate when none is found.  Returns NIL if WORKER-POOL is shutting down."
   1.192+  (with-mutex ((lock worker-pool))
   1.193+    (loop
   1.194+      (when (disconnecting worker-pool) (return nil))
   1.195+      ;; Find a connected idle worker.
   1.196+      (let ((idle-worker (find-if (lambda (worker)
   1.197+                                    (unless (eql worker worker-to-avoid)
   1.198+                                      (with-mutex ((lock worker))
   1.199+                                        (connection worker))))
   1.200+                                  (idle-workers worker-pool))))
   1.201+        (when idle-worker
   1.202+          (setf (idle-workers worker-pool) (remove idle-worker (idle-workers worker-pool)))
   1.203+          (return idle-worker)))
   1.204+      (condition-wait (worker-ready worker-pool) (lock worker-pool)))))
   1.205+
   1.206+(defun free-worker (worker worker-pool)
   1.207+  "Pushes WORKER onto WORKER-POOL's idle list, unless the pool is disconnecting (then a no-op)."
   1.208+  (with-mutex ((lock worker-pool))
   1.209+    (unless (disconnecting worker-pool)
   1.210+      (push worker (idle-workers worker-pool))
   1.211+      (when (connection worker)
   1.212+        ;; The free worker is connected, so notify the pool that a worker is available.
   1.213+        (condition-notify (worker-ready worker-pool))))))
   1.214+
   1.215+(defun allocate-worker-and-evaluate (worker-pool form set-worker worker-done)
   1.216+  "Allocates a connected worker from WORKER-POOL and sends it FORM to evaluate.
   1.217+Returns the worker that was successfully allocated, or NIL if WORKER-POOL is
   1.218+shutting down.  SET-WORKER is a function that is called to tell the WORKER-DONE
   1.219+continuation what worker is evaluating FORM."
   1.220+  (let ((worker nil)
   1.221+        (disconnected-idle-workers '()))
   1.222+    (loop do
   1.223+      (setf worker (allocate-worker worker-pool))
   1.224+      (unless worker (return-from allocate-worker-and-evaluate nil))
   1.225+      (with-mutex ((lock worker))
   1.226+        ;; Has the connected idle worker we allocated just disconnected?
   1.227+        (let ((connection (connection worker)))
   1.228+          (if connection
   1.229+              (progn
   1.230+                (setf (set-worker worker) set-worker)
   1.231+                (funcall set-worker worker)
   1.232+                ;; Ignore network errors.  If there is a network problem, socket keep alives will
   1.233+                ;; eventually discover that the connection is dead and HANDLE-CONNECTION-CLOSED
   1.234+                ;; will migrate the work to another worker.
   1.235+                (handler-case (slime-eval-async form connection worker-done)
   1.236+                  (slime-network-error ()))
   1.237+                (loop-finish))
   1.238+              ;; The worker is disconnected, so remember it and look for another one.
   1.239+              (push worker disconnected-idle-workers)))))
   1.240+    ;; Place any disconnected workers we found back on the idle workers list.
   1.241+    (dolist (w disconnected-idle-workers) (free-worker w worker-pool))
   1.242+    worker))
   1.243+
   1.244+(defun handle-connection-closed (disconnected-worker worker-pool)
   1.245+  "Called when the connection to DISCONNECTED-WORKER, a member of WORKER-POOL,
   1.246+is closed because of a call to SLIME-CLOSE, the death of the worker, or because
   1.247+of a communications error.  Moves all uncompleted work intended for
   1.248+DISCONNECTED-WORKER to another idle connected worker in WORKER-POOL."
   1.249+  (with-mutex ((lock disconnected-worker))
   1.250+    (let ((old-connection (connection disconnected-worker))
   1.251+          (disconnected-idle-workers '()))
   1.252+      (when (slime-pending-evals-p old-connection)
   1.253+        (loop do
   1.254+          (let ((worker (allocate-worker worker-pool :worker-to-avoid disconnected-worker)))
   1.255+            (unless worker (return-from handle-connection-closed))
   1.256+            (with-mutex ((lock worker))
   1.257+              ;; Has the connected idle worker we allocated just disconnected?
   1.258+              (let ((connection (connection worker)))
   1.259+                (if connection
   1.260+                    (let ((old-set-worker (set-worker disconnected-worker)))
   1.261+                      ;; Migrate all pending work from DISCONNECTED-WORKER to WORKER.
   1.262+                      (setf (set-worker worker) old-set-worker)
   1.263+                      (funcall old-set-worker worker)
   1.264+                      ;; Ignore Slime network errors.  If there is a network problem, socket keep
   1.265+                      ;; alives will eventually discover that the connection is dead and
   1.266+                      ;; HANDLE-CONNECTION-CLOSED will migrate the pending work again.
   1.267+                      (handler-case (slime-migrate-evals old-connection connection)
   1.268+                        (slime-network-error ()))
   1.269+                      (loop-finish))
   1.270+                    ;; The worker is disconnected, so remember it and look for another one.
   1.271+                    (push worker disconnected-idle-workers))))))
   1.272+        ;; Place any disconnected workers we found back on the idle workers list.
   1.273+        (dolist (w disconnected-idle-workers) (free-worker w worker-pool))))
   1.274+    ;; Allow the reconnector to see that DISCONNECTED-WORKER is dead.
   1.275+    (setf (connection disconnected-worker) nil)))
   1.276+
   1.277+
   1.278+(defun no-workers-p (worker-pool)
   1.279+  "Returns T if WORKER-POOL is NIL or contains no workers."
   1.280+  (or (null worker-pool) (zerop (worker-count worker-pool))))
   1.281+
   1.282+(defun eval-on-master (make-work list result-done)
   1.283+  "Iterates over the members of LIST, calling MAKE-WORK on each one and passing
   1.284+the form it returns to EVAL.  Returns a list of the evaluation results.  If
   1.285+RESULT-DONE is not NIL, it must be a function of two arguments; it is called after
   1.286+each evaluation with the member's position in LIST and the evaluation result."
   1.287+  (loop for position from 0
   1.288+        for element in list
   1.289+        for result = (eval (funcall make-work element))
   1.290+        collect result
   1.291+        do (when result-done (funcall result-done position result))))
   1.292+
   1.293+(defun add-evaluated-form (worker-pool form)
   1.294+  "Adds FORM to the set of forms that need to be evaluated on a new worker when
   1.295+it joins WORKER-POOL."
   1.296+  (with-mutex ((replay-forms-lock worker-pool))
   1.297+    (push form (replay-forms worker-pool))))
   1.298+
   1.299+(defun unevaluated-replay-forms (worker-pool-id evaluated-count)
   1.300+  "For a worker that has executed EVALUATED-COUNT of the replay forms associated
   1.301+with the worker-pool identified by WORKER-POOL-ID, returns a list of the forms
   1.302+the worker needs to evaluate in order to be completely up to date."
   1.303+  (let ((worker-pool (find-worker-pool worker-pool-id)))
   1.304+    (when worker-pool
   1.305+      (with-mutex ((replay-forms-lock worker-pool))
   1.306+        (nthcdr evaluated-count (reverse (replay-forms worker-pool)))))))
   1.307+
   1.308+(defun ensure-caught-up-then-evaluate (form worker-pool replay-required)
   1.309+  "Returns a form that when evaluated on a worker in WORKER-POOL ensures that
   1.310+the worker is caught up then evaluates FORM and returns its result.
   1.311+REPLAY-REQUIRED indicates whether new workers must evaluate FORM before being
   1.312+considered to be caught up."
   1.313+  (let ((forms-count (with-mutex ((replay-forms-lock worker-pool))
   1.314+                       (length (replay-forms worker-pool))))
   1.315+        (master-host-name (host-name (leader worker-pool)))
   1.316+        (master-swank-port (port (leader worker-pool)))
   1.317+        (worker-pool-id (id worker-pool)))
   1.318+    `(evaluate-form ',form ,master-host-name ,master-swank-port ,worker-pool-id ,forms-count
   1.319+                    ,replay-required)))
   1.320+
   1.321+(defun dispatch-work (worker-pool make-work list result-done retain-workers replay-required)
   1.322+  "Traverses LIST, calling MAKE-WORK on each element to create a form that is
   1.323+then passed to a remote worker in WORKER-POOL for evaluation.  When
   1.324+RETAIN-WORKERS is true, each form is evaluated on a unique worker.  Otherwise,
   1.325+workers are reused and may process multiple forms.  In either case, the results
   1.326+of evaluating each form are collected into a list, which is returned when every
   1.327+remote worker is done.  If RESULT-DONE is not NIL, then it must be a function of
   1.328+two arguments.  In this case RESULT-DONE is called on the master as each worker
   1.329+returns a result.  The arguments to RESULT-DONE are an integer counter,
   1.330+indicating the work form's position in LIST, and the result of evaluating the
   1.331+form.  REPLAY-REQUIRED indicates whether new workers will have to perform the
   1.332+work generated by MAKE-WORK before being considered to be caught up."
   1.333+  (if (no-workers-p worker-pool)
   1.334+      (eval-on-master make-work list result-done)
   1.335+      (let* ((work-count (length list))
   1.336+             (results (make-list work-count))
   1.337+             (done-lock (make-mutex :name "dispatch-work"))
   1.338+             (done-condition (make-gate :name "done" :open nil)))
   1.339+        (loop for element in list
   1.340+              for result-cons = results then (cdr result-cons)
   1.341+              for position from 0
   1.342+              do (let ((result-cons result-cons)
   1.343+                       (worker nil)
   1.344+                       (position position))
   1.345+                   (flet ((set-worker (w) (setf worker w))
   1.346+                          (worker-done (result)
   1.347+                            (setf (car result-cons) result)
   1.348+                            ;; Unless we need each worker to evaluate exactly one form, free the
   1.349+                            ;; worker, so it can process other work.
   1.350+                            (unless retain-workers (free-worker worker worker-pool))
   1.351+                            (with-mutex (done-lock)
   1.352+                              (decf work-count)
   1.353+                              (when result-done (funcall result-done position result))
   1.354+                              (when (zerop work-count)
   1.355+                                (condition-notify done-condition)))))
   1.356+                     (let ((work (ensure-caught-up-then-evaluate (funcall make-work element)
   1.357+                                                                 worker-pool
   1.358+                                                                 replay-required)))
   1.359+                       (allocate-worker-and-evaluate worker-pool
   1.360+                                                     work
   1.361+                                                     #'set-worker
   1.362+                                                     #'worker-done)))))
   1.363+        (with-mutex (done-lock)
   1.364+          (loop until (zerop work-count)
   1.365+                do (condition-wait done-condition done-lock)))
   1.366+        ;; When we need each worker to evaluate exactly one form, we end up having allocated every
   1.367+        ;; worker in the pool, so we need to free them all.
   1.368+        (when retain-workers
   1.369+          (loop for worker across (workers worker-pool) do (free-worker worker worker-pool)))
   1.370+        results)))
   1.371+
   1.372+(defun eval-form-all-workers (worker-pool form &key result-done (replay-required t))
   1.373+  "Evaluates FORM on all workers in WORKER-POOL.  When RESULT-DONE is non-NIL,
   1.374+it must be a function of two arguments.  In this case RESULT-DONE is called as
   1.375+each worker returns a result with two arguments, a non-negative integer
   1.376+representing the order in which the work was dispatched and the worker's result.
   1.377+If REPLAY-REQUIRED is true, which is the default, FORM will be remembered and
   1.378+evaluated again for side effects on any new worker that joins WORKER-POOL."
   1.379+  (let* ((work-list (make-list (if (no-workers-p worker-pool) 1 (worker-count worker-pool))))
   1.380+         (make-work (constantly form))
   1.381+         (results (dispatch-work worker-pool make-work work-list result-done t replay-required)))
   1.382+    (when (and worker-pool replay-required) (add-evaluated-form worker-pool form))
   1.383+    results))
   1.384+
   1.385+(defun parallel-mapcar (worker-pool make-work list &optional result-done)
   1.386+  "Traverses LIST, calling MAKE-WORK on each element to create a form that is
   1.387+then passed to a remote worker in WORKER-POOL for evaluation.  Results of
   1.388+evaluating each form are collected into a list, which is returned when every
   1.389+remote worker is done.  If RESULT-DONE is provided, then it must be a function
   1.390+of two arguments.  In this case RESULT-DONE is called on the master as each
   1.391+worker returns a result.  The arguments to RESULT-DONE are the position of the
   1.392+work in LIST and the worker's result."
   1.393+  (dispatch-work worker-pool make-work list result-done nil nil))
   1.394+
   1.395+(defun parallel-reduce (worker-pool make-work list initial-value reducer)
   1.396+  "Traverses LIST, calling MAKE-WORK on each element to create a form that is
   1.397+then passed to a remote worker in WORKER-POOL for evaluation.  As results are
   1.398+returned, REDUCER, a binary function, is used to combine successive results in
   1.399+the manner of REDUCE.  INITIAL-VALUE is used as the starting value for the
   1.400+reduction computation.  The form
   1.401+
   1.402+  (parallel-reduce worker-pool make-work list initial-value reducer)
   1.403+
   1.404+is equivalent to
   1.405+
   1.406+  (reduce reducer
   1.407+          (mapcar (lambda (x) (eval (funcall make-work x))) list)
   1.408+          :initial-value initial-value)"
   1.409+  (if (no-workers-p worker-pool)
   1.410+      (reduce reducer
   1.411+              (mapcar (lambda (x) (eval (funcall make-work x))) list)
   1.412+              :initial-value initial-value)
   1.413+      (let* ((work-count (length list))
   1.414+             (unknown-result (cons nil nil))
   1.415+             (results (make-array (1+ work-count) :initial-element unknown-result))
   1.416+             (i 0)
   1.417+             (reduce-result initial-value)
   1.418+             (done-lock (make-mutex :name "crew-reduce"))
   1.419+             (done-condition (make-gate :name "done" :open nil)))
   1.420+        (loop for element in list
   1.421+              for position from 0
   1.422+              do (let ((worker nil)
   1.423+                       (position position))
   1.424+                   (flet ((set-worker (w) (setf worker w))
   1.425+                          (worker-done (result)
   1.426+                            (free-worker worker worker-pool)
   1.427+                            (with-mutex (done-lock)
   1.428+                              (setf (aref results position) result)
   1.429+                              (loop for next = (aref results i)
   1.430+                                    while (not (eq next unknown-result))
   1.431+                                    do (setf reduce-result (funcall reducer reduce-result next))
   1.432+                                       (setf (aref results i) nil)
   1.433+                                       (incf i))
   1.434+                              (decf work-count)
   1.435+                              (when (zerop work-count)
   1.436+                                (condition-notify done-condition)))))
   1.437+                     (let ((work (ensure-caught-up-then-evaluate (funcall make-work element)
   1.438+                                                                 worker-pool
   1.439+                                                                 nil)))
   1.440+                       (allocate-worker-and-evaluate worker-pool
   1.441+                                                     work
   1.442+                                                     #'set-worker
   1.443+                                                     #'worker-done)))))
   1.444+        (with-mutex (done-lock)
   1.445+          (loop until (zerop work-count)
   1.446+                do (condition-wait done-condition done-lock)))
   1.447+        reduce-result)))
   1.448+
   1.449+(defvar *evaluation-id-lock* (make-mutex :name "eval-id")
   1.450+  "Lock protecting access to *EVALUATION-ID*.")
   1.451+
   1.452+(defvar *evaluation-id* 0 "Counter used to generate a unique ID for EVALUATION instances.")
   1.453+
   1.454+(defclass evaluation ()
   1.455+  ((id :reader id
   1.456+       :type integer
   1.457+       :initarg :id
   1.458+       :initform (with-mutex (*evaluation-id-lock*)
   1.459+                   (incf *evaluation-id*))
   1.460+       :documentation "Unique ID for this running evaluation request.")
   1.461+   (lock :reader lock
   1.462+         :initform (make-mutex :name "evaluation")
   1.463+         :documentation "Lock protecting access to DONE, DONE-CONDITION, and other mutable slots.")
   1.464+   (done :accessor done
   1.465+         :type boolean
   1.466+         :initform nil
   1.467+         :documentation "Is the computation done?")
   1.468+   (done-condition :reader done-condition
   1.469+                   :initform (make-gate :name "done" :open nil)
   1.470+                   :documentation "Condition variable notified when computation is done."))
   1.471+  (:documentation "Stores the data needed to process incoming evaluation results."))
   1.472+
   1.473+(defclass repeated-eval (evaluation)
   1.474+  ((results :reader results
   1.475+            :type simple-vector
   1.476+            :initarg :results
   1.477+            :initform (required-argument)
   1.478+            :documentation "Vector holding returned results.")
   1.479+   (results-position :accessor results-position
   1.480+                     :type vector-index
   1.481+                     :initform 0
   1.482+                     :documentation "Position where the next result will be recorded."))
   1.483+  (:documentation "Stores the data needed to process an incoming repeated eval result."))
   1.484+
   1.485+(defvar *evals-lock* (make-mutex :name "evals")
   1.486+  "Lock protecting access to *EVALS*.")
   1.487+
   1.488+(defvar *evals* '() "List containing an EVALUATION instance for each running computation.")
   1.489+
   1.490+(defun add-eval (eval)
   1.491+  "Adds EVAL to the list of in-progress computations."
   1.492+  (with-mutex (*evals-lock*)
   1.493+    (push eval *evals*)))
   1.494+
   1.495+(defun remove-eval (eval)
   1.496+  "Removes EVAL from the list of in-progress computations."
   1.497+  (with-mutex (*evals-lock*)
   1.498+    (setf *evals* (remove eval *evals*))))
   1.499+
   1.500+(defun find-eval (id)
   1.501+  "Returns the EVALUATION instance in *EVALS* identified by ID, or NIL if no
   1.502+computation with that ID is currently running."
   1.503+  (with-mutex (*evals-lock*)
   1.504+    (find id *evals* :test #'= :key #'id)))
   1.505+
   1.506+(defun record-repeated-result (id result)
   1.507+  "Stores RESULT as one of the values produced by the repeated evaluation
   1.508+identified by ID.  Returns a boolean indicating whether the worker should
   1.509+continue to call RECORD-REPEATED-RESULT with additional results."
   1.510+  (let ((repeated-eval (find-eval id)))
   1.511+    (when repeated-eval
   1.512+      (let ((done nil))
   1.513+        (with-mutex ((lock repeated-eval))
   1.514+          (let* ((results (results repeated-eval))
   1.515+                 (length (length results))
   1.516+                 (position (results-position repeated-eval)))
   1.517+            (when (< position length)
   1.518+              (setf (aref results position) result)
   1.519+              (incf position)
   1.520+              (setf (results-position repeated-eval) position))
   1.521+            (when (>= position length)
   1.522+              (setf done t
   1.523+                    (done repeated-eval) t)
   1.524+              (condition-notify (done-condition repeated-eval)))))
   1.525+        (not done)))))
   1.526+
   1.527+(defun repeated-work-form (form worker-pool id)
   1.528+  "Builds the form sent to a client of WORKER-POOL for the repeated evaluation
   1.529+request identified by ID.  The returned form makes sure the client is caught up
   1.530+and then calls REPEATEDLY-EVALUATE on FORM, ID and the leader's host and port."
   1.531+  (let* ((leader-host (host-name (leader worker-pool)))
   1.532+         (leader-port (port (leader worker-pool)))
   1.533+         (request (list 'repeatedly-evaluate (list 'quote form) id
   1.534+                        leader-host leader-port)))
   1.535+    ;; FORM is quoted so the client receives it unevaluated.
   1.536+    (ensure-caught-up-then-evaluate request worker-pool nil)))
   1.537+
   1.538+(defun eval-form-repeatedly (worker-pool result-count form
   1.539+                             &key (worker-count (when worker-pool (worker-count worker-pool))))
   1.540+  "FORM must evaluate to a function of no arguments.  FORM is evaluated on up to
   1.541+WORKER-COUNT workers of WORKER-POOL, which call the function repeatedly until
   1.542+RESULT-COUNT results exist; they are returned as a list.  With no usable workers
   1.543+the function is called locally.  WORKER-COUNT defaults to the pool's worker count."
   1.544+  (if (or (no-workers-p worker-pool) (not (plusp worker-count)))
   1.545+      (let ((thunk (eval form)))
   1.546+        (loop repeat result-count collect (funcall thunk)))
   1.547+      (let* ((evaluation (make-instance 'repeated-eval :results (make-array result-count)))
   1.548+             (evaluation-lock (lock evaluation))
   1.549+             (idle-workers '()))
   1.550+        (add-eval evaluation)
   1.551+        (loop repeat worker-count
   1.552+              while (with-mutex (evaluation-lock) (not (done evaluation)))
   1.553+              do (let ((assigned nil))
   1.554+                   (flet ((note-worker (w) (setf assigned w))
   1.555+                          (release-or-retain (result)
   1.556+                            (declare (ignore result))
   1.557+                            ;; A worker finishing after all results are in is released at once;
   1.558+                            ;; one finishing earlier is kept until the results are complete.
   1.559+                            (with-mutex (evaluation-lock)
   1.560+                              (if (done evaluation)
   1.561+                                  (free-worker assigned worker-pool)
   1.562+                                  (push assigned idle-workers)))))
   1.563+                     (allocate-worker-and-evaluate
   1.564+                      worker-pool
   1.565+                      (repeated-work-form form worker-pool (id evaluation))
   1.566+                      #'note-worker
   1.567+                      #'release-or-retain))))
   1.568+        ;; Block until RECORD-REPEATED-RESULT marks the evaluation as done.
   1.569+        (with-mutex (evaluation-lock)
   1.570+          (loop (when (done evaluation) (return))
   1.571+                (condition-wait (done-condition evaluation) evaluation-lock)))
   1.572+        ;; TODO(brown): Perhaps UNWIND-PROTECT should be used to ensure the remove happens.
   1.573+        (remove-eval evaluation)
   1.574+        ;; IDLE-WORKERS needs no lock here: the evaluation is done, so RELEASE-OR-RETAIN
   1.575+        ;; frees workers directly and no longer pushes onto the list.
   1.576+        (dolist (w idle-workers) (free-worker w worker-pool))
   1.577+        (map 'list #'identity (results evaluation)))))
   1.578+
   1.579+(defclass async-eval (evaluation)
   1.580+  ((results :accessor results
   1.581+            :type list :initform '()
   1.582+            :documentation "List holding returned, but unprocessed, results.")
   1.583+   ;; Must be a waitqueue, not a gate: it is passed to CONDITION-WAIT and CONDITION-NOTIFY.
   1.584+   (results-available :reader results-available
   1.585+                      :initform (sb-thread:make-waitqueue :name "results-available")
   1.586+                      :documentation "Condition notified when new results are available.")
   1.587+   (state :accessor state
   1.588+          :initarg :state
   1.589+          :initform (required-argument)
   1.590+          :documentation "State of the asynchronous computation, updated from results.")
   1.591+   (state-counter :accessor state-counter
   1.592+                  :type (integer 0)
   1.593+                  :initform 0
   1.594+                  :documentation "Counter incremented each time STATE is updated."))
   1.595+  (:documentation "Data needed to process incoming asynchronous evaluation results."))
   1.596+
   1.597+(defun async-results-loop (async-eval update-state)
   1.598+  "Processes results arriving for ASYNC-EVAL.  Each time unprocessed results are
   1.599+present, UPDATE-STATE is called with the work state and the list of those
   1.600+results.  Returns once UPDATE-STATE reports that the computation is done."
   1.601+  (let ((guard (lock async-eval))
   1.602+        (snapshot nil) (batch nil))
   1.603+    (loop
   1.604+      ;; Wait for results, then take them (and the current state) under the lock.
   1.605+      (with-mutex (guard)
   1.606+        (loop while (null (results async-eval))
   1.607+              do (condition-wait (results-available async-eval) guard))
   1.608+        (setf snapshot (state async-eval)
   1.609+              batch (shiftf (results async-eval) '())))
   1.610+      ;; UPDATE-STATE is called with the lock released.
   1.611+      (multiple-value-bind (new-state done send-state)
   1.612+          (funcall update-state snapshot batch)
   1.613+        (with-mutex (guard)
   1.614+          (setf (state async-eval) new-state)
   1.615+          (unless (or done (not send-state))
   1.616+            (incf (state-counter async-eval))))
   1.617+        (when done
   1.618+          (with-mutex (guard)
   1.619+            (setf (done async-eval) t)
   1.620+            (condition-notify (done-condition async-eval)))
   1.621+          (return))))))
   1.622+
   1.623+(defun record-async-result (id result worker-state-counter)
   1.624+  "Queues RESULT for the async evaluation identified by ID.  Returns a list of
   1.625+whether the worker should keep calling RECORD-ASYNC-RESULT, the master's state
   1.626+counter, and the master's state (NIL unless it differs from WORKER-STATE-COUNTER)."
   1.627+  (let ((target (find-eval id))
   1.628+        (finished t)
   1.629+        (master-counter 0)
   1.630+        (fresh-state nil))
   1.631+    (when target
   1.632+      (with-mutex ((lock target))
   1.633+        (push result (results target))
   1.634+        (setf master-counter (state-counter target)
   1.635+              finished (done target))
   1.636+        (unless (= worker-state-counter master-counter)
   1.637+          (setf fresh-state (state target)))
   1.638+        (condition-notify (results-available target))))
   1.639+    (list (not finished) master-counter fresh-state)))
   1.640+
   1.641+(defun async-work-form (form initial-state worker-pool id)
   1.642+  "Builds the form sent to a client of WORKER-POOL for the async evaluation
   1.643+request identified by ID.  The returned form makes sure the client is caught up
   1.644+and then calls ASYNC-EVALUATE on FORM, INITIAL-STATE, ID and the leader's address."
   1.645+  (let* ((leader-host (host-name (leader worker-pool)))
   1.646+         (leader-port (port (leader worker-pool)))
   1.647+         (request (list 'async-evaluate (list 'quote form) (list 'quote initial-state)
   1.648+                        id leader-host leader-port)))
   1.649+    ;; FORM and INITIAL-STATE are quoted so the client receives them unevaluated.
   1.650+    (ensure-caught-up-then-evaluate request worker-pool nil)))
   1.651+
   1.652+(defun eval-repeatedly-async-state (worker-pool form initial-state update-state
   1.653+                                    &key (worker-count
   1.654+                                          (when worker-pool (worker-count worker-pool))))
   1.655+  "Evaluates FORM, which must return a function of one argument, on WORKER-COUNT
   1.656+workers in WORKER-POOL, then arranges for the workers to repeatedly call the
   1.657+work function and send its results back to the master.  The work function is
   1.658+passed a state argument, initially set to INITIAL-STATE, that the master can
   1.659+update asynchronously as it receives new results.
   1.660+
   1.661+On the master, the work state is initialized to INITIAL-STATE and UPDATE-STATE
   1.662+is called repeatedly to process results received from the workers.  UPDATE-STATE
   1.663+is called with two arguments, the current work state and a list containing all
   1.664+unprocessed work results.  UPDATE-STATE should return three values: the new work
   1.665+state, a boolean indicating whether the computation should end, and a boolean
   1.666+indicating whether the latest work state should be distributed to workers.  When
   1.667+UPDATE-STATE's second return value is true, EVAL-REPEATEDLY-ASYNC-STATE tells
   1.668+the workers to stop and returns the latest work state."
   1.669+  (if (or (no-workers-p worker-pool) (not (plusp worker-count)))
   1.670+      (let ((work-function (eval form))
   1.671+            (state initial-state)          ; master's state: advanced after every update
   1.672+            (worker-state initial-state))  ; work function's state: refreshed on SEND-STATE
   1.673+        (loop (multiple-value-bind (new-state done send-state)
   1.674+                  (funcall update-state state (list (funcall work-function worker-state)))
   1.675+                (when done (return new-state))
   1.676+                (setf state new-state)
   1.677+                (when send-state (setf worker-state new-state)))))
   1.678+      (let* ((results (make-instance 'async-eval :state initial-state))
   1.679+             (results-lock (lock results))
   1.680+             (retained-workers '()))
   1.681+        (add-eval results)
   1.682+        (make-thread (lambda () (async-results-loop results update-state))
   1.683+                     :name "async results loop")
   1.684+        (loop repeat worker-count
   1.685+              until (with-mutex (results-lock)
   1.686+                      (done results))
   1.687+              do (let ((worker nil))
   1.688+                   (flet ((set-worker (w) (setf worker w))
   1.689+                          (worker-done (result)
   1.690+                            (declare (ignore result))
   1.691+                            ;; Retain workers that finish before DONE is set; release later ones.
   1.692+                            (with-mutex (results-lock)
   1.693+                              (if (not (done results))
   1.694+                                  (push worker retained-workers)
   1.695+                                  (free-worker worker worker-pool)))))
   1.696+                     (let ((work (async-work-form form initial-state worker-pool (id results))))
   1.697+                       (allocate-worker-and-evaluate worker-pool
   1.698+                                                     work
   1.699+                                                     #'set-worker
   1.700+                                                     #'worker-done)))))
   1.701+        (with-mutex (results-lock)
   1.702+          (loop until (done results)
   1.703+                do (condition-wait (done-condition results) results-lock)))
   1.704+        ;; TODO(brown): Perhaps UNWIND-PROTECT should be used to ensure the remove happens.
   1.705+        (remove-eval results)
   1.706+        ;; DONE is set, so WORKER-DONE no longer pushes onto RETAINED-WORKERS; no lock needed.
   1.707+        (dolist (w retained-workers) (free-worker w worker-pool))
   1.708+        (state results))))