changelog shortlog graph tags branches changeset file revisions annotate raw help

Mercurial > demo / examples/db/mbdb.lisp

revision 40: 6b652d7d6663
parent 39: 1ef551e24009
child 41: 81b7333f27f8
     1.1--- a/examples/db/mbdb.lisp	Thu Apr 11 18:58:35 2024 -0400
     1.2+++ b/examples/db/mbdb.lisp	Sun Apr 14 20:48:05 2024 -0400
     1.3@@ -5,10 +5,19 @@
     1.4 ;; (https://musicbrainz.org/). The files are hosted at
     1.5 ;; https://packy.compiler.company/data/mbdump
     1.6 
     1.7+;; we parse some of the database schema from the sql files here:
     1.8+;; https://github.com/metabrainz/musicbrainz-server/tree/master/admin/sql
     1.9+
    1.10 ;;; Code:
    1.11 (defpackage :examples/mbdb
    1.12-  (:use :cl :std :dat/json :net/fetch :obj/id :rdb :cli/clap :obj/uuid)
    1.13+  (:use :cl :std :dat/json :net/fetch :obj/id :rdb :cli/clap :obj/uuid
    1.14+        :sb-concurrency :log :dat/csv :dat/proto :sb-thread)
    1.15+  (:import-from :obj/uuid :make-uuid-from-string)
    1.16+  (:import-from :cli/progress :with-progress-bar :make-progress-bar
    1.17+   :*progress-bar* :*progress-bar-enabled* :update-progress)
    1.18+  (:import-from :obj/time :parse-timestring :now :timestamp)
    1.19   (:import-from :log :info! :debug!)
    1.20+  (:import-from :obj/uri :parse-uri)
    1.21   (:import-from :rocksdb :load-rocksdb)
    1.22   (:export :main))
    1.23 
    1.24@@ -16,33 +25,291 @@
    1.25 
    1.26 (load-rocksdb t)
    1.27 
    1.28+;;; Vars
    1.29+(declaim (timestamp *mbdb-epoch*))
    1.30+(defvar *mbdb-epoch* (now)
    1.31+  "mbdb time of birth.")
    1.32+
    1.33+;; (defvar *mbdb-logger* (make-logger))
    1.34+
    1.35 (declaim (type pathname *mbdb-path*))
    1.36 (defvar *mbdb-path* #P"/tmp/mbdb/")
    1.37-(defvar *mbdb* (create-db *mbdb-path* :opts (default-rdb-opts)))
    1.38+
    1.39+(defvar *default-mbdb-opts*
    1.40+  (let ((opts (default-rdb-opts)))
    1.41+    (set-opt opts :enable-statistics 1)
    1.42+    opts))
    1.43+
    1.44+(declaim (rdb *mbdb*))
    1.45+(defvar *mbdb* (create-db *mbdb-path* :opts *default-mbdb-opts* :open nil)
    1.46+  "The local MusicBrainz database. The default value is an uninitialized
    1.47+instance without any columns. Before use, make sure to open the
    1.48+database and on exit the database must be closed.")
    1.49+
    1.50+(declaim (oracle *mbdb-oracle*))
    1.51+(defvar *mbdb-oracle* (make-oracle sb-thread:*current-thread*)
    1.52+  "The oracle assigned to the mbdb system, which should usually be the current thread.")
    1.53+
    1.54+(declaim (task-pool *mbdb-tasks*))
    1.55+(defvar *mbdb-tasks* (make-task-pool :oracle *mbdb-oracle*)
    1.56+  "The mbdb task pool. This object holds a queue of jobs which are
    1.57+dispatched to workers. Results are collected and processed by the
    1.58+oracle.")
    1.59+
    1.60+(defvar *mbsamp-pack-url* "https://packy.compiler.company/data/mbsamp.tar.zst"
    1.61+  "Remote location of MusicBrainz ZST-compressed archive filled with TSV
    1.62+files.")
    1.63 
    1.64 (defvar *mbdump-base-url* "https://packy.compiler.company/data/mbdump/"
    1.65   "Remote location of MusicBrainz JSON data files.")
    1.66 
    1.67-(defvar *mbdump-files*
    1.68-  (mapcar (lambda (f) (make-pathname :name f :type "json" :directory *mbdump-base-url*))
    1.69-          (list "area" "artist" "event" "instrument"
    1.70-                "label" "place" "recording" "release"
    1.71-                "release-group" "series" "work")))
    1.72+(defvar *mbdump-pack-url* "https://packy.compiler.company/data/mbdump.tar.zst"
    1.73+  "Remote locaton of MusicBrainz JSON dump pack.")
    1.74+
    1.75+(defvar *mbdump-pack* (merge-pathnames "mbdump.tar.zst" *mbdb-worker-dir*))
    1.76+(defvar *mbsamp-pack* (merge-pathnames "mbsamp.tar.zst" *mbdb-worker-dir*))
    1.77+
    1.78+(defvar *mbdb-worker-dir* (merge-pathnames ".import/" *mbdb-path*))
    1.79+
    1.80+(defvar *mbdump-files* nil) ;; set by MBDB-UNPACK
    1.81+
    1.82+(defvar *mbsamp-files* nil) ;; set by MBDB-UNPACK
    1.83+
    1.84+;;; Fetch Data
    1.85+(defun mbdump-fetch ()
    1.86+  "Download mbdump data pack."
    1.87+  (unless (probe-file *mbdump-pack*)
    1.88+    (download
    1.89+     ;; (parse-uri
    1.90+     *mbdump-pack-url*
    1.91+     ;; )
    1.92+     *mbdump-pack*)))
    1.93+
    1.94+(defun mbsamp-fetch ()
    1.95+  (unless (probe-file *mbsamp-pack*)
    1.96+    (download *mbsamp-pack-url* *mbsamp-pack*)))
    1.97+
    1.98+(defun mbsamp-unpack ()
    1.99+  ;; unpack into mbsamp
   1.100+  (let ((out-dir (merge-pathnames "mbsamp/" *mbdb-worker-dir*)))
   1.101+    (unless (probe-file out-dir)
   1.102+      (sb-ext:run-program "tar" `("-I" "zstd" "-xf" ,(namestring *mbsamp-pack*))
   1.103+                          :directory *mbdb-worker-dir*
   1.104+                          :search t
   1.105+                          :wait t))
   1.106+    (setq *mbsamp-files* (directory "/tmp/mbdb/.import/mbsamp/*"))))
   1.107+
   1.108+(defun mbdump-unpack ()
   1.109+  ;; unpack into mbsamp
   1.110+  (let ((out-dir (merge-pathnames "mbdump/" *mbdb-worker-dir*)))
   1.111+    (unless (probe-file out-dir)
   1.112+      (sb-ext:run-program "tar" `("-I" "zstd" "-xf" ,(namestring *mbdump-pack*))
   1.113+                          :directory *mbdb-worker-dir*
   1.114+                          :search t
   1.115+                          :wait t))
   1.116+    (setq *mbsamp-files* (directory "/tmp/mbdb/.import/mbdump/*"))))
   1.117+
   1.118+#+nil (extract-mbsamp (car (mbsamp-fetch)))
   1.119+
   1.120+;;; Parsing
   1.121+(define-constant +mbsamp-null+ "\\N" :test #'string=)
   1.122+
   1.123+(defun nullable (str)
   1.124+  (unless (string= +mbsamp-null+ str)
   1.125+    (unless (= (length str) 0)
   1.126+      str)))
   1.127+
   1.128+(defun proc-key (type)
   1.129+  (case (sb-int:keywordicate type)
   1.130+    (:id 'make-uuid-from-string)
   1.131+    (:url 'parse-uri)
   1.132+    (:num 'parse-integer)
   1.133+    (:*  'nullable)
   1.134+    (t 'identity)))
   1.135+
   1.136+(defun nullable-int (str)
   1.137+  (parse-integer str :junk-allowed t))
   1.138+
   1.139+(defun nullable-int* (str)
   1.140+  (or (ignore-errors
   1.141+       (parse-integer str :junk-allowed t))
   1.142+      (nullable str)))
   1.143+
   1.144+(defun nullable-time (str)
   1.145+  (obj/time:parse-timestring str :date-time-separator #\Space :fail-on-error nil))
   1.146+
   1.147+(defun nullable-uri (str)
   1.148+  (or
   1.149+   (ignore-errors
   1.150+    (parse-uri str :escape nil))
   1.151+   (nullable str)))
   1.152+
   1.153+(defun mbsamp-schema (name &rest list)
   1.154+  (cons name list))
   1.155 
   1.156-(defun extract-columns (obj)
   1.157-  "Extract fields from a JSON-OBJECT, returning a vector of
   1.158-  uninitialized column-families which can be created with CREATE-CFS.
   1.159+(defvar *mbsamp-schema-table*
   1.160+  (let ((tbl (make-hash-table :test #'equal)))
   1.161+    (mapc (lambda (x)
   1.162+            (setf (gethash (car x) tbl) (cdr x)))
   1.163+          (list
   1.164+           (mbsamp-schema
   1.165+            "alternative_release_type"
   1.166+            #'parse-integer nil #'nullable #'parse-integer nil #'make-uuid-from-string)
   1.167+           (mbsamp-schema
   1.168+            "artist"
   1.169+            #'parse-integer #'make-uuid-from-string nil nil
   1.170+            #'nullable-int #'nullable #'nullable #'nullable #'nullable  #'nullable
   1.171+            #'nullable-int #'nullable-int #'nullable nil #'parse-integer
   1.172+            #'nullable-time #'nullable-int #'nullable-int #'nullable)
   1.173+           (mbsamp-schema
   1.174+            "track"
   1.175+            #'parse-integer #'make-uuid-from-string #'parse-integer #'parse-integer
   1.176+            #'parse-integer #'nullable-int* nil #'parse-integer #'nullable-int
   1.177+            #'parse-integer #'nullable-time #'parse-integer)
   1.178+           (mbsamp-schema
   1.179+            "recording"
   1.180+            #'parse-integer #'make-uuid-from-string nil #'parse-integer
   1.181+            #'nullable-int #'nullable-int* #'parse-integer #'nullable-time #'parse-integer)
   1.182+           (mbsamp-schema
   1.183+            "release"
   1.184+            #'parse-integer #'make-uuid-from-string nil nil nil nil nil nil nil nil nil nil nil #'nullable-time)
   1.185+           ;; (mbsamp-schema
   1.186+           ;;  "url"
   1.187+           ;;  #'parse-integer #'make-uuid-from-string #'nullable-uri #'parse-integer #'nullable-time)
   1.188+           (mbsamp-schema
   1.189+            "url" ;; 2,3
   1.190+            #'parse-integer #'make-uuid-from-string #'nullable-uri nil nil)
   1.191+           (mbsamp-schema
   1.192+            "url_gid_redirect"
   1.193+            #'make-uuid-from-string #'parse-integer #'nullable-time)
   1.194+           (mbsamp-schema
   1.195+            "tag"
   1.196+            #'parse-integer nil #'parse-integer)
   1.197+           (mbsamp-schema
   1.198+            "genre"
   1.199+            #'parse-integer #'make-uuid-from-string nil nil #'parse-integer #'nullable-time)
   1.200+           (mbsamp-schema
   1.201+            "work"
   1.202+            #'parse-integer #'make-uuid-from-string nil #'nullable-int nil #'parse-integer #'nullable-time)
   1.203+           (mbsamp-schema
   1.204+            "instrument"
   1.205+            #'parse-integer #'make-uuid-from-string nil #'nullable-int #'parse-integer #'nullable-time nil nil)
   1.206+           ))
   1.207+    tbl)
   1.208+  "A Hashtable containing the various MusicBrainz table schemas of interest.")
   1.209+
   1.210+(defun get-schema (schema) (gethash schema *mbsamp-schema-table*))
   1.211+
   1.212+(defun extract-mbsamp (schema)
   1.213+  "Extract the contents of FILE which is assumed to contain Tab-separated
   1.214+values. Return a 2d array of row(values)."
   1.215+  (let ((file (find schema *mbsamp-files* :test #'string= :key #'pathname-name))
   1.216+        (map-fns (gethash schema *mbsamp-schema-table*)))
   1.217+    (when file
   1.218+      (dat/csv:read-csv-file file :header nil :delimiter #\Tab :map-fns map-fns))))
   1.219+
   1.220+(defun extract-mbdump-file (file)
   1.221+  "Extract the contents of a json-dump FILE. Return a json-object."
   1.222+  (with-open-file (f file)
   1.223+    ;; (sb-impl::with-array-data
   1.224+    (loop for x = (json-read f nil)
   1.225+          while x
   1.226+          collect x)))
   1.227+
   1.228+(defmacro with-mbsamp-proc (table shape &body vals)
   1.229+  (with-gensyms (row i)
   1.230+    `(coerce
   1.231+      (loop for ,row across ,table
   1.232+            for ,i below (length ,table)
   1.233+            collect (make-array
   1.234+                     ,shape
   1.235+                     :initial-contents
   1.236+                     (list
   1.237+                      ,@(mapcar
   1.238+                         (lambda (v) `(aref ,row ,v))
   1.239+                         vals))))
   1.240+      'vector)))
   1.241+
   1.242+(defmacro def-mbsamp-proc (name &rest vals)
   1.243+  (with-gensyms (table)
   1.244+    (let ((fn-name (symbolicate "PROC-MBSAMP-" name)))
   1.245+      `(defun ,fn-name (,table)
   1.246+         ,(format nil "Process rows of ~A mbsamp data." name)
   1.247+         (with-mbsamp-proc ,table ,(length vals) ,@vals)))))
   1.248+
   1.249+(defvar *mbsamp-cfs*
   1.250+  (vector (make-rdb-cf "url")
   1.251+          (make-rdb-cf "genre")
   1.252+          (make-rdb-cf "tag")
   1.253+          (make-rdb-cf "track")
   1.254+          (make-rdb-cf "artist")
   1.255+          (make-rdb-cf "work")
   1.256+          (make-rdb-cf "recording")
   1.257+          (make-rdb-cf "release")
   1.258+          (make-rdb-cf "instrument")))
   1.259+
   1.260+(def-mbsamp-proc url 0 1 2)
   1.261+(def-mbsamp-proc genre 0 1 2)
   1.262+(def-mbsamp-proc tag 0 1 2)
   1.263+(def-mbsamp-proc track 0 1 6)
   1.264+(def-mbsamp-proc artist 0 1 2)
   1.265+(def-mbsamp-proc work 0 1 4 6)
   1.266+(def-mbsamp-proc recording 0 1 2 7)
   1.267+(def-mbsamp-proc release 0 1 2 13)
   1.268+(def-mbsamp-proc instrument 0 1 2 5 7)
   1.269+
   1.270+(defun extract-mbdump-columns (obj)
   1.271+  "Extract fields from a json-object, returning a vector of
   1.272+  uninitialized column-families which can be created with #'create-cfs.
   1.273 
   1.274 Returns multiple values: the list of columns, the id, and type-id if present."
   1.275   (values
   1.276-   (mapcar #'car (json-object-members obj))
   1.277+   (mapcar (lambda (x) (make-rdb-cf (car x))) (json-object-members obj))
   1.278    (make-uuid-from-string (json-getf obj "id"))
   1.279    (when-let ((tid (json-getf obj "type-id")))
   1.280      (make-uuid-from-string tid))))
   1.281 
   1.282+;;; Tasks
   1.283+(defvar *mbdb-buffer-size* 4096)
   1.284+
   1.285+(defclass mbdb-task (task) ())
   1.286+
   1.287+;;; Main
   1.288 (defmain ()
   1.289-  (let ((*default-pathname-defaults* (ensure-directories-exist *mbdb-path* :verbose t)))
   1.290+  (let ((*default-pathname-defaults* *mbdb-path*)
   1.291+        (*progress-bar-enabled* t)
   1.292+        (*csv-separator* #\Tab)
   1.293+        (*cpus* (num-cpus))
   1.294+        (*log-timestamp* nil)
   1.295+        (*log-level* :warn))
   1.296     (log:info! "Welcome to MBDB")
   1.297+    (ensure-directories-exist *mbdb-worker-dir* :verbose t)
   1.298+    ;; prepare workers
   1.299+    (setf *mbdb-oracle* (make-oracle sb-thread:*current-thread*)
   1.300+          *mbdb-tasks* (make-task-pool :oracle *mbdb-oracle*))
   1.301+    (push-worker (sb-thread:make-thread #'mbsamp-fetch) *mbdb-tasks*)
   1.302+    ;; (with-tasks ())
   1.303+    (let ((job (make-job)))
   1.304+      (push-task (make-instance 'mbdb-task :object #'mbsamp-fetch) job))
   1.305+
   1.306+    ;; (sb-thread:make-thread #'mbsamp-fetch)
   1.307+
   1.308+    ;; prepare column family data
   1.309+    
   1.310+    ;; initialize database
   1.311     (with-db (db *mbdb*)
   1.312       (open-db db)
   1.313-      (close-db db))))
   1.314+      (setf (rdb-cfs db) *mbsamp-cfs*)
   1.315+      ;; (create-cfs db)
   1.316+      (log:info! "database initialized")
   1.317+      ;; 
   1.318+      (close-db db))
   1.319+    
   1.320+    ;; launch tasks
   1.321+    
   1.322+    ;; wait
   1.323+    (wait-for-threads (task-pool-workers *mbdb-tasks*))
   1.324+    ;; summarize
   1.325+    (info! "mbdb stats" (print-stats *mbdb*))
   1.326+    ;; close
   1.327+    ))