diff -r 6b652d7d6663 -r 81b7333f27f8 examples/db/mbdb.lisp --- a/examples/db/mbdb.lisp Sun Apr 14 20:48:05 2024 -0400 +++ b/examples/db/mbdb.lisp Sun Jun 16 22:15:04 2024 -0400 @@ -1,15 +1,29 @@ ;;; examples/mbdb.lisp --- MusicBrainz Database import and analysis -;; This example show how to migrate a set of complex JSON objects to -;; RocksDB using a dump from the MusicBrainz database +;; This example show how to migrate a set of complex JSON objects and +;; SQL dumps to RocksDB using data from the MusicBrainz database ;; (https://musicbrainz.org/). The files are hosted at -;; https://packy.compiler.company/data/mbdump +;; https://packy.compiler.company/data + +;;; Commentary: + +;; The original data is located here: +;; https://data.metabrainz.org/pub/musicbrainz/data/ -;; we parse some of the database schema from the sql files here: +;; The actual json dumps are quite large (releas.json is 208Gb!), so +;; we provide our own trimmed down sampling. Each file is sampled +;; randomly and individually, so actual linkage data is totally +;; clobbered. If you want to work do some OLAP stuff you will need the +;; full data set which is packaged as mbdump-full.tar.zst. + +;; the data prep script is located at ../mbdump-prep.lisp + +;; we parsed some of the database schema from the sql files here: ;; https://github.com/metabrainz/musicbrainz-server/tree/master/admin/sql ;;; Code: -(defpackage :examples/mbdb +(in-package :std-user) +(defpkg :examples/mbdb (:use :cl :std :dat/json :net/fetch :obj/id :rdb :cli/clap :obj/uuid :sb-concurrency :log :dat/csv :dat/proto :sb-thread) (:import-from :obj/uuid :make-uuid-from-string) @@ -51,7 +65,7 @@ "The oracle assigned to the mbdb system, which should usually be the current thread.") (declaim (task-pool *mbdb-tasks*)) -(defvar *mbdb-tasks* (make-task-pool :oracle *mbdb-oracle*) +(defvar *mbdb-tasks* (make-task-pool :oracle-id (oracle-id *mbdb-oracle*)) "The mbdb task pool. This object holds a queue of jobs which are dispatched to workers. Results are collected and processed by the oracle.") @@ -66,11 +80,12 @@ (defvar *mbdump-pack-url* "https://packy.compiler.company/data/mbdump.tar.zst" "Remote locaton of MusicBrainz JSON dump pack.") +(defvar *mbdb-worker-dir* (merge-pathnames ".import/" *mbdb-path*)) + (defvar *mbdump-pack* (merge-pathnames "mbdump.tar.zst" *mbdb-worker-dir*)) + (defvar *mbsamp-pack* (merge-pathnames "mbsamp.tar.zst" *mbdb-worker-dir*)) -(defvar *mbdb-worker-dir* (merge-pathnames ".import/" *mbdb-path*)) - (defvar *mbdump-files* nil) ;; set by MBDB-UNPACK (defvar *mbsamp-files* nil) ;; set by MBDB-UNPACK @@ -112,6 +127,8 @@ #+nil (extract-mbsamp (car (mbsamp-fetch))) ;;; Parsing + +;;;; MBSamp (define-constant +mbsamp-null+ "\\N" :test #'string=) (defun nullable (str) @@ -208,14 +225,6 @@ (when file (dat/csv:read-csv-file file :header nil :delimiter #\Tab :map-fns map-fns)))) -(defun extract-mbdump-file (file) - "Extract the contents of a json-dump FILE. Return a json-object." - (with-open-file (f file) - ;; (sb-impl::with-array-data - (loop for x = (json-read f nil) - while x - collect x))) - (defmacro with-mbsamp-proc (table shape &body vals) (with-gensyms (row i) `(coerce @@ -258,6 +267,15 @@ (def-mbsamp-proc release 0 1 2 13) (def-mbsamp-proc instrument 0 1 2 5 7) +;;;; MBDump +(defun extract-mbdump-file (file) + "Extract the contents of a json-dump FILE. Return a json-object." + (with-open-file (f file) + ;; (sb-impl::with-array-data + (loop for x = (json-read f nil) + while x + collect x))) + (defun extract-mbdump-columns (obj) "Extract fields from a json-object, returning a vector of uninitialized column-families which can be created with #'create-cfs. @@ -274,6 +292,8 @@ (defclass mbdb-task (task) ()) +(defclass mbdb-stage (stage) ()) + ;;; Main (defmain () (let ((*default-pathname-defaults* *mbdb-path*) @@ -281,17 +301,27 @@ (*csv-separator* #\Tab) (*cpus* (num-cpus)) (*log-timestamp* nil) - (*log-level* :warn)) + (*log-level* :info)) (log:info! "Welcome to MBDB") (ensure-directories-exist *mbdb-worker-dir* :verbose t) ;; prepare workers - (setf *mbdb-oracle* (make-oracle sb-thread:*current-thread*) - *mbdb-tasks* (make-task-pool :oracle *mbdb-oracle*)) - (push-worker (sb-thread:make-thread #'mbsamp-fetch) *mbdb-tasks*) + (setq *mbdb-oracle* (make-oracle sb-thread:*current-thread*)) + (setq *mbdb-tasks* (make-task-pool :oracle-id (oracle-id *mbdb-oracle*))) + ;; (make-workers + ;; (push-worker (make-thread #'?) *mbdb-tasks*) + ;; (with-tasks ()) - (let ((job (make-job))) - (push-task (make-instance 'mbdb-task :object #'mbsamp-fetch) job)) + ;; fetch + (let ((job (make-job (make-array 2 :fill-pointer 0 :initial-element (make-task) :element-type 'task)))) + (push-task (make-task #'mbsamp-fetch) job) + (push-task (make-task #'mbdump-fetch) job) + (push-job job *mbdb-tasks*)) + ;; unpack + (let ((job (make-job (make-array 2 :fill-pointer 0 :initial-element (make-task) :element-type 'task)))) + (push-task (make-task #'mbsamp-unpack) job) + (push-task (make-task #'mbdump-unpack) job) + (push-job job *mbdb-tasks*)) ;; (sb-thread:make-thread #'mbsamp-fetch) ;; prepare column family data @@ -300,16 +330,16 @@ (with-db (db *mbdb*) (open-db db) (setf (rdb-cfs db) *mbsamp-cfs*) - ;; (create-cfs db) - (log:info! "database initialized") - ;; - (close-db db)) - + (backfill-opts db) + (log:info! "database initialized")) ;; launch tasks ;; wait - (wait-for-threads (task-pool-workers *mbdb-tasks*)) - ;; summarize - (info! "mbdb stats" (print-stats *mbdb*)) - ;; close - )) + (unwind-protect + (progn + (wait-for-threads (task-pool-workers *mbdb-tasks*)) + ;; summarize + (when-let ((stats (print-stats *mbdb*))) (info! "mbdb stats" stats))) + ;; close + (close-db *mbdb*)))) +