changeset 44: | 99d4ab4f8d53 |
parent: | 1ef551e24009 |
author: | Richard Westhaver <ellis@rwest.io> |
date: | Sun, 11 Aug 2024 01:50:18 -0400 |
permissions: | -rw-r--r-- |
description: | update |
37 | 1 | ;;; examples/db/mini-redis.lisp --- Mini-Redis client/server |
2 | ||
3 | ;; based on https://github.com/no-defun-allowed/concurrent-hash-tables/blob/master/Examples/phony-redis.lisp |
|
4 | ||
38 | 5 | ;; if the heap gets exhausted you probably want to try increasing |
6 | ;; the dynamic-space-size at runtime. |
|
7 | ||
37 | 8 | ;;; Code: |
9 | (defpackage :examples/mini-redis |
|
10 | (:use :cl :std :net :obj :cli :sb-concurrency :sb-thread) |
|
38 | 11 | (:export :main)) |
37 | 12 | |
13 | (in-package :examples/mini-redis) |
|
14 | ||
39
1ef551e24009
added musicbrainz db example
Richard Westhaver <ellis@rwest.io>
parents:
38
diff
changeset
|
;; Benchmark parameters.  RUN uses *WORKER-COUNT*, *WRITER-PROPORTION* and
;; *KEYS* as the defaults of its optional arguments; WORKER reads *OPS*.
(defparameter *worker-count* 8
  "Default number of worker threads started by RUN.")

