changelog shortlog graph tags branches changeset files revisions annotate raw help

Mercurial > core / lisp/std/task.lisp

changeset 694: a36280d2ef4e
parent: f51b73f49946
child: 38e9c3be2392
author: Richard Westhaver <ellis@rwest.io>
date: Thu, 03 Oct 2024 21:54:07 -0400
permissions: -rw-r--r--
description: tasks
1 ;;; task.lisp --- Standard Task API
2 
3 ;;
4 
5 ;;; Code:
6 (in-package :std/task)
7 
8 ;;; Vars
;; Special variables.  *TASK-POOL*, *JOBS* and *STAGES* are declared but left
;; unbound here.
(defvar *task-pool*)
;; Queue created once at load time; it is also the default value of the TASKS
;; slot of the TASK-POOL structure.
(defvar *tasks* (make-queue :name "tasks"))
(defvar *jobs*)
(defvar *stages*)
;; Global registries declared with SB-EXT:DEFGLOBAL, all initially NIL.
;; WORKER and SUPERVISOR instances push their thread onto the matching list
;; when they are initialized; MAKE-ORACLE pushes onto *ORACLES* and
;; *ORACLE-THREADS*.
(sb-ext:defglobal *worker-threads* nil)
(sb-ext:defglobal *oracles* nil)
(sb-ext:defglobal *oracle-threads* nil)
(sb-ext:defglobal *supervisor-threads* nil)
17 
;; *TASK* and *TASK-RESULT* are referenced by the kernels produced by
;; GEN-TASK-KERNEL, and those kernels are compiled at run time via COMPILE.
;; The variables therefore have to be proclaimed special when this file is
;; loaded, not only while it is being compiled.  A plain top-level DEFVAR is
;; processed at compile time (special proclamation) as well as at load time.
;; *TASK* is left unbound; *TASK-RESULT* starts out as NIL.
(defvar *task*)
(defvar *task-result* nil)
21 
;; Condition signalled by the TASK-ERROR function.  It adds no slots of its own
;; to THREAD-ERROR; the report reads the thread with THREAD-ERROR-THREAD.
(define-condition task-error (thread-error)
  ()
  (:report
   (lambda (c out)
     (format out "Unhandled task error in thread ~A" (thread-error-thread c)))))
26 
(defun task-error (thread)
  "Signal an error of type TASK-ERROR whose :THREAD initarg is THREAD."
  (error (make-condition 'task-error :thread thread)))
29 
30 ;;; Kernel
(defmacro gen-task-kernel (name args lock queue mailbox timeout &body body)
  "Expand into a (COMPILE NAME (LAMBDA ARGS ...)) form defining a kernel function.

NAME is evaluated and passed to COMPILE.  LOCK, QUEUE and MAILBOX are forms
placed inside the lambda, so they are evaluated each time the kernel runs.
TIMEOUT, when non-NIL at expansion time, is passed to WAIT-ON-SEMAPHORE as its
:TIMEOUT argument; when NIL no timeout argument is emitted.

The generated kernel:
  1. waits on LOCK;
  2. binds *TASK* to (DEQUEUE QUEUE);
  3. evaluates BODY and stores its value in *TASK-RESULT*; any ERROR raised by
     BODY is replaced by a TASK-ERROR for *CURRENT-THREAD*;
  4. on the way out, whether or not BODY succeeded, sends *TASK-RESULT* to
     MAILBOX and calls RELEASE-FOREGROUND."
  `(compile ,name
            (lambda ,args
              ;; Splice the keyword and its value as two separate arguments.
              (wait-on-semaphore ,lock ,@(when timeout `(:timeout ,timeout)))
              (let ((*task* (dequeue ,queue)))
                (unwind-protect
                     (handler-case (setf *task-result* (progn ,@body))
                       (error () (task-error *current-thread*)))
                  ;; Cleanup forms: always publish the result and release.
                  (send-message ,mailbox *task-result*)
                  (release-foreground))))))
