changelog shortlog graph tags branches changeset files revisions annotate raw help

Mercurial > core / lisp/std/thread.lisp

changeset 437: 83f8623a6ec3
parent: 0af47621fa8b
child: a37b1d3371fc
author: Richard Westhaver <ellis@rwest.io>
date: Tue, 11 Jun 2024 22:33:37 -0400
permissions: -rw-r--r--
description: std work, renamed :disabled in deftest to :skip
1 ;;; thread.lisp --- Multi-thread utilities
2 
3 ;; Threading Macros
4 
5 ;;; Commentary:
6 
7 ;; mostly yoinked from sb-thread and friends
8 
9 ;;; Code:
10 (in-package :std/thread)
11 
12 ;; (sb-thread:thread-os-tid sb-thread:*current-thread*)
13 ;; sb-thread:interrupt-thread
14 
(defun thread-support-p ()
  "Return true when *FEATURES* advertises thread support.

Accepts either :THREAD-SUPPORT (the keyword originally tested) or :SB-THREAD
(the feature keyword used by the reader conditionals elsewhere in this file).
The result is a generalized boolean: the matching tail of *FEATURES*, or NIL."
  ;; :THREAD-SUPPORT is tested first so the return value is unchanged whenever
  ;; the original check already succeeded.
  (or (member :thread-support *features*)
      (member :sb-thread *features*)))
