# HG changeset patch # User Richard Westhaver # Date 1728006847 14400 # Node ID a36280d2ef4e6ccf8920a37f12e87c96e6698942 # Parent 5f81d888c31f2f2b7dec6d48f07376432f6f5128 tasks diff -r 5f81d888c31f -r a36280d2ef4e lisp/ffi/sndfile/pkg.lisp --- a/lisp/ffi/sndfile/pkg.lisp Thu Oct 03 19:04:57 2024 -0400 +++ b/lisp/ffi/sndfile/pkg.lisp Thu Oct 03 21:54:07 2024 -0400 @@ -120,8 +120,10 @@ :wavex #x130000 ; MS WAVE with WAVEFORMATEX :sd2 #x160000 ; Sound Designer 2 :flac #x170000 ; FLAC lossless file format - :caf #x180000 ; Core Audio File format + :caf #x180000) ; Core Audio File format +(define-alien-enum (sf-format-subtype int) + ;; subtypes :pcm-s8 #x0001 ; Signed 8 bit data :pcm-16 #x0002 ; Signed 16 bit data :pcm-24 #x0003 ; Signed 24 bit data diff -r 5f81d888c31f -r a36280d2ef4e lisp/std/mop.lisp --- a/lisp/std/mop.lisp Thu Oct 03 19:04:57 2024 -0400 +++ b/lisp/std/mop.lisp Thu Oct 03 21:54:07 2024 -0400 @@ -61,3 +61,33 @@ `(,ns ,v)))) (when unboundp (list ns)))))) slots))) + +;; closer-mop +(defun ensure-finalized (class &optional (errorp t)) + (if (typep class 'class) + (unless (class-finalized-p class) + (finalize-inheritance class)) + (when errorp (error "~S is not a class." class))) + class) + +(defun subclassp (class superclass) + (flet ((get-class (class) (etypecase class + (class class) + (symbol (find-class class))))) + + (loop with class = (get-class class) + with superclass = (get-class superclass) + + for superclasses = (list class) + then (set-difference + (union (class-direct-superclasses current-class) superclasses) + seen) + + for current-class = (first superclasses) + + while current-class + + if (eq current-class superclass) return t + else collect current-class into seen + + finally (return nil)))) diff -r 5f81d888c31f -r a36280d2ef4e lisp/std/pkg.lisp --- a/lisp/std/pkg.lisp Thu Oct 03 19:04:57 2024 -0400 +++ b/lisp/std/pkg.lisp Thu Oct 03 21:54:07 2024 -0400 @@ -178,7 +178,8 @@ (:use :cl :sb-mop :sb-pcl) (:import-from :std/sym :symb :make-keyword) (:export :list-slot-values-using-class - :list-class-methods :list-class-slots :list-indirect-slot-methods)) + :list-class-methods :list-class-slots :list-indirect-slot-methods + :ensure-finalized :subclassp)) (defpkg :std/fu (:use :cl) @@ -282,14 +283,16 @@ (:use :cl :std/thread :sb-concurrency) (:import-from :std/thread :%make-thread) (:import-from :std/macs :if-let) + (:import-from :std/list :deletef) (:export :spawn-workers :make-oracle :make-supervisor - :oracle :run-task + :oracle :oracle-id :find-thread :push-job :push-task :push-worker :push-task-result - :run-job :run-stage + :run :run-object + :work :pop-job :pop-task :workers :tasks @@ -304,7 +307,9 @@ :*task-pool* :*tasks* :*oracles* - :*workers* + :*oracle-threads* + :*worker-threads* + :*supervisor-threads* :*jobs* :*stages* :*task* diff -r 5f81d888c31f -r a36280d2ef4e lisp/std/task.lisp --- a/lisp/std/task.lisp Thu Oct 03 19:04:57 2024 -0400 +++ b/lisp/std/task.lisp Thu Oct 03 21:54:07 2024 -0400 @@ -10,8 +10,11 @@ (defvar *tasks* (make-queue :name "tasks")) (defvar *jobs*) (defvar *stages*) -(defvar *oracles* nil) -(defvar *task-oracles* nil) +(sb-ext:defglobal *worker-threads* nil) +(sb-ext:defglobal *oracles* nil) +(sb-ext:defglobal *oracle-threads* nil) +(sb-ext:defglobal *supervisor-threads* nil) + (eval-when (:compile-toplevel) (defvar *task*) (defvar *task-result* nil)) @@ -55,43 +58,94 @@ `(gen-task-kernel ,name ,args ,lock ,queue ,mailbox ,timeout ,@body)) +(defun make-ephemeral-thread (name) + (sb-thread::%make-thread name t (make-semaphore :name name))) + +;;; Proto +(defgeneric designate-oracle (host guest)) +(defgeneric assign-supervisor (worker supervisor)) + +(defgeneric make-workers (count &rest initargs &key &allow-other-keys) + (:method ((count number) &key thread kernel input) + (let ((ret)) + (dotimes (i count ret) + (push (make-worker :thread thread :kernel kernel :input input) ret))))) + +(defgeneric tasks (self)) +(defgeneric results (self)) +(defgeneric run (self object &rest initargs &key &allow-other-keys)) +(defgeneric run-object (self)) +(defgeneric work (self &key &allow-other-keys)) +(defgeneric workers (self)) + ;;; Supervisor (defclass supervisor () - (scope) - (:documentation "A class which provides a view of the work done within a specified -SCOPE. + ((thread :initform (make-ephemeral-thread (symbol-name (gensym "supervisor"))) :accessor supervisor-thread) + (domain) + (scope)) + (:documentation "Supervisors are threads which are responsible for a set of worker threads +within their DOMAIN and SCOPE.")) -This object should be used by operators to inspect 'runstreams' -performed in other threads, such as WORKERS in TASK-POOL. - -Before using this object you should ensure the SCOPE is fully -initialized. Supervisors should be created at any point during the -lifetime of SCOPE, but never before and never after.")) +(defmethod initialize-instance :after ((self supervisor) &key &allow-other-keys) + (push (supervisor-thread self) *supervisor-threads*)) ;;; Worker ;; unix-getrusage ;; 0,-1,-2 ;; (multiple-value-list (sb-unix:unix-getrusage 0)) ;; (setf sb-unix::*on-dangerous-wait* :error) -(defvar *default-worker-name* "worker") +;; TODO 2024-10-03: with-cas-lock? (defclass worker () - ((thread :initform (sb-thread::%make-thread #.#1=(symbol-name (gensym "w")) t (make-semaphore :name #.#1#)) + ((thread :initform (make-ephemeral-thread (symbol-name (gensym "worker"))) :accessor worker-thread :initarg :thread) (kernel :type function :accessor worker-kernel :initarg :kernel) - (input :initform nil :accessor worker-input :initarg :input))) + (tasks :initform nil :accessor tasks :initarg :input))) + +(defmethod initialize-instance :after ((self worker) &key &allow-other-keys) + (push (worker-thread self) *worker-threads*)) -(defvar *workers* (make-array 0 :element-type 'worker :adjustable t)) +(defun make-worker (&key thread kernel input) + (apply #'make-instance 'worker + `(,@(when thread `(:thread ,thread)) + ,@(when kernel `(:kernel ,kernel)) + ,@(when input `(:input ,input))))) + +;; TODO 2024-10-03: pause/resume + +(declaim (inline kill-worker join-worker start-worker run-worker)) +(defun start-worker (worker) + (sb-thread::start-thread (worker-thread worker) (worker-kernel worker) (tasks worker))) -(declaim (inline kill-worker join-worker)) -(defun start-worker (worker) - (sb-thread::start-thread (worker-thread worker) (worker-kernel worker) (worker-input worker))) -(defun kill-worker (worker) (kill-thread (worker-thread worker))) -(defun join-worker (worker) (join-thread (worker-thread worker))) +(defun run-worker (worker &key input wait) + (when input + (setf (tasks worker) input)) + (start-worker worker) + (if wait (join-worker worker) + worker)) + +(defmethod run-object ((self worker)) + (run-worker self)) + +(defmethod run ((self worker) (object t) &key wait &allow-other-keys) + (run-worker self :input object :wait wait)) + +(defun kill-worker (worker) + (declare (worker worker)) + (let ((th (worker-thread worker))) + (unwind-protect (kill-thread th) + (deletef *worker-threads* th)))) + +(defun join-worker (worker) + (declare (worker worker)) + (let ((th (worker-thread worker))) + (unwind-protect (join-thread th) + (deletef *worker-threads* th)))) ;;; Oracle (defstruct (oracle (:constructor %make-oracle (id thread))) + "Oracles provide a tagged view into some threaded scope of work." (id 0 :type (unsigned-byte 32) :read-only t) (thread *current-thread* :read-only t)) @@ -104,41 +158,9 @@ (values id found) (let ((orc (%make-oracle id thread))) (push orc *oracles*) + (push (oracle-thread orc) *oracle-threads*) (values id orc))))) -;;; Proto -;; oracle -(defgeneric designate-oracle (host guest)) -;; worker -(defun make-worker (&key thread kernel input) - (apply #'make-instance 'worker - `(,@(when thread `(:thread ,thread)) - ,@(when kernel `(:kernel ,kernel)) - ,@(when input `(:input ,input))))) - -(defgeneric make-workers (count &rest initargs &key &allow-other-keys) - (:method ((count number) &key thread kernel input) - (let ((ret)) - (dotimes (i count ret) - (push (make-worker :thread thread :kernel kernel :input input) ret))))) - -(defgeneric delete-worker (worker pool &key &allow-other-keys)) -(defgeneric spawn-worker (pool worker)) - -;; job -(defgeneric make-job (self &key &allow-other-keys)) -(defgeneric find-job (job pool &key &allow-other-keys)) -(defgeneric run-job (self job)) -(defgeneric run-jobs (self)) -;; task -(defgeneric tasks (self)) -(defgeneric run-task (self task)) -(defgeneric run-tasks (self)) -(defgeneric results (self)) -;; stage -(defgeneric run-stage (self stage)) -(defgeneric workers (self)) - ;;; Task Pool (defstruct task-pool (kernel 'identity :type symbol) @@ -199,55 +221,54 @@ (defclass task () ((state :initform nil :initarg :state :accessor task-state)) (:documentation "This object represents a single unit of work to be done by some -worker. Tasks are typically generated by an oracle, but workers may also be -granted the ability to create and distribute their own tasks. Once a task is -assigned, the 'owner', i.e. the worker that is assigned this task, may modify -the object and state. When the work associated with a task is complete, the -owner is responsible for indicating in the state slot the result of the -computation.")) +worker. Tasks are typically distributed from the task-pool, but workers may +also be granted the ability to create and distribute their own tasks. Once a +task is assigned, the 'owner', i.e. the worker that is assigned this task, may +modify the object. When the work associated with a task is complete, the owner +is responsible for indicating in the state slot the result of the computation.")) (defmethod print-object ((self task) stream) (print-unreadable-object (self stream :type t) (format stream ":state ~A" (task-state self)))) -(defmethod run-task ((self thread) (task task))) +(defun run-task (worker task) + (run-worker worker :input task)) ;;; Job -(defstruct (job (:constructor %make-job (tasks))) - "A collection of tasks to be performed by worker threads." - (tasks (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t) - :type (array task *)) - (lock (make-mutex :name "job") :type mutex)) - -(defmethod tasks ((self job)) (job-tasks self)) +(defclass job (task) + ((tasks :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t) + :type (array task *) + :initarg :tasks + :accessor tasks) + (lock :initform (make-mutex :name "job") :type mutex + :initarg :lock)) + (:documentation "A collection of tasks forming a single unit of work.")) -(defmethod make-job ((self task) &key (size 1)) - (%make-job (make-array size :element-type 'task - :initial-element self))) - -(defmethod make-job ((self vector) &key) - (%make-job self)) - -(defmethod make-job ((self null) &key (size 1)) - (%make-job (make-array size :element-type 'task :fill-pointer 0 :adjustable t))) +(declaim (inline make-job)) +(defun make-job (&rest tasks) + (make-instance 'job + :tasks (make-array (length tasks) + :element-type 'task + :initial-contents tasks))) (defmethod print-object ((self job) stream) (print-unreadable-object (self stream :type t) - (format stream "~A" (job-tasks self)))) + (format stream "~A tasks" (length (tasks self))))) + +(defun run-job (worker job) + (run-worker worker :input job)) -;;; Stage -(defclass stage () - ((jobs :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t) - :initarg :jobs - :accessor jobs - :type (vector job)) - (lock :initform (make-mutex :name "stage") :initarg :lock :accessor stage-lock :type mutex))) +;;; Work Scope +(defclass work-scope () + ((tasks :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t) + :initarg :tasks + :accessor tasks + :type (vector task)) + (lock :initform (make-mutex :name "work-scope") :initarg :lock :accessor work-scope-lock :type mutex))) -(defmethod print-object ((self stage) stream) +(defmethod print-object ((self work-scope) stream) (print-unreadable-object (self stream :type t) - (format stream "~A" (jobs self)))) - -(defmethod run-stage ((self thread) (stage stage))) + (format stream "~A" (tasks self)))) ;;; Macros (defmacro with-task-pool ((sym &key oracle (tasks 0) lock (workers 4) start kernel results) &body body) diff -r 5f81d888c31f -r a36280d2ef4e lisp/std/tests/task.lisp --- a/lisp/std/tests/task.lisp Thu Oct 03 19:04:57 2024 -0400 +++ b/lisp/std/tests/task.lisp Thu Oct 03 21:54:07 2024 -0400 @@ -20,6 +20,7 @@ (is (zerop (sb-concurrency:mailbox-count (results tp)))) (start-task-workers tp) (loop for w across (workers tp) - do (join-worker w)) + do (join-worker w)) (is (= 4 (sb-concurrency:mailbox-count (results tp)))))) +