changelog shortlog graph tags branches changeset files revisions annotate raw help

Mercurial > core / lisp/lib/obj/query.lisp

changeset 576: 60c7b1c83c47
parent: efb4a19ff530
child: 806c2b214df8
author: Richard Westhaver <ellis@rwest.io>
date: Sun, 04 Aug 2024 16:58:23 -0400
permissions: -rw-r--r--
description: more sql query updates
1 ;;; obj/query.lisp --- Query Objects
2 
3 ;; Lisp primitive Query objects for DIY query engines.
4 
5 ;;; Commentary:
6 
7 ;; This package provides the base set of classes and methods for implementing
8 ;; query engines.
9 
10 ;; The intention is to use these objects in several high-level packages where
11 ;; we need the ability to ask complex questions about some arbitrary data
12 ;; source.
13 
14 ;; The high-level packages can loosely be categorized as:
15 
16 ;; - Frontends :: The interface exposed to the user - SQL, Prolog, etc.
17 
18 ;; - Middleware :: interfaces which are used internally and exposed publicly -
19 ;; query planners/optimizers/ast
20 
21 ;; - Backends :: The interface exposed to the underlying data sources -
22 ;; RocksDB, SQLite, etc.
23 
24 ;;;; Refs
25 
26 ;; https://gist.github.com/twitu/221c8349887cec0a83b395e4cbb492a7
27 
28 ;; https://www1.columbia.edu/sec/acis/db2/db2d0/db2d0103.htm
29 
30 ;; https://howqueryengineswork.com/
31 
32 ;;; Code:
33 (in-package :obj/query)
34 
35 ;;; Types
(eval-always
  ;; Type specifiers whose union forms LITERAL-VALUE-TYPE.  Wrapped in
  ;; EVAL-ALWAYS so the DEFTYPE expander below can read it.
  (defvar *literal-value-types*
    '(boolean fixnum signed-byte unsigned-byte float double-float string)))

(deftype literal-value-type ()
  "The union (OR ...) of every type listed in *LITERAL-VALUE-TYPES*."
  (cons 'or *literal-value-types*))
40 
41 ;;; Field
(defstruct field
  ;; Defaults to a freshly generated name so anonymous fields get distinct names.
  (name (symbol-name (gensym "#")) :type simple-string)
  ;; A type specifier (symbol or list) describing the field's values.
  (type t :type (or symbol list)))

(defmethod make-load-form ((self field) &optional env)
  "Return a form that rebuilds SELF at load time.
The type specifier is quoted: it is data, and evaluating e.g. STRING or
(INTEGER 0 10) as code would signal an error."
  (declare (ignore env))
  `(make-field :name ,(field-name self) :type ',(field-type self)))
49 
50 ;;; Field Vectors
(deftype field-vector ()
  "A vector whose elements are FIELD structures."
  '(vector field))

;; Convenience interface for FIELD-VECTOR: wraps a DATA array.
(defclass column-vector ()
  ((data :accessor column-data
         :type simple-vector)))
55 
;; A column whose every row holds the same literal value.
(defclass literal-value-vector (column-vector)
  ;; TYPE holds a type specifier naming the kind of literal (EVALUATE passes
  ;; symbols such as STRING or NUMBER), not a literal value, so it is declared
  ;; as a type specifier rather than LITERAL-VALUE-TYPE.
  ((type :type (or symbol cons) :initarg :type :accessor column-type)
   ;; NOTE(review): DATA inherits :TYPE SIMPLE-VECTOR from COLUMN-VECTOR and
   ;; CLOS intersects inherited slot types, yet EVALUATE stores octet vectors
   ;; and plain numbers here.  Confirm the intended type of DATA.
   (data :initarg :data :accessor column-data)
   ;; Number of (virtual) rows this literal column spans.
   (size :type fixnum :initarg :size :accessor column-size)))
60 
(defgeneric column-literal-value (self)
  (:documentation "Return the literal held in SELF's DATA slot; COLUMN-VALUE returns it for every row.")
  (:method ((self literal-value-vector))
    (let ((literal (column-data self)))
      literal)))

(defgeneric column-type (self)
  (:documentation "Return the element type of SELF's underlying data array.")
  (:method ((self column-vector))
    (let ((data (column-data self)))
      (array-element-type data))))
68 
(defgeneric column-value (self i)
  (:documentation "Return the value of column SELF at row index I.")
  (:method ((self column-vector) (i fixnum))
    (aref (column-data self) i))
  ;; A literal column has one value for every row, but I must still be in range.
  (:method ((self literal-value-vector) (i fixnum))
    (if (or (< i 0) (>= i (column-size self)))
        ;; :FORMAT-ARGUMENTS must be a list of arguments, not a bare value.
        (error 'simple-error :format-control "index out of bounds: ~A"
                             :format-arguments (list i))
        (column-literal-value self))))
76 
77 ;;; Schema
;; Ordered collection of FIELDs describing the columns of a relation.
(defclass schema ()
  ((fields :initarg :fields :accessor fields :type field-vector)))

(defun make-schema (&rest fields)
  "Return a SCHEMA holding FIELDS in argument order."
  (make-instance 'schema :fields (map 'field-vector #'identity fields)))

(defgeneric load-schema (self &optional schema))
85 
(defmethod make-load-form ((self schema) &optional env)
  "Return a MAKE-INSTANCE form recreating SELF with its current FIELDS."
  (declare (ignore env))
  (list 'make-instance (class-of self) :fields (fields self)))
89 
;; Arbitrary user metadata attached to a schema.
(defclass schema-metadata ()
  ((metadata :initarg :metadata :accessor schema-metadata)))

(defmethod make-load-form ((self schema-metadata) &optional env)
  "Return a form that recreates SELF at load time.
METADATA is quoted so list or symbol metadata is reproduced as data rather
than evaluated as code."
  (declare (ignore env))
  `(make-instance ,(class-of self) :metadata ',(schema-metadata self)))
96 
(defgeneric column-size (self)
  (:documentation "Return the number of entries in column SELF.")
  (:method ((self column-vector))
    (let ((data (column-data self)))
      (length data))))
100 
101 ;;; Record Batch
;; A batch of columnar data together with the schema describing it.
(defstruct record-batch
  (schema (make-schema) :type schema)
  (fields #() :type field-vector))

(defmethod make-load-form ((self record-batch) &optional env)
  "Return a MAKE-RECORD-BATCH form recreating SELF."
  (declare (ignore env))
  (list 'make-record-batch
        :schema (record-batch-schema self)
        :fields (record-batch-fields self)))
109 
110 ;;; Proto
(defgeneric field (self n)
  (:documentation "Return the Nth entry of SELF's field vector.")
  (:method ((self record-batch) (n fixnum))
    (let ((columns (record-batch-fields self)))
      (aref columns n))))

(defgeneric fields (self)
  (:documentation "Return the field vector of SELF.")
  (:method ((self record-batch))
    (record-batch-fields self)))

(defgeneric schema (self)
  (:documentation "Return the SCHEMA describing SELF.")
  (:method ((self record-batch))
    (record-batch-schema self)))

(defgeneric derive-schema (self)
  (:documentation "Compute and return the SCHEMA for SELF."))
124 
(defun %select-fields (schema names)
  "Return a new SCHEMA holding, in order, the fields of SCHEMA named by NAMES.
The result has one entry per name, so duplicate names are kept rather than
silently dropped.  Signals INVALID-ARGUMENT for a name SCHEMA lacks."
  (let ((fields (fields schema)))
    (make-instance 'schema
                   :fields (map 'field-vector
                                (lambda (n)
                                  (or (find n fields :test 'equal :key 'field-name)
                                      (error 'invalid-argument :item n :reason "Invalid column name")))
                                names))))

(defgeneric select (self names)
  (:documentation "Return a new SCHEMA containing the fields of SELF named by NAMES, in NAMES order.")
  (:method ((self schema) (names list))
    (%select-fields self names))
  (:method ((self schema) (names vector))
    (%select-fields self names)))
145 
(defgeneric project (self indices)
  (:documentation "Return a new SCHEMA holding the fields of SELF at positions INDICES, in order.")
  (:method ((self schema) (indices list))
    (let ((all (fields self)))
      (make-instance 'schema
                     :fields (map 'field-vector (lambda (idx) (aref all idx)) indices))))
  (:method ((self schema) (indices vector))
    (let ((all (fields self)))
      (make-instance 'schema
                     :fields (map 'field-vector (lambda (idx) (aref all idx)) indices)))))
156 
(defgeneric row-count (self)
  (:documentation "Return the number of rows in SELF.")
  (:method ((self record-batch))
    (let ((columns (record-batch-fields self)))
      ;; Rows are counted from the first column (columns are presumably all
      ;; the same length).  A batch with no columns has no rows, instead of
      ;; signalling an index error.
      (if (zerop (length columns))
          0
          (sequence:length (aref columns 0))))))
160 
(defgeneric column-count (self)
  (:documentation "Return the number of columns in SELF.")
  (:method ((self record-batch))
    (let ((columns (record-batch-fields self)))
      (length columns))))
164 
165 ;;; Execution Context
;; Placeholder for per-query execution state; it has no slots yet.
(defclass execution-context ()
  ())

;; A source of rows; SCHEMA describes the columns it can provide.
(defclass data-source ()
  ((schema :accessor schema
           :type schema)))

(defgeneric scan-data-source (self projection)
  (:documentation "Scan the data source, selecting the specified columns."))
173 
174 ;;; Expressions
;; Root of both the logical and the physical expression hierarchies.
(defclass query-expression ()
  ())

;; Common base for plan nodes: an output SCHEMA and a vector of child plans.
(defclass query-plan ()
  ((schema :initarg :schema :accessor schema :type schema)
   (children :type (vector query-plan))))

(defclass logical-plan (query-plan)
  ((children :initarg :children :accessor children :type (vector logical-plan))))

(defclass physical-plan (query-plan)
  ((children :type (vector physical-plan))))
186 
187 ;;; Logical Expressions
(defclass logical-expression (query-expression) ())

(defgeneric to-field (self input)
  (:documentation "Return the FIELD that expression SELF produces when evaluated against logical plan INPUT.")
  ;; Literal strings and numbers describe themselves; INPUT is not consulted.
  (:method ((self string) (input logical-plan))
    (declare (ignorable input))
    (make-field :name self :type 'string))
  (:method ((self number) (input logical-plan))
    (declare (ignorable input))
    (let ((label (princ-to-string self)))
      (make-field :name label :type 'number))))
197 
;; A reference to an input column by NAME.
(defclass column-expression (logical-expression)
  ((name :initarg :name :accessor column-name :type string)))

(defmethod to-field ((self column-expression) (input logical-plan))
  "Return the field of INPUT's schema named like SELF, or signal INVALID-ARGUMENT."
  (let* ((wanted (column-name self))
         (hit (find wanted (fields (schema input)) :key 'field-name :test 'equal)))
    (if hit
        hit
        (error 'invalid-argument :item (column-name self) :reason "Invalid column name"))))

(defmethod df-col ((self string))
  "Return a COLUMN-EXPRESSION referring to the column named SELF."
  (make-instance 'column-expression :name self))
207 
(defclass literal-expression (logical-expression) ())

;;;;; Alias
;; Wraps EXPR under a new name ALIAS.
(defclass alias-expression (logical-expression)
  ((expr :initarg :expr :accessor expr :type logical-expression)
   (alias :initarg :alias :type string)))

;;;;; Unary
(defclass unary-expression (logical-expression)
  ((expr :accessor expr :type logical-expression)))
218 
219 ;;;;; Binary
;; An expression with a left (LHS) and right (RHS) operand.
(defclass binary-expression (logical-expression)
  ((lhs :initarg :lhs :accessor lhs :type logical-expression)
   (rhs :initarg :rhs :accessor rhs :type logical-expression)))

(defgeneric binary-expression-name (self)
  (:documentation "Return the display name of binary expression SELF."))

(defgeneric binary-expression-op (self)
  (:documentation "Return the operator symbol of binary expression SELF."))
226 
;; Binary expression producing a boolean result.
(defclass boolean-binary-expression (binary-expression)
  ((name :initarg :name :accessor binary-expression-name :type string)
   (op :initarg :op :accessor binary-expression-op :type symbol)))

(defmethod to-field ((self boolean-binary-expression) (input logical-plan))
  "Return a BOOLEAN field named after the expression."
  (declare (ignorable input))
  (let ((label (binary-expression-name self)))
    (make-field :name label :type 'boolean)))
234 
;; Each comparison / boolean connective is a BOOLEAN-BINARY-EXPRESSION
;; subclass differing only in its default NAME and OP initargs.
;; MACROLET keeps the generated DEFCLASS forms at top level.
(macrolet ((define-boolean-binary (class name op)
             `(defclass ,class (boolean-binary-expression) ()
                (:default-initargs :name ,name :op ',op))))
  ;; Equiv Expr
  (define-boolean-binary eq-expression "eq" eq)
  (define-boolean-binary neq-expression "neq" neq)
  (define-boolean-binary gt-expression "gt" >)
  (define-boolean-binary lt-expression "lt" <)
  (define-boolean-binary gteq-expression "gteq" >=)
  (define-boolean-binary lteq-expression "lteq" <=)
  ;; Bool Expr
  (define-boolean-binary and-expression "and" and)
  (define-boolean-binary or-expression "or" or))
276 
277 ;; Math Expr
;; Binary arithmetic expression.
(defclass math-expression (binary-expression)
  ((name :initarg :name :accessor binary-expression-name :type string)
   (op :initarg :op :accessor binary-expression-op :type symbol)))

;; TODO 2024-08-03: ???  (the result field is always named "*")
(defmethod to-field ((self math-expression) (input logical-plan))
  "Return a field named \"*\" whose type is that of the left operand's field."
  (declare (ignorable input))
  (let ((lhs-field (to-field (lhs self) input)))
    (make-field :name "*" :type (field-type lhs-field))))
286 
;; Arithmetic MATH-EXPRESSION subclasses differing only in default NAME and OP.
;; MACROLET keeps the generated DEFCLASS forms at top level.
(macrolet ((define-math (class name op)
             `(defclass ,class (math-expression) ()
                (:default-initargs :name ,name :op ',op))))
  (define-math add-expression "add" +)
  (define-math sub-expression "sub" -)
  (define-math mult-expression "mult" *)
  (define-math div-expression "div" /)
  (define-math mod-expression "mod" mod))
311 
312 ;;;;; Agg Expr
(deftype aggregate-function ()
  "A function from a LOGICAL-EXPRESSION to a QUERY-EXPRESSION.
FUNCTION type argument lists contain types only, not (name type) pairs."
  '(function (logical-expression) query-expression))

(deftype aggregate-function-designator ()
  "An AGGREGATE-FUNCTION or a symbol naming one."
  '(or aggregate-function symbol))
316 
;; An aggregate (SUM, MIN, ...) over the sub-expression EXPR.
;; NAME needs an :NAME initarg: subclasses supply it via :DEFAULT-INITARGS,
;; which is only valid for declared initargs.
(defclass aggregate-expression (logical-expression)
  ((name :type string :initarg :name)
   (expr :type logical-expression :initarg :expr :accessor expr)))

(defgeneric aggregate-expression-p (self)
  (:documentation "True when SELF is an aggregate expression, possibly under an alias.")
  ;; Anything else is not an aggregate, rather than signalling NO-APPLICABLE-METHOD.
  (:method ((self t)) nil)
  (:method ((self aggregate-expression)) t)
  (:method ((self alias-expression)) (aggregate-expression-p (expr self))))

(defmethod to-field ((self aggregate-expression) (input logical-plan))
  "Return a field named after the aggregate, typed like its input expression's field."
  (declare (ignorable input))
  (make-field :name (slot-value self 'name)
              :type (field-type (to-field (slot-value self 'expr) input))))
328 
;; Concrete aggregates; each only fixes the default NAME.
;; MACROLET keeps the generated DEFCLASS forms at top level.
(macrolet ((define-aggregate (class name)
             `(defclass ,class (aggregate-expression) ()
                (:default-initargs :name ,name))))
  (define-aggregate sum-expression "SUM")
  (define-aggregate min-expression "MIN")
  (define-aggregate max-expression "MAX")
  (define-aggregate avg-expression "AVG")
  (define-aggregate count-expression "COUNT"))

(defmethod to-field ((self count-expression) (input logical-plan))
  "COUNT always yields a NUMBER field named \"COUNT\", regardless of its input."
  (declare (ignorable input))
  (make-field :name "COUNT" :type 'number))
352 
353 ;;; Logical Plan
354 
355 ;;;;; Scan
;; Leaf plan reading from DATA-SOURCE, optionally restricted to PROJECTION.
(defclass scan-data (logical-plan)
  ((path :initarg :path :type string)
   (data-source :initarg :data-source :type data-source)
   (projection :initarg :projection :type (vector string))))

(defmethod derive-schema ((self scan-data))
  "Return the full schema when PROJECTION is empty, else only the projected fields."
  (let ((proj (slot-value self 'projection)))
    (if (plusp (length proj))
        (select (slot-value self 'schema) proj)
        (slot-value self 'schema))))

(defmethod schema ((self scan-data))
  (derive-schema self))
369 
370 ;;;;; Projection
;; Plan computing the expressions EXPR for each row of INPUT.
(defclass projection (logical-plan)
  ((input :type logical-plan :initarg :input)
   (expr :type (vector logical-expression) :initarg :expr)))

(defmethod schema ((self projection))
  "Return the projected schema: one field per expression in EXPR, resolved
against the INPUT plan.  This matches how MAKE-PHYSICAL-PLAN derives the schema
of PROJECTION-EXEC.  The input's schema would list the wrong columns."
  (let ((input (slot-value self 'input)))
    (make-instance 'schema
                   :fields (map 'field-vector
                                (lambda (e) (to-field e input))
                                (slot-value self 'expr)))))
377 
378 ;;;;; Selection
;; Plan keeping only the rows of INPUT for which EXPR holds.
(defclass selection (logical-plan)
  ((input :initarg :input :type logical-plan)
   (expr :initarg :expr :type logical-expression)))

(defmethod schema ((self selection))
  "Filtering keeps every column, so the schema is that of the input plan."
  (let ((input (slot-value self 'input)))
    (schema input)))
385 
386 ;;;;; Aggregate
;; Plan grouping INPUT by GROUP-EXPR and computing AGG-EXPR per group.
(defclass aggregate (logical-plan)
  ((input :type logical-plan :initarg :input)
   (group-expr :type (vector logical-expression) :initarg :group-expr)
   (agg-expr :type (vector aggregate-expression) :initarg :agg-expr)))

(defmethod schema ((self aggregate))
  "Return the output schema: the group-by fields, then the aggregate fields,
each in declaration order."
  (let ((input (slot-value self 'input)))
    (flet ((fields-of (exprs)
             (map 'list (lambda (e) (to-field e input)) exprs)))
      ;; Build the SCHEMA directly: MAKE-SCHEMA takes fields as &REST arguments,
      ;; so passing it :FIELDS would treat the keyword itself as a field.
      (make-instance 'schema
                     :fields (coerce (append (fields-of (slot-value self 'group-expr))
                                             (fields-of (slot-value self 'agg-expr)))
                                     'field-vector)))))
400 
401 ;;;;; Limit
;; Plan returning at most LIMIT rows of INPUT.
(defclass limit (logical-plan)
  ((input :type logical-plan :initarg :input)
   ;; :LIMIT initarg added so the bound can be supplied at construction.
   (limit :type integer :initarg :limit)))

(defmethod schema ((self limit))
  "A limit does not change columns: cache and return the input's schema."
  (setf (slot-value self 'schema)
        (schema (slot-value self 'input))))

(defmethod children ((self limit))
  "The only child of a limit is its INPUT plan (as with JOIN, children are the
direct inputs, not the inputs' own children).  The result is cached in CHILDREN."
  (setf (slot-value self 'children)
        (vector (slot-value self 'input))))
413 
414 ;;;;; Joins
;; Binary join of the LHS and RHS plans on the keys in JOIN-ON.
;; Initargs were added so joins can be constructed with MAKE-INSTANCE.
(defclass join (logical-plan)
  ((left :initarg :left :accessor lhs)
   (right :initarg :right :accessor rhs)
   (on :initarg :on :accessor join-on)))

(defclass inner-join (join) ())
;; (defclass outer-join (join))
(defclass left-join (join) ())
(defclass right-join (join) ())
;; left-outer-join
;; right-outer-join
;; semi-join
;; anti-join
;; cross-join

(defmethod schema ((self join))
  "Return the joined schema: the left fields followed by the right fields.
Key columns joined to themselves appear only once.  They are dropped from the
right side, or from the left side for a RIGHT-JOIN."
  ;; TODO 2024-08-04: test better dupe impl
  ;; NOTE(review): the shape of JOIN-ON is not established in this file; this
  ;; keeps the original heuristic that two adjacent EQUAL entries mark a key
  ;; joined to itself.  EQUAL is used instead of EQ so string keys can match.
  (let* ((dupes (mapcon (lambda (l)
                          (when (and (cdr l) (equal (car l) (second l)))
                            (list (car l))))
                        (coerce (join-on self) 'list)))
         (left (fields (schema (lhs self))))
         (right (fields (schema (rhs self)))))
    (flet ((drop-dupes (fields)
             ;; Compare field NAMES with the keys; the elements are FIELD structs.
             (remove-if (lambda (f) (member (field-name f) dupes :test 'string-equal))
                        fields)))
      (make-instance 'schema
                     :fields (if (typep self 'right-join)
                                 (concatenate 'field-vector (drop-dupes left) right)
                                 ;; INNER-JOIN, LEFT-JOIN and plain JOIN keep every
                                 ;; left field.  (A LEFT-JOIN previously fell through
                                 ;; and got NIL fields.)  CONCATENATE is used because
                                 ;; MERGE with an always-true predicate emits the
                                 ;; right side first.
                                 (concatenate 'field-vector left (drop-dupes right)))))))

(defmethod children ((self join))
  (vector (lhs self) (rhs self)))
449 
450 ;;; Subqueries
451 
452 ;; TODO 2024-08-02:
453 
454 ;; subquery
455 
456 ;; correlated-subquery
457 
458 ;; SELECT id, name, (SELECT count(*) FROM orders WHERE customer_id = customer.id) AS num_orders FROM customers
459 
460 ;; uncorrelated-subquery
461 
462 ;; scalar-subquery
463 
464 ;; SELECT * FROM orders WHERE total > (SELECT avg(total) FROM sales WHERE customer_state = 'CA')
465 
466 ;; NOTE 2024-08-02: EXISTS, IN, NOT EXISTS, and NOT IN are also subqueries
467 
468 ;;; Dataframes
469 ;; minimal data-frame abstraction. methods are prefixed with 'DF-'.
;; A data frame is a mutable handle on a logical plan; DF- operations wrap the
;; current plan in a new node and return the same frame.
(defstruct (data-frame (:constructor make-data-frame (&optional plan)))
  (plan (make-instance 'logical-plan) :type logical-plan))

(defgeneric df-col (self))

(defgeneric df-project (df exprs)
  (:method ((df data-frame) (expr vector))
    (let ((node (make-instance 'projection
                               :input (data-frame-plan df)
                               :expr expr)))
      (setf (data-frame-plan df) node)
      df))
  (:method ((df data-frame) (expr list))
    (df-project df (coerce expr 'vector))))
483 
(defgeneric df-filter (df expr)
  (:method ((df data-frame) (expr logical-expression))
    (let ((node (make-instance 'selection :input (data-frame-plan df) :expr expr)))
      (setf (data-frame-plan df) node)
      df)))

(defgeneric df-aggregate (df group-by agg-expr)
  (:method ((df data-frame) (group-by list) (agg-expr list))
    (df-aggregate df (coerce group-by 'vector) (coerce agg-expr 'vector)))
  (:method ((df data-frame) (group-by vector) (agg-expr vector))
    (let ((node (make-instance 'aggregate
                               :input (data-frame-plan df)
                               :group-expr group-by
                               :agg-expr agg-expr)))
      (setf (data-frame-plan df) node)
      df)))
499 
(defgeneric make-df (&rest initargs &key &allow-other-keys))

(defmethod schema ((df data-frame))
  "A data frame's schema is the schema of its logical plan."
  (let ((plan (data-frame-plan df)))
    (schema plan)))

(defgeneric df-plan (df)
  (:documentation "Return the logical plan associated with this data-frame.")
  (:method ((df data-frame))
    (data-frame-plan df)))
508 
509 ;;; Physical Expression
(defclass physical-expression (query-expression) ())

(defclass literal-physical-expression (physical-expression) ())

(defgeneric evaluate (self input)
  (:documentation "Evaluate the expression SELF with INPUT and return a COLUMN-VECTOR result.")
  ;; Literal strings and numbers become LITERAL-VALUE-VECTORs sized to the batch.
  (:method ((self string) (input record-batch))
    (let ((rows (row-count input)))
      (make-instance 'literal-value-vector
                     :size rows
                     :type 'string
                     :data (sb-ext:string-to-octets self))))
  (:method ((self number) (input record-batch))
    (let ((rows (row-count input)))
      (make-instance 'literal-value-vector :size rows :type 'number :data self))))
523 
;; Reference to an input column; VAL is the column's position in the batch
;; (MAKE-PHYSICAL-EXPRESSION fills it from the schema position).
(defclass column-physical-expression (physical-expression)
  ((val :initarg :val :type array-index)))

(defmethod evaluate ((self column-physical-expression) (input record-batch))
  (let ((index (slot-value self 'val)))
    (field input index)))
529 
;; Physical expression combining the results of LHS and RHS via EVALUATE2.
(defclass binary-physical-expression (physical-expression)
  ((lhs :type physical-expression :accessor lhs :initarg :lhs)
   (rhs :type physical-expression :accessor rhs :initarg :rhs)))

(defgeneric evaluate2 (self lhs rhs))

(defmethod evaluate ((self binary-physical-expression) (input record-batch))
  "Evaluate both operands against INPUT and combine them with EVALUATE2.
The operands must have the same size and column type."
  (let ((ll (evaluate (lhs self) input))
        (rr (evaluate (rhs self) input)))
    ;; Operands are column objects: use the COLUMN-SIZE protocol, not LENGTH.
    (assert (= (column-size ll) (column-size rr)))
    ;; EQUAL so compound type specifiers such as (UNSIGNED-BYTE 8) compare equal.
    (if (equal (column-type ll) (column-type rr))
        (evaluate2 self ll rr)
        (error "invalid state! lhs != rhs"))))
543 
(defclass eq-physical-expression (binary-physical-expression) ())

(defmethod evaluate2 ((self eq-physical-expression) lhs rhs)
  "True when LHS and RHS are EQUAL."
  (equal lhs rhs))

(defclass neq-physical-expression (binary-physical-expression) ())

(defmethod evaluate2 ((self neq-physical-expression) lhs rhs)
  "True when LHS and RHS are not EQUAL (the negation of EQ-PHYSICAL-EXPRESSION)."
  (not (equal lhs rhs)))
555 
;; Comparison and boolean physical operators.
;; NOTE(review): no EVALUATE2 methods for these classes are defined in this
;; file, so EVALUATE on them signals NO-APPLICABLE-METHOD unless methods are
;; added elsewhere.
(defclass lt-physical-expression (binary-physical-expression)
  ())
(defclass gt-physical-expression (binary-physical-expression)
  ())
(defclass lteq-physical-expression (binary-physical-expression)
  ())
(defclass gteq-physical-expression (binary-physical-expression)
  ())
(defclass and-physical-expression (binary-physical-expression)
  ())
(defclass or-physical-expression (binary-physical-expression)
  ())
567 
(defclass math-physical-expression (binary-physical-expression) ())

(defmethod evaluate2 ((self math-physical-expression) (lhs column-vector) (rhs column-vector))
  "Apply SELF row by row to LHS and RHS, returning a vector of the results."
  (let* ((n (column-size lhs))
         (out (make-array n)))
    (dotimes (i n out)
      (setf (aref out i)
            (evaluate2 self (column-value lhs i) (column-value rhs i))))))
574 
;; Scalar arithmetic operators.  They inherit from MATH-PHYSICAL-EXPRESSION,
;; not the logical MATH-EXPRESSION, so that EVALUATE of
;; BINARY-PHYSICAL-EXPRESSION and the column-wise EVALUATE2 apply.  The scalar
;; methods are specialized on NUMBER.  Unspecialized, they would be more
;; specific than the column-wise method and would receive whole columns.
;; NOTE: ADD-PHYSICAL-EXPRESION is misspelled but kept, because the planner
;; refers to it by this name.
(defclass add-physical-expresion (math-physical-expression) ())

(defmethod evaluate2 ((self add-physical-expresion) (lhs number) (rhs number))
  (+ lhs rhs))

(defclass sub-physical-expression (math-physical-expression) ())

(defmethod evaluate2 ((self sub-physical-expression) (lhs number) (rhs number))
  (- lhs rhs))

(defclass mult-physical-expression (math-physical-expression) ())

(defmethod evaluate2 ((self mult-physical-expression) (lhs number) (rhs number))
  (* lhs rhs))

(defclass div-physical-expression (math-physical-expression) ())

(defmethod evaluate2 ((self div-physical-expression) (lhs number) (rhs number))
  (/ lhs rhs))
598 
;; Running aggregate state.  VALUE starts as NIL ("nothing seen yet") instead of
;; unbound, so the first ACCUMULATE does not signal UNBOUND-SLOT.
(defclass accumulator ()
  ((value :initarg :value :initform nil :accessor accumulator-value)))

(defgeneric accumulate (self val)
  (:documentation "Fold VAL into accumulator SELF.  NIL values are ignored.")
  ;; Default: running sum.
  (:method ((self accumulator) val)
    (when val
      (let ((current (accumulator-value self)))
        (setf (accumulator-value self)
              (if current (+ val current) val))))))

(defgeneric make-accumulator (self))

;; max-accumulator
(defclass max-accumulator (accumulator) ())

(defmethod accumulate ((self max-accumulator) (val number))
  "Keep the largest number seen so far; the first value always wins."
  (let ((current (accumulator-value self)))
    (when (or (null current) (> val current))
      (setf (accumulator-value self) val))))
615 
;; Physical aggregate over the physical expression INPUT.  The :INPUT initarg
;; lets the planner construct it directly (HASH-AGGREGATE-EXEC reads INPUT).
(defclass aggregate-physical-expression (physical-expression)
  ((input :type physical-expression :initarg :input)))

(defclass max-physical-expression (aggregate-physical-expression) ())

(defmethod make-accumulator ((self max-physical-expression))
  "Return a fresh MAX-ACCUMULATOR for one group."
  (make-instance 'max-accumulator))
623 
624 ;;; Physical Plan
(defgeneric execute (self)
  (:documentation "Run physical plan SELF and return its output."))

;; Physical scan of DATA-SOURCE restricted to the PROJECTION columns.
(defclass scan-exec (physical-plan)
  ((data-source :initarg :data-source :type data-source)
   (projection :initarg :projection :type (vector string))))

(defmethod schema ((self scan-exec))
  (let ((source-schema (schema (slot-value self 'data-source))))
    (select source-schema (slot-value self 'projection))))

(defmethod execute ((self scan-exec))
  (with-slots (data-source projection) self
    (scan-data-source data-source projection)))
636 
;; Physical projection: evaluates each expression in EXPR on every input batch.
(defclass projection-exec (physical-plan)
  ((input :type physical-plan :initarg :input)
   (expr :type (vector physical-expression) :initarg :expr)))

(defmethod execute ((self projection-exec))
  "Return a vector of RECORD-BATCHes, one per input batch, whose columns are
the results of evaluating each expression of EXPR on that batch."
  (coerce
   ;; EXECUTE of the input yields the batch vector itself (as SELECTION-EXEC and
   ;; HASH-AGGREGATE-EXEC assume), so iterate it directly, not its FIELDS.
   (loop for batch across (execute (slot-value self 'input))
         collect (make-record-batch :schema (slot-value self 'schema)
                                    :fields (coerce
                                             (loop for e across (slot-value self 'expr)
                                                   collect (evaluate e batch))
                                             'field-vector)))
   '(vector record-batch)))
650 
651 
;; Physical filter: keeps the rows of each input batch where EXPR is true.
(defclass selection-exec (physical-plan)
  ((input :type physical-plan :initarg :input)
   (expr :type physical-expression :initarg :expr)))

(defmethod schema ((self selection-exec))
  (schema (slot-value self 'input)))

(defmethod execute ((self selection-exec))
  "Return a vector of RECORD-BATCHes holding only the rows of each input batch
whose predicate bit is 1."
  (let ((predicate (slot-value self 'expr)))
    (coerce
     (loop for batch across (execute (slot-value self 'input))
           ;; Per-batch work goes in the body: LOOP WITH clauses are evaluated
           ;; only once, before BATCH is bound to the first element.
           collect (let ((res (coerce (evaluate predicate batch) 'bit-vector)))
                     (make-record-batch
                      :schema (schema batch)
                      :fields (coerce
                               (loop for i from 0 below (column-count batch)
                                     collect (filter self (field batch i) res))
                               'field-vector))))
     '(vector record-batch))))
669 
(defgeneric filter (self columns selection)
  (:documentation "Return the values of COLUMNS at the rows whose bit in SELECTION is 1.")
  (:method ((self selection-exec) (columns column-vector) (selection simple-bit-vector))
    (let ((kept '()))
      (dotimes (i (length selection))
        (when (= 1 (bit selection i))
          (push (column-value columns i) kept)))
      (coerce (nreverse kept) 'field-vector))))
677 
;; Hash-based GROUP BY.  GROUP-EXPR holds physical expressions (they are
;; EVALUATEd), not physical plans.
(defclass hash-aggregate-exec (physical-plan)
  ((input :type physical-plan :initarg :input)
   (group-expr :type (vector physical-expression) :initarg :group-expr)
   (agg-expr :type (vector aggregate-physical-expression) :initarg :agg-expr)))

(defun %group-key-value (column row-idx)
  "Return the value of COLUMN at ROW-IDX as a group-key component; octet vectors become strings."
  (let ((val (column-value column row-idx)))
    (typecase val
      (octet-vector (sb-ext:octets-to-string val))
      (t val))))

(defmethod execute ((self hash-aggregate-exec))
  "Group every input row by the values of GROUP-EXPR and fold the AGG-EXPR
inputs into one accumulator per group.  Return a vector holding a single
RECORD-BATCH.  Its columns are the group-key columns followed by the
aggregate-value columns, with one row per group."
  (let* ((group-exprs (slot-value self 'group-expr))
         (agg-exprs (slot-value self 'agg-expr))
         (glen (length group-exprs))
         (alen (length agg-exprs))
         ;; Keys are lists: EQUAL compares non-string vectors by identity.
         (groups (make-hash-table :test 'equal)))
    ;; Phase 1: fold every row of every batch into its group's accumulators.
    (loop for batch across (execute (slot-value self 'input))
          do (let ((groupkeys (map 'vector (lambda (x) (evaluate x batch)) group-exprs))
                   (aggr-inputs (map 'vector (lambda (x) (evaluate (slot-value x 'input) batch))
                                     agg-exprs)))
               (dotimes (row-idx (row-count batch))
                 (let* ((row-key (map 'list (lambda (col) (%group-key-value col row-idx))
                                      groupkeys))
                        (accs (or (gethash row-key groups)
                                  (setf (gethash row-key groups)
                                        (map 'vector #'make-accumulator agg-exprs)))))
                   ;; Every accumulator sees the row, not just the first one.
                   (loop for accum across accs
                         for column across aggr-inputs
                         do (accumulate accum (column-value column row-idx)))))))
    ;; Phase 2: one output row per group (HASH-TABLE-COUNT, not -SIZE).
    (let* ((nrows (hash-table-count groups))
           (columns (make-array (+ glen alen)))
           (row-idx 0))
      (dotimes (i (length columns))
        (setf (aref columns i) (make-array nrows)))
      (maphash (lambda (gkey accums)
                 (loop for i from 0
                       for key-val in gkey
                       do (setf (aref (aref columns i) row-idx) key-val))
                 (loop for i from glen
                       for accum across accums
                       do (setf (aref (aref columns i) row-idx) (accumulator-value accum)))
                 (incf row-idx))
               groups)
      ;; The planner may not set SCHEMA; fall back to the RECORD-BATCH default.
      (coerce (list (if (slot-boundp self 'schema)
                        (make-record-batch :schema (slot-value self 'schema) :fields columns)
                        (make-record-batch :fields columns)))
              '(vector record-batch)))))
727 
728 ;;; Planner
729 
730 ;; The Query Planner is effectively a compiler which translates logical
731 ;; expressions and plans into their physical counterparts.
732 
(defclass query-planner () ())

(defgeneric make-physical-expression (expr input)
  (:documentation "Translate logical expression EXPR and logical plan INPUT
into a physical expression.")
  ;; Literal strings and numbers are their own physical expressions.
  (:method ((expr string) (input logical-plan))
    (declare (ignorable input))
    expr)
  (:method ((expr number) (input logical-plan))
    (declare (ignorable input))
    expr)
  ;; A column reference becomes the column's position in INPUT's schema.
  ;; Signal an error for an unknown column, rather than storing a NIL index.
  (:method ((expr column-expression) (input logical-plan))
    (let* ((name (column-name expr))
           (i (position name (fields (schema input)) :key 'field-name :test 'equal)))
      (unless i
        (error 'invalid-argument :item name :reason "Invalid column name"))
      (make-instance 'column-physical-expression :val i)))
  (:method ((expr binary-expression) (input logical-plan))
    (let ((l (make-physical-expression (lhs expr) input))
          (r (make-physical-expression (rhs expr) input)))
      (etypecase expr
        (eq-expression (make-instance 'eq-physical-expression :lhs l :rhs r))
        (neq-expression (make-instance 'neq-physical-expression :lhs l :rhs r))
        (gt-expression (make-instance 'gt-physical-expression :lhs l :rhs r))
        (gteq-expression (make-instance 'gteq-physical-expression :lhs l :rhs r))
        (lt-expression (make-instance 'lt-physical-expression :lhs l :rhs r))
        (lteq-expression (make-instance 'lteq-physical-expression :lhs l :rhs r))
        (and-expression (make-instance 'and-physical-expression :lhs l :rhs r))
        (or-expression (make-instance 'or-physical-expression :lhs l :rhs r))
        (add-expression (make-instance 'add-physical-expresion :lhs l :rhs r))
        (sub-expression (make-instance 'sub-physical-expression :lhs l :rhs r))
        (mult-expression (make-instance 'mult-physical-expression :lhs l :rhs r))
        (div-expression (make-instance 'div-physical-expression :lhs l :rhs r))))))
763 
(defun %make-physical-aggregate (agg input)
  "Translate logical aggregate expression AGG, over logical plan INPUT, into an
AGGREGATE-PHYSICAL-EXPRESSION.  Only MAX-EXPRESSION has a physical counterpart
in this file; any other aggregate signals a type error."
  (etypecase agg
    (max-expression
     (let ((phys (make-instance 'max-physical-expression)))
       ;; Set via SLOT-VALUE so this works whether or not INPUT declares an initarg.
       (setf (slot-value phys 'input) (make-physical-expression (expr agg) input))
       phys))))

(defgeneric make-physical-plan (plan)
  (:documentation "Create a physical plan from logical PLAN.")
  (:method ((plan logical-plan))
    (etypecase plan
      (scan-data (make-instance 'scan-exec
                                :data-source (slot-value plan 'data-source)
                                :projection (slot-value plan 'projection)))
      (projection
       (let ((input (slot-value plan 'input))
             (exprs (slot-value plan 'expr)))
         (make-instance 'projection-exec
                        :schema (make-instance 'schema
                                               :fields (map 'field-vector
                                                            (lambda (x) (to-field x input))
                                                            exprs))
                        :input (make-physical-plan input)
                        :expr (map 'vector (lambda (x) (make-physical-expression x input))
                                   exprs))))
      (selection
       (let ((input (slot-value plan 'input)))
         (make-instance 'selection-exec
                        :input (make-physical-plan input)
                        :expr (make-physical-expression (slot-value plan 'expr) input))))
      ;; GROUP-EXPR and AGG-EXPR are vectors: translate element by element.
      (aggregate
       (let ((input (slot-value plan 'input))
             (groups (slot-value plan 'group-expr))
             (aggs (slot-value plan 'agg-expr)))
         (make-instance 'hash-aggregate-exec
                        ;; Group fields then aggregate fields, as for the projection.
                        :schema (make-instance 'schema
                                               :fields (map 'field-vector
                                                            (lambda (x) (to-field x input))
                                                            (concatenate 'vector groups aggs)))
                        :input (make-physical-plan input)
                        :group-expr (map 'vector (lambda (x) (make-physical-expression x input))
                                         groups)
                        :agg-expr (map 'vector (lambda (x) (%make-physical-aggregate x input))
                                       aggs)))))))
787 
788 ;;; Optimizer
789 
790 ;; The Query Optimizer is responsible for walking a QUERY-PLAN and returning a
791 ;; modified version of the same object. Usually we want to run optimization on
792 ;; LOGICAL-PLANs but we also support specializing on PHYSICAL-PLAN.
793 
794 ;; Rule-based Optimizers: projection/predicate push-down, sub-expr elim
795 
796 ;; TBD: Cost-based optimizers
797 ;; TODO 2024-07-10:
;; Base class for optimizers; OPTIMIZE-QUERY dispatches on it.
(defclass query-optimizer ()
  ())

;; Opaque optimizer annotation carrying arbitrary INFO.
(defstruct (query-vop (:constructor make-query-vop (info)))
  info)

(defgeneric optimize-query (self plan)
  (:documentation "Return an optimized version of PLAN using optimizer SELF."))
804 
805 ;; Projection Pushdown
(defun extract-columns (expr input &optional accum)
  "Return ACCUM (a list of column names) extended with every column referenced by EXPR.
EXPR may be any of:
- a column index, resolved against the schema of logical plan INPUT;
- a logical expression;
- a literal string or number, which contributes nothing;
- a sequence of such expressions.
Names already present in ACCUM (under EQUAL) are not added again."
  (etypecase expr
    (array-index
     (adjoin (field-name (aref (fields (schema input)) expr)) accum :test 'equal))
    (column-expression (adjoin (column-name expr) accum :test 'equal))
    (binary-expression
     (extract-columns (rhs expr) input (extract-columns (lhs expr) input accum)))
    (alias-expression (extract-columns (slot-value expr 'expr) input accum))
    ;; cast-expression
    ;; STRING must precede SEQUENCE: string literals are not expression lists.
    ((or literal-expression string number) accum)
    (sequence (reduce (lambda (acc e) (extract-columns e input acc)) expr
                      :initial-value accum))))

(defun extract-columns* (exprs input &optional accum)
  "Fold EXTRACT-COLUMNS over every expression in the sequence EXPRS, returning the accumulated names."
  (reduce (lambda (acc e) (extract-columns e input acc)) exprs :initial-value accum))

(defclass projection-pushdown-optimizer (query-optimizer) ())

(defun %pushdown (plan &optional column-names)
  "Return a copy of logical PLAN in which each SCAN-DATA leaf projects only the
columns referenced by the plans above it.  COLUMN-NAMES lists the column names
already required by enclosing plans."
  (declare (logical-plan plan))
  (etypecase plan
    (projection
     (let* ((input (slot-value plan 'input))
            (names (extract-columns (slot-value plan 'expr) input column-names)))
       (make-instance 'projection :input (%pushdown input names) :expr (slot-value plan 'expr))))
    (selection
     (let* ((input (slot-value plan 'input))
            (names (extract-columns (slot-value plan 'expr) input column-names)))
       (make-instance 'selection :input (%pushdown input names) :expr (slot-value plan 'expr))))
    (aggregate
     (let* ((input (slot-value plan 'input))
            ;; Aggregate expressions keep their argument in EXPR (there is no INPUT slot).
            (names (extract-columns* (map 'list #'expr (slot-value plan 'agg-expr))
                                     input
                                     (extract-columns (slot-value plan 'group-expr)
                                                      input column-names))))
       (make-instance 'aggregate
                      :input (%pushdown input names)
                      :group-expr (slot-value plan 'group-expr)
                      :agg-expr (slot-value plan 'agg-expr))))
    (scan-data
     ;; Copy only the bound slots (SCAN-DATA has PATH, not NAME) so an unset PATH
     ;; or SCHEMA does not signal UNBOUND-SLOT.  Names are in discovery order.
     (apply #'make-instance 'scan-data
            :projection (coerce (reverse column-names) 'vector) ;; maybe sort here?
            (loop for (slot initarg) in '((path :path) (data-source :data-source) (schema :schema))
                  when (slot-boundp plan slot)
                    append (list initarg (slot-value plan slot)))))))

(defmethod optimize-query ((self projection-pushdown-optimizer) (plan logical-plan))
  (%pushdown plan))
850 
851 ;;; Query
(defclass query ()
  ())

(defgeneric make-query (self &rest initargs &key &allow-other-keys)
  (:documentation "Return a new QUERY; the default method ignores SELF and INITARGS.")
  (:method (self &rest initargs &key &allow-other-keys)
    (declare (ignore self initargs))
    (make-instance 'query)))