Mercurial > demo / examples/db/mbdb.lisp
changeset 40: |
6b652d7d6663 |
parent: |
1ef551e24009
|
child: |
81b7333f27f8 |
author: |
Richard Westhaver <ellis@rwest.io> |
date: |
Sun, 14 Apr 2024 20:48:05 -0400 |
permissions: |
-rw-r--r-- |
description: |
examples |
;;; examples/db/mbdb.lisp --- MusicBrainz Database import and analysis

;; NOTE(review): this listing was recovered from a lossy extract of the
;; original file.  Places where original lines are absent are marked
;; `[gap: ...]`.  Code forms that straddle a gap are reproduced only as
;; far as they are visible and are NOT completed by guesswork; string
;; literals cut by a gap are closed with a `[...]` marker so that the
;; rest of the listing stays readable.

;; This example shows how to migrate a set of complex JSON objects to
;; RocksDB using a dump from the MusicBrainz database
;; (https://musicbrainz.org/). The files are hosted at
;; https://packy.compiler.company/data/mbdump

;; we parse some of the database schema from the sql files here:
;; https://github.com/metabrainz/musicbrainz-server/tree/master/admin/sql

;; [gap: original lines 10-11 missing from this extract]
(defpackage :examples/mbdb
  (:use :cl :std :dat/json :net/fetch :obj/id :rdb :cli/clap :obj/uuid
   :sb-concurrency :log :dat/csv :dat/proto :sb-thread)
  (:import-from :obj/uuid :make-uuid-from-string)
  (:import-from :cli/progress :with-progress-bar :make-progress-bar
   :*progress-bar* :*progress-bar-enabled* :update-progress)
  (:import-from :obj/time :parse-timestring :now :timestamp)
  (:import-from :log :info! :debug!)
  (:import-from :obj/uri :parse-uri)
  (:import-from :rocksdb :load-rocksdb)
  ;; [gap: original lines 22-23 missing from this extract]

(in-package :examples/mbdb)

;; [gap: original lines 25-28 missing from this extract]

;; All type proclamations below use the explicit (TYPE ...) form so they
;; read the same way as the one for *MBDB-PATH*.
(declaim (type timestamp *mbdb-epoch*))
(defvar *mbdb-epoch* (now)
  "mbdb time of birth.")

;; (defvar *mbdb-logger* (make-logger))

(declaim (type pathname *mbdb-path*))
(defvar *mbdb-path* #P"/tmp/mbdb/")

(defvar *default-mbdb-opts*
  (let ((opts (default-rdb-opts)))
    (set-opt opts :enable-statistics 1)
    ;; [gap: original lines 41-42 missing from this extract]

(declaim (type rdb *mbdb*))
(defvar *mbdb* (create-db *mbdb-path* :opts *default-mbdb-opts* :open nil)
  "The local MusicBrainz database. The default value is an uninitialized
instance without any columns. Before use, make sure to open the
database and on exit the database must be closed.")

(declaim (type oracle *mbdb-oracle*))
(defvar *mbdb-oracle* (make-oracle sb-thread:*current-thread*)
  "The oracle assigned to the mbdb system, which should usually be the current thread.")

(declaim (type task-pool *mbdb-tasks*))
(defvar *mbdb-tasks* (make-task-pool :oracle *mbdb-oracle*)
  "The mbdb task pool. This object holds a queue of jobs which are
dispatched to workers. Results are collected and processed by the
[...]")
;; [gap: original lines 57-58 (the end of the docstring above) missing
;; from this extract]

(defvar *mbsamp-pack-url* "https://packy.compiler.company/data/mbsamp.tar.zst"
  "Remote location of MusicBrainz ZST-compressed archive filled with TSV
[...]")
;; [gap: original line 61 (the end of the docstring above) missing from
;; this extract]

(defvar *mbdump-base-url* "https://packy.compiler.company/data/mbdump/"
  "Remote location of MusicBrainz JSON data files.")

(defvar *mbdump-pack-url* "https://packy.compiler.company/data/mbdump.tar.zst"
  "Remote location of MusicBrainz JSON dump pack.")

;; *MBDB-WORKER-DIR* must be defined before the two pack pathnames: their
;; initial values read it at load time, and referencing it first would
;; signal an unbound-variable error on a fresh image.
(defvar *mbdb-worker-dir* (merge-pathnames ".import/" *mbdb-path*)
  "Scratch directory under *MBDB-PATH* holding downloaded and unpacked data.")

(defvar *mbdump-pack* (merge-pathnames "mbdump.tar.zst" *mbdb-worker-dir*)
  "Local pathname of the downloaded mbdump pack.")
(defvar *mbsamp-pack* (merge-pathnames "mbsamp.tar.zst" *mbdb-worker-dir*)
  "Local pathname of the downloaded mbsamp pack.")

(defvar *mbdump-files* nil) ;; set by MBDUMP-UNPACK

(defvar *mbsamp-files* nil) ;; set by MBSAMP-UNPACK

;; [gap: original lines 77-78 missing from this extract]
(defun mbdump-fetch ()
  "Download mbdump data pack."
  (unless (probe-file *mbdump-pack*)
    ;; [gap: original lines 82-87 missing from this extract]

(defun mbsamp-fetch ()
  "Download the mbsamp data pack to *MBSAMP-PACK* unless that file already exists."
  (unless (probe-file *mbsamp-pack*)
    (download *mbsamp-pack-url* *mbsamp-pack*)))

(defun mbsamp-unpack ()
  ;; [gap: original line 93 missing from this extract]
  ;; Runs tar (with zstd) on *MBSAMP-PACK* inside *MBDB-WORKER-DIR* when the
  ;; output directory does not exist yet, then records the directory
  ;; listing in *MBSAMP-FILES*.
  ;; NOTE(review): the literal path in the DIRECTORY call duplicates
  ;; OUT-DIR and only matches it while *MBDB-PATH* is #P"/tmp/mbdb/".
  (let ((out-dir (merge-pathnames "mbsamp/" *mbdb-worker-dir*)))
    (unless (probe-file out-dir)
      (sb-ext:run-program "tar" `("-I" "zstd" "-xf" ,(namestring *mbsamp-pack*))
                          :directory *mbdb-worker-dir*
                          ;; [gap: original lines 98-99 missing from this extract]
    (setq *mbsamp-files* (directory "/tmp/mbdb/.import/mbsamp/*"))))

(defun mbdump-unpack ()
  ;; unpack into mbdump
  ;; The listing is stored in *MBDUMP-FILES*; it was previously written to
  ;; *MBSAMP-FILES*, which clobbered the mbsamp listing and left
  ;; *MBDUMP-FILES* permanently NIL.
  ;; NOTE(review): the literal path in the DIRECTORY call duplicates
  ;; OUT-DIR and only matches it while *MBDB-PATH* is #P"/tmp/mbdb/".
  (let ((out-dir (merge-pathnames "mbdump/" *mbdb-worker-dir*)))
    (unless (probe-file out-dir)
      (sb-ext:run-program "tar" `("-I" "zstd" "-xf" ,(namestring *mbdump-pack*))
                          :directory *mbdb-worker-dir*
                          ;; [gap: original lines 108-109 missing from this extract]
    (setq *mbdump-files* (directory "/tmp/mbdb/.import/mbdump/*"))))

#+nil (extract-mbsamp (car (mbsamp-fetch)))

;; [gap: original lines 113-114 missing from this extract]
;; The two-character string backslash-N, the marker the null check in
;; NULLABLE compares against.
(define-constant +mbsamp-null+ "\\N" :test #'string=)

;; NULLABLE filters out the null marker and the empty string; what it
;; yields for any other STR lies in the gap below.
(defun nullable (str)
  (unless (string= +mbsamp-null+ str)
    (unless (= (length str) 0)
      ;; [gap: original lines 120-121 missing from this extract]

;; PROC-KEY maps a type designator (keywordised first) to the symbol of
;; a parser function.  Only the :ID and :NUM clauses are visible.
(defun proc-key (type)
  (case (sb-int:keywordicate type)
    (:id 'make-uuid-from-string)
    ;; [gap: original line 125 missing from this extract]
    (:num 'parse-integer)
    ;; [gap: original lines 127-129 missing from this extract]

(defun nullable-int (str)
  "Parse STR with PARSE-INTEGER and :JUNK-ALLOWED T, so input with no
leading integer yields NIL instead of signalling an error."
  (parse-integer str :junk-allowed t))

(defun nullable-int* (str)
  ;; [gap: original line 134 missing from this extract]
  (parse-integer str :junk-allowed t))

;; [gap: original lines 136-137 missing from this extract]
(defun nullable-time (str)
  "Parse STR with OBJ/TIME:PARSE-TIMESTRING, using a space as the
date/time separator and :FAIL-ON-ERROR NIL."
  (obj/time:parse-timestring str :date-time-separator #\Space :fail-on-error nil))

(defun nullable-uri (str)
  ;; [gap: original lines 142-143 missing from this extract]
  (parse-uri str :escape nil))

;; [gap: original lines 145-146 missing from this extract]
(defun mbsamp-schema (name &rest list)
  ;; [gap: original lines 148-149 missing from this extract]

;; Each visible entry below is a run of per-column parser functions that
;; is stored under a table name and later handed to READ-CSV-FILE as
;; :MAP-FNS.  Apart from "alternative_release_type", the table names sit
;; in the gaps.
;; NOTE(review): NIL entries presumably leave the column value as read;
;; confirm against DAT/CSV.
(defvar *mbsamp-schema-table*
  (let ((tbl (make-hash-table :test #'equal)))
    ;; [gap: original line 152 missing from this extract]
    (setf (gethash (car x) tbl) (cdr x)))
    ;; [gap: original lines 154-155 missing from this extract]
    "alternative_release_type"
    #'parse-integer nil #'nullable #'parse-integer nil #'make-uuid-from-string)
    ;; [gap: original lines 158-159 missing from this extract]
    #'parse-integer #'make-uuid-from-string nil nil
    #'nullable-int #'nullable #'nullable #'nullable #'nullable #'nullable
    #'nullable-int #'nullable-int #'nullable nil #'parse-integer
    #'nullable-time #'nullable-int #'nullable-int #'nullable)
    ;; [gap: original lines 164-165 missing from this extract]
    #'parse-integer #'make-uuid-from-string #'parse-integer #'parse-integer
    #'parse-integer #'nullable-int* nil #'parse-integer #'nullable-int
    #'parse-integer #'nullable-time #'parse-integer)
    ;; [gap: original lines 169-170 missing from this extract]
    #'parse-integer #'make-uuid-from-string nil #'parse-integer
    #'nullable-int #'nullable-int* #'parse-integer #'nullable-time #'parse-integer)
    ;; [gap: original lines 173-174 missing from this extract]
    #'parse-integer #'make-uuid-from-string nil nil nil nil nil nil nil nil nil nil nil #'nullable-time)
    ;; [gap: original lines 176-177 missing from this extract]
    ;; #'parse-integer #'make-uuid-from-string #'nullable-uri #'parse-integer #'nullable-time)
    ;; [gap: original lines 179-180 missing from this extract]
    #'parse-integer #'make-uuid-from-string #'nullable-uri nil nil)
    ;; [gap: original lines 182-183 missing from this extract]
    #'make-uuid-from-string #'parse-integer #'nullable-time)
    ;; [gap: original lines 185-186 missing from this extract]
    #'parse-integer nil #'parse-integer)
    ;; [gap: original lines 188-189 missing from this extract]
    #'parse-integer #'make-uuid-from-string nil nil #'parse-integer #'nullable-time)
    ;; [gap: original lines 191-192 missing from this extract]
    #'parse-integer #'make-uuid-from-string nil #'nullable-int nil #'parse-integer #'nullable-time)
    ;; [gap: original lines 194-195 missing from this extract]
    #'parse-integer #'make-uuid-from-string nil #'nullable-int #'parse-integer #'nullable-time nil nil)
    ;; [gap: original lines 197-198 missing from this extract]
  "A Hashtable containing the various MusicBrainz table schemas of interest.")

;; NOTE(review): this listing was recovered from a lossy extract of the
;; original file.  Places where original lines are absent are marked
;; `[gap: ...]`; forms that straddle a gap are reproduced only as far as
;; they are visible and are NOT completed by guesswork.

(defun get-schema (schema)
  "Look up SCHEMA in *MBSAMP-SCHEMA-TABLE* (returns the values of GETHASH)."
  (gethash schema *mbsamp-schema-table*))

(defun extract-mbsamp (schema)
  "Extract the contents of the mbsamp file named SCHEMA, which is assumed
to contain Tab-separated values. The file is the member of *MBSAMP-FILES*
whose pathname-name is SCHEMA; its columns are parsed with the functions
registered for SCHEMA (see GET-SCHEMA). Return a 2d array of row(values)."
  (let ((file (find schema *mbsamp-files* :test #'string= :key #'pathname-name))
        ;; use the shared accessor rather than repeating the GETHASH
        (map-fns (get-schema schema)))
    ;; [gap: original line 208 missing from this extract]
      (dat/csv:read-csv-file file :header nil :delimiter #\Tab :map-fns map-fns))))

(defun extract-mbdump-file (file)
  "Extract the contents of a json-dump FILE. Return a json-object."
  (with-open-file (f file)
    ;; (sb-impl::with-array-data
    (loop for x = (json-read f nil)
          ;; [gap: original lines 216-218 missing from this extract]

;; WITH-MBSAMP-PROC iterates over the rows of TABLE and indexes each row
;; with AREF; everything else about its expansion lies in the gaps.
(defmacro with-mbsamp-proc (table shape &body vals)
  (with-gensyms (row i)
    ;; [gap: original line 221 missing from this extract]
    (loop for ,row across ,table
          for ,i below (length ,table)
          ;; [gap: original lines 224-228 missing from this extract]
          (lambda (v) `(aref ,row ,v))
          ;; [gap: original lines 230-232 missing from this extract]

(defmacro def-mbsamp-proc (name &rest vals)
  "Define a function PROC-MBSAMP-<NAME> of one argument, a table, whose
body is WITH-MBSAMP-PROC applied to that table, the number of VALS, and
VALS themselves."
  (with-gensyms (table)
    (let ((fn-name (symbolicate "PROC-MBSAMP-" name)))
      `(defun ,fn-name (,table)
         ,(format nil "Process rows of ~A mbsamp data." name)
         (with-mbsamp-proc ,table ,(length vals) ,@vals)))))

;; [gap: original lines 239-240 missing from this extract]
;; NOTE(review): presumably the initial value of *MBSAMP-CFS*, which is
;; referenced further down; the opening of the form is not visible.
  (vector (make-rdb-cf "url")
          (make-rdb-cf "genre")
          ;; [gap: original line 243 missing from this extract]
          (make-rdb-cf "track")
          (make-rdb-cf "artist")
          ;; [gap: original line 246 missing from this extract]
          (make-rdb-cf "recording")
          (make-rdb-cf "release")
          (make-rdb-cf "instrument")))

;; One row processor per table; the integers are the VALS handed to
;; WITH-MBSAMP-PROC.
(def-mbsamp-proc url 0 1 2)
(def-mbsamp-proc genre 0 1 2)
(def-mbsamp-proc tag 0 1 2)
(def-mbsamp-proc track 0 1 6)
(def-mbsamp-proc artist 0 1 2)
(def-mbsamp-proc work 0 1 4 6)
(def-mbsamp-proc recording 0 1 2 7)
(def-mbsamp-proc release 0 1 2 13)
(def-mbsamp-proc instrument 0 1 2 5 7)

(defun extract-mbdump-columns (obj)
  "Extract fields from a json-object, returning a list of
uninitialized column-families which can be created with #'create-cfs.

Returns multiple values: the list of columns, the id, and type-id if present."
  ;; [gap: original line 266 missing from this extract]
   (mapcar (lambda (x) (make-rdb-cf (car x))) (json-object-members obj))
   (make-uuid-from-string (json-getf obj "id"))
   (when-let ((tid (json-getf obj "type-id")))
     (make-uuid-from-string tid))))

;; [gap: original lines 271-272 missing from this extract]
(defvar *mbdb-buffer-size* 4096)

(defclass mbdb-task (task) ())

;; [gap: original lines 276-278 missing from this extract]
;; The fragment below is the interior of a definition whose first lines
;; are in the gap above and whose end lies beyond this extract.  It is
;; reproduced unchanged.
  (let ((*default-pathname-defaults* *mbdb-path*)
        (*progress-bar-enabled* t)
        (*csv-separator* #\Tab)
        ;; [gap: original line 282 missing from this extract]
        (*log-timestamp* nil)
        ;; [gap: original line 284 missing from this extract]
    (log:info! "Welcome to MBDB")
    (ensure-directories-exist *mbdb-worker-dir* :verbose t)
    ;; [gap: original line 287 missing from this extract]
    (setf *mbdb-oracle* (make-oracle sb-thread:*current-thread*)
          *mbdb-tasks* (make-task-pool :oracle *mbdb-oracle*))
    (push-worker (sb-thread:make-thread #'mbsamp-fetch) *mbdb-tasks*)
    ;; [gap: original line 291 missing from this extract]
    (let ((job (make-job)))
      (push-task (make-instance 'mbdb-task :object #'mbsamp-fetch) job))
    ;; [gap: original line 294 missing from this extract]
    ;; (sb-thread:make-thread #'mbsamp-fetch)
    ;; [gap: original line 296 missing from this extract]
    ;; prepare column family data
    ;; [gap: original line 298 missing from this extract]
    ;; initialize database
    ;; [gap: original lines 300-301 missing from this extract]
    (setf (rdb-cfs db) *mbsamp-cfs*)
    ;; [gap: original line 303 missing from this extract]
    (log:info! "database initialized")
    ;; [gap: original lines 305-310 missing from this extract]
    (wait-for-threads (task-pool-workers *mbdb-tasks*))
    ;; [gap: original line 312 missing from this extract]
    (info! "mbdb stats" (print-stats *mbdb*))