changeset 694: |
a36280d2ef4e |
parent 693: |
5f81d888c31f |
child 695: |
2bad47888dbf |
author: |
Richard Westhaver <ellis@rwest.io> |
date: |
Thu, 03 Oct 2024 21:54:07 -0400 |
files: |
lisp/ffi/sndfile/pkg.lisp lisp/std/mop.lisp lisp/std/pkg.lisp lisp/std/task.lisp lisp/std/tests/task.lisp |
description: |
tasks |
1.1--- a/lisp/ffi/sndfile/pkg.lisp Thu Oct 03 19:04:57 2024 -0400
1.2+++ b/lisp/ffi/sndfile/pkg.lisp Thu Oct 03 21:54:07 2024 -0400
1.3@@ -120,8 +120,10 @@
1.4 :wavex #x130000 ; MS WAVE with WAVEFORMATEX
1.5 :sd2 #x160000 ; Sound Designer 2
1.6 :flac #x170000 ; FLAC lossless file format
1.7- :caf #x180000 ; Core Audio File format
1.8+ :caf #x180000) ; Core Audio File format
1.9
1.10+(define-alien-enum (sf-format-subtype int)
1.11+ ;; subtypes
1.12 :pcm-s8 #x0001 ; Signed 8 bit data
1.13 :pcm-16 #x0002 ; Signed 16 bit data
1.14 :pcm-24 #x0003 ; Signed 24 bit data
2.1--- a/lisp/std/mop.lisp Thu Oct 03 19:04:57 2024 -0400
2.2+++ b/lisp/std/mop.lisp Thu Oct 03 21:54:07 2024 -0400
2.3@@ -61,3 +61,33 @@
2.4 `(,ns ,v))))
2.5 (when unboundp (list ns))))))
2.6 slots)))
2.7+
2.8+;; closer-mop
2.9+(defun ensure-finalized (class &optional (errorp t))
2.10+ (if (typep class 'class)
2.11+ (unless (class-finalized-p class)
2.12+ (finalize-inheritance class))
2.13+ (when errorp (error "~S is not a class." class)))
2.14+ class)
2.15+
2.16+(defun subclassp (class superclass)
2.17+ (flet ((get-class (class) (etypecase class
2.18+ (class class)
2.19+ (symbol (find-class class)))))
2.20+
2.21+ (loop with class = (get-class class)
2.22+ with superclass = (get-class superclass)
2.23+
2.24+ for superclasses = (list class)
2.25+ then (set-difference
2.26+ (union (class-direct-superclasses current-class) superclasses)
2.27+ seen)
2.28+
2.29+ for current-class = (first superclasses)
2.30+
2.31+ while current-class
2.32+
2.33+ if (eq current-class superclass) return t
2.34+ else collect current-class into seen
2.35+
2.36+ finally (return nil))))
3.1--- a/lisp/std/pkg.lisp Thu Oct 03 19:04:57 2024 -0400
3.2+++ b/lisp/std/pkg.lisp Thu Oct 03 21:54:07 2024 -0400
3.3@@ -178,7 +178,8 @@
3.4 (:use :cl :sb-mop :sb-pcl)
3.5 (:import-from :std/sym :symb :make-keyword)
3.6 (:export :list-slot-values-using-class
3.7- :list-class-methods :list-class-slots :list-indirect-slot-methods))
3.8+ :list-class-methods :list-class-slots :list-indirect-slot-methods
3.9+ :ensure-finalized :subclassp))
3.10
3.11 (defpkg :std/fu
3.12 (:use :cl)
3.13@@ -282,14 +283,16 @@
3.14 (:use :cl :std/thread :sb-concurrency)
3.15 (:import-from :std/thread :%make-thread)
3.16 (:import-from :std/macs :if-let)
3.17+ (:import-from :std/list :deletef)
3.18 (:export
3.19 :spawn-workers
3.20 :make-oracle :make-supervisor
3.21- :oracle :run-task
3.22+ :oracle
3.23 :oracle-id :find-thread
3.24 :push-job :push-task
3.25 :push-worker :push-task-result
3.26- :run-job :run-stage
3.27+ :run :run-object
3.28+ :work
3.29 :pop-job :pop-task
3.30 :workers
3.31 :tasks
3.32@@ -304,7 +307,9 @@
3.33 :*task-pool*
3.34 :*tasks*
3.35 :*oracles*
3.36- :*workers*
3.37+ :*oracle-threads*
3.38+ :*worker-threads*
3.39+ :*supervisor-threads*
3.40 :*jobs*
3.41 :*stages*
3.42 :*task*
4.1--- a/lisp/std/task.lisp Thu Oct 03 19:04:57 2024 -0400
4.2+++ b/lisp/std/task.lisp Thu Oct 03 21:54:07 2024 -0400
4.3@@ -10,8 +10,11 @@
4.4 (defvar *tasks* (make-queue :name "tasks"))
4.5 (defvar *jobs*)
4.6 (defvar *stages*)
4.7-(defvar *oracles* nil)
4.8-(defvar *task-oracles* nil)
4.9+(sb-ext:defglobal *worker-threads* nil)
4.10+(sb-ext:defglobal *oracles* nil)
4.11+(sb-ext:defglobal *oracle-threads* nil)
4.12+(sb-ext:defglobal *supervisor-threads* nil)
4.13+
4.14 (eval-when (:compile-toplevel)
4.15 (defvar *task*)
4.16 (defvar *task-result* nil))
4.17@@ -55,43 +58,94 @@
4.18 `(gen-task-kernel ,name ,args ,lock ,queue ,mailbox ,timeout
4.19 ,@body))
4.20
4.21+(defun make-ephemeral-thread (name)
4.22+ (sb-thread::%make-thread name t (make-semaphore :name name)))
4.23+
4.24+;;; Proto
4.25+(defgeneric designate-oracle (host guest))
4.26+(defgeneric assign-supervisor (worker supervisor))
4.27+
4.28+(defgeneric make-workers (count &rest initargs &key &allow-other-keys)
4.29+ (:method ((count number) &key thread kernel input)
4.30+ (let ((ret))
4.31+ (dotimes (i count ret)
4.32+ (push (make-worker :thread thread :kernel kernel :input input) ret)))))
4.33+
4.34+(defgeneric tasks (self))
4.35+(defgeneric results (self))
4.36+(defgeneric run (self object &rest initargs &key &allow-other-keys))
4.37+(defgeneric run-object (self))
4.38+(defgeneric work (self &key &allow-other-keys))
4.39+(defgeneric workers (self))
4.40+
4.41 ;;; Supervisor
4.42 (defclass supervisor ()
4.43- (scope)
4.44- (:documentation "A class which provides a view of the work done within a specified
4.45-SCOPE.
4.46+ ((thread :initform (make-ephemeral-thread (symbol-name (gensym "supervisor"))) :accessor supervisor-thread)
4.47+ (domain)
4.48+ (scope))
4.49+ (:documentation "Supervisors are threads which are responsible for a set of worker threads
4.50+within their DOMAIN and SCOPE."))
4.51
4.52-This object should be used by operators to inspect 'runstreams'
4.53-performed in other threads, such as WORKERS in TASK-POOL.
4.54-
4.55-Before using this object you should ensure the SCOPE is fully
4.56-initialized. Supervisors should be created at any point during the
4.57-lifetime of SCOPE, but never before and never after."))
4.58+(defmethod initialize-instance :after ((self supervisor) &key &allow-other-keys)
4.59+ (push (supervisor-thread self) *supervisor-threads*))
4.60
4.61 ;;; Worker
4.62 ;; unix-getrusage
4.63 ;; 0,-1,-2
4.64 ;; (multiple-value-list (sb-unix:unix-getrusage 0))
4.65 ;; (setf sb-unix::*on-dangerous-wait* :error)
4.66-(defvar *default-worker-name* "worker")
4.67
4.68+;; TODO 2024-10-03: with-cas-lock?
4.69 (defclass worker ()
4.70- ((thread :initform (sb-thread::%make-thread #.#1=(symbol-name (gensym "w")) t (make-semaphore :name #.#1#))
4.71+ ((thread :initform (make-ephemeral-thread (symbol-name (gensym "worker")))
4.72 :accessor worker-thread
4.73 :initarg :thread)
4.74 (kernel :type function :accessor worker-kernel :initarg :kernel)
4.75- (input :initform nil :accessor worker-input :initarg :input)))
4.76+ (tasks :initform nil :accessor tasks :initarg :input)))
4.77+
4.78+(defmethod initialize-instance :after ((self worker) &key &allow-other-keys)
4.79+ (push (worker-thread self) *worker-threads*))
4.80
4.81-(defvar *workers* (make-array 0 :element-type 'worker :adjustable t))
4.82+(defun make-worker (&key thread kernel input)
4.83+ (apply #'make-instance 'worker
4.84+ `(,@(when thread `(:thread ,thread))
4.85+ ,@(when kernel `(:kernel ,kernel))
4.86+ ,@(when input `(:input ,input)))))
4.87+
4.88+;; TODO 2024-10-03: pause/resume
4.89+
4.90+(declaim (inline kill-worker join-worker start-worker run-worker))
4.91+(defun start-worker (worker)
4.92+ (sb-thread::start-thread (worker-thread worker) (worker-kernel worker) (tasks worker)))
4.93
4.94-(declaim (inline kill-worker join-worker))
4.95-(defun start-worker (worker)
4.96- (sb-thread::start-thread (worker-thread worker) (worker-kernel worker) (worker-input worker)))
4.97-(defun kill-worker (worker) (kill-thread (worker-thread worker)))
4.98-(defun join-worker (worker) (join-thread (worker-thread worker)))
4.99+(defun run-worker (worker &key input wait)
4.100+ (when input
4.101+ (setf (tasks worker) input))
4.102+ (start-worker worker)
4.103+ (if wait (join-worker worker)
4.104+ worker))
4.105+
4.106+(defmethod run-object ((self worker))
4.107+ (run-worker self))
4.108+
4.109+(defmethod run ((self worker) (object t) &key wait &allow-other-keys)
4.110+ (run-worker self :input object :wait wait))
4.111+
4.112+(defun kill-worker (worker)
4.113+ (declare (worker worker))
4.114+ (let ((th (worker-thread worker)))
4.115+ (unwind-protect (kill-thread th)
4.116+ (deletef *worker-threads* th))))
4.117+
4.118+(defun join-worker (worker)
4.119+ (declare (worker worker))
4.120+ (let ((th (worker-thread worker)))
4.121+ (unwind-protect (join-thread th)
4.122+ (deletef *worker-threads* th))))
4.123
4.124 ;;; Oracle
4.125 (defstruct (oracle (:constructor %make-oracle (id thread)))
4.126+ "Oracles provide a tagged view into some threaded scope of work."
4.127 (id 0 :type (unsigned-byte 32) :read-only t)
4.128 (thread *current-thread* :read-only t))
4.129
4.130@@ -104,41 +158,9 @@
4.131 (values id found)
4.132 (let ((orc (%make-oracle id thread)))
4.133 (push orc *oracles*)
4.134+ (push (oracle-thread orc) *oracle-threads*)
4.135 (values id orc)))))
4.136
4.137-;;; Proto
4.138-;; oracle
4.139-(defgeneric designate-oracle (host guest))
4.140-;; worker
4.141-(defun make-worker (&key thread kernel input)
4.142- (apply #'make-instance 'worker
4.143- `(,@(when thread `(:thread ,thread))
4.144- ,@(when kernel `(:kernel ,kernel))
4.145- ,@(when input `(:input ,input)))))
4.146-
4.147-(defgeneric make-workers (count &rest initargs &key &allow-other-keys)
4.148- (:method ((count number) &key thread kernel input)
4.149- (let ((ret))
4.150- (dotimes (i count ret)
4.151- (push (make-worker :thread thread :kernel kernel :input input) ret)))))
4.152-
4.153-(defgeneric delete-worker (worker pool &key &allow-other-keys))
4.154-(defgeneric spawn-worker (pool worker))
4.155-
4.156-;; job
4.157-(defgeneric make-job (self &key &allow-other-keys))
4.158-(defgeneric find-job (job pool &key &allow-other-keys))
4.159-(defgeneric run-job (self job))
4.160-(defgeneric run-jobs (self))
4.161-;; task
4.162-(defgeneric tasks (self))
4.163-(defgeneric run-task (self task))
4.164-(defgeneric run-tasks (self))
4.165-(defgeneric results (self))
4.166-;; stage
4.167-(defgeneric run-stage (self stage))
4.168-(defgeneric workers (self))
4.169-
4.170 ;;; Task Pool
4.171 (defstruct task-pool
4.172 (kernel 'identity :type symbol)
4.173@@ -199,55 +221,54 @@
4.174 (defclass task ()
4.175 ((state :initform nil :initarg :state :accessor task-state))
4.176 (:documentation "This object represents a single unit of work to be done by some
4.177-worker. Tasks are typically generated by an oracle, but workers may also be
4.178-granted the ability to create and distribute their own tasks. Once a task is
4.179-assigned, the 'owner', i.e. the worker that is assigned this task, may modify
4.180-the object and state. When the work associated with a task is complete, the
4.181-owner is responsible for indicating in the state slot the result of the
4.182-computation."))
4.183+worker. Tasks are typically distributed from the task-pool, but workers may
4.184+also be granted the ability to create and distribute their own tasks. Once a
4.185+task is assigned, the 'owner', i.e. the worker that is assigned this task, may
4.186+modify the object. When the work associated with a task is complete, the owner
4.187+is responsible for indicating in the state slot the result of the computation."))
4.188
4.189 (defmethod print-object ((self task) stream)
4.190 (print-unreadable-object (self stream :type t)
4.191 (format stream ":state ~A" (task-state self))))
4.192
4.193-(defmethod run-task ((self thread) (task task)))
4.194+(defun run-task (worker task)
4.195+ (run-worker worker :input task))
4.196
4.197 ;;; Job
4.198-(defstruct (job (:constructor %make-job (tasks)))
4.199- "A collection of tasks to be performed by worker threads."
4.200- (tasks (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
4.201- :type (array task *))
4.202- (lock (make-mutex :name "job") :type mutex))
4.203-
4.204-(defmethod tasks ((self job)) (job-tasks self))
4.205+(defclass job (task)
4.206+ ((tasks :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
4.207+ :type (array task *)
4.208+ :initarg :tasks
4.209+ :accessor tasks)
4.210+ (lock :initform (make-mutex :name "job") :type mutex
4.211+ :initarg :lock))
4.212+ (:documentation "A collection of tasks forming a single unit of work."))
4.213
4.214-(defmethod make-job ((self task) &key (size 1))
4.215- (%make-job (make-array size :element-type 'task
4.216- :initial-element self)))
4.217-
4.218-(defmethod make-job ((self vector) &key)
4.219- (%make-job self))
4.220-
4.221-(defmethod make-job ((self null) &key (size 1))
4.222- (%make-job (make-array size :element-type 'task :fill-pointer 0 :adjustable t)))
4.223+(declaim (inline make-job))
4.224+(defun make-job (&rest tasks)
4.225+ (make-instance 'job
4.226+ :tasks (make-array (length tasks)
4.227+ :element-type 'task
4.228+ :initial-contents tasks)))
4.229
4.230 (defmethod print-object ((self job) stream)
4.231 (print-unreadable-object (self stream :type t)
4.232- (format stream "~A" (job-tasks self))))
4.233+ (format stream "~A tasks" (length (tasks self)))))
4.234+
4.235+(defun run-job (worker job)
4.236+ (run-worker worker :input job))
4.237
4.238-;;; Stage
4.239-(defclass stage ()
4.240- ((jobs :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
4.241- :initarg :jobs
4.242- :accessor jobs
4.243- :type (vector job))
4.244- (lock :initform (make-mutex :name "stage") :initarg :lock :accessor stage-lock :type mutex)))
4.245+;;; Work Scope
4.246+(defclass work-scope ()
4.247+ ((tasks :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
4.248+ :initarg :tasks
4.249+ :accessor tasks
4.250+ :type (vector task))
4.251+ (lock :initform (make-mutex :name "work-scope") :initarg :lock :accessor work-scope-lock :type mutex)))
4.252
4.253-(defmethod print-object ((self stage) stream)
4.254+(defmethod print-object ((self work-scope) stream)
4.255 (print-unreadable-object (self stream :type t)
4.256- (format stream "~A" (jobs self))))
4.257-
4.258-(defmethod run-stage ((self thread) (stage stage)))
4.259+ (format stream "~A" (tasks self))))
4.260
4.261 ;;; Macros
4.262 (defmacro with-task-pool ((sym &key oracle (tasks 0) lock (workers 4) start kernel results) &body body)
5.1--- a/lisp/std/tests/task.lisp Thu Oct 03 19:04:57 2024 -0400
5.2+++ b/lisp/std/tests/task.lisp Thu Oct 03 21:54:07 2024 -0400
5.3@@ -20,6 +20,7 @@
5.4 (is (zerop (sb-concurrency:mailbox-count (results tp))))
5.5 (start-task-workers tp)
5.6 (loop for w across (workers tp)
5.7- do (join-worker w))
5.8+ do (join-worker w))
5.9 (is (= 4 (sb-concurrency:mailbox-count (results tp))))))
5.10
5.11+