1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
|
-- Copyright 2023 Google LLC
--
-- Use of this source code is governed by a BSD-style
-- license that can be found in the LICENSE file or at
-- https://developers.google.com/open-source/licenses/bsd
{-# LANGUAGE UndecidableInstances #-}
-- | A small actor toolkit. An actor is a forked thread that reads messages
-- from a 'Chan'. On top of that this module defines synchronous
-- request\/reply messages, servers that hold a state and broadcast diffs to
-- their subscribers, and a polling file watcher.
module Actor (
  ActorM, Actor (..), launchActor, send, selfMailbox, messageLoop,
  sliceMailbox, SubscribeMsg (..), IncServer, IncServerT, FileWatcher,
  StateServer, flushDiffs, handleSubscribeMsg, subscribe, subscribeIO, sendSync,
  runIncServerT, launchFileWatcher, Mailbox, launchIncFunctionEvaluator
  ) where
import Control.Concurrent
import Control.Monad
import Control.Monad.State.Strict
import Control.Monad.Reader
import qualified Data.ByteString as BS
import Data.IORef
import Data.Text.Encoding qualified as T
import Data.Text.Encoding.Error (lenientDecode)
import Data.Text (Text)
import System.Directory (getModificationTime)
import GHC.Generics

import IncState
import MonadUtil
-- === Actor implementation ===
-- | The monad an actor's body runs in: 'IO' with read-only access to the
-- 'Chan' of @msg@ values that the actor receives its messages on.
newtype ActorM msg a = ActorM { runActorM :: ReaderT (Chan msg) IO a }
  deriving (Functor, Applicative, Monad, MonadIO)
-- | A send-only handle. It wraps the 'IO' action that delivers a value of
-- type @a@ to whoever is on the receiving end.
newtype Mailbox a = Mailbox { sendToMailbox :: a -> IO () }
-- | Monads that run inside an actor receiving messages of type @msg@.
-- The functional dependency @m -> msg@ says that the monad determines the
-- message type.
class (Show msg, MonadIO m) => Actor msg m | m -> msg where
  -- The channel this actor's incoming messages are read from.
  selfChan :: m (Chan msg)
-- Base case: 'ActorM' keeps its channel in the reader environment.
instance Show msg => Actor msg (ActorM msg) where
  selfChan = ActorM ask

-- A 'ReaderT' layer simply defers to the actor monad underneath it.
instance Actor msg m => Actor msg (ReaderT r m) where
  selfChan = lift selfChan

-- A 'StateT' layer simply defers to the actor monad underneath it.
instance Actor msg m => Actor msg (StateT s m) where
  selfChan = lift selfChan
-- | Deliver a message to a mailbox from any 'MonadIO' context by running the
-- mailbox's delivery action.
send :: MonadIO m => Mailbox msg -> msg -> m ()
send (Mailbox deliver) payload = liftIO (deliver payload)
-- | Build a mailbox that feeds the current actor's own channel. Every value
-- sent to it is first wrapped with @asSelfMessage@ to turn it into one of the
-- actor's own messages, and then written to the channel.
selfMailbox :: Actor msg m => (a -> msg) -> m (Mailbox a)
selfMailbox asSelfMessage =
  (\inbox -> Mailbox (writeChan inbox . asSelfMessage)) <$> selfChan
-- | Start an actor: create a fresh channel, fork a thread that runs the given
-- 'ActorM' body against that channel, and return a mailbox that writes to it.
launchActor :: MonadIO m => ActorM msg () -> m (Mailbox msg)
launchActor body = liftIO do
  inbox <- newChan
  -- The thread id is discarded.
  _ <- forkIO (runReaderT (runActorM body) inbox)
  pure (Mailbox (writeChan inbox))
-- | Run forever: take the next message off the actor's own channel (blocking
-- until one is available) and hand it to @handleMessage@.
messageLoop :: Actor msg m => (msg -> m ()) -> m ()
messageLoop handleMessage = forever (receive >>= handleMessage)
  where
    -- Looks the channel up afresh on every iteration, then reads from it.
    receive = selfChan >>= liftIO . readChan
