Mercurial > core / lisp/lib/obj/query.lisp
changeset 698: |
96958d3eb5b0 |
parent: |
97dd03beda03
|
author: |
Richard Westhaver <ellis@rwest.io> |
date: |
Fri, 04 Oct 2024 22:04:59 -0400 |
permissions: |
-rw-r--r-- |
description: |
fixes |
;;; obj/query/pkg.lisp --- Query Objects

;; Lisp primitive Query objects for DIY query engines.

;; This package provides the base set of classes and methods for implementing
;; query engines.

;; The intention is to use these objects in several high-level packages where
;; we need the ability to ask complex questions about some arbitrary data.

;; The type of high-level packages can loosely be categorized as:

;; - Frontends :: The interface exposed to the user - SQL, Prolog, etc.

;; - Middleware :: interfaces which are used internally and exposed publicly -
;;   query planners/optimizers/ast

;; - Backends :: The interface exposed to the underlying data sources -
;;   RocksDB, SQLite, etc.

;; https://gist.github.com/twitu/221c8349887cec0a83b395e4cbb492a7
;; https://www1.columbia.edu/sec/acis/db2/db2d0/db2d0103.htm
;; https://howqueryengineswork.com/

;; NOTE(review): this section was reviewed from a listing in which a number of
;; short lines were not visible.  Every form marked "reconstructed" below was
;; completed from context and must be confirmed against the repository copy.

(in-package :obj/query)

