changeset 336: |
0af47621fa8b |
parent: |
a0dfde3cb3c4
|
child: |
83f8623a6ec3 |
author: |
Richard Westhaver <ellis@rwest.io> |
date: |
Sun, 12 May 2024 22:13:22 -0400 |
permissions: |
-rw-r--r-- |
description: |
printing, bugfixes |
;; NOTE(review): this line is a collapsed listing of threads.lisp. The bare integers
;; (1, 7, 10, ...) are listing line numbers, not Lisp tokens, and gaps in that
;; numbering are listing lines that are not present in this view.
;; Definitions visible on this line: THREAD-SUPPORT-P, PRINT-THREAD-MESSAGE-TOP-LEVEL,
;; FIND-THREAD-BY-ID, THREAD-ID-LIST, THREAD-COUNT, MAKE-THREADS, WITH-THREADS,
;; FINISH-THREADS, TIMED-JOIN-THREAD, KILL-THREAD and INIT-SESSION.
;; FIND-THREAD-BY-ID guards the AVL-FIND result before calling AVLNODE-DATA, so an
;; unknown ID yields NIL (as its docstring promises) instead of a type error.
;; NOTE(review): WITH-THREADS expands to a one-argument lambda, but the visible
;; MAKE-THREAD call in MAKE-THREADS passes no :ARGUMENTS -- confirm the index is supplied.
1 ;;; threads.lisp --- Multi-thread utilities 7 ;; mostly yoinked from sb-thread and friends 10 (in-package :std/thread) 12 ;; (sb-thread:thread-os-tid sb-thread:*current-thread*) 13 ;; sb-thread:interrupt-thread 15 (defun thread-support-p () (member :thread-support *features*)) 17 (eval-when (:compile-toplevel) 18 (defun print-thread-message-top-level (msg) 19 (sb-thread:make-thread 21 (format #.*standard-output* msg))) 24 ;; this is all very unsafe. don't touch the finalizer thread plz. 25 (defun find-thread-by-id (id) 26 "Search for thread by ID which must be a u64. Returns the thread on success, or NIL when no thread has that ID." 27 (let ((node (sb-thread::avl-find id sb-thread::*all-threads*))) (when node (sb-thread::avlnode-data node)))) 29 (defun thread-id-list () 30 (sb-thread::avltree-filter #'sb-thread::avlnode-key sb-thread::*all-threads*)) 32 (defun thread-count () 33 (sb-thread::avl-count sb-thread::*all-threads*)) 35 (defun make-threads (n fn &key (name "thread")) 36 (declare (type fixnum n)) 38 collect (make-thread fn :name (format nil "~A-~D" name i)))) 40 (defmacro with-threads ((idx n) &body body) 41 `(make-threads ,n (lambda (,idx) (declare (ignorable ,idx)) ,@body))) 43 (defun finish-threads (&rest threads) 44 (let ((threads (flatten threads))) 46 (mapc #'join-thread threads) 47 (dolist (thread threads) 48 (when (thread-alive-p thread) 49 (terminate-thread thread)))))) 51 (defun timed-join-thread (thread timeout) 52 (declare (type thread thread) (type float timeout)) 53 (handler-case (sb-sys:with-deadline (:seconds timeout) 54 (join-thread thread :default :aborted)) 59 (join-thread *current-thread*)) 61 (defun kill-thread (thread) 62 (when (thread-alive-p thread) 64 (terminate-thread thread)))) 66 ;; (sb-vm::primitive-object-slots (sb-vm::primitive-object 'sb-vm::thread)) 67 (defun init-session (&optional (thread *current-thread*)) (sb-thread::new-session thread)) 69 ;; (sb-thread::with-progressive-timeout (timet :seconds 4) (dotimes (i 4000) (print (timet)))) 71 ;; (describe 
;; NOTE(review): continuation of the collapsed threads.lisp listing. The bare integers
;; are listing line numbers, not Lisp tokens, and gaps in that numbering are listing
;; lines that are not present in this view.
;; Visible below: a routine that prints the current thread's TLS slots and binding
;; stack (its header is not shown), WAIT-FOR-THREADS, PROCESS-ALL-INTERRUPTS, the
;; SUPERVISOR class, the ORACLE struct with its *ORACLE-THREADS* registry, the
;; task-pool generic-function protocol, DEFINE-TASK-KERNEL, the task-pool slots and
;; methods, and the TASK, JOB and STAGE definitions.
;; TASK-POOL-ORACLE reads the id through TASK-POOL-ORACLE-ID (the accessor that
;; DESIGNATE-ORACLE writes through; no slot named ORACLE is visible) and returns NIL
;; when no oracle id is set or no matching oracle is registered.
;; NOTE(review): MAKE-JOB on a TASK builds an array without a fill pointer, while
;; PUSH-TASK calls VECTOR-PUSH on JOB-TASKS -- confirm that combination is intended.
sb-thread::*session*) 73 ;; make-listener-thread 75 ;; with-progressive-timeout 79 (let* ((slots (sb-vm::primitive-object-slots #1=(sb-vm::primitive-object 'sb-vm::thread))) 80 (sap (current-thread-sap)) 81 (thread-obj-len (sb-vm::primitive-object-length #1#)) 82 (names (make-array thread-obj-len :initial-element ""))) 83 (loop for slot across slots 85 (setf (aref names (sb-vm::slot-offset slot)) (sb-vm::slot-name slot))) 86 (flet ((safely-read (sap offset &aux (bits (sb-vm::sap-ref-word sap offset))) 87 (cond ((eql bits sb-vm:no-tls-value-marker) :no-tls-value) 88 ((eql (logand bits sb-vm:widetag-mask) sb-vm:unbound-marker-widetag) :unbound) 89 (t (sb-vm::sap-ref-lispobj sap offset)))) 91 (declare (type fixnum sym)) 92 (let ((*print-right-margin* 128) 94 (format t " ~3d ~30a : ~s~%" 95 #+sb-thread (ash sym (- sb-vm:word-shift)) 97 #+sb-thread (sb-vm:symbol-from-tls-index sym) 100 (format t "~&TLS: (base=~x)~%" (sb-vm::sap-int sap)) 101 (loop for tlsindex from sb-vm:n-word-bytes below 102 #+sb-thread (ash sb-vm::*free-tls-index* sb-vm:n-fixnum-tag-bits) 103 #-sb-thread (ash thread-obj-len sb-vm:word-shift) 104 by sb-vm:n-word-bytes 106 (unless (<= sb-vm::thread-allocator-histogram-slot 107 (ash tlsindex (- sb-vm:word-shift)) 108 (1- sb-vm::thread-lisp-thread-slot)) 109 (let ((thread-slot-name 110 (if (< tlsindex (ash thread-obj-len sb-vm:word-shift)) 111 (aref names (ash tlsindex (- sb-vm:word-shift)))))) 112 (if (and thread-slot-name (sb-vm::neq thread-slot-name 'sb-vm::lisp-thread)) 113 (format t " ~3d ~30a : #x~x~%" (ash tlsindex (- sb-vm:word-shift)) 114 thread-slot-name (sb-vm::sap-ref-word sap tlsindex)) 115 (let ((val (safely-read sap tlsindex))) 116 (unless (eq val :no-tls-value) 117 (show tlsindex val))))))) 118 (let ((from (sb-vm::descriptor-sap sb-vm:*binding-stack-start*)) 119 (to (sb-vm::binding-stack-pointer-sap))) 120 (format t "~%Binding stack: (depth ~d)~%" 121 (/ (sb-vm::sap- to from) (* sb-vm:binding-size sb-vm:n-word-bytes))) 123 (when 
(sb-vm::sap>= from to) (return)) 124 (let ((val (safely-read from 0)) 125 (sym #+sb-thread (sb-vm::sap-ref-word from sb-vm:n-word-bytes) ; a TLS index 126 #-sb-thread (sb-vm::sap-ref-lispobj from sb-vm:n-word-bytes))) 128 (setq from (sb-vm::sap+ from (* sb-vm:binding-size sb-vm:n-word-bytes)))))))) 130 (defun wait-for-threads (threads) 131 (map 'list (lambda (thread) (sb-thread:join-thread thread :default nil)) threads) 132 (not (some #'sb-thread:thread-alive-p threads))) 134 (defun process-all-interrupts (&optional (thread sb-thread:*current-thread*)) 135 (sb-ext:wait-for (null (sb-thread::thread-interruptions thread)))) 138 (defclass supervisor () 140 (:documentation "A class which provides a view of the work done within a specified 143 This object should be used by operators to inspect 'runstreams' 144 performed in other threads, such as WORKERS in TASK-POOL. 146 Before using this object you should ensure the SCOPE is fully 147 initialized. Supervisors should be created at any point during the 148 lifetime of SCOPE, but never before and never after.")) 152 ;; (multiple-value-list (sb-unix:unix-getrusage 0)) 153 ;; (setf sb-unix::*on-dangerous-wait* :error) 154 (defvar *oracle-threads* nil) 156 (defun find-oracle (id) 157 (declare ((unsigned-byte 32) id)) 158 (find id *oracle-threads* :test '= :key 'oracle-id)) 160 (defstruct (oracle (:constructor %make-oracle (id thread))) 161 (id 0 :type (unsigned-byte 32) :read-only t) 162 (thread *current-thread* :read-only t)) 164 (defun make-oracle (thread) 165 (let ((orc (%make-oracle (sb-thread:thread-os-tid thread) thread))) 167 (pushnew orc *oracle-threads* :test '= :key #'oracle-id)))) 169 (defgeneric designate-oracle (host guest)) 171 (defgeneric push-job (job pool)) 172 (defgeneric push-task (task pool)) 173 (defgeneric push-result (task pool)) 174 (defgeneric push-worker (thread pool)) 175 (defgeneric push-workers (threads pool)) 176 (defgeneric push-stage (stage pool)) 177 (defgeneric find-job (job pool &key 
&allow-other-keys)) 179 (defgeneric delete-job (job pool &key &allow-other-keys)) 180 (defgeneric pop-job (pool)) 181 (defgeneric pop-task (pool)) 182 (defgeneric pop-result (pool)) 183 (defgeneric pop-worker (pool)) 184 (defgeneric pop-stage (pool)) 186 (defgeneric start-task-pool (pool)) 187 (defgeneric pause-task-pool (pool)) 188 (defgeneric stop-task-pool (pool)) 189 (defgeneric make-task (&rest args)) 190 (defgeneric run-job (self job)) 191 (defgeneric run-stage (self stage)) 192 (defgeneric run-task (self task)) 194 (defgeneric make-worker-for (pool function &rest args) 195 (:method ((pool null) (function function) &rest args) 196 (declare (ignore pool)) 197 (make-thread function :arguments args))) 199 (defvar *default-worker-name* "worker") 201 (defgeneric make-workers-for (pool count function) 202 (:method ((pool null) (count fixnum) (function function)) 203 (declare (ignore pool)) 204 (make-threads count function :name *default-worker-name*))) 206 (defmacro define-task-kernel (name (&key args accessors) &body body) 207 "Define a task kernel. 209 (define-task-kernel NAME (&key ARGS MAX MIN ACCESSORS) 211 The kernel should process all options and return a function - the 214 The kernel function is installed in worker threads by passing it to 215 SB-THREAD:MAKE-THREAD. It may accept a varying number of arguments 218 ACCESSORS is a list of pandoric accessors which can be called on the 219 kernel via an ORACLE. 221 This interface is experimental and subject to change." 
222 (declare (ignorable accessors)) 223 `(defun ,name (,@args) 226 (define-task-kernel default-task-kernel (:args () ) 227 "The default task kernel used to initialize the KERNEL slot of 233 (defgeneric spawn-worker (pool) 234 (:method ((pool null)) 235 (declare (ignore pool)) 236 (make-thread (default-task-kernel)))) 238 (defgeneric spawn-workers (pool count) 239 (:method ((pool null) (count fixnum)) 240 (declare (ignore pool)) 241 (make-threads count (default-task-kernel) :name *default-worker-name*))) 244 (oracle-id nil :type (or null (unsigned-byte 32))) 245 (kernel #'default-task-kernel :type function) 246 (jobs (make-queue :name "jobs")) 247 (stages (make-array 0 :element-type 'stage :fill-pointer 0) :type (array stage *)) 248 ;; When open, indicates that the pool is fully initialized and workers 249 ;; may make progress. 250 (online (make-gate :name "online" :open nil) 252 ;; TODO: test weak-vector here 253 (workers (make-array 0 :element-type '(unsigned-byte 32) :fill-pointer 0) :type (vector (unsigned-byte 32) *)) 254 (results (make-mailbox :name "results"))) 256 (defmethod designate-oracle ((self task-pool) (guest integer)) 257 (setf (task-pool-oracle-id self) guest) 260 (defmethod designate-oracle ((self task-pool) (guest thread)) 261 (designate-oracle self (make-oracle guest))) 263 (defmethod task-pool-oracle ((self task-pool)) 264 (let* ((id (task-pool-oracle-id self)) (orc (and id (find-oracle id)))) (and orc (oracle-thread orc)))) 266 (defmethod push-worker ((worker thread) (pool task-pool)) 267 (vector-push (thread-os-tid worker) (task-pool-workers pool))) 269 (defmethod push-workers ((threads list) (pool task-pool)) 270 (with-slots (workers) pool 272 (vector-push (thread-os-tid w) workers)))) 274 (defmethod make-worker-for ((pool task-pool) function &rest args) 275 (make-thread function :name *default-worker-name* :arguments args)) 277 (defmethod make-workers-for ((pool task-pool) (count fixnum) function) 278 (make-threads count function :name *default-worker-name*)) 280 (defmethod spawn-worker 
((pool task-pool)) 281 ;; (with-recursive-lock 282 (push-worker (make-worker-for pool (task-pool-kernel pool)) pool)) 284 (defmethod spawn-workers ((pool task-pool) (count fixnum)) 285 (push-workers (make-workers-for pool count (task-pool-kernel pool)) pool)) 288 ((state :initarg :state :accessor task-state) 289 (object :initarg :object :accessor task-object)) 290 (:documentation "This object represents a single unit of work to be done by some 291 worker. Tasks are typically generated by an oracle, but workers may 292 also be granted the ability to create and distribute their own 293 tasks. Once a task is assigned, the 'owner', i.e. the worker that is 294 assigned this task, may modify the object and state. When the work 295 associated with a task is complete, the owner is responsible for 296 indicating in the state slot the result of the computation.")) 298 (defmethod make-task (&rest args) 299 (make-instance 'task :object args)) 301 (defmethod print-object ((self task) stream) 302 (print-unreadable-object (self stream :type t) 303 (format stream "~A" (task-object self)))) 305 (defmethod push-result ((task task) (pool task-pool)) 306 (send-message (task-pool-results pool) task)) 308 (defmethod run-task ((self thread) (task task)) 311 (defstruct (job (:constructor %make-job (tasks))) 312 "A collection of tasks to be performed by worker threads." 
313 (tasks (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t) 314 :type (array task *)) 315 (lock (make-mutex :name "job") :type mutex)) 317 (defgeneric make-job (self &key &allow-other-keys)) 319 (defmethod make-job ((self task) &key (size 1)) 320 (%make-job (make-array size :element-type 'task 321 :initial-element self))) 323 (defmethod make-job ((self vector) &key) 326 (defmethod make-job ((self null) &key (size 1)) 327 (%make-job (make-array size :element-type 'task :fill-pointer 0 :adjustable t))) 329 (defmethod print-object ((self job) stream) 330 (print-unreadable-object (self stream :type t) 331 (format stream "~A" (job-tasks self)))) 333 (defmethod push-task ((task task) (job job)) 334 (vector-push task (job-tasks job))) 336 (defmethod push-task ((task task) (pool task-pool)) 337 (push-job (make-job task) pool)) 339 (defmethod push-job ((job job) (pool task-pool)) 340 (enqueue job (task-pool-jobs pool))) 343 (defmethod run-job ((self task-pool) (job job)) 344 #+log (log:trace! "running remote job...") 348 ((jobs :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t) 352 (lock :initform (make-mutex :name "stage") :initarg :lock :accessor stage-lock :type mutex))) 354 (defmethod print-object ((self stage) stream) 355 (print-unreadable-object (self stream :type t) 356 (format stream "~A" (jobs self)))) 358 (defmethod push-stage ((stage stage) (pool task-pool)) 359 (vector-push stage (task-pool-stages pool))) 361 (defmethod run-stage ((self thread) (stage stage)))