changeset 696: |
38e9c3be2392 |
parent: |
a36280d2ef4e
|
author: |
Richard Westhaver <ellis@rwest.io> |
date: |
Fri, 04 Oct 2024 21:11:52 -0400 |
permissions: |
-rw-r--r-- |
description: |
prep for adding zdict wrapper, change default control stack size of inferior-lisp to 8M |
1 ;;; task.lisp --- Standard Task API 10 (defvar *tasks* (make-queue :name "tasks")) 13 (sb-ext:defglobal *worker-threads* nil) 14 (sb-ext:defglobal *oracles* nil) 15 (sb-ext:defglobal *oracle-threads* nil) 16 (sb-ext:defglobal *supervisor-threads* nil) 20 (defvar *task-result* nil)) 22 (define-condition task-error (thread-error) () 23 (:report (lambda (condition stream) 24 (format stream "Unhandled task error in thread ~A" 25 (thread-error-thread condition))))) 27 (defun task-error (thread) 28 (error 'task-error :thread thread)) 31 (defmacro gen-task-kernel (name args lock queue mailbox timeout &body body) 34 #| splice :timeout as a keyword/value pair; a nested list would expand to the bogus form (:timeout N) |# (wait-on-semaphore ,lock ,@(when timeout `(:timeout ,timeout))) 35 (let ((*task* (dequeue ,queue))) 37 (handler-case (setf *task-result* (progn ,@body)) 38 (error () (task-error *current-thread*))) 39 (send-message ,mailbox *task-result*) 40 (release-foreground)))))) 42 (defmacro define-task-kernel (name (&key lock timeout mailbox queue) args &body body) 43 "Define a task kernel. 45 (define-task-kernel NAME (&key LOCK TIMEOUT MAILBOX QUEUE) 47 The kernel should process all options and return a function - the 50 The kernel function is installed in worker threads by passing it to 51 SB-THREAD:MAKE-THREAD. It may accept a varying number of arguments 54 Within the BODY the variable *task* is bound to the result of (DEQUEUE QUEUE) 55 and *task-result* is set to the return value of BODY. 57 This interface is experimental and subject to change." 
58 `(gen-task-kernel ,name ,args ,lock ,queue ,mailbox ,timeout 61 (defun make-ephemeral-thread (name) 62 (sb-thread::%make-thread name t (make-semaphore :name name))) 65 (defgeneric designate-oracle (host guest)) 66 (defgeneric assign-supervisor (worker supervisor)) 68 (defgeneric make-workers (count &rest initargs &key &allow-other-keys) 69 (:method ((count number) &key thread kernel input) 71 (dotimes (i count ret) 72 (push (make-worker :thread thread :kernel kernel :input input) ret))))) 74 (defgeneric tasks (self)) 75 (defgeneric results (self)) 76 (defgeneric run-object (self)) 77 (defgeneric work (self &key &allow-other-keys)) 78 (defgeneric workers (self)) 81 (defclass supervisor () 82 ((thread :initform (make-ephemeral-thread (symbol-name (gensym "supervisor"))) :accessor supervisor-thread) 85 (:documentation "Supervisors are threads which are responsible for a set of worker threads 86 within their DOMAIN and SCOPE.")) 88 (defmethod initialize-instance :after ((self supervisor) &key &allow-other-keys) 89 (push (supervisor-thread self) *supervisor-threads*)) 94 ;; (multiple-value-list (sb-unix:unix-getrusage 0)) 95 ;; (setf sb-unix::*on-dangerous-wait* :error) 97 ;; TODO 2024-10-03: with-cas-lock? 
99 ((thread :initform (make-ephemeral-thread (symbol-name (gensym "worker"))) 100 :accessor worker-thread 102 (kernel :type function :accessor worker-kernel :initarg :kernel) 103 (tasks :initform nil :accessor tasks :initarg :input))) 105 (defmethod initialize-instance :after ((self worker) &key &allow-other-keys) 106 (push (worker-thread self) *worker-threads*)) 108 (defun make-worker (&key thread kernel input) 109 (apply #'make-instance 'worker 110 `(,@(when thread `(:thread ,thread)) 111 ,@(when kernel `(:kernel ,kernel)) 112 ,@(when input `(:input ,input))))) 114 ;; TODO 2024-10-03: pause/resume 116 (declaim (inline kill-worker join-worker start-worker run-worker)) 117 (defun start-worker (worker) 118 (sb-thread::start-thread (worker-thread worker) (worker-kernel worker) (tasks worker))) 120 (defun run-worker (worker &key input wait) 122 (setf (tasks worker) input)) 123 (start-worker worker) 124 (if wait (join-worker worker) 127 (defmethod run-object ((self worker)) 130 (defun run-with-worker (worker object &key wait) 131 (run-worker worker :input object :wait wait)) 133 (defun kill-worker (worker) 134 (declare (worker worker)) 135 (let ((th (worker-thread worker))) 136 (unwind-protect (kill-thread th) 137 (deletef *worker-threads* th)))) 139 (defun join-worker (worker) 140 (declare (worker worker)) 141 (let ((th (worker-thread worker))) 142 (unwind-protect (join-thread th) 143 (deletef *worker-threads* th)))) 146 (defstruct (oracle (:constructor %make-oracle (id thread))) 147 "Oracles provide a tagged view into some threaded scope of work." 
148 (id 0 :type (unsigned-byte 32) :read-only t) 149 (thread *current-thread* :read-only t)) 151 (defun oracle-of-id (id) 152 (find id *oracles* :test '= :key 'oracle-id)) 154 (defun make-oracle (thread) 155 (let ((id (thread-os-tid thread))) 156 (if-let ((found (oracle-of-id id))) 158 (let ((orc (%make-oracle id thread))) 160 (push (oracle-thread orc) *oracle-threads*) 165 (kernel 'identity :type symbol) 167 (lock (make-semaphore :name "online") :type semaphore) 168 ;; TODO: test weak-vector here 169 (workers (make-array 0 :element-type 'worker :fill-pointer 0) :type (vector worker)) 170 (results (make-mailbox :name "results"))) 172 (defmethod tasks ((self task-pool)) (task-pool-tasks self)) 173 (defmethod results ((self task-pool)) (task-pool-results self)) 174 (defmethod workers ((self task-pool)) (task-pool-workers self)) 176 (defmethod print-object ((self task-pool) (stream t)) 177 (print-unreadable-object (self stream :type t) 178 (format stream "~A :workers ~A :tasks ~A/~A :results ~A" 179 (task-pool-kernel self) 180 (length (workers self)) 181 (queue-count (tasks self)) 182 (semaphore-count (task-pool-lock self)) 183 (mailbox-count (task-pool-results self))))) 185 (defun kill-workers (pool) 186 "Call KILL-WORKER on each of the task-pool's workers, popping them from the pool." 
187 (dotimes (i (length (workers pool))) 188 (kill-worker (vector-pop (workers pool))))) 190 (defun worker-count (task-pool &key online) 192 (semaphore-count (task-pool-lock task-pool)) 193 (length (task-pool-workers task-pool)))) 195 (defmethod designate-oracle ((self task-pool) (guest thread)) 196 (designate-oracle self (make-oracle guest))) 198 (declaim (inline push-worker push-workers pop-worker)) 199 (defun push-worker (worker pool) 200 (vector-push-extend worker (task-pool-workers pool))) 202 (defun push-workers (threads pool) 203 (with-slots (workers) pool 205 (vector-push-extend w workers)))) 207 (defmethod pop-worker (pool) 208 (vector-pop (task-pool-workers pool))) 210 (defun start-task-worker (pool index) 211 ;; (with-recursive-lock 212 (start-worker (aref (workers pool) index))) 214 (defun start-task-workers (pool) 215 "Start all workers in the given task POOL." 216 (loop for w across (workers pool) 217 do (start-worker w))) 221 ((state :initform nil :initarg :state :accessor task-state)) 222 (:documentation "This object represents a single unit of work to be done by some 223 worker. Tasks are typically distributed from the task-pool, but workers may 224 also be granted the ability to create and distribute their own tasks. Once a 225 task is assigned, the 'owner', i.e. the worker that is assigned this task, may 226 modify the object. 
When the work associated with a task is complete, the owner 227 is responsible for indicating in the state slot the result of the computation.")) 229 (defmethod print-object ((self task) stream) 230 (print-unreadable-object (self stream :type t) 231 (format stream ":state ~A" (task-state self)))) 233 (defun run-task (worker task) 234 (run-worker worker :input task)) 238 ((tasks :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t) 242 (lock :initform (make-mutex :name "job") :type mutex 244 (:documentation "A collection of tasks forming a single unit of work.")) 246 (declaim (inline make-job)) 247 (defun make-job (&rest tasks) 249 :tasks (make-array (length tasks) 251 :initial-contents tasks))) 253 (defmethod print-object ((self job) stream) 254 (print-unreadable-object (self stream :type t) 255 (format stream "~A tasks" (length (tasks self))))) 257 (defun run-job (worker job) 258 (run-worker worker :input job)) 261 (defclass work-scope () 262 ((tasks :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t) 266 (lock :initform (make-mutex :name "work-scope") :initarg :lock :accessor work-scope-lock :type mutex))) 268 (defmethod print-object ((self work-scope) stream) 269 (print-unreadable-object (self stream :type t) 270 (format stream "~A" (tasks self)))) 273 (defmacro with-task-pool ((sym &key oracle (tasks 0) lock (workers 4) start kernel results) &body body) 274 (unless lock (setf lock (make-semaphore :name "online" :count workers))) 275 (unless results (setf results (make-mailbox :name "results"))) 276 `(let ((,sym (make-task-pool :lock ,lock :results ,results 282 :initial-element (make-instance 'task)))))) 283 ,@(if kernel `((setf (task-pool-kernel ,sym) ,kernel)) 284 `((setf (task-pool-kernel ,sym) 285 (gen-task-kernel (gensym "TASK-KERNEL") () 286 (task-pool-lock ,sym) 290 (loop for i below ,workers 291 do (push-worker (make-worker :kernel (task-pool-kernel ,sym)) ,sym)) 292 ,@(when oracle `((designate-oracle ,sym 
,oracle))) 293 ,@(when start `((start-task-workers ,sym)))