changelog shortlog graph tags branches changeset file revisions annotate raw help

Mercurial > core / lisp/std/task.lisp

revision 692: f51b73f49946
parent 691: 295ea43ceb2d
child 694: a36280d2ef4e
     1.1--- a/lisp/std/task.lisp	Wed Oct 02 23:39:07 2024 -0400
     1.2+++ b/lisp/std/task.lisp	Thu Oct 03 17:56:11 2024 -0400
     1.3@@ -7,17 +7,34 @@
     1.4 
     1.5 ;;; Vars
     1.6 (defvar *task-pool*)
     1.7-(defvar *tasks*)
     1.8-(defvar *workers*)
     1.9+(defvar *tasks* (make-queue :name "tasks"))
    1.10 (defvar *jobs*)
    1.11 (defvar *stages*)
    1.12 (defvar *oracles* nil)
    1.13+(defvar *task-oracles* nil)
    1.14+(eval-when (:compile-toplevel)
    1.15+  (defvar *task*)
    1.16+  (defvar *task-result* nil))
    1.17+
    1.18+(define-condition task-error (thread-error) ()
    1.19+  (:report (lambda (condition stream)
    1.20+             (format stream "Unhandled task error in thread ~A" 
    1.21+                     (thread-error-thread condition)))))
    1.22+
    1.23+(defun task-error (thread)
    1.24+  (error 'task-error :thread thread))
    1.25 
    1.26 ;;; Kernel
    1.27-(defmacro parse-kernel-ops (op)
    1.28-  "Parse an op of the form (NAME ARGS &BODY BODY)"
    1.29-  (destructuring-bind (name args &body body) op
    1.30-    `(std/macs:named-lambda ,name ,args ,@body)))
    1.31+(defmacro gen-task-kernel (name args lock queue mailbox timeout &body body)
    1.32+  `(compile ,name 
    1.33+            (lambda ,args 
    1.34+              (wait-on-semaphore ,lock ,@(when timeout `((:timeout ,timeout))))
    1.35+              (let ((*task* (dequeue ,queue)))
    1.36+                (unwind-protect 
    1.37+                     (handler-case (setf *task-result* (progn ,@body))
    1.38+                       (error () (task-error *current-thread*)))
    1.39+                  (send-message ,mailbox *task-result*)
    1.40+                  (release-foreground))))))
    1.41 
    1.42 (defmacro define-task-kernel (name (&key lock timeout mailbox queue) args &body body)
    1.43   "Define a task kernel.
    1.44@@ -31,27 +48,13 @@
    1.45 SB-THREAD:MAKE-THREAD. It may accept a varying number of arguments
    1.46 specified by ARGS.
    1.47 
    1.48-ACCESSORS is a list of pandoric accessors which can be called on the
    1.49-kernel via an ORACLE. 
    1.50+Within the BODY the variable *task* is bound to the result of (DEQUEUE QUEUE)
    1.51+and *task-result* is bound to the return value of BODY.
    1.52 
    1.53 This interface is experimental and subject to change."
    1.54   `(gen-task-kernel ,name ,args ,lock ,queue ,mailbox ,timeout
    1.55      ,@body))
    1.56 
    1.57-(defvar *task-queue*)
    1.58-(defvar *task-item*)
    1.59-(defvar *task-result*)
    1.60-(defvar *task-error*)
    1.61-
    1.62-(defmacro gen-task-kernel (name args lock queue mailbox timeout &body body)
    1.63-  `(compile ,name 
    1.64-            (lambda ,args 
    1.65-              (wait-on-semaphore ,lock ,@(when timeout `((:timeout ,timeout))))
    1.66-              (let* ((*task-queue* ,queue)
    1.67-                     (*task-item* (dequeue ,queue)))
    1.68-                ,@body
    1.69-                (send-message ,mailbox t)))))
    1.70-
    1.71 ;;; Supervisor
    1.72 (defclass supervisor ()
    1.73   (scope)
    1.74@@ -76,20 +79,28 @@
    1.75   ((thread :initform (sb-thread::%make-thread #.#1=(symbol-name (gensym "w")) t (make-semaphore :name #.#1#))
    1.76            :accessor worker-thread
    1.77            :initarg :thread)
    1.78-   (function :type function :accessor worker-function :initarg :function)
    1.79-   (arguments :type list :accessor worker-arguments :initarg :arguments)))
    1.80+   (kernel :type function :accessor worker-kernel :initarg :kernel)
    1.81+   (input :initform nil :accessor worker-input :initarg :input)))
    1.82+
    1.83+(defvar *workers* (make-array 0 :element-type 'worker :adjustable t))
    1.84+
    1.85+(declaim (inline kill-worker join-worker))
    1.86+(defun start-worker (worker) 
    1.87+  (sb-thread::start-thread (worker-thread worker) (worker-kernel worker) (worker-input worker)))
    1.88+(defun kill-worker (worker) (kill-thread (worker-thread worker)))
    1.89+(defun join-worker (worker) (join-thread (worker-thread worker)))
    1.90 
    1.91 ;;; Oracle           
    1.92 (defstruct (oracle (:constructor %make-oracle (id thread)))
    1.93   (id 0 :type (unsigned-byte 32) :read-only t)
    1.94   (thread *current-thread* :read-only t))
    1.95 
    1.96-(defun find-oracle (id)
    1.97+(defun oracle-of-id (id)
    1.98   (find id *oracles* :test '= :key 'oracle-id))
    1.99 
   1.100 (defun make-oracle (thread)
   1.101-  (let* ((id (thread-os-tid thread)))
   1.102-    (if-let ((found (find-oracle id)))
   1.103+  (let ((id (thread-os-tid thread)))
   1.104+    (if-let ((found (oracle-of-id id)))
   1.105       (values id found)
   1.106       (let ((orc (%make-oracle id thread)))
   1.107         (push orc *oracles*)
   1.108@@ -99,36 +110,20 @@
   1.109 ;; oracle
   1.110 (defgeneric designate-oracle (host guest))
   1.111 ;; worker
   1.112-(defgeneric make-worker (self &rest initargs &key &allow-other-keys)
   1.113-  (:method ((self t) &key thread function arguments)
   1.114-    (declare (ignore self))
   1.115-    (apply #'make-instance 'worker
   1.116-           `(,@(when thread `(:thread ,thread))
   1.117-             ,@(when function `(:function ,function))
   1.118-             ,@(when arguments `(:arguments ,arguments))))))
   1.119-(defgeneric make-workers (self count &rest initargs &key &allow-other-keys)
   1.120-  (:method ((self t) (count t) &key thread function arguments)
   1.121+(defun make-worker (&key thread kernel input)
   1.122+  (apply #'make-instance 'worker
   1.123+         `(,@(when thread `(:thread ,thread))
   1.124+           ,@(when kernel `(:kernel ,kernel))
   1.125+           ,@(when input `(:input ,input)))))
   1.126+
   1.127+(defgeneric make-workers (count &rest initargs &key &allow-other-keys)
   1.128+  (:method ((count number) &key thread kernel input)
   1.129     (let ((ret))
   1.130       (dotimes (i count ret)
   1.131-        (push (make-worker t :thread thread :function function :arguments arguments) ret)))))
   1.132+        (push (make-worker :thread thread :kernel kernel :input input) ret)))))
   1.133+
   1.134 (defgeneric delete-worker (worker pool &key &allow-other-keys))
   1.135-(defgeneric pop-worker (pool))
   1.136-(defgeneric make-worker-for (pool function &rest args)
   1.137-  (:method ((pool null) (function function) &rest args)
   1.138-    (declare (ignore pool))
   1.139-    (make-worker t :function function :arguments args)))
   1.140-(defgeneric make-workers-for (pool count function)
   1.141-  (:method ((pool null) (count fixnum) (function function))
   1.142-    (declare (ignore pool))
   1.143-    (make-workers t count :function function)))
   1.144-(defgeneric spawn-worker (pool)
   1.145-  (:method ((pool null))
   1.146-    (declare (ignore pool))
   1.147-    (make-worker t :function (default-task-kernel))))
   1.148-(defgeneric spawn-workers (pool count)
   1.149-  (:method ((pool null) (count fixnum))
   1.150-    (declare (ignore pool))
   1.151-    (make-workers t count :function (default-task-kernel))))
   1.152+(defgeneric spawn-worker (pool worker))
   1.153 
   1.154 ;; job
   1.155 (defgeneric make-job (self &key &allow-other-keys))
   1.156@@ -137,7 +132,6 @@
   1.157 (defgeneric run-jobs (self))
   1.158 ;; task
   1.159 (defgeneric tasks (self))
   1.160-(defgeneric make-task (&rest args &key &allow-other-keys))
   1.161 (defgeneric run-task (self task))
   1.162 (defgeneric run-tasks (self))
   1.163 (defgeneric results (self))
   1.164@@ -148,10 +142,10 @@
   1.165 ;;; Task Pool
   1.166 (defstruct task-pool
   1.167   (kernel 'identity :type symbol)
   1.168-  (tasks (make-queue :name "tasks"))
   1.169+  (tasks *tasks*)
   1.170   (lock (make-semaphore :name "online") :type semaphore)
   1.171   ;; TODO: test weak-vector here
   1.172-  (workers (make-array 0 :element-type 'worker :fill-pointer 0) :type (vector thread))
   1.173+  (workers (make-array 0 :element-type 'worker :fill-pointer 0) :type (vector worker))
   1.174   (results (make-mailbox :name "results")))
   1.175 
   1.176 (defmethod tasks ((self task-pool)) (task-pool-tasks self))
   1.177@@ -160,28 +154,26 @@
   1.178 
   1.179 (defmethod print-object ((self task-pool) (stream t))
   1.180   (print-unreadable-object (self stream :type t)
   1.181-    (format stream "~A ~A:~A:~A ~A"
   1.182+    (format stream "~A :workers ~A :tasks ~A/~A :results ~A"
   1.183             (task-pool-kernel self)
   1.184             (length (workers self))
   1.185+            (queue-count (tasks self))
   1.186             (semaphore-count (task-pool-lock self))
   1.187-            (queue-count (tasks self))
   1.188             (mailbox-count (task-pool-results self)))))
   1.189 
   1.190-(defmethod designate-oracle ((self task-pool) (guest integer))
   1.191-  (setf (task-pool-oracle-id self) (make-oracle (find-thread-by-id guest)))
   1.192-  self)
   1.193+(defun kill-workers (pool)
   1.194+  "Call FINISH-THREADS on task-pool's workers."
   1.195+  (dotimes (i (length (workers pool)))
   1.196+    (kill-worker (vector-pop (workers pool)))))
   1.197 
   1.198 (defun worker-count (task-pool &key online)
   1.199   (if online
   1.200-      (semaphore-count (task-pool-online task-pool))
   1.201+      (semaphore-count (task-pool-lock task-pool))
   1.202       (length (task-pool-workers task-pool))))
   1.203 
   1.204 (defmethod designate-oracle ((self task-pool) (guest thread))
   1.205   (designate-oracle self (make-oracle guest)))
   1.206 
   1.207-(defmethod task-pool-oracle ((self task-pool))
   1.208-  (oracle-thread (find-oracle (slot-value self 'oracle))))
   1.209-
   1.210 (declaim (inline push-worker push-workers pop-worker))
   1.211 (defun push-worker (worker pool)
   1.212   (vector-push-extend worker (task-pool-workers pool)))
   1.213@@ -194,22 +186,18 @@
   1.214 (defmethod pop-worker (pool)
   1.215   (vector-pop (task-pool-workers pool)))
   1.216 
   1.217-(defmethod make-worker-for ((pool task-pool) function &rest args)
   1.218-  (make-thread function :name *default-worker-name* :arguments args))
   1.219-
   1.220-(defmethod make-workers-for ((pool task-pool) (count fixnum) function)
   1.221-  (make-threads count function :name *default-worker-name*))
   1.222+(defun start-task-worker (pool index)
   1.223+  ;; (with-recursive-lock
   1.224+  (start-worker (aref (workers pool) index)))
   1.225 
   1.226-(defmethod spawn-worker ((pool task-pool))
   1.227-  ;; (with-recursive-lock
   1.228-  (push-worker (make-worker-for pool (task-pool-kernel pool)) pool))
   1.229-
   1.230-(defmethod spawn-workers ((pool task-pool) (count fixnum))
   1.231-  (push-workers (make-workers-for pool count (task-pool-kernel pool)) pool))
   1.232+(defun start-task-workers (pool)
   1.233+  "Start all workers in the given task POOL."
   1.234+  (loop for w across (workers pool)
   1.235+        do (start-worker w)))
   1.236 
   1.237 ;;; Task
   1.238 (defclass task ()
   1.239-  ((state :initarg :state :accessor task-state))
   1.240+  ((state :initform nil :initarg :state :accessor task-state))
   1.241   (:documentation "This object represents a single unit of work to be done by some
   1.242 worker. Tasks are typically generated by an oracle, but workers may also be
   1.243 granted the ability to create and distribute their own tasks. Once a task is
   1.244@@ -220,10 +208,7 @@
   1.245 
   1.246 (defmethod print-object ((self task) stream)
   1.247   (print-unreadable-object (self stream :type t)
   1.248-    (format stream "~A" (task-object self))))
   1.249-
   1.250-(defmethod push-task-result ((task task) (pool task-pool))
   1.251-  (send-message (task-pool-results pool) task))
   1.252+    (format stream ":state ~A" (task-state self))))
   1.253 
   1.254 (defmethod run-task ((self thread) (task task)))
   1.255 
   1.256@@ -265,17 +250,25 @@
   1.257 (defmethod run-stage ((self thread) (stage stage)))
   1.258 
   1.259 ;;; Macros
   1.260-(defmacro with-task-pool ((sym &key oracle lock (count 4) spawn kernel results) &body body)
   1.261-  (unless lock (setf lock (make-semaphore :name "online" :count count)))
   1.262+(defmacro with-task-pool ((sym &key oracle (tasks 0) lock (workers 4) start kernel results) &body body)
   1.263+  (unless lock (setf lock (make-semaphore :name "online" :count workers)))
   1.264   (unless results (setf results (make-mailbox :name "results")))
   1.265-  `(let ((,sym (make-task-pool :lock ,lock :results ,results)))
   1.266+  `(let ((,sym (make-task-pool :lock ,lock :results ,results 
   1.267+                               :tasks (make-queue 
   1.268+                                       :name "tasks"
   1.269+                                       :initial-contents
   1.270+                                       (make-array ,tasks 
   1.271+                                                   :element-type 'task 
   1.272+                                                   :initial-element (make-instance 'task))))))
   1.273      ,@(if kernel `((setf (task-pool-kernel ,sym) ,kernel))
   1.274            `((setf (task-pool-kernel ,sym)
   1.275-                   (gen-task-kernel '%kernel ()
   1.276+                   (gen-task-kernel (gensym "TASK-KERNEL") ()
   1.277                        (task-pool-lock ,sym) 
   1.278                        (tasks ,sym) 
   1.279                        (results ,sym)
   1.280                        nil))))
   1.281-     (designate-oracle ,sym ,@(if oracle (list oracle) `((make-oracle *current-thread*))))
   1.282-     ,@(when spawn `((spawn-workers ,sym ,spawn)))
   1.283+     (loop for i below ,workers
   1.284+           do (push-worker (make-worker :kernel (task-pool-kernel ,sym)) ,sym))
   1.285+     ,@(when oracle `((designate-oracle ,sym ,oracle)))
   1.286+     ,@(when start `((start-task-workers ,sym)))
   1.287      ,@body))