changelog shortlog graph tags branches changeset files revisions annotate raw help

Mercurial > demo / examples/db/mbdb.lisp

changeset 40: 6b652d7d6663
parent: 1ef551e24009
child: 81b7333f27f8
author: Richard Westhaver <ellis@rwest.io>
date: Sun, 14 Apr 2024 20:48:05 -0400
permissions: -rw-r--r--
description: examples
1 ;;; examples/mbdb.lisp --- MusicBrainz Database import and analysis
2 
3 ;; This example shows how to migrate a set of complex JSON objects to
4 ;; RocksDB using a dump from the MusicBrainz database
5 ;; (https://musicbrainz.org/). The files are hosted at
6 ;; https://packy.compiler.company/data/mbdump
7 
8 ;; we parse some of the database schema from the sql files here:
9 ;; https://github.com/metabrainz/musicbrainz-server/tree/master/admin/sql
10 
11 ;;; Code:
12 (defpackage :examples/mbdb
13  (:use :cl :std :dat/json :net/fetch :obj/id :rdb :cli/clap :obj/uuid
14  :sb-concurrency :log :dat/csv :dat/proto :sb-thread)
15  (:import-from :obj/uuid :make-uuid-from-string)
16  (:import-from :cli/progress :with-progress-bar :make-progress-bar
17  :*progress-bar* :*progress-bar-enabled* :update-progress)
18  (:import-from :obj/time :parse-timestring :now :timestamp)
19  (:import-from :log :info! :debug!)
20  (:import-from :obj/uri :parse-uri)
21  (:import-from :rocksdb :load-rocksdb)
22  (:export :main))
23 
24 (in-package :examples/mbdb)
25 
;; Runs at load time, before any database object below is created.
;; NOTE(review): presumably loads the RocksDB foreign library -- confirm
;; against the ROCKSDB package.
(load-rocksdb t)

;;; Vars
(declaim (type timestamp *mbdb-epoch*))
(defvar *mbdb-epoch* (now)
  "Time of birth of the mbdb system: the value of NOW when this variable
was first initialized.")

;; (defvar *mbdb-logger* (make-logger))

(declaim (type pathname *mbdb-path*))
(defvar *mbdb-path* #P"/tmp/mbdb/"
  "Directory of the local database. The import work area is derived from
it, and MAIN binds *DEFAULT-PATHNAME-DEFAULTS* to it.")
37 
(defvar *default-mbdb-opts*
  (let ((options (default-rdb-opts)))
    ;; Statistics are switched on so they can be reported at the end of MAIN.
    (set-opt options :enable-statistics 1)
    options)
  "Database options used to create *MBDB*: the value of DEFAULT-RDB-OPTS
with :ENABLE-STATISTICS set to 1.")

(declaim (type rdb *mbdb*))
(defvar *mbdb* (create-db *mbdb-path* :opts *default-mbdb-opts* :open nil)
  "The local MusicBrainz database, rooted at *MBDB-PATH*. It is created
with :OPEN NIL, so the default value is an unopened instance without any
columns: open it before use and close it again on exit.")
48 
(declaim (type oracle *mbdb-oracle*))
(defvar *mbdb-oracle* (make-oracle sb-thread:*current-thread*)
  "Oracle of the mbdb system, built from the thread that was current when
this variable was initialized. MAIN replaces it with an oracle for its
own thread.")

(declaim (type task-pool *mbdb-tasks*))
(defvar *mbdb-tasks* (make-task-pool :oracle *mbdb-oracle*)
  "Task pool of the mbdb system, attached to *MBDB-ORACLE*. It holds a
queue of jobs which are dispatched to workers; results are collected and
processed by the oracle.")
58 
(defvar *mbsamp-pack-url* "https://packy.compiler.company/data/mbsamp.tar.zst"
  "Remote location of the ZST-compressed MusicBrainz sample archive,
which is filled with TSV files.")

(defvar *mbdump-base-url* "https://packy.compiler.company/data/mbdump/"
  "Remote directory holding the individual MusicBrainz JSON data files.")

(defvar *mbdump-pack-url* "https://packy.compiler.company/data/mbdump.tar.zst"
  "Remote location of the MusicBrainz JSON dump pack.")
68 
;; *MBDB-WORKER-DIR* has to be defined first: the initial values of the two
;; pack pathnames below are computed from it when the file is loaded, and
;; reading it before its DEFVAR signals an unbound-variable error.
(defvar *mbdb-worker-dir* (merge-pathnames ".import/" *mbdb-path*)
  "Work directory under *MBDB-PATH* where packs are downloaded and unpacked.")

(defvar *mbdump-pack* (merge-pathnames "mbdump.tar.zst" *mbdb-worker-dir*)
  "Local pathname of the downloaded mbdump pack.")

(defvar *mbsamp-pack* (merge-pathnames "mbsamp.tar.zst" *mbdb-worker-dir*)
  "Local pathname of the downloaded mbsamp pack.")

(defvar *mbdump-files* nil
  "Pathnames of the unpacked mbdump files. NIL until an unpack step
assigns it.")

(defvar *mbsamp-files* nil
  "Pathnames of the unpacked mbsamp files. NIL until MBSAMP-UNPACK
assigns it.")
77 
78 ;;; Fetch Data
(defun mbdump-fetch ()
  "Download the mbdump data pack from *MBDUMP-PACK-URL* to *MBDUMP-PACK*.
When the pack file already exists nothing is downloaded and NIL is
returned."
  ;; The URL is passed to DOWNLOAD as a plain string; wrapping it in
  ;; PARSE-URI is currently disabled.
  (when (null (probe-file *mbdump-pack*))
    (download *mbdump-pack-url* *mbdump-pack*)))
87 
(defun mbsamp-fetch ()
  "Download the mbsamp data pack from *MBSAMP-PACK-URL* to *MBSAMP-PACK*.
When the pack file already exists nothing is downloaded and NIL is
returned."
  (when (null (probe-file *mbsamp-pack*))
    (download *mbsamp-pack-url* *mbsamp-pack*)))
91 
(defun mbsamp-unpack ()
  "Unpack *MBSAMP-PACK* inside *MBDB-WORKER-DIR* and record the result.

The archive is extracted with tar/zstd only when the mbsamp/ directory
does not exist yet. In either case *MBSAMP-FILES* is set to the directory
listing of mbsamp/ and that list is returned."
  (let ((out-dir (merge-pathnames "mbsamp/" *mbdb-worker-dir*)))
    (unless (probe-file out-dir)
      (sb-ext:run-program "tar" (list "-I" "zstd" "-xf" (namestring *mbsamp-pack*))
                          :directory *mbdb-worker-dir*
                          :search t
                          :wait t))
    ;; List relative to OUT-DIR rather than a hard-coded /tmp path, so the
    ;; listing follows *MBDB-WORKER-DIR*. Name :WILD with no type is the same
    ;; pattern the literal ".../mbsamp/*" namestring parsed to.
    (setq *mbsamp-files*
          (directory (make-pathname :name :wild :defaults out-dir)))))
101 
(defun mbdump-unpack ()
  "Unpack *MBDUMP-PACK* inside *MBDB-WORKER-DIR* and record the result.

The archive is extracted with tar/zstd only when the mbdump/ directory
does not exist yet. In either case *MBDUMP-FILES* is set to the directory
listing of mbdump/ and that list is returned."
  (let ((out-dir (merge-pathnames "mbdump/" *mbdb-worker-dir*)))
    (unless (probe-file out-dir)
      (sb-ext:run-program "tar" (list "-I" "zstd" "-xf" (namestring *mbdump-pack*))
                          :directory *mbdb-worker-dir*
                          :search t
                          :wait t))
    ;; The listing belongs in *MBDUMP-FILES*; storing it in *MBSAMP-FILES*
    ;; would overwrite the sample file list that EXTRACT-MBSAMP searches.
    ;; It is taken relative to OUT-DIR so it follows *MBDB-WORKER-DIR*.
    (setq *mbdump-files*
          (directory (make-pathname :name :wild :defaults out-dir)))))
111 
112 #+nil (extract-mbsamp (car (mbsamp-fetch)))
113 
114 ;;; Parsing
;; The two-character string backslash-N, used as the null marker in the
;; sample data.
(define-constant +mbsamp-null+ "\\N" :test #'string=)

(defun nullable (str)
  "Return STR, or NIL when STR is the null marker +MBSAMP-NULL+ or has
length zero."
  (and (string/= +mbsamp-null+ str)
       (plusp (length str))
       str))
121 
(defun proc-key (type)
  "Return the symbol naming the parser function for column TYPE.
TYPE is turned into a keyword with SB-INT:KEYWORDICATE; :ID, :URL, :NUM
and :* have dedicated parsers and anything else maps to IDENTITY."
  (let ((entry (assoc (sb-int:keywordicate type)
                      '((:id . make-uuid-from-string)
                        (:url . parse-uri)
                        (:num . parse-integer)
                        (:* . nullable)))))
    (if entry
        (cdr entry)
        'identity)))
129 
(defun nullable-int (str)
  "Parse the integer at the start of STR, or return NIL when STR does not
begin with one. Trailing junk is tolerated."
  (parse-integer str
                 :junk-allowed t))
132 
(defun nullable-int* (str)
  "Like NULLABLE-INT, but fall back to (NULLABLE STR) when no integer can
be read, so non-numeric text is kept as a string. Errors signalled while
parsing are ignored."
  (let ((number (ignore-errors
                 (parse-integer str :junk-allowed t))))
    (if number
        number
        (nullable str))))
137 
(defun nullable-time (str)
  "Parse STR as a timestring whose date and time parts are separated by a
space. PARSE-TIMESTRING is called with :FAIL-ON-ERROR NIL."
  ;; PARSE-TIMESTRING is imported from OBJ/TIME by the package definition.
  (parse-timestring str
                    :date-time-separator #\Space
                    :fail-on-error nil))
140 
(defun nullable-uri (str)
  "Parse STR with PARSE-URI (:ESCAPE NIL). When parsing signals an error
or yields NIL, fall back to (NULLABLE STR)."
  (let ((uri (ignore-errors
              (parse-uri str :escape nil))))
    (if uri
        uri
        (nullable str))))
146 
(defun mbsamp-schema (name &rest list)
  "Build a schema entry: a list whose first element is the table NAME and
whose remaining elements are LIST, the per-column parser functions."
  (list* name list))
149 
(defvar *mbsamp-schema-table*
  (let ((table (make-hash-table :test #'equal)))
    ;; Each entry is (NAME . PARSERS); register PARSERS under NAME.
    (loop for (name . parsers)
            in (list
                (mbsamp-schema
                 "alternative_release_type"
                 #'parse-integer nil #'nullable #'parse-integer nil
                 #'make-uuid-from-string)
                (mbsamp-schema
                 "artist"
                 #'parse-integer #'make-uuid-from-string nil nil
                 #'nullable-int #'nullable #'nullable #'nullable #'nullable
                 #'nullable
                 #'nullable-int #'nullable-int #'nullable nil #'parse-integer
                 #'nullable-time #'nullable-int #'nullable-int #'nullable)
                (mbsamp-schema
                 "track"
                 #'parse-integer #'make-uuid-from-string #'parse-integer
                 #'parse-integer
                 #'parse-integer #'nullable-int* nil #'parse-integer
                 #'nullable-int
                 #'parse-integer #'nullable-time #'parse-integer)
                (mbsamp-schema
                 "recording"
                 #'parse-integer #'make-uuid-from-string nil #'parse-integer
                 #'nullable-int #'nullable-int* #'parse-integer
                 #'nullable-time #'parse-integer)
                (mbsamp-schema
                 "release"
                 #'parse-integer #'make-uuid-from-string
                 nil nil nil nil nil nil nil nil nil nil nil
                 #'nullable-time)
                ;; (mbsamp-schema
                ;;  "url"
                ;;  #'parse-integer #'make-uuid-from-string #'nullable-uri
                ;;  #'parse-integer #'nullable-time)
                (mbsamp-schema
                 "url" ;; 2,3
                 #'parse-integer #'make-uuid-from-string #'nullable-uri
                 nil nil)
                (mbsamp-schema
                 "url_gid_redirect"
                 #'make-uuid-from-string #'parse-integer #'nullable-time)
                (mbsamp-schema
                 "tag"
                 #'parse-integer nil #'parse-integer)
                (mbsamp-schema
                 "genre"
                 #'parse-integer #'make-uuid-from-string nil nil
                 #'parse-integer #'nullable-time)
                (mbsamp-schema
                 "work"
                 #'parse-integer #'make-uuid-from-string nil #'nullable-int
                 nil #'parse-integer #'nullable-time)
                (mbsamp-schema
                 "instrument"
                 #'parse-integer #'make-uuid-from-string nil #'nullable-int
                 #'parse-integer #'nullable-time nil nil))
          do (setf (gethash name table) parsers))
    table)
  "Hash table (EQUAL test) mapping a MusicBrainz table name to its schema:
a list with one entry per column, each either a parser function or NIL.
The list is handed to the CSV reader as :MAP-FNS by EXTRACT-MBSAMP.
NOTE(review): NIL presumably leaves the column value as read -- confirm
against DAT/CSV.")
200 
(defun get-schema (schema)
  "Look up the table name SCHEMA in *MBSAMP-SCHEMA-TABLE*. Returns the
values of GETHASH: the column parser list and a found-p flag."
  (gethash schema *mbsamp-schema-table*))
202 
(defun extract-mbsamp (schema)
  "Read the mbsamp file for the table named SCHEMA.

The file is the entry of *MBSAMP-FILES* whose PATHNAME-NAME is STRING= to
SCHEMA. It is read as headerless tab-separated values, with the parsers
registered for SCHEMA applied as :MAP-FNS. Returns the result of
READ-CSV-FILE, or NIL when no such file is known."
  (when-let ((file (find schema *mbsamp-files*
                         :key #'pathname-name
                         :test #'string=)))
    (dat/csv:read-csv-file file
                           :header nil
                           :delimiter #\Tab
                           :map-fns (get-schema schema))))
210 
(defun extract-mbdump-file (file)
  "Read successive values from the json-dump FILE with JSON-READ and
return them as a list, in file order. Reading stops at the first NIL
result."
  (with-open-file (stream file)
    (let ((objects '()))
      (do ((object (json-read stream nil) (json-read stream nil)))
          ((null object) (nreverse objects))
        (push object objects)))))
218 
(defmacro with-mbsamp-proc (table shape &body vals)
  "Project every row of TABLE onto the columns listed in VALS.

TABLE is a form evaluating to a sequence of rows, each indexable with
AREF. VALS are the column indices to keep, in order. SHAPE is the
dimension form given to MAKE-ARRAY for each output row and must agree
with the number of VALS. The expansion returns a fresh vector holding one
new array per input row."
  (with-gensyms (rows row)
    ;; Bind TABLE once so that a form with side effects, or an expensive
    ;; one, is not evaluated a second time.
    `(let ((,rows ,table))
       (map 'vector
            (lambda (,row)
              (make-array
               ,shape
               :initial-contents
               (list
                ,@(mapcar
                   (lambda (v) `(aref ,row ,v))
                   vals))))
            ,rows))))
232 
(defmacro def-mbsamp-proc (name &rest vals)
  "Define the function PROC-MBSAMP-<NAME>. It takes a table of rows and
returns, via WITH-MBSAMP-PROC, a vector of rows reduced to the column
indices VALS."
  (let ((fn-name (symbolicate "PROC-MBSAMP-" name)))
    (with-gensyms (table)
      `(defun ,fn-name (,table)
         ,(format nil "Process rows of ~A mbsamp data." name)
         (with-mbsamp-proc ,table ,(length vals) ,@vals)))))
239 
(defvar *mbsamp-cfs*
  (map 'vector #'make-rdb-cf
       '("url" "genre" "tag" "track" "artist"
         "work" "recording" "release" "instrument"))
  "Vector of column families, one per mbsamp table that is imported.
MAIN installs it as the column families of the database.")

;; One row processor per imported table. The numbers are the column
;; indices of the parsed row that are kept.
(def-mbsamp-proc url        0 1 2)
(def-mbsamp-proc genre      0 1 2)
(def-mbsamp-proc tag        0 1 2)
(def-mbsamp-proc track      0 1 6)
(def-mbsamp-proc artist     0 1 2)
(def-mbsamp-proc work       0 1 4 6)
(def-mbsamp-proc recording  0 1 2 7)
(def-mbsamp-proc release    0 1 2 13)
(def-mbsamp-proc instrument 0 1 2 5 7)
260 
(defun extract-mbdump-columns (obj)
  "Derive column families and identifiers from the json-object OBJ.

Returns three values:
  1. a list with one column family per member of OBJ, each made by
     calling MAKE-RDB-CF on the CAR of the member;
  2. the UUID parsed from the \"id\" field;
  3. the UUID parsed from the \"type-id\" field, or NIL when absent.
NOTE(review): the column families are presumably still uninitialized and
need #'create-cfs -- confirm against RDB."
  (values
   (loop for member in (json-object-members obj)
         collect (make-rdb-cf (car member)))
   (make-uuid-from-string (json-getf obj "id"))
   (let ((type-id (json-getf obj "type-id")))
     (and type-id
          (make-uuid-from-string type-id)))))
271 
272 ;;; Tasks
(defvar *mbdb-buffer-size* 4096
  "Buffer size used by mbdb tasks.
NOTE(review): not referenced in the visible part of this file; the unit
is not stated -- confirm.")

(defclass mbdb-task (task)
  ()
  (:documentation "A TASK submitted by the mbdb example. Adds no slots."))
276 
277 ;;; Main
;; Entry point: set up the work directory and the workers, open and close
;; the database once with the mbsamp column families installed, wait for
;; the workers, then log the database statistics.
(defmain ()
  (let ((*default-pathname-defaults* *mbdb-path*)
        (*progress-bar-enabled* t)
        (*csv-separator* #\Tab)
        (*cpus* (num-cpus))
        (*log-timestamp* nil)
        (*log-level* :warn))
    ;; INFO! is the symbol imported from LOG by the package definition.
    ;; NOTE(review): with *LOG-LEVEL* bound to :WARN these info messages
    ;; are presumably suppressed -- confirm against LOG.
    (info! "Welcome to MBDB")
    (ensure-directories-exist *mbdb-worker-dir* :verbose t)
    ;; prepare workers: rebuild the oracle for the thread running MAIN and
    ;; give the task pool that oracle
    (setf *mbdb-oracle* (make-oracle sb-thread:*current-thread*))
    (setf *mbdb-tasks* (make-task-pool :oracle *mbdb-oracle*))
    ;; the sample pack is fetched on a worker thread
    (push-worker (sb-thread:make-thread #'mbsamp-fetch) *mbdb-tasks*)
    ;; (with-tasks ())
    ;; NOTE(review): this job receives a task but is not handed to
    ;; *MBDB-TASKS* here; it is dropped when the LET exits -- confirm intent.
    (let ((fetch-job (make-job)))
      (push-task (make-instance 'mbdb-task :object #'mbsamp-fetch) fetch-job))

    ;; (sb-thread:make-thread #'mbsamp-fetch)

    ;; prepare column family data

    ;; initialize database
    (with-db (db *mbdb*)
      (open-db db)
      (setf (rdb-cfs db) *mbsamp-cfs*)
      ;; (create-cfs db)
      (info! "database initialized")
      ;;
      (close-db db))

    ;; launch tasks

    ;; wait for the fetch worker(s) to finish
    (wait-for-threads (task-pool-workers *mbdb-tasks*))
    ;; summarize
    ;; NOTE(review): PRINT-STATS runs after CLOSE-DB above -- confirm it is
    ;; valid on a closed database.
    (info! "mbdb stats" (print-stats *mbdb*))
    ;; close
    ))
305  ;;
306  (close-db db))
307 
308  ;; launch tasks
309 
310  ;; wait
311  (wait-for-threads (task-pool-workers *mbdb-tasks*))
312  ;; summarize
313  (info! "mbdb stats" (print-stats *mbdb*))
314  ;; close
315  ))