changelog shortlog graph tags branches changeset files revisions annotate raw help

Mercurial > core / lisp/std/thread.lisp

changeset 336: 0af47621fa8b
parent: a0dfde3cb3c4
child: 83f8623a6ec3
author: Richard Westhaver <ellis@rwest.io>
date: Sun, 12 May 2024 22:13:22 -0400
permissions: -rw-r--r--
description: printing, bugfixes
;;; thread.lisp --- Multi-thread utilities
2 
3 ;; Threading Macros
4 
5 ;;; Commentary:
6 
7 ;; mostly yoinked from sb-thread and friends
8 
9 ;;; Code:
10 (in-package :std/thread)
11 
12 ;; (sb-thread:thread-os-tid sb-thread:*current-thread*)
13 ;; sb-thread:interrupt-thread
14 
(defun thread-support-p ()
  "Return the tail of *FEATURES* that starts at :THREAD-SUPPORT, or NIL when
that keyword is not present."
  (let ((feature :thread-support))
    (member feature *features*)))
16 
;; Defined only in the :COMPILE-TOPLEVEL situation, i.e. while the file is
;; being compiled.
(eval-when (:compile-toplevel)
  (defun print-thread-message-top-level (msg)
    "Start a thread that passes MSG to FORMAT as the control string, then
return NIL without waiting for that thread.

The destination is written as #.*STANDARD-OUTPUT*, so the stream that was
current when this form was read is the one used."
    (flet ((emit ()
             (format #.*standard-output* msg)))
      (sb-thread:make-thread #'emit)
      nil)))
23 
24 ;; this is all very unsafe. don't touch the finalizer thread plz.
(defun find-thread-by-id (id)
  "Look ID up in SB-THREAD::*ALL-THREADS* and return the data stored in the
matching AVL node, or NIL when no node has that key.

ID is handed to SB-THREAD::AVL-FIND unchanged and must be a u64.
NOTE(review): the tree key is whatever SBCL uses to index *ALL-THREADS*,
which is not necessarily the OS thread id -- confirm before relying on it."
  (let ((node (sb-thread::avl-find id sb-thread::*all-threads*)))
    ;; A failed lookup yields no node; applying AVLNODE-DATA to it would
    ;; signal a type error instead of returning NIL as documented.
    (and node (sb-thread::avlnode-data node))))
28 
(defun thread-id-list ()
  "Return the list built by SB-THREAD::AVLTREE-FILTER when it applies
SB-THREAD::AVLNODE-KEY over SB-THREAD::*ALL-THREADS*."
  (let ((tree sb-thread::*all-threads*))
    (sb-thread::avltree-filter #'sb-thread::avlnode-key tree)))
31 
(defun thread-count ()
  "Return SB-THREAD::AVL-COUNT of SB-THREAD::*ALL-THREADS*."
  (let ((tree sb-thread::*all-threads*))
    (sb-thread::avl-count tree)))
34 
(defun make-threads (n fn &key (name "thread"))
  "Start N threads running FN and return them as a list in creation order.

FN is started without any arguments. Thread I (counting from 0) is named
\"NAME-I\"."
  (declare (type fixnum n))
  (let ((started '()))
    (dotimes (i n (nreverse started))
      (push (make-thread fn :name (format nil "~A-~D" name i))
            started))))
39 
(defmacro with-threads ((idx n) &body body)
  "Start N threads, each evaluating BODY with IDX bound to that thread's
index (0 below N), and return the list of threads in creation order.

Threads are named \"thread-I\", matching the default naming of MAKE-THREADS."
  ;; The body lambda takes the index as its single argument, so each thread
  ;; must be started with that argument. Going through MAKE-THREADS would
  ;; call the lambda with no arguments and the thread would die with an
  ;; argument-count error.
  (let ((i (gensym "I")))
    `(loop for ,i below ,n
           collect (make-thread (lambda (,idx)
                                  (declare (ignorable ,idx))
                                  ,@body)
                                :name (format nil "thread-~D" ,i)
                                :arguments (list ,i)))))
42 
(defun finish-threads (&rest threads)
  "Join every thread in THREADS (after passing the argument list through
FLATTEN) and return the flattened list.

If joining exits non-locally, the cleanup form terminates every thread in
the list that is still alive."
  (let ((all (flatten threads)))
    (unwind-protect
         (mapc #'join-thread all)
      (loop for th in all
            when (thread-alive-p th)
              do (terminate-thread th)))))
50 
(defun timed-join-thread (thread timeout)
  "Join THREAD under a deadline of TIMEOUT seconds.

Returns the value of JOIN-THREAD called with :DEFAULT :ABORTED, or :TIMEOUT
when an SB-EXT:TIMEOUT condition is signalled inside the deadline.

TIMEOUT may be any real number; it was previously declared FLOAT, which
rejected integer and rational timeouts."
  (declare (type thread thread) (type real timeout))
  (handler-case (sb-sys:with-deadline (:seconds timeout)
                  (join-thread thread :default :aborted))
    (sb-ext:timeout ()
      :timeout)))
57 
(defun hang ()
  "Call JOIN-THREAD on the current thread.
NOTE(review): the name suggests this is meant to block forever; some SBCL
versions may instead signal an error on a self-join -- confirm."
  (let ((self *current-thread*))
    (join-thread self)))
60 
(defun kill-thread (thread)
  "Terminate THREAD if it is alive, ignoring any error signalled by
TERMINATE-THREAD. Returns NIL when THREAD is not alive."
  (and (thread-alive-p thread)
       (ignore-errors
        (terminate-thread thread))))
65 
66 ;; (sb-vm::primitive-object-slots (sb-vm::primitive-object 'sb-vm::thread))
(defun init-session (&optional (thread *current-thread*))
  "Call SB-THREAD::NEW-SESSION on THREAD (default: the current thread) and
return whatever it returns."
  (let ((target thread))
    (sb-thread::new-session target)))
68 
69 ;; (sb-thread::with-progressive-timeout (timet :seconds 4) (dotimes (i 4000) (print (timet))))
70 
71 ;; (describe sb-thread::*session*)
72 
73 ;; make-listener-thread
74 
75 ;; with-progressive-timeout
76 
77 ;; from sb-thread
(defun dump-thread ()
  "Print the current thread's thread-local storage area and its binding
stack to standard output. Relies on SBCL internals in SB-VM."
  (let* ((slots (sb-vm::primitive-object-slots #1=(sb-vm::primitive-object 'sb-vm::thread)))
         (sap (current-thread-sap))
         (thread-obj-len (sb-vm::primitive-object-length #1#))
         ;; NAMES maps a word offset in the thread object to its slot name;
         ;; offsets without a slot keep the empty string.
         (names (make-array thread-obj-len :initial-element "")))
    (loop for slot across slots
          do
             (setf (aref names (sb-vm::slot-offset slot)) (sb-vm::slot-name slot)))
    (flet (;; Read the word at SAP+OFFSET; return a keyword for the two marker
           ;; values and otherwise the Lisp object stored there.
           (safely-read (sap offset &aux (bits (sb-vm::sap-ref-word sap offset)))
             (cond ((eql bits sb-vm:no-tls-value-marker) :no-tls-value)
                   ((eql (logand bits sb-vm:widetag-mask) sb-vm:unbound-marker-widetag) :unbound)
                   (t (sb-vm::sap-ref-lispobj sap offset))))
           ;; Print one line: word index, symbol, value. With threads, SYM is
           ;; a TLS index (a byte offset) and is mapped back to its symbol.
           (show (sym val)
             (declare (type fixnum sym))
             (let ((*print-right-margin* 128)
                   (*print-lines* 4))
               (format t " ~3d ~30a : ~s~%"
                       #+sb-thread (ash sym (- sb-vm:word-shift))
                       #-sb-thread 0
                       #+sb-thread (sb-vm:symbol-from-tls-index sym)
                       #-sb-thread sym
                       val))))
      (format t "~&TLS: (base=~x)~%" (sb-vm::sap-int sap))
      ;; Walk the TLS area one word at a time; TLSINDEX is a byte offset.
      (loop for tlsindex from sb-vm:n-word-bytes below
            #+sb-thread (ash sb-vm::*free-tls-index* sb-vm:n-fixnum-tag-bits)
            #-sb-thread (ash thread-obj-len sb-vm:word-shift)
            by sb-vm:n-word-bytes
            do
               ;; Words from the allocator-histogram slot up to, but not
               ;; including, the lisp-thread slot are skipped.
               (unless (<= sb-vm::thread-allocator-histogram-slot
                           (ash tlsindex (- sb-vm:word-shift))
                           (1- sb-vm::thread-lisp-thread-slot))
                 (let ((thread-slot-name
                         (if (< tlsindex (ash thread-obj-len sb-vm:word-shift))
                             (aref names (ash tlsindex (- sb-vm:word-shift))))))
                   ;; Named thread-object slots other than LISP-THREAD are
                   ;; printed as raw hex words; everything else is read as a
                   ;; Lisp object and shown unless it has no TLS value.
                   (if (and thread-slot-name (sb-vm::neq thread-slot-name 'sb-vm::lisp-thread))
                       (format t " ~3d ~30a : #x~x~%" (ash tlsindex (- sb-vm:word-shift))
                               thread-slot-name (sb-vm::sap-ref-word sap tlsindex))
                       (let ((val (safely-read sap tlsindex)))
                         (unless (eq val :no-tls-value)
                           (show tlsindex val)))))))
      ;; Walk the binding stack from its start to the current pointer, one
      ;; binding (BINDING-SIZE words) per step.
      (let ((from (sb-vm::descriptor-sap sb-vm:*binding-stack-start*))
            (to (sb-vm::binding-stack-pointer-sap)))
        (format t "~%Binding stack: (depth ~d)~%"
                (/ (sb-vm::sap- to from) (* sb-vm:binding-size sb-vm:n-word-bytes)))
        (loop
          (when (sb-vm::sap>= from to) (return))
          (let ((val (safely-read from 0))
                (sym #+sb-thread (sb-vm::sap-ref-word from sb-vm:n-word-bytes) ; a TLS index
                     #-sb-thread (sb-vm::sap-ref-lispobj from sb-vm:n-word-bytes)))
            (show sym val))
          (setq from (sb-vm::sap+ from (* sb-vm:binding-size sb-vm:n-word-bytes))))))))
129 
(defun wait-for-threads (threads)
  "Join every thread in the sequence THREADS, using NIL as the default join
value, then return true if none of them is still alive."
  (map nil
       (lambda (th) (sb-thread:join-thread th :default nil))
       threads)
  (notany #'sb-thread:thread-alive-p threads))
133 
(defun process-all-interrupts (&optional (thread sb-thread:*current-thread*))
  "Wait with SB-EXT:WAIT-FOR until THREAD (default: the current thread) has
no entries left in SB-THREAD::THREAD-INTERRUPTIONS."
  (flet ((pending ()
           (sb-thread::thread-interruptions thread)))
    (sb-ext:wait-for (not (pending)))))
136 
137 ;;; Tasks
(defclass supervisor ()
  ((scope))
  (:documentation
   "Provides a view of the work done within a given SCOPE.

Operators use a supervisor to inspect 'runstreams' that are performed in
other threads, such as WORKERS in a TASK-POOL.

Make sure the SCOPE is fully initialized before using this object. A
supervisor may be created at any point during the lifetime of its SCOPE,
but never before it and never after it."))
;; Leftover REPL experiment, commented out like the notes below so that it
;; no longer runs every time the file is loaded:
;; (thread-id-list)
;; unix-getrusage
;; 0,-1,-2
;; (multiple-value-list (sb-unix:unix-getrusage 0))
;; (setf sb-unix::*on-dangerous-wait* :error)
(defvar *oracle-threads* nil
  "List of ORACLE structures. MAKE-ORACLE adds to it and FIND-ORACLE
searches it by id.")
155 
(defun find-oracle (id)
  "Return the first oracle in *ORACLE-THREADS* whose ORACLE-ID is = to ID,
or NIL when there is none. ID must be an (UNSIGNED-BYTE 32)."
  (declare (type (unsigned-byte 32) id))
  (find id *oracle-threads* :key #'oracle-id :test #'=))
159 
(defstruct (oracle
            (:constructor %make-oracle (id thread)))
  ;; Numeric id; MAKE-ORACLE fills it with the thread's OS thread id.
  (id 0 :read-only t :type (unsigned-byte 32))
  ;; The thread this oracle stands for.
  (thread *current-thread* :read-only t))
163 
(defun make-oracle (thread)
  "Create an oracle for THREAD, keyed by its OS thread id, and return it.

The new oracle is added to *ORACLE-THREADS* unless an oracle with the same
id is already registered; the new structure is returned either way."
  (let ((orc (%make-oracle (sb-thread:thread-os-tid thread) thread)))
    (pushnew orc *oracle-threads* :key #'oracle-id :test #'=)
    orc))
168 
;; Protocol: make GUEST the oracle of HOST. Methods in this file specialize
;; HOST on TASK-POOL and GUEST on INTEGER or THREAD.
(defgeneric designate-oracle (host guest))
170 
;; Protocol: add an item to POOL. The first argument is the item, the second
;; the container that receives it.
(defgeneric push-job (job pool))
(defgeneric push-task (task pool))
(defgeneric push-result (task pool))
(defgeneric push-worker (thread pool))
(defgeneric push-workers (threads pool))
(defgeneric push-stage (stage pool))
;; Lookup counterpart; methods may accept arbitrary keyword arguments.
(defgeneric find-job (job pool &key &allow-other-keys))
178 
;; Protocol: remove items from POOL. No methods for these generics appear in
;; this part of the file.
(defgeneric delete-job (job pool &key &allow-other-keys))
(defgeneric pop-job (pool))
(defgeneric pop-task (pool))
(defgeneric pop-result (pool))
(defgeneric pop-worker (pool))
(defgeneric pop-stage (pool))
185 
;; Protocol: pool lifecycle, task construction, and running work. For the
;; RUN-* generics SELF is the executor and the second argument is the work.
(defgeneric start-task-pool (pool))
(defgeneric pause-task-pool (pool))
(defgeneric stop-task-pool (pool))
(defgeneric make-task (&rest args))
(defgeneric run-job (self job))
(defgeneric run-stage (self stage))
(defgeneric run-task (self task))
193 
(defgeneric make-worker-for (pool function &rest args)
  (:documentation
   "Start a worker thread for POOL that applies FUNCTION to ARGS and return
the thread.")
  ;; With no pool (NIL) the worker is a plain thread with the default name.
  (:method ((pool null) (function function) &rest args)
    (declare (ignore pool))
    (let ((thread-args args))
      (make-thread function :arguments thread-args))))
198 
(defvar *default-worker-name* "worker"
  "Name (or name prefix, when passed to MAKE-THREADS) given to worker threads.")
200 
(defgeneric make-workers-for (pool count function)
  (:documentation
   "Start COUNT worker threads for POOL running FUNCTION and return them as
a list.")
  ;; With no pool (NIL) this is MAKE-THREADS using *DEFAULT-WORKER-NAME* as
  ;; the name prefix.
  (:method ((pool null) (count fixnum) (function function))
    (declare (ignore pool))
    (let ((prefix *default-worker-name*))
      (make-threads count function :name prefix))))
205 
(defmacro define-task-kernel (name (&key args accessors) &body body)
  "Define a task kernel called NAME.

  (define-task-kernel NAME (&key ARGS ACCESSORS) BODY...)

Expands to (DEFUN NAME ARGS BODY...). ARGS is the lambda list of the
kernel; it may be empty.

By convention a kernel processes its options and returns a function, the
'kernel function', which is installed in worker threads by passing it to
SB-THREAD:MAKE-THREAD.

ACCESSORS is meant to be a list of pandoric accessors that can be called
on the kernel via an ORACLE; the expansion does not use it yet.

This interface is experimental and subject to change."
  (declare (ignorable accessors))
  `(defun ,name ,args
     ,@body))
225 
(define-task-kernel default-task-kernel (:args ())
  "The default task kernel used to initialize the KERNEL slot of
task-pools.

Returns the kernel function: a function of no arguments that does nothing
and returns NIL."
  ;; A kernel must return a function. Returning NIL made
  ;; (make-thread (default-task-kernel)) fail, since NIL is not a function.
  (lambda () nil))
232 
(defgeneric spawn-worker (pool)
  (:documentation "Start one worker thread for POOL.")
  ;; With no pool (NIL) the thread function is the value returned by calling
  ;; DEFAULT-TASK-KERNEL.
  (:method ((pool null))
    (declare (ignore pool))
    (let ((kernel-fn (default-task-kernel)))
      (make-thread kernel-fn))))
237 
(defgeneric spawn-workers (pool count)
  (:documentation "Start COUNT worker threads for POOL.")
  ;; With no pool (NIL) the threads run the value returned by calling
  ;; DEFAULT-TASK-KERNEL and are named after *DEFAULT-WORKER-NAME*.
  (:method ((pool null) (count fixnum))
    (declare (ignore pool))
    (let ((kernel-fn (default-task-kernel)))
      (make-threads count kernel-fn :name *default-worker-name*))))
242 
(defstruct task-pool
  "A pool of worker threads together with the jobs, stages and results they
operate on."
  ;; Id of the designated oracle, or NIL when none has been designated.
  (oracle-id nil :type (or null (unsigned-byte 32)))
  ;; Function handed to newly spawned workers.
  (kernel #'default-task-kernel :type function)
  (jobs (make-queue :name "jobs"))
  ;; Starts empty and grows as stages are pushed, so it is created
  ;; :ADJUSTABLE like the task and job vectors elsewhere in this file.
  (stages (make-array 0 :element-type 'stage :fill-pointer 0 :adjustable t)
   :type (array stage *))
  ;; When open, indicates that the pool is fully initialized and workers
  ;; may make progress.
  (online (make-gate :name "online" :open nil)
   :type gate)
  ;; TODO: test weak-vector here
  ;; OS thread ids of the workers; starts empty and grows, hence :ADJUSTABLE.
  (workers (make-array 0 :element-type '(unsigned-byte 32) :fill-pointer 0 :adjustable t)
   :type (vector (unsigned-byte 32) *))
  (results (make-mailbox :name "results")))
255 
(defmethod designate-oracle ((self task-pool) (guest integer))
  "Store the integer GUEST as the oracle id of SELF and return SELF."
  (prog1 self
    (setf (task-pool-oracle-id self) guest)))
259 
(defmethod designate-oracle ((self task-pool) (guest thread))
  "Register the thread GUEST as an oracle via MAKE-ORACLE and designate it
as the oracle of SELF. Returns SELF."
  ;; MAKE-ORACLE returns an ORACLE structure, but the only other method on
  ;; TASK-POOL takes an integer id. Passing the structure through found no
  ;; applicable method, so pass its id.
  (designate-oracle self (oracle-id (make-oracle guest))))
262 
(defmethod task-pool-oracle ((self task-pool))
  "Return the thread of the oracle designated for SELF, or NIL when no
oracle is designated or its id is not registered in *ORACLE-THREADS*."
  ;; The slot is named ORACLE-ID; reading a slot called ORACLE signalled a
  ;; missing-slot error. Both the id and the lookup result may be NIL.
  (let* ((id (task-pool-oracle-id self))
         (oracle (and id (find-oracle id))))
    (and oracle (oracle-thread oracle))))
265 
(defmethod push-worker ((worker thread) (pool task-pool))
  "Record the OS thread id of WORKER in the worker vector of POOL and return
the index at which it was stored."
  ;; The worker vector is created with length 0, so VECTOR-PUSH always found
  ;; it full and silently returned NIL. VECTOR-PUSH-EXTEND grows it instead.
  (vector-push-extend (thread-os-tid worker) (task-pool-workers pool)))
268 
(defmethod push-workers ((threads list) (pool task-pool))
  "Record the OS thread id of every thread in THREADS in the worker vector
of POOL. Returns NIL."
  ;; Uses the structure accessor, as PUSH-WORKER does. VECTOR-PUSH-EXTEND is
  ;; needed because the vector starts with length 0 and plain VECTOR-PUSH
  ;; silently drops elements once the vector is full.
  (let ((workers (task-pool-workers pool)))
    (dolist (w threads)
      (vector-push-extend (thread-os-tid w) workers))))
273 
(defmethod make-worker-for ((pool task-pool) function &rest args)
  "Start a thread named *DEFAULT-WORKER-NAME* that applies FUNCTION to ARGS
and return it. POOL is only used for dispatch."
  (let ((worker-name *default-worker-name*))
    (make-thread function
                 :name worker-name
                 :arguments args)))
276 
(defmethod make-workers-for ((pool task-pool) (count fixnum) function)
  "Start COUNT threads running FUNCTION, named with *DEFAULT-WORKER-NAME* as
prefix, and return them as a list. POOL is only used for dispatch."
  (let ((prefix *default-worker-name*))
    (make-threads count function :name prefix)))
279 
(defmethod spawn-worker ((pool task-pool))
  "Start one worker running the kernel of POOL and record it with
PUSH-WORKER."
  ;; (with-recursive-lock
  (let ((worker (make-worker-for pool (task-pool-kernel pool))))
    (push-worker worker pool)))
283 
(defmethod spawn-workers ((pool task-pool) (count fixnum))
  "Start COUNT workers running the kernel of POOL and record them with
PUSH-WORKERS."
  (let ((workers (make-workers-for pool count (task-pool-kernel pool))))
    (push-workers workers pool)))
286 
(defclass task ()
  ((state
    :accessor task-state
    :initarg :state)
   (object
    :accessor task-object
    :initarg :object))
  (:documentation
   "A single unit of work to be done by some worker.

Tasks are typically generated by an oracle, but workers may also be granted
the ability to create and distribute their own tasks. Once a task is
assigned, its 'owner' -- the worker the task is assigned to -- may modify
the OBJECT and STATE slots. When the work is complete, the owner is
responsible for recording the result of the computation in STATE."))
297 
(defmethod make-task (&rest args)
  "Return a new TASK whose OBJECT slot holds the list ARGS."
  (let ((payload args))
    (make-instance 'task :object payload)))
300 
(defmethod print-object ((self task) stream)
  "Print SELF unreadably as its type followed by its OBJECT."
  (let ((payload-of #'task-object))
    (print-unreadable-object (self stream :type t)
      (format stream "~A" (funcall payload-of self)))))
304 
(defmethod push-result ((task task) (pool task-pool))
  "Send TASK as a message to the results mailbox of POOL."
  (let ((mailbox (task-pool-results pool)))
    (send-message mailbox task)))
307 
(defmethod run-task ((self thread) (task task))
  "Placeholder: does nothing and returns NIL."
  nil)
310 
(defstruct (job
            (:constructor %make-job (tasks)))
  "A collection of tasks to be performed by worker threads."
  ;; The constructor takes TASKS as a required positional argument, so this
  ;; initform is never used by %MAKE-JOB.
  (tasks (make-array 0 :element-type 'task :adjustable t :fill-pointer 0)
   :type (array task *))
  ;; Every job gets its own mutex named "job".
  (lock (make-mutex :name "job")
   :type mutex))
316 
;; Build a JOB from SELF. Methods in this file specialize SELF on TASK,
;; VECTOR and NULL; any keyword argument is permitted by the lambda list.
(defgeneric make-job (self &key &allow-other-keys))
318 
(defmethod make-job ((self task) &key (size 1))
  "Return a job whose task vector holds SIZE references to the task SELF.

The vector is adjustable and has a fill pointer at SIZE, so PUSH-TASK can
add further tasks to the job."
  ;; Without a fill pointer, pushing onto the vector signals an error; the
  ;; other job-building paths already create fill-pointer vectors.
  (%make-job (make-array size :element-type 'task
                              :initial-element self
                              :fill-pointer t
                              :adjustable t)))
322 
(defmethod make-job ((self vector) &key)
  "Return a job that uses the vector SELF itself, uncopied, as its task
vector."
  (let ((tasks self))
    (%make-job tasks)))
325 
(defmethod make-job ((self null) &key (size 1))
  "Return an empty job whose adjustable task vector has room for SIZE tasks."
  (let ((tasks (make-array size
                           :element-type 'task
                           :adjustable t
                           :fill-pointer 0)))
    (%make-job tasks)))
328 
(defmethod print-object ((self job) stream)
  "Print SELF unreadably as its type followed by its task vector."
  (let ((tasks-of #'job-tasks))
    (print-unreadable-object (self stream :type t)
      (format stream "~A" (funcall tasks-of self)))))
332 
(defmethod push-task ((task task) (job job))
  "Append TASK to the task vector of JOB and return the index at which it
was stored."
  ;; Job task vectors are created adjustable but with a small initial
  ;; capacity. VECTOR-PUSH returns NIL and drops the task once that capacity
  ;; is reached; VECTOR-PUSH-EXTEND grows the vector instead.
  (vector-push-extend task (job-tasks job)))
335 
(defmethod push-task ((task task) (pool task-pool))
  "Wrap TASK in a new job via MAKE-JOB and push that job onto POOL."
  (let ((job (make-job task)))
    (push-job job pool)))
338 
(defmethod push-job ((job job) (pool task-pool))
  "Enqueue JOB on the job queue of POOL."
  (let ((queue (task-pool-jobs pool)))
    (enqueue job queue)))
341 
342 ;; TODO..
(defmethod run-job ((self task-pool) (job job))
  "Queue JOB on the pool SELF by delegating to PUSH-JOB. When the :LOG
feature is present, a trace message is logged first."
  #+log (log:trace! "running remote job...")
  (let ((pool self))
    (push-job job pool)))
346 
(defclass stage ()
  (;; Growable vector of jobs. The element type now matches the declared
   ;; slot type; the initform previously said TASK.
   (jobs :initform (make-array 0 :element-type 'job :fill-pointer 0 :adjustable t)
         :initarg :jobs
         :accessor jobs
         :type (vector job))
   ;; Every stage gets its own mutex named "stage" unless :LOCK is supplied.
   (lock :initform (make-mutex :name "stage")
         :initarg :lock
         :accessor stage-lock
         :type mutex))
  (:documentation "A group of jobs together with a mutex."))
353 
(defmethod print-object ((self stage) stream)
  "Print SELF unreadably as its type followed by its job vector."
  (let ((jobs-of #'jobs))
    (print-unreadable-object (self stream :type t)
      (format stream "~A" (funcall jobs-of self)))))
357 
(defmethod push-stage ((stage stage) (pool task-pool))
  "Append STAGE to the stage vector of POOL and return the index at which it
was stored."
  ;; The stage vector is created with length 0, so VECTOR-PUSH always found
  ;; it full and silently returned NIL. VECTOR-PUSH-EXTEND grows it instead.
  (vector-push-extend stage (task-pool-stages pool)))
360 
(defmethod run-stage ((self thread) (stage stage))
  "Placeholder: does nothing and returns NIL."
  nil)