changelog shortlog graph tags branches changeset files revisions annotate raw help

Mercurial > core / lisp/std/task.lisp

changeset 691: 295ea43ceb2d
parent: ec1d4d544c36
child: f51b73f49946
author: Richard Westhaver <ellis@rwest.io>
date: Wed, 02 Oct 2024 23:39:07 -0400
permissions: -rw-r--r--
description: tasks
1 ;;; task.lisp --- Standard Task API
2 
3 ;;
4 
5 ;;; Code:
6 (in-package :std/task)
7 
8 ;;; Vars
;; Global special variables of the task subsystem. Every variable except
;; *ORACLES* is declared without an initial value and is therefore unbound
;; until something binds or sets it.
(defvar *task-pool*)
(defvar *tasks*)
(defvar *workers*)
(defvar *jobs*)
(defvar *stages*)
;; List of ORACLE structures; MAKE-ORACLE pushes onto it and FIND-ORACLE
;; searches it.
(defvar *oracles* nil)
15 
16 ;;; Kernel
(defmacro parse-kernel-ops (op)
  "Expand OP, a list of the form (NAME ARGS &BODY BODY), into a
NAMED-LAMBDA form with that name, lambda list and body."
  (destructuring-bind (fn-name lambda-list &rest forms) op
    (list* 'std/macs:named-lambda fn-name lambda-list forms)))
21 
(defmacro define-task-kernel (name (&key lock timeout mailbox queue) args &body body)
  "Define a task kernel.

  (define-task-kernel NAME (&key LOCK TIMEOUT MAILBOX QUEUE) ARGS &body BODY)

Expands into a GEN-TASK-KERNEL form, handing NAME, ARGS, LOCK, QUEUE,
MAILBOX, TIMEOUT and BODY over unchanged.

The kernel should process all options and return a function - the
'kernel function'.

The kernel function is installed in worker threads by passing it to
SB-THREAD:MAKE-THREAD. It may accept a varying number of arguments
specified by ARGS.

The design notes for this macro also mention ACCESSORS, a list of
pandoric accessors which can be called on the kernel via an ORACLE;
the current lambda list does not accept such an option.

This interface is experimental and subject to change."
  (list* 'gen-task-kernel name args lock queue mailbox timeout body))
40 
;; Specials available to kernel bodies. GEN-TASK-KERNEL binds *TASK-QUEUE*
;; and *TASK-ITEM* around the body it is given; *TASK-RESULT* and
;; *TASK-ERROR* are only declared here and start out unbound.
(defvar *task-queue*)
(defvar *task-item*)
(defvar *task-result*)
(defvar *task-error*)
45 
(defmacro gen-task-kernel (name args lock queue mailbox timeout &body body)
  "Expand into a form which installs a kernel function via COMPILE.

NAME is evaluated and passed to COMPILE as the function name. ARGS is the
lambda list of the kernel function. LOCK, QUEUE and MAILBOX are forms
evaluated inside the kernel function each time it runs.

The generated function:
  1. waits on the semaphore LOCK, passing :TIMEOUT TIMEOUT only when
     TIMEOUT is non-NIL at macroexpansion time;
  2. binds *TASK-QUEUE* to the value of QUEUE and *TASK-ITEM* to one item
     dequeued from it;
  3. runs BODY;
  4. sends T to MAILBOX."
  `(compile ,name
            (lambda ,args
              ;; The keyword and its value are spliced in as two separate
              ;; arguments; wrapping them in a list would produce a call
              ;; to a function named :TIMEOUT.
              (wait-on-semaphore ,lock ,@(when timeout `(:timeout ,timeout)))
              ;; LET* binds sequentially, so the dequeue reads the queue
              ;; that was just bound and the QUEUE form is evaluated once.
              (let* ((*task-queue* ,queue)
                     (*task-item* (dequeue *task-queue*)))
                ,@body
                (send-message ,mailbox t)))))
54 
55 ;;; Supervisor
;; SUPERVISOR declares a single slot, SCOPE, without initarg, initform or
;; accessor, so the slot is reachable only through SLOT-VALUE.
(defclass supervisor ()
  (scope)
  (:documentation "A class which provides a view of the work done within a specified
SCOPE.

This object should be used by operators to inspect 'runstreams'
performed in other threads, such as WORKERS in TASK-POOL.

Before using this object you should ensure the SCOPE is fully
initialized. Supervisors should be created at any point during the
lifetime of SCOPE, but never before and never after."))
67 
68 ;;; Worker
69 ;; unix-getrusage
70 ;; 0,-1,-2
71 ;; (multiple-value-list (sb-unix:unix-getrusage 0))
72 ;; (setf sb-unix::*on-dangerous-wait* :error)
;; Thread name passed as :NAME by the TASK-POOL methods of MAKE-WORKER-FOR
;; and MAKE-WORKERS-FOR.
(defvar *default-worker-name* "worker")
74 
;; A WORKER bundles a thread object with the function and argument list
;; stored in its FUNCTION and ARGUMENTS slots.
;;
;; NOTE(review): the THREAD initform calls the internal
;; SB-THREAD::%MAKE-THREAD. The #. forms are evaluated by the reader, so
;; the "w"-prefixed names are generated once, when this form is read, and
;; not once per instance - confirm that this is intended.
(defclass worker ()
  ((thread :initform (sb-thread::%make-thread #.#1=(symbol-name (gensym "w")) t (make-semaphore :name #.#1#))
           :accessor worker-thread
           :initarg :thread)
   (function :type function :accessor worker-function :initarg :function)
   (arguments :type list :accessor worker-arguments :initarg :arguments)))
81 
82 ;;; Oracle
;; An ORACLE pairs a numeric ID with a thread. %MAKE-ORACLE is a BOA
;; constructor taking (ID THREAD); both slots are read-only.
(defstruct (oracle (:constructor %make-oracle (id thread)))
  (id 0 :type (unsigned-byte 32) :read-only t)
  (thread *current-thread* :read-only t))
86 
(defun find-oracle (id)
  "Return the first entry of *ORACLES* whose ORACLE-ID is = to ID, or NIL
when there is no such entry."
  (find-if (lambda (candidate)
             (= id (oracle-id candidate)))
           *oracles*))
89 
(defun make-oracle (thread)
  "Return two values: the OS thread id of THREAD and the ORACLE registered
under that id. A new oracle is created and pushed onto *ORACLES* only when
FIND-ORACLE reports none for the id."
  (let* ((tid (thread-os-tid thread))
         (known (find-oracle tid)))
    (cond (known
           (values tid known))
          (t
           (let ((fresh (%make-oracle tid thread)))
             (push fresh *oracles*)
             (values tid fresh))))))
97 
98 ;;; Proto
99 ;; oracle
;; Associate the oracle designated by GUEST with HOST. The TASK-POOL
;; methods in this file accept an integer id or a thread as GUEST.
(defgeneric designate-oracle (host guest))
101 ;; worker
(defgeneric make-worker (self &rest initargs &key &allow-other-keys)
  (:documentation "Create a WORKER. The default method forwards only those of
THREAD, FUNCTION and ARGUMENTS which are non-NIL as initargs.")
  (:method ((self t) &key thread function arguments)
    (declare (ignore self))
    (apply #'make-instance 'worker
           (append (and thread (list :thread thread))
                   (and function (list :function function))
                   (and arguments (list :arguments arguments))))))
(defgeneric make-workers (self count &rest initargs &key &allow-other-keys)
  (:documentation "Return a list of COUNT workers, each created by MAKE-WORKER with
the same THREAD, FUNCTION and ARGUMENTS.")
  (:method ((self t) (count t) &key thread function arguments)
    (flet ((fresh-worker ()
             (make-worker t :thread thread :function function :arguments arguments)))
      (let ((made '()))
        (dotimes (n count made)
          (push (fresh-worker) made))))))
;; Worker removal protocol. No DELETE-WORKER method is defined in this
;; part of the file; POP-WORKER gets an unspecialized method further down.
(defgeneric delete-worker (worker pool &key &allow-other-keys))
(defgeneric pop-worker (pool))
(defgeneric make-worker-for (pool function &rest args)
  (:documentation "Create one worker for POOL which runs FUNCTION with ARGS. With a
NIL pool the result is a WORKER instance made by MAKE-WORKER.")
  (:method ((pool null) (function function) &rest args)
    (declare (ignore pool))
    (make-worker t :arguments args :function function)))
(defgeneric make-workers-for (pool count function)
  (:documentation "Create COUNT workers for POOL which run FUNCTION. With a NIL pool
the work is delegated to MAKE-WORKERS.")
  (:method ((pool null) (count fixnum) (function function))
    (declare (ignore pool))
    (let ((self t))
      (make-workers self count :function function))))
(defgeneric spawn-worker (pool)
  (:documentation "Create one worker for POOL. With a NIL pool the worker is made by
MAKE-WORKER around the function returned by DEFAULT-TASK-KERNEL.")
  (:method ((pool null))
    (declare (ignore pool))
    (let ((kernel (default-task-kernel)))
      (make-worker t :function kernel))))
(defgeneric spawn-workers (pool count)
  (:documentation "Create COUNT workers for POOL. With a NIL pool the workers are
made by MAKE-WORKERS around the function returned by DEFAULT-TASK-KERNEL.")
  (:method ((pool null) (count fixnum))
    (declare (ignore pool))
    (let ((kernel (default-task-kernel)))
      (make-workers t count :function kernel))))
132 
133 ;; job
;; Job protocol. MAKE-JOB has methods for TASK, VECTOR and NULL further
;; down; FIND-JOB, RUN-JOB and RUN-JOBS have no methods in this part of
;; the file.
(defgeneric make-job (self &key &allow-other-keys))
(defgeneric find-job (job pool &key &allow-other-keys))
(defgeneric run-job (self job))
(defgeneric run-jobs (self))
;; task
;; TASKS is implemented for TASK-POOL and JOB, RESULTS for TASK-POOL, and
;; RUN-TASK has an empty method on (THREAD TASK).
(defgeneric tasks (self))
(defgeneric make-task (&rest args &key &allow-other-keys))
(defgeneric run-task (self task))
(defgeneric run-tasks (self))
(defgeneric results (self))
;; stage
;; RUN-STAGE has an empty method on (THREAD STAGE); WORKERS is implemented
;; for TASK-POOL.
(defgeneric run-stage (self stage))
(defgeneric workers (self))
147 
148 ;;; Task Pool
(defstruct task-pool
  "A pool of worker threads together with the queue they take tasks from,
the semaphore they wait on and the mailbox results are sent to."
  ;; Name of the kernel function handed to newly spawned workers.
  (kernel 'identity :type symbol)
  (tasks (make-queue :name "tasks"))
  (lock (make-semaphore :name "online") :type semaphore)
  ;; TODO: test weak-vector here
  ;; The vector holds threads and grows through VECTOR-PUSH-EXTEND, so it
  ;; is created adjustable and with an element type matching the slot type.
  (workers (make-array 0 :element-type 'thread :fill-pointer 0 :adjustable t)
   :type (vector thread))
  (results (make-mailbox :name "results"))
  ;; Id of the designated oracle as returned by MAKE-ORACLE, or NIL while
  ;; no oracle has been designated. Written by DESIGNATE-ORACLE.
  (oracle-id nil :type (or null (unsigned-byte 32))))

(defmethod tasks ((self task-pool)) (task-pool-tasks self))
(defmethod results ((self task-pool)) (task-pool-results self))
(defmethod workers ((self task-pool)) (task-pool-workers self))

;; Printed as: KERNEL WORKERS:SEMAPHORE-COUNT:QUEUED-TASKS MAILBOX-COUNT
(defmethod print-object ((self task-pool) (stream t))
  (print-unreadable-object (self stream :type t)
    (format stream "~A ~A:~A:~A ~A"
            (task-pool-kernel self)
            (length (workers self))
            (semaphore-count (task-pool-lock self))
            (queue-count (tasks self))
            (mailbox-count (task-pool-results self)))))

;; GUEST is a thread id. MAKE-ORACLE returns the oracle id as its primary
;; value, and that id is what gets stored in the pool. Returns SELF.
(defmethod designate-oracle ((self task-pool) (guest integer))
  (setf (task-pool-oracle-id self) (make-oracle (find-thread-by-id guest)))
  self)

(defun worker-count (task-pool &key online)
  "Return the number of workers in TASK-POOL. When ONLINE is true, return
the current count of the pool's LOCK semaphore (named \"online\") instead
of the length of the worker vector."
  (if online
      (semaphore-count (task-pool-lock task-pool))
      (length (task-pool-workers task-pool))))

;; GUEST is a thread: register it through MAKE-ORACLE and continue with
;; the integer method using the returned id.
(defmethod designate-oracle ((self task-pool) (guest thread))
  (designate-oracle self (make-oracle guest)))

;; Return the thread of the oracle designated for SELF, or NIL when no
;; oracle has been designated or the id is no longer found in *ORACLES*.
(defmethod task-pool-oracle ((self task-pool))
  (let* ((id (task-pool-oracle-id self))
         (orc (and id (find-oracle id))))
    (and orc (oracle-thread orc))))
184 
;; Only the two ordinary functions are proclaimed inline; POP-WORKER is a
;; generic function, for which an inline proclamation has no effect.
(declaim (inline push-worker push-workers))
(defun push-worker (worker pool)
  "Append WORKER to the worker vector of POOL, growing the vector when
needed."
  (vector-push-extend worker (task-pool-workers pool)))

(defun push-workers (threads pool)
  "Append every element of the list THREADS to the worker vector of POOL.
Returns NIL."
  ;; The structure accessor is used instead of WITH-SLOTS, matching
  ;; PUSH-WORKER and avoiding SLOT-VALUE on a structure instance.
  (let ((workers (task-pool-workers pool)))
    (dolist (w threads)
      (vector-push-extend w workers))))
193 
(defmethod pop-worker (pool)
  "Remove the last element of POOL's worker vector and return it."
  (let ((stack (task-pool-workers pool)))
    (vector-pop stack)))
196 
(defmethod make-worker-for ((pool task-pool) function &rest args)
  "Start a thread named *DEFAULT-WORKER-NAME* which calls FUNCTION with
ARGS, and return the value of MAKE-THREAD."
  (let ((label *default-worker-name*))
    (make-thread function :arguments args :name label)))
199 
(defmethod make-workers-for ((pool task-pool) (count fixnum) function)
  "Call MAKE-THREADS with COUNT and FUNCTION, naming the threads
*DEFAULT-WORKER-NAME*, and return its value."
  (let ((label *default-worker-name*))
    (make-threads count function :name label)))
202 
(defmethod spawn-worker ((pool task-pool))
  "Create one worker running POOL's kernel and push it onto POOL's worker
vector."
  ;; (with-recursive-lock
  (let* ((kernel (task-pool-kernel pool))
         (fresh (make-worker-for pool kernel)))
    (push-worker fresh pool)))
206 
(defmethod spawn-workers ((pool task-pool) (count fixnum))
  "Create COUNT workers running POOL's kernel and push them all onto POOL's
worker vector."
  (let* ((kernel (task-pool-kernel pool))
         (batch (make-workers-for pool count kernel)))
    (push-workers batch pool)))
209 
210 ;;; Task
;; TASK has one slot, STATE (initarg :STATE, accessor TASK-STATE). The
;; slot has no initform, so it is unbound unless :STATE is supplied.
(defclass task ()
  ((state :initarg :state :accessor task-state))
  (:documentation "This object represents a single unit of work to be done by some
worker. Tasks are typically generated by an oracle, but workers may also be
granted the ability to create and distribute their own tasks. Once a task is
assigned, the 'owner', i.e. the worker that is assigned this task, may modify
the object and state. When the work associated with a task is complete, the
owner is responsible for indicating in the state slot the result of the
computation."))
220 
(defmethod print-object ((self task) stream)
  "Print SELF unreadably as its type followed by its state. Nothing is
printed after the type while the STATE slot is unbound."
  (print-unreadable-object (self stream :type t)
    ;; TASK-STATE is the only accessor the TASK class defines; the slot
    ;; has no initform, so guard against printing an unbound slot.
    (when (slot-boundp self 'state)
      (format stream "~A" (task-state self)))))
224 
(defmethod push-task-result ((task task) (pool task-pool))
  "Send TASK as a message to the results mailbox of POOL."
  (let ((outbox (task-pool-results pool)))
    (send-message outbox task)))
227 
;; Placeholder: the method body is empty, so calling it returns NIL.
(defmethod run-task ((self thread) (task task)))
229 
230 ;;; Job
;; %MAKE-JOB is a BOA constructor which takes only TASKS; LOCK therefore
;; always receives its default, a fresh mutex named "job".
(defstruct (job (:constructor %make-job (tasks)))
  "A collection of tasks to be performed by worker threads."
  (tasks (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
   :type (array task *))
  (lock (make-mutex :name "job") :type mutex))
236 
(defmethod tasks ((self job))
  "Return the task array of the job SELF."
  (job-tasks self))
238 
(defmethod make-job ((self task) &key (size 1))
  "Return a JOB whose task array has SIZE elements, each one the task SELF."
  (let ((slots (make-array size :initial-element self :element-type 'task)))
    (%make-job slots)))
242 
(defmethod make-job ((self vector) &key)
  "Return a JOB which uses the vector SELF directly as its task array."
  (let ((slots self))
    (%make-job slots)))
245 
(defmethod make-job ((self null) &key (size 1))
  "Return a JOB with an empty, adjustable task vector of capacity SIZE."
  (let ((slots (make-array size :adjustable t :fill-pointer 0 :element-type 'task)))
    (%make-job slots)))
248 
(defmethod print-object ((self job) stream)
  "Print SELF unreadably as its type followed by its task array."
  (print-unreadable-object (self stream :type t)
    (princ (job-tasks self) stream)))
252 
253 ;;; Stage
;; A STAGE holds a vector of jobs and a mutex named "stage".
(defclass stage ()
  ;; The default vector is created with element type JOB so that it agrees
  ;; with the declared slot type (VECTOR JOB).
  ((jobs :initform (make-array 0 :element-type 'job :fill-pointer 0 :adjustable t)
         :initarg :jobs
         :accessor jobs
         :type (vector job))
   (lock :initform (make-mutex :name "stage") :initarg :lock :accessor stage-lock :type mutex)))
260 
(defmethod print-object ((self stage) stream)
  "Print SELF unreadably as its type followed by its JOBS vector."
  (print-unreadable-object (self stream :type t)
    (princ (jobs self) stream)))
264 
;; Placeholder: the method body is empty, so calling it returns NIL.
(defmethod run-stage ((self thread) (stage stage)))
266 
267 ;;; Macros
(defmacro with-task-pool ((sym &key oracle lock (count 4) spawn kernel results) &body body)
  "Evaluate BODY with SYM bound to a freshly made TASK-POOL.

Options (all are forms evaluated at run time, once per execution):
  LOCK    - semaphore for the pool; defaults to a new semaphore named
            \"online\" with an initial count of COUNT.
  COUNT   - initial count of the default semaphore (default 4); unused
            when LOCK is supplied.
  RESULTS - mailbox for the pool; defaults to a new mailbox named
            \"results\".
  KERNEL  - value stored as the pool's kernel; defaults to a kernel
            generated by GEN-TASK-KERNEL under the name %KERNEL which
            uses the pool's lock, task queue and results mailbox.
  ORACLE  - oracle designator passed to DESIGNATE-ORACLE; defaults to
            the oracle of *CURRENT-THREAD*.
  SPAWN   - when given, the number of workers to start with
            SPAWN-WORKERS before BODY runs."
  ;; The default semaphore and mailbox are built by forms placed in the
  ;; expansion. Creating them here, at macroexpansion time, would embed
  ;; literal objects in the code, which cannot be dumped by COMPILE-FILE
  ;; and would be shared by every execution of the same call site.
  (let ((lock-form (or lock `(make-semaphore :name "online" :count ,count)))
        (results-form (or results '(make-mailbox :name "results"))))
    `(let ((,sym (make-task-pool :lock ,lock-form :results ,results-form)))
       (setf (task-pool-kernel ,sym)
             ,(or kernel
                  `(gen-task-kernel '%kernel ()
                     (task-pool-lock ,sym)
                     (tasks ,sym)
                     (results ,sym)
                     nil)))
       (designate-oracle ,sym ,(or oracle '(make-oracle *current-thread*)))
       ,@(when spawn `((spawn-workers ,sym ,spawn)))
       ,@body)))