changelog shortlog graph tags branches files raw help

Mercurial > core / changeset: tasks

changeset 694: a36280d2ef4e
parent 693: 5f81d888c31f
child 695: 2bad47888dbf
author: Richard Westhaver <ellis@rwest.io>
date: Thu, 03 Oct 2024 21:54:07 -0400
files: lisp/ffi/sndfile/pkg.lisp lisp/std/mop.lisp lisp/std/pkg.lisp lisp/std/task.lisp lisp/std/tests/task.lisp
description: tasks
     1.1--- a/lisp/ffi/sndfile/pkg.lisp	Thu Oct 03 19:04:57 2024 -0400
     1.2+++ b/lisp/ffi/sndfile/pkg.lisp	Thu Oct 03 21:54:07 2024 -0400
     1.3@@ -120,8 +120,10 @@
     1.4                    :wavex #x130000               ; MS WAVE with WAVEFORMATEX 
     1.5                    :sd2 #x160000               ; Sound Designer 2 
     1.6                    :flac #x170000               ; FLAC lossless file format 
     1.7-                   :caf #x180000               ; Core Audio File format 
     1.8+                   :caf #x180000)               ; Core Audio File format 
     1.9 
    1.10+(define-alien-enum (sf-format-subtype int)
    1.11+                   ;; subtypes
    1.12                    :pcm-s8 #x0001                 ; Signed 8 bit data 
    1.13                    :pcm-16 #x0002                 ; Signed 16 bit data 
    1.14                    :pcm-24 #x0003                 ; Signed 24 bit data 
     2.1--- a/lisp/std/mop.lisp	Thu Oct 03 19:04:57 2024 -0400
     2.2+++ b/lisp/std/mop.lisp	Thu Oct 03 21:54:07 2024 -0400
     2.3@@ -61,3 +61,33 @@
     2.4                       `(,ns ,v))))
     2.5               (when unboundp (list ns))))))
     2.6     slots)))
     2.7+
     2.8+;; closer-mop
     2.9+(defun ensure-finalized (class &optional (errorp t))
    2.10+  (if (typep class 'class)
    2.11+    (unless (class-finalized-p class)
    2.12+      (finalize-inheritance class))
    2.13+    (when errorp (error "~S is not a class." class)))
    2.14+  class)
    2.15+
    2.16+(defun subclassp (class superclass)
    2.17+  (flet ((get-class (class) (etypecase class
    2.18+                              (class class)
    2.19+                              (symbol (find-class class)))))
    2.20+
    2.21+      (loop with class = (get-class class)
    2.22+            with superclass = (get-class superclass)
    2.23+
    2.24+            for superclasses = (list class)
    2.25+            then (set-difference
    2.26+                  (union (class-direct-superclasses current-class) superclasses)
    2.27+                  seen)
    2.28+
    2.29+            for current-class = (first superclasses)
    2.30+
    2.31+            while current-class
    2.32+
    2.33+            if (eq current-class superclass) return t
    2.34+            else collect current-class into seen
    2.35+
    2.36+            finally (return nil))))
     3.1--- a/lisp/std/pkg.lisp	Thu Oct 03 19:04:57 2024 -0400
     3.2+++ b/lisp/std/pkg.lisp	Thu Oct 03 21:54:07 2024 -0400
     3.3@@ -178,7 +178,8 @@
     3.4   (:use :cl :sb-mop :sb-pcl)
     3.5   (:import-from :std/sym :symb :make-keyword)
     3.6   (:export :list-slot-values-using-class
     3.7-   :list-class-methods :list-class-slots :list-indirect-slot-methods))
     3.8+   :list-class-methods :list-class-slots :list-indirect-slot-methods
     3.9+   :ensure-finalized :subclassp))
    3.10    
    3.11 (defpkg :std/fu
    3.12   (:use :cl)
    3.13@@ -282,14 +283,16 @@
    3.14   (:use :cl :std/thread :sb-concurrency)
    3.15   (:import-from :std/thread :%make-thread)
    3.16   (:import-from :std/macs :if-let)
    3.17+  (:import-from :std/list :deletef)
    3.18   (:export
    3.19    :spawn-workers
    3.20    :make-oracle :make-supervisor
    3.21-   :oracle :run-task
    3.22+   :oracle 
    3.23    :oracle-id :find-thread
    3.24    :push-job :push-task
    3.25    :push-worker :push-task-result
    3.26-   :run-job :run-stage
    3.27+   :run :run-object 
    3.28+   :work
    3.29    :pop-job :pop-task
    3.30    :workers
    3.31    :tasks
    3.32@@ -304,7 +307,9 @@
    3.33    :*task-pool*
    3.34    :*tasks*
    3.35    :*oracles*
    3.36-   :*workers*
    3.37+   :*oracle-threads*
    3.38+   :*worker-threads*
    3.39+   :*supervisor-threads*
    3.40    :*jobs*
    3.41    :*stages*
    3.42    :*task*
     4.1--- a/lisp/std/task.lisp	Thu Oct 03 19:04:57 2024 -0400
     4.2+++ b/lisp/std/task.lisp	Thu Oct 03 21:54:07 2024 -0400
     4.3@@ -10,8 +10,11 @@
     4.4 (defvar *tasks* (make-queue :name "tasks"))
     4.5 (defvar *jobs*)
     4.6 (defvar *stages*)
     4.7-(defvar *oracles* nil)
     4.8-(defvar *task-oracles* nil)
     4.9+(sb-ext:defglobal *worker-threads* nil)
    4.10+(sb-ext:defglobal *oracles* nil)
    4.11+(sb-ext:defglobal *oracle-threads* nil)
    4.12+(sb-ext:defglobal *supervisor-threads* nil)
    4.13+
    4.14 (eval-when (:compile-toplevel)
    4.15   (defvar *task*)
    4.16   (defvar *task-result* nil))
    4.17@@ -55,43 +58,94 @@
    4.18   `(gen-task-kernel ,name ,args ,lock ,queue ,mailbox ,timeout
    4.19      ,@body))
    4.20 
    4.21+(defun make-ephemeral-thread (name)
    4.22+    (sb-thread::%make-thread name t (make-semaphore :name name)))
    4.23+
    4.24+;;; Proto
    4.25+(defgeneric designate-oracle (host guest))
    4.26+(defgeneric assign-supervisor (worker supervisor))
    4.27+
    4.28+(defgeneric make-workers (count &rest initargs &key &allow-other-keys)
    4.29+  (:method ((count number) &key thread kernel input)
    4.30+    (let ((ret))
    4.31+      (dotimes (i count ret)
    4.32+        (push (make-worker :thread thread :kernel kernel :input input) ret)))))
    4.33+
    4.34+(defgeneric tasks (self))
    4.35+(defgeneric results (self))
    4.36+(defgeneric run (self object &rest initargs &key &allow-other-keys))
    4.37+(defgeneric run-object (self))
    4.38+(defgeneric work (self &key &allow-other-keys))
    4.39+(defgeneric workers (self))
    4.40+
    4.41 ;;; Supervisor
    4.42 (defclass supervisor ()
    4.43-  (scope)
    4.44-  (:documentation "A class which provides a view of the work done within a specified
    4.45-SCOPE.
    4.46+  ((thread :initform (make-ephemeral-thread (symbol-name (gensym "supervisor"))) :accessor supervisor-thread)
    4.47+   (domain)
    4.48+   (scope))
    4.49+  (:documentation "Supervisors are threads which are responsible for a set of worker threads
    4.50+within their DOMAIN and SCOPE."))
    4.51 
    4.52-This object should be used by operators to inspect 'runstreams'
    4.53-performed in other threads, such as WORKERS in TASK-POOL.
    4.54-
    4.55-Before using this object you should ensure the SCOPE is fully
    4.56-initialized. Supervisors should be created at any point during the
    4.57-lifetime of SCOPE, but never before and never after."))
    4.58+(defmethod initialize-instance :after ((self supervisor) &key &allow-other-keys)
    4.59+  (push (supervisor-thread self) *supervisor-threads*))
    4.60 
    4.61 ;;; Worker
    4.62 ;; unix-getrusage  
    4.63 ;; 0,-1,-2
    4.64 ;; (multiple-value-list (sb-unix:unix-getrusage 0))
    4.65 ;; (setf sb-unix::*on-dangerous-wait* :error)
    4.66-(defvar *default-worker-name* "worker")
    4.67 
    4.68+;; TODO 2024-10-03: with-cas-lock?
    4.69 (defclass worker ()
    4.70-  ((thread :initform (sb-thread::%make-thread #.#1=(symbol-name (gensym "w")) t (make-semaphore :name #.#1#))
    4.71+  ((thread :initform (make-ephemeral-thread (symbol-name (gensym "worker")))
    4.72            :accessor worker-thread
    4.73            :initarg :thread)
    4.74    (kernel :type function :accessor worker-kernel :initarg :kernel)
    4.75-   (input :initform nil :accessor worker-input :initarg :input)))
    4.76+   (tasks :initform nil :accessor tasks :initarg :input)))
    4.77+
    4.78+(defmethod initialize-instance :after ((self worker) &key &allow-other-keys)
    4.79+  (push (worker-thread self) *worker-threads*))
    4.80 
    4.81-(defvar *workers* (make-array 0 :element-type 'worker :adjustable t))
    4.82+(defun make-worker (&key thread kernel input)
    4.83+  (apply #'make-instance 'worker
    4.84+         `(,@(when thread `(:thread ,thread))
    4.85+           ,@(when kernel `(:kernel ,kernel))
    4.86+           ,@(when input `(:input ,input)))))
    4.87+
    4.88+;; TODO 2024-10-03: pause/resume
    4.89+
    4.90+(declaim (inline kill-worker join-worker start-worker run-worker))
    4.91+(defun start-worker (worker) 
    4.92+  (sb-thread::start-thread (worker-thread worker) (worker-kernel worker) (tasks worker)))
    4.93 
    4.94-(declaim (inline kill-worker join-worker))
    4.95-(defun start-worker (worker) 
    4.96-  (sb-thread::start-thread (worker-thread worker) (worker-kernel worker) (worker-input worker)))
    4.97-(defun kill-worker (worker) (kill-thread (worker-thread worker)))
    4.98-(defun join-worker (worker) (join-thread (worker-thread worker)))
    4.99+(defun run-worker (worker &key input wait)
   4.100+  (when input
   4.101+    (setf (tasks worker) input))
   4.102+  (start-worker worker)
   4.103+  (if wait (join-worker worker)
   4.104+      worker))
   4.105+
   4.106+(defmethod run-object ((self worker))
   4.107+  (run-worker self))
   4.108+
   4.109+(defmethod run ((self worker) (object t) &key wait &allow-other-keys)
   4.110+  (run-worker self :input object :wait wait))
   4.111+
   4.112+(defun kill-worker (worker) 
   4.113+  (declare (worker worker))
   4.114+  (let ((th (worker-thread worker)))
   4.115+    (unwind-protect (kill-thread th)
   4.116+      (deletef *worker-threads* th))))
   4.117+
   4.118+(defun join-worker (worker)
   4.119+  (declare (worker worker))
   4.120+  (let ((th (worker-thread worker)))
   4.121+    (unwind-protect (join-thread th)
   4.122+      (deletef *worker-threads* th))))
   4.123 
   4.124 ;;; Oracle           
   4.125 (defstruct (oracle (:constructor %make-oracle (id thread)))
   4.126+  "Oracles provide a tagged view into some threaded scope of work."
   4.127   (id 0 :type (unsigned-byte 32) :read-only t)
   4.128   (thread *current-thread* :read-only t))
   4.129 
   4.130@@ -104,41 +158,9 @@
   4.131       (values id found)
   4.132       (let ((orc (%make-oracle id thread)))
   4.133         (push orc *oracles*)
   4.134+        (push (oracle-thread orc) *oracle-threads*)
   4.135         (values id orc)))))
   4.136 
   4.137-;;; Proto
   4.138-;; oracle
   4.139-(defgeneric designate-oracle (host guest))
   4.140-;; worker
   4.141-(defun make-worker (&key thread kernel input)
   4.142-  (apply #'make-instance 'worker
   4.143-         `(,@(when thread `(:thread ,thread))
   4.144-           ,@(when kernel `(:kernel ,kernel))
   4.145-           ,@(when input `(:input ,input)))))
   4.146-
   4.147-(defgeneric make-workers (count &rest initargs &key &allow-other-keys)
   4.148-  (:method ((count number) &key thread kernel input)
   4.149-    (let ((ret))
   4.150-      (dotimes (i count ret)
   4.151-        (push (make-worker :thread thread :kernel kernel :input input) ret)))))
   4.152-
   4.153-(defgeneric delete-worker (worker pool &key &allow-other-keys))
   4.154-(defgeneric spawn-worker (pool worker))
   4.155-
   4.156-;; job
   4.157-(defgeneric make-job (self &key &allow-other-keys))
   4.158-(defgeneric find-job (job pool &key &allow-other-keys))
   4.159-(defgeneric run-job (self job))
   4.160-(defgeneric run-jobs (self))
   4.161-;; task
   4.162-(defgeneric tasks (self))
   4.163-(defgeneric run-task (self task))
   4.164-(defgeneric run-tasks (self))
   4.165-(defgeneric results (self))
   4.166-;; stage
   4.167-(defgeneric run-stage (self stage))
   4.168-(defgeneric workers (self))
   4.169-
   4.170 ;;; Task Pool
   4.171 (defstruct task-pool
   4.172   (kernel 'identity :type symbol)
   4.173@@ -199,55 +221,54 @@
   4.174 (defclass task ()
   4.175   ((state :initform nil :initarg :state :accessor task-state))
   4.176   (:documentation "This object represents a single unit of work to be done by some
   4.177-worker. Tasks are typically generated by an oracle, but workers may also be
   4.178-granted the ability to create and distribute their own tasks. Once a task is
   4.179-assigned, the 'owner', i.e. the worker that is assigned this task, may modify
   4.180-the object and state. When the work associated with a task is complete, the
   4.181-owner is responsible for indicating in the state slot the result of the
   4.182-computation."))
   4.183+worker. Tasks are typically distributed from the task-pool, but workers may
   4.184+also be granted the ability to create and distribute their own tasks. Once a
   4.185+task is assigned, the 'owner', i.e. the worker that is assigned this task, may
   4.186+modify the object. When the work associated with a task is complete, the owner
   4.187+is responsible for indicating in the state slot the result of the computation."))
   4.188 
   4.189 (defmethod print-object ((self task) stream)
   4.190   (print-unreadable-object (self stream :type t)
   4.191     (format stream ":state ~A" (task-state self))))
   4.192 
   4.193-(defmethod run-task ((self thread) (task task)))
   4.194+(defun run-task (worker task)
   4.195+  (run-worker worker :input task))
   4.196 
   4.197 ;;; Job
   4.198-(defstruct (job (:constructor %make-job (tasks)))
   4.199-  "A collection of tasks to be performed by worker threads."
   4.200-  (tasks (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
   4.201-   :type (array task *))
   4.202-  (lock (make-mutex :name "job") :type mutex))
   4.203-
   4.204-(defmethod tasks ((self job)) (job-tasks self))
   4.205+(defclass job (task)
   4.206+  ((tasks :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
   4.207+          :type (array task *)
   4.208+          :initarg :tasks
   4.209+          :accessor tasks)
   4.210+   (lock :initform (make-mutex :name "job") :type mutex
   4.211+         :initarg :lock))
   4.212+  (:documentation "A collection of tasks forming a single unit of work."))
   4.213 
   4.214-(defmethod make-job ((self task) &key (size 1))
   4.215-  (%make-job (make-array size :element-type 'task
   4.216-                              :initial-element self)))
   4.217-
   4.218-(defmethod make-job ((self vector) &key)
   4.219-  (%make-job self))
   4.220-
   4.221-(defmethod make-job ((self null) &key (size 1))
   4.222-  (%make-job (make-array size :element-type 'task :fill-pointer 0 :adjustable t)))
   4.223+(declaim (inline make-job))
   4.224+(defun make-job (&rest tasks)
   4.225+  (make-instance 'job
   4.226+    :tasks (make-array (length tasks) 
   4.227+                       :element-type 'task
   4.228+                       :initial-contents tasks)))
   4.229 
   4.230 (defmethod print-object ((self job) stream)
   4.231   (print-unreadable-object (self stream :type t)
   4.232-    (format stream "~A" (job-tasks self))))
   4.233+    (format stream "~A tasks" (length (tasks self)))))
   4.234+
   4.235+(defun run-job (worker job)
   4.236+  (run-worker worker :input job))
   4.237 
   4.238-;;; Stage
   4.239-(defclass stage ()
   4.240-  ((jobs  :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
   4.241-          :initarg :jobs
   4.242-          :accessor jobs
   4.243-          :type (vector job))
   4.244-   (lock :initform (make-mutex :name "stage") :initarg :lock :accessor stage-lock :type mutex)))
   4.245+;;; Work Scope
   4.246+(defclass work-scope ()
   4.247+  ((tasks  :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t)
   4.248+           :initarg :tasks
   4.249+           :accessor tasks
   4.250+           :type (vector task))
   4.251+   (lock :initform (make-mutex :name "work-scope") :initarg :lock :accessor work-scope-lock :type mutex)))
   4.252 
   4.253-(defmethod print-object ((self stage) stream)
   4.254+(defmethod print-object ((self work-scope) stream)
   4.255   (print-unreadable-object (self stream :type t)
   4.256-    (format stream "~A" (jobs self))))
   4.257-
   4.258-(defmethod run-stage ((self thread) (stage stage)))
   4.259+    (format stream "~A" (tasks self))))
   4.260 
   4.261 ;;; Macros
   4.262 (defmacro with-task-pool ((sym &key oracle (tasks 0) lock (workers 4) start kernel results) &body body)
     5.1--- a/lisp/std/tests/task.lisp	Thu Oct 03 19:04:57 2024 -0400
     5.2+++ b/lisp/std/tests/task.lisp	Thu Oct 03 21:54:07 2024 -0400
     5.3@@ -20,6 +20,7 @@
     5.4     (is (zerop (sb-concurrency:mailbox-count (results tp))))
     5.5     (start-task-workers tp)
     5.6     (loop for w across (workers tp)
     5.7-             do (join-worker w))
     5.8+          do (join-worker w))
     5.9     (is (= 4 (sb-concurrency:mailbox-count (results tp))))))
    5.10 
    5.11+