changelog shortlog graph tags branches changeset files revisions annotate raw help

Mercurial > demo / examples/db/mbdb.lisp

changeset 41: 81b7333f27f8
parent: 6b652d7d6663
child: 5c58d05abae6
author: Richard Westhaver <ellis@rwest.io>
date: Sun, 16 Jun 2024 22:15:04 -0400
permissions: -rw-r--r--
description: more examples
;;; examples/db/mbdb.lisp --- MusicBrainz Database import and analysis
2 
;; This example shows how to migrate a set of complex JSON objects and
;; SQL dumps to RocksDB using data from the MusicBrainz database
;; (https://musicbrainz.org/). The files are hosted at
;; https://packy.compiler.company/data
7 
8 ;;; Commentary:
9 
10 ;; The original data is located here:
11 ;; https://data.metabrainz.org/pub/musicbrainz/data/
12 
;; The actual JSON dumps are quite large (release.json is 208GB!), so
;; we provide our own trimmed-down sample. Each file is sampled
;; randomly and independently, so the linkage data between files is
;; clobbered. If you want to do any OLAP work you will need the full
;; data set, which is packaged as mbdump-full.tar.zst.
18 
19 ;; the data prep script is located at ../mbdump-prep.lisp
20 
21 ;; we parsed some of the database schema from the sql files here:
22 ;; https://github.com/metabrainz/musicbrainz-server/tree/master/admin/sql
23 
24 ;;; Code:
25 (in-package :std-user)
26 (defpkg :examples/mbdb
27  (:use :cl :std :dat/json :net/fetch :obj/id :rdb :cli/clap :obj/uuid
28  :sb-concurrency :log :dat/csv :dat/proto :sb-thread)
29  (:import-from :obj/uuid :make-uuid-from-string)
30  (:import-from :cli/progress :with-progress-bar :make-progress-bar
31  :*progress-bar* :*progress-bar-enabled* :update-progress)
32  (:import-from :obj/time :parse-timestring :now :timestamp)
33  (:import-from :log :info! :debug!)
34  (:import-from :obj/uri :parse-uri)
35  (:import-from :rocksdb :load-rocksdb)
36  (:export :main))
37 
38 (in-package :examples/mbdb)
39 
;; Presumably loads the RocksDB foreign library; it must run before any
;; database objects below are created.
(load-rocksdb t)

;;; Vars
(declaim (type timestamp *mbdb-epoch*))
(defvar *mbdb-epoch* (now)
  "Timestamp taken when this defvar is first evaluated: the mbdb time of birth.")

;; (defvar *mbdb-logger* (make-logger))

(declaim (type pathname *mbdb-path*))
(defvar *mbdb-path* #P"/tmp/mbdb/"
  "Root directory of the local mbdb database; the .import/ work area lives below it.")
51 
(defvar *default-mbdb-opts*
  (let ((o (default-rdb-opts)))
    ;; Start from the RDB defaults and turn on :enable-statistics.
    (set-opt o :enable-statistics 1)
    o)
  "Options used to create *MBDB*: the RDB defaults with :ENABLE-STATISTICS set to 1.")
56 
(declaim (type rdb *mbdb*))
(defvar *mbdb*
  (create-db *mbdb-path* :opts *default-mbdb-opts* :open nil)
  "The local MusicBrainz database, located at *MBDB-PATH*.
It is created with :OPEN NIL and no column families, so it has to be
opened before use and closed again on exit (MAIN does both).")
62 
(declaim (type oracle *mbdb-oracle*))
(defvar *mbdb-oracle*
  (make-oracle sb-thread:*current-thread*)
  "Oracle for the mbdb system, built for the thread that evaluates this defvar.
MAIN replaces it with an oracle for the thread running MAIN.")

(declaim (type task-pool *mbdb-tasks*))
(defvar *mbdb-tasks*
  (make-task-pool :oracle-id (oracle-id *mbdb-oracle*))
  "Task pool for mbdb, tied to *MBDB-ORACLE* through its oracle id.
Jobs queued here are meant to be dispatched to workers, with results
collected and processed by the oracle. MAIN recreates it with the oracle.")
72 
(defvar *mbsamp-pack-url*
  "https://packy.compiler.company/data/mbsamp.tar.zst"
  "URL of the zstd-compressed tar archive holding the MusicBrainz TSV sample files.")

(defvar *mbdump-base-url*
  "https://packy.compiler.company/data/mbdump/"
  "Base URL under which the individual MusicBrainz JSON data files are hosted.")

(defvar *mbdump-pack-url*
  "https://packy.compiler.company/data/mbdump.tar.zst"
  "URL of the zstd-compressed tar archive holding the MusicBrainz JSON dump files.")
82 
(defvar *mbdb-worker-dir* (merge-pathnames ".import/" *mbdb-path*)
  "Work area below *MBDB-PATH* where the data packs are downloaded and unpacked.")

(defvar *mbdump-pack* (merge-pathnames "mbdump.tar.zst" *mbdb-worker-dir*)
  "Local destination of the MusicBrainz JSON dump pack (see *MBDUMP-PACK-URL*).")

(defvar *mbsamp-pack* (merge-pathnames "mbsamp.tar.zst" *mbdb-worker-dir*)
  "Local destination of the MusicBrainz TSV sample pack (see *MBSAMP-PACK-URL*).")

(defvar *mbdump-files* nil
  "Pathnames of the unpacked JSON dump files; intended to be filled in by MBDUMP-UNPACK.")

(defvar *mbsamp-files* nil
  "Pathnames of the unpacked TSV sample files; filled in by MBSAMP-UNPACK.")
92 
93 ;;; Fetch Data
(defun mbdump-fetch ()
  "Download the mbdump JSON pack from *MBDUMP-PACK-URL* into *MBDUMP-PACK*.
Returns NIL without downloading when *MBDUMP-PACK* already exists;
otherwise returns whatever DOWNLOAD returns."
  (when (null (probe-file *mbdump-pack*))
    (download *mbdump-pack-url* *mbdump-pack*)))
102 
(defun mbsamp-fetch ()
  "Download the mbsamp TSV pack from *MBSAMP-PACK-URL* into *MBSAMP-PACK*,
skipping the download when that file is already present."
  (let ((target *mbsamp-pack*))
    (unless (probe-file target)
      (download *mbsamp-pack-url* target))))
106 
(defun mbsamp-unpack ()
  "Unpack *MBSAMP-PACK* into the mbsamp/ subdirectory of *MBDB-WORKER-DIR*
and set *MBSAMP-FILES* to the files found there, returning that list.

Extraction is skipped when the mbsamp/ directory already exists. Signals
an error when tar exits with a non-zero status."
  (let ((out-dir (merge-pathnames "mbsamp/" *mbdb-worker-dir*)))
    (unless (probe-file out-dir)
      ;; `tar -I zstd` runs the archive through zstd for decompression.
      (let* ((proc (sb-ext:run-program "tar" `("-I" "zstd" "-xf" ,(namestring *mbsamp-pack*))
                                       :directory *mbdb-worker-dir*
                                       :search t
                                       :wait t))
             (code (sb-ext:process-exit-code proc)))
        ;; Don't carry on with a missing or partial extraction.
        (unless (eql code 0)
          (error "tar failed to unpack ~A (exit code ~A)" *mbsamp-pack* code))))
    ;; Same wildcard as before ("*"), but rooted at OUT-DIR so that a
    ;; customized *MBDB-PATH* is honored instead of a hard-coded /tmp path.
    (setq *mbsamp-files* (directory (merge-pathnames "*" out-dir)))))
116 
(defun mbdump-unpack ()
  "Unpack *MBDUMP-PACK* into the mbdump/ subdirectory of *MBDB-WORKER-DIR*
and set *MBDUMP-FILES* to the files found there, returning that list.

Extraction is skipped when the mbdump/ directory already exists. Signals
an error when tar exits with a non-zero status."
  (let ((out-dir (merge-pathnames "mbdump/" *mbdb-worker-dir*)))
    (unless (probe-file out-dir)
      ;; `tar -I zstd` runs the archive through zstd for decompression.
      (let* ((proc (sb-ext:run-program "tar" `("-I" "zstd" "-xf" ,(namestring *mbdump-pack*))
                                       :directory *mbdb-worker-dir*
                                       :search t
                                       :wait t))
             (code (sb-ext:process-exit-code proc)))
        ;; Don't carry on with a missing or partial extraction.
        (unless (eql code 0)
          (error "tar failed to unpack ~A (exit code ~A)" *mbdump-pack* code))))
    ;; The dump file list belongs in *MBDUMP-FILES*; *MBSAMP-FILES* is owned
    ;; by MBSAMP-UNPACK and must not be overwritten here. The glob is rooted
    ;; at OUT-DIR so a customized *MBDB-PATH* is honored.
    (setq *mbdump-files* (directory (merge-pathnames "*" out-dir)))))
126 
127 #+nil (extract-mbsamp (car (mbsamp-fetch)))
128 
129 ;;; Parsing
130 
131 ;;;; MBSamp
(define-constant +mbsamp-null+ "\\N" :test #'string=)

(defun nullable (str)
  "Return STR unchanged, or NIL when STR is empty or equals +MBSAMP-NULL+
(the two-character marker \\N used for NULL cells in the mbsamp TSV files)."
  (if (or (string= +mbsamp-null+ str)
          (zerop (length str)))
      nil
      str))
138 
(defun proc-key (type)
  "Return the name of the parser function for the column-type designator TYPE.
TYPE is converted to a keyword with SB-INT:KEYWORDICATE. :ID, :URL, :NUM and :*
select MAKE-UUID-FROM-STRING, PARSE-URI, PARSE-INTEGER and NULLABLE
respectively; any other key selects IDENTITY."
  (let ((key (sb-int:keywordicate type)))
    (cond ((eq key :id) 'make-uuid-from-string)
          ((eq key :url) 'parse-uri)
          ((eq key :num) 'parse-integer)
          ((eq key :*) 'nullable)
          (t 'identity))))
146 
(defun nullable-int (str)
  "Parse an integer from STR with :JUNK-ALLOWED T.
Returns PARSE-INTEGER's values: the integer (or NIL when none is found)
and the index where parsing stopped."
  (parse-integer str :junk-allowed t))

(defun nullable-int* (str)
  "Like NULLABLE-INT, but when no integer can be parsed, fall back to
(NULLABLE STR), so that non-numeric text is kept and empty or NULL cells
become NIL. Any error signalled by PARSE-INTEGER is ignored first."
  (let ((n (ignore-errors (parse-integer str :junk-allowed t))))
    (if n
        n
        (nullable str))))
154 
(defun nullable-time (str)
  "Parse STR as a timestamp whose date and time parts are separated by a space.
Parsing is done with :FAIL-ON-ERROR NIL, so unparseable input is not
expected to signal."
  (parse-timestring str
                    :date-time-separator #\Space
                    :fail-on-error nil))

(defun nullable-uri (str)
  "Parse STR as a URI (with :ESCAPE NIL). If parsing signals an error or
yields NIL, fall back to (NULLABLE STR)."
  (let ((uri (ignore-errors (parse-uri str :escape nil))))
    (if uri
        uri
        (nullable str))))
163 
(defun mbsamp-schema (name &rest list)
  "Build a schema entry: a list whose first element is the table NAME,
followed by the column parser functions given in LIST."
  (list* name list))
166 
(defvar *mbsamp-schema-table*
  (let ((table (make-hash-table :test #'equal)))
    ;; Each schema is (TABLE-NAME . PARSERS): PARSERS holds one function (or
    ;; NIL) per TSV column, in column order, and is what EXTRACT-MBSAMP passes
    ;; to READ-CSV-FILE as :MAP-FNS.
    (dolist (schema
             (list
              (mbsamp-schema
               "alternative_release_type"
               #'parse-integer nil #'nullable #'parse-integer nil #'make-uuid-from-string)
              (mbsamp-schema
               "artist"
               #'parse-integer #'make-uuid-from-string nil nil
               #'nullable-int #'nullable #'nullable #'nullable #'nullable #'nullable
               #'nullable-int #'nullable-int #'nullable nil #'parse-integer
               #'nullable-time #'nullable-int #'nullable-int #'nullable)
              ;; NOTE(review): the last column of "track" and "recording" is parsed
              ;; with PARSE-INTEGER; if those columns hold booleans such as t/f in
              ;; the sample files, parsing will signal -- confirm against the data.
              (mbsamp-schema
               "track"
               #'parse-integer #'make-uuid-from-string #'parse-integer #'parse-integer
               #'parse-integer #'nullable-int* nil #'parse-integer #'nullable-int
               #'parse-integer #'nullable-time #'parse-integer)
              (mbsamp-schema
               "recording"
               #'parse-integer #'make-uuid-from-string nil #'parse-integer
               #'nullable-int #'nullable-int* #'parse-integer #'nullable-time #'parse-integer)
              (mbsamp-schema
               "release"
               #'parse-integer #'make-uuid-from-string nil nil nil nil nil nil nil nil nil nil nil #'nullable-time)
              ;; (mbsamp-schema
              ;; "url"
              ;; #'parse-integer #'make-uuid-from-string #'nullable-uri #'parse-integer #'nullable-time)
              (mbsamp-schema
               "url" ;; 2,3
               #'parse-integer #'make-uuid-from-string #'nullable-uri nil nil)
              (mbsamp-schema
               "url_gid_redirect"
               #'make-uuid-from-string #'parse-integer #'nullable-time)
              (mbsamp-schema
               "tag"
               #'parse-integer nil #'parse-integer)
              (mbsamp-schema
               "genre"
               #'parse-integer #'make-uuid-from-string nil nil #'parse-integer #'nullable-time)
              (mbsamp-schema
               "work"
               #'parse-integer #'make-uuid-from-string nil #'nullable-int nil #'parse-integer #'nullable-time)
              (mbsamp-schema
               "instrument"
               #'parse-integer #'make-uuid-from-string nil #'nullable-int #'parse-integer #'nullable-time nil nil)))
      (setf (gethash (first schema) table) (rest schema)))
    table)
  "EQUAL hash table mapping MusicBrainz table names to their column parser lists.")
217 
(defun get-schema (schema)
  "Look up the column parser list registered for the table named SCHEMA in
*MBSAMP-SCHEMA-TABLE*. The second value is true when SCHEMA is registered."
  (multiple-value-bind (parsers found-p)
      (gethash schema *mbsamp-schema-table*)
    (values parsers found-p)))
219 
(defun extract-mbsamp (schema)
  "Read the unpacked mbsamp TSV file for the table named SCHEMA.

SCHEMA is a table name such as \"artist\". The file is found in
*MBSAMP-FILES* (populated by MBSAMP-UNPACK) by comparing SCHEMA with each
file's PATHNAME-NAME. Its columns are parsed with the functions that
GET-SCHEMA returns for SCHEMA. Returns the result of DAT/CSV:READ-CSV-FILE
(presumably a vector of row vectors, as the PROC-MBSAMP-* functions
expect), or NIL when no matching file is present."
  (let ((file (find schema *mbsamp-files* :test #'string= :key #'pathname-name)))
    (when file
      ;; The files are tab-separated and have no header row.
      (dat/csv:read-csv-file file
                             :header nil
                             :delimiter #\Tab
                             :map-fns (get-schema schema)))))
227 
(defmacro with-mbsamp-proc (table shape &body vals)
  "Project selected columns out of every row of TABLE.

The expansion evaluates TABLE and SHAPE once each. It returns a vector
containing, for each ROW of TABLE, a fresh vector of length SHAPE whose
elements are (AREF ROW V) for each column-index form V in VALS. SHAPE
must equal the number of VALS, because the row vector is built with
:INITIAL-CONTENTS."
  (with-gensyms (tbl len row)
    ;; Bind TABLE once so that an arbitrary form (not just a variable) is
    ;; evaluated exactly once.
    `(let ((,tbl ,table)
           (,len ,shape))
       (map 'vector
            (lambda (,row)
              (make-array ,len
                          :initial-contents
                          (list ,@(mapcar (lambda (v) `(aref ,row ,v)) vals))))
            ,tbl))))
241 
(defmacro def-mbsamp-proc (name &rest vals)
  "Define PROC-MBSAMP-<NAME>, a one-argument function that keeps only the
columns at indices VALS from each row of an mbsamp table, using
WITH-MBSAMP-PROC with a row width of (LENGTH VALS)."
  (let ((fn-name (symbolicate "PROC-MBSAMP-" name))
        (doc (format nil "Process rows of ~A mbsamp data." name))
        (width (length vals)))
    (with-gensyms (table)
      `(defun ,fn-name (,table)
         ,doc
         (with-mbsamp-proc ,table ,width ,@vals)))))
248 
(defvar *mbsamp-cfs*
  (map 'vector #'make-rdb-cf
       '("url" "genre" "tag" "track" "artist"
         "work" "recording" "release" "instrument"))
  "Vector of column families, one per mbsamp table; MAIN installs it as the
column families of the database.")
259 
;; Define one PROC-MBSAMP-<TABLE> function for each column family in
;; *MBSAMP-CFS*. Each integer is a 0-based column index into the rows of a
;; table (presumably as returned by EXTRACT-MBSAMP); the generated function
;; keeps just those columns, in the order given.
(def-mbsamp-proc url 0 1 2)
(def-mbsamp-proc genre 0 1 2)
(def-mbsamp-proc tag 0 1 2)
(def-mbsamp-proc track 0 1 6)
(def-mbsamp-proc artist 0 1 2)
;; NOTE(review): most tables keep column 2, but WORK keeps column 4 instead;
;; confirm this is intended.
(def-mbsamp-proc work 0 1 4 6)
(def-mbsamp-proc recording 0 1 2 7)
(def-mbsamp-proc release 0 1 2 13)
(def-mbsamp-proc instrument 0 1 2 5 7)
269 
270 ;;;; MBDump
(defun extract-mbdump-file (file)
  "Read successive JSON values from the json-dump FILE with JSON-READ until
it returns NIL, and return them as a list in file order."
  ;; JSON exchanged between systems is UTF-8 (RFC 8259), so pin the encoding
  ;; instead of depending on the locale's default external format.
  (with-open-file (f file :external-format :utf-8)
    (loop for x = (json-read f nil)
          while x
          collect x)))
278 
(defun extract-mbdump-columns (obj)
  "Describe the columns of the json-object OBJ.

Returns three values:
  1. a list of uninitialized column families, one MAKE-RDB-CF per entry of
     (JSON-OBJECT-MEMBERS OBJ), named by that entry's CAR; they can be
     created with #'create-cfs;
  2. the \"id\" member of OBJ, parsed with MAKE-UUID-FROM-STRING;
  3. the \"type-id\" member parsed the same way, or NIL when it is absent."
  (values
   (mapcar (lambda (member) (make-rdb-cf (car member)))
           (json-object-members obj))
   (make-uuid-from-string (json-getf obj "id"))
   (when-let ((type-id (json-getf obj "type-id")))
     (make-uuid-from-string type-id))))
289 
290 ;;; Tasks
(defvar *mbdb-buffer-size* 4096
  "Buffer size for mbdb tasks. NOTE(review): not referenced by the code visible here.")

(defclass mbdb-task (task) ()
  (:documentation "A TASK subclass for the mbdb system; it adds no slots of its own."))

(defclass mbdb-stage (stage) ()
  (:documentation "A STAGE subclass for the mbdb system; it adds no slots of its own."))
296 
297 ;;; Main
(defmain ()
  ;; Program entry point: set up the mbdb work area, oracle and task pool,
  ;; queue the fetch/unpack jobs, initialize the database, then wait for
  ;; the pool's workers, print stats and close the database.
  (let ((*default-pathname-defaults* *mbdb-path*)
        (*progress-bar-enabled* t)
        (*csv-separator* #\Tab)          ; the mbsamp files are tab-separated
        (*cpus* (num-cpus))
        (*log-timestamp* nil)
        (*log-level* :info))
    (log:info! "Welcome to MBDB")
    (ensure-directories-exist *mbdb-worker-dir* :verbose t)
    ;; prepare workers
    ;; Replace the load-time oracle and pool with ones for the thread running MAIN.
    (setq *mbdb-oracle* (make-oracle sb-thread:*current-thread*))
    (setq *mbdb-tasks* (make-task-pool :oracle-id (oracle-id *mbdb-oracle*)))
    ;; (make-workers
    ;; (push-worker (make-thread #'?) *mbdb-tasks*)

    ;; (with-tasks ())

    ;; NOTE(review): no workers are created above (that code is commented out),
    ;; so the jobs queued below may never run. Also, fetch and unpack are
    ;; separate jobs, and nothing visible here orders unpack after fetch.
    ;; Confirm against the task-pool API.
    ;; fetch
    (let ((job (make-job (make-array 2 :fill-pointer 0 :initial-element (make-task) :element-type 'task))))
      (push-task (make-task #'mbsamp-fetch) job)
      (push-task (make-task #'mbdump-fetch) job)
      (push-job job *mbdb-tasks*))
    ;; unpack
    (let ((job (make-job (make-array 2 :fill-pointer 0 :initial-element (make-task) :element-type 'task))))
      (push-task (make-task #'mbsamp-unpack) job)
      (push-task (make-task #'mbdump-unpack) job)
      (push-job job *mbdb-tasks*))
    ;; (sb-thread:make-thread #'mbsamp-fetch)

    ;; prepare column family data

    ;; initialize database
    ;; NOTE(review): this runs before the UNWIND-PROTECT below, so if OPEN-DB
    ;; or BACKFILL-OPTS signals, CLOSE-DB is not reached unless WITH-DB closes
    ;; the database itself. Confirm WITH-DB's semantics.
    (with-db (db *mbdb*)
      (open-db db)
      (setf (rdb-cfs db) *mbsamp-cfs*)
      (backfill-opts db)
      (log:info! "database initialized"))
    ;; launch tasks

    ;; wait
    (unwind-protect
         (progn
           (wait-for-threads (task-pool-workers *mbdb-tasks*))
           ;; summarize
           (when-let ((stats (print-stats *mbdb*))) (info! "mbdb stats" stats)))
      ;; close: always runs, even if waiting or summarizing signals.
      (close-db *mbdb*))))
345