changeset 691: 295ea43ceb2d
parent: ec1d4d544c36
child: f51b73f49946
author: Richard Westhaver <ellis@rwest.io>
date: Wed, 02 Oct 2024 23:39:07 -0400
permissions: -rw-r--r--
description: tasks
1 ;;; task.lisp --- Standard Task API 14 (defvar *oracles* nil) 17 (defmacro parse-kernel-ops (op) 18 "Parse an op of the form (NAME ARGS &BODY BODY)" 19 (destructuring-bind (name args &body body) op 20 `(std/macs:named-lambda ,name ,args ,@body))) 22 (defmacro define-task-kernel (name (&key lock timeout mailbox queue) args &body body) 23 "Define a task kernel. 25 (define-task-kernel NAME (&key ARGS ACCESSORS) 27 The kernel should process all options and return a function - the 30 The kernel function is installed in worker threads by passing it to 31 SB-THREAD:MAKE-THREAD. It may accept a varying number of arguments 34 ACCESSORS is a list of pandoric accessors which can be called on the 37 This interface is experimental and subject to change." 38 `(gen-task-kernel ,name ,args ,lock ,queue ,mailbox ,timeout 43 (defvar *task-result*) 46 (defmacro gen-task-kernel (name args lock queue mailbox timeout &body body) 49 (wait-on-semaphore ,lock ,@(when timeout `((:timeout ,timeout)))) 50 (let* ((*task-queue* ,queue) 51 (*task-item* (dequeue ,queue))) 53 (send-message ,mailbox t))))) 56 (defclass supervisor () 58 (:documentation "A class which provides a view of the work done within a specified 61 This object should be used by operators to inspect 'runstreams' 62 performed in other threads, such as WORKERS in TASK-POOL. 64 Before using this object you should ensure the SCOPE is fully 65 initialized. 
Supervisors should be created at any point during the 66 lifetime of SCOPE, but never before and never after.")) 71 ;; (multiple-value-list (sb-unix:unix-getrusage 0)) 72 ;; (setf sb-unix::*on-dangerous-wait* :error) 73 (defvar *default-worker-name* "worker") 76 ((thread :initform (sb-thread::%make-thread #.#1=(symbol-name (gensym "w")) t (make-semaphore :name #.#1#)) 77 :accessor worker-thread 79 (function :type function :accessor worker-function :initarg :function) 80 (arguments :type list :accessor worker-arguments :initarg :arguments))) 83 (defstruct (oracle (:constructor %make-oracle (id thread))) 84 (id 0 :type (unsigned-byte 32) :read-only t) 85 (thread *current-thread* :read-only t)) 87 (defun find-oracle (id) 88 (find id *oracles* :test '= :key 'oracle-id)) 90 (defun make-oracle (thread) 91 (let* ((id (thread-os-tid thread))) 92 (if-let ((found (find-oracle id))) 94 (let ((orc (%make-oracle id thread))) 100 (defgeneric designate-oracle (host guest)) 102 (defgeneric make-worker (self &rest initargs &key &allow-other-keys) 103 (:method ((self t) &key thread function arguments) 104 (declare (ignore self)) 105 (apply #'make-instance 'worker 106 `(,@(when thread `(:thread ,thread)) 107 ,@(when function `(:function ,function)) 108 ,@(when arguments `(:arguments ,arguments)))))) 109 (defgeneric make-workers (self count &rest initargs &key &allow-other-keys) 110 (:method ((self t) (count t) &key thread function arguments) 112 (dotimes (i count ret) 113 (push (make-worker t :thread thread :function function :arguments arguments) ret))))) 114 (defgeneric delete-worker (worker pool &key &allow-other-keys)) 115 (defgeneric pop-worker (pool)) 116 (defgeneric make-worker-for (pool function &rest args) 117 (:method ((pool null) (function function) &rest args) 118 (declare (ignore pool)) 119 (make-worker t :function function :arguments args))) 120 (defgeneric make-workers-for (pool count function) 121 (:method ((pool null) (count fixnum) (function function)) 122 (declare 
(ignore pool)) 123 (make-workers t count :function function))) 124 (defgeneric spawn-worker (pool) 125 (:method ((pool null)) 126 (declare (ignore pool)) 127 (make-worker t :function (default-task-kernel)))) 128 (defgeneric spawn-workers (pool count) 129 (:method ((pool null) (count fixnum)) 130 (declare (ignore pool)) 131 (make-workers t count :function (default-task-kernel)))) 134 (defgeneric make-job (self &key &allow-other-keys)) 135 (defgeneric find-job (job pool &key &allow-other-keys)) 136 (defgeneric run-job (self job)) 137 (defgeneric run-jobs (self)) 139 (defgeneric tasks (self)) 140 (defgeneric make-task (&rest args &key &allow-other-keys)) 141 (defgeneric run-task (self task)) 142 (defgeneric run-tasks (self)) 143 (defgeneric results (self)) 145 (defgeneric run-stage (self stage)) 146 (defgeneric workers (self)) 150 (kernel 'identity :type symbol) 151 (tasks (make-queue :name "tasks")) 152 (lock (make-semaphore :name "online") :type semaphore) 153 ;; TODO: test weak-vector here 154 (workers (make-array 0 :element-type 'worker :fill-pointer 0) :type (vector thread)) 155 (results (make-mailbox :name "results"))) 157 (defmethod tasks ((self task-pool)) (task-pool-tasks self)) 158 (defmethod results ((self task-pool)) (task-pool-results self)) 159 (defmethod workers ((self task-pool)) (task-pool-workers self)) 161 (defmethod print-object ((self task-pool) (stream t)) 162 (print-unreadable-object (self stream :type t) 163 (format stream "~A ~A:~A:~A ~A" 164 (task-pool-kernel self) 165 (length (workers self)) 166 (semaphore-count (task-pool-lock self)) 167 (queue-count (tasks self)) 168 (mailbox-count (task-pool-results self))))) 170 (defmethod designate-oracle ((self task-pool) (guest integer)) 171 (setf (task-pool-oracle-id self) (make-oracle (find-thread-by-id guest))) 174 (defun worker-count (task-pool &key online) 176 (semaphore-count (task-pool-online task-pool)) 177 (length (task-pool-workers task-pool)))) 179 (defmethod designate-oracle ((self 
task-pool) (guest thread)) 180 (designate-oracle self (make-oracle guest))) 182 (defmethod task-pool-oracle ((self task-pool)) 183 (oracle-thread (find-oracle (slot-value self 'oracle)))) 185 (declaim (inline push-worker push-workers pop-worker)) 186 (defun push-worker (worker pool) 187 (vector-push-extend worker (task-pool-workers pool))) 189 (defun push-workers (threads pool) 190 (with-slots (workers) pool 192 (vector-push-extend w workers)))) 194 (defmethod pop-worker (pool) 195 (vector-pop (task-pool-workers pool))) 197 (defmethod make-worker-for ((pool task-pool) function &rest args) 198 (make-thread function :name *default-worker-name* :arguments args)) 200 (defmethod make-workers-for ((pool task-pool) (count fixnum) function) 201 (make-threads count function :name *default-worker-name*)) 203 (defmethod spawn-worker ((pool task-pool)) 204 ;; (with-recursive-lock 205 (push-worker (make-worker-for pool (task-pool-kernel pool)) pool)) 207 (defmethod spawn-workers ((pool task-pool) (count fixnum)) 208 (push-workers (make-workers-for pool count (task-pool-kernel pool)) pool)) 212 ((state :initarg :state :accessor task-state)) 213 (:documentation "This object represents a single unit of work to be done by some 214 worker. Tasks are typically generated by an oracle, but workers may also be 215 granted the ability to create and distribute their own tasks. Once a task is 216 assigned, the 'owner', i.e. the worker that is assigned this task, may modify 217 the object and state. 
When the work associated with a task is complete, the 218 owner is responsible for indicating in the state slot the result of the 221 (defmethod print-object ((self task) stream) 222 (print-unreadable-object (self stream :type t) 223 (format stream "~A" (task-object self)))) 225 (defmethod push-task-result ((task task) (pool task-pool)) 226 (send-message (task-pool-results pool) task)) 228 (defmethod run-task ((self thread) (task task))) 231 (defstruct (job (:constructor %make-job (tasks))) 232 "A collection of tasks to be performed by worker threads." 233 (tasks (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t) 234 :type (array task *)) 235 (lock (make-mutex :name "job") :type mutex)) 237 (defmethod tasks ((self job)) (job-tasks self)) 239 (defmethod make-job ((self task) &key (size 1)) 240 (%make-job (make-array size :element-type 'task 241 :initial-element self))) 243 (defmethod make-job ((self vector) &key) 246 (defmethod make-job ((self null) &key (size 1)) 247 (%make-job (make-array size :element-type 'task :fill-pointer 0 :adjustable t))) 249 (defmethod print-object ((self job) stream) 250 (print-unreadable-object (self stream :type t) 251 (format stream "~A" (job-tasks self)))) 255 ((jobs :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t) 259 (lock :initform (make-mutex :name "stage") :initarg :lock :accessor stage-lock :type mutex))) 261 (defmethod print-object ((self stage) stream) 262 (print-unreadable-object (self stream :type t) 263 (format stream "~A" (jobs self)))) 265 (defmethod run-stage ((self thread) (stage stage))) 268 (defmacro with-task-pool ((sym &key oracle lock (count 4) spawn kernel results) &body body) 269 (unless lock (setf lock (make-semaphore :name "online" :count count))) 270 (unless results (setf results (make-mailbox :name "results"))) 271 `(let ((,sym (make-task-pool :lock ,lock :results ,results))) 272 ,@(if kernel `((setf (task-pool-kernel ,sym) ,kernel)) 273 `((setf (task-pool-kernel ,sym) 
274 (gen-task-kernel '%kernel () 275 (task-pool-lock ,sym) 279 (designate-oracle ,sym ,@(if oracle (list oracle) `((make-oracle *current-thread*)))) 280 ,@(when spawn `((spawn-workers ,sym ,spawn)))