# HG changeset patch # User Richard Westhaver # Date 1727926747 14400 # Node ID 295ea43ceb2d9d2afb606d8ab999e405d44b50a5 # Parent 90417ae14b210c63008abdd135e74b258af2b29b tasks diff -r 90417ae14b21 -r 295ea43ceb2d lisp/lib/obj/obj.asd --- a/lisp/lib/obj/obj.asd Tue Oct 01 23:34:01 2024 -0400 +++ b/lisp/lib/obj/obj.asd Wed Oct 02 23:39:07 2024 -0400 @@ -45,8 +45,6 @@ (:file "util") (:file "x11") (:file "palette"))) - (:module "music" - :components ((:file "music"))) (:module "time" :components ((:file "local") (:file "util"))) diff -r 90417ae14b21 -r 295ea43ceb2d lisp/std/pkg.lisp --- a/lisp/std/pkg.lisp Tue Oct 01 23:34:01 2024 -0400 +++ b/lisp/std/pkg.lisp Wed Oct 02 23:39:07 2024 -0400 @@ -303,6 +303,8 @@ :default-task-kernel :make-worker :make-workers + :run-tasks + :run-jobs :worker-count :init-task-pool :make-task-pool @@ -454,6 +456,7 @@ (:shadowing-import-from :sb-vm :list-allocated-objects) (:import-from :sb-impl :*logical-hosts*) (:export + :*default-arena-size* :current-lisp-implementation :current-machine :list-package-symbols diff -r 90417ae14b21 -r 295ea43ceb2d lisp/std/sys.lisp --- a/lisp/std/sys.lisp Tue Oct 01 23:34:01 2024 -0400 +++ b/lisp/std/sys.lisp Wed Oct 02 23:39:07 2024 -0400 @@ -18,6 +18,8 @@ ;; sb-sys:*linkage-info* *machine-version* *runtime-dlhandle* *periodic-polling-function* ;; *periodic-polling-period* io-timeout nlx-protect serve-event os-deinit os-exit with-deadline dlopen-or-lose deallocate-system-memory +(defvar *default-arena-size* (* 10 1024 1024 1024)) + (defun current-lisp-implementation () "Return the current lisp implemenation as a list: (TYPE VERSION FEATURES)" (list diff -r 90417ae14b21 -r 295ea43ceb2d lisp/std/task.lisp --- a/lisp/std/task.lisp Tue Oct 01 23:34:01 2024 -0400 +++ b/lisp/std/task.lisp Wed Oct 02 23:39:07 2024 -0400 @@ -19,7 +19,7 @@ (destructuring-bind (name args &body body) op `(std/macs:named-lambda ,name ,args ,@body))) -(defmacro define-task-kernel (name ops accessors &body body) +(defmacro define-task-kernel (name (&key lock timeout mailbox queue) args &body body) "Define a task kernel. (define-task-kernel NAME (&key ARGS ACCESSORS) @@ -35,18 +35,22 @@ kernel via an ORACLE. This interface is experimental and subject to change." - (declare (ignorable accessors ops)) - `(defun ,name () - ,@body - (values))) + `(gen-task-kernel ,name ,args ,lock ,queue ,mailbox ,timeout + ,@body)) + +(defvar *task-queue*) +(defvar *task-item*) +(defvar *task-result*) +(defvar *task-error*) -(defvar *task-kernel* - (symbol-function - (define-task-kernel default-task-kernel () () - "The default task kernel used to initialize the KERNEL slot of -task-pools. -" - nil))) +(defmacro gen-task-kernel (name args lock queue mailbox timeout &body body) + `(compile ,name + (lambda ,args + (wait-on-semaphore ,lock ,@(when timeout `((:timeout ,timeout)))) + (let* ((*task-queue* ,queue) + (*task-item* (dequeue ,queue))) + ,@body + (send-message ,mailbox t))))) ;;; Supervisor (defclass supervisor () @@ -92,7 +96,6 @@ (values id orc))))) ;;; Proto - ;; oracle (defgeneric designate-oracle (host guest)) ;; worker @@ -108,9 +111,6 @@ (let ((ret)) (dotimes (i count ret) (push (make-worker t :thread thread :function function :arguments arguments) ret))))) -(defgeneric push-worker (thread pool)) -(defgeneric push-workers (threads pool)) -(defgeneric worker-count (self &key &allow-other-keys)) (defgeneric delete-worker (worker pool &key &allow-other-keys)) (defgeneric pop-worker (pool)) (defgeneric make-worker-for (pool function &rest args) @@ -132,73 +132,49 @@ ;; job (defgeneric make-job (self &key &allow-other-keys)) -(defgeneric push-job (job pool)) -(defgeneric pop-job (pool)) (defgeneric find-job (job pool &key &allow-other-keys)) -(defgeneric delete-job (job pool &key &allow-other-keys)) (defgeneric run-job (self job)) - +(defgeneric run-jobs (self)) ;; task -(defgeneric push-task (task pool)) -(defgeneric pop-task (pool)) -(defgeneric delete-task (task pool &key &allow-other-keys)) -(defgeneric make-task (&rest args)) +(defgeneric tasks (self)) +(defgeneric make-task (&rest args &key &allow-other-keys)) (defgeneric run-task (self task)) - -;; task-result -(defgeneric push-task-result (task pool)) -(defgeneric pop-task-result (pool)) - +(defgeneric run-tasks (self)) +(defgeneric results (self)) ;; stage -(defgeneric push-stage (stage pool)) -(defgeneric pop-stage (pool)) -(defgeneric delete-stage (stage pool &key &allow-other-keys)) (defgeneric run-stage (self stage)) +(defgeneric workers (self)) -;; task-pool -(defgeneric start-task-pool (pool)) -(defgeneric pause-task-pool (pool)) -(defgeneric init-task-pool (pool) - (:method ((pool t)) - (setq *task-pool* (or pool (make-task-pool))))) -(defgeneric stop-task-pool (pool)) -(defgeneric restart-task-pool (pool)) - -;;; Pool +;;; Task Pool (defstruct task-pool - (oracle-id nil :type (or null (unsigned-byte 32))) - (kernel #'default-task-kernel :type function) - (jobs (make-queue :name "jobs")) - (stages (make-array 0 :element-type 'stage :fill-pointer 0) :type (array stage *)) - ;; When open, indicates that the pool is fully initialized and workers - ;; may make progress. - (online (make-semaphore :name "online") - :type semaphore) + (kernel 'identity :type symbol) + (tasks (make-queue :name "tasks")) + (lock (make-semaphore :name "online") :type semaphore) ;; TODO: test weak-vector here - (workers (make-array 0 :element-type 'worker - :fill-pointer 0) - :type (vector worker)) + (workers (make-array 0 :element-type 'worker :fill-pointer 0) :type (vector thread)) (results (make-mailbox :name "results"))) -(defmethod print-object ((self task-pool) stream) +(defmethod tasks ((self task-pool)) (task-pool-tasks self)) +(defmethod results ((self task-pool)) (task-pool-results self)) +(defmethod workers ((self task-pool)) (task-pool-workers self)) + +(defmethod print-object ((self task-pool) (stream t)) (print-unreadable-object (self stream :type t) - (format stream "~A ~A :online ~A ~A:~A:~A ~A" - (task-pool-oracle-id self) + (format stream "~A ~A:~A:~A ~A" (task-pool-kernel self) - (semaphore-count (task-pool-online self)) - (queue-count (task-pool-jobs self)) - (length (task-pool-stages self)) - (length (task-pool-workers self)) + (length (workers self)) + (semaphore-count (task-pool-lock self)) + (queue-count (tasks self)) (mailbox-count (task-pool-results self))))) (defmethod designate-oracle ((self task-pool) (guest integer)) (setf (task-pool-oracle-id self) (make-oracle (find-thread-by-id guest))) self) -(defmethod worker-count ((self task-pool) &key online) +(defun worker-count (task-pool &key online) (if online - (semaphore-count (task-pool-online self)) - (length (task-pool-workers self)))) + (semaphore-count (task-pool-online task-pool)) + (length (task-pool-workers task-pool)))) (defmethod designate-oracle ((self task-pool) (guest thread)) (designate-oracle self (make-oracle guest))) @@ -206,15 +182,16 @@ (defmethod task-pool-oracle ((self task-pool)) (oracle-thread (find-oracle (slot-value self 'oracle)))) -(defmethod push-worker ((worker thread) (pool task-pool)) +(declaim (inline push-worker push-workers pop-worker)) +(defun push-worker (worker pool) (vector-push-extend worker (task-pool-workers pool))) -(defmethod push-workers ((threads list) (pool task-pool)) +(defun push-workers (threads pool) (with-slots (workers) pool (dolist (w threads) (vector-push-extend w workers)))) -(defmethod pop-worker ((pool task-pool)) +(defmethod pop-worker (pool) (vector-pop (task-pool-workers pool))) (defmethod make-worker-for ((pool task-pool) function &rest args) @@ -232,8 +209,7 @@ ;;; Task (defclass task () - ((state :initarg :state :accessor task-state) - (object :initarg :object :accessor task-object)) + ((state :initarg :state :accessor task-state)) (:documentation "This object represents a single unit of work to be done by some worker. Tasks are typically generated by an oracle, but workers may also be granted the ability to create and distribute their own tasks. Once a task is @@ -242,9 +218,6 @@ owner is responsible for indicating in the state slot the result of the computation.")) -(defmethod make-task (&rest args) - (make-instance 'task :object args)) - (defmethod print-object ((self task) stream) (print-unreadable-object (self stream :type t) (format stream "~A" (task-object self)))) @@ -261,6 +234,8 @@ :type (array task *)) (lock (make-mutex :name "job") :type mutex)) +(defmethod tasks ((self job)) (job-tasks self)) + (defmethod make-job ((self task) &key (size 1)) (%make-job (make-array size :element-type 'task :initial-element self))) @@ -275,19 +250,6 @@ (print-unreadable-object (self stream :type t) (format stream "~A" (job-tasks self)))) -(defmethod push-task ((task task) (job job)) - (vector-push task (job-tasks job))) - -(defmethod push-task ((task task) (pool task-pool)) - (push-job (make-job task) pool)) - -(defmethod push-job ((job job) (pool task-pool)) - (enqueue job (task-pool-jobs pool))) - -(defmethod run-job ((self task-pool) (job job)) - #+log (log:trace! "running remote job...") - (push-job job self)) - ;;; Stage (defclass stage () ((jobs :initform (make-array 0 :element-type 'task :fill-pointer 0 :adjustable t) @@ -300,13 +262,20 @@ (print-unreadable-object (self stream :type t) (format stream "~A" (jobs self)))) -(defmethod push-stage ((stage stage) (pool task-pool)) - (vector-push stage (task-pool-stages pool))) - (defmethod run-stage ((self thread) (stage stage))) ;;; Macros -(defmacro with-task-pool ((sym pool &key count) &body body) - `(let ((,sym ,(or pool std/task::*task-pool*))) - ,@(when count `((spawn-workers ,sym ,count))) +(defmacro with-task-pool ((sym &key oracle lock (count 4) spawn kernel results) &body body) + (unless lock (setf lock (make-semaphore :name "online" :count count))) + (unless results (setf results (make-mailbox :name "results"))) + `(let ((,sym (make-task-pool :lock ,lock :results ,results))) + ,@(if kernel `((setf (task-pool-kernel ,sym) ,kernel)) + `((setf (task-pool-kernel ,sym) + (gen-task-kernel '%kernel () + (task-pool-lock ,sym) + (tasks ,sym) + (results ,sym) + nil)))) + (designate-oracle ,sym ,@(if oracle (list oracle) `((make-oracle *current-thread*)))) + ,@(when spawn `((spawn-workers ,sym ,spawn))) ,@body)) diff -r 90417ae14b21 -r 295ea43ceb2d lisp/std/tests/task.lisp --- a/lisp/std/tests/task.lisp Tue Oct 01 23:34:01 2024 -0400 +++ b/lisp/std/tests/task.lisp Wed Oct 02 23:39:07 2024 -0400 @@ -8,12 +8,14 @@ (deftest tasks () "Test task-pools, oracles, and workers." - (let ((pool (designate-oracle (make-task-pool) (make-oracle *current-thread*)))) - ;; pool is bound to a task pool, *ORACLE-THREADS* contains the *CURRENT-THREAD*. - (spawn-workers pool 16) - ;; (with-threads (16 :args (&optional (a 0) (b 1) (c 2))) - ;; (sb-thread:allocator-histogram) - ;; (sb-concurrency:wait-on-gate (std/thread::task-pool-online pool)) - ;; (print (+ a b c))) - (is (= 16 (length (task-pool-workers pool)))) - (is (sb-thread:semaphore-count (std/task::task-pool-online pool))))) + (with-threads (4 :args (&optional (a 0) (b 1) (c 2))) + (is (= 3 (+ a b c)))) + ;; *ORACLE-THREADS* contains the *CURRENT-THREAD*. + (std/task:with-task-pool (tp :count 10 :spawn 4) + (is (= 4 (length (task-pool-workers tp)))) + (std/task::task-pool-lock tp) + (is (= 4 (std/task::mailbox-count (task-pool-results tp)))) + (describe tp) + (dotimes (i 4) + (is (eql t (std/task::receive-message (task-pool-results tp))))) + (is (null (std/task::receive-message-no-hang (task-pool-results tp))))))