changelog shortlog graph tags branches changeset files revisions annotate raw help

Mercurial > demo / examples/db/mbdb.lisp

changeset 42: 5c58d05abae6
parent: 81b7333f27f8
author: Richard Westhaver <ellis@rwest.io>
date: Thu, 20 Jun 2024 22:31:58 -0400
permissions: -rw-r--r--
description: rm trashfile
1 ;;; examples/mbdb.lisp --- MusicBrainz Database import and analysis
2 
3 ;; This example shows how to migrate a set of complex JSON objects and
4 ;; SQL dumps to RocksDB using data from the MusicBrainz database
5 ;; (https://musicbrainz.org/). The files are hosted at
6 ;; https://packy.compiler.company/data
7 
8 ;;; Commentary:
9 
10 ;; The original data is located here:
11 ;; https://data.metabrainz.org/pub/musicbrainz/data/
12 
13 ;; The actual json dumps are quite large (release.json is 208GB!), so
14 ;; we provide our own trimmed down sampling. Each file is sampled
15 ;; randomly and individually, so actual linkage data is totally
16 ;; clobbered. If you want to do some OLAP stuff you will need the
17 ;; full data set which is packaged as mbdump-full.tar.zst.
18 
19 ;; the data prep script is located at ../mbdump-prep.lisp
20 
21 ;; we parsed some of the database schema from the sql files here:
22 ;; https://github.com/metabrainz/musicbrainz-server/tree/master/admin/sql
23 
24 ;;; Code:
25 (in-package :std-user)
26 (defpkg :examples/mbdb
27  (:use :cl :std :dat/json :net/fetch :obj/id :rdb :cli/clap :obj/uuid
28  :sb-concurrency :log :dat/csv :dat/proto :sb-thread)
29  (:import-from :obj/uuid :make-uuid-from-string)
30  (:import-from :cli/progress :with-progress-bar :make-progress-bar
31  :*progress-bar* :*progress-bar-enabled* :update-progress)
32  (:import-from :obj/time :parse-timestring :now :timestamp)
33  (:import-from :log :info! :debug!)
34  (:import-from :obj/uri :parse-uri)
35  (:import-from :rocksdb :load-rocksdb)
36  (:export :main))
37 
38 (in-package :examples/mbdb)
39 
40 (load-rocksdb t)
41 
42 ;;; Vars
;; Same proclamation as the abbreviated (timestamp var) form, spelled with
;; the explicit TYPE identifier.
(declaim (type timestamp *mbdb-epoch*))
(defvar *mbdb-epoch* (now)
  "Value of NOW captured when this variable is first initialized.")
46 
47 ;; (defvar *mbdb-logger* (make-logger))
48 
(declaim (type pathname *mbdb-path*))
;; Root directory of the example: *MBDB* is created at this path and
;; *MBDB-WORKER-DIR* is merged against it.
(defvar *mbdb-path* #P"/tmp/mbdb/")
51 
(defvar *default-mbdb-opts*
  (let ((rdb-opts (default-rdb-opts)))
    ;; PROG1 hands back the options object after the setting is applied.
    (prog1 rdb-opts
      (set-opt rdb-opts :enable-statistics 1)))
  "The result of DEFAULT-RDB-OPTS with :ENABLE-STATISTICS set to 1.
Passed as :OPTS when *MBDB* is created.")
56 
(declaim (type rdb *mbdb*))
(defvar *mbdb*
  (create-db *mbdb-path* :open nil :opts *default-mbdb-opts*)
  "The local MusicBrainz database, created at *MBDB-PATH* with :OPEN NIL.
The initial value is an unopened instance without any columns: open it
before use and close it on exit.")
62 
(declaim (type oracle *mbdb-oracle*))
(defvar *mbdb-oracle*
  ;; MAKE-ORACLE returns two values; the first (an id) is not needed here,
  ;; so only the second one is kept.
  (nth-value 1 (make-oracle sb-thread:*current-thread*))
  "The oracle assigned to the mbdb system: the second value of MAKE-ORACLE
called on the thread that initializes this variable.")
68 
(declaim (type task-pool *mbdb-tasks*))
(defvar *mbdb-tasks*
  (let ((oracle-id (oracle-id *mbdb-oracle*)))
    (make-task-pool :oracle-id oracle-id))
  "The mbdb task pool, tied to the id of *MBDB-ORACLE*. It holds a queue of
jobs which are dispatched to workers; results are collected and processed
by the oracle.")
74 
(defvar *mbsamp-pack-url* "https://packy.compiler.company/data/mbsamp.tar.zst"
  "Remote location of the MusicBrainz ZST-compressed archive filled with TSV
files.")

(defvar *mbdump-base-url* "https://packy.compiler.company/data/mbdump/"
  "Remote location of MusicBrainz JSON data files.")

(defvar *mbdump-pack-url* "https://packy.compiler.company/data/mbdump.tar.zst"
  "Remote location of the MusicBrainz JSON dump pack.")
84 
;; Scratch directory under *MBDB-PATH*; the packs are downloaded into it and
;; tar is run with it as the working directory.
(defvar *mbdb-worker-dir* (merge-pathnames ".import/" *mbdb-path*))

;; Local destination of the JSON dump pack (see MBDUMP-FETCH).
(defvar *mbdump-pack* (merge-pathnames "mbdump.tar.zst" *mbdb-worker-dir*))

;; Local destination of the sample pack (see MBSAMP-FETCH).
(defvar *mbsamp-pack* (merge-pathnames "mbsamp.tar.zst" *mbdb-worker-dir*))

(defvar *mbdump-files* nil) ;; meant to hold the unpacked mbdump file list (see MBDUMP-UNPACK)

(defvar *mbsamp-files* nil) ;; assigned by the unpack functions; searched by EXTRACT-MBSAMP
94 
95 ;;; Fetch Data
(defun mbdump-fetch ()
  "Download *MBDUMP-PACK-URL* to *MBDUMP-PACK* unless that file already exists.
Returns NIL when the download is skipped."
  (let ((pack *mbdump-pack*))
    (when (null (probe-file pack))
      ;; the URL is handed to DOWNLOAD as a plain string
      (download *mbdump-pack-url* pack))))
104 
(defun mbsamp-fetch ()
  "Download *MBSAMP-PACK-URL* to *MBSAMP-PACK* unless that file already exists.
Returns NIL when the download is skipped."
  (let ((pack *mbsamp-pack*))
    (when (null (probe-file pack))
      (download *mbsamp-pack-url* pack))))
108 
(defun mbsamp-unpack ()
  "Unpack *MBSAMP-PACK* inside *MBDB-WORKER-DIR* and record the result.

tar (with zstd) is only run when the mbsamp/ directory under
*MBDB-WORKER-DIR* does not exist yet. In either case *MBSAMP-FILES* is set
to the listing of that directory, and the listing is returned."
  (let ((out-dir (merge-pathnames "mbsamp/" *mbdb-worker-dir*)))
    (unless (probe-file out-dir)
      (sb-ext:run-program "tar" (list "-I" "zstd" "-xf" (namestring *mbsamp-pack*))
                          :directory *mbdb-worker-dir*
                          :search t
                          :wait t))
    ;; List OUT-DIR itself rather than a hard-coded /tmp path, so the listing
    ;; follows *MBDB-WORKER-DIR*. A :WILD name with no type is the same
    ;; pattern the literal ".../mbsamp/*" string parsed to.
    (setq *mbsamp-files*
          (directory (make-pathname :name :wild :defaults out-dir)))))
118 
(defun mbdump-unpack ()
  "Unpack *MBDUMP-PACK* inside *MBDB-WORKER-DIR* and record the result.

tar (with zstd) is only run when the mbdump/ directory under
*MBDB-WORKER-DIR* does not exist yet. In either case *MBDUMP-FILES* is set
to the listing of that directory, and the listing is returned."
  (let ((out-dir (merge-pathnames "mbdump/" *mbdb-worker-dir*)))
    (unless (probe-file out-dir)
      (sb-ext:run-program "tar" (list "-I" "zstd" "-xf" (namestring *mbdump-pack*))
                          :directory *mbdb-worker-dir*
                          :search t
                          :wait t))
    ;; Store the listing in *MBDUMP-FILES*: assigning *MBSAMP-FILES* here
    ;; would overwrite the sample-file list that EXTRACT-MBSAMP searches.
    ;; The pattern is derived from OUT-DIR instead of a hard-coded /tmp path.
    (setq *mbdump-files*
          (directory (make-pathname :name :wild :defaults out-dir)))))
128 
129 #+nil (extract-mbsamp (car (mbsamp-fetch)))
130 
131 ;;; Parsing
132 
133 ;;;; MBSamp
;; The two-character string backslash-N, treated as "no value" by NULLABLE.
(define-constant +mbsamp-null+ "\\N" :test #'string=)

(defun nullable (str)
  "Return STR, or NIL when STR is STRING= to +MBSAMP-NULL+ or has length 0."
  (and (not (string= +mbsamp-null+ str))
       (/= (length str) 0)
       str))
140 
(defun proc-key (type)
  "Map TYPE (converted to a keyword) to the name of a conversion function.
:ID, :URL, :NUM and :* have dedicated entries; anything else yields IDENTITY."
  (let ((entry (assoc (sb-int:keywordicate type)
                      '((:id . make-uuid-from-string)
                        (:url . parse-uri)
                        (:num . parse-integer)
                        (:* . nullable)))))
    (if entry
        (cdr entry)
        'identity)))
148 
(defun nullable-int (str)
  "Parse an integer from STR with :JUNK-ALLOWED T, so input that does not
start with an integer yields NIL instead of signalling an error."
  (parse-integer str :junk-allowed t))
151 
(defun nullable-int* (str)
  "Return the integer parsed from STR (junk allowed, errors suppressed).
When no integer is obtained, fall back to (NULLABLE STR)."
  (let ((parsed (ignore-errors (parse-integer str :junk-allowed t))))
    (if parsed
        parsed
        (nullable str))))
156 
(defun nullable-time (str)
  "Parse STR with PARSE-TIMESTRING, using a space between date and time and
:FAIL-ON-ERROR NIL."
  ;; PARSE-TIMESTRING is imported from OBJ/TIME by this package, so the
  ;; unqualified name denotes the same function.
  (parse-timestring str
                    :fail-on-error nil
                    :date-time-separator #\Space))
159 
(defun nullable-uri (str)
  "Return (PARSE-URI STR :ESCAPE NIL) with errors suppressed.
When that yields NIL, fall back to (NULLABLE STR)."
  (let ((uri (ignore-errors (parse-uri str :escape nil))))
    (if uri
        uri
        (nullable str))))
165 
(defun mbsamp-schema (name &rest list)
  "Build a schema entry: a list whose head is NAME and whose tail is LIST,
the per-column entries."
  (list* name list))
168 
(defvar *mbsamp-schema-table*
  (let ((table (make-hash-table :test #'equal)))
    ;; Each entry is (NAME . COLUMN-ENTRIES) as built by MBSAMP-SCHEMA;
    ;; DOLIST returns TABLE once every entry has been stored.
    (dolist (entry
             (list
              (mbsamp-schema
               "alternative_release_type"
               #'parse-integer nil #'nullable #'parse-integer nil #'make-uuid-from-string)
              (mbsamp-schema
               "artist"
               #'parse-integer #'make-uuid-from-string nil nil
               #'nullable-int #'nullable #'nullable #'nullable #'nullable #'nullable
               #'nullable-int #'nullable-int #'nullable nil #'parse-integer
               #'nullable-time #'nullable-int #'nullable-int #'nullable)
              (mbsamp-schema
               "track"
               #'parse-integer #'make-uuid-from-string #'parse-integer #'parse-integer
               #'parse-integer #'nullable-int* nil #'parse-integer #'nullable-int
               #'parse-integer #'nullable-time #'parse-integer)
              (mbsamp-schema
               "recording"
               #'parse-integer #'make-uuid-from-string nil #'parse-integer
               #'nullable-int #'nullable-int* #'parse-integer #'nullable-time #'parse-integer)
              (mbsamp-schema
               "release"
               #'parse-integer #'make-uuid-from-string
               nil nil nil nil nil nil nil nil nil nil nil
               #'nullable-time)
              ;; Disabled alternative "url" schema, kept for reference:
              ;; (mbsamp-schema
              ;;  "url"
              ;;  #'parse-integer #'make-uuid-from-string #'nullable-uri #'parse-integer #'nullable-time)
              (mbsamp-schema
               "url" ;; 2,3
               #'parse-integer #'make-uuid-from-string #'nullable-uri nil nil)
              (mbsamp-schema
               "url_gid_redirect"
               #'make-uuid-from-string #'parse-integer #'nullable-time)
              (mbsamp-schema
               "tag"
               #'parse-integer nil #'parse-integer)
              (mbsamp-schema
               "genre"
               #'parse-integer #'make-uuid-from-string nil nil #'parse-integer #'nullable-time)
              (mbsamp-schema
               "work"
               #'parse-integer #'make-uuid-from-string nil #'nullable-int nil #'parse-integer #'nullable-time)
              (mbsamp-schema
               "instrument"
               #'parse-integer #'make-uuid-from-string nil #'nullable-int #'parse-integer #'nullable-time nil nil))
             table)
      (destructuring-bind (name . column-entries) entry
        (setf (gethash name table) column-entries))))
  "EQUAL hash table from MusicBrainz table name to its list of per-column
entries (a function or NIL for each column). EXTRACT-MBSAMP passes that
list to READ-CSV-FILE as :MAP-FNS.")
219 
(defun get-schema (schema)
  "Look up SCHEMA (a table-name string) in *MBSAMP-SCHEMA-TABLE*.
Returns the values of GETHASH: the stored column entries and a found flag."
  (gethash schema *mbsamp-schema-table*))
221 
(defun extract-mbsamp (schema)
  "Read the mbsamp file named SCHEMA as headerless, Tab-delimited data.

The file is the entry of *MBSAMP-FILES* whose PATHNAME-NAME is STRING= to
SCHEMA, and the schema registered under SCHEMA is passed as :MAP-FNS.
Returns the result of READ-CSV-FILE, or NIL when no such file is listed."
  (let ((source (find schema *mbsamp-files* :key #'pathname-name :test #'string=))
        (column-fns (get-schema schema)))
    (and source
         (dat/csv:read-csv-file source
                                :header nil
                                :delimiter #\Tab
                                :map-fns column-fns))))
229 
(defmacro with-mbsamp-proc (table shape &body vals)
  "Project every row of TABLE onto the column indices VALS.

TABLE is a form yielding a sequence of rows, each row indexable with AREF.
SHAPE is the dimension argument given to MAKE-ARRAY for each output row.
VALS are the indices copied, in order, into each output row.

Expands to a form returning a vector with one fresh array per input row.
The TABLE form is evaluated exactly once."
  (let ((row (gensym "ROW")))
    ;; MAP walks TABLE directly, so no separate counter or second
    ;; evaluation of TABLE is needed to know where to stop.
    `(map 'vector
          (lambda (,row)
            (make-array ,shape
                        :initial-contents
                        (list ,@(mapcar (lambda (v) `(aref ,row ,v)) vals))))
          ,table)))
243 
(defmacro def-mbsamp-proc (name &rest vals)
  "Define the function PROC-MBSAMP-<NAME>.
The function takes one table argument and applies WITH-MBSAMP-PROC to it,
using VALS as the column indices and their count as the row shape."
  (let ((rows (gensym "TABLE"))
        (docstring (format nil "Process rows of ~A mbsamp data." name))
        (width (length vals)))
    `(defun ,(symbolicate "PROC-MBSAMP-" name) (,rows)
       ,docstring
       (with-mbsamp-proc ,rows ,width ,@vals))))
250 
(defvar *mbsamp-cfs*
  ;; One column family per table name, created in the listed order.
  (map 'vector
       #'make-rdb-cf
       '("url" "genre" "tag" "track" "artist"
         "work" "recording" "release" "instrument"))
  "Vector of column families, one per mbsamp table name, built with
MAKE-RDB-CF. The main entry point installs it as the database's column
families.")
261 
;; Each form below defines PROC-MBSAMP-<NAME>. The integers are the indices
;; (AREF positions) of the input row that are copied, in order, into each
;; output row; the output row length equals the number of indices given.
(def-mbsamp-proc url 0 1 2)
(def-mbsamp-proc genre 0 1 2)
(def-mbsamp-proc tag 0 1 2)
(def-mbsamp-proc track 0 1 6)
(def-mbsamp-proc artist 0 1 2)
(def-mbsamp-proc work 0 1 4 6)
(def-mbsamp-proc recording 0 1 2 7)
(def-mbsamp-proc release 0 1 2 13)
(def-mbsamp-proc instrument 0 1 2 5 7)
271 
272 ;;;; MBDump
(defun extract-mbdump-file (file)
  "Read the json-dump FILE with repeated calls to JSON-READ.
Reading stops at the first NIL result; the objects read before that are
returned as a list, in file order."
  (with-open-file (in file)
    (let ((objects '()))
      (loop
        (let ((object (json-read in nil)))
          (if object
              (push object objects)
              (return (nreverse objects))))))))
280 
(defun extract-mbdump-columns (obj)
  "Derive column families and identifiers from the json-object OBJ.

Returns three values:
  1. a list with one MAKE-RDB-CF result per member of OBJ, made from the
     CAR of each member;
  2. the uuid parsed from OBJ's \"id\" field;
  3. the uuid parsed from OBJ's \"type-id\" field, or NIL when it is absent."
  (let* ((columns (loop for member in (json-object-members obj)
                        collect (make-rdb-cf (car member))))
         (id (make-uuid-from-string (json-getf obj "id")))
         (raw-type-id (json-getf obj "type-id"))
         (type-id (and raw-type-id
                       (make-uuid-from-string raw-type-id))))
    (values columns id type-id)))
291 
292 ;;; Tasks
;; NOTE(review): not referenced in the visible part of this file; presumably
;; a buffer size in bytes -- confirm before relying on it.
(defvar *mbdb-buffer-size* 4096)

;; Subclass of TASK for mbdb work; adds no slots of its own.
(defclass mbdb-task (task) ())

;; Subclass of STAGE for mbdb work; adds no slots of its own.
(defclass mbdb-stage (stage) ())
298 
299 ;;; Main
;; Program entry point. Steps, in order:
;;   1. bind pathname, progress-bar, CSV, CPU and logging specials;
;;   2. create the worker directory and reset the oracle and task pool;
;;   3. queue a fetch job and an unpack job (two tasks each);
;;   4. open the database and install the mbsamp column families;
;;   5. wait for the pool's workers, report statistics, and close the
;;      database even if waiting or reporting fails.
(defmain ()
  (let ((*default-pathname-defaults* *mbdb-path*)
        (*progress-bar-enabled* t)
        (*csv-separator* #\Tab)
        (*cpus* (num-cpus))
        (*log-timestamp* nil)
        (*log-level* :info))
    (log:info! "Welcome to MBDB")
    (ensure-directories-exist *mbdb-worker-dir* :verbose t)
    ;; prepare workers
    ;; MAKE-ORACLE returns an id first and the oracle second (the defvar of
    ;; *MBDB-ORACLE* keeps the second value as well). Storing the primary
    ;; value would put an id in a variable proclaimed to be an ORACLE and
    ;; break the ORACLE-ID call on the next line.
    (setq *mbdb-oracle* (nth-value 1 (make-oracle sb-thread:*current-thread*)))
    (setq *mbdb-tasks* (make-task-pool :oracle-id (oracle-id *mbdb-oracle*)))
    ;; (make-workers
    ;; (push-worker (make-thread #'?) *mbdb-tasks*)

    ;; (with-tasks ())

    (flet ((queue-job (&rest functions)
             ;; Build one job holding a task per function and push it onto
             ;; the pool. The task vector starts empty (fill-pointer 0)
             ;; with room for every function.
             (let ((job (make-job (make-array (length functions)
                                              :fill-pointer 0
                                              :initial-element (make-task)
                                              :element-type 'task))))
               (dolist (fn functions)
                 (push-task (make-task fn) job))
               (push-job job *mbdb-tasks*))))
      ;; fetch
      (queue-job #'mbsamp-fetch #'mbdump-fetch)
      ;; unpack
      (queue-job #'mbsamp-unpack #'mbdump-unpack))
    ;; (sb-thread:make-thread #'mbsamp-fetch)

    ;; prepare column family data

    ;; initialize database
    (with-db (db *mbdb*)
      (open-db db)
      (setf (rdb-cfs db) *mbsamp-cfs*)
      (backfill-opts db)
      (log:info! "database initialized"))
    ;; launch tasks

    ;; wait
    (unwind-protect
         (progn
           (wait-for-threads (task-pool-workers *mbdb-tasks*))
           ;; summarize
           (when-let ((stats (print-stats *mbdb*)))
             (info! "mbdb stats" stats)))
      ;; close
      (close-db *mbdb*))))
347