changelog shortlog graph tags branches changeset files revisions annotate raw help

Mercurial > core / lisp/lib/net/proto/crew.lisp

changeset 698: 96958d3eb5b0
parent: f6a340b92274
author: Richard Westhaver <ellis@rwest.io>
date: Fri, 04 Oct 2024 22:04:59 -0400
permissions: -rw-r--r--
description: fixes
;;; net/proto/crew.lisp --- Swank Crew Protocol

;; Distributes Lisp evaluation across a pool of remote worker Lisps, each
;; running a Swank server.  The leader (master) connects to the workers,
;; sends them forms to evaluate, and collects their results.  Dropped worker
;; connections are retried periodically, and work pending on a dropped
;; connection is migrated to another worker.

;;; Code:
(in-package :net/proto/crew)
7 
(defclass crew-connection-info ()
  ((host-name :accessor host-name
              :initarg :host-name
              :initform "localhost"
              :type string
              :documentation "Host this worker is running on.")
   (port :accessor port
         :initarg :port
         :type port
         :documentation "Port on which the worker's swank server is listening for connections."))
  (:documentation "Host name and Swank port used to open a connection to a Lisp process."))
18 
(defclass crew-worker ()
  ((connection-info :initarg :connection-info
                    :accessor connection-info
                    :type crew-connection-info)
   ;; Guards CONNECTION and SET-WORKER while work is sent to or migrated
   ;; away from this worker.
   (lock :accessor lock
         :initform (make-mutex :name "worker"))
   (set-worker :initform #'identity
               :accessor set-worker
               :type function
               :documentation "Function called to record which worker is evaluating a form.")
   ;; NIL while the worker is disconnected; the reconnector thread retries
   ;; workers in that state.
   (connection :accessor %connection
               :initform nil
               :type (or null swank-connection)))
  (:documentation "A remote Lisp running a Swank server."))
29 
(defclass crew-worker-pool (id)
  ((connection-info :accessor connection-info
                    :type crew-connection-info)
   ;; Host and Swank port that workers use to send results back to the leader.
   (leader :initarg :leader
           :initform (required-argument)
           :reader leader
           :type crew-connection-info)
   ;; Every worker in the pool, whatever its state.
   (workers :initform (vector) :accessor workers :type vector)
   ;; Protects IDLE-WORKERS and DISCONNECTING.
   (lock :initform (make-mutex :name "worker-pool") :accessor lock)
   (idle-workers :initform nil :accessor idle-workers :type list)
   ;; Notified when a connected worker becomes available for allocation.
   (worker-ready :initform (make-waitqueue :name "worker-ready") :accessor worker-ready)
   ;; Set to T by DISCONNECT-WORKERS when the pool is torn down.
   (disconnecting :initform nil :accessor disconnecting)
   (replay-forms-lock :initform (make-mutex :name "replay-forms-lock") :accessor replay-forms-lock)
   ;; Forms that newly joining workers must replay, most recent first.
   (replay-forms :initform nil :accessor replay-forms :type list))
  (:documentation "A set of Swank workers that evaluate forms on behalf of a leader."))
43 
(defvar *crew-worker-pools-lock* (make-mutex :name "worker-pools-lock")
  "Lock protecting access to *CREW-WORKER-POOLS*.")

;; Could be a concurrent hash table (chashtable) instead of a lock-protected one.
(defvar *crew-worker-pools* (make-hash-table)
  "Mapping from worker pool IDs to active worker pools.")
