Mercurial > demo / examples/db/mbdb.lisp
changeset 41: |
81b7333f27f8 |
parent: |
6b652d7d6663
|
child: |
5c58d05abae6 |
author: |
Richard Westhaver <ellis@rwest.io> |
date: |
Sun, 16 Jun 2024 22:15:04 -0400 |
permissions: |
-rw-r--r-- |
description: |
more examples |
1 ;;; examples/mbdb.lisp --- MusicBrainz Database import and analysis 3 ;; This example shows how to migrate a set of complex JSON objects and 4 ;; SQL dumps to RocksDB using data from the MusicBrainz database 5 ;; (https://musicbrainz.org/). The files are hosted at 6 ;; https://packy.compiler.company/data 10 ;; The original data is located here: 11 ;; https://data.metabrainz.org/pub/musicbrainz/data/ 13 ;; The actual json dumps are quite large (release.json is 208Gb!), so 14 ;; we provide our own trimmed down sampling. Each file is sampled 15 ;; randomly and individually, so actual linkage data is totally 16 ;; clobbered. If you want to do some OLAP stuff you will need the 17 ;; full data set which is packaged as mbdump-full.tar.zst. 19 ;; the data prep script is located at ../mbdump-prep.lisp 21 ;; we parsed some of the database schema from the sql files here: 22 ;; https://github.com/metabrainz/musicbrainz-server/tree/master/admin/sql 25 (in-package :std-user) 26 (defpkg :examples/mbdb 27 (:use :cl :std :dat/json :net/fetch :obj/id :rdb :cli/clap :obj/uuid 28 :sb-concurrency :log :dat/csv :dat/proto :sb-thread) 29 (:import-from :obj/uuid :make-uuid-from-string) 30 (:import-from :cli/progress :with-progress-bar :make-progress-bar 31 :*progress-bar* :*progress-bar-enabled* :update-progress) 32 (:import-from :obj/time :parse-timestring :now :timestamp) 33 (:import-from :log :info! :debug!) 34 (:import-from :obj/uri :parse-uri) 35 (:import-from :rocksdb :load-rocksdb) 38 (in-package :examples/mbdb) 43 (declaim (timestamp *mbdb-epoch*)) 44 (defvar *mbdb-epoch* (now) 45 "mbdb time of birth.") 47 ;; (defvar *mbdb-logger* (make-logger)) 49 (declaim (type pathname *mbdb-path*)) 50 (defvar *mbdb-path* #P"/tmp/mbdb/") 52 (defvar *default-mbdb-opts* 53 (let ((opts (default-rdb-opts))) 54 (set-opt opts :enable-statistics 1) 57 (declaim (rdb *mbdb*)) 58 (defvar *mbdb* (create-db *mbdb-path* :opts *default-mbdb-opts* :open nil) 59 "The local MusicBrainz database. 
The default value is an uninitialized 60 instance without any columns. Before use, make sure to open the 61 database and on exit the database must be closed.") 63 (declaim (oracle *mbdb-oracle*)) 64 (defvar *mbdb-oracle* (make-oracle sb-thread:*current-thread*) 65 "The oracle assigned to the mbdb system, which should usually be the current thread.") 67 (declaim (task-pool *mbdb-tasks*)) 68 (defvar *mbdb-tasks* (make-task-pool :oracle-id (oracle-id *mbdb-oracle*)) 69 "The mbdb task pool. This object holds a queue of jobs which are 70 dispatched to workers. Results are collected and processed by the 73 (defvar *mbsamp-pack-url* "https://packy.compiler.company/data/mbsamp.tar.zst" 74 "Remote location of MusicBrainz ZST-compressed archive filled with TSV 77 (defvar *mbdump-base-url* "https://packy.compiler.company/data/mbdump/" 78 "Remote location of MusicBrainz JSON data files.") 80 (defvar *mbdump-pack-url* "https://packy.compiler.company/data/mbdump.tar.zst" 81 "Remote location of MusicBrainz JSON dump pack.") 83 (defvar *mbdb-worker-dir* (merge-pathnames ".import/" *mbdb-path*)) 85 (defvar *mbdump-pack* (merge-pathnames "mbdump.tar.zst" *mbdb-worker-dir*)) 87 (defvar *mbsamp-pack* (merge-pathnames "mbsamp.tar.zst" *mbdb-worker-dir*)) 89 (defvar *mbdump-files* nil) ;; set by MBDB-UNPACK 91 (defvar *mbsamp-files* nil) ;; set by MBDB-UNPACK 94 (defun mbdump-fetch () 95 "Download mbdump data pack." 
96 (unless (probe-file *mbdump-pack*) 103 (defun mbsamp-fetch () 104 (unless (probe-file *mbsamp-pack*) 105 (download *mbsamp-pack-url* *mbsamp-pack*))) 107 (defun mbsamp-unpack () 108 ;; unpack into mbsamp 109 (let ((out-dir (merge-pathnames "mbsamp/" *mbdb-worker-dir*))) 110 (unless (probe-file out-dir) 111 (sb-ext:run-program "tar" `("-I" "zstd" "-xf" ,(namestring *mbsamp-pack*)) 112 :directory *mbdb-worker-dir* 115 (setq *mbsamp-files* (directory "/tmp/mbdb/.import/mbsamp/*")))) 117 (defun mbdump-unpack () 118 ;; unpack into mbdump -- NOTE(review): the SETQ below assigns *mbsamp-files*; presumably *mbdump-files* was intended, confirm 119 (let ((out-dir (merge-pathnames "mbdump/" *mbdb-worker-dir*))) 120 (unless (probe-file out-dir) 121 (sb-ext:run-program "tar" `("-I" "zstd" "-xf" ,(namestring *mbdump-pack*)) 122 :directory *mbdb-worker-dir* 125 (setq *mbsamp-files* (directory "/tmp/mbdb/.import/mbdump/*")))) 127 #+nil (extract-mbsamp (car (mbsamp-fetch))) 132 (define-constant +mbsamp-null+ "\\N" :test #'string=) 134 (defun nullable (str) 135 (unless (string= +mbsamp-null+ str) 136 (unless (= (length str) 0) 139 (defun proc-key (type) 140 (case (sb-int:keywordicate type) 141 (:id 'make-uuid-from-string) 143 (:num 'parse-integer) 147 (defun nullable-int (str) 148 (parse-integer str :junk-allowed t)) 150 (defun nullable-int* (str) 152 (parse-integer str :junk-allowed t)) 155 (defun nullable-time (str) 156 (obj/time:parse-timestring str :date-time-separator #\Space :fail-on-error nil)) 158 (defun nullable-uri (str) 161 (parse-uri str :escape nil)) 164 (defun mbsamp-schema (name &rest list) 167 (defvar *mbsamp-schema-table* 168 (let ((tbl (make-hash-table :test #'equal))) 170 (setf (gethash (car x) tbl) (cdr x))) 173 "alternative_release_type" 174 #'parse-integer nil #'nullable #'parse-integer nil #'make-uuid-from-string) 177 #'parse-integer #'make-uuid-from-string nil nil 178 #'nullable-int #'nullable #'nullable #'nullable #'nullable #'nullable 179 #'nullable-int #'nullable-int #'nullable nil #'parse-integer 180 #'nullable-time #'nullable-int #'nullable-int 
#'nullable) 183 #'parse-integer #'make-uuid-from-string #'parse-integer #'parse-integer 184 #'parse-integer #'nullable-int* nil #'parse-integer #'nullable-int 185 #'parse-integer #'nullable-time #'parse-integer) 188 #'parse-integer #'make-uuid-from-string nil #'parse-integer 189 #'nullable-int #'nullable-int* #'parse-integer #'nullable-time #'parse-integer) 192 #'parse-integer #'make-uuid-from-string nil nil nil nil nil nil nil nil nil nil nil #'nullable-time) 195 ;; #'parse-integer #'make-uuid-from-string #'nullable-uri #'parse-integer #'nullable-time) 198 #'parse-integer #'make-uuid-from-string #'nullable-uri nil nil) 201 #'make-uuid-from-string #'parse-integer #'nullable-time) 204 #'parse-integer nil #'parse-integer) 207 #'parse-integer #'make-uuid-from-string nil nil #'parse-integer #'nullable-time) 210 #'parse-integer #'make-uuid-from-string nil #'nullable-int nil #'parse-integer #'nullable-time) 213 #'parse-integer #'make-uuid-from-string nil #'nullable-int #'parse-integer #'nullable-time nil nil) 216 "A Hashtable containing the various MusicBrainz table schemas of interest.") 218 (defun get-schema (schema) (gethash schema *mbsamp-schema-table*)) 220 (defun extract-mbsamp (schema) 221 "Extract the contents of FILE which is assumed to contain Tab-separated 222 values. Return a 2d array of row(values)." 223 (let ((file (find schema *mbsamp-files* :test #'string= :key #'pathname-name)) 224 (map-fns (gethash schema *mbsamp-schema-table*))) 226 (dat/csv:read-csv-file file :header nil :delimiter #\Tab :map-fns map-fns)))) 228 (defmacro with-mbsamp-proc (table shape &body vals) 229 (with-gensyms (row i) 231 (loop for ,row across ,table 232 for ,i below (length ,table) 238 (lambda (v) `(aref ,row ,v)) 242 (defmacro def-mbsamp-proc (name &rest vals) 243 (with-gensyms (table) 244 (let ((fn-name (symbolicate "PROC-MBSAMP-" name))) 245 `(defun ,fn-name (,table) 246 ,(format nil "Process rows of ~A mbsamp data." 
name) 247 (with-mbsamp-proc ,table ,(length vals) ,@vals))))) 250 (vector (make-rdb-cf "url") 251 (make-rdb-cf "genre") 253 (make-rdb-cf "track") 254 (make-rdb-cf "artist") 256 (make-rdb-cf "recording") 257 (make-rdb-cf "release") 258 (make-rdb-cf "instrument"))) 260 (def-mbsamp-proc url 0 1 2) 261 (def-mbsamp-proc genre 0 1 2) 262 (def-mbsamp-proc tag 0 1 2) 263 (def-mbsamp-proc track 0 1 6) 264 (def-mbsamp-proc artist 0 1 2) 265 (def-mbsamp-proc work 0 1 4 6) 266 (def-mbsamp-proc recording 0 1 2 7) 267 (def-mbsamp-proc release 0 1 2 13) 268 (def-mbsamp-proc instrument 0 1 2 5 7) 271 (defun extract-mbdump-file (file) 272 "Extract the contents of a json-dump FILE. Return a json-object." 273 (with-open-file (f file) 274 ;; (sb-impl::with-array-data 275 (loop for x = (json-read f nil) 279 (defun extract-mbdump-columns (obj) 280 "Extract fields from a json-object, returning a vector of 281 uninitialized column-families which can be created with #'create-cfs. 283 Returns multiple values: the list of columns, the id, and type-id if present." 285 (mapcar (lambda (x) (make-rdb-cf (car x))) (json-object-members obj)) 286 (make-uuid-from-string (json-getf obj "id")) 287 (when-let ((tid (json-getf obj "type-id"))) 288 (make-uuid-from-string tid)))) 291 (defvar *mbdb-buffer-size* 4096) 293 (defclass mbdb-task (task) ()) 295 (defclass mbdb-stage (stage) ()) 299 (let ((*default-pathname-defaults* *mbdb-path*) 300 (*progress-bar-enabled* t) 301 (*csv-separator* #\Tab) 303 (*log-timestamp* nil) 305 (log:info! "Welcome to MBDB") 306 (ensure-directories-exist *mbdb-worker-dir* :verbose t) 308 (setq *mbdb-oracle* (make-oracle sb-thread:*current-thread*)) 309 (setq *mbdb-tasks* (make-task-pool :oracle-id (oracle-id *mbdb-oracle*))) 311 ;; (push-worker (make-thread #'?) 
*mbdb-tasks*) 316 (let ((job (make-job (make-array 2 :fill-pointer 0 :initial-element (make-task) :element-type 'task)))) 317 (push-task (make-task #'mbsamp-fetch) job) 318 (push-task (make-task #'mbdump-fetch) job) 319 (push-job job *mbdb-tasks*)) 321 (let ((job (make-job (make-array 2 :fill-pointer 0 :initial-element (make-task) :element-type 'task)))) 322 (push-task (make-task #'mbsamp-unpack) job) 323 (push-task (make-task #'mbdump-unpack) job) 324 (push-job job *mbdb-tasks*)) 325 ;; (sb-thread:make-thread #'mbsamp-fetch) 327 ;; prepare column family data 329 ;; initialize database 332 (setf (rdb-cfs db) *mbsamp-cfs*) 334 (log:info! "database initialized")) 340 (wait-for-threads (task-pool-workers *mbdb-tasks*)) 342 (when-let ((stats (print-stats *mbdb*))) (info! "mbdb stats" stats)))