changelog shortlog graph tags branches changeset files revisions annotate raw help

Mercurial > core / lisp/lib/obj/query.lisp

changeset 580: 571685ae64f1
parent: 806c2b214df8
child: 568c39371122
author: Richard Westhaver <ellis@rwest.io>
date: Mon, 05 Aug 2024 21:57:13 -0400
permissions: -rw-r--r--
description: queries, cli fixes, dat/csv, emacs org-columns
1 ;;; obj/query/pkg.lisp --- Query Objects
2 
3 ;; Lisp primitive Query objects for DIY query engines.
4 
5 ;;; Commentary:
6 
7 ;; This package provides the base set of classes and methods for implementing
8 ;; query engines.
9 
10 ;; The intention is to use these objects in several high-level packages where
11 ;; we need the ability to ask complex questions about some arbitrary data
12 ;; source.
13 
14 ;; The types of high-level packages can loosely be categorized as:
15 
16 ;; - Frontends :: The interface exposed to the user - SQL, Prolog, etc.
17 
18 ;; - Middleware :: interfaces which are used internally and exposed publicly -
19 ;; query planners/optimizers/ast
20 
21 ;; - Backends :: The interface exposed to the underlying data sources -
22 ;; RocksDB, SQLite, etc.
23 
24 ;;;; Refs
25 
26 ;; https://gist.github.com/twitu/221c8349887cec0a83b395e4cbb492a7
27 
28 ;; https://www1.columbia.edu/sec/acis/db2/db2d0/db2d0103.htm
29 
30 ;; https://howqueryengineswork.com/
31 
32 ;;; Code:
33 (in-package :obj/query)
34 
35 ;;; Types
(eval-always
  (defvar *literal-value-types*
    '(boolean fixnum signed-byte unsigned-byte float double-float string)
    "Type specifiers which together make up LITERAL-VALUE-TYPE."))

