summaryrefslogtreecommitdiff
path: root/src/lib/Actor.hs
blob: 59ff089a5f51c446899eef2ac0329a2e817306dd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
-- Copyright 2023 Google LLC
--
-- Use of this source code is governed by a BSD-style
-- license that can be found in the LICENSE file or at
-- https://developers.google.com/open-source/licenses/bsd

{-# LANGUAGE UndecidableInstances #-}

module Actor (
  ActorM, Actor (..), launchActor, send, selfMailbox, messageLoop,
  sliceMailbox, SubscribeMsg (..), IncServer, IncServerT, FileWatcher,
  StateServer, flushDiffs, handleSubscribeMsg, subscribe, subscribeIO, sendSync,
  runIncServerT, launchFileWatcher, Mailbox, launchIncFunctionEvaluator
  ) where

import Control.Concurrent
import Control.Monad
import Control.Monad.State.Strict
import Control.Monad.Reader
import qualified Data.ByteString as BS
import Data.IORef
import Data.Text.Encoding qualified as T
import Data.Text (Text)
import System.Directory (getModificationTime)
import GHC.Generics

import IncState
import MonadUtil

-- === Actor implementation ===

newtype ActorM msg a = ActorM { runActorM ::  ReaderT (Chan msg) IO a }
  deriving (Functor, Applicative, Monad, MonadIO)

newtype Mailbox a = Mailbox { sendToMailbox :: a -> IO () }

class (Show msg, MonadIO m) => Actor msg m | m -> msg where
  selfChan :: m (Chan msg)

instance Show msg => Actor msg (ActorM msg) where
  selfChan = ActorM ask

instance Actor msg m => Actor msg (ReaderT r m) where selfChan = lift $ selfChan
instance Actor msg m => Actor msg (StateT  s m) where selfChan = lift $ selfChan

send :: MonadIO m => Mailbox msg -> msg -> m ()
send chan msg = liftIO $ sendToMailbox chan msg

selfMailbox :: Actor msg m => (a -> msg) -> m (Mailbox a)
selfMailbox asSelfMessage = do
  chan <- selfChan
  return $ Mailbox \msg -> writeChan chan (asSelfMessage msg)

launchActor :: MonadIO m => ActorM msg () -> m (Mailbox msg)
launchActor m = liftIO do
  chan <- newChan
  void $ forkIO $ runReaderT (runActorM m) chan
  return $ Mailbox \msg -> writeChan chan msg

messageLoop :: Actor msg m => (msg -> m ()) -> m ()
messageLoop handleMessage = do
  forever do
    msg <- liftIO . readChan =<< selfChan
    handleMessage msg

sliceMailbox :: (b -> a) -> Mailbox a -> Mailbox b
sliceMailbox f (Mailbox sendMsg) = Mailbox $ sendMsg . f

-- === Promises ===

newtype Promise a = Promise (MVar a)
newtype PromiseSetter a = PromiseSetter (MVar a)

newPromise :: MonadIO m => m (Promise a, PromiseSetter a)
newPromise = do
  v <- liftIO $ newEmptyMVar
  return (Promise v, PromiseSetter v)

waitForPromise :: MonadIO m => Promise a -> m a
waitForPromise (Promise v) = liftIO $ readMVar v

setPromise :: MonadIO m => PromiseSetter a -> a -> m ()
setPromise (PromiseSetter v) x = liftIO $ putMVar v x

-- Message that expects a synchronous reponse
data SyncMsg msg response = SyncMsg msg (PromiseSetter response)

sendSync :: MonadIO m => Mailbox (SyncMsg msg response) -> msg -> m response
sendSync mailbox msg = do
  (result, resultSetter) <- newPromise
  send mailbox (SyncMsg msg resultSetter)
  waitForPromise result


-- === Diff server ===

data IncServerState s d = IncServerState
  { subscribers     :: [Mailbox d]
  , bufferedUpdates :: d
  , curIncState     :: s }
  deriving (Show, Generic)

class (Monoid d, MonadIO m) => IncServer s d m | m -> s, m -> d where
  getIncServerStateRef :: m (IORef (IncServerState s d))

data SubscribeMsg s d = Subscribe (SyncMsg (Mailbox d) s)  deriving (Show)

getIncServerState :: IncServer s d m => m (IncServerState s d)
getIncServerState = readRef =<< getIncServerStateRef

updateIncServerState :: IncServer s d m => (IncServerState s d -> IncServerState s d) -> m ()
updateIncServerState f = do
  ref <- getIncServerStateRef
  prev <- readRef ref
  writeRef ref $ f prev

handleSubscribeMsg :: IncServer s d m => SubscribeMsg s d -> m ()
handleSubscribeMsg (Subscribe (SyncMsg newSub response)) = do
  flushDiffs
  updateIncServerState \s -> s { subscribers = newSub : subscribers s }
  curState <- curIncState <$> getIncServerState
  setPromise response curState

flushDiffs :: IncServer s d m => m ()
flushDiffs = do
  d <- bufferedUpdates <$> getIncServerState
  updateIncServerState \s -> s { bufferedUpdates = mempty }
  subs <- subscribers <$> getIncServerState
  -- TODO: consider testing for emptiness here
  forM_ subs \sub -> send sub d

type StateServer s d = Mailbox (SubscribeMsg s d)

subscribe :: Actor msg m => (d -> msg) -> StateServer s d -> m s
subscribe inject server = do
  updateChannel <- selfMailbox inject
  sendSync (sliceMailbox Subscribe server) updateChannel

subscribeIO :: StateServer s d -> IO (s, Chan d)
subscribeIO server = do
  chan <- newChan
  let mailbox = Mailbox (writeChan chan)
  s <- sendSync (sliceMailbox Subscribe server) mailbox
  return (s, chan)

newtype IncServerT s d m a = IncServerT { runIncServerT' :: ReaderT (Ref (IncServerState s d)) m a }
  deriving (Functor, Applicative, Monad, MonadIO, Actor msg, FreshNames name, MonadTrans)

instance (MonadIO m, IncState s d) => IncServer s d (IncServerT s d m) where
  getIncServerStateRef = IncServerT ask

instance (MonadIO m, IncState s d) => DefuncState d (IncServerT s d m) where
  update d = updateIncServerState \s -> s
    { bufferedUpdates = bufferedUpdates s <> d
    , curIncState     = curIncState     s `applyDiff` d}

instance (MonadIO m, IncState s d) => LabelReader (SingletonLabel s) (IncServerT s d m) where
  getl It = curIncState <$> getIncServerState

runIncServerT :: (MonadIO m, IncState s d) => s -> IncServerT s d m a -> m a
runIncServerT s cont = do
  ref <- newRef $ IncServerState [] mempty s
  runReaderT (runIncServerT' cont) ref

-- === Incremental function server ===

-- If you just need something that computes a function incrementally and doesn't
-- need to maintain any other state then this will do.

data IncFunctionEvaluatorMsg da b db =
   Subscribe_IFEM (SubscribeMsg b db)
 | Update_IFEM da
   deriving (Show)

launchIncFunctionEvaluator
  :: (IncState b db, Show da, MonadIO m)
  => StateServer a da
  -> (a -> (b,s))
  -> (b -> s -> da -> (db, s))
  -> m (StateServer b db)
launchIncFunctionEvaluator server fInit fUpdate =
  sliceMailbox Subscribe_IFEM <$> launchActor do
    x0 <- subscribe Update_IFEM server
    let (y0, s0) = fInit x0
    flip evalStateT s0 $ runIncServerT y0 $ messageLoop \case
      Subscribe_IFEM msg -> handleSubscribeMsg msg
      Update_IFEM dx -> do
        y <- getl It
        s <- lift get
        let (dy, s') = fUpdate y s dx
        lift $ put s'
        update dy
        flushDiffs

-- === Refs ===
-- Just a wrapper around IORef lifted to `MonadIO`

type Ref = IORef

newRef :: MonadIO m => a -> m (Ref a)
newRef = liftIO . newIORef

readRef :: MonadIO m => Ref a -> m a
readRef = liftIO . readIORef

writeRef :: MonadIO m => Ref a -> a -> m ()
writeRef ref val = liftIO $ writeIORef ref val

-- === Clock ===

-- Provides a periodic clock signal. The time interval is in microseconds.
launchClock :: MonadIO m => Int -> Mailbox () -> m ()
launchClock intervalMicroseconds mailbox =
  liftIO $ void $ forkIO $ forever do
    threadDelay intervalMicroseconds
    send mailbox ()

-- === File watcher ===

type SourceFileContents = Text
type FileWatcher = StateServer (Overwritable SourceFileContents) (Overwrite SourceFileContents)

readFileContents :: MonadIO m => FilePath -> m Text
readFileContents path = liftIO $ T.decodeUtf8 <$> BS.readFile path

data FileWatcherMsg =
   ClockSignal_FW ()
 | Subscribe_FW (SubscribeMsg (Overwritable Text) (Overwrite Text))
   deriving (Show)

launchFileWatcher :: MonadIO m => FilePath -> m FileWatcher
launchFileWatcher path = sliceMailbox Subscribe_FW <$> launchActor (fileWatcherImpl path)

fileWatcherImpl :: FilePath -> ActorM FileWatcherMsg ()
fileWatcherImpl path = do
  initContents <- readFileContents path
  t0 <- liftIO $ getModificationTime path
  launchClock 100000 =<< selfMailbox ClockSignal_FW
  modTimeRef <- newRef t0
  runIncServerT (Overwritable initContents) $ messageLoop \case
    Subscribe_FW msg -> handleSubscribeMsg msg
    ClockSignal_FW () -> do
      tOld <- readRef modTimeRef
      tNew <- liftIO $ getModificationTime path
      when (tNew /= tOld) do
        newContents <- readFileContents path
        update $ OverwriteWith newContents
        flushDiffs
        writeRef modTimeRef tNew

-- === instances ===

instance Show msg => Show (SyncMsg msg response) where
  show (SyncMsg msg _) = show msg

instance Show (Mailbox a) where
  show _ = "mailbox"

deriving instance Actor msg m => Actor msg (FreshNameT m)