1.1--- a/lisp/lib/net/proto/crew.lisp Tue Jan 02 17:43:11 2024 -0500
1.2+++ b/lisp/lib/net/proto/crew.lisp Wed Jan 03 17:26:27 2024 -0500
1.3@@ -1,45 +1,690 @@
1.4 (in-package :net/proto/crew)
1.5
1.6 (defclass crew-connection-info ()
1.7- ((host-name :type string
1.8- :documentation "Host this worker is running on.")
1.9- (port :type port
1.10- :documentation "Port on which the worker's swank server is listening for connections.")))
1.11+ ((host-name :initarg :host-name
1.12+ :initform "localhost"
1.13+ :type string
1.14+ :documentation "Host this worker is running on."
1.15+ :accessor host-name)
1.16+ (port :initarg :port
1.17+ :type port
1.18+ :documentation "Port on which the worker's swank server is listening for connections."
1.19+ :accessor port)))
1.20
1.21 (defclass crew-worker ()
1.22- ((connection-info :type crew-connection-info)
1.23- (lock :initform (make-mutex :name "worker"))
1.24+ ((connection-info :type crew-connection-info :accessor connection-info :initarg :connection-info)
1.25+ (lock :initform (make-mutex :name "worker") :accessor lock)
1.26+ (set-worker :accessor set-worker
1.27+ :type function
1.28+ :initform #'identity
1.29+ :documentation "Function called to record which worker is evaluating a form.")
1.30 (connection :type (or null swank-connection)
1.31- :initform nil))
1.32+ :initform nil :accessor connection))
1.33 (:documentation "A remote Lisp running a Swank server."))
1.34
1.35 (defclass crew-worker-pool (id)
1.36- ((connect-info :type crew-connection-info)
1.37- (workers :type vector :initform (vector))
1.38- (lock :initform (make-mutex :name "worker-pool"))
1.39- (idle-workers :type list :initform nil)
1.40- (worker-available :initform (make-gate))
1.41- (disconnecting :initform nil)
1.42- (replay-forms-lock :initform (make-mutex :name "replay-forms-lock"))
1.43- (replay-forms :type list :initform nil)))
1.44+ ((connection-info :type crew-connection-info :accessor connection-info)
1.45+ (leader :reader leader
1.46+ :type crew-connection-info
1.47+ :initarg :leader
1.48+ :initform (required-argument))
1.49+ (workers :type vector :initform (vector) :accessor workers)
1.50+ (lock :initform (make-mutex :name "worker-pool") :accessor lock)
1.51+ (idle-workers :type list :initform nil :accessor idle-workers)
1.52+ (worker-ready :initform (make-gate :name "worker-ready" :open nil) :accessor worker-ready)
1.53+ (disconnecting :initform nil :accessor disconnecting)
1.54+ (replay-forms-lock :initform (make-mutex :name "replay-forms-lock") :accessor replay-forms-lock)
1.55+ (replay-forms :type list :initform nil :accessor replay-forms)))
1.56
1.57 (defvar *crew-worker-pools-lock* (make-mutex :name "worker-pools-lock")
1.58 "Lock protecting access to *WORKER-POOLS*.")
1.59
1.60+;; could be chashtable
1.61 (defvar *crew-worker-pools* (make-hash-table) "Mapping from worker pool IDs to active worker pools.")
1.62
1.63+(defun worker-counts (worker-pool)
1.64+ "Returns the number of idle, busy, and disconnected workers in WORKER-POOL.
1.65+This function executes without locking WORKER-POOL, so it may return
1.66+inconsistent information."
1.67+ ;; We copy the idle workers list without holding a lock on WORKER-POOL. Other threads may be
1.68+ ;; simultaneously popping workers off the head of the list, so we may get stale data.
1.69+ (let ((idlers (copy-seq (idle-workers worker-pool)))
1.70+ (idle-hash (make-hash-table :test 'eq)))
1.71+ (dolist (idle-worker idlers)
1.72+ (setf (gethash idle-worker idle-hash) t))
1.73+ (let ((idle 0)
1.74+ (busy 0)
1.75+ (disconnected 0))
1.76+ (loop for worker across (workers worker-pool) do
1.77+ ;; We call (CONNECTION WORKER) without holding the worker's lock, so we may get stale data
1.78+ ;; here as well.
1.79+ (cond ((null (connection worker)) (incf disconnected))
1.80+ ((gethash worker idle-hash) (incf idle))
1.81+ (t (incf busy))))
1.82+ (values idle busy disconnected))))
1.83+
1.84+(defun worker-count (worker-pool)
1.85+ "Returns the total number of workers in WORKER-POOL."
1.86+ (length (workers worker-pool)))
1.87+
1.88+(defmethod print-object ((worker-pool crew-worker-pool) stream)
1.89+ "Prints WORKER-POOL to STREAM. This function runs without locking
1.90+WORKER-POOL, so it may output inconsistent information."
1.91+ (print-unreadable-object (worker-pool stream :type t :identity t)
1.92+ (multiple-value-bind (idle busy disconnected)
1.93+ (worker-counts worker-pool)
1.94+ (format stream "id: ~S workers: ~D idle: ~D busy: ~D disconnected: ~D"
1.95+ (id worker-pool) (worker-count worker-pool) idle busy disconnected))))
1.96+
1.97 (defmethod initialize-instance :after ((self crew-worker-pool) &key)
1.98 (with-mutex (*crew-worker-pools-lock*)
1.99 (setf (gethash (id self) *crew-worker-pools*) self)))
1.100
1.101+(defun find-worker-pool (worker-pool-id)
1.102+ "Returns the worker pool identified by WORKER-POOL-ID."
1.103+ (with-mutex (*crew-worker-pools-lock*)
1.104+ (gethash worker-pool-id *crew-worker-pools*)))
1.105+
1.106+(defun make-worker-pool (leader connect-infos connect-worker)
1.107+ (let* ((worker-pool (make-instance 'crew-worker-pool
1.108+ :leader leader))
1.109+ (workers
1.110+ (loop for connect-info in connect-infos
1.111+ for worker = (make-instance 'crew-worker :connection-info connect-info)
1.112+ do (let ((worker worker))
1.113+ (setf (connection worker)
1.114+ (funcall connect-worker
1.115+ connect-info
1.116+ (lambda () (handle-connection-closed worker worker-pool)))))
1.117+ collect worker)))
1.118+ (setf (workers worker-pool) (coerce workers 'vector)
1.119+ (idle-workers worker-pool) workers)
1.120+ (make-thread (lambda () (reconnect-workers worker-pool)) :name "reconnector")
1.121+ worker-pool))
1.122+
1.123 (defgeneric connect-worker (info hook)
1.124 (:documentation
1.125 "Creates a connection a worker's Swank server using INFO. Passes
1.126 thunk HOOK to SWANK-CLIENT:SLIME-CONNECT so that it is
1.127 evoked when the connection closes."))
1.128
1.129+(defmethod connect-worker ((connect-info crew-connection-info) close-handler)
1.130+ (slime-connect (host-name connect-info) (port connect-info) close-handler))
1.131+
1.132+(defun connect-workers (host/port-alist leader)
1.133+ "Makes Swank connections to all the workers in HOST/PORT-ALIST and returns a
1.134+WORKER-POOL containing them. HOST/PORT-ALIST is a list of (host-name . port)
1.135+pairs. MASTER-HOST-NAME and MASTER-SWANK-PORT are a host name and Swank port
1.136+that workers can use to return results to the master."
1.137+ (let ((connect-infos
1.138+ (loop for (host-name . port) in host/port-alist
1.139+ collect (make-instance 'crew-connection-info :host-name host-name :port port))))
1.140+ (make-worker-pool leader connect-infos #'connect-worker)))
1.141+
1.142+(defun disconnect-workers (worker-pool)
1.143+ "Closes the Swank connections of all connected workers in WORKER-POOL."
1.144+ (with-mutex ((lock worker-pool))
1.145+ (when (disconnecting worker-pool)
1.146+ (return-from disconnect-workers))
1.147+ ;; Signal that the worker pool is being torn down and make sure there are no idle workers
1.148+ ;; available. After the pool is marked as disconnecting, no workers will transition to idle.
1.149+ (setf (disconnecting worker-pool) t)
1.150+ (setf (idle-workers worker-pool) '()))
1.151+ (flet ((disconnect (worker)
1.152+ (with-mutex ((lock worker))
1.153+ (when (connection worker)
1.154+ (slime-close (connection worker))))))
1.155+ ;; Disconnect all workers.
1.156+ (loop for worker across (workers worker-pool) do (disconnect worker)))
1.157+ (values))
1.158+
1.159 (defgeneric reconnect-worker (info hook)
1.160 (:documentation
1.161 "Reconnects to a Swank server using information in INFO. Passes the
1.162 thunk HOOK to SWANK-CLIENT:SLIME-CONNECT, so that it is invoked when
1.163 the connection closes."))
1.164+
1.165+(defmethod reconnect-worker ((connect-info crew-connection-info) close-handler)
1.166+ (connect-worker connect-info close-handler))
1.167+
1.168+(defconstant +worker-reconnect-interval+ 10 "Seconds between attempts to reconnect workers.")
1.169+
1.170+(defun reconnect-workers (worker-pool)
1.171+ "Reconnects disconnected workers in WORKER-POOL."
1.172+ (loop
1.173+ (sleep +worker-reconnect-interval+)
1.174+ (with-mutex ((lock worker-pool))
1.175+ (when (disconnecting worker-pool)
1.176+ (return-from reconnect-workers)))
1.177+ (loop for worker across (workers worker-pool) do
1.178+ (let ((worker worker))
1.179+ (when (with-mutex ((lock worker))
1.180+ (unless (connection worker)
1.181+ (let ((close-handler (lambda () (handle-connection-closed worker worker-pool))))
1.182+ (setf (connection worker)
1.183+ (reconnect-worker (connection-info worker) close-handler)))))
1.184+ (with-mutex ((lock worker-pool))
1.185+ (when (member worker (idle-workers worker-pool))
1.186+ ;; We reconnected an idle worker, so notify the pool that a worker is available.
1.187+ (condition-notify (worker-ready worker-pool)))))))))
1.188+
1.189+(defun allocate-worker (worker-pool &key worker-to-avoid)
1.190+ "Allocates and returns an idle worker from WORKER-POOL that is connected. If
1.191+WORKER-POOL is being shut down, then NIL is returned."
1.192+ (with-mutex ((lock worker-pool))
1.193+ (loop
1.194+ (when (disconnecting worker-pool) (return nil))
1.195+ ;; Find a connected idle worker.
1.196+ (let ((idle-worker (find-if (lambda (worker)
1.197+ (unless (eql worker worker-to-avoid)
1.198+ (with-mutex ((lock worker))
1.199+ (connection worker))))
1.200+ (idle-workers worker-pool))))
1.201+ (when idle-worker
1.202+ (setf (idle-workers worker-pool) (remove idle-worker (idle-workers worker-pool)))
1.203+ (return idle-worker)))
1.204+ (condition-wait (worker-ready worker-pool) (lock worker-pool)))))
1.205+
1.206+(defun free-worker (worker worker-pool)
1.207+ "Changes the state of WORKER to idle, so that it's available for allocation."
1.208+ (with-mutex ((lock worker-pool))
1.209+ (unless (disconnecting worker-pool)
1.210+ (push worker (idle-workers worker-pool))
1.211+ (when (connection worker)
1.212+ ;; The free worker is connected, so notify the pool that a worker is available.
1.213+ (condition-notify (worker-ready worker-pool))))))
1.214+
1.215+(defun allocate-worker-and-evaluate (worker-pool form set-worker worker-done)
1.216+ "Allocates a connected worker from WORKER-POOL and sends it FORM to evaluate.
1.217+Returns the worker that was successfully allocated, or NIL if WORKER-POOL is
1.218+shutting down. SET-WORKER is a function that is called to tell the WORKER-DONE
1.219+continuation what worker is evaluating FORM."
1.220+ (let ((worker nil)
1.221+ (disconnected-idle-workers '()))
1.222+ (loop do
1.223+ (setf worker (allocate-worker worker-pool))
1.224+ (unless worker (return-from allocate-worker-and-evaluate nil))
1.225+ (with-mutex ((lock worker))
1.226+ ;; Has the connected idle worker we allocated just disconnected?
1.227+ (let ((connection (connection worker)))
1.228+ (if connection
1.229+ (progn
1.230+ (setf (set-worker worker) set-worker)
1.231+ (funcall set-worker worker)
1.232+ ;; Ignore network errors. If there is a network problem, socket keep alives will
1.233+ ;; eventually discover that the connection is dead and HANDLE-CONNECTION-CLOSED
1.234+ ;; will migrate the work to another worker.
1.235+ (handler-case (slime-eval-async form connection worker-done)
1.236+ (slime-network-error ()))
1.237+ (loop-finish))
1.238+ ;; The worker is disconnected, so remember it and look for another one.
1.239+ (push worker disconnected-idle-workers)))))
1.240+ ;; Place any disconnected workers we found back on the idle workers list.
1.241+ (dolist (w disconnected-idle-workers) (free-worker w worker-pool))
1.242+ worker))
1.243+
1.244+(defun handle-connection-closed (disconnected-worker worker-pool)
1.245+ "Called when the connection to DISCONNECTED-WORKER, a member of WORKER-POOL,
1.246+is closed because of a call to SLIME-CLOSE, the death of the worker, or because
1.247+of a communications error. Moves all uncompleted work intended for
1.248+DISCONNECTED-WORKER to another idle connected worker in WORKER-POOL."
1.249+ (with-mutex ((lock disconnected-worker))
1.250+ (let ((old-connection (connection disconnected-worker))
1.251+ (disconnected-idle-workers '()))
1.252+ (when (slime-pending-evals-p old-connection)
1.253+ (loop do
1.254+ (let ((worker (allocate-worker worker-pool :worker-to-avoid disconnected-worker)))
1.255+ (unless worker (return-from handle-connection-closed))
1.256+ (with-mutex ((lock worker))
1.257+ ;; Has the connected idle worker we allocated just disconnected?
1.258+ (let ((connection (connection worker)))
1.259+ (if connection
1.260+ (let ((old-set-worker (set-worker disconnected-worker)))
1.261+ ;; Migrate all pending work from DISCONNECTED-WORKER to WORKER.
1.262+ (setf (set-worker worker) old-set-worker)
1.263+ (funcall old-set-worker worker)
1.264+ ;; Ignore Slime network errors. If there is a network problem, socket keep
1.265+ ;; alives will eventually discover that the connection is dead and
1.266+ ;; HANDLE-CONNECTION-CLOSED will migrate the pending work again.
1.267+ (handler-case (slime-migrate-evals old-connection connection)
1.268+ (slime-network-error ()))
1.269+ (loop-finish))
1.270+ ;; The worker is disconnected, so remember it and look for another one.
1.271+ (push worker disconnected-idle-workers))))))
1.272+ ;; Place any disconnected workers we found back on the idle workers list.
1.273+ (dolist (w disconnected-idle-workers) (free-worker w worker-pool))))
1.274+ ;; Allow the reconnector to see that DISCONNECTED-WORKER is dead.
1.275+ (setf (connection disconnected-worker) nil)))
1.276+
1.277+
1.278+(defun no-workers-p (worker-pool)
1.279+ "Returns T if WORKER-POOL is NIL or contains no workers."
1.280+ (or (null worker-pool) (zerop (worker-count worker-pool))))
1.281+
1.282+(defun eval-on-master (make-work list result-done)
1.283+ "Iterates over the members of LIST calling MAKE-WORK on each one. If
1.284+RESULT-DONE is not NIL, it must be a function of two arguments. In this case,
1.285+after each application of MAKE-WORK to a LIST member, RESULT-DONE is called with
1.286+the member's position in LIST and MAKE-WORK's result."
1.287+ (loop for position from 0
1.288+ for element in list
1.289+ for result = (eval (funcall make-work element))
1.290+ collect result
1.291+ do (when result-done (funcall result-done position result))))
1.292+
1.293+(defun add-evaluated-form (worker-pool form)
1.294+ "Adds FORM to the set of forms that need to be evaluated on a new worker when
1.295+it joins WORKER-POOL."
1.296+ (with-mutex ((replay-forms-lock worker-pool))
1.297+ (push form (replay-forms worker-pool))))
1.298+
1.299+(defun unevaluated-replay-forms (worker-pool-id evaluated-count)
1.300+ "For a worker that has executed EVALUATED-COUNT of the replay forms associated
1.301+with the worker-pool identified by WORKER-POOL-ID, returns a list of the forms
1.302+the worker needs to evaluate in order to be completely up to date."
1.303+ (let ((worker-pool (find-worker-pool worker-pool-id)))
1.304+ (when worker-pool
1.305+ (with-mutex ((replay-forms-lock worker-pool))
1.306+ (nthcdr evaluated-count (reverse (replay-forms worker-pool)))))))
1.307+
1.308+(defun ensure-caught-up-then-evaluate (form worker-pool replay-required)
1.309+ "Returns a form that when evaluated on a worker in WORKER-POOL ensures that
1.310+the worker is caught up then evaluates FORM and returns its result.
1.311+REPLAY-REQUIRED indicates whether new workers must evaluate FORM before being
1.312+considered to be caught up."
1.313+ (let ((forms-count (with-mutex ((replay-forms-lock worker-pool))
1.314+ (length (replay-forms worker-pool))))
1.315+ (master-host-name (host-name (leader worker-pool)))
1.316+ (master-swank-port (port (leader worker-pool)))
1.317+ (worker-pool-id (id worker-pool)))
1.318+ `(evaluate-form ',form ,master-host-name ,master-swank-port ,worker-pool-id ,forms-count
1.319+ ,replay-required)))
1.320+
1.321+(defun dispatch-work (worker-pool make-work list result-done retain-workers replay-required)
1.322+ "Traverses LIST, calling MAKE-WORK on each element to create a form that is
1.323+then passed to a remote worker in WORKER-POOL for evaluation. When
1.324+RETAIN-WORKERS is true, each form is evaluated on a unique worker. Otherwise,
1.325+workers are reused and may process multiple forms. In either case, the results
1.326+of evaluating each form are collected into a list, which is returned when every
1.327+remote worker is done. If RESULT-DONE is not NIL, then it must be a function of
1.328+two arguments. In this case RESULT-DONE is called on the master as each worker
1.329+returns a result. The arguments to RESULT-DONE are an integer counter,
1.330+indicating the work form's position in LIST, and the result of evaluating the
1.331+form. REPLAY-REQUIRED indicates whether new workers will have to perform the
1.332+work generated by MAKE-WORK before being considered to be caught up."
1.333+ (if (no-workers-p worker-pool)
1.334+ (eval-on-master make-work list result-done)
1.335+ (let* ((work-count (length list))
1.336+ (results (make-list work-count))
1.337+ (done-lock (make-mutex :name "dispatch-work"))
1.338+ (done-condition (make-gate :name "done" :open nil)))
1.339+ (loop for element in list
1.340+ for result-cons = results then (cdr result-cons)
1.341+ for position from 0
1.342+ do (let ((result-cons result-cons)
1.343+ (worker nil)
1.344+ (position position))
1.345+ (flet ((set-worker (w) (setf worker w))
1.346+ (worker-done (result)
1.347+ (setf (car result-cons) result)
1.348+ ;; Unless we need each worker to evaluate exactly one form, free the
1.349+ ;; worker, so it can process other work.
1.350+ (unless retain-workers (free-worker worker worker-pool))
1.351+ (with-mutex (done-lock)
1.352+ (decf work-count)
1.353+ (when result-done (funcall result-done position result))
1.354+ (when (zerop work-count)
1.355+ (condition-notify done-condition)))))
1.356+ (let ((work (ensure-caught-up-then-evaluate (funcall make-work element)
1.357+ worker-pool
1.358+ replay-required)))
1.359+ (allocate-worker-and-evaluate worker-pool
1.360+ work
1.361+ #'set-worker
1.362+ #'worker-done)))))
1.363+ (with-mutex (done-lock)
1.364+ (loop until (zerop work-count)
1.365+ do (condition-wait done-condition done-lock)))
1.366+ ;; When we need each worker to evaluate exactly one form, we end up having allocated every
1.367+ ;; worker in the pool, so we need to free them all.
1.368+ (when retain-workers
1.369+ (loop for worker across (workers worker-pool) do (free-worker worker worker-pool)))
1.370+ results)))
1.371+
1.372+(defun eval-form-all-workers (worker-pool form &key result-done (replay-required t))
1.373+ "Evaluates FORM on all workers in WORKER-POOL. When RESULT-DONE is non-NIL,
1.374+it must be a function of two arguments. In this case RESULT-DONE is called as
1.375+each worker returns a result with two arguments, a non-negative integer
1.376+representing the order in which the work was dispatched and the worker's result.
1.377+If REPLAY-REQUIRED is true, which is the default, FORM will be remembered and
1.378+evaluated again for side effects on any new worker that joins POOL."
1.379+ (let* ((work-list (make-list (if (no-workers-p worker-pool) 1 (worker-count worker-pool))))
1.380+ (make-work (constantly form))
1.381+ (results (dispatch-work worker-pool make-work work-list result-done t replay-required)))
1.382+ (when (and worker-pool replay-required) (add-evaluated-form worker-pool form))
1.383+ results))
1.384+
1.385+(defun parallel-mapcar (worker-pool make-work list &optional result-done)
1.386+ "Traverses LIST, calling MAKE-WORK on each element to create a form that is
1.387+then passed to a remote worker in WORKER-POOL for evaluation. Results of
1.388+evaluating each form are collected into a list, which is returned when every
1.389+remote worker is done. If RESULT-DONE is provided, then it must be a function
1.390+of two arguments. In this case RESULT-DONE is called on the master as each
1.391+worker returns a result. The arguments to RESULT-DONE are the position of the
1.392+work in LIST and the worker's result."
1.393+ (dispatch-work worker-pool make-work list result-done nil nil))
1.394+
1.395+(defun parallel-reduce (worker-pool make-work list initial-value reducer)
1.396+ "Traverses LIST, calling MAKE-WORK on each element to create a form that is
1.397+then passed to a remote worker in WORKER-POOL for evaluation. As results are
1.398+returned, REDUCER, a binary function, is used to combine successive results in
1.399+the manner of REDUCE. INITIAL-VALUE is used as the starting value for the
1.400+reduction computation. The form
1.401+
1.402+ (parallel-reduce worker-pool make-work list initial-value reducer)
1.403+
1.404+is equivalent to
1.405+
1.406+ (reduce reducer
1.407+ (mapcar (lambda (x) (eval (funcall make-work x))) list)
1.408+ :initial-value initial-value)"
1.409+ (if (no-workers-p worker-pool)
1.410+ (reduce reducer
1.411+ (mapcar (lambda (x) (eval (funcall make-work x))) list)
1.412+ :initial-value initial-value)
1.413+ (let* ((work-count (length list))
1.414+ (unknown-result (cons nil nil))
1.415+ (results (make-array (1+ work-count) :initial-element unknown-result))
1.416+ (i 0)
1.417+ (reduce-result initial-value)
1.418+ (done-lock (make-mutex :name "crew-reduce"))
1.419+ (done-condition (make-gate :name "done" :open nil)))
1.420+ (loop for element in list
1.421+ for position from 0
1.422+ do (let ((worker nil)
1.423+ (position position))
1.424+ (flet ((set-worker (w) (setf worker w))
1.425+ (worker-done (result)
1.426+ (free-worker worker worker-pool)
1.427+ (with-mutex (done-lock)
1.428+ (setf (aref results position) result)
1.429+ (loop for next = (aref results i)
1.430+ while (not (eq next unknown-result))
1.431+ do (setf reduce-result (funcall reducer reduce-result next))
1.432+ (setf (aref results i) nil)
1.433+ (incf i))
1.434+ (decf work-count)
1.435+ (when (zerop work-count)
1.436+ (condition-notify done-condition)))))
1.437+ (let ((work (ensure-caught-up-then-evaluate (funcall make-work element)
1.438+ worker-pool
1.439+ nil)))
1.440+ (allocate-worker-and-evaluate worker-pool
1.441+ work
1.442+ #'set-worker
1.443+ #'worker-done)))))
1.444+ (with-mutex (done-lock)
1.445+ (loop until (zerop work-count)
1.446+ do (condition-wait done-condition done-lock)))
1.447+ reduce-result)))
1.448+
1.449+(defvar *evaluation-id-lock* (make-mutex :name "eval-id")
1.450+ "Lock protecting access to *EVALUATION-ID*.")
1.451+
1.452+(defvar *evaluation-id* 0 "Counter used to generate a unique ID for EVALUATION instances.")
1.453+
1.454+(defclass evaluation ()
1.455+ ((id :reader id
1.456+ :type integer
1.457+ :initarg :id
1.458+ :initform (with-mutex (*evaluation-id-lock*)
1.459+ (incf *evaluation-id*))
1.460+ :documentation "Unique ID for this running evaluation request.")
1.461+ (lock :reader lock
1.462+ :initform (make-mutex :name "evaluation")
1.463+ :documentation "Lock protecting access to DONE, DONE-CONDITION, and other mutable slots.")
1.464+ (done :accessor done
1.465+ :type boolean
1.466+ :initform nil
1.467+ :documentation "Is the computation done?")
1.468+ (done-condition :reader done-condition
1.469+ :initform (make-gate :name "done" :open nil)
1.470+ :documentation "Condition variable notified when computation is done."))
1.471+ (:documentation "Stores the data needed to process incoming evaluation results."))
1.472+
1.473+(defclass repeated-eval (evaluation)
1.474+ ((results :reader results
1.475+ :type simple-vector
1.476+ :initarg :results
1.477+ :initform (required-argument)
1.478+ :documentation "Vector holding returned results.")
1.479+ (results-position :accessor results-position
1.480+ :type vector-index
1.481+ :initform 0
1.482+ :documentation "Position where the next result will be recorded."))
1.483+ (:documentation "Stores the data needed to process an incoming repeated eval result."))
1.484+
1.485+(defvar *evals-lock* (make-mutex :name "evals")
1.486+ "Lock protecting access to *EVALS*.")
1.487+
1.488+(defvar *evals* '() "List containing an EVALUATION instance for each running computation.")
1.489+
1.490+(defun add-eval (eval)
1.491+ "Adds EVAL to the list of in-progress computations."
1.492+ (with-mutex (*evals-lock*)
1.493+ (push eval *evals*)))
1.494+
1.495+(defun remove-eval (eval)
1.496+ "Removes EVAL from the list of in-progress computations."
1.497+ (with-mutex (*evals-lock*)
1.498+ (setf *evals* (remove eval *evals*))))
1.499+
1.500+(defun find-eval (id)
1.501+ "Returns the running EVAL instance identified by ID, or NIL if no
1.502+computation with that ID is currently running."
1.503+ (with-mutex (*evals-lock*)
1.504+ (find id *evals* :test #'= :key #'id)))
1.505+
1.506+(defun record-repeated-result (id result)
1.507+ "Stores RESULT as one of the values produced by the repeated evaluation
1.508+identified by ID. Returns a boolean indicating whether the worker should
1.509+continue to call RECORD-REPEATED-RESULT with additional results."
1.510+ (let ((repeated-eval (find-eval id)))
1.511+ (when repeated-eval
1.512+ (let ((done nil))
1.513+ (with-mutex ((lock repeated-eval))
1.514+ (let* ((results (results repeated-eval))
1.515+ (length (length results))
1.516+ (position (results-position repeated-eval)))
1.517+ (when (< position length)
1.518+ (setf (aref results position) result)
1.519+ (incf position)
1.520+ (setf (results-position repeated-eval) position))
1.521+ (when (>= position length)
1.522+ (setf done t
1.523+ (done repeated-eval) t)
1.524+ (condition-notify (done-condition repeated-eval)))))
1.525+ (not done)))))
1.526+
1.527+(defun repeated-work-form (form worker-pool id)
1.528+ "Returns a form for evaluation on a client of WORKER-POOL that ensures the
1.529+client is caught up and then evaluates FORM for the repeated evaluation request
1.530+identified by ID."
1.531+ (let ((master-host-name (host-name (leader worker-pool)))
1.532+ (master-swank-port (port (leader worker-pool))))
1.533+ (ensure-caught-up-then-evaluate
1.534+ `(repeatedly-evaluate ',form ,id ,master-host-name ,master-swank-port)
1.535+ worker-pool
1.536+ nil)))
1.537+
1.538+(defun eval-form-repeatedly (worker-pool result-count form
1.539+ &key (worker-count (when worker-pool (worker-count worker-pool))))
1.540+ "Evaluates FORM, which must return a function of no arguments, on WORKER-COUNT
1.541+workers in WORKER-POOL, then arranges for the workers to repeatedly call the
1.542+function to create RESULT-COUNT results, which are returned in a list.
1.543+WORKER-COUNT defaults to the number of workers in WORKER-POOL."
1.544+ (if (or (no-workers-p worker-pool) (not (plusp worker-count)))
1.545+ (let ((work-function (eval form)))
1.546+ (loop repeat result-count collect (funcall work-function)))
1.547+ (let* ((results (make-instance 'repeated-eval :results (make-array result-count)))
1.548+ (results-lock (lock results))
1.549+ (retained-workers '()))
1.550+ (add-eval results)
1.551+ (loop repeat worker-count
1.552+ until (with-mutex (results-lock)
1.553+ (done results))
1.554+ do (let ((worker nil))
1.555+ (flet ((set-worker (w) (setf worker w))
1.556+ (worker-done (result)
1.557+ (declare (ignore result))
1.558+ ;; Workers that finish before all results are available are retained,
1.559+ ;; while those that finish later are immediately released.
1.560+ (with-mutex (results-lock)
1.561+ (if (not (done results))
1.562+ (push worker retained-workers)
1.563+ (free-worker worker worker-pool)))))
1.564+ (let ((work (repeated-work-form form worker-pool (id results))))
1.565+ (allocate-worker-and-evaluate worker-pool
1.566+ work
1.567+ #'set-worker
1.568+ #'worker-done)))))
1.569+ (with-mutex (results-lock)
1.570+ (loop until (done results)
1.571+ do (condition-wait (done-condition results) results-lock)))
1.572+ ;; TODO(brown): Perhaps UNWIND-PROTECT should be used to ensure the remove happens.
1.573+ (remove-eval results)
1.574+ ;; We can manipulate RETAINED-WORKERS without a lock because all results are available,
1.575+ ;; so no WORKER-DONE function will modify the list.
1.576+ (dolist (w retained-workers) (free-worker w worker-pool))
1.577+ (coerce (results results) 'list))))
1.578+
1.579+(defclass async-eval (evaluation)
1.580+ ((results :accessor results
1.581+ :type list
1.582+ :initform '()
1.583+ :documentation "List holding returned, but unprocessed, results.")
1.584+ (results-available :reader results-available
1.585+ :initform (make-gate :name "results-available" :open nil)
1.586+ :documentation "Condition notified when new results are available.")
1.587+ (state :accessor state
1.588+ :initarg :state
1.589+ :initform (required-argument)
1.590+ :documentation "State of the asynchronous computation, updated from results.")
1.591+ (state-counter :accessor state-counter
1.592+ :type (integer 0)
1.593+ :initform 0
1.594+ :documentation "Counter incremented each time STATE is updated."))
1.595+ (:documentation "Data needed to process incoming asynchronous evaluation results."))
1.596+
1.597+(defun async-results-loop (async-eval update-state)
1.598+ "Handles incoming results for ASYNC-EVAL by calling UPDATE-STATE whenever
1.599+ASYNC-EVAL holds unprocessed results. UPDATE-STATE is called with two
1.600+arguments, the work state and a list of the unprocessed results."
1.601+ (let ((lock (lock async-eval))
1.602+ (state nil)
1.603+ (results nil))
1.604+ (loop
1.605+ (with-mutex (lock)
1.606+ (loop until (results async-eval)
1.607+ do (condition-wait (results-available async-eval) lock))
1.608+ (setf state (state async-eval)
1.609+ results (results async-eval)
1.610+ (results async-eval) '()))
1.611+ (multiple-value-bind (new-state done send-state)
1.612+ (funcall update-state state results)
1.613+ (with-mutex (lock)
1.614+ (setf (state async-eval) new-state)
1.615+ (when (and send-state (not done))
1.616+ (incf (state-counter async-eval))))
1.617+ (when done
1.618+ (with-mutex (lock)
1.619+ (setf (done async-eval) t)
1.620+ (condition-notify (done-condition async-eval)))
1.621+ (return-from async-results-loop))))))
1.622+
1.623+(defun record-async-result (id result worker-state-counter)
1.624+ "Stores RESULT as one of the values produced by the async evaluation
1.625+identified by ID. Returns a boolean indicating whether the worker should
1.626+continue to call ASYNC-RESULT with additional results."
1.627+ (let ((done t)
1.628+ (state-counter 0)
1.629+ (state nil)
1.630+ (async-eval (find-eval id)))
1.631+ (when async-eval
1.632+ (with-mutex ((lock async-eval))
1.633+ (push result (results async-eval))
1.634+ (setf state-counter (state-counter async-eval)
1.635+ done (done async-eval))
1.636+ (when (/= worker-state-counter state-counter)
1.637+ (setf state (state async-eval)))
1.638+ (condition-notify (results-available async-eval))))
1.639+ (list (not done) state-counter state)))
1.640+
1.641+(defun async-work-form (form initial-state worker-pool id)
1.642+ "Returns a form for evaluation on a client of WORKER-POOL that ensures the
1.643+client is caught up and then evaluates FORM for the async evaluation request
1.644+identified by ID."
1.645+ (let ((master-host-name (host-name (leader worker-pool)))
1.646+ (master-swank-port (port (leader worker-pool))))
1.647+ (ensure-caught-up-then-evaluate
1.648+ `(async-evaluate ',form ',initial-state ,id ,master-host-name ,master-swank-port)
1.649+ worker-pool
1.650+ nil)))
1.651+
1.652+(defun eval-repeatedly-async-state (worker-pool form initial-state update-state
1.653+ &key (worker-count
1.654+ (when worker-pool (worker-count worker-pool))))
1.655+ "Evaluates FORM, which must return a function of one argument, on WORKER-COUNT
1.656+workers in WORKER-POOL, then arranges for the workers to repeatedly call the
1.657+work function and send its results back to the master. The work function is
1.658+passed a state argument, initially set to INITIAL-STATE, that the master can
1.659+update asynchronously as it receives new results.
1.660+
1.661+On the master, the work state is initialized to INITIAL-STATE and UPDATE-STATE
1.662+is called repeatedly to process results received from the workers. UPDATE-STATE
1.663+is called with two arguments, the current work state and a list containing all
1.664+unprocessed work results. UPDATE-STATE should return three values: the new work
1.665+state, a boolean indicating whether the computation should end, and a boolean
1.666+indicating whether the latest work state should be distributed to workers. When
1.667+UPDATE-STATE's second return value is true, EVAL-REPEATEDLY-ASYNC-STATE tells
1.668+the workers to stop and returns the latest work state."
1.669+ (if (or (no-workers-p worker-pool) (not (plusp worker-count)))
1.670+ (let ((work-function (eval form))
1.671+ (state initial-state))
1.672+ (loop (multiple-value-bind (new-state done send-state)
1.673+ (funcall update-state state (list (funcall work-function state)))
1.674+ (when done (return new-state))
1.675+ (when send-state (setf state new-state)))))
1.676+ (let* ((results (make-instance 'async-eval :state initial-state))
1.677+ (results-lock (lock results))
1.678+ (retained-workers '()))
1.679+ (add-eval results)
1.680+ (make-thread (lambda () (async-results-loop results update-state))
1.681+ :name "async results loop")
1.682+ (loop repeat worker-count
1.683+ until (with-mutex (results-lock)
1.684+ (done results))
1.685+ do (let ((worker nil))
1.686+ (flet ((set-worker (w) (setf worker w))
1.687+ (worker-done (result)
1.688+ (declare (ignore result))
1.689+ ;; Workers that finish before all results are available are retained,
1.690+ ;; while those that finish later are immediately released.
1.691+ (with-mutex (results-lock)
1.692+ (if (not (done results))
1.693+ (push worker retained-workers)
1.694+ (free-worker worker worker-pool)))))
1.695+ (let ((work (async-work-form form initial-state worker-pool (id results))))
1.696+ (allocate-worker-and-evaluate worker-pool
1.697+ work
1.698+ #'set-worker
1.699+ #'worker-done)))))
1.700+ (with-mutex (results-lock)
1.701+ (loop until (done results)
1.702+ do (condition-wait (done-condition results) results-lock)))
1.703+ ;; TODO(brown): Perhaps UNWIND-PROTECT should be used to ensure the remove happens.
1.704+ (remove-eval results)
1.705+ ;; We can manipulate RETAINED-WORKERS without a lock because all results are available, so
1.706+ ;; no WORKER-DONE function will modify the list.
1.707+ (dolist (w retained-workers) (free-worker w worker-pool))
1.708+ (state results))))