49 
(defun worker-counts (worker-pool)
  "Returns three values: the number of idle, busy, and disconnected workers in
WORKER-POOL.  No lock on WORKER-POOL is taken, so the three counts may be
inconsistent with one another."
  (let ((idle-set (make-hash-table :test 'eq))
        (idle-count 0)
        (busy-count 0)
        (disconnected-count 0))
    ;; Snapshot the idle list without the pool lock.  Other threads may be
    ;; popping workers off its head concurrently, so this view can be stale.
    (dolist (w (copy-seq (idle-workers worker-pool)))
      (setf (gethash w idle-set) t))
    ;; %CONNECTION is read without the worker's lock, so it may be stale too.
    (loop for w across (workers worker-pool)
          do (cond ((null (%connection w)) (incf disconnected-count))
                   ((gethash w idle-set) (incf idle-count))
                   (t (incf busy-count))))
    (values idle-count busy-count disconnected-count)))
70 
(defmethod worker-count ((self crew-worker-pool) &key)
  "Returns the total number of workers in SELF, whether idle, busy, or
disconnected."
  (let ((all-workers (workers self)))
    (length all-workers)))
73 
(defmethod print-object ((worker-pool crew-worker-pool) stream)
  "Prints WORKER-POOL unreadably to STREAM, showing its ID, total worker count,
and how many workers are idle, busy, and disconnected.  WORKER-POOL is not
locked, so the counts may be mutually inconsistent."
  (print-unreadable-object (worker-pool stream :type t :identity t)
    (multiple-value-bind (idle-count busy-count disconnected-count)
        (worker-counts worker-pool)
      (let ((pool-id (id worker-pool))
            (total (worker-count worker-pool)))
        (format stream "id: ~S workers: ~D idle: ~D busy: ~D disconnected: ~D"
                pool-id total idle-count busy-count disconnected-count)))))
82 
(defmethod initialize-instance :after ((self crew-worker-pool) &key)
  "Registers the newly created pool in *CREW-WORKER-POOLS* under its ID so
FIND-WORKER-POOL can locate it."
  (let ((pool-id (id self)))
    (with-mutex (*crew-worker-pools-lock*)
      (setf (gethash pool-id *crew-worker-pools*) self))))
86 
(defun find-worker-pool (worker-pool-id)
  "Looks up WORKER-POOL-ID in the registry of active worker pools and returns
the pool found there, or NIL.  The registry is read while holding
*CREW-WORKER-POOLS-LOCK*."
  (with-mutex (*crew-worker-pools-lock*)
    (gethash worker-pool-id
             *crew-worker-pools*)))
91 
(defun make-worker-pool (leader connect-infos connect-worker)
  "Creates a CREW-WORKER-POOL led by LEADER with one CREW-WORKER per element of
CONNECT-INFOS.  Each worker is connected by calling CONNECT-WORKER with its
connection info and a close handler that calls HANDLE-CONNECTION-CLOSED.  All
workers start out idle, and a thread named reconnector is started to run
RECONNECT-WORKERS on the pool.  Returns the new pool."
  (let ((worker-pool (make-instance 'crew-worker-pool :leader leader))
        (new-workers '()))
    (dolist (info connect-infos)
      ;; WORKER is freshly bound on each iteration, so each close handler
      ;; closes over its own worker.
      (let ((worker (make-instance 'crew-worker :connection-info info)))
        (setf (%connection worker)
              (funcall connect-worker
                       info
                       (lambda () (handle-connection-closed worker worker-pool))))
        (push worker new-workers)))
    (setf new-workers (nreverse new-workers))
    (setf (workers worker-pool) (coerce new-workers 'vector)
          (idle-workers worker-pool) new-workers)
    (make-thread (lambda () (reconnect-workers worker-pool)) :name "reconnector")
    worker-pool))
108 
(defgeneric connect-worker (info hook)
  (:documentation
   "Creates a connection to a worker's Swank server using INFO and returns it.
Passes the thunk HOOK to SLIME-CONNECT so that it is invoked when the
connection closes."))
114 
(defmethod connect-worker ((connect-info crew-connection-info) close-handler)
  "Opens a Swank connection to the host and port recorded in CONNECT-INFO,
arranging for CLOSE-HANDLER to be called when the connection closes."
  (let ((host (host-name connect-info))
        (swank-port (port connect-info)))
    (slime-connect host swank-port close-handler)))
117 
(defun connect-workers (host/port-alist leader)
  "Makes Swank connections to all the workers in HOST/PORT-ALIST and returns a
CREW-WORKER-POOL containing them.  HOST/PORT-ALIST is a list of
(host-name . port) pairs.  LEADER is a CREW-CONNECTION-INFO holding the host
name and Swank port that workers can use to return results to the leader."
  (make-worker-pool leader
                    (loop for (host-name . port) in host/port-alist
                          collect (make-instance 'crew-connection-info
                                                 :host-name host-name
                                                 :port port))
                    #'connect-worker))
127 
(defun disconnect-workers (worker-pool)
  "Closes the Swank connections of all connected workers in WORKER-POOL and
removes WORKER-POOL from *CREW-WORKER-POOLS*.  Calling it on a pool that is
already disconnecting does nothing.  Returns no values."
  (with-mutex ((lock worker-pool))
    (when (disconnecting worker-pool)
      (return-from disconnect-workers (values)))
    ;; Signal that the worker pool is being torn down and make sure there are
    ;; no idle workers available.  After the pool is marked as disconnecting,
    ;; no workers will transition to idle.
    (setf (disconnecting worker-pool) t
          (idle-workers worker-pool) '()))
  ;; Disconnect all workers.
  (loop for worker across (workers worker-pool)
        do (with-mutex ((lock worker))
             (let ((connection (%connection worker)))
               (when connection
                 (slime-close connection)))))
  ;; INITIALIZE-INSTANCE registered the pool; unregister it so the registry
  ;; does not keep a dead pool (and its replay forms) alive indefinitely.
  (with-mutex (*crew-worker-pools-lock*)
    (remhash (id worker-pool) *crew-worker-pools*))
  (values))
144 
(defgeneric reconnect-worker (info hook)
  (:documentation
   "Re-establishes a Swank connection from the information in INFO and returns
the new connection.  The thunk HOOK is passed to SLIME-CONNECT so that it is
invoked when the new connection closes."))
150 
(defmethod reconnect-worker ((connect-info crew-connection-info) close-handler)
  "Reconnecting through a CREW-CONNECTION-INFO is identical to connecting for
the first time, so this delegates to CONNECT-WORKER."
  (connect-worker connect-info
                  close-handler))
153 
(defconstant +worker-reconnect-interval+ 10 "Seconds between attempts to reconnect workers.")

(defun reconnect-workers (worker-pool)
  "Reconnects disconnected workers in WORKER-POOL.  Every
+WORKER-RECONNECT-INTERVAL+ seconds, tries to reconnect each worker whose
connection is NIL, until WORKER-POOL starts disconnecting.  A failed attempt
leaves the worker disconnected for the next round instead of terminating this
loop.  When an idle worker is reconnected, one thread waiting for a worker is
notified."
  (loop
    (sleep +worker-reconnect-interval+)
    (with-mutex ((lock worker-pool))
      (when (disconnecting worker-pool)
        (return-from reconnect-workers)))
    (loop for worker across (workers worker-pool) do
      ;; Rebind WORKER so each close handler closes over its own worker.
      (let ((worker worker))
        (when (with-mutex ((lock worker))
                (unless (%connection worker)
                  (let ((close-handler (lambda () (handle-connection-closed worker worker-pool))))
                    (setf (%connection worker)
                          ;; An unreachable worker must not kill the reconnector
                          ;; thread: leave the connection NIL and retry later.
                          (handler-case (reconnect-worker (connection-info worker) close-handler)
                            (error () nil))))))
          (with-mutex ((lock worker-pool))
            (when (member worker (idle-workers worker-pool))
              ;; We reconnected an idle worker, so notify the pool that a worker is available.
              (condition-notify (worker-ready worker-pool)))))))))
174 
(defun allocate-worker (worker-pool &key worker-to-avoid)
  "Removes and returns a connected idle worker from WORKER-POOL, never choosing
WORKER-TO-AVOID.  Blocks on the pool's WORKER-READY waitqueue until such a
worker exists.  Returns NIL if WORKER-POOL is being shut down."
  (flet ((usable-p (candidate)
           ;; Skip WORKER-TO-AVOID before taking its lock.
           (and (not (eql candidate worker-to-avoid))
                (with-mutex ((lock candidate))
                  (%connection candidate)))))
    (with-mutex ((lock worker-pool))
      (loop
        (when (disconnecting worker-pool)
          (return nil))
        (let ((found (find-if #'usable-p (idle-workers worker-pool))))
          (when found
            (setf (idle-workers worker-pool)
                  (remove found (idle-workers worker-pool)))
            (return found)))
        ;; Releases the pool lock while waiting and reacquires it on wakeup.
        (condition-wait (worker-ready worker-pool) (lock worker-pool))))))
191 
(defun free-worker (worker worker-pool)
  "Puts WORKER back on WORKER-POOL's idle list so it can be allocated again.
Does nothing once the pool is disconnecting."
  (with-mutex ((lock worker-pool))
    (if (disconnecting worker-pool)
        nil
        (progn
          (push worker (idle-workers worker-pool))
          ;; Only a connected worker can satisfy ALLOCATE-WORKER, so only then
          ;; is waking a waiter useful.
          (when (%connection worker)
            (condition-notify (worker-ready worker-pool)))))))
200 
(defun allocate-worker-and-evaluate (worker-pool form set-worker worker-done)
  "Allocates a connected worker from WORKER-POOL and sends it FORM to evaluate
asynchronously, with WORKER-DONE as the continuation.  SET-WORKER is stored on
the worker and called with it, so the WORKER-DONE continuation knows which
worker is evaluating FORM.  Returns the allocated worker, or NIL if WORKER-POOL
is shutting down."
  (let ((skipped '())
        (chosen nil))
    (loop
      (setf chosen (allocate-worker worker-pool))
      (when (null chosen)
        (return-from allocate-worker-and-evaluate nil))
      (when (with-mutex ((lock chosen))
              ;; The worker was connected when allocated, but may have
              ;; disconnected since.
              (let ((connection (%connection chosen)))
                (cond (connection
                       (setf (set-worker chosen) set-worker)
                       (funcall set-worker chosen)
                       ;; Network errors are ignored: socket keep-alives will
                       ;; eventually detect a dead connection, and
                       ;; HANDLE-CONNECTION-CLOSED will migrate the work.
                       (handler-case (slime-eval-async form connection worker-done)
                         (slime-network-error ()))
                       t)
                      (t
                       ;; Remember the disconnected worker and try another.
                       (push chosen skipped)
                       nil))))
        (return)))
    ;; Return any disconnected workers we skipped to the idle list.
    (dolist (w skipped)
      (free-worker w worker-pool))
    chosen))
229 
(defun handle-connection-closed (disconnected-worker worker-pool)
  "Called when the connection to DISCONNECTED-WORKER, a member of WORKER-POOL,
is closed because of a call to SLIME-CLOSE, the death of the worker, or a
communications error.  Moves all uncompleted work intended for
DISCONNECTED-WORKER to another connected idle worker in WORKER-POOL, then sets
DISCONNECTED-WORKER's connection to NIL so the reconnector will retry it.  If
WORKER-POOL is shutting down, pending work is not migrated, but the connection
is still cleared."
  (with-mutex ((lock disconnected-worker))
    (let ((old-connection (%connection disconnected-worker))
          (disconnected-idle-workers '()))
      ;; Guard against a NIL connection, e.g. if this hook runs before the
      ;; connection was recorded or after it was already cleared.
      (when (and old-connection (slime-pending-evals-p old-connection))
        (loop
          (let ((worker (allocate-worker worker-pool :worker-to-avoid disconnected-worker)))
            ;; NIL means WORKER-POOL is disconnecting: abandon migration, but
            ;; still fall through so the dead connection is cleared below.
            (unless worker (return))
            (when (with-mutex ((lock worker))
                    ;; Has the connected idle worker we allocated just disconnected?
                    (let ((connection (%connection worker)))
                      (cond (connection
                             (let ((old-set-worker (set-worker disconnected-worker)))
                               ;; Migrate all pending work from DISCONNECTED-WORKER to WORKER.
                               (setf (set-worker worker) old-set-worker)
                               (funcall old-set-worker worker)
                               ;; Ignore Slime network errors.  If there is a network problem,
                               ;; socket keep-alives will eventually discover that the connection
                               ;; is dead and HANDLE-CONNECTION-CLOSED will migrate the work again.
                               (handler-case (slime-migrate-evals old-connection connection)
                                 (slime-network-error ())))
                             t)
                            (t
                             ;; The worker is disconnected, so remember it and look for another.
                             (push worker disconnected-idle-workers)
                             nil))))
              (return))))
        ;; Place any disconnected workers we found back on the idle workers list.
        (dolist (w disconnected-idle-workers) (free-worker w worker-pool)))
      ;; Allow the reconnector to see that DISCONNECTED-WORKER is dead.
      (setf (%connection disconnected-worker) nil))))
262 
263 
(defun no-workers-p (worker-pool)
  "Returns true when WORKER-POOL is NIL or has zero workers; callers then fall
back to evaluating work locally."
  (if worker-pool
      (zerop (worker-count worker-pool))
      t))
267 
(defun eval-on-master (make-work list result-done)
  "Locally evaluates (FUNCALL MAKE-WORK element) for each element of LIST, in
order, and returns a list of the results.  When RESULT-DONE is non-NIL it must
be a function of two arguments; after each evaluation it is called with the
element's position in LIST and the evaluation's result."
  (let ((collected '())
        (index 0))
    (dolist (element list (nreverse collected))
      (let ((value (eval (funcall make-work element))))
        (push value collected)
        (when result-done
          (funcall result-done index value))
        (incf index)))))
278 
(defun add-evaluated-form (worker-pool form)
  "Records FORM as one that any worker joining WORKER-POOL later must evaluate
in order to be caught up.  Forms are stored most recent first."
  (let ((replay-lock (replay-forms-lock worker-pool)))
    (with-mutex (replay-lock)
      (push form (replay-forms worker-pool)))))
284 
(defun unevaluated-replay-forms (worker-pool-id evaluated-count)
  "Returns, oldest first, the replay forms of the worker pool identified by
WORKER-POOL-ID that a worker which has already evaluated EVALUATED-COUNT of
them still needs to evaluate to be completely up to date.  Returns NIL if no
such pool is registered."
  (let ((pool (find-worker-pool worker-pool-id)))
    (if (null pool)
        nil
        (with-mutex ((replay-forms-lock pool))
          ;; REPLAY-FORMS is kept newest first; reverse it into evaluation order.
          (nthcdr evaluated-count (reverse (replay-forms pool)))))))
293 
(defun ensure-caught-up-then-evaluate (form worker-pool replay-required)
  "Wraps FORM in an EVALUATE-FORM call to be run on a worker in WORKER-POOL.
The returned form carries the leader's host name and Swank port, the pool's ID,
and the current number of replay forms, so the worker can catch up before
evaluating FORM and returning its result.  REPLAY-REQUIRED indicates whether
new workers must evaluate FORM before being considered to be caught up."
  (let* ((leader (leader worker-pool))
         (replay-count (with-mutex ((replay-forms-lock worker-pool))
                         (length (replay-forms worker-pool)))))
    (list 'evaluate-form
          (list 'quote form)
          (host-name leader)
          (port leader)
          (id worker-pool)
          replay-count
          replay-required)))
306 
(defun dispatch-work (worker-pool make-work list result-done retain-workers replay-required)
  "Traverses LIST, calling MAKE-WORK on each element to create a form that is
then passed to a remote worker in WORKER-POOL for evaluation.  When
RETAIN-WORKERS is true, each form is evaluated on a unique worker.  Otherwise,
workers are reused and may process multiple forms.  In either case, the results
of evaluating each form are collected into a list, which is returned when every
remote worker is done.  If RESULT-DONE is not NIL, then it must be a function of
two arguments.  In this case RESULT-DONE is called on the master as each worker
returns a result.  The arguments to RESULT-DONE are an integer counter,
indicating the work form's position in LIST, and the result of evaluating the
form.  REPLAY-REQUIRED indicates whether new workers will have to perform the
work generated by MAKE-WORK before being considered to be caught up."
  (if (no-workers-p worker-pool)
      (eval-on-master make-work list result-done)
      (let* ((work-count (length list))
             (results (make-list work-count))
             (done-lock (make-mutex :name "dispatch-work"))
             ;; Used with CONDITION-WAIT and CONDITION-NOTIFY below, so it must
             ;; be a waitqueue (like the pool's WORKER-READY), not a gate.
             (done-condition (make-waitqueue :name "done")))
        (loop for element in list
              for result-cons = results then (cdr result-cons)
              for position from 0
              do (let ((result-cons result-cons)
                       (worker nil)
                       (position position))
                   (flet ((set-worker (w) (setf worker w))
                          (worker-done (result)
                            (setf (car result-cons) result)
                            ;; Unless we need each worker to evaluate exactly one form, free
                            ;; the worker, so it can process other work.
                            (unless retain-workers (free-worker worker worker-pool))
                            (with-mutex (done-lock)
                              (decf work-count)
                              (when result-done (funcall result-done position result))
                              (when (zerop work-count)
                                (condition-notify done-condition)))))
                     (let ((work (ensure-caught-up-then-evaluate (funcall make-work element)
                                                                 worker-pool
                                                                 replay-required)))
                       ;; NOTE(review): if the pool starts disconnecting, this returns NIL,
                       ;; WORK-COUNT never reaches zero, and the wait below blocks forever.
                       (allocate-worker-and-evaluate worker-pool
                                                     work
                                                     #'set-worker
                                                     #'worker-done)))))
        (with-mutex (done-lock)
          (loop until (zerop work-count)
                do (condition-wait done-condition done-lock)))
        ;; When we need each worker to evaluate exactly one form, we end up having
        ;; allocated every worker in the pool, so we need to free them all.
        (when retain-workers
          (loop for worker across (workers worker-pool) do (free-worker worker worker-pool)))
        results)))
357 
(defun eval-form-all-workers (worker-pool form &key result-done (replay-required t))
  "Evaluates FORM once on every worker in WORKER-POOL (or once locally when
there are no workers) and returns the list of results.  When RESULT-DONE is
non-NIL, it must be a function of two arguments; it is called as each worker
returns, with a non-negative integer giving the order in which the work was
dispatched and the worker's result.  If REPLAY-REQUIRED is true, the default,
FORM is remembered and evaluated again for side effects on any new worker that
joins WORKER-POOL."
  (let* ((slot-count (if (no-workers-p worker-pool) 1 (worker-count worker-pool)))
         (results (dispatch-work worker-pool
                                 (constantly form)
                                 (make-list slot-count)
                                 result-done
                                 t
                                 replay-required)))
    (when (and worker-pool replay-required)
      (add-evaluated-form worker-pool form))
    results))
370 
(defun parallel-mapcar (worker-pool make-work list &optional result-done)
  "Like MAPCAR, but each element of LIST is turned into a form by MAKE-WORK and
that form is evaluated by a remote worker in WORKER-POOL.  Returns the results
in LIST order once every remote worker is done.  If RESULT-DONE is provided, it
must be a function of two arguments; it is called on the master as each worker
returns, with the position of the work in LIST and the worker's result.
Workers are reused across elements, and the work is not replayed on new
workers."
  (let ((retain-workers nil)
        (replay-required nil))
    (dispatch-work worker-pool make-work list result-done
                   retain-workers replay-required)))
380 
(defun parallel-reduce (worker-pool make-work list initial-value reducer)
  "Traverses LIST, calling MAKE-WORK on each element to create a form that is
then passed to a remote worker in WORKER-POOL for evaluation.  As results are
returned, REDUCER, a binary function, is used to combine successive results in
the manner of REDUCE.  INITIAL-VALUE is used as the starting value for the
reduction computation.  The form

  (parallel-reduce worker-pool make-work list initial-value reducer)

is equivalent to

  (reduce reducer
          (mapcar (lambda (x) (eval (funcall make-work x))) list)
          :initial-value initial-value)"
  (if (no-workers-p worker-pool)
      (reduce reducer
              (mapcar (lambda (x) (eval (funcall make-work x))) list)
              :initial-value initial-value)
      (let* ((work-count (length list))
             ;; Unique sentinel marking result slots that have not arrived yet.
             (unknown-result (cons nil nil))
             ;; One extra sentinel slot so the in-order scan below always stops.
             (results (make-array (1+ work-count) :initial-element unknown-result))
             ;; Index of the next result to fold into REDUCE-RESULT.
             (i 0)
             (reduce-result initial-value)
             (done-lock (make-mutex :name "crew-reduce"))
             ;; Used with CONDITION-WAIT and CONDITION-NOTIFY below, so it must
             ;; be a waitqueue (like the pool's WORKER-READY), not a gate.
             (done-condition (make-waitqueue :name "done")))
        (loop for element in list
              for position from 0
              do (let ((worker nil)
                       (position position))
                   (flet ((set-worker (w) (setf worker w))
                          (worker-done (result)
                            (free-worker worker worker-pool)
                            (with-mutex (done-lock)
                              (setf (aref results position) result)
                              ;; Fold every contiguous result that is now available, so
                              ;; results are combined in LIST order, as REDUCE would.
                              (loop for next = (aref results i)
                                    while (not (eq next unknown-result))
                                    do (setf reduce-result (funcall reducer reduce-result next))
                                       (setf (aref results i) nil)
                                       (incf i))
                              (decf work-count)
                              (when (zerop work-count)
                                (condition-notify done-condition)))))
                     (let ((work (ensure-caught-up-then-evaluate (funcall make-work element)
                                                                 worker-pool
                                                                 nil)))
                       (allocate-worker-and-evaluate worker-pool
                                                     work
                                                     #'set-worker
                                                     #'worker-done)))))
        (with-mutex (done-lock)
          (loop until (zerop work-count)
                do (condition-wait done-condition done-lock)))
        reduce-result)))
434 
(defvar *evaluation-id-lock* (make-mutex :name "eval-id")
  "Mutex serializing increments of *EVALUATION-ID*.")

(defvar *evaluation-id* 0
  "Counter from which each new EVALUATION instance draws its unique ID.")
439 
(defclass evaluation ()
  ((id :reader id
       :type integer
       :initarg :id
       :initform (with-mutex (*evaluation-id-lock*)
                   (incf *evaluation-id*))
       :documentation "Unique ID for this running evaluation request.")
   (lock :reader lock
         :initform (make-mutex :name "evaluation")
         :documentation "Lock protecting access to DONE, DONE-CONDITION, and other mutable slots.")
   (done :accessor done
         :type boolean
         :initform nil
         :documentation "Is the computation done?")
   (done-condition :reader done-condition
                   ;; Waited on with CONDITION-WAIT and signalled with
                   ;; CONDITION-NOTIFY, so this must be a waitqueue, not a gate.
                   :initform (make-waitqueue :name "done")
                   :documentation "Condition variable notified when computation is done."))
  (:documentation "Stores the data needed to process incoming evaluation results."))
458 
(defclass repeated-eval (evaluation)
  ((results :initarg :results
            :initform (required-argument)
            :reader results
            :type simple-vector
            :documentation "Vector holding returned results.")
   (results-position :initform 0
                     :accessor results-position
                     :type vector-index
                     :documentation "Position where the next result will be recorded."))
  (:documentation "An EVALUATION that collects a fixed number of results, filled in order
by RECORD-REPEATED-RESULT."))
470 
(defvar *evals-lock* (make-mutex :name "evals")
  "Mutex guarding every read and write of *EVALS*.")

(defvar *evals* '()
  "EVALUATION instances for all computations currently in progress.")

(defun add-eval (eval)
  "Registers EVAL as an in-progress computation so FIND-EVAL can locate it."
  (with-mutex (*evals-lock*)
    (push eval *evals*)))

(defun remove-eval (eval)
  "Unregisters EVAL; afterwards FIND-EVAL no longer returns it."
  (with-mutex (*evals-lock*)
    (setf *evals* (remove eval *evals*))))

(defun find-eval (id)
  "Returns the in-progress EVALUATION whose ID is numerically equal to ID, or
NIL when no such computation is running."
  (with-mutex (*evals-lock*)
    (find-if (lambda (candidate) (= id (id candidate)))
             *evals*)))
491 
(defun record-repeated-result (id result)
  "Stores RESULT in the next free slot of the repeated evaluation identified by
ID.  When every slot is filled, marks the evaluation done and notifies its
DONE-CONDITION.  Returns true if the worker should keep calling
RECORD-REPEATED-RESULT with more results, and NIL if the evaluation is complete
or no longer registered."
  (let ((repeated-eval (find-eval id)))
    (when repeated-eval
      (let ((finished
              (with-mutex ((lock repeated-eval))
                (let* ((slots (results repeated-eval))
                       (capacity (length slots))
                       (next (results-position repeated-eval)))
                  (when (< next capacity)
                    (setf (aref slots next) result)
                    (incf next)
                    (setf (results-position repeated-eval) next))
                  (when (>= next capacity)
                    (setf (done repeated-eval) t)
                    (condition-notify (done-condition repeated-eval))
                    t)))))
        (not finished)))))
512 
(defun repeated-work-form (form worker-pool id)
  "Builds the form sent to a worker of WORKER-POOL for the repeated evaluation
identified by ID: the worker first catches up, then runs REPEATEDLY-EVALUATE on
FORM, reporting to the leader's host name and Swank port."
  (let* ((leader (leader worker-pool))
         (inner (list 'repeatedly-evaluate
                      (list 'quote form)
                      id
                      (host-name leader)
                      (port leader))))
    (ensure-caught-up-then-evaluate inner worker-pool nil)))
523 
(defun eval-form-repeatedly (worker-pool result-count form
                             &key (worker-count (when worker-pool (worker-count worker-pool))))
  "Evaluates FORM, which must return a function of no arguments, on WORKER-COUNT
workers in WORKER-POOL, then arranges for the workers to repeatedly call the
function to create RESULT-COUNT results, which are returned in a list.
WORKER-COUNT defaults to the number of workers in WORKER-POOL.  With no workers,
the function is called RESULT-COUNT times locally."
  (if (or (no-workers-p worker-pool) (not (plusp worker-count)))
      (let ((work-function (eval form)))
        (loop repeat result-count collect (funcall work-function)))
      (let* ((results (make-instance 'repeated-eval :results (make-array result-count)))
             (results-lock (lock results))
             (retained-workers '()))
        (add-eval results)
        (unwind-protect
             (progn
               (loop repeat worker-count
                     until (with-mutex (results-lock)
                             (done results))
                     do (let ((worker nil))
                          (flet ((set-worker (w) (setf worker w))
                                 (worker-done (result)
                                   (declare (ignore result))
                                   ;; Workers that finish before all results are available
                                   ;; are retained, while those that finish later are
                                   ;; immediately released.
                                   (with-mutex (results-lock)
                                     (if (not (done results))
                                         (push worker retained-workers)
                                         (free-worker worker worker-pool)))))
                            (let ((work (repeated-work-form form worker-pool (id results))))
                              (allocate-worker-and-evaluate worker-pool
                                                            work
                                                            #'set-worker
                                                            #'worker-done)))))
               (with-mutex (results-lock)
                 (loop until (done results)
                       do (condition-wait (done-condition results) results-lock)))
               (coerce (results results) 'list))
          ;; Cleanup runs on normal completion and on non-local exit.  Once the
          ;; eval is unregistered, workers stop reporting results.  Marking it
          ;; done makes any later WORKER-DONE free its worker directly, so
          ;; taking the retained list here cannot miss or double-free a worker.
          (remove-eval results)
          (let ((to-free (with-mutex (results-lock)
                           (setf (done results) t)
                           (shiftf retained-workers '()))))
            (dolist (w to-free) (free-worker w worker-pool)))))))
564 
(defclass async-eval (evaluation)
  ((results :accessor results
            :type list
            :initform '()
            :documentation "List holding returned, but unprocessed, results.")
   (results-available :reader results-available
                      ;; Waited on with CONDITION-WAIT and signalled with
                      ;; CONDITION-NOTIFY, so this must be a waitqueue, not a gate.
                      :initform (make-waitqueue :name "results-available")
                      :documentation "Condition notified when new results are available.")
   (state :accessor state
          :initarg :state
          :initform (required-argument)
          :documentation "State of the asynchronous computation, updated from results.")
   (state-counter :accessor state-counter
                  :type (integer 0)
                  :initform 0
                  :documentation "Counter incremented each time STATE is updated."))
  (:documentation "Data needed to process incoming asynchronous evaluation results."))
582 
(defun async-results-loop (async-eval update-state)
  "Runs the master-side processing loop for ASYNC-EVAL.  Waits until results
are available, then calls UPDATE-STATE with the current work state and the
batch of unprocessed results (most recent first).  Stores the new state it
returns, and increments STATE-COUNTER when UPDATE-STATE asks for the state to
be sent and the computation is not done.  When UPDATE-STATE reports
completion, marks ASYNC-EVAL done, notifies its DONE-CONDITION, and returns."
  (let ((lock (lock async-eval))
        (current-state nil)
        (batch nil))
    (loop
      (with-mutex (lock)
        (loop while (null (results async-eval))
              do (condition-wait (results-available async-eval) lock))
        ;; Take the whole pending batch and leave the slot empty.
        (setf current-state (state async-eval)
              batch (shiftf (results async-eval) '())))
      (multiple-value-bind (new-state finished publish)
          (funcall update-state current-state batch)
        (with-mutex (lock)
          (setf (state async-eval) new-state)
          ;; A changed counter tells workers to fetch the new state.
          (when (and publish (not finished))
            (incf (state-counter async-eval))))
        (when finished
          (with-mutex (lock)
            (setf (done async-eval) t)
            (condition-notify (done-condition async-eval)))
          (return-from async-results-loop))))))
608 
(defun record-async-result (id result worker-state-counter)
  "Queues RESULT for the async evaluation identified by ID and wakes its
results loop.  Returns a list (CONTINUE-P STATE-COUNTER STATE):
- CONTINUE-P is true while the evaluation is still running.
- STATE-COUNTER is the master's current counter.
- STATE is the current work state when WORKER-STATE-COUNTER is stale, and NIL
  otherwise.
If no evaluation with ID is registered, returns (NIL 0 NIL)."
  (let ((async-eval (find-eval id)))
    (if (null async-eval)
        (list nil 0 nil)
        (let (counter finished fresh-state)
          (with-mutex ((lock async-eval))
            (push result (results async-eval))
            (setf counter (state-counter async-eval)
                  finished (done async-eval))
            (unless (= worker-state-counter counter)
              (setf fresh-state (state async-eval)))
            (condition-notify (results-available async-eval)))
          (list (not finished) counter fresh-state)))))
626 
(defun async-work-form (form initial-state worker-pool id)
  "Builds the form sent to a worker of WORKER-POOL for the async evaluation
identified by ID: the worker first catches up, then runs ASYNC-EVALUATE on FORM
and INITIAL-STATE, reporting to the leader's host name and Swank port."
  (let* ((leader (leader worker-pool))
         (inner (list 'async-evaluate
                      (list 'quote form)
                      (list 'quote initial-state)
                      id
                      (host-name leader)
                      (port leader))))
    (ensure-caught-up-then-evaluate inner worker-pool nil)))
637 
(defun eval-repeatedly-async-state (worker-pool form initial-state update-state
                                    &key (worker-count
                                          (when worker-pool (worker-count worker-pool))))
  "Evaluates FORM, which must return a function of one argument, on WORKER-COUNT
workers in WORKER-POOL, then arranges for the workers to repeatedly call the
work function and send its results back to the master.  The work function is
passed a state argument, initially set to INITIAL-STATE, that the master can
update asynchronously as it receives new results.

On the master, the work state is initialized to INITIAL-STATE and UPDATE-STATE
is called repeatedly to process results received from the workers.  UPDATE-STATE
is called with two arguments, the current work state and a list containing all
unprocessed work results.  UPDATE-STATE should return three values: the new work
state, a boolean indicating whether the computation should end, and a boolean
indicating whether the latest work state should be distributed to workers.  When
UPDATE-STATE's second return value is true, EVAL-REPEATEDLY-ASYNC-STATE tells
the workers to stop and returns the latest work state."
  (if (or (no-workers-p worker-pool) (not (plusp worker-count)))
      (let ((work-function (eval form))
            (state initial-state))
        (loop (multiple-value-bind (new-state done send-state)
                  (funcall update-state state (list (funcall work-function state)))
                (when done (return new-state))
                (when send-state (setf state new-state)))))
      (let* ((results (make-instance 'async-eval :state initial-state))
             (results-lock (lock results))
             (retained-workers '()))
        (add-eval results)
        (unwind-protect
             (progn
               (make-thread (lambda () (async-results-loop results update-state))
                            :name "async results loop")
               (loop repeat worker-count
                     until (with-mutex (results-lock)
                             (done results))
                     do (let ((worker nil))
                          (flet ((set-worker (w) (setf worker w))
                                 (worker-done (result)
                                   (declare (ignore result))
                                   ;; Workers that finish before all results are available
                                   ;; are retained, while those that finish later are
                                   ;; immediately released.
                                   (with-mutex (results-lock)
                                     (if (not (done results))
                                         (push worker retained-workers)
                                         (free-worker worker worker-pool)))))
                            (let ((work (async-work-form form initial-state worker-pool (id results))))
                              (allocate-worker-and-evaluate worker-pool
                                                            work
                                                            #'set-worker
                                                            #'worker-done)))))
               (with-mutex (results-lock)
                 (loop until (done results)
                       do (condition-wait (done-condition results) results-lock)))
               (state results))
          ;; Cleanup runs on normal completion and on non-local exit.  Once the
          ;; eval is unregistered, workers stop reporting results.  Marking it
          ;; done makes any later WORKER-DONE free its worker directly.
          ;; NOTE(review): on a non-local exit the async results loop thread may
          ;; stay blocked waiting for results; consider waking it as well.
          (remove-eval results)
          (let ((to-free (with-mutex (results-lock)
                           (setf (done results) t)
                           (shiftf retained-workers '()))))
            (dolist (w to-free) (free-worker w worker-pool)))))))
685  #'worker-done)))))
686  (with-mutex (results-lock)
687  (loop until (done results)
688  do (condition-wait (done-condition results) results-lock)))
689  ;; TODO(brown): Perhaps UNWIND-PROTECT should be used to ensure the remove happens.
690  (remove-eval results)
691  ;; We can manipulate RETAINED-WORKERS without a lock because all results are available, so
692  ;; no WORKER-DONE function will modify the list.
693  (dolist (w retained-workers) (free-worker w worker-pool))
694  (state results))))