-- | Adapt a mailbox to accept a different message type: values of type @b@
-- are converted with @f@ and then delivered to the underlying mailbox.
sliceMailbox :: (b -> a) -> Mailbox a -> Mailbox b
sliceMailbox f target = Mailbox \b -> sendToMailbox target (f b)
-- === Promises ===
-- | The reading end of a one-shot result cell backed by an 'MVar'.
newtype Promise a = Promise (MVar a)
-- | The writing end of a one-shot result cell backed by an 'MVar'.
newtype PromiseSetter a = PromiseSetter (MVar a)
-- | Create an unfulfilled promise together with the setter that fulfils it.
-- Both ends share the same (initially empty) 'MVar'.
newPromise :: MonadIO m => m (Promise a, PromiseSetter a)
newPromise = liftIO do
  cell <- newEmptyMVar
  pure (Promise cell, PromiseSetter cell)

-- | Block until the promise has a value and return it. The value is read
-- with 'readMVar', so it stays in place for any further readers.
waitForPromise :: MonadIO m => Promise a -> m a
waitForPromise (Promise cell) = liftIO (readMVar cell)

-- | Fulfil a promise with 'putMVar'.
setPromise :: MonadIO m => PromiseSetter a -> a -> m ()
setPromise (PromiseSetter cell) value = liftIO (putMVar cell value)
-- | A message that expects a synchronous response: the payload @msg@ is paired
-- with the setter through which the receiver delivers the @response@.
data SyncMsg msg response = SyncMsg msg (PromiseSetter response)
-- | Send a request and block until the receiver answers it. A fresh promise
-- is created per call; its setter travels with the message and the caller
-- waits on the reading end.
sendSync :: MonadIO m => Mailbox (SyncMsg msg response) -> msg -> m response
sendSync mailbox msg =
  newPromise >>= \(reply, fulfil) -> do
    send mailbox (SyncMsg msg fulfil)
    waitForPromise reply
-- === Diff server ===
-- | Everything an incremental server keeps track of, for a state of type @s@
-- that changes by diffs of type @d@.
data IncServerState s d = IncServerState
  { subscribers :: [Mailbox d]
    -- ^ mailboxes that are sent the buffered diff on every flush
  , bufferedUpdates :: d
    -- ^ diffs accumulated since the last flush
  , curIncState :: s }
    -- ^ the current state, with all diffs applied so far
  deriving (Show, Generic)
-- | Monads that have access to the mutable state of an incremental server.
-- The monad determines both the state type @s@ and the diff type @d@, and
-- diffs must form a 'Monoid' so they can be buffered and reset.
class (Monoid d, MonadIO m) => IncServer s d m | m -> s, m -> d where
  -- The reference cell holding this server's state.
  getIncServerStateRef :: m (IORef (IncServerState s d))
-- | A subscription request. The sender supplies the mailbox that future diffs
-- of type @d@ should be sent to, and receives the server's current state of
-- type @s@ as the synchronous reply.
data SubscribeMsg s d = Subscribe (SyncMsg (Mailbox d) s) deriving (Show)
-- | Read a snapshot of the server's bookkeeping state from its reference.
getIncServerState :: IncServer s d m => m (IncServerState s d)
getIncServerState = getIncServerStateRef >>= readRef
-- | Apply a pure transformation to the server's bookkeeping state.
--
-- The update goes through the strict 'modifyIORef'' so the new state is
-- evaluated to weak head normal form before it is stored. A plain
-- read-then-write would store an unevaluated @f prev@ thunk, and those chain
-- up when several updates happen before the state is next inspected.
updateIncServerState :: IncServer s d m => (IncServerState s d -> IncServerState s d) -> m ()
updateIncServerState f = do
  ref <- getIncServerStateRef
  liftIO $ modifyIORef' ref f
-- | Register a new subscriber and reply with the current state.
handleSubscribeMsg :: IncServer s d m => SubscribeMsg s d -> m ()
handleSubscribeMsg (Subscribe (SyncMsg newSub response)) = do
  -- Flush first: the pending diff goes only to the existing subscribers. The
  -- newcomer's snapshot below already includes it.
  flushDiffs
  updateIncServerState \st -> st { subscribers = newSub : subscribers st }
  getIncServerState >>= setPromise response . curIncState
-- | Send the buffered diff to every subscriber and reset the buffer to
-- 'mempty'. The buffer is cleared before the sends are issued.
flushDiffs :: IncServer s d m => m ()
flushDiffs = do
  before <- getIncServerState
  let pending = bufferedUpdates before
  updateIncServerState \st -> st { bufferedUpdates = mempty }
  -- TODO: consider testing for emptiness here
  -- Clearing the buffer does not touch the subscriber list, so the snapshot
  -- taken above still holds the right recipients.
  mapM_ (\sub -> send sub pending) (subscribers before)