41 
(defmacro define-task-kernel (name (&key lock timeout mailbox queue) args &body body)
  "Define a task kernel.

  (define-task-kernel NAME (&key LOCK TIMEOUT MAILBOX QUEUE) ARGS &body BODY)

Expands into GEN-TASK-KERNEL with the same pieces.  NAME is passed through
unquoted, so it is evaluated in the expansion and must yield the name handed
to COMPILE.

The kernel should process all options and return a function - the
'kernel function'.

The kernel function is installed in worker threads by passing it to
SB-THREAD:MAKE-THREAD. It may accept a varying number of arguments
specified by ARGS.

Within the BODY the variable *task* is bound to the result of (DEQUEUE QUEUE)
and, once BODY returns, *task-result* is set to the return value of BODY.

This interface is experimental and subject to change."
  `(gen-task-kernel ,name ,args ,lock ,queue ,mailbox ,timeout
     ,@body))
60 
(defun make-ephemeral-thread (name)
  "Return the result of calling the internal SB-THREAD::%MAKE-THREAD with NAME,
T and a fresh semaphore that is also named NAME."
  (sb-thread::%make-thread name
                           t
                           (make-semaphore :name name)))
63 
64 ;;; Proto
;; Protocol generic functions; only the lambda lists are fixed here.
;; A DESIGNATE-ORACLE method on (TASK-POOL THREAD) appears further down; it
;; converts the thread with MAKE-ORACLE and calls DESIGNATE-ORACLE again.
(defgeneric designate-oracle (host guest))
;; No ASSIGN-SUPERVISOR method is visible in this part of the file.
(defgeneric assign-supervisor (worker supervisor))
67 
;; Build COUNT workers.  Each one is created by MAKE-WORKER from the same
;; THREAD, KERNEL and INPUT arguments; the list is accumulated with CONS, so
;; the most recently created worker comes first.
(defgeneric make-workers (count &rest initargs &key &allow-other-keys)
  (:method ((count number) &key thread kernel input)
    (let ((acc '()))
      (dotimes (n count)
        (declare (ignorable n))
        (setf acc (cons (make-worker :thread thread :kernel kernel :input input)
                        acc)))
      acc)))
73 
;; TASKS: slot accessor on WORKER, JOB and WORK-SCOPE, plus a method on
;; TASK-POOL returning its TASKS slot.
(defgeneric tasks (self))
;; RESULTS: implemented below for TASK-POOL (its RESULTS slot).
(defgeneric results (self))
;; RUN: implemented below for WORKER; passes OBJECT as the worker's input and
;; forwards the :WAIT keyword to RUN-WORKER.
(defgeneric run (self object &rest initargs &key &allow-other-keys))
;; RUN-OBJECT: implemented below for WORKER via RUN-WORKER.
(defgeneric run-object (self))
;; WORK: no method is visible in this part of the file.
(defgeneric work (self &key &allow-other-keys))
;; WORKERS: implemented below for TASK-POOL (its WORKERS slot).
(defgeneric workers (self))
80 
81 ;;; Supervisor
;; The THREAD slot defaults to an ephemeral thread named after a fresh
;; "supervisor" gensym; there is no initarg for it.  DOMAIN and SCOPE are
;; declared without any slot options.
(defclass supervisor ()
  ((thread
    :accessor supervisor-thread
    :initform (make-ephemeral-thread (symbol-name (gensym "supervisor"))))
   (domain)
   (scope))
  (:documentation "Supervisors are threads which are responsible for a set of worker threads
within their DOMAIN and SCOPE."))
88 
(defmethod initialize-instance :after ((self supervisor) &key &allow-other-keys)
  ;; Record the new supervisor's thread at the front of *SUPERVISOR-THREADS*.
  (let ((thread (supervisor-thread self)))
    (setf *supervisor-threads* (cons thread *supervisor-threads*))))
91 
92 ;;; Worker
93 ;; unix-getrusage
94 ;; 0,-1,-2
95 ;; (multiple-value-list (sb-unix:unix-getrusage 0))
96 ;; (setf sb-unix::*on-dangerous-wait* :error)
97 
98 ;; TODO 2024-10-03: with-cas-lock?
(defclass worker ()
  (;; Defaults to an ephemeral thread named after a fresh "worker" gensym.
   (thread :initform (make-ephemeral-thread (symbol-name (gensym "worker")))
           :accessor worker-thread
           :initarg :thread)
   ;; The kernel is a function designator: TASK-POOL stores its kernel as a
   ;; SYMBOL and WITH-TASK-POOL passes that symbol as :KERNEL, so symbols are
   ;; accepted here as well as function objects.
   (kernel :type (or symbol function) :accessor worker-kernel :initarg :kernel)
   ;; Supplied through the :INPUT initarg; passed as the argument list when
   ;; the worker is started by START-WORKER.
   (tasks :initform nil :accessor tasks :initarg :input))
  (:documentation "A thread paired with the kernel it runs and the input it is started with."))
105 
(defmethod initialize-instance :after ((self worker) &key &allow-other-keys)
  ;; Record the new worker's thread at the front of *WORKER-THREADS*.
  (let ((thread (worker-thread self)))
    (setf *worker-threads* (cons thread *worker-threads*))))
108 
(defun make-worker (&key thread kernel input)
  "Create a WORKER instance.
Only non-NIL arguments are forwarded as initargs, so a NIL THREAD leaves the
slot to its initform and a NIL KERNEL leaves the KERNEL slot unbound."
  (let ((initargs (append (and thread (list :thread thread))
                          (and kernel (list :kernel kernel))
                          (and input (list :input input)))))
    (apply #'make-instance 'worker initargs)))
114 
115 ;; TODO 2024-10-03: pause/resume
116 
(declaim (inline kill-worker join-worker start-worker run-worker))
(defun start-worker (worker)
  "Call the internal SB-THREAD::START-THREAD with WORKER's thread, kernel and
tasks, in that order, and return its result."
  ;; NOTE(review): the third argument looks like the kernel's argument list;
  ;; RUN-TASK and RUN-JOB store a single object in the TASKS slot -- confirm.
  (let ((thread (worker-thread worker))
        (kernel (worker-kernel worker))
        (input (tasks worker)))
    (sb-thread::start-thread thread kernel input)))
120 
(defun run-worker (worker &key input wait)
  "Start WORKER, first replacing its tasks with INPUT when INPUT is non-NIL.
With WAIT true, return the result of JOIN-WORKER; otherwise return WORKER."
  (unless (null input)
    (setf (tasks worker) input))
  (start-worker worker)
  (cond (wait (join-worker worker))
        (t worker)))
127 
(defmethod run-object ((self worker))
  "Run SELF with RUN-WORKER, passing no input and not waiting."
  (run-worker self))
130 
(defmethod run ((self worker) (object t) &key wait &allow-other-keys)
  "Run worker SELF with OBJECT as its input; WAIT is forwarded to RUN-WORKER."
  (run-worker self
              :input object
              :wait wait))
133 
(defun kill-worker (worker)
  "Call KILL-THREAD on WORKER's thread and return its value.
The thread is removed from *WORKER-THREADS* afterwards, even on a non-local exit."
  (declare (worker worker))
  (let ((target (worker-thread worker)))
    (unwind-protect
         (kill-thread target)
      (deletef *worker-threads* target))))
139 
(defun join-worker (worker)
  "Call JOIN-THREAD on WORKER's thread and return its value.
The thread is removed from *WORKER-THREADS* afterwards, even on a non-local exit."
  (declare (worker worker))
  (let ((target (worker-thread worker)))
    (unwind-protect
         (join-thread target)
      (deletef *worker-threads* target))))
145 
146 ;;; Oracle
;; %MAKE-ORACLE is a positional constructor taking ID and then THREAD.
;; Both slots are read-only.
(defstruct (oracle (:constructor %make-oracle (id thread)))
  "Oracles provide a tagged view into some threaded scope of work."
  ;; MAKE-ORACLE fills this slot with the value of THREAD-OS-TID for the thread.
  (id 0 :type (unsigned-byte 32) :read-only t)
  (thread *current-thread* :read-only t))
151 
(defun oracle-of-id (id)
  "Return the first oracle in *ORACLES* whose ORACLE-ID is = to ID, or NIL."
  (find-if (lambda (orc) (= id (oracle-id orc)))
           *oracles*))
154 
(defun make-oracle (thread)
  "Return two values: the id for THREAD (its THREAD-OS-TID) and its oracle.
An oracle already registered under that id is reused; otherwise a new one is
created, pushed onto *ORACLES*, and its thread pushed onto *ORACLE-THREADS*."
  (let* ((id (thread-os-tid thread))
         (existing (oracle-of-id id)))
    (cond (existing
           (values id existing))
          (t
           (let ((fresh (%make-oracle id thread)))
             (push fresh *oracles*)
             (push (oracle-thread fresh) *oracle-threads*)
             (values id fresh))))))
163 
164 ;;; Task Pool
;; A pool groups a kernel name, a task queue, a semaphore, a vector of workers
;; and a results mailbox.
(defstruct task-pool
  ;; Name of the kernel function; WITH-TASK-POOL stores the symbol returned by
  ;; COMPILE here and hands it to each worker.
  (kernel 'identity :type symbol)
  ;; Defaults to the global *TASKS* queue.
  (tasks *tasks*)
  (lock (make-semaphore :name "online") :type semaphore)
  ;; TODO: test weak-vector here
  ;; Grown with VECTOR-PUSH-EXTEND (see PUSH-WORKER), which requires an
  ;; actually adjustable vector, hence :ADJUSTABLE T.
  (workers (make-array 0 :element-type 'worker :fill-pointer 0 :adjustable t)
   :type (vector worker))
  (results (make-mailbox :name "results")))
172 
;; Generic-function views onto the TASK-POOL structure's slots.
(defmethod tasks ((pool task-pool))
  (task-pool-tasks pool))

(defmethod results ((pool task-pool))
  (task-pool-results pool))

(defmethod workers ((pool task-pool))
  (task-pool-workers pool))
176 
(defmethod print-object ((self task-pool) (stream t))
  ;; Prints the kernel name, the number of workers, the task queue count over
  ;; the semaphore count, and the number of messages in the results mailbox.
  (print-unreadable-object (self stream :type t)
    (let* ((kernel (task-pool-kernel self))
           (nworkers (length (workers self)))
           (queued (queue-count (tasks self)))
           (online (semaphore-count (task-pool-lock self)))
           (nresults (mailbox-count (task-pool-results self))))
      (format stream "~A :workers ~A :tasks ~A/~A :results ~A"
              kernel nworkers queued online nresults))))
185 
(defun kill-workers (pool)
  "Pop every worker off POOL's worker vector and call KILL-WORKER on each.
Workers are taken from the end of the vector first.  The number of iterations
is the vector's length when the function is entered.  Returns NIL."
  (loop repeat (length (workers pool))
        do (kill-worker (vector-pop (workers pool)))))
190 
(defun worker-count (task-pool &key online)
  "Return the length of TASK-POOL's worker vector, or, when ONLINE is true,
the current count of the pool's semaphore."
  (cond (online (semaphore-count (task-pool-lock task-pool)))
        (t (length (task-pool-workers task-pool)))))
195 
(defmethod designate-oracle ((self task-pool) (guest thread))
  ;; MAKE-ORACLE returns (values id oracle).  Only its primary value, the id,
  ;; is forwarded to the next DESIGNATE-ORACLE call.
  ;; NOTE(review): confirm the id, rather than the oracle struct, is what the
  ;; receiving method expects.
  (let ((designator (make-oracle guest)))
    (designate-oracle self designator)))
198 
(declaim (inline push-worker push-workers pop-worker))
(defun push-worker (worker pool)
  "Append WORKER to POOL's worker vector with VECTOR-PUSH-EXTEND and return
that call's value."
  (let ((vec (task-pool-workers pool)))
    (vector-push-extend worker vec)))
202 
(defun push-workers (threads pool)
  "Append each element of the list THREADS, in order, to POOL's worker vector.
Returns NIL."
  (with-slots ((vec workers)) pool
    (dolist (item threads)
      (vector-push-extend item vec))))
207 
;; NOTE(review): this is a method although POP-WORKER is declaimed INLINE next
;; to the ordinary functions PUSH-WORKER and PUSH-WORKERS -- confirm intent.
(defmethod pop-worker (pool)
  "Remove and return the last worker in POOL's worker vector."
  (let ((vec (task-pool-workers pool)))
    (vector-pop vec)))
210 
(defun start-task-worker (pool index)
  "Start the worker stored at position INDEX of POOL's worker vector."
  ;; (with-recursive-lock
  (let ((target (aref (workers pool) index)))
    (start-worker target)))
214 
(defun start-task-workers (pool)
  "Start all workers in the given task POOL."
  ;; Walk the worker vector front to back; the value is NIL.
  (let ((vec (workers pool)))
    (dotimes (idx (length vec))
      (start-worker (aref vec idx)))))
219 
220 ;;; Task
;; A task carries one slot, STATE, which defaults to NIL and can be supplied
;; with the :STATE initarg.
(defclass task ()
  ((state
    :accessor task-state
    :initarg :state
    :initform nil))
  (:documentation "This object represents a single unit of work to be done by some
worker. Tasks are typically distributed from the task-pool, but workers may
also be granted the ability to create and distribute their own tasks. Once a
task is assigned, the 'owner', i.e. the worker that is assigned this task, may
modify the object. When the work associated with a task is complete, the owner
is responsible for indicating in the state slot the result of the computation."))
229 
(defmethod print-object ((self task) stream)
  ;; Unreadable representation showing the task's type and current state.
  (print-unreadable-object (self stream :type t)
    (let ((state (task-state self)))
      (format stream ":state ~A" state))))
233 
(defun run-task (worker task)
  "Run WORKER via RUN-WORKER with TASK as its input; no :WAIT is passed."
  (run-worker worker :input task))
236 
237 ;;; Job
;; JOB extends TASK with a vector of tasks (by default empty, adjustable, with
;; a fill pointer) and a mutex.  The LOCK slot has an initarg but no accessor.
(defclass job (task)
  ((tasks
    :accessor tasks
    :initarg :tasks
    :type (array task *)
    :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t))
   (lock
    :initarg :lock
    :type mutex
    :initform (make-mutex :name "job")))
  (:documentation "A collection of tasks forming a single unit of work."))
246 
(declaim (inline make-job))
(defun make-job (&rest tasks)
  "Return a JOB whose task vector holds TASKS in the order given.
The vector is adjustable and has a fill pointer at its end, matching the
vector built by the class's default initform."
  (make-instance 'job
                 :tasks (make-array (length tasks)
                                    :element-type 'task
                                    :initial-contents tasks
                                    ;; T places the fill pointer at the end.
                                    :fill-pointer t
                                    :adjustable t)))
253 
(defmethod print-object ((self job) stream)
  ;; Unreadable representation showing how many tasks the job holds.
  (print-unreadable-object (self stream :type t)
    (let ((ntasks (length (tasks self))))
      (format stream "~A tasks" ntasks))))
257 
(defun run-job (worker job)
  "Run WORKER via RUN-WORKER with JOB as its input; no :WAIT is passed."
  (run-worker worker :input job))
260 
261 ;;; Work Scope
;; A WORK-SCOPE pairs a vector of tasks (by default empty, adjustable, with a
;; fill pointer) with a mutex readable through WORK-SCOPE-LOCK.
(defclass work-scope ()
  ((tasks
    :accessor tasks
    :initarg :tasks
    :type (vector task)
    :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t))
   (lock
    :accessor work-scope-lock
    :initarg :lock
    :type mutex
    :initform (make-mutex :name "work-scope"))))
268 
(defmethod print-object ((self work-scope) stream)
  ;; Unreadable representation showing the scope's task vector.
  (print-unreadable-object (self stream :type t)
    (let ((contents (tasks self)))
      (format stream "~A" contents))))
272 
273 ;;; Macros
(defmacro with-task-pool ((sym &key oracle (tasks 0) lock (workers 4) start kernel results) &body body)
  "Evaluate BODY with SYM bound to a freshly made TASK-POOL.

Options (all are forms evaluated at run time):
  TASKS   - number of TASK instances to pre-load into the pool's queue (default 0).
  WORKERS - number of workers to create (default 4); evaluated once, first.
  LOCK    - semaphore for the pool; defaults to a new semaphore named \"online\"
            whose count is WORKERS.
  RESULTS - mailbox for results; defaults to a new mailbox named \"results\".
  KERNEL  - kernel name to store in the pool; when omitted a kernel is generated
            with GEN-TASK-KERNEL from the pool's lock, tasks and results.
  ORACLE  - when given, (DESIGNATE-ORACLE SYM ORACLE) is called.
  START   - when non-NIL at expansion time, START-TASK-WORKERS is called before BODY.

The default lock and mailbox are created by the expansion each time it runs,
and every pre-loaded queue element is a distinct TASK instance."
  (let ((nworkers (gensym "WORKERS")))
    `(let* ((,nworkers ,workers)
            (,sym (make-task-pool
                   ;; Defaults are emitted as forms so each execution gets its
                   ;; own semaphore and mailbox.
                   :lock ,(or lock `(make-semaphore :name "online" :count ,nworkers))
                   :results ,(or results `(make-mailbox :name "results"))
                   :tasks (make-queue
                           :name "tasks"
                           :initial-contents
                           ;; MAP-INTO calls the thunk once per element, so
                           ;; no two slots share a task object.
                           (map-into (make-array ,tasks :element-type 'task)
                                     (lambda () (make-instance 'task)))))))
       ,@(if kernel
             `((setf (task-pool-kernel ,sym) ,kernel))
             `((setf (task-pool-kernel ,sym)
                     (gen-task-kernel (gensym "TASK-KERNEL") ()
                         (task-pool-lock ,sym)
                         (tasks ,sym)
                         (results ,sym)
                         nil))))
       (loop repeat ,nworkers
             do (push-worker (make-worker :kernel (task-pool-kernel ,sym)) ,sym))
       ,@(when oracle `((designate-oracle ,sym ,oracle)))
       ,@(when start `((start-task-workers ,sym)))
       ,@body)))