16 
(eval-when (:compile-toplevel)
  ;; Defined only while this file is being compiled (no :LOAD-TOPLEVEL or
  ;; :EXECUTE situation is listed).
  (defun print-thread-message-top-level (msg)
    "Start a thread that writes MSG to the stream that was the value of
*STANDARD-OUTPUT* when this form was read. Returns NIL immediately.

MSG is passed to FORMAT as the control string, so any tilde directives in it
are interpreted."
    ;; The stream object is spliced in at read time via #. so the new thread
    ;; writes to it rather than to whatever its own *STANDARD-OUTPUT* is.
    (let ((toplevel-out #.*standard-output*))
      (sb-thread:make-thread
       (lambda ()
         (format toplevel-out msg))))
    nil))
23 
24 ;; this is all very unsafe. don't touch the finalizer thread plz.
(defun find-thread-by-id (id)
  "Return the first thread in (SB-THREAD::LIST-ALL-THREADS) whose
THREAD-OS-TID is `=' to ID, or NIL when no thread matches.

ID must be a number; the original documentation specifies an unsigned 64-bit
integer."
  (let ((candidates (sb-thread::list-all-threads)))
    (find-if (lambda (thread)
               (= id (thread-os-tid thread)))
             candidates)))
28 
(defun thread-key-list ()
  "Return the list produced by running SB-THREAD::AVLTREE-FILTER with
SB-THREAD::AVLNODE-KEY over SB-THREAD::*ALL-THREADS*.

NOTE(review): presumably one key per registered thread -- confirm against
AVLTREE-FILTER in the SBCL version in use."
  (let ((registry sb-thread::*all-threads*))
    (sb-thread::avltree-filter #'sb-thread::avlnode-key registry)))
31 
(defun thread-id-list ()
  "Return the list produced by running SB-THREAD::AVLTREE-FILTER over
SB-THREAD::*ALL-THREADS*, mapping each node to the THREAD-OS-TID of the
thread stored in it (its AVLNODE-DATA)."
  (flet ((node-tid (node)
           (thread-os-tid (sb-thread::avlnode-data node))))
    (sb-thread::avltree-filter #'node-tid sb-thread::*all-threads*)))
34 
(defun thread-count ()
  "Return SB-THREAD::AVL-COUNT of SB-THREAD::*ALL-THREADS*."
  (let ((registry sb-thread::*all-threads*))
    (sb-thread::avl-count registry)))
37 
(defun make-threads (n fn &key (name "thread"))
  "Start N threads that each call FN and return them as a list, in creation
order. Thread I (counting from 0) is named \"NAME-I\"."
  (declare (type fixnum n))
  (let ((threads '()))
    (dotimes (index n (nreverse threads))
      (push (make-thread fn :name (format nil "~A-~D" name index))
            threads))))
42 
(defun parse-lambda-list-names (ll)
  "Return the variable names found in the third value of
SB-INT:PARSE-LAMBDA-LIST applied to LL.

An atomic entry is returned as is; for a list entry its CAR is returned.
NOTE(review): the third value is presumably the &OPTIONAL parameter specs, so
required, &REST and &KEY parameters are not included -- confirm against the
SBCL version in use."
  (let ((specs (nth-value 2 (sb-int:parse-lambda-list ll))))
    (mapcar (lambda (spec)
              (if (consp spec)
                  (car spec)
                  spec))
            specs)))
51 
(defmacro with-threads ((n &key args) &body body)
  "Expand into a MAKE-THREADS call that starts N threads, each running BODY
inside a lambda whose lambda list is ARGS.

The names that PARSE-LAMBDA-LIST-NAMES extracts from ARGS are declared
IGNORABLE in that lambda."
  (let ((names (parse-lambda-list-names args)))
    `(make-threads ,n
                   (lambda ,args
                     (declare (ignorable ,@names))
                     ,@body))))
54 
(defun finish-threads (&rest threads)
  "Join every thread in THREADS (nested lists are flattened first) and return
the flattened list.

Whether the joins complete or are unwound by a non-local exit, every thread
that is still alive afterwards is terminated."
  (let ((all (flatten threads)))
    (unwind-protect
         (mapc #'join-thread all)
      ;; Cleanup: do not leave any stragglers running.
      (loop for th in all
            when (thread-alive-p th)
              do (terminate-thread th)))))
62 
(defun timed-join-thread (thread timeout)
  "Join THREAD, waiting at most TIMEOUT seconds.

Returns the value of JOIN-THREAD called with :DEFAULT :ABORTED, or :TIMEOUT
if an SB-EXT:TIMEOUT condition is signalled inside the deadline.

TIMEOUT may be any real number of seconds; it was previously declared FLOAT,
which rejected integer arguments such as 5."
  (declare (type thread thread)
           (type real timeout))
  (handler-case
      (sb-sys:with-deadline (:seconds timeout)
        (join-thread thread :default :aborted))
    (sb-ext:timeout ()
      :timeout)))
69 
(defun hang ()
  "Call JOIN-THREAD on the current thread.

NOTE(review): presumably meant to block the caller indefinitely -- confirm how
the SBCL version in use treats a thread joining itself."
  (let ((self *current-thread*))
    (join-thread self)))
72 
(defun kill-thread (thread)
  "Terminate THREAD if it is alive, suppressing any error signalled by
TERMINATE-THREAD. Returns NIL when THREAD is not alive."
  (if (thread-alive-p thread)
      (ignore-errors (terminate-thread thread))
      nil))
77 
78 ;; (sb-vm::primitive-object-slots (sb-vm::primitive-object 'sb-vm::thread))
(defun init-session (&optional (thread *current-thread*))
  "Call SB-THREAD::NEW-SESSION on THREAD, which defaults to the current
thread, and return its result."
  (sb-thread::new-session thread))
80 
81 ;; (sb-thread::with-progressive-timeout (timet :seconds 4) (dotimes (i 4000) (print (timet))))
82 
83 ;; (describe sb-thread::*session*)
84 
85 ;; make-listener-thread
86 
87 ;; with-progressive-timeout
88 
89 ;; from sb-thread
(defun dump-thread ()
  "Print the current thread's thread-local storage and binding stack to
standard output (adapted from SB-THREAD).

Output starts with a `TLS:' header showing the base address of the current
thread structure, followed by one line per word-sized cell. Word indices from
THREAD-ALLOCATOR-HISTOGRAM-SLOT up to just before THREAD-LISP-THREAD-SLOT are
skipped. Cells inside the thread structure (other than the LISP-THREAD slot)
are shown as raw hex words; every other cell is decoded, and cells holding the
no-TLS-value marker are omitted. A `Binding stack:' header with the stack
depth and one line per binding entry follows."
  (let* ((layout (sb-vm::primitive-object 'sb-vm::thread))
         (slots (sb-vm::primitive-object-slots layout))
         (sap (current-thread-sap))
         (thread-obj-len (sb-vm::primitive-object-length layout))
         ;; Slot names indexed by word offset; offsets without a slot keep "".
         (names (make-array thread-obj-len :initial-element "")))
    (map nil
         (lambda (slot)
           (setf (aref names (sb-vm::slot-offset slot))
                 (sb-vm::slot-name slot)))
         slots)
    (flet ((safely-read (base offset)
             ;; Inspect the raw word first so marker values are never
             ;; dereferenced as Lisp objects.
             (let ((bits (sb-vm::sap-ref-word base offset)))
               (cond ((eql bits sb-vm:no-tls-value-marker)
                      :no-tls-value)
                     ((eql (logand bits sb-vm:widetag-mask)
                           sb-vm:unbound-marker-widetag)
                      :unbound)
                     (t
                      (sb-vm::sap-ref-lispobj base offset)))))
           (show (sym val)
             (declare (type fixnum sym))
             (let ((*print-right-margin* 128)
                   (*print-lines* 4))
               (format t " ~3d ~30a : ~s~%"
                       #+sb-thread (ash sym (- sb-vm:word-shift))
                       #-sb-thread 0
                       #+sb-thread (sb-vm:symbol-from-tls-index sym)
                       #-sb-thread sym
                       val))))
      (format t "~&TLS: (base=~x)~%" (sb-vm::sap-int sap))
      (let ((struct-bytes (ash thread-obj-len sb-vm:word-shift)))
        (loop for tlsindex from sb-vm:n-word-bytes
                below #+sb-thread (ash sb-vm::*free-tls-index* sb-vm:n-fixnum-tag-bits)
                      #-sb-thread struct-bytes
                by sb-vm:n-word-bytes
              for word-index = (ash tlsindex (- sb-vm:word-shift))
              unless (<= sb-vm::thread-allocator-histogram-slot
                         word-index
                         (1- sb-vm::thread-lisp-thread-slot))
                do (let ((thread-slot-name
                           (when (< tlsindex struct-bytes)
                             (aref names word-index))))
                     (cond ((and thread-slot-name
                                 (sb-vm::neq thread-slot-name 'sb-vm::lisp-thread))
                            (format t " ~3d ~30a : #x~x~%"
                                    word-index
                                    thread-slot-name
                                    (sb-vm::sap-ref-word sap tlsindex)))
                           (t
                            (let ((val (safely-read sap tlsindex)))
                              (unless (eq val :no-tls-value)
                                (show tlsindex val))))))))
      (let ((from (sb-vm::descriptor-sap sb-vm:*binding-stack-start*))
            (to (sb-vm::binding-stack-pointer-sap))
            (entry-bytes (* sb-vm:binding-size sb-vm:n-word-bytes)))
        (format t "~%Binding stack: (depth ~d)~%"
                (/ (sb-vm::sap- to from) entry-bytes))
        (loop until (sb-vm::sap>= from to)
              do (let ((val (safely-read from 0))
                       (sym #+sb-thread (sb-vm::sap-ref-word from sb-vm:n-word-bytes) ; a TLS index
                            #-sb-thread (sb-vm::sap-ref-lispobj from sb-vm:n-word-bytes)))
                   (show sym val))
                 (setq from (sb-vm::sap+ from entry-bytes)))))))
141 
(defun wait-for-threads (threads)
  "Join every thread in the sequence THREADS, using NIL as the join default,
then return true when none of them is still alive."
  (map nil
       (lambda (th) (sb-thread:join-thread th :default nil))
       threads)
  (notany #'sb-thread:thread-alive-p threads))
145 
(defun process-all-interrupts (&optional (thread sb-thread:*current-thread*))
  "Wait, via SB-EXT:WAIT-FOR, until SB-THREAD::THREAD-INTERRUPTIONS of THREAD
is empty. THREAD defaults to the current thread."
  (sb-ext:wait-for
   (not (sb-thread::thread-interruptions thread))))
148 
149 ;;; Tasks
(defclass supervisor ()
  ;; SCOPE has no initarg, initform or accessor; it starts out unbound.
  ((scope))
  (:documentation "A class which provides a view of the work done within a specified
SCOPE.

Operators use this object to inspect 'runstreams' performed in other
threads, such as WORKERS in TASK-POOL.

Make sure the SCOPE is fully initialized before using this object. A
supervisor may be created at any point during the lifetime of SCOPE,
but never before it begins and never after it ends."))
161 
162 ;; unix-getrusage
163 ;; 0,-1,-2
164 ;; (multiple-value-list (sb-unix:unix-getrusage 0))
165 ;; (setf sb-unix::*on-dangerous-wait* :error)
(defstruct (oracle (:constructor %make-oracle (id thread)))
  "Pairs an integer ID with a THREAD. Both slots are read-only."
  ;; 32-bit unsigned identifier; MAKE-ORACLE in this file fills it with the
  ;; thread's OS thread id.
  (id 0 :read-only t :type (unsigned-byte 32))
  ;; The thread this oracle stands for.
  (thread *current-thread* :read-only t))
169 
(defvar *oracle-threads* nil
  "List of ORACLE structures registered by MAKE-ORACLE, most recent first.")
171 
(defun find-oracle (id)
  "Return the first oracle in *ORACLE-THREADS* whose ORACLE-ID is `=' to ID,
or NIL when there is none. ID must be a number."
  (find id *oracle-threads* :key #'oracle-id :test #'=))
174 
(defun make-oracle (thread)
  "Return two values: the OS thread id of THREAD and the oracle registered
for that id.

An oracle already present in *ORACLE-THREADS* under that id is reused;
otherwise a new one is created and pushed onto *ORACLE-THREADS*."
  (let* ((id (thread-os-tid thread))
         (existing (find-oracle id)))
    (cond (existing
           (values id existing))
          (t
           (let ((fresh (%make-oracle id thread)))
             (push fresh *oracle-threads*)
             (values id fresh))))))
182 
;; Generic-function protocol for oracles, task pools, jobs, stages and tasks.
;; Only the lambda lists are declared here.  Within this file, methods exist
;; for DESIGNATE-ORACLE, PUSH-JOB, PUSH-TASK, PUSH-RESULT, PUSH-WORKER,
;; PUSH-WORKERS, PUSH-STAGE, MAKE-TASK, RUN-JOB, RUN-STAGE and RUN-TASK; the
;; remaining generics have no methods in this file.

;; Adding things to a pool (PUSH-TASK also has a method for jobs).
(defgeneric designate-oracle (host guest))
(defgeneric push-job (job pool))
(defgeneric push-task (task pool))
(defgeneric push-result (task pool))
(defgeneric push-worker (thread pool))
(defgeneric push-workers (threads pool))
(defgeneric push-stage (stage pool))
(defgeneric find-job (job pool &key &allow-other-keys))

;; Removing things from a pool.
(defgeneric delete-job (job pool &key &allow-other-keys))
(defgeneric pop-job (pool))
(defgeneric pop-task (pool))
(defgeneric pop-result (pool))
(defgeneric pop-worker (pool))
(defgeneric pop-stage (pool))
;; Pool lifecycle.
(defgeneric start-task-pool (pool))
(defgeneric pause-task-pool (pool))
(defgeneric stop-task-pool (pool))
(defgeneric restart-task-pool (pool))
;; Creating and running work.
(defgeneric make-task (&rest args))
(defgeneric run-job (self job))
(defgeneric run-stage (self stage))
(defgeneric run-task (self task))
206 
(defgeneric make-worker-for (pool function &rest args)
  (:documentation "Start and return a thread that applies FUNCTION to ARGS on behalf of POOL.")
  ;; Pool-less default: a plain MAKE-THREAD with no thread name supplied.
  (:method ((pool null) (function function) &rest args)
    (declare (ignore pool))
    (make-thread function :arguments args)))
211 
(defvar *default-worker-name* "worker"
  "Name (or name prefix) given to worker threads created in this file.")
213 
(defgeneric make-workers-for (pool count function)
  (:documentation "Start COUNT threads running FUNCTION on behalf of POOL and return them as a list.")
  ;; Pool-less default: MAKE-THREADS with *DEFAULT-WORKER-NAME* as the name
  ;; prefix.
  (:method ((pool null) (count fixnum) (function function))
    (declare (ignore pool))
    (let ((prefix *default-worker-name*))
      (make-threads count function :name prefix))))
218 
(defmacro parse-kernel-ops (op)
  "Parse an op of the form (NAME ARGS &BODY BODY).

Expands into (STD/MACS:PLAMBDA ARGS BODY...). NAME is required by the
destructuring pattern but is not used in the expansion."
  (destructuring-bind (name args &body body) op
    ;; NAME is only part of the expected shape; declaring it ignored avoids an
    ;; unused-variable warning when the macro is compiled.
    (declare (ignore name))
    `(std/macs:plambda ,args ,@body)))
223 
(defmacro define-task-kernel (name ops accessors &body body)
  "Define a task kernel.

  (define-task-kernel NAME OPS ACCESSORS &body BODY)

Expands into a zero-argument function NAME that runs BODY and then
returns no values. OPS and ACCESSORS are accepted but currently
unused by the expansion.

Intended design: the kernel function is installed in worker threads by
passing it to SB-THREAD:MAKE-THREAD, and ACCESSORS is a list of
pandoric accessors which can be called on the kernel via an ORACLE.

This interface is experimental and subject to change."
  (declare (ignorable accessors ops))
  (let ((forms (append body (list '(values)))))
    `(defun ,name ()
       ,@forms)))
244 
;; Kernel with an empty body: it evaluates NIL and returns no values.
(define-task-kernel default-task-kernel () ()
  "The default task kernel used to initialize the KERNEL slot of
task-pools."
  nil)
251 
(defgeneric spawn-worker (pool)
  (:documentation "Start one worker thread for POOL and return the result.")
  ;; Pool-less default: run DEFAULT-TASK-KERNEL in a new thread.
  (:method ((pool null))
    (declare (ignore pool))
    ;; Pass the kernel function itself.  Calling (default-task-kernel) here
    ;; would run it in the calling thread and hand MAKE-THREAD its (empty)
    ;; return value, i.e. NIL, instead of a function.
    (make-thread #'default-task-kernel)))
256 
(defgeneric spawn-workers (pool count)
  (:documentation "Start COUNT worker threads for POOL and return the result.")
  ;; Pool-less default: COUNT threads running DEFAULT-TASK-KERNEL, named with
  ;; the *DEFAULT-WORKER-NAME* prefix.
  (:method ((pool null) (count fixnum))
    (declare (ignore pool))
    ;; Pass the kernel function itself.  Calling (default-task-kernel) here
    ;; would run it in the calling thread and hand MAKE-THREADS its (empty)
    ;; return value, i.e. NIL, instead of a function.
    (make-threads count #'default-task-kernel :name *default-worker-name*)))
261 
(defstruct task-pool
  ;; Id of the oracle designated for this pool, or NIL when none is set.
  (oracle-id nil :type (or null (unsigned-byte 32)))
  ;; Function handed to worker threads spawned for this pool.
  (kernel #'default-task-kernel :type function)
  ;; Queue of pending jobs.
  (jobs (make-queue :name "jobs"))
  ;; Growable vector of stages.  It starts empty, so it is created adjustable
  ;; (like the task and job vectors elsewhere in this file) to allow stages
  ;; to be appended.
  (stages (make-array 0 :element-type 'stage :fill-pointer 0 :adjustable t)
   :type (array stage *))
  ;; When open, indicates that the pool is fully initialized and workers
  ;; may make progress.
  (online (make-gate :name "online" :open nil)
   :type gate)
  ;; TODO: test weak-vector here
  (workers nil :type list)
  ;; Mailbox that receives finished tasks.
  (results (make-mailbox :name "results")))
274 
(defmethod print-object ((self task-pool) stream)
  "Print SELF unreadably as: oracle id, kernel, `:online' and the gate state,
then the job, stage and worker counts separated by colons, then the number of
queued results."
  (print-unreadable-object (self stream :type t)
    (let* ((oracle-id (task-pool-oracle-id self))
           (kernel (task-pool-kernel self))
           (online-p (gate-open-p (task-pool-online self)))
           (job-count (queue-count (task-pool-jobs self)))
           (stage-count (length (task-pool-stages self)))
           (worker-count (length (task-pool-workers self)))
           (result-count (mailbox-count (task-pool-results self))))
      (format stream "~A ~A :online ~A ~A:~A:~A ~A"
              oracle-id kernel online-p
              job-count stage-count worker-count
              result-count))))
285 
(defmethod designate-oracle ((self task-pool) (guest integer))
  "Look up the thread whose OS thread id is GUEST, register an oracle for it
with MAKE-ORACLE and store the resulting id in SELF. Returns SELF.

NOTE(review): FIND-THREAD-BY-ID may return NIL, in which case MAKE-ORACLE is
called with NIL -- confirm callers only pass ids of live threads."
  (let ((thread (find-thread-by-id guest)))
    ;; Only the primary value of MAKE-ORACLE (the id) is stored.
    (setf (task-pool-oracle-id self) (make-oracle thread)))
  self)
289 
(defmethod designate-oracle ((self task-pool) (guest thread))
  "Register an oracle for the thread GUEST with MAKE-ORACLE and store its id
in SELF. Returns SELF."
  ;; Store the id directly.  The previous implementation passed the id to the
  ;; integer method, which searched the thread list for the same thread again
  ;; and failed if it was not found there.
  (setf (task-pool-oracle-id self) (nth-value 0 (make-oracle guest)))
  self)
292 
(defmethod task-pool-oracle ((self task-pool))
  "Return the thread of the oracle designated for SELF, or NIL when SELF has
no oracle id or no oracle is registered under that id."
  ;; The slot is named ORACLE-ID; reading a slot named ORACLE always failed.
  (let* ((id (task-pool-oracle-id self))
         ;; FIND-ORACLE compares with `=', so never hand it a NIL id.
         (oracle (and id (find-oracle id))))
    (and oracle (oracle-thread oracle))))
295 
(defmethod push-worker ((worker thread) (pool task-pool))
  "Add WORKER to POOL's worker list unless it is already there. Returns the
resulting worker list."
  (with-accessors ((workers task-pool-workers)) pool
    (pushnew worker workers)))
298 
(defmethod push-workers ((threads list) (pool task-pool))
  "Add each element of THREADS to POOL's worker list, skipping any that are
already present. Returns NIL."
  (dolist (thread threads)
    (pushnew thread (task-pool-workers pool))))
303 
(defmethod make-worker-for ((pool task-pool) function &rest args)
  "Start and return a thread named by *DEFAULT-WORKER-NAME* that applies
FUNCTION to ARGS. POOL is used for dispatch only."
  (make-thread function
               :arguments args
               :name *default-worker-name*))
306 
(defmethod make-workers-for ((pool task-pool) (count fixnum) function)
  "Start COUNT threads running FUNCTION, named with the *DEFAULT-WORKER-NAME*
prefix, and return them as a list. POOL is used for dispatch only."
  (let ((prefix *default-worker-name*))
    (make-threads count function :name prefix)))
309 
(defmethod spawn-worker ((pool task-pool))
  "Start one worker thread running POOL's kernel and add it to POOL's
workers. Returns the value of PUSH-WORKER."
  ;; (with-recursive-lock
  (let* ((kernel (task-pool-kernel pool))
         (worker (make-worker-for pool kernel)))
    (push-worker worker pool)))
313 
(defmethod spawn-workers ((pool task-pool) (count fixnum))
  "Start COUNT worker threads running POOL's kernel and add them to POOL's
workers. Returns the value of PUSH-WORKERS."
  (let* ((kernel (task-pool-kernel pool))
         (threads (make-workers-for pool count kernel)))
    (push-workers threads pool)))
316 
(defclass task ()
  ;; Neither slot has an initform, so each is unbound unless its initarg is
  ;; supplied.
  ((state :accessor task-state
          :initarg :state)
   (object :accessor task-object
           :initarg :object))
  (:documentation "A single unit of work to be done by some worker.

Tasks are typically generated by an oracle, but workers may also be
granted the ability to create and distribute their own tasks. Once a
task is assigned, its 'owner' -- the worker the task is assigned to --
may modify the object and state. When the work associated with a task
is complete, the owner is responsible for recording the result of the
computation in the state slot."))
327 
(defmethod make-task (&rest args)
  "Return a new TASK whose OBJECT slot holds the list ARGS. The STATE slot is
left uninitialized."
  (let ((payload args))
    (make-instance 'task :object payload)))
330 
(defmethod print-object ((self task) stream)
  "Print SELF unreadably, showing its OBJECT slot when that slot is bound."
  (print-unreadable-object (self stream :type t)
    ;; OBJECT has no initform, so a task created without :OBJECT has it
    ;; unbound; reading it here would signal an error from inside the printer.
    (when (slot-boundp self 'object)
      (format stream "~A" (task-object self)))))
334 
(defmethod push-result ((task task) (pool task-pool))
  "Send TASK as a message to POOL's results mailbox."
  (let ((mailbox (task-pool-results pool)))
    (send-message mailbox task)))
337 
(defmethod run-task ((self thread) (task task))
  "Placeholder: does nothing and returns NIL."
  nil)
340 
(defstruct (job (:constructor %make-job (tasks)))
  "A collection of tasks to be performed by worker threads."
  ;; Vector of tasks.  The default is an empty, adjustable vector with a fill
  ;; pointer; %MAKE-JOB always takes the vector as an explicit argument.
  (tasks (make-array 0 :fill-pointer 0 :adjustable t :element-type 'task)
   :type (array task *))
  ;; Mutex named "job", created with each job.
  (lock (make-mutex :name "job") :type mutex))
346 
(defgeneric make-job (self &key &allow-other-keys)
  (:documentation "Build a JOB from SELF. Methods in this file accept a task, a vector or NIL."))
348 
(defmethod make-job ((self task) &key (size 1))
  "Return a job whose task vector holds SIZE references to the task SELF.

The vector is adjustable and has a fill pointer (set to SIZE), like the
vectors built by the other job constructors in this file, so that
fill-pointer operations such as those used by PUSH-TASK apply to it."
  (%make-job (make-array size
                         :element-type 'task
                         :initial-element self
                         :fill-pointer size
                         :adjustable t)))
352 
(defmethod make-job ((self vector) &key)
  "Return a job that uses the vector SELF directly (not a copy) as its task
vector."
  (let ((tasks self))
    (%make-job tasks)))
355 
(defmethod make-job ((self null) &key (size 1))
  "Return a job with no tasks yet, backed by an adjustable vector of initial
capacity SIZE with its fill pointer at 0."
  (let ((tasks (make-array size
                           :fill-pointer 0
                           :adjustable t
                           :element-type 'task)))
    (%make-job tasks)))
358 
(defmethod print-object ((self job) stream)
  "Print SELF unreadably, showing its task vector."
  (print-unreadable-object (self stream :type t)
    (let ((tasks (job-tasks self)))
      (format stream "~A" tasks))))
362 
(defmethod push-task ((task task) (job job))
  "Append TASK to JOB's task vector and return the index it was stored at.

The vector is grown when it is full; the previous VECTOR-PUSH returned NIL
and dropped TASK in that case. The vector must have a fill pointer."
  (vector-push-extend task (job-tasks job)))
365 
(defmethod push-task ((task task) (pool task-pool))
  "Wrap TASK in a new job via MAKE-JOB and hand that job to PUSH-JOB on POOL."
  (let ((job (make-job task)))
    (push-job job pool)))
368 
(defmethod push-job ((job job) (pool task-pool))
  "Enqueue JOB on POOL's job queue."
  (let ((queue (task-pool-jobs pool)))
    (enqueue job queue)))
371 
372 ;; TODO..
(defmethod run-job ((self task-pool) (job job))
  "Hand JOB to PUSH-JOB on the pool SELF. When the LOG feature is present a
trace message is logged first."
  #+log (log:trace! "running remote job...")
  (let ((pool self))
    (push-job job pool)))
376 
(defclass stage ()
  ;; Growable vector of jobs.  The element type is JOB, matching the declared
  ;; slot type (the initform previously said TASK).
  ((jobs :initform (make-array 0 :element-type 'job :fill-pointer 0 :adjustable t)
         :initarg :jobs
         :accessor jobs
         :type (vector job))
   ;; Mutex named "stage", created per instance unless :LOCK is supplied.
   (lock :initform (make-mutex :name "stage")
         :initarg :lock
         :accessor stage-lock
         :type mutex))
  (:documentation "A collection of jobs together with a mutex."))
383 
(defmethod print-object ((self stage) stream)
  "Print SELF unreadably, showing its job vector."
  (print-unreadable-object (self stream :type t)
    (let ((job-vector (jobs self)))
      (format stream "~A" job-vector))))
387 
(defmethod push-stage ((stage stage) (pool task-pool))
  "Append STAGE to POOL's stage vector and return the index it was stored at.

The stage vector is created with length 0, so the previous VECTOR-PUSH always
found it full, returned NIL and stored nothing. VECTOR-PUSH-EXTEND grows the
vector instead.
NOTE(review): this relies on the stage vector being actually adjustable --
presumably true for fill-pointer vectors on SBCL; confirm."
  (vector-push-extend stage (task-pool-stages pool)))
390 
(defmethod run-stage ((self thread) (stage stage))
  "Placeholder: does nothing and returns NIL."
  nil)