-- | The handle clients hold for a state server: a mailbox that accepts
-- subscription requests for state @s@ and diffs @d@.
type StateServer s d = Mailbox (SubscribeMsg s d)
-- | Subscribe the current actor to a state server. Future diffs arrive in the
-- actor's own channel, wrapped by @inject@. The result is the server's state
-- as returned in its synchronous reply.
subscribe :: Actor msg m => (d -> msg) -> StateServer s d -> m s
subscribe inject server =
  selfMailbox inject >>= sendSync (sliceMailbox Subscribe server)
-- | Subscribe to a state server from plain 'IO', outside any actor. Returns
-- the server's state from its reply, together with a fresh channel on which
-- subsequent diffs are delivered.
subscribeIO :: StateServer s d -> IO (s, Chan d)
subscribeIO server = do
  updates <- newChan
  initial <- sendSync (sliceMailbox Subscribe server) (Mailbox (writeChan updates))
  pure (initial, updates)
-- | Monad transformer that adds incremental-server capabilities to @m@ by
-- carrying a reference to the server's 'IncServerState' in a reader
-- environment.
newtype IncServerT s d m a = IncServerT { runIncServerT' :: ReaderT (Ref (IncServerState s d)) m a }
  deriving (Functor, Applicative, Monad, MonadIO, Actor msg, FreshNames name, MonadTrans)
-- The server state reference is exactly the reader environment.
instance (MonadIO m, IncState s d) => IncServer s d (IncServerT s d m) where
  getIncServerStateRef = IncServerT ask

-- An update is both appended to the not-yet-flushed buffer and applied to the
-- current state.
instance (MonadIO m, IncState s d) => DefuncState d (IncServerT s d m) where
  update diff = updateIncServerState applyAndBuffer
    where
      applyAndBuffer st = st
        { bufferedUpdates = bufferedUpdates st <> diff
        , curIncState = applyDiff (curIncState st) diff }

-- Reading the singleton label yields the server's current state.
instance (MonadIO m, IncState s d) => LabelReader (SingletonLabel s) (IncServerT s d m) where
  getl It = fmap curIncState getIncServerState
-- | Run an 'IncServerT' computation starting from state @s@, with no
-- subscribers and an empty ('mempty') diff buffer.
runIncServerT :: (MonadIO m, IncState s d) => s -> IncServerT s d m a -> m a
runIncServerT s cont = do
  let initial = IncServerState
        { subscribers = []
        , bufferedUpdates = mempty
        , curIncState = s }
  newRef initial >>= runReaderT (runIncServerT' cont)
-- === Incremental function server ===
-- If you just need something that computes a function incrementally and doesn't
-- need to maintain any other state then this will do.
-- | Messages handled by the actor started by 'launchIncFunctionEvaluator'.
data IncFunctionEvaluatorMsg da b db =
    -- A client asks to subscribe to the function's output.
    Subscribe_IFEM (SubscribeMsg b db)
    -- The server the evaluator subscribed to reports a diff of the input.
  | Update_IFEM da
    deriving (Show)
-- | Launch an actor that incrementally maintains the output of a function of
-- another server's state, and expose that output as a state server.
--
-- * @server@: the server providing the input and its diffs.
-- * @fInit@: computes the initial output plus auxiliary state from the
--   initial input.
-- * @fUpdate@: given the current output, the auxiliary state and an input
--   diff, produces the output diff and the next auxiliary state.
launchIncFunctionEvaluator
  :: (IncState b db, Show da, MonadIO m)
  => StateServer a da
  -> (a -> (b,s))
  -> (b -> s -> da -> (db, s))
  -> m (StateServer b db)
launchIncFunctionEvaluator server fInit fUpdate = do
  inbox <- launchActor do
    input0 <- subscribe Update_IFEM server
    let (output0, aux0) = fInit input0
    -- The auxiliary state lives in a StateT layer underneath the server layer.
    (`evalStateT` aux0) $ runIncServerT output0 $ messageLoop \case
      Subscribe_IFEM req -> handleSubscribeMsg req
      Update_IFEM dIn -> do
        output <- getl It
        aux <- lift get
        let (dOut, aux') = fUpdate output aux dIn
        lift (put aux')
        update dOut
        -- Each input diff is forwarded to subscribers straight away.
        flushDiffs
  pure (sliceMailbox Subscribe_IFEM inbox)
-- === Refs ===
-- Thin aliases for 'IORef' and its basic operations, lifted into any
-- 'MonadIO'.
type Ref = IORef

-- | Allocate a new reference holding the given initial value.
newRef :: MonadIO m => a -> m (Ref a)
newRef initial = liftIO (newIORef initial)

-- | Read the value currently stored in a reference.
readRef :: MonadIO m => Ref a -> m a
readRef ref = liftIO (readIORef ref)

-- | Overwrite the value stored in a reference.
writeRef :: MonadIO m => Ref a -> a -> m ()
writeRef ref = liftIO . writeIORef ref
-- === Clock ===
-- | Fork a thread that repeatedly sleeps for the given interval (in
-- microseconds) and then sends @()@ to the mailbox, giving a periodic clock
-- signal. The first tick is sent after the first sleep.
launchClock :: MonadIO m => Int -> Mailbox () -> m ()
launchClock intervalMicroseconds mailbox =
  liftIO . void . forkIO . forever $
    threadDelay intervalMicroseconds >> send mailbox ()
-- === File watcher ===
-- | The contents of a watched file, decoded as text.
type SourceFileContents = Text
-- | A state server whose state is a file's contents and whose diffs replace
-- those contents wholesale.
type FileWatcher = StateServer (Overwritable SourceFileContents) (Overwrite SourceFileContents)
-- | Read a file strictly and decode it as UTF-8 text.
--
-- Decoding is lenient: byte sequences that are not valid UTF-8 become U+FFFD
-- replacement characters rather than raising an exception. The result is
-- forced here with @$!@, so the decoding work happens at the read site. With
-- the partial, lazily applied 'T.decodeUtf8', bad input raised an exception
-- wherever the 'Text' thunk was first forced, possibly in another thread.
readFileContents :: MonadIO m => FilePath -> m Text
readFileContents path = liftIO do
  bytes <- BS.readFile path
  pure $! T.decodeUtf8With lenientDecode bytes
-- | Messages handled by the file watcher actor.
data FileWatcherMsg =
    -- A tick from the watcher's polling clock.
    ClockSignal_FW ()
    -- A client asks to subscribe to the file's contents.
  | Subscribe_FW (SubscribeMsg (Overwritable Text) (Overwrite Text))
    deriving (Show)
-- | Start a file watcher actor for @path@ and return the handle through which
-- clients can subscribe to the file's contents.
launchFileWatcher :: MonadIO m => FilePath -> m FileWatcher
launchFileWatcher path = do
  inbox <- launchActor (fileWatcherImpl path)
  -- Only subscription requests are exposed. Clock ticks stay internal.
  pure (sliceMailbox Subscribe_FW inbox)
-- | Body of the file watcher actor. It serves the contents of @path@ as an
-- incremental state, polling the file's modification time on every clock tick
-- and publishing an overwrite whenever the time differs from the one last
-- recorded.
fileWatcherImpl :: FilePath -> ActorM FileWatcherMsg ()
fileWatcherImpl path = do
  -- Sample the timestamp *before* reading the contents, exactly as the
  -- polling branch below does. If the file is written between the two steps,
  -- the recorded time is the older one and the next tick notices the change.
  -- In the opposite order, stale contents would be paired with the new
  -- timestamp and the change would never be detected.
  t0 <- liftIO $ getModificationTime path
  initContents <- readFileContents path
  launchClock pollIntervalMicroseconds =<< selfMailbox ClockSignal_FW
  modTimeRef <- newRef t0
  runIncServerT (Overwritable initContents) $ messageLoop \case
    Subscribe_FW msg -> handleSubscribeMsg msg
    ClockSignal_FW () -> do
      tOld <- readRef modTimeRef
      tNew <- liftIO $ getModificationTime path
      when (tNew /= tOld) do
        newContents <- readFileContents path
        update $ OverwriteWith newContents
        flushDiffs
        writeRef modTimeRef tNew
  where
    -- Polling period handed to 'launchClock', in microseconds (0.1 s).
    pollIntervalMicroseconds = 100000 :: Int
-- === instances ===
-- A synchronous message is shown as its payload. The reply setter is omitted.
instance Show msg => Show (SyncMsg msg response) where
  show (SyncMsg payload _setter) = show payload

-- Mailboxes wrap functions, so they are shown as a fixed placeholder.
instance Show (Mailbox a) where
  show = const "mailbox"
-- A 'FreshNameT' layer over an actor monad is itself an actor with the same
-- message type. The instance is produced by standalone deriving.
deriving instance Actor msg m => Actor msg (FreshNameT m)
|