(defparameter *writer-proportion* 0.5
  "Default writer proportion handed to each worker: the probability that a
slot in the worker's read/write plan is a write.")

(defvar *keys*
  (loop for n from 0 to 128 by 2
        collect (format nil "~r" n))
  "English names of the even integers 0, 2, ..., 128 (default key list of RUN).")

(defvar *other-keys*
  (loop for n from 1 to 127 by 2
        collect (format nil "~r" n))
  "English names of the odd integers 1, 3, ..., 127.")

(defvar *ops* 400000
  "Number of requests each worker issues.")
|
24 | ||
(defun make-server ()
  "Create the shared key/value store used as the server.
The store is whatever MAKE-CASTABLE returns, configured to compare keys
with EQUAL."
  (make-castable :test (function equal)))
|
27 | ||
(defstruct (conn)
  "Client-side handle for one connection.
CONNECT-TO-SERVER fills both slots with mailboxes."
  ;; Requests travel through TX to the connection thread.
  tx
  ;; Replies from the connection thread arrive on RX.
  rx)
|
29 | ||
(defun connect-to-server (server)
  "Start a connection thread that serves requests against SERVER.

Returns a CONN whose TX mailbox carries requests to the thread and whose RX
mailbox carries its replies.  Requests are lists:

  (:get KEY)       -> replies (:found VALUE) or (:not-found)
  (:put KEY VALUE) -> stores a copy of VALUE under KEY, replies (:ok)
  (:quit)          -> the thread exits without replying

Any other request also ends the thread without a reply."
  (let ((tx (make-mailbox))
        (rx (make-mailbox)))
    (make-thread
     (lambda ()
       (loop
         ;; Fetch a fresh request on every iteration.  Receiving only once,
         ;; before the loop, makes the thread answer the first request
         ;; forever and fill the reply mailbox without bound.
         (let ((msg (receive-message tx)))
           (case (car msg)
             (:quit (return))
             (:get
              ;; The key is the second element of the request; (cdr msg)
              ;; would be the list (KEY), which never matches a stored key.
              (multiple-value-bind (val p)
                  (getchash (cadr msg) server)
                (if p
                    (send-message rx `(:found ,val))
                    (send-message rx `(:not-found)))))
             (:put
              ;; Store a copy so later mutation by the client cannot alter
              ;; the value held in the table.
              (setf (getchash (cadr msg) server)
                    (copy-seq (caddr msg)))
              (send-message rx '(:ok)))
             ;; Unknown request: stop serving this connection.
             (t (return))))))
     :name "mini-redis-conn")
    (make-conn :tx tx :rx rx)))
|
52 | ||
(defun find-val (conn name)
  "Look up NAME through the connection CONN.

Sends (:get NAME) on the connection's request mailbox and blocks until a
reply arrives.  Returns two values: the stored value and T when the reply
is (:found VALUE), or NIL and NIL when the reply is (:not-found)."
  (send-message (conn-tx conn) `(:get ,name))
  (let ((reply (receive-message (conn-rx conn))))
    (case (car reply)
      ;; The reply is the two-element list (:found VALUE).  CADR yields
      ;; VALUE itself; CDR would yield the one-element list (VALUE).
      (:found
       (values (cadr reply) t))
      (:not-found
       (values nil nil)))))
|
63 | ||
(defun (setf find-val) (val conn name)
  "Store VAL under NAME through the connection CONN.

Sends (:put NAME VAL) on the connection's request mailbox and blocks until
the connection thread replies.  Returns VAL, as a setf function is
required to, so (setf (find-val conn name) x) evaluates to X."
  (send-message (conn-tx conn) `(:put ,name ,val))
  ;; Consume the acknowledgement so it is not mistaken for the reply to the
  ;; next request sent on this connection.
  (receive-message (conn-rx conn))
  val)
|
70 | ||
(defun close-conn (conn)
  "Ask the connection thread behind CONN to stop by sending it (:quit).
Does not wait for a reply."
  (let ((requests (conn-tx conn)))
    (send-message requests '(:quit))))
|
75 | ||
(defun worker (n server
               ready start
               writer-proportion names)
  "Body of one benchmark thread.

N selects this worker's key from NAMES.  The worker opens its own connection
to SERVER, builds a 100-slot plan in which each slot is a write with
probability WRITER-PROPORTION, signals READY, and waits on START.  It then
issues *OPS* requests against its key, cycling through the plan: a zero
slot is a read, any other slot writes #(1).  Finally it closes the
connection."
  (declare (optimize (speed 3))
           (single-float writer-proportion))
  (let ((key (elt names n))
        (plan (make-array 100
                          :element-type '(unsigned-byte 8)
                          :initial-element 0))
        (conn (connect-to-server server)))
    ;; Decide up front which slots are writes, so the timed loop does not
    ;; call RANDOM.
    (map-into plan
              (lambda ()
                (if (< (random 1.0) writer-proportion) 1 0)))
    ;; Report readiness, then block until RUN releases all workers at once.
    (signal-semaphore ready)
    (wait-on-semaphore start)
    (loop with slot of-type fixnum = 0
          repeat (the fixnum *ops*)
          do (if (plusp (aref plan slot))
                 (setf (find-val conn key)
                       #(1))
                 (find-val conn key))
             ;; Wrap around to the start of the plan after the last slot.
             (setf slot (mod (1+ slot) 100)))
    (close-conn conn)))
|
101 | ||
(defun run (&optional (worker-count *worker-count*)
                      (writer-proportion *writer-proportion*)
                      (keys *keys*))
  "Run the benchmark and print its timing; returns the server table.

Starts WORKER-COUNT threads, each running WORKER with its index, a shared
server, WRITER-PROPORTION and KEYS.  Waits until every worker has signalled
that it is ready, releases them all together, joins them, and prints the
elapsed seconds and the rounded number of transactions per second
(*OPS* times WORKER-COUNT divided by the elapsed time)."
  (let* ((ready (make-semaphore :name "ready-threads"))
         (start (make-semaphore :name "start-threads"))
         (server (make-server))
         (threads
           ;; SPAWN takes the index as a parameter so each closure captures
           ;; its own binding rather than the shared loop variable.
           (flet ((spawn (index)
                    (make-thread
                     (lambda ()
                       (worker index server
                               ready start
                               writer-proportion
                               keys)))))
             (loop for index below worker-count
                   collect (spawn index)))))
    ;; One READY signal is expected from each worker.
    (loop repeat worker-count
          do (wait-on-semaphore ready))
    ;; The clock starts only once every worker is set up and waiting.
    (let ((began (get-internal-real-time)))
      (signal-semaphore start worker-count)
      (dolist (thread threads)
        (join-thread thread))
      (let* ((ticks (- (get-internal-real-time) began))
             (seconds (float (/ ticks internal-time-units-per-second)))
             (rate (/ (* *ops* worker-count) seconds)))
        (format t "~&~20@a: ~$ seconds (~d transactions/second)"
                "mini-redis" seconds (round rate))))
    server))
|
37 | 127 | |
;; Program entry point: run the benchmark with the default worker count,
;; writer proportion and key list.
;; NOTE(review): DEFMAIN comes from one of the used packages; presumably it
;; defines the exported MAIN -- confirm.
(defmain ()
  (run *worker-count* *writer-proportion* *keys*))