changeset 692: |
f51b73f49946 |
parent: |
295ea43ceb2d |
child: |
a36280d2ef4e |
author: |
Richard Westhaver <ellis@rwest.io> |
date: |
Thu, 03 Oct 2024 17:56:11 -0400 |
permissions: |
-rw-r--r-- |
description: |
std/task and tests |
1 ;;; task.lisp --- Standard Task API 10 (defvar *tasks* (make-queue :name "tasks")) 13 (defvar *oracles* nil) 14 (defvar *task-oracles* nil) 15 (eval-when (:compile-toplevel) 17 (defvar *task-result* nil)) 19 (define-condition task-error (thread-error) () 20 (:report (lambda (condition stream) 21 (format stream "Unhandled task error in thread ~A" 22 (thread-error-thread condition))))) 24 (defun task-error (thread) 25 (error 'task-error :thread thread)) 28 (defmacro gen-task-kernel (name args lock queue mailbox timeout &body body) 31 (wait-on-semaphore ,lock ,@(when timeout `((:timeout ,timeout)))) 32 (let ((*task* (dequeue ,queue))) 34 (handler-case (setf *task-result* (progn ,@body)) 35 (error () (task-error *current-thread*))) 36 (send-message ,mailbox *task-result*) 37 (release-foreground)))))) 39 (defmacro define-task-kernel (name (&key lock timeout mailbox queue) args &body body) 40 "Define a task kernel. 42 (define-task-kernel NAME (&key ARGS ACCESSORS) 44 The kernel should process all options and return a function - the 47 The kernel function is installed in worker threads by passing it to 48 SB-THREAD:MAKE-THREAD. It may accept a varying number of arguments 51 Within the BODY the variable *task* is bound to the result of (DEQUEUE QUEUE) 52 and *task-result* is bound to the return value of BODY. 54 This interface is experimental and subject to change." 55 `(gen-task-kernel ,name ,args ,lock ,queue ,mailbox ,timeout 59 (defclass supervisor () 61 (:documentation "A class which provides a view of the work done within a specified 64 This object should be used by operators to inspect 'runstreams' 65 performed in other threads, such as WORKERS in TASK-POOL. 67 Before using this object you should ensure the SCOPE is fully 68 initialized. 
Supervisors should be created at any point during the 69 lifetime of SCOPE, but never before and never after.")) 74 ;; (multiple-value-list (sb-unix:unix-getrusage 0)) 75 ;; (setf sb-unix::*on-dangerous-wait* :error) 76 (defvar *default-worker-name* "worker") 79 ((thread :initform (sb-thread::%make-thread #.#1=(symbol-name (gensym "w")) t (make-semaphore :name #.#1#)) 80 :accessor worker-thread 82 (kernel :type function :accessor worker-kernel :initarg :kernel) 83 (input :initform nil :accessor worker-input :initarg :input))) 85 (defvar *workers* (make-array 0 :element-type 'worker :adjustable t)) 87 (declaim (inline kill-worker join-worker)) 88 (defun start-worker (worker) 89 (sb-thread::start-thread (worker-thread worker) (worker-kernel worker) (worker-input worker))) 90 (defun kill-worker (worker) (kill-thread (worker-thread worker))) 91 (defun join-worker (worker) (join-thread (worker-thread worker))) 94 (defstruct (oracle (:constructor %make-oracle (id thread))) 95 (id 0 :type (unsigned-byte 32) :read-only t) 96 (thread *current-thread* :read-only t)) 98 (defun oracle-of-id (id) 99 (find id *oracles* :test '= :key 'oracle-id)) 101 (defun make-oracle (thread) 102 (let ((id (thread-os-tid thread))) 103 (if-let ((found (oracle-of-id id))) 105 (let ((orc (%make-oracle id thread))) 111 (defgeneric designate-oracle (host guest)) 113 (defun make-worker (&key thread kernel input) 114 (apply #'make-instance 'worker 115 `(,@(when thread `(:thread ,thread)) 116 ,@(when kernel `(:kernel ,kernel)) 117 ,@(when input `(:input ,input))))) 119 (defgeneric make-workers (count &rest initargs &key &allow-other-keys) 120 (:method ((count number) &key thread kernel input) 122 (dotimes (i count ret) 123 (push (make-worker :thread thread :kernel kernel :input input) ret))))) 125 (defgeneric delete-worker (worker pool &key &allow-other-keys)) 126 (defgeneric spawn-worker (pool worker)) 129 (defgeneric make-job (self &key &allow-other-keys)) 130 (defgeneric find-job (job pool &key 
&allow-other-keys)) 131 (defgeneric run-job (self job)) 132 (defgeneric run-jobs (self)) 134 (defgeneric tasks (self)) 135 (defgeneric run-task (self task)) 136 (defgeneric run-tasks (self)) 137 (defgeneric results (self)) 139 (defgeneric run-stage (self stage)) 140 (defgeneric workers (self)) 144 (kernel 'identity :type symbol) 146 (lock (make-semaphore :name "online") :type semaphore) 147 ;; TODO: test weak-vector here 148 (workers (make-array 0 :element-type 'worker :fill-pointer 0) :type (vector worker)) 149 (results (make-mailbox :name "results"))) 151 (defmethod tasks ((self task-pool)) (task-pool-tasks self)) 152 (defmethod results ((self task-pool)) (task-pool-results self)) 153 (defmethod workers ((self task-pool)) (task-pool-workers self)) 155 (defmethod print-object ((self task-pool) (stream t)) 156 (print-unreadable-object (self stream :type t) 157 (format stream "~A :workers ~A :tasks ~A/~A :results ~A" 158 (task-pool-kernel self) 159 (length (workers self)) 160 (queue-count (tasks self)) 161 (semaphore-count (task-pool-lock self)) 162 (mailbox-count (task-pool-results self))))) 164 (defun kill-workers (pool) 165 "Call FINISH-THREADS on task-pool's workers." 
166 (dotimes (i (length (workers pool))) 167 (kill-worker (vector-pop (workers pool))))) 169 (defun worker-count (task-pool &key online) 171 (semaphore-count (task-pool-lock task-pool)) 172 (length (task-pool-workers task-pool)))) 174 (defmethod designate-oracle ((self task-pool) (guest thread)) 175 (designate-oracle self (make-oracle guest))) 177 (declaim (inline push-worker push-workers pop-worker)) 178 (defun push-worker (worker pool) 179 (vector-push-extend worker (task-pool-workers pool))) 181 (defun push-workers (threads pool) 182 (with-slots (workers) pool 184 (vector-push-extend w workers)))) 186 (defmethod pop-worker (pool) 187 (vector-pop (task-pool-workers pool))) 189 (defun start-task-worker (pool index) 190 ;; (with-recursive-lock 191 (start-worker (aref (workers pool) index))) 193 (defun start-task-workers (pool) 194 "Start all workers in the given task POOL." 195 (loop for w across (workers pool) 196 do (start-worker w))) 200 ((state :initform nil :initarg :state :accessor task-state)) 201 (:documentation "This object represents a single unit of work to be done by some 202 worker. Tasks are typically generated by an oracle, but workers may also be 203 granted the ability to create and distribute their own tasks. Once a task is 204 assigned, the 'owner', i.e. the worker that is assigned this task, may modify 205 the object and state. When the work associated with a task is complete, the 206 owner is responsible for indicating in the state slot the result of the 209 (defmethod print-object ((self task) stream) 210 (print-unreadable-object (self stream :type t) 211 (format stream ":state ~A" (task-state self)))) 213 (defmethod run-task ((self thread) (task task))) 216 (defstruct (job (:constructor %make-job (tasks))) 217 "A collection of tasks to be performed by worker threads." 
218 (tasks (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t) 219 :type (array task *)) 220 (lock (make-mutex :name "job") :type mutex)) 222 (defmethod tasks ((self job)) (job-tasks self)) 224 (defmethod make-job ((self task) &key (size 1)) 225 (%make-job (make-array size :element-type 'task 226 :initial-element self))) 228 (defmethod make-job ((self vector) &key) 231 (defmethod make-job ((self null) &key (size 1)) 232 (%make-job (make-array size :element-type 'task :fill-pointer 0 :adjustable t))) 234 (defmethod print-object ((self job) stream) 235 (print-unreadable-object (self stream :type t) 236 (format stream "~A" (job-tasks self)))) 240 ((jobs :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t) 244 (lock :initform (make-mutex :name "stage") :initarg :lock :accessor stage-lock :type mutex))) 246 (defmethod print-object ((self stage) stream) 247 (print-unreadable-object (self stream :type t) 248 (format stream "~A" (jobs self)))) 250 (defmethod run-stage ((self thread) (stage stage))) 253 (defmacro with-task-pool ((sym &key oracle (tasks 0) lock (workers 4) start kernel results) &body body) 254 (unless lock (setf lock (make-semaphore :name "online" :count workers))) 255 (unless results (setf results (make-mailbox :name "results"))) 256 `(let ((,sym (make-task-pool :lock ,lock :results ,results 262 :initial-element (make-instance 'task)))))) 263 ,@(if kernel `((setf (task-pool-kernel ,sym) ,kernel)) 264 `((setf (task-pool-kernel ,sym) 265 (gen-task-kernel (gensym "TASK-KERNEL") () 266 (task-pool-lock ,sym) 270 (loop for i below ,workers 271 do (push-worker (make-worker :kernel (task-pool-kernel ,sym)) ,sym)) 272 ,@(when oracle `((designate-oracle ,sym ,oracle))) 273 ,@(when start `((start-task-workers ,sym)))