changelog shortlog graph tags branches changeset files revisions annotate raw help

Mercurial > core / lisp/std/task.lisp

changeset 692: f51b73f49946
parent: 295ea43ceb2d
child: a36280d2ef4e
author: Richard Westhaver <ellis@rwest.io>
date: Thu, 03 Oct 2024 17:56:11 -0400
permissions: -rw-r--r--
description: std/task and tests
1 ;;; task.lisp --- Standard Task API
2 
3 ;;
4 
5 ;;; Code:
6 (in-package :std/task)
7 
8 ;;; Vars
;; Global state for the task API.  *TASKS* is the default task queue used
;; by the TASK-POOL structure; *ORACLES* is the list MAKE-ORACLE pushes onto.
(defvar *task-pool*)
(defvar *tasks* (make-queue :name "tasks"))
(defvar *jobs*)
(defvar *stages*)
(defvar *oracles* nil)
(defvar *task-oracles* nil)
;; *TASK* and *TASK-RESULT* are referenced by the code GEN-TASK-KERNEL
;; expands into.  They must be special (and *TASK-RESULT* must have a value)
;; not only while this file is being compiled but also after the fasl is
;; loaded into a fresh image, hence all three EVAL-WHEN situations.
(eval-when (:compile-toplevel :load-toplevel :execute)
  (defvar *task*)
  (defvar *task-result* nil))
18 
(define-condition task-error (thread-error)
  ()
  (:documentation "A THREAD-ERROR reported as an unhandled task error in its thread.")
  (:report
   (lambda (c out)
     (format out "Unhandled task error in thread ~A"
             (thread-error-thread c)))))
23 
(defun task-error (thread)
  "Signal an error of type TASK-ERROR whose :THREAD is THREAD."
  (error (make-condition 'task-error :thread thread)))
26 
27 ;;; Kernel
(defmacro gen-task-kernel (name args lock queue mailbox timeout &body body)
  "Expand into a COMPILE call that builds a kernel function.

NAME is evaluated and passed to COMPILE as the function name; ARGS is the
lambda list of the generated function.  LOCK, QUEUE and MAILBOX are forms
evaluated inside the generated function.

When called, the generated function:
  1. waits on the semaphore LOCK, passing :TIMEOUT TIMEOUT when TIMEOUT is
     non-NIL;
  2. binds *TASK* to (DEQUEUE QUEUE);
  3. evaluates BODY and stores its value in *TASK-RESULT*; any ERROR
     signalled by BODY is replaced by a TASK-ERROR for *CURRENT-THREAD*;
  4. always (via UNWIND-PROTECT) sends *TASK-RESULT* to MAILBOX and calls
     RELEASE-FOREGROUND."
  `(compile ,name
            (lambda ,args
              ;; The keyword pair is spliced directly into the call; wrapping
              ;; it in an extra list would pass (:TIMEOUT x) as one argument.
              (wait-on-semaphore ,lock ,@(when timeout `(:timeout ,timeout)))
              (let ((*task* (dequeue ,queue)))
                (unwind-protect
                     (handler-case (setf *task-result* (progn ,@body))
                       (error () (task-error *current-thread*)))
                  (send-message ,mailbox *task-result*)
                  (release-foreground))))))