(deftype literal-value-type ()
  "A value whose type is any member of *LITERAL-VALUE-TYPES*."
  (cons 'or *literal-value-types*))
40 
41 ;;; Field
(defstruct field
  "A named, typed column descriptor.
NAME defaults to the name of a fresh gensym and TYPE defaults to T."
  (name (symbol-name (gensym "#")) :type simple-string)
  (type t :type (or symbol list)))

(defmethod make-load-form ((self field) &optional env)
  "Return a form which rebuilds the field SELF when evaluated."
  (declare (ignore env))
  ;; TYPE holds a type specifier (a symbol or a list). It has to be quoted in
  ;; the generated form, otherwise it would be evaluated as a variable
  ;; reference or a function call when the load form runs.
  `(make-field :name ,(field-name self) :type ',(field-type self)))
49 
50 ;;; Field Vectors
(deftype field-vector ()
  "A vector whose elements are FIELD structures."
  '(vector field))
52 
53 ;; convenience interface for FIELD-VECTOR
(defclass column-vector ()
  ((data :accessor column-data :type simple-vector))
  (:documentation "Base class for columns; the contents live in the DATA slot."))

(defclass literal-value-vector (column-vector)
  ((type :initarg :type :accessor column-type :type literal-value-type)
   (data :initarg :data :accessor column-data)
   (size :initarg :size :accessor column-size :type fixnum))
  (:documentation "A column standing for a single literal: DATA holds the
value, TYPE its type and SIZE the length used for bounds checks in
COLUMN-VALUE."))
60 
(defgeneric column-literal-value (self)
  (:documentation "Return the literal value stored in the column SELF.")
  (:method ((self literal-value-vector))
    (column-data self)))

(defgeneric column-type (self)
  (:documentation "Return the element type of the column SELF.")
  (:method ((self column-vector))
    ;; the type of a plain column is whatever its backing array stores
    (let ((data (column-data self)))
      (array-element-type data))))
68 
(defgeneric column-value (self i)
  (:documentation "Return the value of the column SELF at row index I.")
  (:method ((self column-vector) (i fixnum))
    (aref (column-data self) i))
  (:method ((self literal-value-vector) (i fixnum))
    ;; A literal column yields the same value for every row, so only the
    ;; bounds of I are checked before returning it.
    (if (or (< i 0) (>= i (column-size self)))
        ;; :FORMAT-ARGUMENTS must be a list of arguments for the control
        ;; string, not the bare argument itself.
        (error 'simple-error
               :format-control "index out of bounds: ~A"
               :format-arguments (list i))
        (column-literal-value self))))
76 
77 ;;; Schema
(defclass schema ()
  ((fields :initarg :fields :accessor fields :type field-vector))
  (:documentation "A collection of FIELDs, stored as a FIELD-VECTOR."))
80 
(defun make-schema (&rest fields)
  "Build a SCHEMA whose field vector contains FIELDS in the order given."
  (let ((field-vec (coerce fields 'field-vector)))
    (make-instance 'schema :fields field-vec)))
83 
(defgeneric load-schema (self &optional schema)
  (:documentation "Protocol function taking SELF and an optional SCHEMA.
NOTE(review): no methods are visible next to this declaration, so the exact
contract still has to be confirmed against its implementors."))
85 
(defmethod make-load-form ((self schema) &optional env)
  "Return a form which rebuilds SELF from its class and its field vector."
  (declare (ignore env))
  (list 'make-instance (class-of self) :fields (fields self)))
89 
(defclass schema-metadata ()
  ((metadata :accessor schema-metadata :initarg :metadata))
  (:documentation "Holder for a METADATA value, read with SCHEMA-METADATA."))

(defmethod make-load-form ((self schema-metadata) &optional env)
  "Return a form which rebuilds SELF from its class and its metadata."
  (declare (ignore env))
  (list 'make-instance (class-of self) :metadata (schema-metadata self)))
96 
(defgeneric column-size (self)
  (:documentation "Return the number of elements in the column SELF.")
  (:method ((self column-vector))
    (let ((data (column-data self)))
      (length data))))
100 
101 ;;; Record Batch
(defstruct record-batch
  "A SCHEMA paired with a vector of FIELDS."
  (schema (make-schema) :type schema)
  (fields #() :type field-vector))

(defmethod make-load-form ((self record-batch) &optional env)
  "Return a form which rebuilds SELF from its schema and its fields."
  (declare (ignore env))
  (list 'make-record-batch
        :schema (record-batch-schema self)
        :fields (record-batch-fields self)))
109 
110 ;;; Proto
(defgeneric field (self n)
  (:documentation "Return the Nth field of SELF.")
  (:method ((self record-batch) (n fixnum))
    (let ((columns (record-batch-fields self)))
      (aref columns n))))

(defgeneric fields (self)
  (:documentation "Return the fields of SELF.")
  (:method ((self record-batch))
    (record-batch-fields self)))

(defgeneric schema (self)
  (:documentation "Return the schema of SELF.")
  (:method ((self record-batch))
    (record-batch-schema self)))

(defgeneric derive-schema (self)
  (:documentation "Compute the schema of SELF from its own contents."))
124 
(defgeneric select (self names)
  (:documentation "Return a new SCHEMA holding the fields of SELF named by
NAMES, in the order requested. Signals INVALID-ARGUMENT for a name which
matches no field of SELF.")
  (:method ((self schema) (names list))
    ;; lists share the vector implementation below
    (select self (coerce names 'vector)))
  (:method ((self schema) (names vector))
    (let* ((fields (fields self))
           ;; Size the result by the number of requested names rather than by
           ;; the number of fields in SELF: VECTOR-PUSH returns NIL without
           ;; storing once the vector is full, which silently dropped entries
           ;; whenever more names were requested than SELF has fields
           ;; (e.g. a repeated column name).
           (ret (make-array (length names)
                            :element-type 'field
                            :fill-pointer 0
                            :initial-element (make-field))))
      (loop for n across names
            do (if-let ((found (find n fields :test 'equal :key 'field-name)))
                 (vector-push found ret)
                 (error 'invalid-argument :item n :reason "Invalid column name")))
      (make-instance 'schema :fields ret))))
145 
(defgeneric project (self indices)
  (:documentation "Return a new SCHEMA made of the fields of SELF found at the
positions listed in INDICES.")
  (:method ((self schema) (indices list))
    (let ((picked (loop for i in indices
                        collect (aref (fields self) i))))
      (make-instance 'schema :fields (coerce picked 'field-vector))))
  (:method ((self schema) (indices vector))
    (let ((picked (map 'list (lambda (i) (aref (fields self) i)) indices)))
      (make-instance 'schema :fields (coerce picked 'field-vector)))))
156 
(defgeneric row-count (self)
  (:documentation "Return the number of rows in SELF, taken from the length of
its first column.")
  (:method ((self record-batch))
    (let ((columns (record-batch-fields self)))
      ;; A batch without any column has no rows; indexing element 0 of an
      ;; empty vector would signal an out-of-bounds error instead.
      (if (zerop (length columns))
          0
          (sequence:length (aref columns 0))))))
160 
(defgeneric column-count (self)
  (:documentation "Return the number of columns held by SELF.")
  (:method ((self record-batch))
    (let ((columns (record-batch-fields self)))
      (length columns))))
164 
165 ;;; Data Source
(defclass data-source ()
  ((schema :accessor schema :type schema))
  (:documentation "Base class of data sources; carries a SCHEMA."))

(defclass file-data-source (data-source)
  ((path :initarg :path :accessor file-data-path))
  (:documentation "A data source that additionally records a PATH."))

(defgeneric scan-data (self projection)
  (:documentation "Scan the data source SELF, selecting only the columns given
in PROJECTION."))
174 
175 ;;; Expressions
(defclass query-expression ()
  ()
  (:documentation "Root class of all query expressions."))
177 
(defclass query-plan ()
  ((schema :initarg :schema :accessor schema :type schema)
   (children :type (vector query-plan)))
  (:documentation "Base class of plans: a SCHEMA plus a vector of CHILDREN."))

(defclass logical-plan (query-plan)
  ((children :initarg :children :accessor children :type (vector logical-plan)))
  (:documentation "A query plan whose children are themselves logical plans."))

(defclass physical-plan (query-plan)
  ((children :type (vector physical-plan)))
  (:documentation "A query plan whose children are themselves physical plans."))
187 
188 ;;; Logical Expressions
(defclass logical-expression (query-expression)
  ()
  (:documentation "Base class of expressions used in logical plans."))
190 
(defgeneric to-field (self input)
  (:documentation "Return the FIELD describing the result of expression SELF
when applied to the logical plan INPUT.")
  (:method ((self string) (input logical-plan))
    (declare (ignore input))
    ;; a bare string is a literal named after itself
    (make-field :type 'string :name self))
  (:method ((self number) (input logical-plan))
    (declare (ignore input))
    ;; a bare number is a literal named after its printed form
    (let ((printed (princ-to-string self)))
      (make-field :type 'number :name printed))))
198 
(defclass column-expression (logical-expression)
  ((name :initarg :name :accessor column-name :type string))
  (:documentation "A reference to a column by NAME."))

(defmethod to-field ((self column-expression) (input logical-plan))
  "Return the field of INPUT's schema named like SELF, or signal
INVALID-ARGUMENT when there is none."
  (let ((match (find (column-name self) (fields (schema input))
                     :key 'field-name :test 'equal)))
    (if match
        match
        (error 'invalid-argument :item (column-name self) :reason "Invalid column name"))))

(defmethod df-col ((self string))
  "Wrap the column name SELF in a COLUMN-EXPRESSION."
  (make-instance 'column-expression :name self))
208 
(defclass literal-expression (logical-expression)
  ()
  (:documentation "Base class of literal logical expressions."))
210 
211 ;;;;; Alias
(defclass alias-expression (logical-expression)
  ((expr :initarg :expr :accessor expr :type logical-expression)
   (alias :initarg :alias :type string))
  (:documentation "An expression EXPR carrying an ALIAS string."))
215 
(defclass cast-expression (logical-expression)
  ((expr :initarg :expr :accessor expr :type logical-expression)
   (data-type :initarg :data-type :type form))
  (:documentation "An expression EXPR together with a target DATA-TYPE."))

(defmethod to-field ((self cast-expression) (input logical-plan))
  "Return a field named like the wrapped expression's field, typed DATA-TYPE."
  (let ((inner (to-field (expr self) input)))
    (make-field :name (field-name inner)
                :type (slot-value self 'data-type))))
222 
223 ;;;;; Unary
(defclass unary-expression (logical-expression)
  ;; :INITARG :EXPR added so the operand can be supplied to MAKE-INSTANCE, as
  ;; it can for ALIAS-EXPRESSION and CAST-EXPRESSION; previously the slot could
  ;; only be filled through the accessor after construction.
  ((expr :type logical-expression :initarg :expr :accessor expr))
  (:documentation "An expression with a single operand, EXPR."))
226 
227 ;;;;; Binary
(defclass binary-expression (logical-expression)
  ((lhs :initarg :lhs :accessor lhs :type logical-expression)
   (rhs :initarg :rhs :accessor rhs :type logical-expression))
  (:documentation "An expression with a left (LHS) and a right (RHS) operand."))

(defgeneric binary-expression-name (self)
  (:documentation "Return the name of the binary expression SELF."))

(defgeneric binary-expression-op (self)
  (:documentation "Return the operator symbol of the binary expression SELF."))
234 
(defclass boolean-binary-expression (binary-expression)
  ((name :type string :accessor binary-expression-name :initarg :name)
   (op :type symbol :accessor binary-expression-op :initarg :op))
  (:documentation "A binary expression whose field type is BOOLEAN."))

(defmethod to-field ((self boolean-binary-expression) (input logical-plan))
  "Return a BOOLEAN field named after the expression SELF."
  (declare (ignore input))
  (let ((name (binary-expression-name self)))
    (make-field :name name :type 'boolean)))
242 
243 ;; Equiv Expr
(defclass eq-expression (boolean-binary-expression)
  ()
  (:default-initargs :name "eq" :op 'eq)
  (:documentation "Equality comparison."))

(defclass neq-expression (boolean-binary-expression)
  ()
  (:default-initargs :name "neq" :op 'neq)
  (:documentation "Inequality comparison."))

(defclass gt-expression (boolean-binary-expression)
  ()
  (:default-initargs :name "gt" :op '>)
  (:documentation "Greater-than comparison."))

(defclass lt-expression (boolean-binary-expression)
  ()
  (:default-initargs :name "lt" :op '<)
  (:documentation "Less-than comparison."))

(defclass gteq-expression (boolean-binary-expression)
  ()
  (:default-initargs :name "gteq" :op '>=)
  (:documentation "Greater-than-or-equal comparison."))

(defclass lteq-expression (boolean-binary-expression)
  ()
  (:default-initargs :name "lteq" :op '<=)
  (:documentation "Less-than-or-equal comparison."))

;; Bool Expr
(defclass and-expression (boolean-binary-expression)
  ()
  (:default-initargs :name "and" :op 'and)
  (:documentation "Logical conjunction."))

(defclass or-expression (boolean-binary-expression)
  ()
  (:default-initargs :name "or" :op 'or)
  (:documentation "Logical disjunction."))
284 
285 ;; Math Expr
(defclass math-expression (binary-expression)
  ((name :type string :accessor binary-expression-name :initarg :name)
   (op :type symbol :accessor binary-expression-op :initarg :op))
  (:documentation "A binary arithmetic expression with a NAME and an OP symbol."))
289 
;; The field is named after the operator (BINARY-EXPRESSION-NAME), the same
;; way BOOLEAN-BINARY-EXPRESSION names its field, instead of the fixed "*"
;; that was used for every operator. The type is the type of the left operand.
(defmethod to-field ((self math-expression) (input logical-plan))
  "Return the field produced by the arithmetic expression SELF over INPUT."
  (make-field :name (if (slot-boundp self 'name)
                        (binary-expression-name self)
                        ;; keep the historical name when no NAME was supplied
                        "*")
              :type (field-type (to-field (lhs self) input))))
294 
(defclass add-expression (math-expression)
  ()
  (:default-initargs :name "add" :op '+)
  (:documentation "Addition."))

(defclass sub-expression (math-expression)
  ()
  (:default-initargs :name "sub" :op '-)
  (:documentation "Subtraction."))

(defclass mult-expression (math-expression)
  ()
  (:default-initargs :name "mult" :op '*)
  (:documentation "Multiplication."))

(defclass div-expression (math-expression)
  ()
  (:default-initargs :name "div" :op '/)
  (:documentation "Division."))

(defclass mod-expression (math-expression)
  ()
  (:default-initargs :name "mod" :op 'mod)
  (:documentation "Modulus."))
319 
320 ;;;;; Agg Expr
;; The argument list of a FUNCTION type specifier is a list of *types*;
;; `(input logical-expression)' is not a type specifier, so the previous
;; definition could not be used in declarations.
(deftype aggregate-function ()
  "A function of one LOGICAL-EXPRESSION returning a QUERY-EXPRESSION."
  '(function (logical-expression) query-expression))

;; NOTE(review): compound FUNCTION types are meant for declarations; they are
;; not valid as the type argument of TYPEP.
(deftype aggregate-function-designator ()
  "An AGGREGATE-FUNCTION or a symbol naming one."
  '(or aggregate-function symbol))
324 
(defclass aggregate-expression (logical-expression)
  ;; NAME needs an :INITARG because the subclasses supply :NAME through
  ;; :DEFAULT-INITARGS; without it MAKE-INSTANCE on any of them signals an
  ;; invalid initialization argument error. EXPR gets one as well so the
  ;; aggregated expression can be passed at construction time.
  ((name :type string :initarg :name)
   (expr :type logical-expression :initarg :expr :accessor expr))
  (:documentation "Base class of aggregate expressions: a NAME and the
expression EXPR being aggregated."))
328 
(defgeneric aggregate-expression-p (self)
  (:documentation "Return T when SELF is an aggregate expression, looking
through aliases, and NIL otherwise.")
  ;; A predicate must answer for every object; without this default method any
  ;; non-aggregate argument signalled a no-applicable-method error.
  (:method ((self t)) nil)
  (:method ((self aggregate-expression)) t)
  (:method ((self alias-expression)) (aggregate-expression-p (expr self))))
332 
(defmethod to-field ((self aggregate-expression) (input logical-plan))
  "Return a field named after the aggregate, typed like its inner expression."
  (declare (ignorable input))
  (with-slots (name expr) self
    (make-field :name name
                :type (field-type (to-field expr input)))))
336 
(defclass sum-expression (aggregate-expression)
  ()
  (:default-initargs :name "SUM")
  (:documentation "Aggregate named SUM."))

(defclass min-expression (aggregate-expression)
  ()
  (:default-initargs :name "MIN")
  (:documentation "Aggregate named MIN."))

(defclass max-expression (aggregate-expression)
  ()
  (:default-initargs :name "MAX")
  (:documentation "Aggregate named MAX."))

(defclass avg-expression (aggregate-expression)
  ()
  (:default-initargs :name "AVG")
  (:documentation "Aggregate named AVG."))

(defclass count-expression (aggregate-expression)
  ()
  (:default-initargs :name "COUNT")
  (:documentation "Aggregate named COUNT."))
356 
(defmethod to-field ((self count-expression) (input logical-plan))
  "Return the field of a count: always named \"COUNT\" and typed NUMBER."
  (declare (ignore input))
  (make-field :type 'number :name "COUNT"))
360 
361 ;;; Logical Plan
362 
363 ;;;;; Scan
(defclass scan-data (logical-plan)
  ((path :initarg :path :type string)
   (data-source :initarg :data-source :type data-source)
   (projection :initarg :projection :type (vector string)))
  (:documentation "Logical plan node holding a PATH, a DATA-SOURCE and a
PROJECTION of column names."))

(defmethod derive-schema ((self scan-data))
  "Return the stored schema when the projection is empty, otherwise the stored
schema restricted to the projected column names."
  (with-slots (projection schema) self
    (if (zerop (length projection))
        schema
        (select schema projection))))

(defmethod schema ((self scan-data))
  "The schema of a scan is always derived from its projection."
  (derive-schema self))
377 
378 ;;;;; Projection
(defclass projection (logical-plan)
  ((input :initarg :input :type logical-plan)
   (expr :initarg :expr :type (vector logical-expression)))
  (:documentation "Logical plan node applying the expressions EXPR to INPUT."))

(defmethod schema ((self projection))
  "Return the schema of the input plan."
  (let ((input (slot-value self 'input)))
    (schema input)))
385 
386 ;;;;; Selection
(defclass selection (logical-plan)
  ((input :initarg :input :type logical-plan)
   (expr :initarg :expr :type logical-expression))
  (:documentation "Logical plan node pairing INPUT with a single expression EXPR."))

(defmethod schema ((self selection))
  "Return the schema of the input plan."
  (let ((input (slot-value self 'input)))
    (schema input)))
393 
394 ;;;;; Aggregate
(defclass aggregate (logical-plan)
  ((input :initarg :input :type logical-plan)
   (group-expr :initarg :group-expr :type (vector logical-expression))
   (agg-expr :initarg :agg-expr :type (vector aggregate-expression)))
  (:documentation "Logical plan node holding grouping expressions GROUP-EXPR
and aggregate expressions AGG-EXPR over INPUT."))
399 
(defmethod schema ((self aggregate))
  "Return a schema made of the grouping fields followed by the aggregate
fields, each obtained with TO-FIELD against the input plan."
  (let* ((input (slot-value self 'input))
         (group-fields (map 'list (lambda (g) (to-field g input))
                            (slot-value self 'group-expr)))
         (agg-fields (map 'list (lambda (a) (to-field a input))
                          (slot-value self 'agg-expr))))
    ;; MAKE-SCHEMA takes the fields themselves as &REST arguments; calling it
    ;; with `:fields <vector>' made a two-element schema holding the keyword
    ;; and the vector. The fields are also kept in declaration order here --
    ;; collecting them with PUSH reversed them.
    (apply #'make-schema (append group-fields agg-fields))))
408 
409 ;;;;; Limit
(defclass limit (logical-plan)
  ((input :type logical-plan :initarg :input)
   ;; :INITARG :LIMIT added: the slot had neither an initarg nor an accessor,
   ;; so the row limit could only be set through SLOT-VALUE.
   (limit :type integer :initarg :limit))
  (:documentation "Logical plan node pairing INPUT with an integer LIMIT."))
413 
(defmethod schema ((self limit))
  "Store the schema of the input plan in SELF and return it."
  (let ((inherited (schema (slot-value self 'input))))
    (setf (slot-value self 'schema) inherited)))

(defmethod children ((self limit))
  "Store the children of the input plan in SELF and return them."
  (let ((inherited (children (slot-value self 'input))))
    (setf (slot-value self 'children) inherited)))
421 
422 ;;;;; Joins
(defclass join (logical-plan)
  ((left :accessor lhs)
   (right :accessor rhs)
   (on :accessor join-on))
  (:documentation "Base class of joins between a LEFT and a RIGHT plan; ON is
read with JOIN-ON."))

(defclass inner-join (join)
  ()
  (:documentation "Inner join."))
;; (defclass outer-join (join))
(defclass left-join (join)
  ()
  (:documentation "Left join."))
(defclass right-join (join)
  ()
  (:documentation "Right join."))
;; left-outer-join
;; right-outer-join
;; semi-join
;; anti-join
;; cross-join
437 
(defmethod schema ((self join))
  "Return a fresh schema combining the fields of both sides of the join.
Fields whose name is among the duplicated join keys are dropped from the left
side for a RIGHT-JOIN and from the right side for every other join."
  ;; TODO 2024-08-04: test better dupe impl
  ;; NOTE(review): duplicates are still detected as adjacent EQ elements of
  ;; JOIN-ON, exactly as before; confirm that this matches the shape of ON.
  (let ((dupes (mapcon (lambda (l)
                         (when (eq (car l) (second l))
                           (list (car l))))
                       (coerce (join-on self) 'list)))
        (left (fields (schema (lhs self))))
        (right (fields (schema (rhs self)))))
    (flet ((without-dupes (fields)
             ;; Compare the *name* of each field: STRING-EQUAL cannot be
             ;; applied to a FIELD structure directly.
             (remove-if (lambda (f)
                          (member (field-name f) dupes :test 'string-equal))
                        fields)))
      (make-instance
       'schema
       ;; CONCATENATE builds a fresh vector; MERGE is allowed to destroy its
       ;; arguments, which here include the input schemas' own field vectors.
       :fields (coerce
                (typecase self
                  (right-join
                   (concatenate 'vector (without-dupes left) right))
                  ;; INNER-JOIN and LEFT-JOIN (previously unhandled, leaving
                  ;; the fields NIL) keep every left-hand field.
                  (t
                   (concatenate 'vector left (without-dupes right))))
                'field-vector)))))
454 
(defmethod children ((self join))
  "Return a two-element vector holding the left and the right plan."
  (let ((left (lhs self))
        (right (rhs self)))
    (vector left right)))
457 
458 ;;; Subqueries
459 
460 ;; TODO 2024-08-02:
461 
462 ;; subquery
463 
464 ;; correlated-subquery
465 
466 ;; SELECT id, name, (SELECT count(*) FROM orders WHERE customer_id = customer.id) AS num_orders FROM customers
467 
468 ;; uncorrelated-subquery
469 
470 ;; scalar-subquery
471 
472 ;; SELECT * FROM orders WHERE total > (SELECT avg(total) FROM sales WHERE customer_state = 'CA')
473 
474 ;; NOTE 2024-08-02: EXISTS, IN, NOT EXISTS, and NOT IN are also subqueries
475 
476 ;;; Dataframes
477 ;; minimal data-frame abstraction. methods are prefixed with 'DF-'.
(defstruct (data-frame
            (:constructor make-data-frame (&optional plan)))
  "Wrapper around a logical PLAN; MAKE-DATA-FRAME takes the plan as an
optional positional argument."
  (plan (make-instance 'logical-plan) :type logical-plan))
480 
(defgeneric df-col (self)
  (:documentation "Return a column expression designated by SELF."))
(defgeneric df-project (df exprs)
  (:documentation "Wrap the plan of DF in a PROJECTION over EXPRS and return DF.")
  (:method ((df data-frame) (expr list))
    ;; lists are converted and handed to the vector method
    (let ((as-vector (coerce expr 'vector)))
      (df-project df as-vector)))
  (:method ((df data-frame) (expr vector))
    (let ((plan (make-instance 'projection
                               :input (data-frame-plan df)
                               :expr expr)))
      (setf (data-frame-plan df) plan)
      df)))
491 
(defgeneric df-filter (df expr)
  (:documentation "Wrap the plan of DF in a SELECTION on EXPR and return DF.")
  (:method ((df data-frame) (expr logical-expression))
    (let ((plan (make-instance 'selection
                               :input (data-frame-plan df)
                               :expr expr)))
      (setf (data-frame-plan df) plan)
      df)))
497 
(defgeneric df-aggregate (df group-by agg-expr)
  (:documentation "Wrap the plan of DF in an AGGREGATE grouping by GROUP-BY and
computing AGG-EXPR, then return DF.")
  (:method ((df data-frame) (group-by list) (agg-expr list))
    ;; lists are converted and handed to the vector method
    (let ((groups (coerce group-by 'vector))
          (aggs (coerce agg-expr 'vector)))
      (df-aggregate df groups aggs)))
  (:method ((df data-frame) (group-by vector) (agg-expr vector))
    (let ((plan (make-instance 'aggregate
                               :input (data-frame-plan df)
                               :group-expr group-by
                               :agg-expr agg-expr)))
      (setf (data-frame-plan df) plan)
      df)))
507 
(defgeneric make-df (&rest initargs &key &allow-other-keys)
  (:documentation "Protocol function accepting arbitrary keyword INITARGS.
NOTE(review): no methods are visible next to this declaration."))
509 
(defmethod schema ((df data-frame))
  "Return the schema of the plan held by DF."
  (let ((plan (data-frame-plan df)))
    (schema plan)))

(defmethod (setf schema) ((schema schema) (df data-frame))
  "Store SCHEMA in the SCHEMA slot of the plan held by DF."
  (let ((plan (data-frame-plan df)))
    (setf (slot-value plan 'schema) schema)))
515 
(defgeneric df-plan (df)
  (:documentation "Return the logical plan held by the data-frame DF.")
  (:method ((df data-frame))
    (data-frame-plan df)))
519 
(defmethod (setf df-plan) ((plan logical-plan) (df data-frame))
  "Replace the logical plan held by DF with PLAN and return PLAN."
  ;; Write through the structure accessor. The previous body expanded into a
  ;; call to this very method and therefore recursed without end.
  (setf (data-frame-plan df) plan))
522 
523 ;;; Physical Expression
(defclass physical-expression (query-expression)
  ()
  (:documentation "Base class of expressions used in physical plans."))

(defclass literal-physical-expression (physical-expression)
  ()
  (:documentation "Base class of literal physical expressions."))
527 
(defgeneric evaluate (self input)
  (:documentation "Compute the expression SELF over the batch INPUT and return
the result as a COLUMN-VECTOR.")
  (:method ((self string) (input record-batch))
    ;; a string literal is stored as octets, sized like the batch
    (let* ((rows (row-count input))
           (octets (sb-ext:string-to-octets self)))
      (make-instance 'literal-value-vector :size rows :type 'string :data octets)))
  (:method ((self number) (input record-batch))
    ;; a numeric literal is stored as-is, sized like the batch
    (let ((rows (row-count input)))
      (make-instance 'literal-value-vector :size rows :type 'number :data self))))
537 
(defclass column-physical-expression (physical-expression)
  ((val :initarg :val :type array-index))
  (:documentation "A reference to a column by its index VAL."))

(defmethod evaluate ((self column-physical-expression) (input record-batch))
  "Return the field of INPUT at the index stored in SELF."
  (let ((index (slot-value self 'val)))
    (field input index)))
543 
(defclass binary-physical-expression (physical-expression)
  ((lhs :initarg :lhs :accessor lhs :type physical-expression)
   (rhs :initarg :rhs :accessor rhs :type physical-expression))
  (:documentation "A physical expression with a left (LHS) and a right (RHS)
operand."))

(defgeneric evaluate2 (self lhs rhs)
  (:documentation "Apply the binary expression SELF to the already evaluated
operands LHS and RHS."))
549 
(defmethod evaluate ((self binary-physical-expression) (input record-batch))
  "Evaluate both operands of SELF over INPUT, check that they agree in size
and type, and combine them with EVALUATE2."
  (let ((ll (evaluate (lhs self) input))
        (rr (evaluate (rhs self) input)))
    ;; The operands are columns (COLUMN-TYPE is called on them just below), so
    ;; their size has to be read with COLUMN-SIZE rather than LENGTH.
    (assert (= (column-size ll) (column-size rr)))
    (if (eql (column-type ll) (column-type rr))
        (evaluate2 self ll rr)
        (error "invalid state! lhs != rhs"))))
557 
(defclass eq-physical-expression (binary-physical-expression)
  ()
  (:documentation "Physical equality comparison."))

(defmethod evaluate2 ((self eq-physical-expression) lhs rhs)
  "Return true when LHS and RHS are EQUAL."
  (declare (ignore self))
  (let ((same (equal lhs rhs)))
    same))
563 
(defclass neq-physical-expression (binary-physical-expression)
  ()
  (:documentation "Physical inequality comparison."))

(defmethod evaluate2 ((self neq-physical-expression) lhs rhs)
  "Return true when LHS and RHS are not EQUAL."
  (declare (ignore self))
  ;; negated: the previous body returned the same result as the equality test
  (not (equal lhs rhs)))
569 
(defclass lt-physical-expression (binary-physical-expression)
  ()
  (:documentation "Physical less-than comparison."))

(defclass gt-physical-expression (binary-physical-expression)
  ()
  (:documentation "Physical greater-than comparison."))

(defclass lteq-physical-expression (binary-physical-expression)
  ()
  (:documentation "Physical less-than-or-equal comparison."))

(defclass gteq-physical-expression (binary-physical-expression)
  ()
  (:documentation "Physical greater-than-or-equal comparison."))

(defclass and-physical-expression (binary-physical-expression)
  ()
  (:documentation "Physical logical conjunction."))

(defclass or-physical-expression (binary-physical-expression)
  ()
  (:documentation "Physical logical disjunction."))

(defclass math-physical-expression (binary-physical-expression)
  ()
  (:documentation "Base class of physical arithmetic expressions."))
583 
(defmethod evaluate2 ((self math-physical-expression) (lhs column-vector) (rhs column-vector))
  "Apply SELF row by row to the columns LHS and RHS and return the results as
a FIELD-VECTOR. The number of rows is taken from LHS."
  (let ((results '()))
    (dotimes (i (column-size lhs))
      (push (evaluate2 self (column-value lhs i) (column-value rhs i))
            results))
    (coerce (nreverse results) 'field-vector)))
588 
;; Inherits from MATH-PHYSICAL-EXPRESSION: with the logical MATH-EXPRESSION as
;; superclass the object was never a physical expression, so the EVALUATE and
;; column-wise EVALUATE2 methods did not apply to it.
;; (The class name keeps its historical spelling; the planner refers to it.)
(defclass add-physical-expresion (math-physical-expression)
  ()
  (:documentation "Physical addition."))

;; Specialized on NUMBER so that column operands still reach the column-wise
;; method of MATH-PHYSICAL-EXPRESSION, which calls back here per row.
(defmethod evaluate2 ((self add-physical-expresion) (lhs number) (rhs number))
  "Return the sum of the numbers LHS and RHS."
  (+ lhs rhs))
594 
;; Inherits from MATH-PHYSICAL-EXPRESSION: with the logical MATH-EXPRESSION as
;; superclass the object was never a physical expression, so the EVALUATE and
;; column-wise EVALUATE2 methods did not apply to it.
(defclass sub-physical-expression (math-physical-expression)
  ()
  (:documentation "Physical subtraction."))

;; Specialized on NUMBER so that column operands still reach the column-wise
;; method of MATH-PHYSICAL-EXPRESSION, which calls back here per row.
(defmethod evaluate2 ((self sub-physical-expression) (lhs number) (rhs number))
  "Return the difference of the numbers LHS and RHS."
  (- lhs rhs))
600 
;; Inherits from MATH-PHYSICAL-EXPRESSION: with the logical MATH-EXPRESSION as
;; superclass the object was never a physical expression, so the EVALUATE and
;; column-wise EVALUATE2 methods did not apply to it.
(defclass mult-physical-expression (math-physical-expression)
  ()
  (:documentation "Physical multiplication."))

;; Specialized on NUMBER so that column operands still reach the column-wise
;; method of MATH-PHYSICAL-EXPRESSION, which calls back here per row.
(defmethod evaluate2 ((self mult-physical-expression) (lhs number) (rhs number))
  "Return the product of the numbers LHS and RHS."
  (* lhs rhs))
606 
;; Inherits from MATH-PHYSICAL-EXPRESSION: with the logical MATH-EXPRESSION as
;; superclass the object was never a physical expression, so the EVALUATE and
;; column-wise EVALUATE2 methods did not apply to it.
(defclass div-physical-expression (math-physical-expression)
  ()
  (:documentation "Physical division."))

;; Specialized on NUMBER so that column operands still reach the column-wise
;; method of MATH-PHYSICAL-EXPRESSION, which calls back here per row.
(defmethod evaluate2 ((self div-physical-expression) (lhs number) (rhs number))
  "Return the quotient of the numbers LHS and RHS."
  (/ lhs rhs))
612 
(defclass accumulator ()
  ((value :initarg :value :accessor accumulator-value))
  (:documentation "Holds the running VALUE of an aggregation."))

(defgeneric accumulate (self val)
  (:documentation "Fold VAL into SELF.
For an ACCUMULATOR a non-NIL VAL is added to the running value and the new
value is returned; a NIL VAL is ignored. For a list, a new list with VAL in
front is returned.")
  (:method ((self accumulator) val)
    (when val
      (setf (accumulator-value self)
            ;; An accumulator created without :VALUE has an unbound slot; the
            ;; first value seeds it instead of signalling an unbound-slot error.
            (if (slot-boundp self 'value)
                (+ val (accumulator-value self))
                val))))
  (:method ((self list) val)
    ;; Non-destructive: the list passed by the caller is left untouched, so
    ;; the caller has to use the returned list.
    (cons val self)))
622 
(defgeneric make-accumulator (self)
  (:documentation "Return a fresh accumulator suited to the aggregate SELF."))

;; max-accumulator
(defclass max-accumulator (accumulator)
  ()
  (:documentation "Accumulator keeping the largest number it has been given."))

(defmethod accumulate ((self max-accumulator) (val number))
  "Store VAL in SELF when it is the first value or larger than the current one."
  ;; A freshly made accumulator has no value yet (the slot is unbound), so the
  ;; first VAL is stored unconditionally instead of being compared.
  (when (or (not (slot-boundp self 'value))
            (> val (accumulator-value self)))
    (setf (accumulator-value self) val)))
631 
(defclass aggregate-physical-expression (physical-expression)
  ;; :INITARG :INPUT added: the slot is read with SLOT-VALUE when aggregates
  ;; are executed but could not be supplied to MAKE-INSTANCE.
  ((input :type physical-expression :initarg :input))
  (:documentation "Base class of physical aggregates over the expression INPUT."))

(defclass max-physical-expression (aggregate-physical-expression)
  ()
  (:documentation "Physical MAX aggregate."))

(defmethod make-accumulator ((self max-physical-expression))
  "Return a fresh MAX-ACCUMULATOR."
  (make-instance 'max-accumulator))
639 
640 ;;; Physical Plan
(defgeneric execute (self)
  (:documentation "Run the plan represented by the object SELF.")
  (:method ((self data-frame))
    ;; a data-frame is executed through its plan
    (let ((plan (df-plan self)))
      (execute plan))))
645 
(defclass scan-exec (physical-plan)
  ((data-source :initarg :data-source :type data-source)
   (projection :initarg :projection :type (vector string)))
  (:documentation "Physical plan node scanning DATA-SOURCE for the columns in
PROJECTION."))

(defmethod schema ((self scan-exec))
  "Return the data source's schema restricted to the projected columns."
  (with-slots (data-source projection) self
    (select (schema data-source) projection)))

(defmethod execute ((self scan-exec))
  "Scan the data source for the projected columns."
  (with-slots (data-source projection) self
    (scan-data data-source projection)))
655 
(defclass projection-exec (physical-plan)
  ((input :initarg :input :type physical-plan)
   (expr :initarg :expr :type (vector physical-expression)))
  (:documentation "Physical plan node evaluating the expressions EXPR over the
result of INPUT."))

(defmethod execute ((self projection-exec))
  "Return a vector of record batches, one per element of the FIELDS of the
executed input, each holding the results of evaluating every expression."
  (flet ((project-batch (batch)
           (make-record-batch
            :schema (slot-value self 'schema)
            :fields (coerce (map 'list
                                 (lambda (e) (evaluate e batch))
                                 (slot-value self 'expr))
                            'field-vector))))
    (coerce (map 'list #'project-batch
                 (fields (execute (slot-value self 'input))))
            '(vector record-batch))))
669 
670 
(defclass selection-exec (physical-plan)
  ((input :initarg :input :type physical-plan)
   (expr :initarg :expr :type physical-expression))
  (:documentation "Physical plan node pairing INPUT with the expression EXPR."))

(defmethod schema ((self selection-exec))
  "Return the schema of the input plan."
  (let ((input (slot-value self 'input)))
    (schema input)))
677 
(defmethod execute ((self selection-exec))
  "Return a vector of record batches: each batch of the executed input with
every column reduced to the rows selected by the filter expression."
  (coerce
   ;; The per-batch values are stepped with FOR ... =. Bound with WITH they
   ;; were computed a single time, before the first batch was available.
   (loop for batch across (execute (slot-value self 'input))
         for res = (coerce (evaluate (slot-value self 'expr) batch) 'bit-vector)
         ;; COLUMN-COUNT is defined on the batch itself, not on a field vector
         for filtered = (loop for i from 0 below (column-count batch)
                              collect (filter self (field batch i) res))
         collect (make-record-batch :schema (schema batch)
                                    :fields (coerce filtered 'field-vector)))
   '(vector record-batch)))
688 
(defgeneric filter (self columns selection)
  (:documentation "Return the values of COLUMNS at the positions where the bit
vector SELECTION is set, as a FIELD-VECTOR.")
  (:method ((self selection-exec) (columns column-vector) (selection simple-bit-vector))
    (let ((kept '()))
      (dotimes (i (length selection))
        (when (= 1 (bit selection i))
          (push (column-value columns i) kept)))
      (coerce (nreverse kept) 'field-vector))))
696 
(defclass hash-aggregate-exec (physical-plan)
  ((input :type physical-plan :initarg :input)
   ;; The grouping keys are expressions handed to EVALUATE, not plans, so the
   ;; element type is PHYSICAL-EXPRESSION.
   (group-expr :type (vector physical-expression) :initarg :group-expr)
   (agg-expr :type (vector aggregate-physical-expression) :initarg :agg-expr))
  (:documentation "Physical plan node grouping the rows of INPUT by GROUP-EXPR
and aggregating them with AGG-EXPR."))
701 
(defmethod execute ((self hash-aggregate-exec))
  "Group every row of the executed input by the grouping expressions, feed the
aggregate inputs of each row to that group's accumulators, and return a vector
containing one record batch with one row per group: the group key values
first, followed by the accumulated values."
  (let* ((group-exprs (slot-value self 'group-expr))
         (agg-exprs (slot-value self 'agg-expr))
         (glen (length group-exprs))
         (alen (length agg-exprs))
         ;; Keys are lists so that the EQUAL test compares them element-wise;
         ;; EQUAL compares general vectors by identity only.
         (groups (make-hash-table :test 'equal)))
    ;; Phase 1: accumulate. Per-batch and per-row values are recomputed on
    ;; every iteration (LOOP's WITH would bind them only once).
    (loop for batch across (execute (slot-value self 'input))
          for groupkeys = (map 'vector (lambda (x) (evaluate x batch)) group-exprs)
          for aggr-inputs = (map 'vector
                                 (lambda (x) (evaluate (slot-value x 'input) batch))
                                 agg-exprs)
          do (dotimes (row-idx (row-count batch))
               (let* ((row-key (map 'list
                                    (lambda (col)
                                      (let ((val (column-value col row-idx)))
                                        (typecase val
                                          ;; octets come from string literals
                                          (octet-vector (sb-ext:octets-to-string val))
                                          (t val))))
                                    groupkeys))
                      (accs (or (gethash row-key groups)
                                (setf (gethash row-key groups)
                                      (map 'vector #'make-accumulator agg-exprs)))))
                 ;; every accumulator of the group sees its own input column
                 (loop for accum across accs
                       for agg-input across aggr-inputs
                       do (accumulate accum (column-value agg-input row-idx))))))
    ;; Phase 2: one output row per group. HASH-TABLE-COUNT is the number of
    ;; groups; HASH-TABLE-SIZE is only the table's capacity.
    (let* ((nrows (hash-table-count groups))
           (columns (make-array (+ glen alen))))
      (dotimes (c (length columns))
        (setf (aref columns c) (make-array nrows :initial-element nil)))
      (loop for row-idx from 0
            for gkey being the hash-keys of groups using (hash-value accums)
            do (loop for i from 0
                     for key-part in gkey
                     do (setf (aref (aref columns i) row-idx) key-part))
               (loop for i from 0 below alen
                     for acc = (aref accums i)
                     do (setf (aref (aref columns (+ i glen)) row-idx)
                              ;; an accumulator which never received a value
                              ;; has no VALUE yet; report NIL for it
                              (when (slot-boundp acc 'value)
                                (accumulator-value acc)))))
      (coerce (list (make-record-batch :schema (slot-value self 'schema)
                                       :fields columns))
              '(vector record-batch)))))
746 
747 ;;; Planner
748 
749 ;; The Query Planner is effectively a compiler which translates logical
750 ;; expressions and plans into their physical counterparts.
751 
(defclass query-planner ()
  ()
  (:documentation "Marker class for query planners."))
753 
(defgeneric make-physical-expression (expr input)
  (:documentation "Translate logical expression EXPR and logical plan INPUT
 into a physical expression.")
  (:method ((expr string) (input logical-plan))
    (declare (ignore input))
    ;; string literals are their own physical form
    expr)
  (:method ((expr number) (input logical-plan))
    (declare (ignore input))
    ;; numeric literals are their own physical form
    expr)
  (:method ((expr column-expression) (input logical-plan))
    (let ((i (position (column-name expr) (fields (schema input))
                       :key 'field-name :test 'equal)))
      ;; An unknown column used to yield a physical expression whose index was
      ;; NIL; report it with the same condition TO-FIELD signals.
      (unless i
        (error 'invalid-argument :item (column-name expr) :reason "Invalid column name"))
      (make-instance 'column-physical-expression :val i)))
  (:method ((expr binary-expression) (input logical-plan))
    (let ((l (make-physical-expression (lhs expr) input))
          (r (make-physical-expression (rhs expr) input)))
      ;; ETYPECASE signals an error for binary expressions without a physical
      ;; counterpart in this table (MOD-EXPRESSION, for instance).
      (etypecase expr
        (eq-expression (make-instance 'eq-physical-expression :lhs l :rhs r))
        (neq-expression (make-instance 'neq-physical-expression :lhs l :rhs r))
        (gt-expression (make-instance 'gt-physical-expression :lhs l :rhs r))
        (gteq-expression (make-instance 'gteq-physical-expression :lhs l :rhs r))
        (lt-expression (make-instance 'lt-physical-expression :lhs l :rhs r))
        (lteq-expression (make-instance 'lteq-physical-expression :lhs l :rhs r))
        (and-expression (make-instance 'and-physical-expression :lhs l :rhs r))
        (or-expression (make-instance 'or-physical-expression :lhs l :rhs r))
        (add-expression (make-instance 'add-physical-expresion :lhs l :rhs r))
        (sub-expression (make-instance 'sub-physical-expression :lhs l :rhs r))
        (mult-expression (make-instance 'mult-physical-expression :lhs l :rhs r))
        (div-expression (make-instance 'div-physical-expression :lhs l :rhs r))))))
782 
(defgeneric make-physical-plan (plan)
  (:documentation "Create a physical plan from logical PLAN.")
  (:method ((plan logical-plan))
    (etypecase plan
      (scan-data
       (make-instance 'scan-exec
                      :data-source (slot-value plan 'data-source)
                      :projection (slot-value plan 'projection)))
      (projection
       (let ((input (slot-value plan 'input))
             (exprs (slot-value plan 'expr)))
         (make-instance 'projection-exec
                        :schema (make-instance 'schema
                                               :fields (map 'field-vector
                                                            (lambda (x) (to-field x input))
                                                            exprs))
                        :input (make-physical-plan input)
                        :expr (map 'vector
                                   (lambda (x) (make-physical-expression x input))
                                   exprs))))
      (selection
       (let ((input (slot-value plan 'input)))
         (make-instance 'selection-exec
                        :input (make-physical-plan input)
                        :expr (make-physical-expression (slot-value plan 'expr) input))))
      (aggregate
       (let ((input (slot-value plan 'input)))
         ;; GROUP-EXPR and AGG-EXPR are vectors of expressions, so each element
         ;; is translated on its own, as in the projection branch; passing the
         ;; whole vector to MAKE-PHYSICAL-EXPRESSION matched no method.
         ;; NOTE(review): no MAKE-PHYSICAL-EXPRESSION method for aggregate
         ;; expressions is visible here -- confirm one is defined elsewhere.
         (flet ((translate (exprs)
                  (map 'vector
                       (lambda (x) (make-physical-expression x input))
                       exprs)))
           (make-instance 'hash-aggregate-exec
                          ;; the executor reads its SCHEMA slot for the output
                          :schema (schema plan)
                          :input (make-physical-plan input)
                          :group-expr (translate (slot-value plan 'group-expr))
                          :agg-expr (translate (slot-value plan 'agg-expr)))))))))
806 
807 ;;; Optimizer
808 
809 ;; The Query Optimizer is responsible for walking a QUERY-PLAN and returning a
810 ;; modified version of the same object. Usually we want to run optimization on
811 ;; LOGICAL-PLANs but we also support specializing on PHYSICAL-PLAN.
812 
813 ;; Rule-based Optimizers: projection/predicate push-down, sub-expr elim
814 
815 ;; TBD: Cost-based optimizers
816 ;; TODO 2024-07-10:
(defclass query-optimizer ()
  ()
  (:documentation "Base class of query optimizers."))

(defstruct (query-vop
            (:constructor make-query-vop (info)))
  "Structure with one slot, INFO, given positionally to MAKE-QUERY-VOP."
  (info nil))

(defgeneric optimize-query (self plan)
  (:documentation "Optimize PLAN with the optimizer SELF."))
823 
824 ;; Projection Pushdown
(defun extract-columns (expr input &optional accum)
  "Return ACCUM, a list of column names, extended with the names of the
columns referenced by the expression EXPR. INPUT is the logical plan EXPR
belongs to; it is only consulted to resolve integer column indices.
The result is returned, not stored: callers must use the returned list."
  (etypecase expr
    ;; an integer designates a column of INPUT by position
    (array-index
     (adjoin (field-name (aref (fields (schema input)) expr)) accum :test 'equal))
    (column-expression
     (adjoin (column-name expr) accum :test 'equal))
    (binary-expression
     ;; thread the names found on the left into the walk of the right
     (extract-columns (rhs expr) input
                      (extract-columns (lhs expr) input accum)))
    (alias-expression (extract-columns (expr expr) input accum))
    (cast-expression (extract-columns (expr expr) input accum))
    (aggregate-expression (extract-columns (expr expr) input accum))
    ;; literals reference no column
    (literal-expression accum)
    ((or string number) accum)))

(defun extract-columns* (exprs input &optional accum)
  "Fold EXTRACT-COLUMNS over the sequence EXPRS (a list or a vector) and
return the accumulated list of column names."
  (reduce (lambda (names expr) (extract-columns expr input names))
          exprs
          :initial-value accum))

(defclass projection-pushdown-optimizer (query-optimizer) ())

(defun %pushdown (plan &optional column-names)
  "Rebuild PLAN so that its scan only projects the columns the plan refers to.
COLUMN-NAMES is the list of names collected from the nodes above PLAN."
  (declare (logical-plan plan))
  (etypecase plan
    (projection
     (let* ((input (slot-value plan 'input))
            (names (extract-columns* (slot-value plan 'expr) input column-names)))
       (make-instance 'projection
                      :input (%pushdown input names)
                      :expr (slot-value plan 'expr))))
    (selection
     (let* ((input (slot-value plan 'input))
            (names (extract-columns (slot-value plan 'expr) input column-names)))
       (make-instance 'selection
                      :input (%pushdown input names)
                      :expr (slot-value plan 'expr))))
    (aggregate
     (let* ((input (slot-value plan 'input))
            (names (extract-columns* (slot-value plan 'group-expr) input column-names))
            ;; aggregate expressions keep their operand in the EXPR slot,
            ;; which EXTRACT-COLUMNS follows
            (names (extract-columns* (slot-value plan 'agg-expr) input names)))
       (make-instance 'aggregate
                      :input (%pushdown input names)
                      :group-expr (slot-value plan 'group-expr)
                      :agg-expr (slot-value plan 'agg-expr))))
    (scan-data
     ;; names were collected front-first, so reverse them into request order
     (let ((scan (make-instance 'scan-data
                                :projection (coerce (reverse column-names) 'vector)))) ;; maybe sort here?
       ;; Carry over whatever the original scan had set. The path lives in the
       ;; PATH slot (there is no NAME slot), and the schema is needed by
       ;; DERIVE-SCHEMA on the new node.
       (dolist (slot '(path data-source schema) scan)
         (when (slot-boundp plan slot)
           (setf (slot-value scan slot) (slot-value plan slot))))))))
866 
(defmethod optimize-query ((self projection-pushdown-optimizer) (plan logical-plan))
  "Return PLAN rewritten by %PUSHDOWN, starting with no collected columns."
  (let ((rewritten (%pushdown plan)))
    rewritten))
869 
870 ;;; Query
(defclass query ()
  ()
  (:documentation "Base class of queries."))

(defgeneric make-query (self &rest initargs &key &allow-other-keys)
  (:documentation "Return a query built from SELF; the default method ignores
INITARGS and returns a plain QUERY instance.")
  (:method ((self t) &rest initargs)
    (declare (ignore initargs))
    (make-instance (find-class 'query))))
877 
878 ;;; Execution Context
(defclass execution-context ()
  ()
  (:documentation "Base class of execution contexts."))

(defgeneric register-df (self name df)
  (:documentation "Register the DATA-FRAME DF under NAME with the
EXECUTION-CONTEXT SELF."))

(defgeneric register-data-source (self name source)
  (:documentation "Register the DATA-SOURCE SOURCE under NAME with the
EXECUTION-CONTEXT SELF."))

(defgeneric register-file (self name path &key type &allow-other-keys)
  (:documentation "Register under NAME a DATA-SOURCE contained in the file of
type TYPE found at PATH."))
889 
(defgeneric execute* (self df)
  (:documentation "Execute the DATA-FRAME DF within the context SELF.")
  (:method ((self execution-context) (df data-frame))
    (declare (ignore self))
    ;; the base context adds nothing: run the data-frame directly
    (let ((result (execute df)))
      result)))
895 
(defmethod execute ((self logical-plan))
  "Optimize SELF with a projection-pushdown optimizer, translate the result
into a physical plan and execute that plan."
  (let* ((optimizer (make-instance 'projection-pushdown-optimizer))
         (optimized (optimize-query optimizer self))
         (physical (make-physical-plan optimized)))
    (execute physical)))