1.1--- a/examples/db/mbdb.lisp Sun Apr 14 20:48:05 2024 -0400
1.2+++ b/examples/db/mbdb.lisp Sun Jun 16 22:15:04 2024 -0400
1.3@@ -1,15 +1,29 @@
1.4 ;;; examples/mbdb.lisp --- MusicBrainz Database import and analysis
1.5
1.6-;; This example show how to migrate a set of complex JSON objects to
1.7-;; RocksDB using a dump from the MusicBrainz database
1.8+;; This example shows how to migrate a set of complex JSON objects and
1.9+;; SQL dumps to RocksDB using data from the MusicBrainz database
1.10 ;; (https://musicbrainz.org/). The files are hosted at
1.11-;; https://packy.compiler.company/data/mbdump
1.12+;; https://packy.compiler.company/data
1.13+
1.14+;;; Commentary:
1.15+
1.16+;; The original data is located here:
1.17+;; https://data.metabrainz.org/pub/musicbrainz/data/
1.18
1.19-;; we parse some of the database schema from the sql files here:
1.20+;; The actual json dumps are quite large (release.json is 208Gb!), so
1.21+;; we provide our own trimmed down sampling. Each file is sampled
1.22+;; randomly and individually, so actual linkage data is totally
1.23+;; clobbered. If you want to do some OLAP work you will need the
1.24+;; full data set which is packaged as mbdump-full.tar.zst.
1.25+
1.26+;; the data prep script is located at ../mbdump-prep.lisp
1.27+
1.28+;; we parsed some of the database schema from the sql files here:
1.29 ;; https://github.com/metabrainz/musicbrainz-server/tree/master/admin/sql
1.30
1.31 ;;; Code:
1.32-(defpackage :examples/mbdb
1.33+(in-package :std-user)
1.34+(defpkg :examples/mbdb
1.35 (:use :cl :std :dat/json :net/fetch :obj/id :rdb :cli/clap :obj/uuid
1.36 :sb-concurrency :log :dat/csv :dat/proto :sb-thread)
1.37 (:import-from :obj/uuid :make-uuid-from-string)
1.38@@ -51,7 +65,7 @@
1.39 "The oracle assigned to the mbdb system, which should usually be the current thread.")
1.40
1.41 (declaim (task-pool *mbdb-tasks*))
1.42-(defvar *mbdb-tasks* (make-task-pool :oracle *mbdb-oracle*)
1.43+(defvar *mbdb-tasks* (make-task-pool :oracle-id (oracle-id *mbdb-oracle*))
1.44 "The mbdb task pool. This object holds a queue of jobs which are
1.45 dispatched to workers. Results are collected and processed by the
1.46 oracle.")
1.47@@ -66,11 +80,12 @@
1.48 (defvar *mbdump-pack-url* "https://packy.compiler.company/data/mbdump.tar.zst"
1.49 "Remote locaton of MusicBrainz JSON dump pack.")
1.50
1.51+(defvar *mbdb-worker-dir* (merge-pathnames ".import/" *mbdb-path*))
1.52+
1.53 (defvar *mbdump-pack* (merge-pathnames "mbdump.tar.zst" *mbdb-worker-dir*))
1.54+
1.55 (defvar *mbsamp-pack* (merge-pathnames "mbsamp.tar.zst" *mbdb-worker-dir*))
1.56
1.57-(defvar *mbdb-worker-dir* (merge-pathnames ".import/" *mbdb-path*))
1.58-
1.59 (defvar *mbdump-files* nil) ;; set by MBDB-UNPACK
1.60
1.61 (defvar *mbsamp-files* nil) ;; set by MBDB-UNPACK
1.62@@ -112,6 +127,8 @@
1.63 #+nil (extract-mbsamp (car (mbsamp-fetch)))
1.64
1.65 ;;; Parsing
1.66+
1.67+;;;; MBSamp
1.68 (define-constant +mbsamp-null+ "\\N" :test #'string=)
1.69
1.70 (defun nullable (str)
1.71@@ -208,14 +225,6 @@
1.72 (when file
1.73 (dat/csv:read-csv-file file :header nil :delimiter #\Tab :map-fns map-fns))))
1.74
1.75-(defun extract-mbdump-file (file)
1.76- "Extract the contents of a json-dump FILE. Return a json-object."
1.77- (with-open-file (f file)
1.78- ;; (sb-impl::with-array-data
1.79- (loop for x = (json-read f nil)
1.80- while x
1.81- collect x)))
1.82-
1.83 (defmacro with-mbsamp-proc (table shape &body vals)
1.84 (with-gensyms (row i)
1.85 `(coerce
1.86@@ -258,6 +267,15 @@
1.87 (def-mbsamp-proc release 0 1 2 13)
1.88 (def-mbsamp-proc instrument 0 1 2 5 7)
1.89
1.90+;;;; MBDump
1.91+(defun extract-mbdump-file (file)
1.92+ "Read every json object from the json-dump FILE. Return them as a list."
1.93+ (with-open-file (f file)
1.94+ ;; (sb-impl::with-array-data
1.95+ (loop for x = (json-read f nil)
1.96+ while x
1.97+ collect x)))
1.98+
1.99 (defun extract-mbdump-columns (obj)
1.100 "Extract fields from a json-object, returning a vector of
1.101 uninitialized column-families which can be created with #'create-cfs.
1.102@@ -274,6 +292,8 @@
1.103
1.104 (defclass mbdb-task (task) ())
1.105
1.106+(defclass mbdb-stage (stage) ())
1.107+
1.108 ;;; Main
1.109 (defmain ()
1.110 (let ((*default-pathname-defaults* *mbdb-path*)
1.111@@ -281,17 +301,27 @@
1.112 (*csv-separator* #\Tab)
1.113 (*cpus* (num-cpus))
1.114 (*log-timestamp* nil)
1.115- (*log-level* :warn))
1.116+ (*log-level* :info))
1.117 (log:info! "Welcome to MBDB")
1.118 (ensure-directories-exist *mbdb-worker-dir* :verbose t)
1.119 ;; prepare workers
1.120- (setf *mbdb-oracle* (make-oracle sb-thread:*current-thread*)
1.121- *mbdb-tasks* (make-task-pool :oracle *mbdb-oracle*))
1.122- (push-worker (sb-thread:make-thread #'mbsamp-fetch) *mbdb-tasks*)
1.123+ (setq *mbdb-oracle* (make-oracle sb-thread:*current-thread*))
1.124+ (setq *mbdb-tasks* (make-task-pool :oracle-id (oracle-id *mbdb-oracle*)))
1.125+ ;; (make-workers
1.126+ ;; (push-worker (make-thread #'?) *mbdb-tasks*)
1.127+
1.128 ;; (with-tasks ())
1.129- (let ((job (make-job)))
1.130- (push-task (make-instance 'mbdb-task :object #'mbsamp-fetch) job))
1.131
1.132+ ;; fetch
1.133+ (let ((job (make-job (make-array 2 :fill-pointer 0 :initial-element (make-task) :element-type 'task))))
1.134+ (push-task (make-task #'mbsamp-fetch) job)
1.135+ (push-task (make-task #'mbdump-fetch) job)
1.136+ (push-job job *mbdb-tasks*))
1.137+ ;; unpack
1.138+ (let ((job (make-job (make-array 2 :fill-pointer 0 :initial-element (make-task) :element-type 'task))))
1.139+ (push-task (make-task #'mbsamp-unpack) job)
1.140+ (push-task (make-task #'mbdump-unpack) job)
1.141+ (push-job job *mbdb-tasks*))
1.142 ;; (sb-thread:make-thread #'mbsamp-fetch)
1.143
1.144 ;; prepare column family data
1.145@@ -300,16 +330,16 @@
1.146 (with-db (db *mbdb*)
1.147 (open-db db)
1.148 (setf (rdb-cfs db) *mbsamp-cfs*)
1.149- ;; (create-cfs db)
1.150- (log:info! "database initialized")
1.151- ;;
1.152- (close-db db))
1.153-
1.154+ (backfill-opts db)
1.155+ (log:info! "database initialized"))
1.156 ;; launch tasks
1.157
1.158 ;; wait
1.159- (wait-for-threads (task-pool-workers *mbdb-tasks*))
1.160- ;; summarize
1.161- (info! "mbdb stats" (print-stats *mbdb*))
1.162- ;; close
1.163- ))
1.164+ (unwind-protect
1.165+ (progn
1.166+ (wait-for-threads (task-pool-workers *mbdb-tasks*))
1.167+ ;; summarize
1.168+ (when-let ((stats (print-stats *mbdb*))) (info! "mbdb stats" stats)))
1.169+ ;; close
1.170+ (close-db *mbdb*))))
1.171+