1.1--- a/examples/db/mbdb.lisp Thu Apr 11 18:58:35 2024 -0400
1.2+++ b/examples/db/mbdb.lisp Sun Apr 14 20:48:05 2024 -0400
1.3@@ -5,10 +5,19 @@
1.4 ;; (https://musicbrainz.org/). The files are hosted at
1.5 ;; https://packy.compiler.company/data/mbdump
1.6
1.7+;; we parse some of the database schema from the sql files here:
1.8+;; https://github.com/metabrainz/musicbrainz-server/tree/master/admin/sql
1.9+
1.10 ;;; Code:
1.11 (defpackage :examples/mbdb
1.12- (:use :cl :std :dat/json :net/fetch :obj/id :rdb :cli/clap :obj/uuid)
1.13+ (:use :cl :std :dat/json :net/fetch :obj/id :rdb :cli/clap :obj/uuid
1.14+ :sb-concurrency :log :dat/csv :dat/proto :sb-thread)
1.15+ (:import-from :obj/uuid :make-uuid-from-string)
1.16+ (:import-from :cli/progress :with-progress-bar :make-progress-bar
1.17+ :*progress-bar* :*progress-bar-enabled* :update-progress)
1.18+ (:import-from :obj/time :parse-timestring :now :timestamp)
1.19 (:import-from :log :info! :debug!)
1.20+ (:import-from :obj/uri :parse-uri)
1.21 (:import-from :rocksdb :load-rocksdb)
1.22 (:export :main))
1.23
1.24@@ -16,33 +25,291 @@
1.25
1.26 (load-rocksdb t)
1.27
1.28+;;; Vars
1.29+(declaim (timestamp *mbdb-epoch*))
1.30+(defvar *mbdb-epoch* (now)
1.31+ "mbdb time of birth.")
1.32+
1.33+;; (defvar *mbdb-logger* (make-logger))
1.34+
1.35 (declaim (type pathname *mbdb-path*))
1.36 (defvar *mbdb-path* #P"/tmp/mbdb/")
1.37-(defvar *mbdb* (create-db *mbdb-path* :opts (default-rdb-opts)))
1.38+
1.39+(defvar *default-mbdb-opts*
1.40+ (let ((opts (default-rdb-opts)))
1.41+ (set-opt opts :enable-statistics 1)
1.42+ opts))
1.43+
1.44+(declaim (rdb *mbdb*))
1.45+(defvar *mbdb* (create-db *mbdb-path* :opts *default-mbdb-opts* :open nil)
1.46+ "The local MusicBrainz database. The default value is an uninitialized
1.47+instance without any columns. Before use, make sure to open the
1.48+database and on exit the database must be closed.")
1.49+
1.50+(declaim (oracle *mbdb-oracle*))
1.51+(defvar *mbdb-oracle* (make-oracle sb-thread:*current-thread*)
1.52+ "The oracle assigned to the mbdb system, which should usually be the current thread.")
1.53+
1.54+(declaim (task-pool *mbdb-tasks*))
1.55+(defvar *mbdb-tasks* (make-task-pool :oracle *mbdb-oracle*)
1.56+ "The mbdb task pool. This object holds a queue of jobs which are
1.57+dispatched to workers. Results are collected and processed by the
1.58+oracle.")
1.59+
1.60+(defvar *mbsamp-pack-url* "https://packy.compiler.company/data/mbsamp.tar.zst"
1.61+ "Remote location of MusicBrainz ZST-compressed archive filled with TSV
1.62+files.")
1.63
1.64 (defvar *mbdump-base-url* "https://packy.compiler.company/data/mbdump/"
1.65 "Remote location of MusicBrainz JSON data files.")
1.66
1.67-(defvar *mbdump-files*
1.68- (mapcar (lambda (f) (make-pathname :name f :type "json" :directory *mbdump-base-url*))
1.69- (list "area" "artist" "event" "instrument"
1.70- "label" "place" "recording" "release"
1.71- "release-group" "series" "work")))
1.72+(defvar *mbdump-pack-url* "https://packy.compiler.company/data/mbdump.tar.zst"
1.73+ "Remote locaton of MusicBrainz JSON dump pack.")
1.74+
1.75+(defvar *mbdump-pack* (merge-pathnames "mbdump.tar.zst" *mbdb-worker-dir*))
1.76+(defvar *mbsamp-pack* (merge-pathnames "mbsamp.tar.zst" *mbdb-worker-dir*))
1.77+
1.78+(defvar *mbdb-worker-dir* (merge-pathnames ".import/" *mbdb-path*))
1.79+
1.80+(defvar *mbdump-files* nil) ;; set by MBDB-UNPACK
1.81+
1.82+(defvar *mbsamp-files* nil) ;; set by MBDB-UNPACK
1.83+
1.84+;;; Fetch Data
1.85+(defun mbdump-fetch ()
1.86+ "Download mbdump data pack."
1.87+ (unless (probe-file *mbdump-pack*)
1.88+ (download
1.89+ ;; (parse-uri
1.90+ *mbdump-pack-url*
1.91+ ;; )
1.92+ *mbdump-pack*)))
1.93+
1.94+(defun mbsamp-fetch ()
1.95+ (unless (probe-file *mbsamp-pack*)
1.96+ (download *mbsamp-pack-url* *mbsamp-pack*)))
1.97+
1.98+(defun mbsamp-unpack ()
1.99+ ;; unpack into mbsamp
1.100+ (let ((out-dir (merge-pathnames "mbsamp/" *mbdb-worker-dir*)))
1.101+ (unless (probe-file out-dir)
1.102+ (sb-ext:run-program "tar" `("-I" "zstd" "-xf" ,(namestring *mbsamp-pack*))
1.103+ :directory *mbdb-worker-dir*
1.104+ :search t
1.105+ :wait t))
1.106+ (setq *mbsamp-files* (directory "/tmp/mbdb/.import/mbsamp/*"))))
1.107+
1.108+(defun mbdump-unpack ()
1.109+ ;; unpack into mbsamp
1.110+ (let ((out-dir (merge-pathnames "mbdump/" *mbdb-worker-dir*)))
1.111+ (unless (probe-file out-dir)
1.112+ (sb-ext:run-program "tar" `("-I" "zstd" "-xf" ,(namestring *mbdump-pack*))
1.113+ :directory *mbdb-worker-dir*
1.114+ :search t
1.115+ :wait t))
1.116+ (setq *mbsamp-files* (directory "/tmp/mbdb/.import/mbdump/*"))))
1.117+
1.118+#+nil (extract-mbsamp (car (mbsamp-fetch)))
1.119+
1.120+;;; Parsing
1.121+(define-constant +mbsamp-null+ "\\N" :test #'string=)
1.122+
1.123+(defun nullable (str)
1.124+ (unless (string= +mbsamp-null+ str)
1.125+ (unless (= (length str) 0)
1.126+ str)))
1.127+
1.128+(defun proc-key (type)
1.129+ (case (sb-int:keywordicate type)
1.130+ (:id 'make-uuid-from-string)
1.131+ (:url 'parse-uri)
1.132+ (:num 'parse-integer)
1.133+ (:* 'nullable)
1.134+ (t 'identity)))
1.135+
1.136+(defun nullable-int (str)
1.137+ (parse-integer str :junk-allowed t))
1.138+
1.139+(defun nullable-int* (str)
1.140+ (or (ignore-errors
1.141+ (parse-integer str :junk-allowed t))
1.142+ (nullable str)))
1.143+
1.144+(defun nullable-time (str)
1.145+ (obj/time:parse-timestring str :date-time-separator #\Space :fail-on-error nil))
1.146+
1.147+(defun nullable-uri (str)
1.148+ (or
1.149+ (ignore-errors
1.150+ (parse-uri str :escape nil))
1.151+ (nullable str)))
1.152+
1.153+(defun mbsamp-schema (name &rest list)
1.154+ (cons name list))
1.155
1.156-(defun extract-columns (obj)
1.157- "Extract fields from a JSON-OBJECT, returning a vector of
1.158- uninitialized column-families which can be created with CREATE-CFS.
1.159+(defvar *mbsamp-schema-table*
1.160+ (let ((tbl (make-hash-table :test #'equal)))
1.161+ (mapc (lambda (x)
1.162+ (setf (gethash (car x) tbl) (cdr x)))
1.163+ (list
1.164+ (mbsamp-schema
1.165+ "alternative_release_type"
1.166+ #'parse-integer nil #'nullable #'parse-integer nil #'make-uuid-from-string)
1.167+ (mbsamp-schema
1.168+ "artist"
1.169+ #'parse-integer #'make-uuid-from-string nil nil
1.170+ #'nullable-int #'nullable #'nullable #'nullable #'nullable #'nullable
1.171+ #'nullable-int #'nullable-int #'nullable nil #'parse-integer
1.172+ #'nullable-time #'nullable-int #'nullable-int #'nullable)
1.173+ (mbsamp-schema
1.174+ "track"
1.175+ #'parse-integer #'make-uuid-from-string #'parse-integer #'parse-integer
1.176+ #'parse-integer #'nullable-int* nil #'parse-integer #'nullable-int
1.177+ #'parse-integer #'nullable-time #'parse-integer)
1.178+ (mbsamp-schema
1.179+ "recording"
1.180+ #'parse-integer #'make-uuid-from-string nil #'parse-integer
1.181+ #'nullable-int #'nullable-int* #'parse-integer #'nullable-time #'parse-integer)
1.182+ (mbsamp-schema
1.183+ "release"
1.184+ #'parse-integer #'make-uuid-from-string nil nil nil nil nil nil nil nil nil nil nil #'nullable-time)
1.185+ ;; (mbsamp-schema
1.186+ ;; "url"
1.187+ ;; #'parse-integer #'make-uuid-from-string #'nullable-uri #'parse-integer #'nullable-time)
1.188+ (mbsamp-schema
1.189+ "url" ;; 2,3
1.190+ #'parse-integer #'make-uuid-from-string #'nullable-uri nil nil)
1.191+ (mbsamp-schema
1.192+ "url_gid_redirect"
1.193+ #'make-uuid-from-string #'parse-integer #'nullable-time)
1.194+ (mbsamp-schema
1.195+ "tag"
1.196+ #'parse-integer nil #'parse-integer)
1.197+ (mbsamp-schema
1.198+ "genre"
1.199+ #'parse-integer #'make-uuid-from-string nil nil #'parse-integer #'nullable-time)
1.200+ (mbsamp-schema
1.201+ "work"
1.202+ #'parse-integer #'make-uuid-from-string nil #'nullable-int nil #'parse-integer #'nullable-time)
1.203+ (mbsamp-schema
1.204+ "instrument"
1.205+ #'parse-integer #'make-uuid-from-string nil #'nullable-int #'parse-integer #'nullable-time nil nil)
1.206+ ))
1.207+ tbl)
1.208+ "A Hashtable containing the various MusicBrainz table schemas of interest.")
1.209+
1.210+(defun get-schema (schema) (gethash schema *mbsamp-schema-table*))
1.211+
1.212+(defun extract-mbsamp (schema)
1.213+ "Extract the contents of FILE which is assumed to contain Tab-separated
1.214+values. Return a 2d array of row(values)."
1.215+ (let ((file (find schema *mbsamp-files* :test #'string= :key #'pathname-name))
1.216+ (map-fns (gethash schema *mbsamp-schema-table*)))
1.217+ (when file
1.218+ (dat/csv:read-csv-file file :header nil :delimiter #\Tab :map-fns map-fns))))
1.219+
1.220+(defun extract-mbdump-file (file)
1.221+ "Extract the contents of a json-dump FILE. Return a json-object."
1.222+ (with-open-file (f file)
1.223+ ;; (sb-impl::with-array-data
1.224+ (loop for x = (json-read f nil)
1.225+ while x
1.226+ collect x)))
1.227+
1.228+(defmacro with-mbsamp-proc (table shape &body vals)
1.229+ (with-gensyms (row i)
1.230+ `(coerce
1.231+ (loop for ,row across ,table
1.232+ for ,i below (length ,table)
1.233+ collect (make-array
1.234+ ,shape
1.235+ :initial-contents
1.236+ (list
1.237+ ,@(mapcar
1.238+ (lambda (v) `(aref ,row ,v))
1.239+ vals))))
1.240+ 'vector)))
1.241+
1.242+(defmacro def-mbsamp-proc (name &rest vals)
1.243+ (with-gensyms (table)
1.244+ (let ((fn-name (symbolicate "PROC-MBSAMP-" name)))
1.245+ `(defun ,fn-name (,table)
1.246+ ,(format nil "Process rows of ~A mbsamp data." name)
1.247+ (with-mbsamp-proc ,table ,(length vals) ,@vals)))))
1.248+
1.249+(defvar *mbsamp-cfs*
1.250+ (vector (make-rdb-cf "url")
1.251+ (make-rdb-cf "genre")
1.252+ (make-rdb-cf "tag")
1.253+ (make-rdb-cf "track")
1.254+ (make-rdb-cf "artist")
1.255+ (make-rdb-cf "work")
1.256+ (make-rdb-cf "recording")
1.257+ (make-rdb-cf "release")
1.258+ (make-rdb-cf "instrument")))
1.259+
1.260+(def-mbsamp-proc url 0 1 2)
1.261+(def-mbsamp-proc genre 0 1 2)
1.262+(def-mbsamp-proc tag 0 1 2)
1.263+(def-mbsamp-proc track 0 1 6)
1.264+(def-mbsamp-proc artist 0 1 2)
1.265+(def-mbsamp-proc work 0 1 4 6)
1.266+(def-mbsamp-proc recording 0 1 2 7)
1.267+(def-mbsamp-proc release 0 1 2 13)
1.268+(def-mbsamp-proc instrument 0 1 2 5 7)
1.269+
1.270+(defun extract-mbdump-columns (obj)
1.271+ "Extract fields from a json-object, returning a vector of
1.272+ uninitialized column-families which can be created with #'create-cfs.
1.273
1.274 Returns multiple values: the list of columns, the id, and type-id if present."
1.275 (values
1.276- (mapcar #'car (json-object-members obj))
1.277+ (mapcar (lambda (x) (make-rdb-cf (car x))) (json-object-members obj))
1.278 (make-uuid-from-string (json-getf obj "id"))
1.279 (when-let ((tid (json-getf obj "type-id")))
1.280 (make-uuid-from-string tid))))
1.281
1.282+;;; Tasks
1.283+(defvar *mbdb-buffer-size* 4096)
1.284+
1.285+(defclass mbdb-task (task) ())
1.286+
1.287+;;; Main
1.288 (defmain ()
1.289- (let ((*default-pathname-defaults* (ensure-directories-exist *mbdb-path* :verbose t)))
1.290+ (let ((*default-pathname-defaults* *mbdb-path*)
1.291+ (*progress-bar-enabled* t)
1.292+ (*csv-separator* #\Tab)
1.293+ (*cpus* (num-cpus))
1.294+ (*log-timestamp* nil)
1.295+ (*log-level* :warn))
1.296 (log:info! "Welcome to MBDB")
1.297+ (ensure-directories-exist *mbdb-worker-dir* :verbose t)
1.298+ ;; prepare workers
1.299+ (setf *mbdb-oracle* (make-oracle sb-thread:*current-thread*)
1.300+ *mbdb-tasks* (make-task-pool :oracle *mbdb-oracle*))
1.301+ (push-worker (sb-thread:make-thread #'mbsamp-fetch) *mbdb-tasks*)
1.302+ ;; (with-tasks ())
1.303+ (let ((job (make-job)))
1.304+ (push-task (make-instance 'mbdb-task :object #'mbsamp-fetch) job))
1.305+
1.306+ ;; (sb-thread:make-thread #'mbsamp-fetch)
1.307+
1.308+ ;; prepare column family data
1.309+
1.310+ ;; initialize database
1.311 (with-db (db *mbdb*)
1.312 (open-db db)
1.313- (close-db db))))
1.314+ (setf (rdb-cfs db) *mbsamp-cfs*)
1.315+ ;; (create-cfs db)
1.316+ (log:info! "database initialized")
1.317+ ;;
1.318+ (close-db db))
1.319+
1.320+ ;; launch tasks
1.321+
1.322+ ;; wait
1.323+ (wait-for-threads (task-pool-workers *mbdb-tasks*))
1.324+ ;; summarize
1.325+ (info! "mbdb stats" (print-stats *mbdb*))
1.326+ ;; close
1.327+ ))