38 
(defmacro define-task-kernel (name (&key lock timeout mailbox queue) args &body body)
  "Define a task kernel.

  (define-task-kernel NAME (&key LOCK TIMEOUT MAILBOX QUEUE) ARGS &body BODY)

Expands into a GEN-TASK-KERNEL form, which yields the 'kernel function'.
NAME is handed to GEN-TASK-KERNEL unchanged and is evaluated there.

The kernel function is installed in worker threads by passing it to
SB-THREAD:MAKE-THREAD. It may accept a varying number of arguments
specified by ARGS.

Within the BODY the variable *task* is bound to the result of (DEQUEUE QUEUE);
the value of BODY is stored in *task-result*.

This interface is experimental and subject to change."
  (list* 'gen-task-kernel name args lock queue mailbox timeout body))
57 
58 ;;; Supervisor
(defclass supervisor ()
  ;; Single slot, no initarg, initform or accessor.
  ((scope))
  (:documentation
   "A class which provides a view of the work done within a specified
SCOPE.

This object should be used by operators to inspect 'runstreams'
performed in other threads, such as WORKERS in TASK-POOL.

Before using this object you should ensure the SCOPE is fully
initialized. Supervisors may be created at any point during the
lifetime of SCOPE, but never before and never after."))
70 
71 ;;; Worker
72 ;; unix-getrusage
73 ;; 0,-1,-2
74 ;; (multiple-value-list (sb-unix:unix-getrusage 0))
75 ;; (setf sb-unix::*on-dangerous-wait* :error)
(defvar *default-worker-name*
  "worker"
  "Default name string for workers.")
77 
(defclass worker ()
  ((thread
    ;; Both #. forms are evaluated by the reader, so the thread name and the
    ;; semaphore name are fixed when this form is read, not chosen per
    ;; instance.  The thread is created unstarted; see START-WORKER.
    :initform (sb-thread::%make-thread #.#1=(symbol-name (gensym "w")) t (make-semaphore :name #.#1#))
    :accessor worker-thread
    :initarg :thread)
   ;; WITH-TASK-POOL stores the value of (COMPILE name ...) here, which is
   ;; the NAME symbol, so the slot accepts a function designator rather
   ;; than only a function object.
   (kernel :type (or symbol function) :accessor worker-kernel :initarg :kernel)
   ;; Passed as the argument list when the worker is started.
   (input :initform nil :accessor worker-input :initarg :input))
  (:documentation
   "A unit of execution: a THREAD, the KERNEL it runs and the INPUT
arguments handed to that kernel when the worker is started."))
84 
(defvar *workers*
  (make-array 0 :adjustable t :element-type 'worker)
  "Adjustable, initially empty vector declared with element type WORKER.")
86 
(declaim (inline kill-worker join-worker))

(defun start-worker (worker)
  "Start WORKER's thread, running its kernel with its input as arguments."
  (let ((thread (worker-thread worker))
        (kernel (worker-kernel worker))
        (input (worker-input worker)))
    (sb-thread::start-thread thread kernel input)))

(defun kill-worker (worker)
  "Call KILL-THREAD on WORKER's thread."
  (let ((thread (worker-thread worker)))
    (kill-thread thread)))

(defun join-worker (worker)
  "Call JOIN-THREAD on WORKER's thread and return its result."
  (let ((thread (worker-thread worker)))
    (join-thread thread)))
92 
93 ;;; Oracle
(defstruct (oracle
            (:constructor %make-oracle (id thread)))
  "Pairs a 32-bit unsigned ID with a THREAD.  Both slots are read-only and
are supplied positionally to %MAKE-ORACLE."
  (id 0 :read-only t :type (unsigned-byte 32))
  (thread *current-thread* :read-only t))
97 
(defun oracle-of-id (id)
  "Return the first oracle in *ORACLES* whose ORACLE-ID is = to ID, or NIL."
  (find id *oracles* :key #'oracle-id :test #'=))
100 
(defun make-oracle (thread)
  "Return two values: the OS thread id of THREAD and the oracle for that id.

An oracle already registered in *ORACLES* under that id is reused;
otherwise a new one is created and pushed onto *ORACLES*."
  (let* ((tid (thread-os-tid thread))
         (oracle (or (oracle-of-id tid)
                     (let ((fresh (%make-oracle tid thread)))
                       (push fresh *oracles*)
                       fresh))))
    (values tid oracle)))
108 
109 ;;; Proto
110 ;; oracle
;; Two-argument protocol function over a HOST and a GUEST.  The only method
;; in this file specializes on (TASK-POOL THREAD) and re-dispatches with the
;; primary value of MAKE-ORACLE as the new GUEST.
(defgeneric designate-oracle (host guest))
112 ;; worker
(defun make-worker (&key thread kernel input)
  "Create a WORKER instance.

Only the non-NIL arguments among THREAD, KERNEL and INPUT are forwarded as
initargs, so omitted (or NIL) ones fall back to the class defaults."
  (let ((initargs '()))
    ;; Built back to front so the final order is :thread, :kernel, :input.
    (when input (setf initargs (list* :input input initargs)))
    (when kernel (setf initargs (list* :kernel kernel initargs)))
    (when thread (setf initargs (list* :thread thread initargs)))
    (apply #'make-instance 'worker initargs)))
118 
(defgeneric make-workers (count &rest initargs &key &allow-other-keys)
  (:documentation
   "Return a list of COUNT workers.  The NUMBER method builds each one with
MAKE-WORKER, passing the same THREAD, KERNEL and INPUT to every call.")
  (:method ((count number) &key thread kernel input)
    (let ((made '()))
      (dotimes (n count)
        (push (make-worker :thread thread :kernel kernel :input input) made))
      made)))
124 
;; Protocol declarations only: none of the generic functions below has a
;; method in this file except TASKS, RESULTS, WORKERS, MAKE-JOB, RUN-TASK
;; and RUN-STAGE.  Intended semantics of the others are not shown here.
(defgeneric delete-worker (worker pool &key &allow-other-keys))
(defgeneric spawn-worker (pool worker))

;; job
;; MAKE-JOB has methods on TASK, VECTOR and NULL.
(defgeneric make-job (self &key &allow-other-keys))
(defgeneric find-job (job pool &key &allow-other-keys))
(defgeneric run-job (self job))
(defgeneric run-jobs (self))
;; task
;; TASKS has methods on TASK-POOL and JOB; RESULTS on TASK-POOL.
(defgeneric tasks (self))
(defgeneric run-task (self task))
(defgeneric run-tasks (self))
(defgeneric results (self))
;; stage
;; WORKERS has a method on TASK-POOL.
(defgeneric run-stage (self stage))
(defgeneric workers (self))
141 
142 ;;; Task Pool
(defstruct task-pool
  "A pool of workers together with the queue of tasks they draw from, the
semaphore they wait on and the mailbox results are sent to."
  ;; Symbol naming the kernel function; defaults to IDENTITY.
  (kernel 'identity :type symbol)
  ;; Task queue; defaults to the global *TASKS* queue.
  (tasks *tasks*)
  (lock (make-semaphore :name "online") :type semaphore)
  ;; TODO: test weak-vector here
  ;; The vector is grown with VECTOR-PUSH-EXTEND (see PUSH-WORKER), which
  ;; requires an actually adjustable vector, so request :ADJUSTABLE
  ;; explicitly instead of relying on the implementation.
  (workers (make-array 0 :element-type 'worker :fill-pointer 0 :adjustable t)
   :type (vector worker))
  (results (make-mailbox :name "results")))
150 
(defmethod tasks ((pool task-pool))
  "Return the task queue of POOL."
  (task-pool-tasks pool))

(defmethod results ((pool task-pool))
  "Return the results mailbox of POOL."
  (task-pool-results pool))

(defmethod workers ((pool task-pool))
  "Return the worker vector of POOL."
  (task-pool-workers pool))
154 
(defmethod print-object ((self task-pool) (stream t))
  "Print SELF unreadably: kernel, worker count, queued tasks / semaphore
count, and number of pending results."
  (print-unreadable-object (self stream :type t)
    (let ((kernel (task-pool-kernel self))
          (worker-total (length (workers self)))
          (queued (queue-count (tasks self)))
          (online (semaphore-count (task-pool-lock self)))
          (pending (mailbox-count (task-pool-results self))))
      (format stream "~A :workers ~A :tasks ~A/~A :results ~A"
              kernel worker-total queued online pending))))
163 
(defun kill-workers (pool)
  "Pop every worker off POOL's worker vector, calling KILL-WORKER on each."
  ;; The count is taken once up front, before any worker is popped.
  (loop repeat (length (workers pool))
        do (kill-worker (vector-pop (workers pool)))))
168 
(defun worker-count (task-pool &key online)
  "Return the number of workers in TASK-POOL.

With ONLINE true, return the current count of the pool's semaphore instead
of the length of the worker vector."
  (cond (online (semaphore-count (task-pool-lock task-pool)))
        (t (length (task-pool-workers task-pool)))))
173 
(defmethod designate-oracle ((self task-pool) (guest thread))
  "Register an oracle for the thread GUEST, then re-dispatch on SELF."
  ;; MAKE-ORACLE returns (values id oracle); only the primary value, the id,
  ;; is forwarded to the next DESIGNATE-ORACLE call.
  ;; NOTE(review): confirm the id, not the oracle object, is what the
  ;; receiving method expects.
  (multiple-value-bind (id) (make-oracle guest)
    (designate-oracle self id)))
176 
(declaim (inline push-worker push-workers pop-worker))
(defun push-worker (worker pool)
  "Append WORKER to POOL's worker vector, extending it if needed.
Returns the index at which WORKER was stored."
  (let ((workers (task-pool-workers pool)))
    (vector-push-extend worker workers)))
180 
(defun push-workers (threads pool)
  "Append every element of the list THREADS to POOL's worker vector.
Returns NIL."
  ;; Go through PUSH-WORKER (the struct accessor) rather than WITH-SLOTS:
  ;; SLOT-VALUE on a structure instance is not guaranteed by the standard.
  (dolist (w threads)
    (push-worker w pool)))
185 
(defmethod pop-worker (pool)
  "Remove and return the last worker in POOL's worker vector."
  (let ((workers (task-pool-workers pool)))
    (vector-pop workers)))
188 
(defun start-task-worker (pool index)
  "Start the worker stored at INDEX in POOL's worker vector."
  ;; (with-recursive-lock
  (let ((worker (aref (workers pool) index)))
    (start-worker worker)))
192 
(defun start-task-workers (pool)
  "Start all workers in the given task POOL, in vector order.  Returns NIL."
  (map nil #'start-worker (workers pool)))
197 
198 ;;; Task
(defclass task ()
  ((state
    :accessor task-state
    :initarg :state
    :initform nil))
  (:documentation
   "This object represents a single unit of work to be done by some
worker. Tasks are typically generated by an oracle, but workers may also be
granted the ability to create and distribute their own tasks. Once a task is
assigned, the 'owner', i.e. the worker that is assigned this task, may modify
the object and state. When the work associated with a task is complete, the
owner is responsible for indicating in the state slot the result of the
computation."))
208 
(defmethod print-object ((self task) stream)
  "Print SELF unreadably, showing its state."
  (print-unreadable-object (self stream :type t)
    (let ((state (task-state self)))
      (format stream ":state ~A" state))))
212 
(defmethod run-task ((self thread) (task task))
  "Method for a THREAD and a TASK; does nothing and returns NIL."
  nil)
214 
215 ;;; Job
;; The only constructor is the positional %MAKE-JOB, which takes the TASKS
;; array; LOCK therefore always gets its default, a fresh mutex named "job".
(defstruct (job (:constructor %make-job (tasks)))
  "A collection of tasks to be performed by worker threads."
  ;; Default is an empty, adjustable vector with a fill pointer.
  (tasks (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
   :type (array task *))
  (lock (make-mutex :name "job") :type mutex))
221 
(defmethod tasks ((job job))
  "Return the task array of JOB."
  (job-tasks job))
223 
(defmethod make-job ((self task) &key (size 1))
  "Return a job whose task array has SIZE elements, each being SELF."
  (let ((tasks (make-array size :initial-element self :element-type 'task)))
    (%make-job tasks)))

(defmethod make-job ((self vector) &key)
  "Return a job that uses the vector SELF directly as its task array."
  (let ((tasks self))
    (%make-job tasks)))

(defmethod make-job ((self null) &key (size 1))
  "Return a job with no active tasks: an adjustable array of SIZE elements
whose fill pointer is 0."
  (let ((tasks (make-array size :adjustable t :fill-pointer 0 :element-type 'task)))
    (%make-job tasks)))
233 
(defmethod print-object ((self job) stream)
  "Print SELF unreadably, showing its task array."
  (print-unreadable-object (self stream :type t)
    (let ((tasks (job-tasks self)))
      (format stream "~A" tasks))))
237 
238 ;;; Stage
(defclass stage ()
  ;; The slot holds jobs, so the default vector is declared with element
  ;; type JOB to agree with the slot's (VECTOR JOB) type.
  ((jobs :initform (make-array 0 :element-type 'job :fill-pointer 0 :adjustable t)
         :initarg :jobs
         :accessor jobs
         :type (vector job))
   (lock :initform (make-mutex :name "stage")
         :initarg :lock
         :accessor stage-lock
         :type mutex))
  (:documentation
   "A vector of jobs together with a mutex named \"stage\"."))
245 
(defmethod print-object ((self stage) stream)
  "Print SELF unreadably, showing its jobs vector."
  (print-unreadable-object (self stream :type t)
    (let ((jobs (jobs self)))
      (format stream "~A" jobs))))
249 
(defmethod run-stage ((self thread) (stage stage))
  "Method for a THREAD and a STAGE; does nothing and returns NIL."
  nil)
251 
252 ;;; Macros
(defmacro with-task-pool ((sym &key oracle (tasks 0) lock (workers 4) start kernel results) &body body)
  "Evaluate BODY with SYM bound to a freshly made TASK-POOL.

Keyword options (all are forms evaluated at run time unless noted):
  TASKS   - number of initial queue entries; every entry is the same
            single TASK instance.
  LOCK    - semaphore for the pool; defaults to a new semaphore named
            \"online\" whose count is WORKERS.
  WORKERS - number of workers pushed onto the pool (default 4); evaluated
            exactly once.
  KERNEL  - value stored as the pool's kernel; defaults to an empty kernel
            generated by GEN-TASK-KERNEL under a gensym name.
  RESULTS - mailbox for results; defaults to a new mailbox named
            \"results\".
  ORACLE  - when supplied (checked at macroexpansion time),
            (DESIGNATE-ORACLE SYM ORACLE) is called.
  START   - when non-NIL at macroexpansion time, START-TASK-WORKERS is
            called on the pool before BODY."
  (let ((nworkers (gensym "WORKERS")))
    ;; The default semaphore and mailbox are emitted as construction *forms*
    ;; so that each execution gets fresh objects; building them here, at
    ;; macroexpansion time, would embed one literal object in the expansion.
    `(let* ((,nworkers ,workers)
            (,sym (make-task-pool
                   :lock ,(or lock `(make-semaphore :name "online" :count ,nworkers))
                   :results ,(or results '(make-mailbox :name "results"))
                   :tasks (make-queue
                           :name "tasks"
                           :initial-contents
                           (make-array ,tasks
                                       :element-type 'task
                                       :initial-element (make-instance 'task))))))
       (setf (task-pool-kernel ,sym)
             ,(or kernel
                  `(gen-task-kernel (gensym "TASK-KERNEL") ()
                       (task-pool-lock ,sym)
                       (tasks ,sym)
                       (results ,sym)
                       nil)))
       (loop repeat ,nworkers
             do (push-worker (make-worker :kernel (task-pool-kernel ,sym)) ,sym))
       ,@(when oracle `((designate-oracle ,sym ,oracle)))
       ,@(when start `((start-task-workers ,sym)))
       ,@body)))