changelog shortlog graph tags branches changeset files revisions annotate raw help

Mercurial > core / lisp/std/task.lisp

changeset 698: 96958d3eb5b0
parent: 38e9c3be2392
author: Richard Westhaver <ellis@rwest.io>
date: Fri, 04 Oct 2024 22:04:59 -0400
permissions: -rw-r--r--
description: fixes
1 ;;; task.lisp --- Standard Task API
2 
3 ;;
4 
5 ;;; Code:
6 (in-package :std/task)
7 
8 ;;; Vars
;; Special variable for a task pool. It is declared without a global value,
;; so it has to be bound before it can be read.
(defvar *task-pool*)

(defvar *tasks* (make-queue :name "tasks")
  "Global queue named \"tasks\"; the default value of a TASK-POOL's TASKS slot.")

;; Declared without a global value; not referenced in the visible part of
;; this file.
(defvar *jobs*)
(defvar *stages*)

(sb-ext:defglobal *worker-threads* nil
  "Threads of WORKER instances: pushed when a worker is initialized and
removed again by KILL-WORKER and JOIN-WORKER.")

(sb-ext:defglobal *oracles* nil
  "ORACLE structures registered by MAKE-ORACLE.")

(sb-ext:defglobal *oracle-threads* nil
  "Threads belonging to the oracles registered by MAKE-ORACLE.")

(sb-ext:defglobal *supervisor-threads* nil
  "Threads of SUPERVISOR instances, pushed when a supervisor is initialized.")

(eval-always
  ;; Bound by generated task kernels to the task taken from their queue.
  (defvar *task*)
  (defvar *task-result* nil
    "Value of a task body as recorded by a generated task kernel."))
21 
(define-condition task-error (thread-error)
  ()
  (:documentation "Signaled when a task fails inside a thread.")
  (:report
   (lambda (err out)
     (format out "Unhandled task error in thread ~A"
             (thread-error-thread err)))))
26 
(defun task-error (thread)
  "Signal a TASK-ERROR condition whose thread is THREAD."
  (error (make-condition 'task-error :thread thread)))
29 
30 ;;; Kernel
(defmacro gen-task-kernel (name args lock queue mailbox timeout &body body)
  "Expand into a COMPILE call that builds a task kernel function.

NAME is a form evaluated at runtime and handed to COMPILE as the function
name. ARGS is the lambda list of the kernel. LOCK, QUEUE and MAILBOX are
forms yielding the semaphore to wait on, the queue to DEQUEUE a task from
and the mailbox that receives the result. TIMEOUT, when non-NIL, is a form
passed to WAIT-ON-SEMAPHORE as its :TIMEOUT argument.

While BODY runs, *TASK* is bound to the dequeued task and *TASK-RESULT* is
bound to NIL; BODY's value is stored in *TASK-RESULT*. Any ERROR signaled by
BODY is replaced by a TASK-ERROR for the current thread. Whether or not BODY
succeeds, *TASK-RESULT* is sent to MAILBOX and RELEASE-FOREGROUND is called."
  `(compile ,name
            (lambda ,args
              ;; Splice the keyword pair itself; wrapping it in another list
              ;; would produce the bogus call form (:timeout ...).
              (wait-on-semaphore ,lock ,@(when timeout `(:timeout ,timeout)))
              ;; Bind *TASK-RESULT* per invocation so that concurrently
              ;; running kernels cannot overwrite each other's result and a
              ;; failed task never posts a stale value from an earlier run.
              (let ((*task* (dequeue ,queue))
                    (*task-result* nil))
                (unwind-protect
                     (handler-case (setf *task-result* (progn ,@body))
                       (error () (task-error *current-thread*)))
                  (send-message ,mailbox *task-result*)
                  (release-foreground))))))
