Mercurial > demo / examples/db/mbdb.lisp
changeset 44: |
99d4ab4f8d53 |
parent: |
5c58d05abae6
|
author: |
Richard Westhaver <ellis@rwest.io> |
date: |
Sun, 11 Aug 2024 01:50:18 -0400 |
permissions: |
-rw-r--r-- |
description: |
update |
1 ;;; examples/mbdb.lisp --- MusicBrainz Database import and analysis 3 ;; This example shows how to migrate a set of complex JSON objects and 4 ;; SQL dumps to RocksDB using data from the MusicBrainz database 5 ;; (https://musicbrainz.org/). The files are hosted at 6 ;; https://packy.compiler.company/data 10 ;; The original data is located here: 11 ;; https://data.metabrainz.org/pub/musicbrainz/data/ 13 ;; The actual json dumps are quite large (release.json is 208Gb!), so 14 ;; we provide our own trimmed down sampling. Each file is sampled 15 ;; randomly and individually, so actual linkage data is totally 16 ;; clobbered. If you want to do some OLAP work you will need the 17 ;; full data set which is packaged as mbdump-full.tar.zst. 19 ;; the data prep script is located at ../mbdump-prep.lisp 21 ;; we parsed some of the database schema from the sql files here: 22 ;; https://github.com/metabrainz/musicbrainz-server/tree/master/admin/sql 25 (in-package :std-user) 26 (defpkg :examples/mbdb 27 (:use :cl :std :dat/json :net/fetch :obj/id :rdb :cli/clap :obj/uuid 28 :sb-concurrency :log :dat/csv :dat/proto :sb-thread) 29 (:import-from :obj/uuid :make-uuid-from-string) 30 (:import-from :cli/progress :with-progress-bar :make-progress-bar 31 :*progress-bar* :*progress-bar-enabled* :update-progress) 32 (:import-from :obj/time :parse-timestring :now :timestamp) 33 (:import-from :log :info! :debug!) 34 (:import-from :obj/uri :parse-uri) 35 (:import-from :rocksdb :load-rocksdb) 38 (in-package :examples/mbdb) 43 (declaim (timestamp *mbdb-epoch*)) 44 (defvar *mbdb-epoch* (now) 45 "mbdb time of birth.") 47 ;; (defvar *mbdb-logger* (make-logger)) 49 (declaim (type pathname *mbdb-path*)) 50 (defvar *mbdb-path* #P"/tmp/mbdb/") 52 (defvar *default-mbdb-opts* 53 (let ((opts (default-rdb-opts))) 54 (set-opt opts :enable-statistics 1) 57 (declaim (rdb *mbdb*)) 58 (defvar *mbdb* (create-db *mbdb-path* :opts *default-mbdb-opts* :open nil) 59 "The local MusicBrainz database. 
The default value is an uninitialized 60 instance without any columns. Before use, make sure to open the 61 database; on exit the database must be closed.") 63 (declaim (oracle *mbdb-oracle*)) 64 (defvar *mbdb-oracle* (multiple-value-bind (id thread) (make-oracle sb-thread:*current-thread*) 67 "The oracle assigned to the mbdb system, which should usually be the current thread.") 69 (declaim (task-pool *mbdb-tasks*)) 70 (defvar *mbdb-tasks* (make-task-pool :oracle-id (oracle-id *mbdb-oracle*)) 71 "The mbdb task pool. This object holds a queue of jobs which are 72 dispatched to workers. Results are collected and processed by the 75 (defvar *mbsamp-pack-url* "https://packy.compiler.company/data/mbsamp.tar.zst" 76 "Remote location of MusicBrainz ZST-compressed archive filled with TSV 79 (defvar *mbdump-base-url* "https://packy.compiler.company/data/mbdump/" 80 "Remote location of MusicBrainz JSON data files.") 82 (defvar *mbdump-pack-url* "https://packy.compiler.company/data/mbdump.tar.zst" 83 "Remote location of MusicBrainz JSON dump pack.") 85 (defvar *mbdb-worker-dir* (merge-pathnames ".import/" *mbdb-path*)) 87 (defvar *mbdump-pack* (merge-pathnames "mbdump.tar.zst" *mbdb-worker-dir*)) 89 (defvar *mbsamp-pack* (merge-pathnames "mbsamp.tar.zst" *mbdb-worker-dir*)) 91 (defvar *mbdump-files* nil) ;; set by MBDB-UNPACK 93 (defvar *mbsamp-files* nil) ;; set by MBDB-UNPACK 96 (defun mbdump-fetch () 97 "Download mbdump data pack." 
98 (unless (probe-file *mbdump-pack*) 105 (defun mbsamp-fetch () 106 (unless (probe-file *mbsamp-pack*) 107 (download *mbsamp-pack-url* *mbsamp-pack*))) 109 (defun mbsamp-unpack () 110 ;; unpack into mbsamp 111 (let ((out-dir (merge-pathnames "mbsamp/" *mbdb-worker-dir*))) 112 (unless (probe-file out-dir) 113 (sb-ext:run-program "tar" `("-I" "zstd" "-xf" ,(namestring *mbsamp-pack*)) 114 :directory *mbdb-worker-dir* 117 (setq *mbsamp-files* (directory "/tmp/mbdb/.import/mbsamp/*")))) 119 (defun mbdump-unpack () 120 ;; unpack into mbdump 121 (let ((out-dir (merge-pathnames "mbdump/" *mbdb-worker-dir*))) 122 (unless (probe-file out-dir) 123 (sb-ext:run-program "tar" `("-I" "zstd" "-xf" ,(namestring *mbdump-pack*)) 124 :directory *mbdb-worker-dir* 127 (setq *mbdump-files* (directory "/tmp/mbdb/.import/mbdump/*")))) 129 #+nil (extract-mbsamp (car (mbsamp-fetch))) 134 (define-constant +mbsamp-null+ "\\N" :test #'string=) 136 (defun nullable (str) 137 (unless (string= +mbsamp-null+ str) 138 (unless (= (length str) 0) 141 (defun proc-key (type) 142 (case (sb-int:keywordicate type) 143 (:id 'make-uuid-from-string) 145 (:num 'parse-integer) 149 (defun nullable-int (str) 150 (parse-integer str :junk-allowed t)) 152 (defun nullable-int* (str) 154 (parse-integer str :junk-allowed t)) 157 (defun nullable-time (str) 158 (obj/time:parse-timestring str :date-time-separator #\Space :fail-on-error nil)) 160 (defun nullable-uri (str) 163 (parse-uri str :escape nil)) 166 (defun mbsamp-schema (name &rest list) 169 (defvar *mbsamp-schema-table* 170 (let ((tbl (make-hash-table :test #'equal))) 172 (setf (gethash (car x) tbl) (cdr x))) 175 "alternative_release_type" 176 #'parse-integer nil #'nullable #'parse-integer nil #'make-uuid-from-string) 179 #'parse-integer #'make-uuid-from-string nil nil 180 #'nullable-int #'nullable #'nullable #'nullable #'nullable #'nullable 181 #'nullable-int #'nullable-int #'nullable nil #'parse-integer 182 #'nullable-time #'nullable-int #'nullable-int 
#'nullable) 185 #'parse-integer #'make-uuid-from-string #'parse-integer #'parse-integer 186 #'parse-integer #'nullable-int* nil #'parse-integer #'nullable-int 187 #'parse-integer #'nullable-time #'parse-integer) 190 #'parse-integer #'make-uuid-from-string nil #'parse-integer 191 #'nullable-int #'nullable-int* #'parse-integer #'nullable-time #'parse-integer) 194 #'parse-integer #'make-uuid-from-string nil nil nil nil nil nil nil nil nil nil nil #'nullable-time) 197 ;; #'parse-integer #'make-uuid-from-string #'nullable-uri #'parse-integer #'nullable-time) 200 #'parse-integer #'make-uuid-from-string #'nullable-uri nil nil) 203 #'make-uuid-from-string #'parse-integer #'nullable-time) 206 #'parse-integer nil #'parse-integer) 209 #'parse-integer #'make-uuid-from-string nil nil #'parse-integer #'nullable-time) 212 #'parse-integer #'make-uuid-from-string nil #'nullable-int nil #'parse-integer #'nullable-time) 215 #'parse-integer #'make-uuid-from-string nil #'nullable-int #'parse-integer #'nullable-time nil nil) 218 "A hash table containing the various MusicBrainz table schemas of interest.") 220 (defun get-schema (schema) (gethash schema *mbsamp-schema-table*)) 222 (defun extract-mbsamp (schema) 223 "Extract the contents of the file in *MBSAMP-FILES* whose name matches SCHEMA, which is assumed to contain Tab-separated 224 values. Return a 2d array of row(values)." 225 (let ((file (find schema *mbsamp-files* :test #'string= :key #'pathname-name)) 226 (map-fns (gethash schema *mbsamp-schema-table*))) 228 (dat/csv:read-csv-file file :header nil :delimiter #\Tab :map-fns map-fns)))) 230 (defmacro with-mbsamp-proc (table shape &body vals) 231 (with-gensyms (row i) 233 (loop for ,row across ,table 234 for ,i below (length ,table) 240 (lambda (v) `(aref ,row ,v)) 244 (defmacro def-mbsamp-proc (name &rest vals) 245 (with-gensyms (table) 246 (let ((fn-name (symbolicate "PROC-MBSAMP-" name))) 247 `(defun ,fn-name (,table) 248 ,(format nil "Process rows of ~A mbsamp data." 
name) 249 (with-mbsamp-proc ,table ,(length vals) ,@vals))))) 252 (vector (make-rdb-cf "url") 253 (make-rdb-cf "genre") 255 (make-rdb-cf "track") 256 (make-rdb-cf "artist") 258 (make-rdb-cf "recording") 259 (make-rdb-cf "release") 260 (make-rdb-cf "instrument"))) 262 (def-mbsamp-proc url 0 1 2) 263 (def-mbsamp-proc genre 0 1 2) 264 (def-mbsamp-proc tag 0 1 2) 265 (def-mbsamp-proc track 0 1 6) 266 (def-mbsamp-proc artist 0 1 2) 267 (def-mbsamp-proc work 0 1 4 6) 268 (def-mbsamp-proc recording 0 1 2 7) 269 (def-mbsamp-proc release 0 1 2 13) 270 (def-mbsamp-proc instrument 0 1 2 5 7) 273 (defun extract-mbdump-file (file) 274 "Extract the contents of a json-dump FILE. Return a json-object." 275 (with-open-file (f file) 276 ;; (sb-impl::with-array-data 277 (loop for x = (json-read f nil) 281 (defun extract-mbdump-columns (obj) 282 "Extract fields from a json-object, returning a list of 283 uninitialized column-families which can be created with #'create-cfs. 285 Returns multiple values: the list of columns, the id, and type-id if present." 287 (mapcar (lambda (x) (make-rdb-cf (car x))) (json-object-members obj)) 288 (make-uuid-from-string (json-getf obj "id")) 289 (when-let ((tid (json-getf obj "type-id"))) 290 (make-uuid-from-string tid)))) 293 (defvar *mbdb-buffer-size* 4096) 295 (defclass mbdb-task (task) ()) 297 (defclass mbdb-stage (stage) ()) 301 (let ((*default-pathname-defaults* *mbdb-path*) 302 (*progress-bar-enabled* t) 303 (*csv-separator* #\Tab) 305 (*log-timestamp* nil) 307 (log:info! "Welcome to MBDB") 308 (ensure-directories-exist *mbdb-worker-dir* :verbose t) 310 (setq *mbdb-oracle* (make-oracle sb-thread:*current-thread*)) 311 (setq *mbdb-tasks* (make-task-pool :oracle-id (oracle-id *mbdb-oracle*))) 313 ;; (push-worker (make-thread #'?) 
*mbdb-tasks*) 318 (let ((job (make-job (make-array 2 :fill-pointer 0 :initial-element (make-task) :element-type 'task)))) 319 (push-task (make-task #'mbsamp-fetch) job) 320 (push-task (make-task #'mbdump-fetch) job) 321 (push-job job *mbdb-tasks*)) 323 (let ((job (make-job (make-array 2 :fill-pointer 0 :initial-element (make-task) :element-type 'task)))) 324 (push-task (make-task #'mbsamp-unpack) job) 325 (push-task (make-task #'mbdump-unpack) job) 326 (push-job job *mbdb-tasks*)) 327 ;; (sb-thread:make-thread #'mbsamp-fetch) 329 ;; prepare column family data 331 ;; initialize database 334 (setf (rdb-cfs db) *mbsamp-cfs*) 336 (log:info! "database initialized")) 342 (wait-for-threads (task-pool-workers *mbdb-tasks*)) 344 (when-let ((stats (print-stats *mbdb*))) (info! "mbdb stats" stats)))