changelog shortlog graph tags branches changeset files revisions annotate raw help

Mercurial > core / lisp/lib/obj/query.lisp

changeset 698: 96958d3eb5b0
parent: 97dd03beda03
author: Richard Westhaver <ellis@rwest.io>
date: Fri, 04 Oct 2024 22:04:59 -0400
permissions: -rw-r--r--
description: fixes
1 ;;; obj/query/pkg.lisp --- Query Objects
2 
3 ;; Lisp primitive Query objects for DIY query engines.
4 
5 ;;; Commentary:
6 
7 ;; This package provides the base set of classes and methods for implementing
8 ;; query engines.
9 
10 ;; The intention is to use these objects in several high-level packages where
11 ;; we need the ability to ask complex questions about some arbitrary data
12 ;; source.
13 
14 ;; The type of high-level packages can loosely be categorized as:
15 
16 ;; - Frontends :: The interface exposed to the user - SQL, Prolog, etc.
17 
18 ;; - Middleware :: interfaces which are used internally and exposed publicly -
19 ;; query planners/optimizers/ast
20 
21 ;; - Backends :: The interface exposed to the underlying data sources -
22 ;; RocksDB, SQLite, etc.
23 
24 ;;;; Refs
25 
26 ;; https://gist.github.com/twitu/221c8349887cec0a83b395e4cbb492a7
27 
28 ;; https://www1.columbia.edu/sec/acis/db2/db2d0/db2d0103.htm
29 
30 ;; https://howqueryengineswork.com/
31 
32 ;;; Code:
33 (in-package :obj/query)
34 
35 ;;; Types
(eval-always
  (defvar *literal-value-types*
    '(boolean fixnum signed-byte unsigned-byte float double-float string)
    "Type specifiers whose objects may appear as literal values in a query.
Read when the LITERAL-VALUE-TYPE deftype is expanded, hence the EVAL-ALWAYS
wrapper."))

;; Any object belonging to one of *LITERAL-VALUE-TYPES*.
(deftype literal-value-type ()
  (cons 'or *literal-value-types*))
40 
41 ;;; Field
(defstruct field
  "A named, typed column description; SCHEMA holds a vector of these."
  ;; Defaults to a generated (gensym-based) name.
  (name (symbol-name (gensym "#")) :type simple-string)
  ;; A type specifier: a symbol such as STRING, or a compound list type.
  (type t :type (or symbol list)))

(defmethod make-load-form ((self field) &optional env)
  "Return a form that rebuilds field SELF when a fasl is loaded.
The type is quoted because it is a type specifier to be stored, not a form
to be evaluated (an unquoted STRING would be read as a variable reference)."
  (declare (ignore env))
  `(make-field :name ,(field-name self) :type ',(field-type self)))
49 
50 ;;; Field Vectors
(deftype field-vector () '(vector field))

;; convenience interface for FIELD-VECTOR
(defclass column-vector ()
  ;; DATA is normally a vector of column values.  It is deliberately left
  ;; untyped: LITERAL-VALUE-VECTOR stores a single scalar or an octet vector
  ;; here, and a slot's declared type is intersected across subclasses, so a
  ;; SIMPLE-VECTOR type would reject those values.
  ((data :initarg :data :accessor column-data))
  (:documentation "A column whose values live in the DATA slot."))

(defclass literal-value-vector (column-vector)
  ;; TYPE holds a type specifier such as STRING or NUMBER (what EVALUATE
  ;; passes), so it is typed like FIELD-TYPE, not as a literal value.
  ((type :type (or symbol list) :initarg :type :accessor column-type)
   (data :initarg :data :accessor column-data)
   (size :type fixnum :initarg :size :accessor column-size))
  (:documentation "A virtual column of SIZE rows, every one of which holds DATA."))
60 
(defgeneric column-literal-value (self)
  (:documentation "Return the single value held by every row of literal column SELF.")
  (:method ((self literal-value-vector))
    (column-data self)))

(defgeneric column-type (self)
  (:documentation "Return the type of the values in column SELF.")
  (:method ((self column-vector))
    (array-element-type (column-data self))))

(defgeneric column-value (self i)
  (:documentation "Return the value at row I of column SELF.")
  (:method ((self column-vector) (i fixnum))
    (aref (column-data self) i))
  (:method ((self literal-value-vector) (i fixnum))
    (if (or (< i 0) (>= i (column-size self)))
        ;; FORMAT-ARGUMENTS must be a list of arguments, not a bare object.
        (error 'simple-error :format-control "index out of bounds: ~A"
                             :format-arguments (list i))
        (column-literal-value self))))
76 
77 ;;; Schema
(defclass schema ()
  ((fields :type field-vector :initarg :fields :accessor fields))
  (:documentation "An ordered vector of FIELDs describing tabular data."))

(defun make-schema (&rest fields)
  "Return a SCHEMA whose fields are FIELDS, in the order given."
  (let ((field-vec (coerce fields 'field-vector)))
    (make-instance 'schema :fields field-vec)))

(defmethod print-object ((self schema) stream)
  (print-unreadable-object (self stream :type t)
    (let ((names (map 'list #'field-name (fields self))))
      (format stream ":fields ~A" names))))

(defmethod make-load-form ((self schema) &optional env)
  "Return a form that rebuilds SELF; its fields are externalized through the
FIELD load form."
  (declare (ignore env))
  (let ((class (class-of self))
        (field-vec (fields self)))
    `(make-instance ,class :fields ,field-vec)))
91 
(defclass schema-metadata ()
  ((metadata :initarg :metadata :accessor schema-metadata))
  (:documentation "Arbitrary metadata attached to a schema."))

(defmethod make-load-form ((self schema-metadata) &optional env)
  "Return a form that rebuilds SELF. The metadata is quoted so that list
values are kept as data rather than evaluated as function calls."
  (declare (ignore env))
  `(make-instance ,(class-of self) :metadata ',(schema-metadata self)))
98 
(defgeneric column-size (self)
  (:documentation "Return the number of rows in column SELF.")
  (:method ((self column-vector))
    (let ((data (column-data self)))
      (length data))))
102 
103 ;;; Record Batch
(defstruct record-batch
  "A batch of rows: a SCHEMA plus a vector holding one column per field."
  (schema (make-schema) :type schema)
  ;; Typed as VECTOR: the default is #(), and FIELD, COLUMN-COUNT and
  ;; ROW-COUNT all index or measure this slot as a vector.  A COLUMN-VECTOR
  ;; type would reject even the default value.
  (fields #() :type vector))

;; NOTE(review): the SCHEMA defgeneric below also defines this method.
(defmethod schema ((self record-batch))
  (record-batch-schema self))

(defmethod make-load-form ((self record-batch) &optional env)
  "Return a form that rebuilds SELF; its schema is externalized through the
SCHEMA load form."
  (declare (ignore env))
  `(make-record-batch :schema ,(record-batch-schema self)
                      :fields ,(record-batch-fields self)))
114 
115 ;;; Proto
(defgeneric field (self n)
  (:documentation "Return the Nth column of SELF.")
  (:method ((self record-batch) (n fixnum))
    (let ((columns (record-batch-fields self)))
      (aref columns n))))

(defgeneric fields (self)
  (:documentation "Return the columns (or fields) of SELF.")
  (:method ((self record-batch))
    (record-batch-fields self)))

(defgeneric schema (self)
  (:documentation "Return the SCHEMA describing SELF.")
  (:method ((self record-batch))
    (record-batch-schema self)))

(defgeneric derive-schema (self)
  (:documentation "Compute, rather than read, the SCHEMA of SELF."))
(defgeneric load-schema (self schema))
(defgeneric load-field (self field))
(defun %select-fields (schema names)
  "Return a FIELD-VECTOR holding, in order, the field of SCHEMA named by each
element of the sequence NAMES. Signals INVALID-ARGUMENT for an unknown name."
  (let* ((fields (fields schema))
         ;; Size by NAMES, not FIELDS: a column may be selected more than
         ;; once, and VECTOR-PUSH silently drops pushes past capacity.
         (ret (make-array (length names) :element-type 'field :fill-pointer 0)))
    (map nil
         (lambda (n)
           (if-let ((found (find n fields :test 'equal :key 'field-name)))
             (vector-push found ret)
             (error 'invalid-argument :item n :reason "Invalid column name")))
         names)
    ret))

(defgeneric select (self names)
  (:documentation "Return a new SCHEMA with only the fields of SELF named in NAMES, in NAMES order.")
  (:method ((self schema) (names list))
    (make-instance 'schema :fields (%select-fields self names)))
  (:method ((self schema) (names vector))
    (make-instance 'schema :fields (%select-fields self names))))
151 
(defgeneric project (self indices)
  (:documentation "Return a new SCHEMA with the fields of SELF at INDICES, in order.")
  (:method ((self schema) (indices list))
    (let ((all (fields self)))
      (make-instance 'schema
                     :fields (map 'field-vector (lambda (idx) (aref all idx)) indices))))
  (:method ((self schema) (indices vector))
    (let ((all (fields self)))
      (make-instance 'schema
                     :fields (map 'field-vector (lambda (idx) (aref all idx)) indices)))))
162 
(defgeneric row-count (self)
  (:documentation "Return the number of rows in SELF, or 0 when it has no columns.")
  (:method ((self record-batch))
    (let ((columns (record-batch-fields self)))
      (if (zerop (length columns))
          0
          ;; Columns may be COLUMN-VECTOR objects, which are not sequences,
          ;; or plain vectors; measure each kind with its own operator.
          (let ((first-column (aref columns 0)))
            (if (typep first-column 'column-vector)
                (column-size first-column)
                (length first-column)))))))
166 
(defgeneric column-count (self)
  (:documentation "Return the number of columns in SELF.")
  (:method ((self record-batch))
    (let ((columns (record-batch-fields self)))
      (length columns))))
170 
171 ;;; Data Source
(defclass data-source ()
  ((schema :type schema :accessor schema))
  (:documentation "A source of rows that a query can scan."))

(defclass file-data-source (data-source)
  ((path :initarg :path :accessor file-data-path))
  (:documentation "A DATA-SOURCE associated with the file at PATH."))

(defgeneric scan-data (self projection)
  (:documentation "Scan the data source, selecting the specified columns."))
180 
181 ;;; Expressions
(defclass query-expression () ()
  (:documentation "Base class of logical and physical query expressions."))

(defclass query-plan ()
  ((schema :type schema :accessor schema :initarg :schema)
   (children :type (vector query-plan)))
  (:documentation "Base class of logical and physical query plans."))

(defclass logical-plan (query-plan)
  ((children :type (vector logical-plan) :accessor children :initarg :children))
  (:documentation "A plan node describing what to compute."))

(defclass physical-plan (query-plan)
  ((children :type (vector physical-plan)))
  (:documentation "A plan node that EXECUTE can run."))
193 
194 ;;; Logical Expressions
(defclass logical-expression (query-expression) ()
  (:documentation "An expression appearing in a LOGICAL-PLAN."))

(defgeneric to-field (self input)
  (:documentation "Return the FIELD produced by expression SELF over logical plan INPUT.")
  (:method ((self number) (input logical-plan))
    (declare (ignore input))
    (make-field :name (princ-to-string self) :type 'number))
  (:method ((self string) (input logical-plan))
    (declare (ignore input))
    (make-field :name self :type 'string)))
204 
(defclass column-expression (logical-expression)
  ((name :type string :initarg :name :accessor column-name))
  (:documentation "A reference to the input column called NAME."))

(defmethod to-field ((self column-expression) (input logical-plan))
  (let* ((wanted (column-name self))
         (hit (find wanted (fields (schema input)) :test 'equal :key 'field-name)))
    (or hit
        (error 'invalid-argument :item wanted :reason "Invalid column name"))))

(defmethod df-col ((self string))
  "Return a COLUMN-EXPRESSION referring to the column named SELF."
  (make-instance 'column-expression :name self))

(defclass literal-expression (logical-expression) ()
  (:documentation "A constant value in a logical expression."))
216 
217 ;;;;; Alias
(defclass alias-expression (logical-expression)
  ((expr :type logical-expression :initarg :expr :accessor expr)
   (alias :type string :initarg :alias))
  (:documentation "EXPR under the name ALIAS."))

(defclass cast-expression (logical-expression)
  ((expr :type logical-expression :initarg :expr :accessor expr)
   (data-type :type form :initarg :data-type))
  (:documentation "EXPR converted to DATA-TYPE."))

(defmethod to-field ((self cast-expression) (input logical-plan))
  ;; Keep the operand's name but report the cast's target type.
  (let ((source (to-field (expr self) input)))
    (make-field :name (field-name source)
                :type (slot-value self 'data-type))))
228 
229 ;;;;; Unary
(defclass unary-expression (logical-expression)
  ;; :INITARG added, matching ALIAS-EXPRESSION and CAST-EXPRESSION, so the
  ;; operand can be supplied to MAKE-INSTANCE.
  ((expr :type logical-expression :initarg :expr :accessor expr))
  (:documentation "A logical expression with a single operand EXPR."))
232 
233 ;;;;; Binary
(defclass binary-expression (logical-expression)
  ((lhs :type logical-expression :initarg :lhs :accessor lhs)
   (rhs :type logical-expression :initarg :rhs :accessor rhs))
  (:documentation "A logical expression with operands LHS and RHS."))

(defgeneric binary-expression-name (self)
  (:documentation "Return the name string of binary expression SELF."))
(defgeneric binary-expression-op (self)
  (:documentation "Return the operator symbol of binary expression SELF."))

(defclass boolean-binary-expression (binary-expression)
  ((name :initarg :name :type string :accessor binary-expression-name)
   (op :initarg :op :type symbol :accessor binary-expression-op))
  (:documentation "A binary expression producing a BOOLEAN field."))

(defmethod to-field ((self boolean-binary-expression) (input logical-plan))
  (declare (ignore input))
  (let ((label (binary-expression-name self)))
    (make-field :name label :type 'boolean)))
248 
249 ;; Equiv Expr
(defclass eq-expression (boolean-binary-expression) ()
  (:default-initargs :name "eq" :op 'eq)
  (:documentation "Equality of LHS and RHS."))

(defclass neq-expression (boolean-binary-expression) ()
  (:default-initargs :name "neq" :op 'neq)
  (:documentation "Inequality of LHS and RHS."))

(defclass gt-expression (boolean-binary-expression) ()
  (:default-initargs :name "gt" :op '>)
  (:documentation "LHS greater than RHS."))

(defclass lt-expression (boolean-binary-expression) ()
  (:default-initargs :name "lt" :op '<)
  (:documentation "LHS less than RHS."))

(defclass gteq-expression (boolean-binary-expression) ()
  (:default-initargs :name "gteq" :op '>=)
  (:documentation "LHS greater than or equal to RHS."))

(defclass lteq-expression (boolean-binary-expression) ()
  (:default-initargs :name "lteq" :op '<=)
  (:documentation "LHS less than or equal to RHS."))
279 
280 ;; Bool Expr
(defclass and-expression (boolean-binary-expression) ()
  (:default-initargs :name "and" :op 'and)
  (:documentation "Logical conjunction of LHS and RHS."))

(defclass or-expression (boolean-binary-expression) ()
  (:default-initargs :name "or" :op 'or)
  (:documentation "Logical disjunction of LHS and RHS."))
290 
291 ;; Math Expr
(defclass math-expression (binary-expression)
  ((name :initarg :name :type string :accessor binary-expression-name)
   (op :initarg :op :type symbol :accessor binary-expression-op))
  (:documentation "Arithmetic on LHS and RHS; OP is the operator symbol."))

;; TODO 2024-08-03: ???
(defmethod to-field ((self math-expression) (input logical-plan))
  (declare (ignorable input))
  ;; The result takes its type from the left operand's field.
  (let ((lhs-field (to-field (lhs self) input)))
    (make-field :name "*" :type (field-type lhs-field))))

(defclass add-expression (math-expression) ()
  (:default-initargs :name "add" :op '+)
  (:documentation "LHS plus RHS."))

(defclass sub-expression (math-expression) ()
  (:default-initargs :name "sub" :op '-)
  (:documentation "LHS minus RHS."))

(defclass mult-expression (math-expression) ()
  (:default-initargs :name "mult" :op '*)
  (:documentation "LHS times RHS."))

(defclass div-expression (math-expression) ()
  (:default-initargs :name "div" :op '/)
  (:documentation "LHS divided by RHS."))

(defclass mod-expression (math-expression) ()
  (:default-initargs :name "mod" :op 'mod)
  (:documentation "LHS modulo RHS."))
325 
326 ;;;;; Agg Expr
;; A function from a logical expression to a query expression.  A FUNCTION
;; type lists argument *types* only; writing ((INPUT LOGICAL-EXPRESSION))
;; would name a parameter, which is not valid type-specifier syntax.
(deftype aggregate-function () '(function (logical-expression) query-expression))

(deftype aggregate-function-designator () '(or aggregate-function symbol))

(defclass aggregate-expression (logical-expression)
  ;; Both slots need an :INITARG.  Subclasses supply :NAME through
  ;; :DEFAULT-INITARGS, and MAKE-INSTANCE rejects that as an invalid
  ;; initialization argument unless some slot declares it.
  ((name :type string :initarg :name)
   (expr :type logical-expression :initarg :expr :accessor expr))
  (:documentation "An aggregate (SUM, MIN, ...) named NAME over operand EXPR."))
334 
(defgeneric aggregate-expression-p (self)
  (:documentation "True when SELF is an aggregate expression, possibly wrapped in an alias.")
  (:method ((self t)) nil)
  (:method ((self alias-expression)) (aggregate-expression-p (expr self)))
  (:method ((self aggregate-expression)) t))

(defmethod to-field ((self aggregate-expression) (input logical-plan))
  (declare (ignorable input))
  ;; Named after the aggregate, typed like the aggregated operand.
  (let* ((label (slot-value self 'name))
         (operand-field (to-field (slot-value self 'expr) input)))
    (make-field :name label :type (field-type operand-field))))

(defclass sum-expression (aggregate-expression) ()
  (:default-initargs :name "SUM")
  (:documentation "Sum of the operand."))

(defclass min-expression (aggregate-expression) ()
  (:default-initargs :name "MIN")
  (:documentation "Minimum of the operand."))

(defclass max-expression (aggregate-expression) ()
  (:default-initargs :name "MAX")
  (:documentation "Maximum of the operand."))

(defclass avg-expression (aggregate-expression) ()
  (:default-initargs :name "AVG")
  (:documentation "Average of the operand."))

(defclass count-expression (aggregate-expression) ()
  (:default-initargs :name "COUNT")
  (:documentation "Number of values of the operand."))

(defmethod to-field ((self count-expression) (input logical-plan))
  (declare (ignore input))
  ;; A count is numeric whatever the type of the counted operand.
  (make-field :name "COUNT" :type 'number))
367 
368 ;;; Logical Plan
369 
370 ;;;;; Scan
(defclass scan-data (logical-plan)
  ((path :type string :initarg :path)
   (data-source :type data-source :initarg :data-source)
   (projection :type (vector string) :initarg :projection))
  (:documentation "Logical scan of DATA-SOURCE; an empty PROJECTION keeps every column."))
375 
(defmethod derive-schema ((self scan-data))
  "Return the schema produced by scan SELF.
The base schema is the SCHEMA slot when it is set, otherwise the schema of the
DATA-SOURCE (as SCAN-EXEC uses). It is narrowed to PROJECTION when PROJECTION
is bound and non-empty."
  (let ((base (if (slot-boundp self 'schema)
                  (slot-value self 'schema)
                  ;; e.g. a SCAN-DATA rebuilt by the optimizer without :SCHEMA.
                  (schema (slot-value self 'data-source))))
        (proj (and (slot-boundp self 'projection)
                   (slot-value self 'projection))))
    (if (zerop (length proj))
        base
        (select base proj))))
381 
(defmethod schema ((self scan-data))
  "A scan's schema is recomputed by DERIVE-SCHEMA on every call."
  (let ((derived (derive-schema self)))
    derived))
384 
385 ;;;;; Projection
(defclass projection (logical-plan)
  ((input :type logical-plan :initarg :input)
   (expr :type (vector logical-expression) :initarg :expr))
  (:documentation "Evaluates each expression of EXPR against the INPUT plan."))
389 
(defmethod schema ((self projection))
  "Return the schema of projection SELF: one field per expression in EXPR,
computed with TO-FIELD against the input plan. This is the same schema
MAKE-PHYSICAL-PLAN gives the corresponding PROJECTION-EXEC."
  (let ((input (slot-value self 'input)))
    (apply #'make-schema
           (map 'list (lambda (e) (to-field e input)) (slot-value self 'expr)))))
392 
393 ;;;;; Selection
(defclass selection (logical-plan)
  ((input :type logical-plan :initarg :input)
   (expr :type logical-expression :initarg :expr))
  (:documentation "Keeps the rows of INPUT that satisfy the filter EXPR."))

(defmethod schema ((self selection))
  "Filtering removes rows, not columns, so the schema is the input's."
  (let ((upstream (slot-value self 'input)))
    (schema upstream)))
400 
401 ;;;;; Aggregate
(defclass aggregate (logical-plan)
  ((input :type logical-plan :initarg :input)
   (group-expr :type (vector logical-expression) :initarg :group-expr)
   (agg-expr :type (vector aggregate-expression) :initarg :agg-expr))
  (:documentation "Groups INPUT by GROUP-EXPR and computes AGG-EXPR per group."))
406 
(defmethod schema ((self aggregate))
  "Return the schema of aggregate SELF: the grouping fields followed by the
aggregate fields, each in declaration order. HASH-AGGREGATE-EXEC writes its
output columns in this layout (grouping keys first, then aggregates)."
  (let ((input (slot-value self 'input)))
    (flet ((fields-of (exprs)
             (map 'list (lambda (e) (to-field e input)) exprs)))
      ;; Build the list in order instead of PUSHing, which reversed it.
      (apply #'make-schema
             (append (fields-of (slot-value self 'group-expr))
                     (fields-of (slot-value self 'agg-expr)))))))
415 
416 ;;;;; Limit
(defclass limit (logical-plan)
  ((input :type logical-plan :initarg :input)
   ;; :INITARG added so the row limit can be supplied to MAKE-INSTANCE.
   (limit :type integer :initarg :limit))
  (:documentation "At most LIMIT rows of INPUT."))

(defmethod schema ((self limit))
  "A limit keeps the input's columns; the result is cached in the SCHEMA slot."
  (setf (slot-value self 'schema)
        (schema (slot-value self 'input))))

(defmethod children ((self limit))
  "The only child of a limit is its INPUT plan (as JOIN returns its inputs).
The result is cached in the CHILDREN slot."
  (setf (slot-value self 'children)
        (vector (slot-value self 'input))))
428 
429 ;;;;; Joins
(defclass join (logical-plan)
  ((left :accessor lhs)
   (right :accessor rhs)
   (on :accessor join-on))
  (:documentation "Combines the rows of the LEFT and RIGHT plans according to ON."))

(defclass inner-join (join) ()
  (:documentation "Join keeping only matching rows."))
;; (defclass outer-join (join))
(defclass left-join (join) ()
  (:documentation "Join preserving every row of the left input."))
(defclass right-join (join) ()
  (:documentation "Join preserving every row of the right input."))
439 ;; left-outer-join
440 ;; right-outer-join
441 ;; semi-join
442 ;; anti-join
443 ;; cross-join
444 
(defun %join-duplicate-keys (on)
  "Return the column names that the join condition ON pairs with themselves.
When every element of ON is a cons it is read as a (LEFT . RIGHT) pair of
column names; otherwise adjacent EQUAL elements mark a duplicated key."
  ;; TODO 2024-08-04: test better dupe impl
  (let ((keys (coerce on 'list)))
    (if (every #'consp keys)
        (loop for (left . right) in keys
              when (equal left right) collect left)
        (loop for (key . rest) on keys
              when (and rest (equal key (car rest))) collect key))))

(defmethod schema ((self join))
  "Return the schema of join SELF: the left fields followed by the right fields.
Duplicated join keys are dropped from the side that is not preserved: the left
side for right joins, the right side otherwise."
  (let* ((dupes (%join-duplicate-keys (join-on self)))
         (left (fields (schema (lhs self))))
         (right (fields (schema (rhs self)))))
    (flet ((drop-dupes (fields)
             ;; FIELDS holds FIELD structs while DUPES holds names.
             (remove-if (lambda (f) (member (field-name f) dupes :test 'string-equal))
                        fields)))
      ;; CONCATENATE, not MERGE: MERGE with an always-true predicate put the
      ;; right-hand fields first.
      (make-instance 'schema
                     :fields (typecase self
                               (right-join (concatenate 'vector (drop-dupes left) right))
                               ;; inner and left joins (and plain JOIN).
                               (t (concatenate 'vector left (drop-dupes right))))))))
461 
(defmethod children ((self join))
  "A join has exactly two children: its left and right inputs."
  (let ((left (lhs self))
        (right (rhs self)))
    (vector left right)))
464 
465 ;;; Subqueries
466 
467 ;; TODO 2024-08-02:
468 
469 ;; subquery
470 
471 ;; correlated-subquery
472 
473 ;; SELECT id, name, (SELECT count(*) FROM orders WHERE customer_id = customer.id) AS num_orders FROM customers
474 
475 ;; uncorrelated-subquery
476 
477 ;; scalar-subquery
478 
479 ;; SELECT * FROM orders WHERE total > (SELECT avg(total) FROM sales WHERE customer_state = 'CA')
480 
481 ;; NOTE 2024-08-02: EXISTS, IN, NOT EXISTS, and NOT IN are also subqueries
482 
483 ;;; Dataframes
484 ;; minimal data-frame abstraction. methods are prefixed with 'DF-'.
(defstruct (data-frame (:constructor make-data-frame (&optional plan)))
  "A minimal data frame: a wrapper around the LOGICAL-PLAN being built."
  (plan (make-instance 'logical-plan) :type logical-plan))

(defgeneric df-col (self)
  (:documentation "Return a column expression referring to SELF."))
489 
(defgeneric df-project (df exprs)
  (:documentation "Wrap DF's plan in a PROJECTION of EXPRS and return DF.")
  (:method ((df data-frame) (expr vector))
    (let ((next (make-instance 'projection
                               :input (data-frame-plan df)
                               :expr expr)))
      (setf (data-frame-plan df) next)
      df))
  (:method ((df data-frame) (expr list))
    (df-project df (coerce expr 'vector))))

(defgeneric df-filter (df expr)
  (:documentation "Wrap DF's plan in a SELECTION by EXPR and return DF.")
  (:method ((df data-frame) (expr logical-expression))
    (let ((next (make-instance 'selection :input (data-frame-plan df) :expr expr)))
      (setf (data-frame-plan df) next)
      df)))

(defgeneric df-aggregate (df group-by agg-expr)
  (:documentation "Wrap DF's plan in an AGGREGATE over GROUP-BY and AGG-EXPR and return DF.")
  (:method ((df data-frame) (group-by list) (agg-expr list))
    (df-aggregate df (coerce group-by 'vector) (coerce agg-expr 'vector)))
  (:method ((df data-frame) (group-by vector) (agg-expr vector))
    (let ((next (make-instance 'aggregate
                               :input (data-frame-plan df)
                               :group-expr group-by
                               :agg-expr agg-expr)))
      (setf (data-frame-plan df) next)
      df)))

(defgeneric make-df (self &key &allow-other-keys)
  (:documentation "Make a DATA-FRAME from SELF; NIL gives a frame with a fresh LOGICAL-PLAN.")
  (:method ((self null) &key)
    (make-data-frame)))
519 
(defmethod schema ((df data-frame))
  "A data frame's schema is the schema of its plan."
  (let ((plan (data-frame-plan df)))
    (schema plan)))

(defmethod (setf schema) ((schema schema) (df data-frame))
  "Store SCHEMA directly in the SCHEMA slot of DF's plan."
  (let ((plan (data-frame-plan df)))
    (setf (slot-value plan 'schema) schema)))

(defgeneric df-plan (df)
  (:documentation "Return the logical plan associated with this data-frame.")
  (:method ((df data-frame))
    (data-frame-plan df)))
529 
(defmethod (setf df-plan) ((plan logical-plan) (df data-frame))
  "Make PLAN the logical plan of DF and return PLAN."
  ;; Write the struct slot directly; (SETF DF-PLAN) here would call this
  ;; method again and recurse forever.
  (setf (data-frame-plan df) plan))
532 
533 ;;; Physical Expression
(defclass physical-expression (query-expression) ()
  (:documentation "An expression EVALUATE can run against a RECORD-BATCH."))

(defclass literal-physical-expression (physical-expression) ())

(defgeneric evaluate (self input)
  (:documentation "Evaluate the expression SELF with INPUT and return a COLUMN-VECTOR result.")
  (:method ((self number) (input record-batch))
    (make-instance 'literal-value-vector :size (row-count input) :type 'number :data self))
  (:method ((self string) (input record-batch))
    ;; String literals are stored as their octet encoding.
    (make-instance 'literal-value-vector
                   :size (row-count input)
                   :type 'string
                   :data (sb-ext:string-to-octets self))))

(defclass column-physical-expression (physical-expression)
  ((val :type array-index :initarg :val))
  (:documentation "Reads the column at index VAL of the input batch."))

(defmethod evaluate ((self column-physical-expression) (input record-batch))
  (let ((index (slot-value self 'val)))
    (field input index)))

(defclass binary-physical-expression (physical-expression)
  ((lhs :type physical-expression :accessor lhs :initarg :lhs)
   (rhs :type physical-expression :accessor rhs :initarg :rhs))
  (:documentation "A physical expression with operands LHS and RHS."))

(defgeneric evaluate2 (self lhs rhs)
  (:documentation "Combine the already-evaluated operands LHS and RHS of SELF."))
559 
(defmethod evaluate ((self binary-physical-expression) (input record-batch))
  "Evaluate both operands of SELF against INPUT and combine them with EVALUATE2.
Signals an error unless the operands have the same size and column type."
  (flet ((size-of (column)
           ;; Operands may be COLUMN-VECTOR objects, which are not sequences,
           ;; so CL:LENGTH alone cannot measure them.
           (if (typep column 'column-vector)
               (column-size column)
               (length column))))
    (let ((ll (evaluate (lhs self) input))
          (rr (evaluate (rhs self) input)))
      (assert (= (size-of ll) (size-of rr)))
      (if (eql (column-type ll) (column-type rr))
          (evaluate2 self ll rr)
          (error "invalid state! lhs != rhs")))))
567 
(defclass eq-physical-expression (binary-physical-expression) ()
  (:documentation "Physical equality test."))

(defmethod evaluate2 ((self eq-physical-expression) lhs rhs)
  (declare (ignore self))
  ;; EQUAL is symmetric, so operand order does not matter.
  (equal rhs lhs))
573 
(defclass neq-physical-expression (binary-physical-expression) ()
  (:documentation "Physical inequality test."))

(defmethod evaluate2 ((self neq-physical-expression) lhs rhs)
  "True when LHS and RHS are not EQUAL, the negation of EQ-PHYSICAL-EXPRESSION."
  (declare (ignore self))
  (not (equal lhs rhs)))
579 
(defclass lt-physical-expression (binary-physical-expression) ()
  (:documentation "Physical less-than."))

(defclass gt-physical-expression (binary-physical-expression) ()
  (:documentation "Physical greater-than."))

(defclass lteq-physical-expression (binary-physical-expression) ()
  (:documentation "Physical less-than-or-equal."))

(defclass gteq-physical-expression (binary-physical-expression) ()
  (:documentation "Physical greater-than-or-equal."))

(defclass and-physical-expression (binary-physical-expression) ()
  (:documentation "Physical conjunction."))

(defclass or-physical-expression (binary-physical-expression) ()
  (:documentation "Physical disjunction."))

(defclass math-physical-expression (binary-physical-expression) ()
  (:documentation "Physical arithmetic, applied row by row."))

(defmethod evaluate2 ((self math-physical-expression) (lhs column-vector) (rhs column-vector))
  "Apply SELF to each pair of row values of LHS and RHS, returning a vector."
  (let* ((n (column-size lhs))
         (out (make-array n)))
    (dotimes (row n out)
      (setf (aref out row)
            (evaluate2 self (column-value lhs row) (column-value rhs row))))))
598 
;; These classes inherit from MATH-PHYSICAL-EXPRESSION, not the logical
;; MATH-EXPRESSION, so they are PHYSICAL-EXPRESSIONs that EVALUATE accepts.
;; Their EVALUATE2 methods are specialized on NUMBER.  CLOS orders applicable
;; methods by the first argument first, so unspecialized operands here would
;; also win for column operands and hide the element-wise
;; MATH-PHYSICAL-EXPRESSION method.

(defclass add-physical-expresion (math-physical-expression) ()
  (:documentation "Physical addition. The name is misspelled but is what the planner instantiates."))

(defmethod evaluate2 ((self add-physical-expresion) (lhs number) (rhs number))
  (declare (ignore self))
  (+ lhs rhs))

(defclass sub-physical-expression (math-physical-expression) ()
  (:documentation "Physical subtraction."))

(defmethod evaluate2 ((self sub-physical-expression) (lhs number) (rhs number))
  (declare (ignore self))
  (- lhs rhs))

(defclass mult-physical-expression (math-physical-expression) ()
  (:documentation "Physical multiplication."))

(defmethod evaluate2 ((self mult-physical-expression) (lhs number) (rhs number))
  (declare (ignore self))
  (* lhs rhs))

(defclass div-physical-expression (math-physical-expression) ()
  (:documentation "Physical division."))

(defmethod evaluate2 ((self div-physical-expression) (lhs number) (rhs number))
  (declare (ignore self))
  (/ lhs rhs))
622 
(defclass accumulator ()
  ;; NIL means "nothing accumulated yet".
  ((value :initarg :value :initform nil :accessor accumulator-value))
  (:documentation "Running state of an aggregate."))

(defgeneric accumulate (self val)
  (:documentation "Fold VAL into SELF.
For an ACCUMULATOR, NIL values are ignored and the running VALUE is summed.
For a list, return a new list with VAL on the front.")
  (:method ((self accumulator) val)
    (when val
      (let ((current (and (slot-boundp self 'value) (accumulator-value self))))
        ;; An empty accumulator takes the first value as-is.
        (setf (accumulator-value self) (if current (+ val current) val)))))
  (:method ((self list) val)
    (cons val self)))
632 
(defgeneric make-accumulator (self)
  (:documentation "Return a fresh ACCUMULATOR for aggregate expression SELF."))

;; max-accumulator
(defclass max-accumulator (accumulator) ()
  (:documentation "Accumulator that keeps the largest value seen."))

(defmethod accumulate ((self max-accumulator) (val number))
  (let ((current (and (slot-boundp self 'value) (accumulator-value self))))
    ;; An empty accumulator (NIL or unbound value) takes the first value.
    (when (or (null current) (> val current))
      (setf (accumulator-value self) val))))

(defclass aggregate-physical-expression (physical-expression)
  ;; The expression whose column is aggregated.  :INITARG added so it can
  ;; be supplied at construction.
  ((input :type physical-expression :initarg :input))
  (:documentation "A physical aggregate over the column produced by INPUT."))

(defclass max-physical-expression (aggregate-physical-expression) ()
  (:documentation "Physical MAX aggregate."))

(defmethod make-accumulator ((self max-physical-expression))
  (make-instance 'max-accumulator))
649 
650 ;;; Physical Plan
(defgeneric execute (self)
  (:documentation "Execute the LOGICAL-PLAN represented by object SELF.")
  (:method ((self data-frame))
    (let ((plan (df-plan self)))
      (execute plan))))

(defclass scan-exec (physical-plan)
  ((data-source :type data-source :initarg :data-source)
   (projection :type (vector string) :initarg :projection))
  (:documentation "Physical scan of DATA-SOURCE restricted to PROJECTION."))

(defmethod schema ((self scan-exec))
  (let ((source-schema (schema (slot-value self 'data-source))))
    (select source-schema (slot-value self 'projection))))

(defmethod execute ((self scan-exec))
  (with-slots (data-source projection) self
    (scan-data data-source projection)))
665 
(defclass projection-exec (physical-plan)
  ((input :type physical-plan :initarg :input)
   (expr :type (vector physical-expression) :initarg :expr))
  (:documentation "Physical projection: evaluates every EXPR against each input batch."))
669 
(defmethod execute ((self projection-exec))
  "Execute the input plan and evaluate every projection expression against
each resulting record batch. Returns a vector of RECORD-BATCHes carrying this
node's schema."
  (coerce
   ;; EXECUTE returns a vector of record batches (SELECTION-EXEC and
   ;; HASH-AGGREGATE-EXEC iterate it the same way), so walk it directly; the
   ;; previous FIELDS call has no method for a vector.
   (loop for batch across (execute (slot-value self 'input))
         collect (make-record-batch
                  :schema (slot-value self 'schema)
                  :fields (map 'field-vector
                               (lambda (e) (evaluate e batch))
                               (slot-value self 'expr))))
   '(vector record-batch)))
679 
680 
(defclass selection-exec (physical-plan)
  ((input :type physical-plan :initarg :input)
   (expr :type physical-expression :initarg :expr))
  (:documentation "Physical filter: keeps the rows for which EXPR holds."))

(defmethod schema ((self selection-exec))
  (let ((upstream (slot-value self 'input)))
    (schema upstream)))
687 
(defmethod execute ((self selection-exec))
  "Execute the input plan and keep, in every batch, only the rows selected by
the filter expression. Returns a vector of RECORD-BATCHes."
  (coerce
   (loop for batch across (execute (slot-value self 'input))
         collect
         ;; Everything below depends on the current BATCH.  LOOP evaluates
         ;; WITH clauses only once, in the loop prologue, so these values are
         ;; bound per iteration here instead.
         (let* (;; NOTE(review): assumes EVALUATE yields 0/1 elements that
                ;; COERCE can turn into a bit vector; confirm.
                (res (coerce (evaluate (slot-value self 'expr) batch) 'bit-vector))
                ;; COLUMN-COUNT is defined on the batch itself.
                (filtered (loop for i from 0 below (column-count batch)
                                collect (filter self (field batch i) res))))
           (make-record-batch :schema (schema batch)
                              :fields (coerce filtered 'field-vector))))
   '(vector record-batch)))
698 
(defgeneric filter (self columns selection)
  (:documentation "Return the values of COLUMNS at the positions whose SELECTION bit is 1.")
  (:method ((self selection-exec) (columns column-vector) (selection simple-bit-vector))
    (let ((kept '()))
      (dotimes (i (length selection))
        (when (= 1 (bit selection i))
          (push (column-value columns i) kept)))
      (coerce (nreverse kept) 'field-vector))))
706 
(defclass hash-aggregate-exec (physical-plan)
  ((input :type physical-plan :initarg :input)
   ;; Grouping keys are expressions that EXECUTE evaluates against each
   ;; batch, so they are PHYSICAL-EXPRESSIONs, not PHYSICAL-PLANs.
   (group-expr :type (vector physical-expression) :initarg :group-expr)
   (agg-expr :type (vector aggregate-physical-expression) :initarg :agg-expr))
  (:documentation "Hash-based grouped aggregation of INPUT."))
711 
(defmethod execute ((self hash-aggregate-exec))
  "Execute the input plan and aggregate its rows by the grouping keys.
Returns a vector holding one RECORD-BATCH with one row per distinct key: the
grouping-key columns first, then one column per aggregate."
  (let* ((group-exprs (slot-value self 'group-expr))
         (agg-exprs (slot-value self 'agg-expr))
         (glen (length group-exprs))
         ;; Keys are lists: EQUAL compares lists element-wise but compares
         ;; most vectors by identity, so vector keys never matched.
         (groups (make-hash-table :test 'equal)))
    (flet ((key-value (column row)
             ;; Octet vectors (string literals, see EVALUATE) become strings
             ;; so that equal keys hash together.
             (let ((val (column-value column row)))
               (typecase val
                 (octet-vector (sb-ext:octets-to-string val))
                 (t val)))))
      ;; Phase 1: fold every row of every input batch into its group.
      (loop for batch across (execute (slot-value self 'input))
            do (let ((group-keys (map 'list (lambda (x) (evaluate x batch)) group-exprs))
                     (aggr-inputs (map 'list
                                       (lambda (x) (evaluate (slot-value x 'input) batch))
                                       agg-exprs)))
                 (dotimes (row (row-count batch))
                   (let* ((row-key (mapcar (lambda (column) (key-value column row))
                                           group-keys))
                          (accs (or (gethash row-key groups)
                                    (setf (gethash row-key groups)
                                          (map 'vector #'make-accumulator agg-exprs)))))
                     ;; Feed every accumulator, not just the first, the value
                     ;; of its own input column.
                     (loop for acc across accs
                           for input in aggr-inputs
                           do (accumulate acc (column-value input row))))))))
    ;; Phase 2: lay the groups out as columns, sized by the number of groups
    ;; (HASH-TABLE-COUNT, not the table's capacity).
    (let* ((nrows (hash-table-count groups))
           (columns (coerce (loop repeat (+ glen (length agg-exprs))
                                  collect (make-array nrows))
                            'vector))
           (row 0))
      (maphash (lambda (key accs)
                 (loop for i from 0
                       for k in key
                       do (setf (aref (aref columns i) row) k))
                 (loop for i from glen
                       for acc across accs
                       do (setf (aref (aref columns i) row)
                                (and (slot-boundp acc 'value) (accumulator-value acc))))
                 (incf row))
               groups)
      (coerce (list (make-record-batch
                     :schema (if (slot-boundp self 'schema)
                                 (slot-value self 'schema)
                                 (make-schema))
                     :fields columns))
              '(vector record-batch)))))
756 
757 ;;; Planner
758 
759 ;; The Query Planner is effectively a compiler which translates logical
760 ;; expressions and plans into their physical counterparts.
761 
(defclass query-planner () ()
  (:documentation "Translates logical expressions and plans into physical ones."))
763 
(defgeneric make-physical-expression (expr input)
  (:documentation "Translate logical expression EXPR and logical plan INPUT
  into a physical expression.")
  ;; Raw strings and numbers are literals and are used as-is.
  (:method ((expr string) (input logical-plan))
    (declare (ignore input))
    expr)
  (:method ((expr number) (input logical-plan))
    (declare (ignore input))
    expr)
  ;; A column reference becomes the column's index in the input schema.
  (:method ((expr column-expression) (input logical-plan))
    (make-instance 'column-physical-expression
                   :val (position (column-name expr) (fields (schema input))
                                  :key 'field-name :test 'equal)))
  (:method ((expr binary-expression) (input logical-plan))
    (let* ((l (make-physical-expression (lhs expr) input))
           (r (make-physical-expression (rhs expr) input))
           (physical-class (etypecase expr
                             (eq-expression 'eq-physical-expression)
                             (neq-expression 'neq-physical-expression)
                             (gt-expression 'gt-physical-expression)
                             (gteq-expression 'gteq-physical-expression)
                             (lt-expression 'lt-physical-expression)
                             (lteq-expression 'lteq-physical-expression)
                             (and-expression 'and-physical-expression)
                             (or-expression 'or-physical-expression)
                             (add-expression 'add-physical-expresion)
                             (sub-expression 'sub-physical-expression)
                             (mult-expression 'mult-physical-expression)
                             (div-expression 'div-physical-expression))))
      (make-instance physical-class :lhs l :rhs r))))
792 
(defgeneric make-physical-plan (plan)
  (:documentation "Create a physical plan from logical PLAN.")
  (:method ((plan logical-plan))
    (etypecase plan
      (scan-data (make-instance 'scan-exec
                                :data-source (slot-value plan 'data-source)
                                :projection (slot-value plan 'projection)))
      (projection
       (let ((input (slot-value plan 'input))
             (exprs (slot-value plan 'expr)))
         (make-instance 'projection-exec
                        :schema (make-instance 'schema
                                               :fields (map 'field-vector
                                                            (lambda (x) (to-field x input))
                                                            exprs))
                        :input (make-physical-plan input)
                        :expr (map 'vector
                                   (lambda (x) (make-physical-expression x input))
                                   exprs))))
      (selection
       (let ((input (slot-value plan 'input)))
         (make-instance 'selection-exec
                        :input (make-physical-plan input)
                        :expr (make-physical-expression (slot-value plan 'expr) input))))
      (aggregate
       (let ((input (slot-value plan 'input)))
         (flet ((physical-aggregate (agg)
                  ;; MAX-PHYSICAL-EXPRESSION is the only physical aggregate
                  ;; class available; other aggregates cannot be planned yet.
                  (unless (typep agg 'max-expression)
                    (error "Unsupported aggregate expression: ~A" agg))
                  (let ((phys (make-instance 'max-physical-expression)))
                    (setf (slot-value phys 'input)
                          (make-physical-expression (expr agg) input))
                    phys)))
           (make-instance 'hash-aggregate-exec
                          ;; HASH-AGGREGATE-EXEC reads its SCHEMA slot.
                          :schema (schema plan)
                          :input (make-physical-plan input)
                          ;; Translate each expression individually:
                          ;; MAKE-PHYSICAL-EXPRESSION has no method for vectors.
                          :group-expr (map 'vector
                                           (lambda (g) (make-physical-expression g input))
                                           (slot-value plan 'group-expr))
                          :agg-expr (map 'vector #'physical-aggregate
                                         (slot-value plan 'agg-expr)))))))))
816 
817 ;;; Optimizer
818 
819 ;; The Query Optimizer is responsible for walking a QUERY-PLAN and returning a
820 ;; modified version of the same object. Usually we want to run optimization on
821 ;; LOGICAL-PLANs but we also support specializing on PHYSICAL-PLAN.
822 
823 ;; Rule-based Optimizers: projection/predicate push-down, sub-expr elim
824 
825 ;; Lowerings: hdsl -> ldsl
826 
;; Extensibility principle - A low-level DSL should be at least as expressive
;; as a high-level DSL.
829 
830 ;; Transformation cohesion principle - There should be a unique path lowering
831 ;; a high-level DSL to a low-level DSL. This also prevents loops between high
832 ;; and low level DSLs.
833 
834 ;; TBD: Cost-based optimizers
835 ;; TODO 2024-07-10:
(defclass query-optimizer () ()
  (:documentation "Base class of query optimizers; see OPTIMIZE-QUERY."))

(defstruct (query-vop (:constructor make-query-vop (info)))
  "A virtual query operation available to query compilers."
  ;; Free-form description of the operation.
  (info nil))

(defgeneric optimize-query (self plan)
  (:documentation "Optimize the query expressed by PLAN using the optimizer SELF."))
844 
845 ;; Projection Pushdown
(defun extract-columns (expr input &optional accum)
  "Recursively add the columns referenced by EXPR to ACCUM and return the
updated accumulator. Column indices are resolved to names through the schema
of plan INPUT. ACCUM is a list by default; for any other accumulator,
ACCUMULATE is called for effect and ACCUM itself is returned."
  (flet ((add (name)
           ;; ACCUMULATE on a list returns a new list rather than mutating it.
           (if (listp accum)
               (accumulate accum name)
               (progn (accumulate accum name) accum))))
    (etypecase expr
      (array-index (add (field-name (aref (fields (schema input)) expr))))
      (column-expression (add (column-name expr)))
      (binary-expression
       ;; Thread the accumulator through both operands so neither is lost.
       (extract-columns (rhs expr) input (extract-columns (lhs expr) input accum)))
      (alias-expression (extract-columns (expr expr) input accum))
      (cast-expression (extract-columns (expr expr) input accum))
      ;; Literals reference no columns.  This includes raw strings and numbers,
      ;; which MAKE-PHYSICAL-EXPRESSION passes through as literal values.
      ((or literal-expression string number) accum))))

(defun extract-columns* (exprs input &optional accum)
  "Add the columns referenced by every expression in the sequence EXPRS to
ACCUM and return the updated accumulator."
  (reduce (lambda (acc expr) (extract-columns expr input acc))
          exprs :initial-value accum))
861 
(defclass projection-pushdown-optimizer (query-optimizer) ()
  (:documentation "Optimizer performing projection push-down via %PUSHDOWN."))
863 
(defun %pushdown (plan &optional column-names)
  "Return a copy of logical PLAN whose SCAN-DATA node projects only the columns
referenced by the nodes above it. COLUMN-NAMES holds the column names collected
from those nodes so far."
  (declare (type logical-plan plan))
  (flet ((gather-columns (exprs input names)
           ;; Fold the columns referenced by every expression in the
           ;; sequence EXPRS into NAMES.
           (reduce (lambda (acc e) (extract-columns e input acc))
                   exprs :initial-value names)))
    (etypecase plan
      (projection
       (let* ((input (slot-value plan 'input))
              (names (gather-columns (slot-value plan 'expr) input column-names)))
         (make-instance 'projection
                        :input (%pushdown input names)
                        :expr (slot-value plan 'expr))))
      (selection
       (let* ((input (slot-value plan 'input))
              (names (extract-columns (slot-value plan 'expr) input column-names)))
         (make-instance 'selection
                        :input (%pushdown input names)
                        :expr (slot-value plan 'expr))))
      (aggregate
       (let* ((input (slot-value plan 'input))
              (group-names (gather-columns (slot-value plan 'group-expr)
                                           input column-names))
              ;; An aggregate's operand lives in its EXPR slot.
              (names (gather-columns (map 'list #'expr (slot-value plan 'agg-expr))
                                     input group-names)))
         (make-instance 'aggregate
                        :input (%pushdown input names)
                        :group-expr (slot-value plan 'group-expr)
                        :agg-expr (slot-value plan 'agg-expr))))
      (scan-data
       (apply #'make-instance 'scan-data
              :data-source (slot-value plan 'data-source)
              ;; maybe sort here?
              :projection (coerce (remove-duplicates column-names :test 'equal :from-end t)
                                  'vector)
              ;; SCAN-DATA has no NAME slot; carry over PATH and SCHEMA only
              ;; when they are set.
              (append (when (slot-boundp plan 'path)
                        (list :path (slot-value plan 'path)))
                      (when (slot-boundp plan 'schema)
                        (list :schema (slot-value plan 'schema)))))))))
889 
(defmethod optimize-query ((self projection-pushdown-optimizer) (plan logical-plan))
  "Push column projections of PLAN down towards its scan."
  (%pushdown plan nil))
892 
893 ;;; Query
(defclass query () ()
  (:documentation "A query object; see MAKE-QUERY."))

(defgeneric make-query (self &rest initargs &key &allow-other-keys)
  (:documentation "Make a new QUERY object.")
  ;; The default method ignores both SELF and INITARGS.
  (:method ((self t) &rest initargs)
    (declare (ignore initargs))
    (let ((fresh (make-instance 'query)))
      fresh)))
901 
902 ;;; Execution Context
(defclass execution-context () ()
  (:documentation "Context in which data frames and data sources are registered and executed."))

(defgeneric register-df (self name df)
  (:documentation "Register a DATA-FRAME with an EXECUTION-CONTEXT."))

(defgeneric register-data-source (self name source)
  (:documentation "Register a DATA-SOURCE with an EXECUTION-CONTEXT."))

(defgeneric register-file (self name path &key type &allow-other-keys)
  (:documentation "Register a DATA-SOURCE contained in a file of type TYPE at PATH."))

(defgeneric execute* (self df)
  (:documentation "Execute the DATA-FRAME DF in CONTEXT. This is the stateful version of EXECUTE.")
  (:method ((self execution-context) (df data-frame))
    (declare (ignore self))
    ;; The base context contributes nothing; defer to EXECUTE.
    (execute df)))

(defmethod execute ((self logical-plan))
  "Optimize logical plan SELF, compile it to a physical plan and execute that."
  (let* ((optimizer (make-instance 'projection-pushdown-optimizer))
         (optimized (optimize-query optimizer self))
         (physical (make-physical-plan optimized)))
    (execute physical)))