;; The DEFTYPE below splices this list at macroexpansion time, so the variable
;; must exist at compile time as well as at load time.
;; NOTE(review): reconstructed -- the opening line of the wrapper form was not
;; visible; only its closing parenthesis was.
(eval-when (:compile-toplevel :load-toplevel :execute)
  (defvar *literal-value-types*
    '(boolean fixnum signed-byte unsigned-byte float double-float string)))

(deftype literal-value-type () `(or ,@*literal-value-types*))

;; NOTE(review): reconstructed header -- the name FIELD is taken from the
;; MAKE-FIELD / FIELD-NAME / FIELD-TYPE uses in this file.
(defstruct field
  (name (symbol-name (gensym "#")) :type simple-string)
  (type t :type (or symbol list)))

(defmethod make-load-form ((self field) &optional env)
  "Return a form that rebuilds SELF with MAKE-FIELD."
  (declare (ignore env))
  ;; The type is a symbol or a list; it has to be quoted, otherwise loading the
  ;; form would evaluate it as a variable reference or a function call.
  `(make-field :name ,(field-name self) :type ',(field-type self)))

(deftype field-vector () '(vector field))

;; convenience interface for FIELD-VECTOR
(defclass column-vector ()
  ((data :type simple-vector :accessor column-data)))

(defclass literal-value-vector (column-vector)
  ((type :type literal-value-type :initarg :type :accessor column-type)
   (data :initarg :data :accessor column-data)
   (size :type fixnum :initarg :size :accessor column-size)))

(defgeneric column-literal-value (self)
  (:method ((self literal-value-vector))
    ;; NOTE(review): reconstructed body -- the original line was not visible.
    (column-data self)))

(defgeneric column-type (self)
  (:method ((self column-vector))
    (array-element-type (column-data self))))

(defgeneric column-value (self i)
  (:documentation "Return the value of column SELF at index I.")
  (:method ((self column-vector) (i fixnum))
    (aref (column-data self) i))
  (:method ((self literal-value-vector) (i fixnum))
    ;; Any index inside [0, COLUMN-SIZE) yields the same literal value.
    (if (or (< i 0) (>= i (column-size self)))
        ;; :FORMAT-ARGUMENTS takes a list of arguments, not a bare value.
        (error 'simple-error :format-control "index out of bounds: ~A"
                             :format-arguments (list i))
        (column-literal-value self))))

;; NOTE(review): reconstructed header -- the class name SCHEMA is taken from
;; the (MAKE-INSTANCE 'SCHEMA :FIELDS ...) calls in this file.
(defclass schema ()
  ((fields :type field-vector :initarg :fields :accessor fields)))

(defun make-schema (&rest fields)
  "Return a SCHEMA whose fields are FIELDS, in order."
  (make-instance 'schema :fields (coerce fields 'field-vector)))

(defmethod print-object ((self schema) stream)
  (print-unreadable-object (self stream :type t)
    (format stream ":fields ~A" (map 'list 'field-name (fields self)))))

(defmethod make-load-form ((self schema) &optional env)
  (declare (ignore env))
  `(make-instance ,(class-of self) :fields ,(fields self)))

(defclass schema-metadata ()
  ((metadata :initarg :metadata :accessor schema-metadata)))

(defmethod make-load-form ((self schema-metadata) &optional env)
  (declare (ignore env))
  `(make-instance ,(class-of self) :metadata ,(schema-metadata self)))

(defgeneric column-size (self)
  (:method ((self column-vector))
    (length (column-data self))))

(defstruct record-batch
  (schema (make-schema) :type schema)
  ;; FIELDS is indexed with AREF and measured with LENGTH throughout this file
  ;; and defaults to #(), so it is a vector of columns rather than a single
  ;; COLUMN-VECTOR instance.
  (fields #() :type vector))

;; The original also carried a stand-alone DEFMETHOD with the same body as the
;; method below; the two are merged here.
(defgeneric schema (self)
  (:method ((self record-batch))
    (record-batch-schema self)))

(defmethod make-load-form ((self record-batch) &optional env)
  (declare (ignore env))
  `(make-record-batch :schema ,(record-batch-schema self) :fields ,(record-batch-fields self)))

(defgeneric field (self n)
  (:method ((self record-batch) (n fixnum))
    (aref (record-batch-fields self) n)))

(defgeneric fields (self)
  (:method ((self record-batch))
    (record-batch-fields self)))

(defgeneric derive-schema (self))
(defgeneric load-schema (self schema))
(defgeneric load-field (self field))

(defun %select-fields (schema names)
  "Return a new SCHEMA holding the fields of SCHEMA whose names appear in the
sequence NAMES, in the order of NAMES.  Signals INVALID-ARGUMENT for a name
that matches no field."
  (let* ((fields (fields schema))
         (ret (make-array (length fields) :element-type 'field :fill-pointer 0
                                          :initial-element (make-field))))
    (map nil
         (lambda (n)
           (if-let ((found (find n fields :test 'equal :key 'field-name)))
             (vector-push found ret)
             (error 'invalid-argument :item n :reason "Invalid column name")))
         names)
    (make-instance 'schema :fields ret)))

(defgeneric select (self names)
  (:documentation "Return a schema restricted to the fields named in NAMES.")
  (:method ((self schema) (names list))
    (%select-fields self names))
  (:method ((self schema) (names vector))
    (%select-fields self names)))

(defgeneric project (self indices)
  (:documentation "Return a schema holding the fields of SELF at INDICES.")
  (:method ((self schema) (indices list))
    (make-instance 'schema
                   :fields (map 'field-vector (lambda (i) (aref (fields self) i)) indices)))
  ;; NOTE(review): two lines of this method were not visible; it is completed
  ;; to mirror the LIST method above.
  (:method ((self schema) (indices vector))
    (make-instance 'schema
                   :fields (map 'field-vector (lambda (i) (aref (fields self) i)) indices))))

(defgeneric row-count (self)
  (:method ((self record-batch))
    (let ((columns (record-batch-fields self)))
      ;; A batch without columns (the DEFSTRUCT default) has no rows; indexing
      ;; element 0 of an empty vector would signal an error instead.
      (if (zerop (length columns))
          0
          (sequence:length (aref columns 0))))))

(defgeneric column-count (self)
  (:method ((self record-batch))
    (length (record-batch-fields self))))

(defclass data-source ()
  ((schema :type schema :accessor schema)))

(defclass file-data-source (data-source)
  ((path :initarg :path :accessor file-data-path)))

(defgeneric scan-data (self projection)
  (:documentation "Scan the data source, selecting the specified columns."))

(defclass query-expression () ())

(defclass query-plan ()
  ((schema :type schema :accessor schema :initarg :schema)
   (children :type (vector query-plan))))

(defclass logical-plan (query-plan)
  ((children :type (vector logical-plan) :accessor children :initarg :children)))

(defclass physical-plan (query-plan)
  ((children :type (vector physical-plan))))

;;; Logical Expressions
(defclass logical-expression (query-expression) ())

(defgeneric to-field (self input)
  (:documentation "Return the FIELD describing expression SELF evaluated against INPUT.")
  (:method ((self string) (input logical-plan))
    (declare (ignore input))
    (make-field :name self :type 'string))
  (:method ((self number) (input logical-plan))
    (declare (ignore input))
    (make-field :name (princ-to-string self) :type 'number)))

(defclass column-expression (logical-expression)
  ((name :type string :initarg :name :accessor column-name)))

(defmethod to-field ((self column-expression) (input logical-plan))
  (or (find (column-name self) (fields (schema input)) :test 'equal :key 'field-name)
      (error 'invalid-argument :item (column-name self) :reason "Invalid column name")))

(defmethod df-col ((self string))
  (make-instance 'column-expression :name self))

(defclass literal-expression (logical-expression) ())

(defclass alias-expression (logical-expression)
  ((expr :type logical-expression :initarg :expr :accessor expr)
   (alias :type string :initarg :alias)))

(defclass cast-expression (logical-expression)
  ((expr :type logical-expression :initarg :expr :accessor expr)
   (data-type :type form :initarg :data-type)))

(defmethod to-field ((self cast-expression) (input logical-plan))
  (make-field :name (field-name (to-field (expr self) input)) :type (slot-value self 'data-type)))

(defclass unary-expression (logical-expression)
  ((expr :type logical-expression :accessor expr)))

(defclass binary-expression (logical-expression)
  ((lhs :type logical-expression :initarg :lhs :accessor lhs)
   (rhs :type logical-expression :initarg :rhs :accessor rhs)))

(defgeneric binary-expression-name (self))
(defgeneric binary-expression-op (self))

(defclass boolean-binary-expression (binary-expression)
  ((name :initarg :name :type string :accessor binary-expression-name)
   (op :initarg :op :type symbol :accessor binary-expression-op)))

(defmethod to-field ((self boolean-binary-expression) (input logical-plan))
  (declare (ignore input))
  (make-field :name (binary-expression-name self) :type 'boolean))

;; NOTE(review): each of the class definitions below continued for a few lines
;; that were not visible (presumably class options -- confirm and restore).
;; They are emitted here with an empty slot list and no options.
(defclass eq-expression (boolean-binary-expression) ())
(defclass neq-expression (boolean-binary-expression) ())
(defclass gt-expression (boolean-binary-expression) ())
(defclass lt-expression (boolean-binary-expression) ())
(defclass gteq-expression (boolean-binary-expression) ())
(defclass lteq-expression (boolean-binary-expression) ())
(defclass and-expression (boolean-binary-expression) ())
(defclass or-expression (boolean-binary-expression) ())

(defclass math-expression (binary-expression)
  ((name :initarg :name :type string :accessor binary-expression-name)
   (op :initarg :op :type symbol :accessor binary-expression-op)))

;; TODO 2024-08-03: ???
(defmethod to-field ((self math-expression) (input logical-plan))
  (declare (ignorable input))
  (make-field :name "*" :type (field-type (to-field (lhs self) input))))

;; NOTE(review): as above, the trailing lines of these definitions were not
;; visible; confirm and restore any class options.
(defclass add-expression (math-expression) ())
(defclass sub-expression (math-expression) ())
(defclass mult-expression (math-expression) ())
(defclass div-expression (math-expression) ())
(defclass mod-expression (math-expression) ())

(deftype aggregate-function () `(function ((input logical-expression)) query-expression))

(deftype aggregate-function-designator () `(or aggregate-function symbol))

(defclass aggregate-expression (logical-expression)
  ;; NOTE(review): reconstructed slot -- the NAME slot is read by the TO-FIELD
  ;; method below, but its definition line was not visible.
  ((name :type string)
   (expr :type logical-expression :accessor expr)))

(defgeneric aggregate-expression-p (self)
  (:method ((self aggregate-expression)) t)
  (:method ((self alias-expression)) (aggregate-expression-p (expr self)))
  (:method ((self t)) nil))

(defmethod to-field ((self aggregate-expression) (input logical-plan))
  (declare (ignorable input))
  (make-field :name (slot-value self 'name)
              :type (field-type (to-field (slot-value self 'expr) input))))

;; NOTE(review): as above, the trailing lines of these definitions were not
;; visible; confirm and restore any class options.
(defclass sum-expression (aggregate-expression) ())
(defclass min-expression (aggregate-expression) ())
(defclass max-expression (aggregate-expression) ())
(defclass avg-expression (aggregate-expression) ())
(defclass count-expression (aggregate-expression) ())

(defmethod to-field ((self count-expression) (input logical-plan))
  (declare (ignore input))
  (make-field :name "COUNT" :type 'number))

(defclass scan-data (logical-plan)
  ((path :type string :initarg :path)
   (data-source :type data-source :initarg :data-source)
   (projection :type (vector string) :initarg :projection)))

(defmethod derive-schema ((self scan-data))
  ;; An empty projection means "all columns".
  (let ((proj (slot-value self 'projection)))
    (if (= 0 (length proj))
        (slot-value self 'schema)
        (select (slot-value self 'schema) proj))))

(defmethod schema ((self scan-data))
  (derive-schema self))

(defclass projection (logical-plan)
  ((input :type logical-plan :initarg :input)
   (expr :type (vector logical-expression) :initarg :expr)))

(defmethod schema ((self projection))
  (schema (slot-value self 'input)))

(defclass selection (logical-plan)
  ((input :type logical-plan :initarg :input)
   (expr :type logical-expression :initarg :expr)))

(defmethod schema ((self selection))
  (schema (slot-value self 'input)))

(defclass aggregate (logical-plan)
  ((input :type logical-plan :initarg :input)
   (group-expr :type (vector logical-expression) :initarg :group-expr)
   (agg-expr :type (vector aggregate-expression) :initarg :agg-expr)))

(defmethod schema ((self aggregate))
  "Return a schema with one field per grouping expression followed by one field
per aggregate expression."
  ;; Fields are collected in order.  Accumulating them with PUSH, as before,
  ;; produced the reverse order, while HASH-AGGREGATE-EXEC writes group columns
  ;; first and aggregate columns after them.
  (let ((input (slot-value self 'input)))
    (apply 'make-schema
           (append (map 'list (lambda (g) (to-field g input)) (slot-value self 'group-expr))
                   (map 'list (lambda (a) (to-field a input)) (slot-value self 'agg-expr))))))

(defclass limit (logical-plan)
  ((input :type logical-plan :initarg :input)
   (limit :type integer)))

(defmethod schema ((self limit))
  (setf (slot-value self 'schema)
        (schema (slot-value self 'input))))

(defmethod children ((self limit))
  (setf (slot-value self 'children)
        (children (slot-value self 'input))))

(defclass join (logical-plan)
  ((left :accessor lhs)
   (right :accessor rhs)
   (on :accessor join-on)))

(defclass inner-join (join) ())
;; (defclass outer-join (join))
(defclass left-join (join) ())
(defclass right-join (join) ())

(defmethod schema ((self join))
  ;; TODO 2024-08-04: test better dupe impl
  ;; NOTE(review): the dispatch lines of this method were not visible.  The
  ;; TYPECASE and its keys are reconstructed: the first branch strips duplicate
  ;; columns from the left side, the second from the right side.  Confirm
  ;; which join classes select which branch.
  ;; NOTE(review): MEMBER is called with field objects and :TEST
  ;; 'STRING-EQUAL; confirm that DUPES and the fields are comparable that way.
  (let ((dupes (mapcon #'(lambda (l) (when (eq (car l) (second l)) (list (car l))))
                       (coerce (join-on self) 'list)))
        (schema (make-instance 'schema)))
    (setf (fields schema)
          (typecase self
            (right-join
             (let ((l (remove-if (lambda (x) (member x dupes :test 'string-equal))
                                 (fields (schema (lhs self)))))
                   (r (fields (schema (rhs self)))))
               (merge 'vector l r (lambda (x y) (declare (ignore y)) x))))
            (t
             (let ((l (fields (schema (lhs self))))
                   (r (remove-if (lambda (x) (member x dupes :test 'string-equal))
                                 (fields (schema (rhs self))))))
               (merge 'vector l r (lambda (x y) (declare (ignore y)) x))))))
    schema))

(defmethod children ((self join))
  (vector (lhs self) (rhs self)))

;; correlated-subquery
;; SELECT id, name, (SELECT count(*) FROM orders WHERE customer_id = customer.id) AS num_orders FROM customers

;; uncorrelated-subquery
;; SELECT * FROM orders WHERE total > (SELECT avg(total) FROM sales WHERE customer_state = 'CA')

;; NOTE 2024-08-02: EXISTS, IN, NOT EXISTS, and NOT IN are also subqueries

;; minimal data-frame abstraction. methods are prefixed with 'DF-'.
;; NOTE(review): this section was reviewed from a listing in which a number of
;; short lines were not visible.  Every form marked "reconstructed" below was
;; completed from context and must be confirmed against the repository copy.

(defstruct (data-frame (:constructor make-data-frame (&optional plan)))
  (plan (make-instance 'logical-plan) :type logical-plan))

(defgeneric df-col (self))

(defgeneric df-project (df exprs)
  (:documentation "Wrap the plan of DF in a PROJECTION over EXPRS.")
  (:method ((df data-frame) (expr list))
    (df-project df (coerce expr 'vector)))
  (:method ((df data-frame) (expr vector))
    ;; NOTE(review): reconstructed tail -- the :EXPR argument and the returned
    ;; DF were on lines that were not visible.
    (setf (data-frame-plan df)
          (make-instance 'projection
                         :input (data-frame-plan df)
                         :expr expr))
    df))

(defgeneric df-filter (df expr)
  (:documentation "Wrap the plan of DF in a SELECTION on EXPR.")
  (:method ((df data-frame) (expr logical-expression))
    (setf (data-frame-plan df)
          (make-instance 'selection :input (data-frame-plan df) :expr expr))
    ;; NOTE(review): reconstructed -- the final line of this method was not visible.
    df))

(defgeneric df-aggregate (df group-by agg-expr)
  (:documentation "Wrap the plan of DF in an AGGREGATE over GROUP-BY and AGG-EXPR.")
  (:method ((df data-frame) (group-by vector) (agg-expr vector))
    ;; NOTE(review): reconstructed tail -- the :GROUP-EXPR / :AGG-EXPR
    ;; arguments and the returned DF were on lines that were not visible.
    (setf (data-frame-plan df)
          (make-instance 'aggregate :input (data-frame-plan df)
                                    :group-expr group-by
                                    :agg-expr agg-expr))
    df)
  (:method ((df data-frame) (group-by list) (agg-expr list))
    (df-aggregate df (coerce group-by 'vector) (coerce agg-expr 'vector))))

(defgeneric make-df (self &key &allow-other-keys)
  (:method ((self null) &key)
    ;; NOTE(review): reconstructed body -- the original line was not visible.
    (make-data-frame)))

(defmethod schema ((df data-frame))
  (schema (data-frame-plan df)))

(defmethod (setf schema) ((schema schema) (df data-frame))
  (setf (slot-value (data-frame-plan df) 'schema) schema))

(defgeneric df-plan (df)
  (:documentation "Return the logical plan associated with this data-frame.")
  (:method ((df data-frame)) (data-frame-plan df)))

(defmethod (setf df-plan) ((plan logical-plan) (df data-frame))
  ;; Write through the structure accessor.  Writing through (SETF DF-PLAN)
  ;; here, as before, re-entered this same method without end.
  (setf (data-frame-plan df) plan))

;;; Physical Expression
(defclass physical-expression (query-expression) ())

(defclass literal-physical-expression (physical-expression) ())

(defgeneric evaluate (self input)
  (:documentation "Evaluate the expression SELF with INPUT and return a COLUMN-VECTOR result.")
  (:method ((self string) (input record-batch))
    (make-instance 'literal-value-vector
                   :size (row-count input)
                   ;; NOTE(review): reconstructed -- the :TYPE line was not
                   ;; visible; 'STRING mirrors the NUMBER method below.
                   :type 'string
                   :data (sb-ext:string-to-octets self)))
  (:method ((self number) (input record-batch))
    (make-instance 'literal-value-vector :size (row-count input) :type 'number :data self)))

(defclass column-physical-expression (physical-expression)
  ((val :type array-index :initarg :val)))

(defmethod evaluate ((self column-physical-expression) (input record-batch))
  (field input (slot-value self 'val)))

(defclass binary-physical-expression (physical-expression)
  ((lhs :type physical-expression :accessor lhs :initarg :lhs)
   (rhs :type physical-expression :accessor rhs :initarg :rhs)))

(defgeneric evaluate2 (self lhs rhs))

(defmethod evaluate ((self binary-physical-expression) (input record-batch))
  "Evaluate both operands against INPUT and combine them with EVALUATE2."
  (let ((ll (evaluate (lhs self) input))
        (rr (evaluate (rhs self) input)))
    ;; Operands are column objects, so their sizes come from COLUMN-SIZE;
    ;; LENGTH is only defined on sequences.
    (assert (= (column-size ll) (column-size rr)))
    ;; Column types may be compound type specifiers (lists), which EQL never
    ;; considers equal; EQUAL compares them structurally.
    (if (equal (column-type ll) (column-type rr))
        (evaluate2 self ll rr)
        (error "invalid state! lhs != rhs"))))

(defclass eq-physical-expression (binary-physical-expression) ())

(defmethod evaluate2 ((self eq-physical-expression) lhs rhs)
  (declare (ignore self))
  ;; NOTE(review): reconstructed body -- the original line was not visible.
  (equal lhs rhs))

(defclass neq-physical-expression (binary-physical-expression) ())

(defmethod evaluate2 ((self neq-physical-expression) lhs rhs)
  (declare (ignore self))
  ;; NOTE(review): reconstructed body -- the original line was not visible.
  (not (equal lhs rhs)))

(defclass lt-physical-expression (binary-physical-expression) ())

(defclass gt-physical-expression (binary-physical-expression) ())

(defclass lteq-physical-expression (binary-physical-expression) ())

(defclass gteq-physical-expression (binary-physical-expression) ())

(defclass and-physical-expression (binary-physical-expression) ())

(defclass or-physical-expression (binary-physical-expression) ())

(defclass math-physical-expression (binary-physical-expression) ())

(defmethod evaluate2 ((self math-physical-expression) (lhs column-vector) (rhs column-vector))
  "Apply the scalar EVALUATE2 method of SELF to each pair of column values."
  (coerce (loop for i below (column-size lhs)
                collect (evaluate2 self (column-value lhs i) (column-value rhs i)))
          ;; NOTE(review): reconstructed -- the result type line was not visible.
          'field-vector))

;; The four arithmetic classes below now inherit from MATH-PHYSICAL-EXPRESSION.
;; They previously inherited from the logical class MATH-EXPRESSION, so neither
;; EVALUATE nor the column-wise EVALUATE2 method above applied to them.  Their
;; scalar methods are specialized on NUMBER so that column operands still
;; reach the column-wise method.  The class name ADD-PHYSICAL-EXPRESION keeps
;; its spelling because the query planner instantiates it by that name.
;; NOTE(review): the bodies of the four scalar methods are reconstructed; the
;; original lines were not visible.
(defclass add-physical-expresion (math-physical-expression) ())

(defmethod evaluate2 ((self add-physical-expresion) (lhs number) (rhs number))
  (declare (ignore self))
  (+ lhs rhs))

(defclass sub-physical-expression (math-physical-expression) ())

(defmethod evaluate2 ((self sub-physical-expression) (lhs number) (rhs number))
  (declare (ignore self))
  (- lhs rhs))

(defclass mult-physical-expression (math-physical-expression) ())

(defmethod evaluate2 ((self mult-physical-expression) (lhs number) (rhs number))
  (declare (ignore self))
  (* lhs rhs))

(defclass div-physical-expression (math-physical-expression) ())

(defmethod evaluate2 ((self div-physical-expression) (lhs number) (rhs number))
  (declare (ignore self))
  (/ lhs rhs))

(defclass accumulator ()
  ((value :initarg :value :accessor accumulator-value)))

(defgeneric accumulate (self val)
  (:method ((self accumulator) val)
    ;; NOTE(review): reconstructed guard -- the line opening this form was not visible.
    (when val
      (setf (accumulator-value self) (+ val (accumulator-value self)))))
  (:method ((self list) val)
    ;; NOTE(review): reconstructed body -- the original line was not visible.
    ;; As written this returns the extended list without changing the caller's
    ;; binding.
    (push val self)))

(defgeneric make-accumulator (self))

(defclass max-accumulator (accumulator) ())

(defmethod accumulate ((self max-accumulator) (val number))
  ;; MAKE-ACCUMULATOR creates the instance without :VALUE, so the first value
  ;; seen has to initialise the slot instead of being compared against it.
  (when (or (not (slot-boundp self 'value))
            (> val (accumulator-value self)))
    (setf (accumulator-value self) val)))

(defclass aggregate-physical-expression (physical-expression)
  ((input :type physical-expression)))

(defclass max-physical-expression (aggregate-physical-expression) ())

(defmethod make-accumulator ((self max-physical-expression))
  (make-instance 'max-accumulator))

(defgeneric execute (self)
  (:documentation "Execute the LOGICAL-PLAN represented by object SELF.")
  (:method ((self data-frame))
    (execute (df-plan self))))

(defclass scan-exec (physical-plan)
  ((data-source :type data-source :initarg :data-source)
   (projection :type (vector string) :initarg :projection)))

(defmethod schema ((self scan-exec))
  (select (schema (slot-value self 'data-source)) (slot-value self 'projection)))

(defmethod execute ((self scan-exec))
  (scan-data (slot-value self 'data-source) (slot-value self 'projection)))

(defclass projection-exec (physical-plan)
  ((input :type physical-plan :initarg :input)
   (expr :type (vector physical-expression) :initarg :expr)))

(defmethod execute ((self projection-exec))
  "Evaluate every projection expression against each input batch."
  ;; NOTE(review): reconstructed -- the COERCE wrappers and the :FIELDS keyword
  ;; were on lines that were not visible.
  (coerce
   ;; EXECUTE yields a vector of record batches and is iterated directly, as
   ;; SELECTION-EXEC does; FIELDS has no method for such a vector.
   (loop for batch across (execute (slot-value self 'input))
         collect (make-record-batch :schema (slot-value self 'schema)
                                    :fields (coerce
                                             (loop for e across (slot-value self 'expr)
                                                   collect (evaluate e batch))
                                             'field-vector)))
   '(vector record-batch)))

(defclass selection-exec (physical-plan)
  ((input :type physical-plan :initarg :input)
   (expr :type physical-expression :initarg :expr)))

(defmethod schema ((self selection-exec))
  (schema (slot-value self 'input)))

(defmethod execute ((self selection-exec))
  "Return the input batches with only the rows selected by the filter expression."
  ;; NOTE(review): reconstructed -- the opening COERCE line was not visible.
  (coerce
   ;; Per-batch values use FOR ... = so that they are recomputed for every
   ;; batch; WITH clauses are evaluated once, before the first batch is bound.
   ;; NOTE(review): EVALUATE is documented to return a COLUMN-VECTOR; confirm
   ;; that such a result can be coerced to a BIT-VECTOR.
   (loop for batch across (execute (slot-value self 'input))
         for res = (coerce (evaluate (slot-value self 'expr) batch) 'bit-vector)
         for schema = (schema batch)
         ;; COLUMN-COUNT is defined on record batches, not on field vectors.
         for count = (column-count batch)
         for filtered = (loop for i from 0 below count
                              collect (filter self (field batch i) res))
         collect (make-record-batch :schema schema :fields (coerce filtered 'field-vector)))
   '(vector record-batch)))

(defgeneric filter (self columns selection)
  (:documentation "Return the values of COLUMNS whose bit in SELECTION is set.")
  (:method ((self selection-exec) (columns column-vector) (selection simple-bit-vector))
    ;; NOTE(review): reconstructed -- the COERCE wrapper and its result type
    ;; were on lines that were not visible.
    (coerce
     (loop for i from 0 below (length selection)
           unless (zerop (bit selection i))
             collect (column-value columns i))
     'field-vector)))

(defclass hash-aggregate-exec (physical-plan)
  ((input :type physical-plan :initarg :input)
   (group-expr :type (vector physical-plan) :initarg :group-expr)
   (agg-expr :type (vector aggregate-physical-expression) :initarg :agg-expr)))

(defmethod execute ((self hash-aggregate-exec))
  "Group the rows of all input batches by the grouping expressions and feed the
aggregate inputs of each row into that group's accumulators.  Return a
one-element vector holding a record batch with one row per group: the group
key columns first, the accumulated values after them."
  ;; NOTE(review): about a dozen lines of the original method were not visible.
  ;; This is a restructured reconstruction that keeps the visible operations
  ;; and changes the following, each of which prevented the visible code from
  ;; working:
  ;;  - per-batch and per-row values are bound with FOR ... = instead of WITH,
  ;;    which is evaluated only once;
  ;;  - every accumulator of a row is fed (RETURN left the loop after the first);
  ;;  - group keys are lists, because EQUAL compares general vectors by
  ;;    identity and equal keys would never have met in the table;
  ;;  - the result is sized with HASH-TABLE-COUNT (entries), not
  ;;    HASH-TABLE-SIZE (capacity).
  ;; The layout of the result columns (plain arrays) follows the nested AREF
  ;; calls of the original; confirm it against the callers.
  (let ((group-expr (slot-value self 'group-expr))
        (agg-expr (slot-value self 'agg-expr))
        (groups (make-hash-table :test 'equal)))
    (loop for batch across (execute (slot-value self 'input))
          for groupkeys = (map 'vector (lambda (x) (evaluate x batch)) group-expr)
          for aggr-inputs = (map 'vector (lambda (x) (evaluate (slot-value x 'input) batch))
                                 agg-expr)
          do (loop for row-idx from 0 below (row-count batch)
                   for row-key = (map 'list
                                      (lambda (x)
                                        (let ((val (column-value x row-idx)))
                                          (typecase val
                                            (octet-vector (sb-ext:octets-to-string val))
                                            (t val))))
                                      groupkeys)
                   for accs = (or (gethash row-key groups)
                                  (setf (gethash row-key groups)
                                        (map 'vector 'make-accumulator agg-expr)))
                   ;; start accumulating
                   do (loop for accum across accs
                            for i from 0
                            do (accumulate accum (column-value (aref aggr-inputs i) row-idx)))))
    ;; collect results in array
    (let* ((glen (length group-expr))
           (nrows (hash-table-count groups))
           (columns (coerce (loop repeat (+ glen (length agg-expr))
                                  collect (make-array nrows :initial-element nil))
                            'vector)))
      (loop for row-idx from 0
            for gkey being the hash-keys of groups using (hash-value accums)
            do (loop for i from 0 below glen
                     do (setf (aref (aref columns i) row-idx) (elt gkey i)))
               (loop for i from 0 below (length agg-expr)
                     do (setf (aref (aref columns (+ i glen)) row-idx)
                              (accumulator-value (aref accums i)))))
      (coerce (list (make-record-batch :schema (slot-value self 'schema) :fields columns))
              '(vector record-batch)))))

;; The Query Planner is effectively a compiler which translates logical
;; expressions and plans into their physical counterparts.
;; NOTE(review): this section was reviewed from a listing in which a number of
;; short lines were not visible.  Every form marked "reconstructed" below was
;; completed from context and must be confirmed against the repository copy.

(defclass query-planner () ())

(defgeneric make-physical-expression (expr input)
  (:documentation "Translate logical expression EXPR and logical plan INPUT
into a physical expression.")
  ;; NOTE(review): the bodies of the STRING and NUMBER methods are
  ;; reconstructed; EVALUATE is specialized on strings and numbers, so a
  ;; literal is taken to be its own physical expression.
  (:method ((expr string) (input logical-plan))
    (declare (ignore input))
    expr)
  (:method ((expr number) (input logical-plan))
    (declare (ignore input))
    expr)
  (:method ((expr column-expression) (input logical-plan))
    ;; A name that is not in the schema is reported the same way TO-FIELD
    ;; reports it, rather than building an expression with a NIL index.
    (let ((i (position (column-name expr) (fields (schema input)) :key 'field-name :test 'equal)))
      (unless i
        (error 'invalid-argument :item (column-name expr) :reason "Invalid column name"))
      (make-instance 'column-physical-expression :val i)))
  (:method ((expr binary-expression) (input logical-plan))
    (let ((l (make-physical-expression (lhs expr) input))
          (r (make-physical-expression (rhs expr) input)))
      ;; NOTE(review): reconstructed -- the line opening this dispatch form was
      ;; not visible.
      (etypecase expr
        (eq-expression (make-instance 'eq-physical-expression :lhs l :rhs r))
        (neq-expression (make-instance 'neq-physical-expression :lhs l :rhs r))
        (gt-expression (make-instance 'gt-physical-expression :lhs l :rhs r))
        (gteq-expression (make-instance 'gteq-physical-expression :lhs l :rhs r))
        (lt-expression (make-instance 'lt-physical-expression :lhs l :rhs r))
        (lteq-expression (make-instance 'lteq-physical-expression :lhs l :rhs r))
        (and-expression (make-instance 'and-physical-expression :lhs l :rhs r))
        (or-expression (make-instance 'or-physical-expression :lhs l :rhs r))
        (add-expression (make-instance 'add-physical-expresion :lhs l :rhs r))
        (sub-expression (make-instance 'sub-physical-expression :lhs l :rhs r))
        (mult-expression (make-instance 'mult-physical-expression :lhs l :rhs r))
        (div-expression (make-instance 'div-physical-expression :lhs l :rhs r))))))

(defgeneric make-physical-plan (plan)
  (:documentation "Create a physical plan from logical PLAN.")
  (:method ((plan logical-plan))
    ;; NOTE(review): reconstructed -- the line opening this dispatch form and
    ;; two lines of the PROJECTION schema were not visible.
    (etypecase plan
      (scan-data (make-instance 'scan-exec
                                :data-source (slot-value plan 'data-source)
                                :projection (slot-value plan 'projection)))
      (projection (make-instance 'projection-exec
                                 :schema (make-instance 'schema
                                                        :fields (map 'field-vector
                                                                     (lambda (x) (to-field x (slot-value plan 'input)))
                                                                     (slot-value plan 'expr)))
                                 :input (make-physical-plan (slot-value plan 'input))
                                 :expr (map 'vector (lambda (x) (make-physical-expression x (slot-value plan 'input)))
                                            (slot-value plan 'expr))))
      (selection (make-instance 'selection-exec
                                ;; The physical node carries the schema of its
                                ;; logical counterpart.
                                :schema (schema plan)
                                :input (make-physical-plan (slot-value plan 'input))
                                :expr (make-physical-expression (slot-value plan 'expr) (slot-value plan 'input))))
      (aggregate (make-instance 'hash-aggregate-exec
                                ;; HASH-AGGREGATE-EXEC reads its SCHEMA slot
                                ;; when it builds the result batch.
                                :schema (schema plan)
                                :input (make-physical-plan (slot-value plan 'input))
                                ;; GROUP-EXPR and AGG-EXPR are vectors of
                                ;; expressions, so each element is translated
                                ;; on its own; MAKE-PHYSICAL-EXPRESSION has no
                                ;; method for a vector.
                                ;; NOTE(review): no MAKE-PHYSICAL-EXPRESSION
                                ;; method for AGGREGATE-EXPRESSION is visible
                                ;; in this file; confirm one exists.
                                :group-expr (map 'vector (lambda (x) (make-physical-expression x (slot-value plan 'input)))
                                                 (slot-value plan 'group-expr))
                                :agg-expr (map 'vector (lambda (x) (make-physical-expression x (slot-value plan 'input)))
                                               (slot-value plan 'agg-expr)))))))

;; The Query Optimizer is responsible for walking a QUERY-PLAN and returning a
;; modified version of the same object. Usually we want to run optimization on
;; LOGICAL-PLANs but we also support specializing on PHYSICAL-PLAN.

;; Rule-based Optimizers: projection/predicate push-down, sub-expr elim

;; Lowerings: hdsl -> ldsl

;; Extensibility principle - A low level DSL should have greater than or equal
;; to expressiveness of a high level DSL

;; Transformation cohesion principle - There should be a unique path lowering
;; a high-level DSL to a low-level DSL. This also prevents loops between high
;; and low level DSLs.

;; TBD: Cost-based optimizers

(defclass query-optimizer () ())

;; NOTE(review): reconstructed -- the slot list was not visible; the INFO slot
;; is implied by the (INFO) constructor lambda list.
(defstruct (query-vop (:constructor make-query-vop (info)))
  "A virtual query operation available to query compilers."
  info)
;; NOTE(review): this section was reviewed from a listing in which a number of
;; short lines were not visible (the dispatch forms of EXTRACT-COLUMNS and
;; %PUSHDOWN among them).  The structure below is reconstructed from context
;; and must be confirmed against the repository copy.

(defgeneric optimize-query (self plan)
  (:documentation "Optimize the query expressed by PLAN using the optimizer SELF."))

;; Projection Pushdown
(defun extract-columns (expr input &optional accum)
  "Recursively check an expression for field indicators and add them to an
accumulator.

EXPR is a logical expression or a column index, INPUT the logical plan the
expression is evaluated against, ACCUM a list of column names found so far.
Returns ACCUM extended with the column names referenced by EXPR."
  ;; The names are threaded through the return value: a list passed as an
  ;; argument cannot be extended in place for the caller.
  (etypecase expr
    ;; A column index is resolved to the name of the field at that position.
    (array-index (adjoin (field-name (aref (fields (schema input)) expr)) accum :test 'equal))
    (column-expression (adjoin (column-name expr) accum :test 'equal))
    (binary-expression
     (extract-columns (rhs expr) input
                      (extract-columns (lhs expr) input accum)))
    (alias-expression (extract-columns (expr expr) input accum))
    (cast-expression (extract-columns (expr expr) input accum))
    (literal-expression accum)))

(defun extract-columns* (exprs input &optional accum)
  "Apply EXTRACT-COLUMNS to every element of the sequence EXPRS and return the
combined list of column names."
  (reduce (lambda (names x) (extract-columns x input names))
          exprs
          :initial-value accum))

(defclass projection-pushdown-optimizer (query-optimizer) ())

(defun %pushdown (plan &optional column-names)
  "Return a copy of PLAN in which every SCAN-DATA node projects only the
columns referenced by the nodes above it.  COLUMN-NAMES is the list of names
collected so far."
  (declare (logical-plan plan))
  (etypecase plan
    (projection
     ;; A projection holds a vector of expressions.
     (let* ((child (slot-value plan 'input))
            (names (extract-columns* (slot-value plan 'expr) child column-names)))
       (make-instance 'projection :input (%pushdown child names) :expr (slot-value plan 'expr))))
    (selection
     ;; A selection holds a single filter expression.
     (let* ((child (slot-value plan 'input))
            (names (extract-columns (slot-value plan 'expr) child column-names)))
       (make-instance 'selection :input (%pushdown child names) :expr (slot-value plan 'expr))))
    (aggregate
     (let* ((child (slot-value plan 'input))
            (names (extract-columns* (slot-value plan 'group-expr) child column-names))
            ;; The operand of an aggregate expression is read with EXPR.
            (names (extract-columns* (map 'list 'expr (slot-value plan 'agg-expr)) child names)))
       (make-instance 'aggregate
                      :input (%pushdown child names)
                      :group-expr (slot-value plan 'group-expr)
                      :agg-expr (slot-value plan 'agg-expr))))
    (scan-data
     (let ((scan (make-instance 'scan-data
                                ;; SCAN-DATA stores its location in PATH.
                                :path (slot-value plan 'path)
                                :data-source (slot-value plan 'data-source)
                                ;; The PROJECTION slot is declared as a vector.
                                :projection (coerce column-names 'vector)))) ;; maybe sort here?
       ;; DERIVE-SCHEMA reads the SCHEMA slot of a scan, so carry it over.
       (when (slot-boundp plan 'schema)
         (setf (slot-value scan 'schema) (slot-value plan 'schema)))
       scan))))
;; NOTE(review): this section was reviewed from a listing in which a few short
;; lines were not visible.  The forms marked "reconstructed" below were
;; completed from context (chiefly from the balance of parentheses) and must
;; be confirmed against the repository copy.

(defmethod optimize-query ((self projection-pushdown-optimizer) (plan logical-plan))
  ;; NOTE(review): reconstructed body -- the original line was not visible.
  (%pushdown plan))

(defclass query () ())

(defgeneric make-query (self &rest initargs &key &allow-other-keys)
  (:documentation "Make a new QUERY object.")
  (:method ((self t) &rest initargs)
    (declare (ignore initargs))
    (make-instance 'query)))

;;; Execution Context
(defclass execution-context () ())

(defgeneric register-df (self name df)
  (:documentation "Register a DATA-FRAME with an EXECUTION-CONTEXT."))

(defgeneric register-data-source (self name source)
  (:documentation "Register a DATA-SOURCE with an EXECUTION-CONTEXT."))

(defgeneric register-file (self name path &key type &allow-other-keys)
  (:documentation "Register a DATA-SOURCE contained in a file of type TYPE at PATH."))

(defgeneric execute* (self df)
  (:documentation "Execute the DATA-FRAME DF in CONTEXT. This is the stateful version of EXECUTE.")
  (:method ((self execution-context) (df data-frame))
    (declare (ignore self))
    ;; NOTE(review): reconstructed body -- the original line was not visible.
    (execute df)))

(defmethod execute ((self logical-plan))
  ;; NOTE(review): reconstructed -- the two lines opening the wrapping calls
  ;; were not visible; only the innermost call and four closing parentheses
  ;; were.
  (execute
   (make-physical-plan
    (optimize-query (make-instance 'projection-pushdown-optimizer) self))))