changeset 694: |
a36280d2ef4e |
parent: |
f51b73f49946
|
child: |
38e9c3be2392 |
author: |
Richard Westhaver <ellis@rwest.io> |
date: |
Thu, 03 Oct 2024 21:54:07 -0400 |
permissions: |
-rw-r--r-- |
description: |
tasks |
1 ;;; task.lisp --- Standard Task API 10 (defvar *tasks* (make-queue :name "tasks")) 13 (sb-ext:defglobal *worker-threads* nil) 14 (sb-ext:defglobal *oracles* nil) 15 (sb-ext:defglobal *oracle-threads* nil) 16 (sb-ext:defglobal *supervisor-threads* nil) 18 (eval-when (:compile-toplevel) 20 (defvar *task-result* nil)) 22 (define-condition task-error (thread-error) () 23 (:report (lambda (condition stream) 24 (format stream "Unhandled task error in thread ~A" 25 (thread-error-thread condition))))) 27 (defun task-error (thread) 28 (error 'task-error :thread thread)) 31 (defmacro gen-task-kernel (name args lock queue mailbox timeout &body body) 34 (wait-on-semaphore ,lock ,@(when timeout `((:timeout ,timeout)))) 35 (let ((*task* (dequeue ,queue))) 37 (handler-case (setf *task-result* (progn ,@body)) 38 (error () (task-error *current-thread*))) 39 (send-message ,mailbox *task-result*) 40 (release-foreground)))))) 42 (defmacro define-task-kernel (name (&key lock timeout mailbox queue) args &body body) 43 "Define a task kernel. 45 (define-task-kernel NAME (&key ARGS ACCESSORS) 47 The kernel should process all options and return a function - the 50 The kernel function is installed in worker threads by passing it to 51 SB-THREAD:MAKE-THREAD. It may accept a varying number of arguments 54 Within the BODY the variable *task* is bound to the result of (DEQUEUE QUEUE) 55 and *task-result* is bound to the return value of BODY. 57 This interface is experimental and subject to change." 
58 `(gen-task-kernel ,name ,args ,lock ,queue ,mailbox ,timeout 61 (defun make-ephemeral-thread (name) 62 (sb-thread::%make-thread name t (make-semaphore :name name))) 65 (defgeneric designate-oracle (host guest)) 66 (defgeneric assign-supervisor (worker supervisor)) 68 (defgeneric make-workers (count &rest initargs &key &allow-other-keys) 69 (:method ((count number) &key thread kernel input) 71 (dotimes (i count ret) 72 (push (make-worker :thread thread :kernel kernel :input input) ret))))) 74 (defgeneric tasks (self)) 75 (defgeneric results (self)) 76 (defgeneric run (self object &rest initargs &key &allow-other-keys)) 77 (defgeneric run-object (self)) 78 (defgeneric work (self &key &allow-other-keys)) 79 (defgeneric workers (self)) 82 (defclass supervisor () 83 ((thread :initform (make-ephemeral-thread (symbol-name (gensym "supervisor"))) :accessor supervisor-thread) 86 (:documentation "Supervisors are threads which are responsible for a set of worker threads 87 within their DOMAIN and SCOPE.")) 89 (defmethod initialize-instance :after ((self supervisor) &key &allow-other-keys) 90 (push (supervisor-thread self) *supervisor-threads*)) 95 ;; (multiple-value-list (sb-unix:unix-getrusage 0)) 96 ;; (setf sb-unix::*on-dangerous-wait* :error) 98 ;; TODO 2024-10-03: with-cas-lock? 
100 ((thread :initform (make-ephemeral-thread (symbol-name (gensym "worker"))) 101 :accessor worker-thread 103 (kernel :type function :accessor worker-kernel :initarg :kernel) 104 (tasks :initform nil :accessor tasks :initarg :input))) 106 (defmethod initialize-instance :after ((self worker) &key &allow-other-keys) 107 (push (worker-thread self) *worker-threads*)) 109 (defun make-worker (&key thread kernel input) 110 (apply #'make-instance 'worker 111 `(,@(when thread `(:thread ,thread)) 112 ,@(when kernel `(:kernel ,kernel)) 113 ,@(when input `(:input ,input))))) 115 ;; TODO 2024-10-03: pause/resume 117 (declaim (inline kill-worker join-worker start-worker run-worker)) 118 (defun start-worker (worker) 119 (sb-thread::start-thread (worker-thread worker) (worker-kernel worker) (tasks worker))) 121 (defun run-worker (worker &key input wait) 123 (setf (tasks worker) input)) 124 (start-worker worker) 125 (if wait (join-worker worker) 128 (defmethod run-object ((self worker)) 131 (defmethod run ((self worker) (object t) &key wait &allow-other-keys) 132 (run-worker self :input object :wait wait)) 134 (defun kill-worker (worker) 135 (declare (worker worker)) 136 (let ((th (worker-thread worker))) 137 (unwind-protect (kill-thread th) 138 (deletef *worker-threads* th)))) 140 (defun join-worker (worker) 141 (declare (worker worker)) 142 (let ((th (worker-thread worker))) 143 (unwind-protect (join-thread th) 144 (deletef *worker-threads* th)))) 147 (defstruct (oracle (:constructor %make-oracle (id thread))) 148 "Oracles provide a tagged view into some threaded scope of work." 
149 (id 0 :type (unsigned-byte 32) :read-only t) 150 (thread *current-thread* :read-only t)) 152 (defun oracle-of-id (id) 153 (find id *oracles* :test '= :key 'oracle-id)) 155 (defun make-oracle (thread) 156 (let ((id (thread-os-tid thread))) 157 (if-let ((found (oracle-of-id id))) 159 (let ((orc (%make-oracle id thread))) 161 (push (oracle-thread orc) *oracle-threads*) 166 (kernel 'identity :type symbol) 168 (lock (make-semaphore :name "online") :type semaphore) 169 ;; TODO: test weak-vector here 170 (workers (make-array 0 :element-type 'worker :fill-pointer 0) :type (vector worker)) 171 (results (make-mailbox :name "results"))) 173 (defmethod tasks ((self task-pool)) (task-pool-tasks self)) 174 (defmethod results ((self task-pool)) (task-pool-results self)) 175 (defmethod workers ((self task-pool)) (task-pool-workers self)) 177 (defmethod print-object ((self task-pool) (stream t)) 178 (print-unreadable-object (self stream :type t) 179 (format stream "~A :workers ~A :tasks ~A/~A :results ~A" 180 (task-pool-kernel self) 181 (length (workers self)) 182 (queue-count (tasks self)) 183 (semaphore-count (task-pool-lock self)) 184 (mailbox-count (task-pool-results self))))) 186 (defun kill-workers (pool) 187 "Pop each worker off POOL's worker vector and call KILL-WORKER on it." 
188 (dotimes (i (length (workers pool))) 189 (kill-worker (vector-pop (workers pool))))) 191 (defun worker-count (task-pool &key online) 193 (semaphore-count (task-pool-lock task-pool)) 194 (length (task-pool-workers task-pool)))) 196 (defmethod designate-oracle ((self task-pool) (guest thread)) 197 (designate-oracle self (make-oracle guest))) 199 (declaim (inline push-worker push-workers pop-worker)) 200 (defun push-worker (worker pool) 201 (vector-push-extend worker (task-pool-workers pool))) 203 (defun push-workers (threads pool) 204 (with-slots (workers) pool 206 (vector-push-extend w workers)))) 208 (defmethod pop-worker (pool) 209 (vector-pop (task-pool-workers pool))) 211 (defun start-task-worker (pool index) 212 ;; (with-recursive-lock 213 (start-worker (aref (workers pool) index))) 215 (defun start-task-workers (pool) 216 "Start all workers in the given task POOL." 217 (loop for w across (workers pool) 218 do (start-worker w))) 222 ((state :initform nil :initarg :state :accessor task-state)) 223 (:documentation "This object represents a single unit of work to be done by some 224 worker. Tasks are typically distributed from the task-pool, but workers may 225 also be granted the ability to create and distribute their own tasks. Once a 226 task is assigned, the 'owner', i.e. the worker that is assigned this task, may 227 modify the object. 
When the work associated with a task is complete, the owner 228 is responsible for indicating in the state slot the result of the computation.")) 230 (defmethod print-object ((self task) stream) 231 (print-unreadable-object (self stream :type t) 232 (format stream ":state ~A" (task-state self)))) 234 (defun run-task (worker task) 235 (run-worker worker :input task)) 239 ((tasks :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t) 243 (lock :initform (make-mutex :name "job") :type mutex 245 (:documentation "A collection of tasks forming a single unit of work.")) 247 (declaim (inline make-job)) 248 (defun make-job (&rest tasks) 250 :tasks (make-array (length tasks) 252 :initial-contents tasks))) 254 (defmethod print-object ((self job) stream) 255 (print-unreadable-object (self stream :type t) 256 (format stream "~A tasks" (length (tasks self))))) 258 (defun run-job (worker job) 259 (run-worker worker :input job)) 262 (defclass work-scope () 263 ((tasks :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t) 267 (lock :initform (make-mutex :name "work-scope") :initarg :lock :accessor work-scope-lock :type mutex))) 269 (defmethod print-object ((self work-scope) stream) 270 (print-unreadable-object (self stream :type t) 271 (format stream "~A" (tasks self)))) 274 (defmacro with-task-pool ((sym &key oracle (tasks 0) lock (workers 4) start kernel results) &body body) 275 (unless lock (setf lock (make-semaphore :name "online" :count workers))) 276 (unless results (setf results (make-mailbox :name "results"))) 277 `(let ((,sym (make-task-pool :lock ,lock :results ,results 283 :initial-element (make-instance 'task)))))) 284 ,@(if kernel `((setf (task-pool-kernel ,sym) ,kernel)) 285 `((setf (task-pool-kernel ,sym) 286 (gen-task-kernel (gensym "TASK-KERNEL") () 287 (task-pool-lock ,sym) 291 (loop for i below ,workers 292 do (push-worker (make-worker :kernel (task-pool-kernel ,sym)) ,sym)) 293 ,@(when oracle `((designate-oracle ,sym 
,oracle))) 294 ,@(when start `((start-task-workers ,sym)))