41 
(defmacro define-task-kernel (name (&key lock timeout mailbox queue) args &body body)
  "Define a task kernel.

  (define-task-kernel NAME (&key LOCK TIMEOUT MAILBOX QUEUE) ARGS &body BODY)

This is a keyword-argument front end for GEN-TASK-KERNEL: NAME, ARGS, LOCK,
QUEUE, MAILBOX, TIMEOUT and BODY are forwarded to it unchanged, and the
resulting form produces the 'kernel function'.

The kernel function is meant to be installed in worker threads. It accepts
the arguments described by the lambda list ARGS.

Within BODY the variable *task* is bound to the result of (DEQUEUE QUEUE),
and the return value of BODY is stored in *task-result*.

This interface is experimental and subject to change."
  (list* 'gen-task-kernel name args lock queue mailbox timeout body))
60 
(defun make-ephemeral-thread (name)
  "Build a thread object called NAME through SBCL's internal %MAKE-THREAD.

A fresh semaphore, also called NAME, is passed as the third argument.
NOTE(review): the literal T is presumably the 'ephemeral' flag of
SB-THREAD::%MAKE-THREAD -- confirm against the SBCL version in use."
  (let ((semaphore (make-semaphore :name name)))
    (sb-thread::%make-thread name t semaphore)))
63 
64 ;;; Proto
(defgeneric designate-oracle (host guest)
  (:documentation "Protocol function taking a HOST and a GUEST.
NOTE(review): presumably makes GUEST an oracle for HOST -- confirm against
the methods that implement it."))

(defgeneric assign-supervisor (worker supervisor)
  (:documentation "Protocol function taking a WORKER and a SUPERVISOR.
NOTE(review): presumably places WORKER under SUPERVISOR -- no method is
visible in this part of the file."))
67 
(defgeneric make-workers (count &rest initargs &key &allow-other-keys)
  (:documentation "Return a list of COUNT workers built from the given initargs.")
  (:method ((count number) &key thread kernel input)
    ;; Every worker receives the same THREAD, KERNEL and INPUT values;
    ;; the list ends up with the most recently created worker first.
    (let ((made '()))
      (dotimes (n count)
        (setf made
              (cons (make-worker :thread thread :kernel kernel :input input)
                    made)))
      made)))
73 
(defgeneric tasks (self)
  (:documentation "Return the tasks held by SELF."))

(defgeneric results (self)
  (:documentation "Return the results container of SELF."))

(defgeneric run-object (self)
  (:documentation "Run SELF."))

(defgeneric work (self &key &allow-other-keys)
  (:documentation "Protocol function for SELF accepting arbitrary keyword arguments.
NOTE(review): no method is visible in this part of the file."))

(defgeneric workers (self)
  (:documentation "Return the workers held by SELF."))
79 
80 ;;; Supervisor
(defclass supervisor ()
  (;; Each instance gets its own ephemeral thread with a generated
   ;; "supervisor..." name.
   (thread
    :accessor supervisor-thread
    :initform (make-ephemeral-thread (symbol-name (gensym "supervisor"))))
   ;; DOMAIN and SCOPE carry no initform, initarg or accessor.
   (domain)
   (scope))
  (:documentation "Supervisors are threads which are responsible for a set of worker threads
within their DOMAIN and SCOPE."))
87 
(defmethod initialize-instance :after ((self supervisor) &key &allow-other-keys)
  "Record the new supervisor's thread in *SUPERVISOR-THREADS*."
  (let ((thread (supervisor-thread self)))
    (push thread *supervisor-threads*)))
90 
91 ;;; Worker
92 ;; unix-getrusage
93 ;; 0,-1,-2
94 ;; (multiple-value-list (sb-unix:unix-getrusage 0))
95 ;; (setf sb-unix::*on-dangerous-wait* :error)
96 
97 ;; TODO 2024-10-03: with-cas-lock?
(defclass worker ()
  (;; Defaults to a fresh ephemeral thread with a generated "worker..." name.
   (thread
    :initarg :thread
    :accessor worker-thread
    :initform (make-ephemeral-thread (symbol-name (gensym "worker"))))
   ;; Function handed to the thread when the worker is started.
   (kernel
    :initarg :kernel
    :accessor worker-kernel
    :type function)
   ;; Note the initarg is :INPUT while the accessor is TASKS.
   (tasks
    :initarg :input
    :accessor tasks
    :initform nil))
  (:documentation "A thread paired with the kernel it runs and the tasks given to it."))
104 
(defmethod initialize-instance :after ((self worker) &key &allow-other-keys)
  "Record the new worker's thread in *WORKER-THREADS*."
  (let ((thread (worker-thread self)))
    (push thread *worker-threads*)))
107 
(defun make-worker (&key thread kernel input)
  "Create a WORKER instance.

Only the arguments that are non-NIL are passed on as initargs, so the class
defaults apply to everything that is omitted or NIL."
  (let ((initargs (append (and thread (list :thread thread))
                          (and kernel (list :kernel kernel))
                          (and input (list :input input)))))
    (apply #'make-instance 'worker initargs)))
113 
114 ;; TODO 2024-10-03: pause/resume
115 
(declaim (inline kill-worker join-worker start-worker run-worker))
(defun start-worker (worker)
  "Start WORKER's thread, giving it the worker's kernel and tasks.

Goes through SBCL's internal SB-THREAD::START-THREAD; the worker's tasks
are passed as its third argument."
  (let ((thread (worker-thread worker))
        (kernel (worker-kernel worker))
        (arguments (tasks worker)))
    (sb-thread::start-thread thread kernel arguments)))
119 
(defun run-worker (worker &key input wait)
  "Start WORKER, optionally replacing its tasks with INPUT first.

When WAIT is true the result of JOIN-WORKER is returned; otherwise WORKER
itself is returned right after it has been started."
  (when input
    (setf (tasks worker) input))
  (start-worker worker)
  (cond (wait (join-worker worker))
        (t worker)))
126 
(defmethod run-object ((self worker))
  "Run the worker SELF via RUN-WORKER, with no new input and without waiting."
  (run-worker self))
129 
(defun run-with-worker (worker object &key wait)
  "Run WORKER with OBJECT as its input; WAIT is forwarded to RUN-WORKER."
  (run-worker worker
              :input object
              :wait wait))
132 
(defun kill-worker (worker)
  "Kill WORKER's thread and drop that thread from *WORKER-THREADS*.

The thread is removed from the list even when KILL-THREAD exits
non-locally. Returns whatever KILL-THREAD returns."
  (declare (worker worker))
  (let ((thread (worker-thread worker)))
    (unwind-protect
         (kill-thread thread)
      (deletef *worker-threads* thread))))
138 
(defun join-worker (worker)
  "Join WORKER's thread and drop that thread from *WORKER-THREADS*.

The thread is removed from the list even when JOIN-THREAD exits
non-locally. Returns whatever JOIN-THREAD returns."
  (declare (worker worker))
  (let ((thread (worker-thread worker)))
    (unwind-protect
         (join-thread thread)
      (deletef *worker-threads* thread))))
144 
145 ;;; Oracle
(defstruct (oracle
            ;; Positional constructor: (%make-oracle ID THREAD).
            (:constructor %make-oracle (id thread)))
  "Oracles provide a tagged view into some threaded scope of work."
  ;; Numeric tag; MAKE-ORACLE fills it with the thread's OS thread id.
  (id 0
   :type (unsigned-byte 32)
   :read-only t)
  ;; The thread this oracle refers to.
  (thread *current-thread*
   :read-only t))
150 
(defun oracle-of-id (id)
  "Return the oracle in *ORACLES* whose id is numerically equal to ID, or NIL."
  (find id *oracles*
        :key #'oracle-id
        :test #'=))
153 
(defun make-oracle (thread)
  "Return the oracle for THREAD, registering a new one when none exists.

The oracle is keyed by THREAD's OS thread id. Returns two values: that id
and the oracle. A newly created oracle is pushed onto *ORACLES* and its
thread onto *ORACLE-THREADS*."
  (let* ((id (thread-os-tid thread))
         (existing (oracle-of-id id)))
    (cond (existing
           (values id existing))
          (t
           (let ((fresh (%make-oracle id thread)))
             (push fresh *oracles*)
             (push (oracle-thread fresh) *oracle-threads*)
             (values id fresh))))))
162 
163 ;;; Task Pool
(defstruct task-pool
  "A set of workers together with the task queue they consume, the
semaphore they wait on and the mailbox that collects their results."
  ;; Name of the kernel function installed in the pool's workers.
  (kernel 'identity :type symbol)
  ;; Queue of pending tasks; defaults to the global *TASKS* queue.
  (tasks *tasks*)
  (lock (make-semaphore :name "online") :type semaphore)
  ;; TODO: test weak-vector here
  ;; The vector must be adjustable: PUSH-WORKER and PUSH-WORKERS grow it
  ;; with VECTOR-PUSH-EXTEND, which is an error on a vector that is not
  ;; actually adjustable.
  (workers (make-array 0 :element-type 'worker :fill-pointer 0 :adjustable t)
   :type (vector worker))
  (results (make-mailbox :name "results")))
171 
(defmethod tasks ((self task-pool))
  "Return the task queue of the pool SELF."
  (task-pool-tasks self))

(defmethod results ((self task-pool))
  "Return the results mailbox of the pool SELF."
  (task-pool-results self))

(defmethod workers ((self task-pool))
  "Return the worker vector of the pool SELF."
  (task-pool-workers self))
175 
(defmethod print-object ((self task-pool) (stream t))
  "Print SELF unreadably: kernel name, worker count, queued task count over
the semaphore count, and the number of results in the mailbox."
  (print-unreadable-object (self stream :type t)
    (let* ((kernel (task-pool-kernel self))
           (worker-total (length (workers self)))
           (queued (queue-count (tasks self)))
           (online (semaphore-count (task-pool-lock self)))
           (collected (mailbox-count (task-pool-results self))))
      (format stream "~A :workers ~A :tasks ~A/~A :results ~A"
              kernel worker-total queued online collected))))
184 
(defun kill-workers (pool)
  "Pop every worker off POOL's worker vector and pass each to KILL-WORKER.

The number of workers is read once, before the first one is popped."
  (let ((total (length (workers pool))))
    (loop repeat total
          do (kill-worker (vector-pop (workers pool))))))
189 
(defun worker-count (task-pool &key online)
  "Count the workers of TASK-POOL.

With ONLINE true the count of the pool's semaphore is returned; otherwise
the length of the pool's worker vector."
  (cond (online (semaphore-count (task-pool-lock task-pool)))
        (t (length (task-pool-workers task-pool)))))
194 
(defmethod designate-oracle ((self task-pool) (guest thread))
  "Register an oracle for the thread GUEST, then re-dispatch on the result."
  ;; NOTE(review): MAKE-ORACLE returns (values id oracle), so only its
  ;; primary value -- the id -- is forwarded here. Confirm that a method
  ;; accepting the id (rather than the ORACLE structure) is what is intended.
  (let ((designated (make-oracle guest)))
    (designate-oracle self designated)))
197 
(declaim (inline push-worker push-workers pop-worker))
(defun push-worker (worker pool)
  "Append WORKER to POOL's worker vector, growing the vector when needed.
Returns the value of VECTOR-PUSH-EXTEND."
  (let ((registry (task-pool-workers pool)))
    (vector-push-extend worker registry)))
201 
(defun push-workers (threads pool)
  "Append every element of the list THREADS to POOL's worker vector.

Despite the parameter name, the elements are stored as-is in the pool's
WORKERS vector, exactly as PUSH-WORKER does for a single worker.
Returns NIL."
  ;; Go through PUSH-WORKER (and thus the TASK-POOL-WORKERS accessor)
  ;; instead of WITH-SLOTS: TASK-POOL is a structure, and SLOT-VALUE on
  ;; structure instances is not guaranteed by the standard.
  (dolist (w threads)
    (push-worker w pool)))
206 
(defmethod pop-worker (pool)
  "Remove and return the last worker in POOL's worker vector."
  (let ((registry (task-pool-workers pool)))
    (vector-pop registry)))
209 
(defun start-task-worker (pool index)
  "Start the worker stored at position INDEX of POOL's worker vector."
  ;; (with-recursive-lock
  (let ((chosen (aref (workers pool) index)))
    (start-worker chosen)))
213 
(defun start-task-workers (pool)
  "Start all workers in the given task POOL."
  ;; Workers are started in vector order; the result is always NIL.
  (map nil #'start-worker (workers pool)))
218 
219 ;;; Task
(defclass task ()
  (;; Outcome marker written by the task's owner; starts out as NIL.
   (state
    :initarg :state
    :accessor task-state
    :initform nil))
  (:documentation "This object represents a single unit of work to be done by some
worker. Tasks are typically distributed from the task-pool, but workers may
also be granted the ability to create and distribute their own tasks. Once a
task is assigned, the 'owner', i.e. the worker that is assigned this task, may
modify the object. When the work associated with a task is complete, the owner
is responsible for indicating in the state slot the result of the computation."))
228 
(defmethod print-object ((self task) stream)
  "Print SELF unreadably, showing its type and current state."
  (print-unreadable-object (self stream :type t)
    (let ((state (task-state self)))
      (format stream ":state ~A" state))))
232 
(defun run-task (worker task)
  "Run WORKER with TASK as its input, without waiting; returns WORKER."
  (run-worker worker
              :input task))
235 
236 ;;; Job
(defclass job (task)
  (;; Growable vector of the tasks that make up the job; empty by default.
   (tasks
    :initarg :tasks
    :accessor tasks
    :type (array task *)
    :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t))
   ;; Mutex named "job"; the slot has no accessor.
   (lock
    :initarg :lock
    :type mutex
    :initform (make-mutex :name "job")))
  (:documentation "A collection of tasks forming a single unit of work."))
245 
(declaim (inline make-job))
(defun make-job (&rest tasks)
  "Create a JOB whose task vector holds TASKS in the order given.

The vector is adjustable and has a fill pointer equal to the number of
TASKS, matching the empty vector a JOB gets by default, so further tasks
can be added with VECTOR-PUSH-EXTEND."
  (make-instance 'job
                 :tasks (make-array (length tasks)
                                    :element-type 'task
                                    :initial-contents tasks
                                    ;; Keep the vector growable, like the
                                    ;; class initform.
                                    :adjustable t
                                    :fill-pointer t)))
252 
(defmethod print-object ((self job) stream)
  "Print SELF unreadably, showing its type and how many tasks it holds."
  (print-unreadable-object (self stream :type t)
    (let ((task-total (length (tasks self))))
      (format stream "~A tasks" task-total))))
256 
(defun run-job (worker job)
  "Run WORKER with JOB as its input, without waiting; returns WORKER."
  (run-worker worker
              :input job))
259 
260 ;;; Work Scope
(defclass work-scope ()
  (;; Growable vector of tasks; empty by default.
   (tasks
    :initarg :tasks
    :accessor tasks
    :type (vector task)
    :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t))
   ;; Mutex named "work-scope".
   (lock
    :initarg :lock
    :accessor work-scope-lock
    :type mutex
    :initform (make-mutex :name "work-scope")))
  (:documentation "A vector of tasks paired with a mutex."))
267 
(defmethod print-object ((self work-scope) stream)
  "Print SELF unreadably, showing its type followed by its task vector."
  (print-unreadable-object (self stream :type t)
    (let ((contents (tasks self)))
      (format stream "~A" contents))))
271 
272 ;;; Macros
(defmacro with-task-pool ((sym &key oracle (tasks 0) lock (workers 4) start kernel results) &body body)
  "Evaluate BODY with SYM bound to a freshly built TASK-POOL.

Options:
  TASKS   - form giving the number of TASK instances to pre-load into the
            pool's queue (default 0). Each entry is a distinct instance.
  WORKERS - form giving the number of workers to create (default 4).
  LOCK    - form producing the pool's semaphore; defaults to a new
            semaphore named \"online\" whose count is WORKERS.
  RESULTS - form producing the results mailbox; defaults to a new mailbox
            named \"results\".
  KERNEL  - form producing the kernel name stored in the pool; when
            omitted, a kernel with an empty body is generated with
            GEN-TASK-KERNEL.
  ORACLE  - when given, (DESIGNATE-ORACLE SYM ORACLE) is called.
  START   - when non-NIL at macroexpansion time, all workers are started
            before BODY runs.

Returns the values of BODY."
  (let ((nworkers (gensym "WORKERS"))
        (ntasks (gensym "TASKS")))
    ;; The default semaphore and mailbox are created by the expansion at
    ;; runtime. Building them while expanding would embed one literal object
    ;; shared by every execution of the form, and would hand the unevaluated
    ;; WORKERS form to MAKE-SEMAPHORE.
    `(let* ((,nworkers ,workers)        ; count forms are evaluated once
            (,ntasks ,tasks)
            (,sym (make-task-pool
                   :lock ,(or lock
                              `(make-semaphore :name "online" :count ,nworkers))
                   :results ,(or results
                                 `(make-mailbox :name "results"))
                   :tasks (make-queue
                           :name "tasks"
                           :initial-contents
                           (make-array ,ntasks
                                       :element-type 'task
                                       ;; One fresh TASK per slot, so that
                                       ;; the state of one queued task is
                                       ;; never shared with another.
                                       :initial-contents
                                       (loop repeat ,ntasks
                                             collect (make-instance 'task)))))))
       ,@(if kernel
             `((setf (task-pool-kernel ,sym) ,kernel))
             `((setf (task-pool-kernel ,sym)
                     (gen-task-kernel (gensym "TASK-KERNEL") ()
                         (task-pool-lock ,sym)
                         (tasks ,sym)
                         (results ,sym)
                         nil))))
       (loop repeat ,nworkers
             do (push-worker (make-worker :kernel (task-pool-kernel ,sym)) ,sym))
       ,@(when oracle `((designate-oracle ,sym ,oracle)))
       ,@(when start `((start-task-workers ,sym)))
       ,@body)))