Mercurial > core / lisp/lib/net/proto/crew.lisp
changeset 698: 96958d3eb5b0
parent: f6a340b92274
author: Richard Westhaver <ellis@rwest.io>
date: Fri, 04 Oct 2024 22:04:59 -0400
permissions: -rw-r--r--
description: fixes
1 ;;; net/proto/crew.lisp --- Swank Crew Protocol 6 (in-package :net/proto/crew) 8 (defclass crew-connection-info () 9 ((host-name :initarg :host-name 12 :documentation "Host this worker is running on." 16 :documentation "Port on which the worker's swank server is listening for connections." 19 (defclass crew-worker () 20 ((connection-info :type crew-connection-info :accessor connection-info :initarg :connection-info) 21 (lock :initform (make-mutex :name "worker") :accessor lock) 22 (set-worker :accessor set-worker 25 :documentation "Function called to record which worker is evaluating a form.") 26 (connection :type (or null swank-connection) 27 :initform nil :accessor %connection)) 28 (:documentation "A remote Lisp running a Swank server.")) 30 (defclass crew-worker-pool (id) 31 ((connection-info :type crew-connection-info :accessor connection-info) 32 (leader :reader leader 33 :type crew-connection-info 35 :initform (required-argument)) 36 (workers :type vector :initform (vector) :accessor workers) 37 (lock :initform (make-mutex :name "worker-pool") :accessor lock) 38 (idle-workers :type list :initform nil :accessor idle-workers) 39 (worker-ready :initform (make-waitqueue :name "worker-ready") :accessor worker-ready) 40 (disconnecting :initform nil :accessor disconnecting) 41 (replay-forms-lock :initform (make-mutex :name "replay-forms-lock") :accessor replay-forms-lock) 42 (replay-forms :type list :initform nil :accessor replay-forms))) 44 (defvar *crew-worker-pools-lock* (make-mutex :name "worker-pools-lock") 45 "Lock protecting access to *CREW-WORKER-POOLS*.") 47 ;; could be chashtable 48 (defvar *crew-worker-pools* (make-hash-table) "Mapping from worker pool IDs to active worker pools.") 50 (defun worker-counts (worker-pool) 51 "Returns the number of idle, busy, and disconnected workers in WORKER-POOL. 52 This function executes without locking WORKER-POOL, so it may return 53 inconsistent information." 
54 ;; We copy the idle workers list without holding a lock on WORKER-POOL. Other threads may be 55 ;; simultaneously popping workers off the head of the list, so we may get stale data. 56 (let ((idlers (copy-seq (idle-workers worker-pool))) 57 (idle-hash (make-hash-table :test 'eq))) 58 (dolist (idle-worker idlers) 59 (setf (gethash idle-worker idle-hash) t)) 63 (loop for worker across (workers worker-pool) do 64 ;; We call (%CONNECTION WORKER) without holding the worker's lock, so we may get stale data 66 (cond ((null (%connection worker)) (incf disconnected)) 67 ((gethash worker idle-hash) (incf idle)) 69 (values idle busy disconnected)))) 71 (defmethod worker-count ((self crew-worker-pool) &key) 72 (length (workers self))) 74 (defmethod print-object ((worker-pool crew-worker-pool) stream) 75 "Prints WORKER-POOL to STREAM. This function runs without locking 76 WORKER-POOL, so it may output inconsistent information." 77 (print-unreadable-object (worker-pool stream :type t :identity t) 78 (multiple-value-bind (idle busy disconnected) 79 (worker-counts worker-pool) 80 (format stream "id: ~S workers: ~D idle: ~D busy: ~D disconnected: ~D" 81 (id worker-pool) (worker-count worker-pool) idle busy disconnected)))) 83 (defmethod initialize-instance :after ((self crew-worker-pool) &key) 84 (with-mutex (*crew-worker-pools-lock*) 85 (setf (gethash (id self) *crew-worker-pools*) self))) 87 (defun find-worker-pool (worker-pool-id) 88 "Returns the worker pool identified by WORKER-POOL-ID." 
89 (with-mutex (*crew-worker-pools-lock*) 90 (gethash worker-pool-id *crew-worker-pools*))) 92 (defun make-worker-pool (leader connect-infos connect-worker) 93 (let* ((worker-pool (make-instance 'crew-worker-pool 96 (loop for connect-info in connect-infos 97 for worker = (make-instance 'crew-worker :connection-info connect-info) 98 do (let ((worker worker)) 99 (setf (%connection worker) 100 (funcall connect-worker 102 (lambda () (handle-connection-closed worker worker-pool))))) 104 (setf (workers worker-pool) (coerce workers 'vector) 105 (idle-workers worker-pool) workers) 106 (make-thread (lambda () (reconnect-workers worker-pool)) :name "reconnector") 109 (defgeneric connect-worker (info hook) 111 "Creates a connection to a worker's Swank server using INFO. Passes the 112 thunk HOOK to SWANK-CLIENT:SLIME-CONNECT so that it is 113 invoked when the connection closes.")) 115 (defmethod connect-worker ((connect-info crew-connection-info) close-handler) 116 (slime-connect (host-name connect-info) (port connect-info) close-handler)) 118 (defun connect-workers (host/port-alist leader) 119 "Makes Swank connections to all the workers in HOST/PORT-ALIST and returns a 120 WORKER-POOL containing them. HOST/PORT-ALIST is a list of (host-name . port) 121 pairs. LEADER supplies the master's host name and Swank port 122 that workers can use to return results to the master." 124 (loop for (host-name . port) in host/port-alist 125 collect (make-instance 'crew-connection-info :host-name host-name :port port)))) 126 (make-worker-pool leader connect-infos #'connect-worker))) 128 (defun disconnect-workers (worker-pool) 129 "Closes the Swank connections of all connected workers in WORKER-POOL." 130 (with-mutex ((lock worker-pool)) 131 (when (disconnecting worker-pool) 132 (return-from disconnect-workers)) 133 ;; Signal that the worker pool is being torn down and make sure there are no idle workers 134 ;; available. 
After the pool is marked as disconnecting, no workers will transition to idle. 135 (setf (disconnecting worker-pool) t) 136 (setf (idle-workers worker-pool) '())) 137 (flet ((disconnect (worker) 138 (with-mutex ((lock worker)) 139 (when (%connection worker) 140 (slime-close (%connection worker)))))) 141 ;; Disconnect all workers. 142 (loop for worker across (workers worker-pool) do (disconnect worker))) 145 (defgeneric reconnect-worker (info hook) 147 "Reconnects to a Swank server using information in INFO. Passes the 148 thunk HOOK to SWANK-CLIENT:SLIME-CONNECT, so that it is invoked when 149 the connection closes.")) 151 (defmethod reconnect-worker ((connect-info crew-connection-info) close-handler) 152 (connect-worker connect-info close-handler)) 154 (defconstant +worker-reconnect-interval+ 10 "Seconds between attempts to reconnect workers.") 156 (defun reconnect-workers (worker-pool) 157 "Reconnects disconnected workers in WORKER-POOL." 159 (sleep +worker-reconnect-interval+) 160 (with-mutex ((lock worker-pool)) 161 (when (disconnecting worker-pool) 162 (return-from reconnect-workers))) 163 (loop for worker across (workers worker-pool) do 164 (let ((worker worker)) 165 (when (with-mutex ((lock worker)) 166 (unless (%connection worker) 167 (let ((close-handler (lambda () (handle-connection-closed worker worker-pool)))) 168 (setf (%connection worker) 169 (reconnect-worker (connection-info worker) close-handler))))) 170 (with-mutex ((lock worker-pool)) 171 (when (member worker (idle-workers worker-pool)) 172 ;; We reconnected an idle worker, so notify the pool that a worker is available. 173 (condition-notify (worker-ready worker-pool))))))))) 175 (defun allocate-worker (worker-pool &key worker-to-avoid) 176 "Allocates and returns an idle worker from WORKER-POOL that is connected. If 177 WORKER-POOL is being shut down, then NIL is returned." 
178 (with-mutex ((lock worker-pool)) 180 (when (disconnecting worker-pool) (return nil)) 181 ;; Find a connected idle worker. 182 (let ((idle-worker (find-if (lambda (worker) 183 (unless (eql worker worker-to-avoid) 184 (with-mutex ((lock worker)) 185 (%connection worker)))) 186 (idle-workers worker-pool)))) 188 (setf (idle-workers worker-pool) (remove idle-worker (idle-workers worker-pool))) 189 (return idle-worker))) 190 (condition-wait (worker-ready worker-pool) (lock worker-pool))))) 192 (defun free-worker (worker worker-pool) 193 "Changes the state of WORKER to idle, so that it's available for allocation." 194 (with-mutex ((lock worker-pool)) 195 (unless (disconnecting worker-pool) 196 (push worker (idle-workers worker-pool)) 197 (when (%connection worker) 198 ;; The free worker is connected, so notify the pool that a worker is available. 199 (condition-notify (worker-ready worker-pool)))))) 201 (defun allocate-worker-and-evaluate (worker-pool form set-worker worker-done) 202 "Allocates a connected worker from WORKER-POOL and sends it FORM to evaluate. 203 Returns the worker that was successfully allocated, or NIL if WORKER-POOL is 204 shutting down. SET-WORKER is a function that is called to tell the WORKER-DONE 205 continuation what worker is evaluating FORM." 207 (disconnected-idle-workers '())) 209 (setf worker (allocate-worker worker-pool)) 210 (unless worker (return-from allocate-worker-and-evaluate nil)) 211 (with-mutex ((lock worker)) 212 ;; Has the connected idle worker we allocated just disconnected? 213 (let ((connection (%connection worker))) 216 (setf (set-worker worker) set-worker) 217 (funcall set-worker worker) 218 ;; Ignore network errors. If there is a network problem, socket keep alives will 219 ;; eventually discover that the connection is dead and HANDLE-CONNECTION-CLOSED 220 ;; will migrate the work to another worker. 
221 (handler-case (slime-eval-async form connection worker-done) 222 (slime-network-error ())) 224 ;; The worker is disconnected, so remember it and look for another one. 225 (push worker disconnected-idle-workers))))) 226 ;; Place any disconnected workers we found back on the idle workers list. 227 (dolist (w disconnected-idle-workers) (free-worker w worker-pool)) 230 (defun handle-connection-closed (disconnected-worker worker-pool) 231 "Called when the connection to DISCONNECTED-WORKER, a member of WORKER-POOL, 232 is closed because of a call to SLIME-CLOSE, the death of the worker, or because 233 of a communications error. Moves all uncompleted work intended for 234 DISCONNECTED-WORKER to another idle connected worker in WORKER-POOL." 235 (with-mutex ((lock disconnected-worker)) 236 (let ((old-connection (%connection disconnected-worker)) 237 (disconnected-idle-workers '())) 238 (when (slime-pending-evals-p old-connection) 240 (let ((worker (allocate-worker worker-pool :worker-to-avoid disconnected-worker))) 241 (unless worker (return-from handle-connection-closed)) 242 (with-mutex ((lock worker)) 243 ;; Has the connected idle worker we allocated just disconnected? 244 (let ((connection (%connection worker))) 246 (let ((old-set-worker (set-worker disconnected-worker))) 247 ;; Migrate all pending work from DISCONNECTED-WORKER to WORKER. 248 (setf (set-worker worker) old-set-worker) 249 (funcall old-set-worker worker) 250 ;; Ignore Slime network errors. If there is a network problem, socket keep 251 ;; alives will eventually discover that the connection is dead and 252 ;; HANDLE-CONNECTION-CLOSED will migrate the pending work again. 253 (handler-case (slime-migrate-evals old-connection connection) 254 (slime-network-error ())) 256 ;; The worker is disconnected, so remember it and look for another one. 257 (push worker disconnected-idle-workers)))))) 258 ;; Place any disconnected workers we found back on the idle workers list. 
259 (dolist (w disconnected-idle-workers) (free-worker w worker-pool)))) 260 ;; Allow the reconnector to see that DISCONNECTED-WORKER is dead. 261 (setf (%connection disconnected-worker) nil))) 264 (defun no-workers-p (worker-pool) 265 "Returns T if WORKER-POOL is NIL or contains no workers." 266 (or (null worker-pool) (zerop (worker-count worker-pool)))) 268 (defun eval-on-master (make-work list result-done) 269 "Iterates over the members of LIST calling MAKE-WORK on each one. If 270 RESULT-DONE is not NIL, it must be a function of two arguments. In this case, 271 after each application of MAKE-WORK to a LIST member, RESULT-DONE is called with 272 the member's position in LIST and MAKE-WORK's result." 273 (loop for position from 0 275 for result = (eval (funcall make-work element)) 277 do (when result-done (funcall result-done position result)))) 279 (defun add-evaluated-form (worker-pool form) 280 "Adds FORM to the set of forms that need to be evaluated on a new worker when 281 it joins WORKER-POOL." 282 (with-mutex ((replay-forms-lock worker-pool)) 283 (push form (replay-forms worker-pool)))) 285 (defun unevaluated-replay-forms (worker-pool-id evaluated-count) 286 "For a worker that has executed EVALUATED-COUNT of the replay forms associated 287 with the worker-pool identified by WORKER-POOL-ID, returns a list of the forms 288 the worker needs to evaluate in order to be completely up to date." 289 (let ((worker-pool (find-worker-pool worker-pool-id))) 291 (with-mutex ((replay-forms-lock worker-pool)) 292 (nthcdr evaluated-count (reverse (replay-forms worker-pool))))))) 294 (defun ensure-caught-up-then-evaluate (form worker-pool replay-required) 295 "Returns a form that when evaluated on a worker in WORKER-POOL ensures that 296 the worker is caught up then evaluates FORM and returns its result. 297 REPLAY-REQUIRED indicates whether new workers must evaluate FORM before being 298 considered to be caught up." 
299 (let ((forms-count (with-mutex ((replay-forms-lock worker-pool)) 300 (length (replay-forms worker-pool)))) 301 (master-host-name (host-name (leader worker-pool))) 302 (master-swank-port (port (leader worker-pool))) 303 (worker-pool-id (id worker-pool))) 304 `(evaluate-form ',form ,master-host-name ,master-swank-port ,worker-pool-id ,forms-count 307 (defun dispatch-work (worker-pool make-work list result-done retain-workers replay-required) 308 "Traverses LIST, calling MAKE-WORK on each element to create a form that is 309 then passed to a remote worker in WORKER-POOL for evaluation. When 310 RETAIN-WORKERS is true, each form is evaluated on a unique worker. Otherwise, 311 workers are reused and may process multiple forms. In either case, the results 312 of evaluating each form are collected into a list, which is returned when every 313 remote worker is done. If RESULT-DONE is not NIL, then it must be a function of 314 two arguments. In this case RESULT-DONE is called on the master as each worker 315 returns a result. The arguments to RESULT-DONE are an integer counter, 316 indicating the work form's position in LIST, and the result of evaluating the 317 form. REPLAY-REQUIRED indicates whether new workers will have to perform the 318 work generated by MAKE-WORK before being considered to be caught up." 319 (if (no-workers-p worker-pool) 320 (eval-on-master make-work list result-done) 321 (let* ((work-count (length list)) 322 (results (make-list work-count)) 323 (done-lock (make-mutex :name "dispatch-work")) 324 (done-condition (make-gate :name "done" :open nil))) 325 (loop for element in list 326 for result-cons = results then (cdr result-cons) 328 do (let ((result-cons result-cons) 331 (flet ((set-worker (w) (setf worker w)) 332 (worker-done (result) 333 (setf (car result-cons) result) 334 ;; Unless we need each worker to evaluate exactly one form, free the 335 ;; worker, so it can process other work. 
336 (unless retain-workers (free-worker worker worker-pool)) 337 (with-mutex (done-lock) 339 (when result-done (funcall result-done position result)) 340 (when (zerop work-count) 341 (condition-notify done-condition))))) 342 (let ((work (ensure-caught-up-then-evaluate (funcall make-work element) 345 (allocate-worker-and-evaluate worker-pool 349 (with-mutex (done-lock) 350 (loop until (zerop work-count) 351 do (condition-wait done-condition done-lock))) 352 ;; When we need each worker to evaluate exactly one form, we end up having allocated every 353 ;; worker in the pool, so we need to free them all. 355 (loop for worker across (workers worker-pool) do (free-worker worker worker-pool))) 358 (defun eval-form-all-workers (worker-pool form &key result-done (replay-required t)) 359 "Evaluates FORM on all workers in WORKER-POOL. When RESULT-DONE is non-NIL, 360 it must be a function of two arguments. In this case RESULT-DONE is called as 361 each worker returns a result with two arguments, a non-negative integer 362 representing the order in which the work was dispatched and the worker's result. 363 If REPLAY-REQUIRED is true, which is the default, FORM will be remembered and 364 evaluated again for side effects on any new worker that joins WORKER-POOL." 365 (let* ((work-list (make-list (if (no-workers-p worker-pool) 1 (worker-count worker-pool)))) 366 (make-work (constantly form)) 367 (results (dispatch-work worker-pool make-work work-list result-done t replay-required))) 368 (when (and worker-pool replay-required) (add-evaluated-form worker-pool form)) 371 (defun parallel-mapcar (worker-pool make-work list &optional result-done) 372 "Traverses LIST, calling MAKE-WORK on each element to create a form that is 373 then passed to a remote worker in WORKER-POOL for evaluation. Results of 374 evaluating each form are collected into a list, which is returned when every 375 remote worker is done. If RESULT-DONE is provided, then it must be a function 376 of two arguments. 
In this case RESULT-DONE is called on the master as each 377 worker returns a result. The arguments to RESULT-DONE are the position of the 378 work in LIST and the worker's result." 379 (dispatch-work worker-pool make-work list result-done nil nil)) 381 (defun parallel-reduce (worker-pool make-work list initial-value reducer) 382 "Traverses LIST, calling MAKE-WORK on each element to create a form that is 383 then passed to a remote worker in WORKER-POOL for evaluation. As results are 384 returned, REDUCER, a binary function, is used to combine successive results in 385 the manner of REDUCE. INITIAL-VALUE is used as the starting value for the 386 reduction computation. The form 388 (parallel-reduce worker-pool make-work list initial-value reducer) 393 (mapcar (lambda (x) (eval (funcall make-work x))) list) 394 :initial-value initial-value)" 395 (if (no-workers-p worker-pool) 397 (mapcar (lambda (x) (eval (funcall make-work x))) list) 398 :initial-value initial-value) 399 (let* ((work-count (length list)) 400 (unknown-result (cons nil nil)) 401 (results (make-array (1+ work-count) :initial-element unknown-result)) 403 (reduce-result initial-value) 404 (done-lock (make-mutex :name "crew-reduce")) 405 (done-condition (make-gate :name "done" :open nil))) 406 (loop for element in list 408 do (let ((worker nil) 410 (flet ((set-worker (w) (setf worker w)) 411 (worker-done (result) 412 (free-worker worker worker-pool) 413 (with-mutex (done-lock) 414 (setf (aref results position) result) 415 (loop for next = (aref results i) 416 while (not (eq next unknown-result)) 417 do (setf reduce-result (funcall reducer reduce-result next)) 418 (setf (aref results i) nil) 421 (when (zerop work-count) 422 (condition-notify done-condition))))) 423 (let ((work (ensure-caught-up-then-evaluate (funcall make-work element) 426 (allocate-worker-and-evaluate worker-pool 430 (with-mutex (done-lock) 431 (loop until (zerop work-count) 432 do (condition-wait done-condition done-lock))) 435 (defvar 
*evaluation-id-lock* (make-mutex :name "eval-id") 436 "Lock protecting access to *EVALUATION-ID*.") 438 (defvar *evaluation-id* 0 "Counter used to generate a unique ID for EVALUATION instances.") 440 (defclass evaluation () 444 :initform (with-mutex (*evaluation-id-lock*) 445 (incf *evaluation-id*)) 446 :documentation "Unique ID for this running evaluation request.") 448 :initform (make-mutex :name "evaluation") 449 :documentation "Lock protecting access to DONE, DONE-CONDITION, and other mutable slots.") 453 :documentation "Is the computation done?") 454 (done-condition :reader done-condition 455 :initform (make-gate :name "done" :open nil) 456 :documentation "Condition variable notified when computation is done.")) 457 (:documentation "Stores the data needed to process incoming evaluation results.")) 459 (defclass repeated-eval (evaluation) 460 ((results :reader results 463 :initform (required-argument) 464 :documentation "Vector holding returned results.") 465 (results-position :accessor results-position 468 :documentation "Position where the next result will be recorded.")) 469 (:documentation "Stores the data needed to process an incoming repeated eval result.")) 471 (defvar *evals-lock* (make-mutex :name "evals") 472 "Lock protecting access to *EVALS*.") 474 (defvar *evals* '() "List containing an EVALUATION instance for each running computation.") 476 (defun add-eval (eval) 477 "Adds EVAL to the list of in-progress computations." 478 (with-mutex (*evals-lock*) 479 (push eval *evals*))) 481 (defun remove-eval (eval) 482 "Removes EVAL from the list of in-progress computations." 483 (with-mutex (*evals-lock*) 484 (setf *evals* (remove eval *evals*)))) 486 (defun find-eval (id) 487 "Returns the running EVAL instance identified by ID, or NIL if no 488 computation with that ID is currently running." 
489 (with-mutex (*evals-lock*) 490 (find id *evals* :test #'= :key #'id))) 492 (defun record-repeated-result (id result) 493 "Stores RESULT as one of the values produced by the repeated evaluation 494 identified by ID. Returns a boolean indicating whether the worker should 495 continue to call RECORD-REPEATED-RESULT with additional results." 496 (let ((repeated-eval (find-eval id))) 499 (with-mutex ((lock repeated-eval)) 500 (let* ((results (results repeated-eval)) 501 (length (length results)) 502 (position (results-position repeated-eval))) 503 (when (< position length) 504 (setf (aref results position) result) 506 (setf (results-position repeated-eval) position)) 507 (when (>= position length) 509 (done repeated-eval) t) 510 (condition-notify (done-condition repeated-eval))))) 513 (defun repeated-work-form (form worker-pool id) 514 "Returns a form for evaluation on a client of WORKER-POOL that ensures the 515 client is caught up and then evaluates FORM for the repeated evaluation request 517 (let ((master-host-name (host-name (leader worker-pool))) 518 (master-swank-port (port (leader worker-pool)))) 519 (ensure-caught-up-then-evaluate 520 `(repeatedly-evaluate ',form ,id ,master-host-name ,master-swank-port) 524 (defun eval-form-repeatedly (worker-pool result-count form 525 &key (worker-count (when worker-pool (worker-count worker-pool)))) 526 "Evaluates FORM, which must return a function of no arguments, on WORKER-COUNT 527 workers in WORKER-POOL, then arranges for the workers to repeatedly call the 528 function to create RESULT-COUNT results, which are returned in a list. 529 WORKER-COUNT defaults to the number of workers in WORKER-POOL." 
530 (if (or (no-workers-p worker-pool) (not (plusp worker-count))) 531 (let ((work-function (eval form))) 532 (loop repeat result-count collect (funcall work-function))) 533 (let* ((results (make-instance 'repeated-eval :results (make-array result-count))) 534 (results-lock (lock results)) 535 (retained-workers '())) 537 (loop repeat worker-count 538 until (with-mutex (results-lock) 540 do (let ((worker nil)) 541 (flet ((set-worker (w) (setf worker w)) 542 (worker-done (result) 543 (declare (ignore result)) 544 ;; Workers that finish before all results are available are retained, 545 ;; while those that finish later are immediately released. 546 (with-mutex (results-lock) 547 (if (not (done results)) 548 (push worker retained-workers) 549 (free-worker worker worker-pool))))) 550 (let ((work (repeated-work-form form worker-pool (id results)))) 551 (allocate-worker-and-evaluate worker-pool 555 (with-mutex (results-lock) 556 (loop until (done results) 557 do (condition-wait (done-condition results) results-lock))) 558 ;; TODO(brown): Perhaps UNWIND-PROTECT should be used to ensure the remove happens. 559 (remove-eval results) 560 ;; We can manipulate RETAINED-WORKERS without a lock because all results are available, 561 ;; so no WORKER-DONE function will modify the list. 
562 (dolist (w retained-workers) (free-worker w worker-pool)) 563 (coerce (results results) 'list)))) 565 (defclass async-eval (evaluation) 566 ((results :accessor results 569 :documentation "List holding returned, but unprocessed, results.") 570 (results-available :reader results-available 571 :initform (make-gate :name "results-available" :open nil) 572 :documentation "Condition notified when new results are available.") 573 (state :accessor state 575 :initform (required-argument) 576 :documentation "State of the asynchronous computation, updated from results.") 577 (state-counter :accessor state-counter 580 :documentation "Counter incremented each time STATE is updated.")) 581 (:documentation "Data needed to process incoming asynchronous evaluation results.")) 583 (defun async-results-loop (async-eval update-state) 584 "Handles incoming results for ASYNC-EVAL by calling UPDATE-STATE whenever 585 ASYNC-EVAL holds unprocessed results. UPDATE-STATE is called with two 586 arguments, the work state and a list of the unprocessed results." 587 (let ((lock (lock async-eval)) 592 (loop until (results async-eval) 593 do (condition-wait (results-available async-eval) lock)) 594 (setf state (state async-eval) 595 results (results async-eval) 596 (results async-eval) '())) 597 (multiple-value-bind (new-state done send-state) 598 (funcall update-state state results) 600 (setf (state async-eval) new-state) 601 (when (and send-state (not done)) 602 (incf (state-counter async-eval)))) 605 (setf (done async-eval) t) 606 (condition-notify (done-condition async-eval))) 607 (return-from async-results-loop)))))) 609 (defun record-async-result (id result worker-state-counter) 610 "Stores RESULT as one of the values produced by the async evaluation 611 identified by ID. Returns a list whose first element is a boolean indicating whether the worker should 612 continue to call RECORD-ASYNC-RESULT with additional results, followed by the current state counter and, when WORKER-STATE-COUNTER is out of date, the current state." 
616 (async-eval (find-eval id))) 618 (with-mutex ((lock async-eval)) 619 (push result (results async-eval)) 620 (setf state-counter (state-counter async-eval) 621 done (done async-eval)) 622 (when (/= worker-state-counter state-counter) 623 (setf state (state async-eval))) 624 (condition-notify (results-available async-eval)))) 625 (list (not done) state-counter state))) 627 (defun async-work-form (form initial-state worker-pool id) 628 "Returns a form for evaluation on a client of WORKER-POOL that ensures the 629 client is caught up and then evaluates FORM for the async evaluation request 631 (let ((master-host-name (host-name (leader worker-pool))) 632 (master-swank-port (port (leader worker-pool)))) 633 (ensure-caught-up-then-evaluate 634 `(async-evaluate ',form ',initial-state ,id ,master-host-name ,master-swank-port) 638 (defun eval-repeatedly-async-state (worker-pool form initial-state update-state 640 (when worker-pool (worker-count worker-pool)))) 641 "Evaluates FORM, which must return a function of one argument, on WORKER-COUNT 642 workers in WORKER-POOL, then arranges for the workers to repeatedly call the 643 work function and send its results back to the master. The work function is 644 passed a state argument, initially set to INITIAL-STATE, that the master can 645 update asynchronously as it receives new results. 647 On the master, the work state is initialized to INITIAL-STATE and UPDATE-STATE 648 is called repeatedly to process results received from the workers. UPDATE-STATE 649 is called with two arguments, the current work state and a list containing all 650 unprocessed work results. UPDATE-STATE should return three values: the new work 651 state, a boolean indicating whether the computation should end, and a boolean 652 indicating whether the latest work state should be distributed to workers. When 653 UPDATE-STATE's second return value is true, EVAL-REPEATEDLY-ASYNC-STATE tells 654 the workers to stop and returns the latest work state." 
655 (if (or (no-workers-p worker-pool) (not (plusp worker-count))) 656 (let ((work-function (eval form)) 657 (state initial-state)) 658 (loop (multiple-value-bind (new-state done send-state) 659 (funcall update-state state (list (funcall work-function state))) 660 (when done (return new-state)) 661 (when send-state (setf state new-state))))) 662 (let* ((results (make-instance 'async-eval :state initial-state)) 663 (results-lock (lock results)) 664 (retained-workers '())) 666 (make-thread (lambda () (async-results-loop results update-state)) 667 :name "async results loop") 668 (loop repeat worker-count 669 until (with-mutex (results-lock) 671 do (let ((worker nil)) 672 (flet ((set-worker (w) (setf worker w)) 673 (worker-done (result) 674 (declare (ignore result)) 675 ;; Workers that finish before all results are available are retained, 676 ;; while those that finish later are immediately released. 677 (with-mutex (results-lock) 678 (if (not (done results)) 679 (push worker retained-workers) 680 (free-worker worker worker-pool))))) 681 (let ((work (async-work-form form initial-state worker-pool (id results)))) 682 (allocate-worker-and-evaluate worker-pool 686 (with-mutex (results-lock) 687 (loop until (done results) 688 do (condition-wait (done-condition results) results-lock))) 689 ;; TODO(brown): Perhaps UNWIND-PROTECT should be used to ensure the remove happens. 690 (remove-eval results) 691 ;; We can manipulate RETAINED-WORKERS without a lock because all results are available, so 692 ;; no WORKER-DONE function will modify the list. 693 (dolist (w retained-workers) (free-worker w worker-pool))