changelog shortlog graph tags branches changeset file revisions annotate raw help

Mercurial > demo / examples/db/mbdb.lisp

revision 41: 81b7333f27f8
parent 40: 6b652d7d6663
child 42: 5c58d05abae6
     1.1--- a/examples/db/mbdb.lisp	Sun Apr 14 20:48:05 2024 -0400
     1.2+++ b/examples/db/mbdb.lisp	Sun Jun 16 22:15:04 2024 -0400
     1.3@@ -1,15 +1,29 @@
     1.4 ;;; examples/mbdb.lisp --- MusicBrainz Database import and analysis
     1.5 
     1.6-;; This example show how to migrate a set of complex JSON objects to
     1.7-;; RocksDB using a dump from the MusicBrainz database
     1.8+;; This example shows how to migrate a set of complex JSON objects and
     1.9+;; SQL dumps to RocksDB using data from the MusicBrainz database
    1.10 ;; (https://musicbrainz.org/). The files are hosted at
    1.11-;; https://packy.compiler.company/data/mbdump
    1.12+;; https://packy.compiler.company/data
    1.13+
    1.14+;;; Commentary:
    1.15+
    1.16+;; The original data is located here:
    1.17+;; https://data.metabrainz.org/pub/musicbrainz/data/
    1.18 
    1.19-;; we parse some of the database schema from the sql files here:
    1.20+;; The actual json dumps are quite large (release.json is 208Gb!), so
    1.21+;; we provide our own trimmed down sampling. Each file is sampled
    1.22+;; randomly and individually, so actual linkage data is totally
    1.23+;; clobbered. If you want to do some OLAP stuff you will need the
    1.24+;; full data set which is packaged as mbdump-full.tar.zst.
    1.25+
    1.26+;; the data prep script is located at ../mbdump-prep.lisp
    1.27+
    1.28+;; we parsed some of the database schema from the sql files here:
    1.29 ;; https://github.com/metabrainz/musicbrainz-server/tree/master/admin/sql
    1.30 
    1.31 ;;; Code:
    1.32-(defpackage :examples/mbdb
    1.33+(in-package :std-user)
    1.34+(defpkg :examples/mbdb
    1.35   (:use :cl :std :dat/json :net/fetch :obj/id :rdb :cli/clap :obj/uuid
    1.36         :sb-concurrency :log :dat/csv :dat/proto :sb-thread)
    1.37   (:import-from :obj/uuid :make-uuid-from-string)
    1.38@@ -51,7 +65,7 @@
    1.39   "The oracle assigned to the mbdb system, which should usually be the current thread.")
    1.40 
    1.41 (declaim (task-pool *mbdb-tasks*))
    1.42-(defvar *mbdb-tasks* (make-task-pool :oracle *mbdb-oracle*)
    1.43+(defvar *mbdb-tasks* (make-task-pool :oracle-id (oracle-id *mbdb-oracle*))
    1.44   "The mbdb task pool. This object holds a queue of jobs which are
    1.45 dispatched to workers. Results are collected and processed by the
    1.46 oracle.")
    1.47@@ -66,11 +80,12 @@
    1.48 (defvar *mbdump-pack-url* "https://packy.compiler.company/data/mbdump.tar.zst"
    1.49   "Remote locaton of MusicBrainz JSON dump pack.")
    1.50 
    1.51+(defvar *mbdb-worker-dir* (merge-pathnames ".import/" *mbdb-path*))
    1.52+
    1.53 (defvar *mbdump-pack* (merge-pathnames "mbdump.tar.zst" *mbdb-worker-dir*))
    1.54+
    1.55 (defvar *mbsamp-pack* (merge-pathnames "mbsamp.tar.zst" *mbdb-worker-dir*))
    1.56 
    1.57-(defvar *mbdb-worker-dir* (merge-pathnames ".import/" *mbdb-path*))
    1.58-
    1.59 (defvar *mbdump-files* nil) ;; set by MBDB-UNPACK
    1.60 
    1.61 (defvar *mbsamp-files* nil) ;; set by MBDB-UNPACK
    1.62@@ -112,6 +127,8 @@
    1.63 #+nil (extract-mbsamp (car (mbsamp-fetch)))
    1.64 
    1.65 ;;; Parsing
    1.66+
    1.67+;;;; MBSamp
    1.68 (define-constant +mbsamp-null+ "\\N" :test #'string=)
    1.69 
    1.70 (defun nullable (str)
    1.71@@ -208,14 +225,6 @@
    1.72     (when file
    1.73       (dat/csv:read-csv-file file :header nil :delimiter #\Tab :map-fns map-fns))))
    1.74 
    1.75-(defun extract-mbdump-file (file)
    1.76-  "Extract the contents of a json-dump FILE. Return a json-object."
    1.77-  (with-open-file (f file)
    1.78-    ;; (sb-impl::with-array-data
    1.79-    (loop for x = (json-read f nil)
    1.80-          while x
    1.81-          collect x)))
    1.82-
    1.83 (defmacro with-mbsamp-proc (table shape &body vals)
    1.84   (with-gensyms (row i)
    1.85     `(coerce
    1.86@@ -258,6 +267,15 @@
    1.87 (def-mbsamp-proc release 0 1 2 13)
    1.88 (def-mbsamp-proc instrument 0 1 2 5 7)
    1.89 
    1.90+;;;; MBDump
    1.91+(defun extract-mbdump-file (file)
    1.92+  "Extract the contents of a json-dump FILE. Return a json-object."
    1.93+  (with-open-file (f file)
    1.94+    ;; (sb-impl::with-array-data
    1.95+    (loop for x = (json-read f nil)
    1.96+          while x
    1.97+          collect x)))
    1.98+
    1.99 (defun extract-mbdump-columns (obj)
   1.100   "Extract fields from a json-object, returning a vector of
   1.101   uninitialized column-families which can be created with #'create-cfs.
   1.102@@ -274,6 +292,8 @@
   1.103 
   1.104 (defclass mbdb-task (task) ())
   1.105 
   1.106+(defclass mbdb-stage (stage) ())
   1.107+
   1.108 ;;; Main
   1.109 (defmain ()
   1.110   (let ((*default-pathname-defaults* *mbdb-path*)
   1.111@@ -281,17 +301,27 @@
   1.112         (*csv-separator* #\Tab)
   1.113         (*cpus* (num-cpus))
   1.114         (*log-timestamp* nil)
   1.115-        (*log-level* :warn))
   1.116+        (*log-level* :info))
   1.117     (log:info! "Welcome to MBDB")
   1.118     (ensure-directories-exist *mbdb-worker-dir* :verbose t)
   1.119     ;; prepare workers
   1.120-    (setf *mbdb-oracle* (make-oracle sb-thread:*current-thread*)
   1.121-          *mbdb-tasks* (make-task-pool :oracle *mbdb-oracle*))
   1.122-    (push-worker (sb-thread:make-thread #'mbsamp-fetch) *mbdb-tasks*)
   1.123+    (setq *mbdb-oracle* (make-oracle sb-thread:*current-thread*))
   1.124+    (setq *mbdb-tasks* (make-task-pool :oracle-id (oracle-id *mbdb-oracle*)))
   1.125+    ;; (make-workers
   1.126+    ;; (push-worker (make-thread #'?) *mbdb-tasks*)
   1.127+
   1.128     ;; (with-tasks ())
   1.129-    (let ((job (make-job)))
   1.130-      (push-task (make-instance 'mbdb-task :object #'mbsamp-fetch) job))
   1.131 
   1.132+    ;; fetch
   1.133+    (let ((job (make-job (make-array 2 :fill-pointer 0 :initial-element (make-task) :element-type 'task))))
   1.134+      (push-task (make-task #'mbsamp-fetch) job)
   1.135+      (push-task (make-task #'mbdump-fetch) job)
   1.136+      (push-job job *mbdb-tasks*))
   1.137+    ;; unpack
   1.138+    (let ((job (make-job (make-array 2 :fill-pointer 0 :initial-element (make-task) :element-type 'task))))
   1.139+      (push-task (make-task #'mbsamp-unpack) job)
   1.140+      (push-task (make-task #'mbdump-unpack) job)
   1.141+      (push-job job *mbdb-tasks*))
   1.142     ;; (sb-thread:make-thread #'mbsamp-fetch)
   1.143 
   1.144     ;; prepare column family data
   1.145@@ -300,16 +330,16 @@
   1.146     (with-db (db *mbdb*)
   1.147       (open-db db)
   1.148       (setf (rdb-cfs db) *mbsamp-cfs*)
   1.149-      ;; (create-cfs db)
   1.150-      (log:info! "database initialized")
   1.151-      ;; 
   1.152-      (close-db db))
   1.153-    
   1.154+      (backfill-opts db)
   1.155+      (log:info! "database initialized"))
   1.156     ;; launch tasks
   1.157     
   1.158     ;; wait
   1.159-    (wait-for-threads (task-pool-workers *mbdb-tasks*))
   1.160-    ;; summarize
   1.161-    (info! "mbdb stats" (print-stats *mbdb*))
   1.162-    ;; close
   1.163-    ))
   1.164+    (unwind-protect
   1.165+         (progn
   1.166+           (wait-for-threads (task-pool-workers *mbdb-tasks*))
   1.167+           ;; summarize
   1.168+           (when-let ((stats (print-stats *mbdb*))) (info! "mbdb stats" stats)))
   1.169+      ;; close
   1.170+      (close-db *mbdb*))))
   1.171+