changelog shortlog graph tags branches changeset file revisions annotate raw help

Mercurial > core / lisp/std/task.lisp

revision 694: a36280d2ef4e
parent 692: f51b73f49946
child 696: 38e9c3be2392
     1.1--- a/lisp/std/task.lisp	Thu Oct 03 19:04:57 2024 -0400
     1.2+++ b/lisp/std/task.lisp	Thu Oct 03 21:54:07 2024 -0400
     1.3@@ -10,8 +10,11 @@
     1.4 (defvar *tasks* (make-queue :name "tasks"))
     1.5 (defvar *jobs*)
     1.6 (defvar *stages*)
     1.7-(defvar *oracles* nil)
     1.8-(defvar *task-oracles* nil)
     1.9+(sb-ext:defglobal *worker-threads* nil)
    1.10+(sb-ext:defglobal *oracles* nil)
    1.11+(sb-ext:defglobal *oracle-threads* nil)
    1.12+(sb-ext:defglobal *supervisor-threads* nil)
    1.13+
    1.14 (eval-when (:compile-toplevel)
    1.15   (defvar *task*)
    1.16   (defvar *task-result* nil))
    1.17@@ -55,43 +58,94 @@
    1.18   `(gen-task-kernel ,name ,args ,lock ,queue ,mailbox ,timeout
    1.19      ,@body))
    1.20 
    1.21+(defun make-ephemeral-thread (name)
    1.22+    (sb-thread::%make-thread name t (make-semaphore :name name)))
    1.23+
    1.24+;;; Proto
    1.25+(defgeneric designate-oracle (host guest))
    1.26+(defgeneric assign-supervisor (worker supervisor))
    1.27+
    1.28+(defgeneric make-workers (count &rest initargs &key &allow-other-keys)
    1.29+  (:method ((count number) &key thread kernel input)
    1.30+    (let ((ret))
    1.31+      (dotimes (i count ret)
    1.32+        (push (make-worker :thread thread :kernel kernel :input input) ret)))))
    1.33+
    1.34+(defgeneric tasks (self))
    1.35+(defgeneric results (self))
    1.36+(defgeneric run (self object &rest initargs &key &allow-other-keys))
    1.37+(defgeneric run-object (self))
    1.38+(defgeneric work (self &key &allow-other-keys))
    1.39+(defgeneric workers (self))
    1.40+
    1.41 ;;; Supervisor
    1.42 (defclass supervisor ()
    1.43-  (scope)
    1.44-  (:documentation "A class which provides a view of the work done within a specified
    1.45-SCOPE.
    1.46+  ((thread :initform (make-ephemeral-thread (symbol-name (gensym "supervisor"))) :accessor supervisor-thread)
    1.47+   (domain)
    1.48+   (scope))
    1.49+  (:documentation "Supervisors are threads which are responsible for a set of worker threads
    1.50+within their DOMAIN and SCOPE."))
    1.51 
    1.52-This object should be used by operators to inspect 'runstreams'
    1.53-performed in other threads, such as WORKERS in TASK-POOL.
    1.54-
    1.55-Before using this object you should ensure the SCOPE is fully
    1.56-initialized. Supervisors should be created at any point during the
    1.57-lifetime of SCOPE, but never before and never after."))
    1.58+(defmethod initialize-instance :after ((self supervisor) &key &allow-other-keys)
    1.59+  (push (supervisor-thread self) *supervisor-threads*))
    1.60 
    1.61 ;;; Worker
    1.62 ;; unix-getrusage  
    1.63 ;; 0,-1,-2
    1.64 ;; (multiple-value-list (sb-unix:unix-getrusage 0))
    1.65 ;; (setf sb-unix::*on-dangerous-wait* :error)
    1.66-(defvar *default-worker-name* "worker")
    1.67 
    1.68+;; TODO 2024-10-03: with-cas-lock?
    1.69 (defclass worker ()
    1.70-  ((thread :initform (sb-thread::%make-thread #.#1=(symbol-name (gensym "w")) t (make-semaphore :name #.#1#))
    1.71+  ((thread :initform (make-ephemeral-thread (symbol-name (gensym "worker")))
    1.72            :accessor worker-thread
    1.73            :initarg :thread)
    1.74    (kernel :type function :accessor worker-kernel :initarg :kernel)
    1.75-   (input :initform nil :accessor worker-input :initarg :input)))
    1.76+   (tasks :initform nil :accessor tasks :initarg :input)))
    1.77+
    1.78+(defmethod initialize-instance :after ((self worker) &key &allow-other-keys)
    1.79+  (push (worker-thread self) *worker-threads*))
    1.80 
    1.81-(defvar *workers* (make-array 0 :element-type 'worker :adjustable t))
    1.82+(defun make-worker (&key thread kernel input)
    1.83+  (apply #'make-instance 'worker
    1.84+         `(,@(when thread `(:thread ,thread))
    1.85+           ,@(when kernel `(:kernel ,kernel))
    1.86+           ,@(when input `(:input ,input)))))
    1.87+
    1.88+;; TODO 2024-10-03: pause/resume
    1.89+
    1.90+(declaim (inline kill-worker join-worker start-worker run-worker))
    1.91+(defun start-worker (worker) 
    1.92+  (sb-thread::start-thread (worker-thread worker) (worker-kernel worker) (tasks worker)))
    1.93 
    1.94-(declaim (inline kill-worker join-worker))
    1.95-(defun start-worker (worker) 
    1.96-  (sb-thread::start-thread (worker-thread worker) (worker-kernel worker) (worker-input worker)))
    1.97-(defun kill-worker (worker) (kill-thread (worker-thread worker)))
    1.98-(defun join-worker (worker) (join-thread (worker-thread worker)))
    1.99+(defun run-worker (worker &key input wait)
   1.100+  (when input
   1.101+    (setf (tasks worker) input))
   1.102+  (start-worker worker)
   1.103+  (if wait (join-worker worker)
   1.104+      worker))
   1.105+
   1.106+(defmethod run-object ((self worker))
   1.107+  (run-worker self))
   1.108+
   1.109+(defmethod run ((self worker) (object t) &key wait &allow-other-keys)
   1.110+  (run-worker self :input object :wait wait))
   1.111+
   1.112+(defun kill-worker (worker) 
   1.113+  (declare (worker worker))
   1.114+  (let ((th (worker-thread worker)))
   1.115+    (unwind-protect (kill-thread th)
   1.116+      (deletef *worker-threads* th))))
   1.117+
   1.118+(defun join-worker (worker)
   1.119+  (declare (worker worker))
   1.120+  (let ((th (worker-thread worker)))
   1.121+    (unwind-protect (join-thread th)
   1.122+      (deletef *worker-threads* th))))
   1.123 
   1.124 ;;; Oracle           
   1.125 (defstruct (oracle (:constructor %make-oracle (id thread)))
   1.126+  "Oracles provide a tagged view into some threaded scope of work."
   1.127   (id 0 :type (unsigned-byte 32) :read-only t)
   1.128   (thread *current-thread* :read-only t))
   1.129 
   1.130@@ -104,41 +158,9 @@
   1.131       (values id found)
   1.132       (let ((orc (%make-oracle id thread)))
   1.133         (push orc *oracles*)
   1.134+        (push (oracle-thread orc) *oracle-threads*)
   1.135         (values id orc)))))
   1.136 
   1.137-;;; Proto
   1.138-;; oracle
   1.139-(defgeneric designate-oracle (host guest))
   1.140-;; worker
   1.141-(defun make-worker (&key thread kernel input)
   1.142-  (apply #'make-instance 'worker
   1.143-         `(,@(when thread `(:thread ,thread))
   1.144-           ,@(when kernel `(:kernel ,kernel))
   1.145-           ,@(when input `(:input ,input)))))
   1.146-
   1.147-(defgeneric make-workers (count &rest initargs &key &allow-other-keys)
   1.148-  (:method ((count number) &key thread kernel input)
   1.149-    (let ((ret))
   1.150-      (dotimes (i count ret)
   1.151-        (push (make-worker :thread thread :kernel kernel :input input) ret)))))
   1.152-
   1.153-(defgeneric delete-worker (worker pool &key &allow-other-keys))
   1.154-(defgeneric spawn-worker (pool worker))
   1.155-
   1.156-;; job
   1.157-(defgeneric make-job (self &key &allow-other-keys))
   1.158-(defgeneric find-job (job pool &key &allow-other-keys))
   1.159-(defgeneric run-job (self job))
   1.160-(defgeneric run-jobs (self))
   1.161-;; task
   1.162-(defgeneric tasks (self))
   1.163-(defgeneric run-task (self task))
   1.164-(defgeneric run-tasks (self))
   1.165-(defgeneric results (self))
   1.166-;; stage
   1.167-(defgeneric run-stage (self stage))
   1.168-(defgeneric workers (self))
   1.169-
   1.170 ;;; Task Pool
   1.171 (defstruct task-pool
   1.172   (kernel 'identity :type symbol)
   1.173@@ -199,55 +221,54 @@
   1.174 (defclass task ()
   1.175   ((state :initform nil :initarg :state :accessor task-state))
   1.176   (:documentation "This object represents a single unit of work to be done by some
   1.177-worker. Tasks are typically generated by an oracle, but workers may also be
   1.178-granted the ability to create and distribute their own tasks. Once a task is
   1.179-assigned, the 'owner', i.e. the worker that is assigned this task, may modify
   1.180-the object and state. When the work associated with a task is complete, the
   1.181-owner is responsible for indicating in the state slot the result of the
   1.182-computation."))
   1.183+worker. Tasks are typically distributed from the task-pool, but workers may
   1.184+also be granted the ability to create and distribute their own tasks. Once a
   1.185+task is assigned, the 'owner', i.e. the worker that is assigned this task, may
   1.186+modify the object. When the work associated with a task is complete, the owner
   1.187+is responsible for indicating in the state slot the result of the computation."))
   1.188 
   1.189 (defmethod print-object ((self task) stream)
   1.190   (print-unreadable-object (self stream :type t)
   1.191     (format stream ":state ~A" (task-state self))))
   1.192 
   1.193-(defmethod run-task ((self thread) (task task)))
   1.194+(defun run-task (worker task)
   1.195+  (run-worker worker :input task))
   1.196 
   1.197 ;;; Job
   1.198-(defstruct (job (:constructor %make-job (tasks)))
   1.199-  "A collection of tasks to be performed by worker threads."
   1.200-  (tasks (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
   1.201-   :type (array task *))
   1.202-  (lock (make-mutex :name "job") :type mutex))
   1.203-
   1.204-(defmethod tasks ((self job)) (job-tasks self))
   1.205+(defclass job (task)
   1.206+  ((tasks :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
   1.207+          :type (array task *)
   1.208+          :initarg :tasks
   1.209+          :accessor tasks)
   1.210+   (lock :initform (make-mutex :name "job") :type mutex
   1.211+         :initarg :lock))
   1.212+  (:documentation "A collection of tasks forming a single unit of work."))
   1.213 
   1.214-(defmethod make-job ((self task) &key (size 1))
   1.215-  (%make-job (make-array size :element-type 'task
   1.216-                              :initial-element self)))
   1.217-
   1.218-(defmethod make-job ((self vector) &key)
   1.219-  (%make-job self))
   1.220-
   1.221-(defmethod make-job ((self null) &key (size 1))
   1.222-  (%make-job (make-array size :element-type 'task :fill-pointer 0 :adjustable t)))
   1.223+(declaim (inline make-job))
   1.224+(defun make-job (&rest tasks)
   1.225+  (make-instance 'job
   1.226+    :tasks (make-array (length tasks) 
   1.227+                       :element-type 'task
   1.228+                       :initial-contents tasks)))
   1.229 
   1.230 (defmethod print-object ((self job) stream)
   1.231   (print-unreadable-object (self stream :type t)
   1.232-    (format stream "~A" (job-tasks self))))
   1.233+    (format stream "~A tasks" (length (tasks self)))))
   1.234+
   1.235+(defun run-job (worker job)
   1.236+  (run-worker worker :input job))
   1.237 
   1.238-;;; Stage
   1.239-(defclass stage ()
   1.240-  ((jobs  :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
   1.241-          :initarg :jobs
   1.242-          :accessor jobs
   1.243-          :type (vector job))
   1.244-   (lock :initform (make-mutex :name "stage") :initarg :lock :accessor stage-lock :type mutex)))
   1.245+;;; Work Scope
   1.246+(defclass work-scope ()
   1.247+  ((tasks  :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
   1.248+           :initarg :tasks
   1.249+           :accessor tasks
   1.250+           :type (vector task))
   1.251+   (lock :initform (make-mutex :name "work-scope") :initarg :lock :accessor work-scope-lock :type mutex)))
   1.252 
   1.253-(defmethod print-object ((self stage) stream)
   1.254+(defmethod print-object ((self work-scope) stream)
   1.255   (print-unreadable-object (self stream :type t)
   1.256-    (format stream "~A" (jobs self))))
   1.257-
   1.258-(defmethod run-stage ((self thread) (stage stage)))
   1.259+    (format stream "~A" (tasks self))))
   1.260 
   1.261 ;;; Macros
   1.262 (defmacro with-task-pool ((sym &key oracle (tasks 0) lock (workers 4) start kernel results) &body body)