1.1--- a/lisp/std/task.lisp Thu Oct 03 19:04:57 2024 -0400
1.2+++ b/lisp/std/task.lisp Thu Oct 03 21:54:07 2024 -0400
1.3@@ -10,8 +10,11 @@
1.4 (defvar *tasks* (make-queue :name "tasks"))
1.5 (defvar *jobs*)
1.6 (defvar *stages*)
1.7-(defvar *oracles* nil)
1.8-(defvar *task-oracles* nil)
1.9+(sb-ext:defglobal *worker-threads* nil)
1.10+(sb-ext:defglobal *oracles* nil)
1.11+(sb-ext:defglobal *oracle-threads* nil)
1.12+(sb-ext:defglobal *supervisor-threads* nil)
1.13+
1.14 (eval-when (:compile-toplevel)
1.15 (defvar *task*)
1.16 (defvar *task-result* nil))
1.17@@ -55,43 +58,94 @@
1.18 `(gen-task-kernel ,name ,args ,lock ,queue ,mailbox ,timeout
1.19 ,@body))
1.20
1.21+(defun make-ephemeral-thread (name)
1.22+ (sb-thread::%make-thread name t (make-semaphore :name name)))
1.23+
1.24+;;; Proto
1.25+(defgeneric designate-oracle (host guest))
1.26+(defgeneric assign-supervisor (worker supervisor))
1.27+
1.28+(defgeneric make-workers (count &rest initargs &key &allow-other-keys)
1.29+ (:method ((count number) &key thread kernel input)
1.30+ (let ((ret))
1.31+ (dotimes (i count ret)
1.32+ (push (make-worker :thread thread :kernel kernel :input input) ret)))))
1.33+
1.34+(defgeneric tasks (self))
1.35+(defgeneric results (self))
1.36+(defgeneric run (self object &rest initargs &key &allow-other-keys))
1.37+(defgeneric run-object (self))
1.38+(defgeneric work (self &key &allow-other-keys))
1.39+(defgeneric workers (self))
1.40+
1.41 ;;; Supervisor
1.42 (defclass supervisor ()
1.43- (scope)
1.44- (:documentation "A class which provides a view of the work done within a specified
1.45-SCOPE.
1.46+ ((thread :initform (make-ephemeral-thread (symbol-name (gensym "supervisor"))) :accessor supervisor-thread)
1.47+ (domain)
1.48+ (scope))
1.49+ (:documentation "Supervisors are threads which are responsible for a set of worker threads
1.50+within their DOMAIN and SCOPE."))
1.51
1.52-This object should be used by operators to inspect 'runstreams'
1.53-performed in other threads, such as WORKERS in TASK-POOL.
1.54-
1.55-Before using this object you should ensure the SCOPE is fully
1.56-initialized. Supervisors should be created at any point during the
1.57-lifetime of SCOPE, but never before and never after."))
1.58+(defmethod initialize-instance :after ((self supervisor) &key &allow-other-keys)
1.59+ (push (supervisor-thread self) *supervisor-threads*))
1.60
1.61 ;;; Worker
1.62 ;; unix-getrusage
1.63 ;; 0,-1,-2
1.64 ;; (multiple-value-list (sb-unix:unix-getrusage 0))
1.65 ;; (setf sb-unix::*on-dangerous-wait* :error)
1.66-(defvar *default-worker-name* "worker")
1.67
1.68+;; TODO 2024-10-03: with-cas-lock?
1.69 (defclass worker ()
1.70- ((thread :initform (sb-thread::%make-thread #.#1=(symbol-name (gensym "w")) t (make-semaphore :name #.#1#))
1.71+ ((thread :initform (make-ephemeral-thread (symbol-name (gensym "worker")))
1.72 :accessor worker-thread
1.73 :initarg :thread)
1.74 (kernel :type function :accessor worker-kernel :initarg :kernel)
1.75- (input :initform nil :accessor worker-input :initarg :input)))
1.76+ (tasks :initform nil :accessor tasks :initarg :input)))
1.77+
1.78+(defmethod initialize-instance :after ((self worker) &key &allow-other-keys)
1.79+ (push (worker-thread self) *worker-threads*))
1.80
1.81-(defvar *workers* (make-array 0 :element-type 'worker :adjustable t))
1.82+(defun make-worker (&key thread kernel input)
1.83+ (apply #'make-instance 'worker
1.84+ `(,@(when thread `(:thread ,thread))
1.85+ ,@(when kernel `(:kernel ,kernel))
1.86+ ,@(when input `(:input ,input)))))
1.87+
1.88+;; TODO 2024-10-03: pause/resume
1.89+
1.90+(declaim (inline kill-worker join-worker start-worker run-worker))
1.91+(defun start-worker (worker)
1.92+ (sb-thread::start-thread (worker-thread worker) (worker-kernel worker) (tasks worker)))
1.93
1.94-(declaim (inline kill-worker join-worker))
1.95-(defun start-worker (worker)
1.96- (sb-thread::start-thread (worker-thread worker) (worker-kernel worker) (worker-input worker)))
1.97-(defun kill-worker (worker) (kill-thread (worker-thread worker)))
1.98-(defun join-worker (worker) (join-thread (worker-thread worker)))
1.99+(defun run-worker (worker &key input wait)
1.100+ (when input
1.101+ (setf (tasks worker) input))
1.102+ (start-worker worker)
1.103+ (if wait (join-worker worker)
1.104+ worker))
1.105+
1.106+(defmethod run-object ((self worker))
1.107+ (run-worker self))
1.108+
1.109+(defmethod run ((self worker) (object t) &key wait &allow-other-keys)
1.110+ (run-worker self :input object :wait wait))
1.111+
1.112+(defun kill-worker (worker)
1.113+ (declare (worker worker))
1.114+ (let ((th (worker-thread worker)))
1.115+ (unwind-protect (kill-thread th)
1.116+ (deletef *worker-threads* th))))
1.117+
1.118+(defun join-worker (worker)
1.119+ (declare (worker worker))
1.120+ (let ((th (worker-thread worker)))
1.121+ (unwind-protect (join-thread th)
1.122+ (deletef *worker-threads* th))))
1.123
1.124 ;;; Oracle
1.125 (defstruct (oracle (:constructor %make-oracle (id thread)))
1.126+ "Oracles provide a tagged view into some threaded scope of work."
1.127 (id 0 :type (unsigned-byte 32) :read-only t)
1.128 (thread *current-thread* :read-only t))
1.129
1.130@@ -104,41 +158,9 @@
1.131 (values id found)
1.132 (let ((orc (%make-oracle id thread)))
1.133 (push orc *oracles*)
1.134+ (push (oracle-thread orc) *oracle-threads*)
1.135 (values id orc)))))
1.136
1.137-;;; Proto
1.138-;; oracle
1.139-(defgeneric designate-oracle (host guest))
1.140-;; worker
1.141-(defun make-worker (&key thread kernel input)
1.142- (apply #'make-instance 'worker
1.143- `(,@(when thread `(:thread ,thread))
1.144- ,@(when kernel `(:kernel ,kernel))
1.145- ,@(when input `(:input ,input)))))
1.146-
1.147-(defgeneric make-workers (count &rest initargs &key &allow-other-keys)
1.148- (:method ((count number) &key thread kernel input)
1.149- (let ((ret))
1.150- (dotimes (i count ret)
1.151- (push (make-worker :thread thread :kernel kernel :input input) ret)))))
1.152-
1.153-(defgeneric delete-worker (worker pool &key &allow-other-keys))
1.154-(defgeneric spawn-worker (pool worker))
1.155-
1.156-;; job
1.157-(defgeneric make-job (self &key &allow-other-keys))
1.158-(defgeneric find-job (job pool &key &allow-other-keys))
1.159-(defgeneric run-job (self job))
1.160-(defgeneric run-jobs (self))
1.161-;; task
1.162-(defgeneric tasks (self))
1.163-(defgeneric run-task (self task))
1.164-(defgeneric run-tasks (self))
1.165-(defgeneric results (self))
1.166-;; stage
1.167-(defgeneric run-stage (self stage))
1.168-(defgeneric workers (self))
1.169-
1.170 ;;; Task Pool
1.171 (defstruct task-pool
1.172 (kernel 'identity :type symbol)
1.173@@ -199,55 +221,54 @@
1.174 (defclass task ()
1.175 ((state :initform nil :initarg :state :accessor task-state))
1.176 (:documentation "This object represents a single unit of work to be done by some
1.177-worker. Tasks are typically generated by an oracle, but workers may also be
1.178-granted the ability to create and distribute their own tasks. Once a task is
1.179-assigned, the 'owner', i.e. the worker that is assigned this task, may modify
1.180-the object and state. When the work associated with a task is complete, the
1.181-owner is responsible for indicating in the state slot the result of the
1.182-computation."))
1.183+worker. Tasks are typically distributed from the task-pool, but workers may
1.184+also be granted the ability to create and distribute their own tasks. Once a
1.185+task is assigned, the 'owner', i.e. the worker that is assigned this task, may
1.186+modify the object. When the work associated with a task is complete, the owner
1.187+is responsible for indicating in the state slot the result of the computation."))
1.188
1.189 (defmethod print-object ((self task) stream)
1.190 (print-unreadable-object (self stream :type t)
1.191 (format stream ":state ~A" (task-state self))))
1.192
1.193-(defmethod run-task ((self thread) (task task)))
1.194+(defun run-task (worker task)
1.195+ (run-worker worker :input task))
1.196
1.197 ;;; Job
1.198-(defstruct (job (:constructor %make-job (tasks)))
1.199- "A collection of tasks to be performed by worker threads."
1.200- (tasks (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
1.201- :type (array task *))
1.202- (lock (make-mutex :name "job") :type mutex))
1.203-
1.204-(defmethod tasks ((self job)) (job-tasks self))
1.205+(defclass job (task)
1.206+ ((tasks :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
1.207+ :type (array task *)
1.208+ :initarg :tasks
1.209+ :accessor tasks)
1.210+ (lock :initform (make-mutex :name "job") :type mutex
1.211+ :initarg :lock))
1.212+ (:documentation "A collection of tasks forming a single unit of work."))
1.213
1.214-(defmethod make-job ((self task) &key (size 1))
1.215- (%make-job (make-array size :element-type 'task
1.216- :initial-element self)))
1.217-
1.218-(defmethod make-job ((self vector) &key)
1.219- (%make-job self))
1.220-
1.221-(defmethod make-job ((self null) &key (size 1))
1.222- (%make-job (make-array size :element-type 'task :fill-pointer 0 :adjustable t)))
1.223+(declaim (inline make-job))
1.224+(defun make-job (&rest tasks)
1.225+ (make-instance 'job
1.226+ :tasks (make-array (length tasks)
1.227+ :element-type 'task
1.228+ :initial-contents tasks)))
1.229
1.230 (defmethod print-object ((self job) stream)
1.231 (print-unreadable-object (self stream :type t)
1.232- (format stream "~A" (job-tasks self))))
1.233+ (format stream "~A tasks" (length (tasks self)))))
1.234+
1.235+(defun run-job (worker job)
1.236+ (run-worker worker :input job))
1.237
1.238-;;; Stage
1.239-(defclass stage ()
1.240- ((jobs :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
1.241- :initarg :jobs
1.242- :accessor jobs
1.243- :type (vector job))
1.244- (lock :initform (make-mutex :name "stage") :initarg :lock :accessor stage-lock :type mutex)))
1.245+;;; Work Scope
1.246+(defclass work-scope ()
1.247+ ((tasks :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
1.248+ :initarg :tasks
1.249+ :accessor tasks
1.250+ :type (vector task))
1.251+ (lock :initform (make-mutex :name "work-scope") :initarg :lock :accessor work-scope-lock :type mutex)))
1.252
1.253-(defmethod print-object ((self stage) stream)
1.254+(defmethod print-object ((self work-scope) stream)
1.255 (print-unreadable-object (self stream :type t)
1.256- (format stream "~A" (jobs self))))
1.257-
1.258-(defmethod run-stage ((self thread) (stage stage)))
1.259+ (format stream "~A" (tasks self))))
1.260
1.261 ;;; Macros
1.262 (defmacro with-task-pool ((sym &key oracle (tasks 0) lock (workers 4) start kernel results) &body body)