changelog shortlog graph tags branches changeset file revisions annotate raw help

Mercurial > core / lisp/std/task.lisp

revision 691: 295ea43ceb2d
parent 544: ec1d4d544c36
child 692: f51b73f49946
     1.1--- a/lisp/std/task.lisp	Tue Oct 01 23:34:01 2024 -0400
     1.2+++ b/lisp/std/task.lisp	Wed Oct 02 23:39:07 2024 -0400
     1.3@@ -19,7 +19,7 @@
     1.4   (destructuring-bind (name args &body body) op
     1.5     `(std/macs:named-lambda ,name ,args ,@body)))
     1.6 
     1.7-(defmacro define-task-kernel (name ops accessors &body body)
     1.8+(defmacro define-task-kernel (name (&key lock timeout mailbox queue) args &body body)
     1.9   "Define a task kernel.
    1.10 
    1.11 (define-task-kernel NAME (&key ARGS ACCESSORS)
    1.12@@ -35,18 +35,22 @@
    1.13 kernel via an ORACLE. 
    1.14 
    1.15 This interface is experimental and subject to change."
    1.16-  (declare (ignorable accessors ops))
    1.17-  `(defun ,name ()
    1.18-     ,@body
    1.19-     (values)))
    1.20+  `(gen-task-kernel ,name ,args ,lock ,queue ,mailbox ,timeout
    1.21+     ,@body))
    1.22+
    1.23+(defvar *task-queue*)
    1.24+(defvar *task-item*)
    1.25+(defvar *task-result*)
    1.26+(defvar *task-error*)
    1.27 
    1.28-(defvar *task-kernel*
    1.29-  (symbol-function
    1.30-   (define-task-kernel default-task-kernel () ()
    1.31-     "The default task kernel used to initialize the KERNEL slot of
    1.32-task-pools.
    1.33-"
    1.34-     nil)))
    1.35+(defmacro gen-task-kernel (name args lock queue mailbox timeout &body body)
    1.36+  `(compile ,name 
    1.37+            (lambda ,args 
    1.38+              (wait-on-semaphore ,lock ,@(when timeout `((:timeout ,timeout))))
    1.39+              (let* ((*task-queue* ,queue)
    1.40+                     (*task-item* (dequeue ,queue)))
    1.41+                ,@body
    1.42+                (send-message ,mailbox t)))))
    1.43 
    1.44 ;;; Supervisor
    1.45 (defclass supervisor ()
    1.46@@ -92,7 +96,6 @@
    1.47         (values id orc)))))
    1.48 
    1.49 ;;; Proto
    1.50-
    1.51 ;; oracle
    1.52 (defgeneric designate-oracle (host guest))
    1.53 ;; worker
    1.54@@ -108,9 +111,6 @@
    1.55     (let ((ret))
    1.56       (dotimes (i count ret)
    1.57         (push (make-worker t :thread thread :function function :arguments arguments) ret)))))
    1.58-(defgeneric push-worker (thread pool))
    1.59-(defgeneric push-workers (threads pool))
    1.60-(defgeneric worker-count (self &key &allow-other-keys))
    1.61 (defgeneric delete-worker (worker pool &key &allow-other-keys))
    1.62 (defgeneric pop-worker (pool))
    1.63 (defgeneric make-worker-for (pool function &rest args)
    1.64@@ -132,73 +132,49 @@
    1.65 
    1.66 ;; job
    1.67 (defgeneric make-job (self &key &allow-other-keys))
    1.68-(defgeneric push-job (job pool))
    1.69-(defgeneric pop-job (pool))
    1.70 (defgeneric find-job (job pool &key &allow-other-keys))
    1.71-(defgeneric delete-job (job pool &key &allow-other-keys))
    1.72 (defgeneric run-job (self job))
    1.73-
    1.74+(defgeneric run-jobs (self))
    1.75 ;; task
    1.76-(defgeneric push-task (task pool))
    1.77-(defgeneric pop-task (pool))
    1.78-(defgeneric delete-task (task pool &key &allow-other-keys))
    1.79-(defgeneric make-task (&rest args))
    1.80+(defgeneric tasks (self))
    1.81+(defgeneric make-task (&rest args &key &allow-other-keys))
    1.82 (defgeneric run-task (self task))
    1.83-
    1.84-;; task-result
    1.85-(defgeneric push-task-result (task pool))
    1.86-(defgeneric pop-task-result (pool))
    1.87-
    1.88+(defgeneric run-tasks (self))
    1.89+(defgeneric results (self))
    1.90 ;; stage
    1.91-(defgeneric push-stage (stage pool))
    1.92-(defgeneric pop-stage (pool))
    1.93-(defgeneric delete-stage (stage pool &key &allow-other-keys))
    1.94 (defgeneric run-stage (self stage))
    1.95+(defgeneric workers (self))
    1.96 
    1.97-;; task-pool
    1.98-(defgeneric start-task-pool (pool))
    1.99-(defgeneric pause-task-pool (pool))
   1.100-(defgeneric init-task-pool (pool)
   1.101-  (:method ((pool t))
   1.102-    (setq *task-pool* (or pool (make-task-pool)))))
   1.103-(defgeneric stop-task-pool (pool))
   1.104-(defgeneric restart-task-pool (pool))
   1.105-
   1.106-;;; Pool
   1.107+;;; Task Pool
   1.108 (defstruct task-pool
   1.109-  (oracle-id nil :type (or null (unsigned-byte 32)))
   1.110-  (kernel #'default-task-kernel :type function)
   1.111-  (jobs (make-queue :name "jobs"))
   1.112-  (stages (make-array 0 :element-type 'stage :fill-pointer 0) :type (array stage *))
   1.113-   ;; When open, indicates that the pool is fully initialized and workers
   1.114-   ;; may make progress.
   1.115-  (online (make-semaphore :name "online")
   1.116-   :type semaphore)
   1.117+  (kernel 'identity :type symbol)
   1.118+  (tasks (make-queue :name "tasks"))
   1.119+  (lock (make-semaphore :name "online") :type semaphore)
   1.120   ;; TODO: test weak-vector here
   1.121-  (workers (make-array 0 :element-type 'worker
   1.122-                         :fill-pointer 0)
   1.123-   :type (vector worker))
   1.124+  (workers (make-array 0 :element-type 'worker :fill-pointer 0) :type (vector thread))
   1.125   (results (make-mailbox :name "results")))
   1.126 
   1.127-(defmethod print-object ((self task-pool) stream)
   1.128+(defmethod tasks ((self task-pool)) (task-pool-tasks self))
   1.129+(defmethod results ((self task-pool)) (task-pool-results self))
   1.130+(defmethod workers ((self task-pool)) (task-pool-workers self))
   1.131+
   1.132+(defmethod print-object ((self task-pool) (stream t))
   1.133   (print-unreadable-object (self stream :type t)
   1.134-    (format stream "~A ~A :online ~A ~A:~A:~A ~A"
   1.135-            (task-pool-oracle-id self)
   1.136+    (format stream "~A ~A:~A:~A ~A"
   1.137             (task-pool-kernel self)
   1.138-            (semaphore-count (task-pool-online self))
   1.139-            (queue-count (task-pool-jobs self))
   1.140-            (length (task-pool-stages self))
   1.141-            (length (task-pool-workers self))
   1.142+            (length (workers self))
   1.143+            (semaphore-count (task-pool-lock self))
   1.144+            (queue-count (tasks self))
   1.145             (mailbox-count (task-pool-results self)))))
   1.146 
   1.147 (defmethod designate-oracle ((self task-pool) (guest integer))
   1.148   (setf (task-pool-oracle-id self) (make-oracle (find-thread-by-id guest)))
   1.149   self)
   1.150 
   1.151-(defmethod worker-count ((self task-pool) &key online)
   1.152+(defun worker-count (task-pool &key online)
   1.153   (if online
   1.154-      (semaphore-count (task-pool-online self))
   1.155-      (length (task-pool-workers self))))
   1.156+      (semaphore-count (task-pool-online task-pool))
   1.157+      (length (task-pool-workers task-pool))))
   1.158 
   1.159 (defmethod designate-oracle ((self task-pool) (guest thread))
   1.160   (designate-oracle self (make-oracle guest)))
   1.161@@ -206,15 +182,16 @@
   1.162 (defmethod task-pool-oracle ((self task-pool))
   1.163   (oracle-thread (find-oracle (slot-value self 'oracle))))
   1.164 
   1.165-(defmethod push-worker ((worker thread) (pool task-pool))
   1.166+(declaim (inline push-worker push-workers pop-worker))
   1.167+(defun push-worker (worker pool)
   1.168   (vector-push-extend worker (task-pool-workers pool)))
   1.169 
   1.170-(defmethod push-workers ((threads list) (pool task-pool))
   1.171+(defun push-workers (threads pool)
   1.172   (with-slots (workers) pool
   1.173     (dolist (w threads)
   1.174       (vector-push-extend w workers))))
   1.175 
   1.176-(defmethod pop-worker ((pool task-pool))
   1.177+(defmethod pop-worker (pool)
   1.178   (vector-pop (task-pool-workers pool)))
   1.179 
   1.180 (defmethod make-worker-for ((pool task-pool) function &rest args)
   1.181@@ -232,8 +209,7 @@
   1.182 
   1.183 ;;; Task
   1.184 (defclass task ()
   1.185-  ((state :initarg :state :accessor task-state)
   1.186-   (object :initarg :object :accessor task-object))
   1.187+  ((state :initarg :state :accessor task-state))
   1.188   (:documentation "This object represents a single unit of work to be done by some
   1.189 worker. Tasks are typically generated by an oracle, but workers may also be
   1.190 granted the ability to create and distribute their own tasks. Once a task is
   1.191@@ -242,9 +218,6 @@
   1.192 owner is responsible for indicating in the state slot the result of the
   1.193 computation."))
   1.194 
   1.195-(defmethod make-task (&rest args)
   1.196-  (make-instance 'task :object args))
   1.197-
   1.198 (defmethod print-object ((self task) stream)
   1.199   (print-unreadable-object (self stream :type t)
   1.200     (format stream "~A" (task-object self))))
   1.201@@ -261,6 +234,8 @@
   1.202    :type (array task *))
   1.203   (lock (make-mutex :name "job") :type mutex))
   1.204 
   1.205+(defmethod tasks ((self job)) (job-tasks self))
   1.206+
   1.207 (defmethod make-job ((self task) &key (size 1))
   1.208   (%make-job (make-array size :element-type 'task
   1.209                               :initial-element self)))
   1.210@@ -275,19 +250,6 @@
   1.211   (print-unreadable-object (self stream :type t)
   1.212     (format stream "~A" (job-tasks self))))
   1.213 
   1.214-(defmethod push-task ((task task) (job job))
   1.215-  (vector-push task (job-tasks job)))
   1.216-
   1.217-(defmethod push-task ((task task) (pool task-pool))
   1.218-  (push-job (make-job task) pool))
   1.219-
   1.220-(defmethod push-job ((job job) (pool task-pool))
   1.221-  (enqueue job (task-pool-jobs pool)))
   1.222-
   1.223-(defmethod run-job ((self task-pool) (job job))
   1.224-  #+log (log:trace! "running remote job...")
   1.225-  (push-job job self))
   1.226-
   1.227 ;;; Stage
   1.228 (defclass stage ()
   1.229   ((jobs  :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
   1.230@@ -300,13 +262,20 @@
   1.231   (print-unreadable-object (self stream :type t)
   1.232     (format stream "~A" (jobs self))))
   1.233 
   1.234-(defmethod push-stage ((stage stage) (pool task-pool))
   1.235-  (vector-push stage (task-pool-stages pool)))
   1.236-
   1.237 (defmethod run-stage ((self thread) (stage stage)))
   1.238 
   1.239 ;;; Macros
   1.240-(defmacro with-task-pool ((sym pool &key count) &body body)
   1.241-  `(let ((,sym ,(or pool std/task::*task-pool*)))
   1.242-     ,@(when count `((spawn-workers ,sym ,count)))
   1.243+(defmacro with-task-pool ((sym &key oracle lock (count 4) spawn kernel results) &body body)
   1.244+  (unless lock (setf lock (make-semaphore :name "online" :count count)))
   1.245+  (unless results (setf results (make-mailbox :name "results")))
   1.246+  `(let ((,sym (make-task-pool :lock ,lock :results ,results)))
   1.247+     ,@(if kernel `((setf (task-pool-kernel ,sym) ,kernel))
   1.248+           `((setf (task-pool-kernel ,sym)
   1.249+                   (gen-task-kernel '%kernel ()
   1.250+                       (task-pool-lock ,sym) 
   1.251+                       (tasks ,sym) 
   1.252+                       (results ,sym)
   1.253+                       nil))))
   1.254+     (designate-oracle ,sym ,@(if oracle (list oracle) `((make-oracle *current-thread*))))
   1.255+     ,@(when spawn `((spawn-workers ,sym ,spawn)))
   1.256      ,@body))