1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
|
-- Copyright 2023 Google LLC
--
-- Use of this source code is governed by a BSD-style
-- license that can be found in the LICENSE file or at
-- https://developers.google.com/open-source/licenses/bsd
{-# LANGUAGE UndecidableInstances #-}
module Live.Eval (
watchAndEvalFile, EvalServer, CellsState, CellsUpdate,
NodeList (..), NodeListUpdate (..), subscribeIO, cellsStateAsUpdate) where
import Control.Concurrent
import Control.Monad
import Control.Monad.State.Strict
import Control.Monad.Writer.Strict
import qualified Data.Map.Strict as M
import Data.Aeson (ToJSON)
import Data.Functor ((<&>))
import Data.Maybe (fromJust)
import Data.Text (Text)
import Prelude hiding (span)
import GHC.Generics
import Actor
import IncState
import Types.Source
import TopLevel
import ConcreteSyntax
import MonadUtil
import RenderHtml
-- === Top-level interface ===
-- Handle returned by `watchAndEvalFile`: a `StateServer` whose state is
-- `CellsState` and whose incremental updates are `CellsUpdate`.
type EvalServer = StateServer CellsState CellsUpdate
-- `watchAndEvalFile` chains three stages for the file `fname`: a file watcher,
-- a cell parser fed by that watcher, and a DAG evaluator fed by that parser.
-- It returns the channel by which a client may subscribe by sending a
-- write-only view of its input channel.
watchAndEvalFile :: FilePath -> EvalConfig -> TopStateEx -> IO EvalServer
watchAndEvalFile fname opts env = do
  fileWatcher <- launchFileWatcher fname
  cellParser  <- launchCellParser fileWatcher parseBlocks
  launchDagEvaluator opts cellParser env
  where
    -- Splits the full source text into its top-level source blocks.
    parseBlocks source = uModuleSourceBlocks (parseUModule Main source)
-- Evaluates one source block in `env`, with the config's log action replaced
-- so that logged outputs are sent to `resultChan`.
sourceBlockEvalFun :: EvalConfig -> Mailbox Outputs -> TopStateEx -> SourceBlock -> IO (ExitStatus, TopStateEx)
sourceBlockEvalFun cfg resultChan env block =
  evalSourceBlockIO loggingCfg env block
  where
    loggingCfg = cfg { cfgLogAction = send resultChan }
-- Re-expresses a full `CellsState` as a `CellsUpdate` (see `nodeListAsUpdate`).
cellsStateAsUpdate :: CellsState -> CellsUpdate
cellsStateAsUpdate cells = nodeListAsUpdate cells
-- === DAG diff state ===
-- We intend to make this an arbitrary Dag at some point, but for now we
-- assume that dependence is given simply by the top-to-bottom ordering of
-- blocks within the file.
-- Key identifying a node; used both in the ordering list and in the node map
-- of a `NodeList`.
type NodeId = Int
-- An ordered collection of nodes: the ordering is kept as a list of ids and
-- the payloads are kept in a map keyed by those ids.
data NodeList a = NodeList
  { orderedNodes :: [NodeId]        -- node ids, in order
  , nodeMap      :: M.Map NodeId a  -- payload for each node id
  } deriving (Show, Generic, Functor)
-- Incremental update to a `NodeList`: one component for the ordering and one
-- for the node map. `s` is the per-node state type and `d` its diff type.
data NodeListUpdate s d = NodeListUpdate
  { orderedNodesUpdate :: TailUpdate NodeId     -- change to the id ordering
  , nodeMapUpdate      :: MapUpdate NodeId s d  -- per-node changes
  } deriving (Show, Generic)
-- Updates compose componentwise: ordering updates with ordering updates and
-- map updates with map updates.
instance IncState s d => Semigroup (NodeListUpdate s d) where
  NodeListUpdate order1 nodes1 <> NodeListUpdate order2 nodes2 =
    NodeListUpdate (order1 <> order2) (nodes1 <> nodes2)
-- The empty update is the empty update in both components.
instance IncState s d => Monoid (NodeListUpdate s d) where
  mempty = NodeListUpdate
    { orderedNodesUpdate = mempty
    , nodeMapUpdate      = mempty }
-- Applying a `NodeListUpdate` applies the ordering diff to the id list and
-- the map diff to the node map, independently.
instance IncState s d => IncState (NodeList s) (NodeListUpdate s d) where
  applyDiff (NodeList ids nodes) (NodeListUpdate idsDiff nodesDiff) =
    NodeList (applyDiff ids idsDiff) (applyDiff nodes nodesDiff)
-- A `Dag` is (for now) just a `NodeList` whose payloads are wrapped in
-- `Unchanging`.
type Dag a = NodeList (Unchanging a)
-- Update to a `Dag`; the per-node diff type is `()`.
type DagUpdate a = NodeListUpdate (Unchanging a) ()
-- Expresses a whole `NodeList` as an update: a `TailUpdate 0` carrying all of
-- its ids, and a `Create` entry for every node in its map.
nodeListAsUpdate :: NodeList s -> NodeListUpdate s d
nodeListAsUpdate (NodeList ids nodes) =
  NodeListUpdate
    (TailUpdate 0 ids)
    (MapUpdate (Create <$> nodes))
-- A `NodeList` with no ids and no nodes.
emptyNodeList :: NodeList a
emptyNodeList = NodeList
  { orderedNodes = []
  , nodeMap      = M.empty }
-- Builds a `NodeList` from the given values, pairing each value (in order)
-- with an id obtained from `freshName`.
buildNodeList :: FreshNames NodeId m => [a] -> m (NodeList a)
buildNodeList vals = do
  labelled <- forM vals \val -> (, val) <$> freshName
  return $ NodeList
    { orderedNodes = map fst labelled
    , nodeMap      = M.fromList labelled }
-- Length of the longest prefix on which the two lists agree elementwise.
commonPrefixLength :: Eq a => [a] -> [a] -> Int
commonPrefixLength xs ys = length $ takeWhile id $ zipWith (==) xs ys
-- The node payloads, listed in the order given by `orderedNodes`.
-- Partial: every id in the ordering is expected to be present in the map
-- (`fromJust` fails otherwise).
nodeListVals :: NodeList a -> [a]
nodeListVals nodes =
  [ fromJust (M.lookup nodeId (nodeMap nodes)) | nodeId <- orderedNodes nodes ]
-- Computes the update that takes `nodes` to a list holding `newVals`.
-- The longest common prefix of old and new values is kept as-is; every old
-- node after it is marked `Delete`, and every new value after it gets a
-- fresh id and a `Create` entry.
computeNodeListUpdate :: (Eq s, FreshNames NodeId m) => NodeList s -> [s] -> m (NodeListUpdate s d)
computeNodeListUpdate nodes newVals = do
  freshTail <- buildNodeList (drop nShared newVals)
  let created = Create <$> nodeMap freshTail
      deleted = M.fromList [ (nodeId, Delete) | nodeId <- staleIds ]
  return $ NodeListUpdate
    (TailUpdate (length staleIds) (orderedNodes freshTail))
    (MapUpdate (M.union created deleted))
  where
    -- Number of leading nodes whose values are unchanged.
    nShared  = commonPrefixLength (nodeListVals nodes) newVals
    -- Ids of the old nodes that follow the unchanged prefix.
    staleIds = drop nShared (orderedNodes nodes)
-- === Cell parser ===
-- This coarsely parses the full file into blocks and forms a DAG (for now a
-- trivial one assuming all top-to-bottom dependencies) of the results.
-- Handle to the cell-parser actor: a `StateServer` over the `Dag` of parsed
-- source blocks.
type CellParser = StateServer (Dag SourceBlock) (DagUpdate SourceBlock)
-- Messages handled by the cell-parser actor.
data CellParserMsg
  = Subscribe_CP (SubscribeMsg (Dag SourceBlock) (DagUpdate SourceBlock))
    -- ^ subscription request, passed to `handleSubscribeMsg`
  | Update_CP (Overwrite Text)
    -- ^ update received from the file watcher subscription
  deriving (Show)
-- Launches the cell-parser actor and returns its mailbox narrowed to
-- subscription messages.
launchCellParser :: MonadIO m => FileWatcher -> (Text -> [SourceBlock]) -> m CellParser
launchCellParser fileWatcher parseCells = do
  mailbox <- launchActor $ cellParserImpl fileWatcher parseCells
  return $ sliceMailbox Subscribe_CP mailbox
-- Body of the cell-parser actor. It subscribes to the file watcher, parses
-- the initial contents into a node list, and then serves messages: it
-- answers subscriptions, and on each overwrite of the file contents it
-- re-parses, publishes the resulting node-list update and flushes it.
cellParserImpl :: FileWatcher -> (Text -> [SourceBlock]) -> ActorM CellParserMsg ()
cellParserImpl fileWatcher parseCells = runFreshNameT do
  Overwritable initContents <- subscribe Update_CP fileWatcher
  initNodeList <- buildNodeList (parseNodes initContents)
  runIncServerT initNodeList $ messageLoop \case
    Subscribe_CP msg   -> handleSubscribeMsg msg
    Update_CP contents -> case contents of
      NoChange -> return ()
      OverwriteWith newContents -> do
        curNodeList <- getl It
        nodeListUpdate <- computeNodeListUpdate curNodeList (parseNodes newContents)
        update nodeListUpdate
        flushDiffs
  where
    -- Parses the source text and wraps each block as an `Unchanging` payload.
    parseNodes contents = fmap Unchanging (parseCells contents)
-- === Dag evaluator ===
-- This is where we track the state of evaluation and decide what needs to be
-- run and what needs to be killed.
-- Handle to the DAG-evaluator actor (the same underlying type as `EvalServer`).
type Evaluator = StateServer CellsState CellsUpdate
-- Monad in which the evaluator actor runs. It stacks an incremental server
-- over `CellsState` on top of `EvaluatorState`, on top of the actor monad
-- for `EvaluatorMsg`.
newtype EvaluatorM a = EvaluatorM
  { runEvaluatorM' ::
      IncServerT CellsState CellsUpdate
        (StateT EvaluatorState
          (ActorM EvaluatorMsg)) a }
  deriving (Functor, Applicative, Monad, MonadIO, Actor (EvaluatorMsg))

-- The `IncServer` interface is inherited from the underlying `IncServerT`.
deriving instance IncServer CellsState CellsUpdate EvaluatorM
-- Cell updates compose componentwise (status overwrite and outputs).
instance Semigroup CellUpdate where
  CellUpdate status1 outputs1 <> CellUpdate status2 outputs2 =
    CellUpdate (status1 <> status2) (outputs1 <> outputs2)
-- The empty cell update: `mempty` for both the status and the outputs.
instance Monoid CellUpdate where
  mempty =
    CellUpdate mempty mempty
-- Applying a cell update keeps the source untouched, applies the status diff
-- through `Overwritable`, and appends the new outputs to the existing ones.
instance IncState CellState CellUpdate where
  applyDiff (CellState source oldStatus oldOutputs) (CellUpdate statusDiff newOutputs) =
    CellState source newStatus (oldOutputs <> newOutputs)
    where
      newStatus = fromOverwritable $ applyDiff (Overwritable oldStatus) statusDiff
-- Interprets each `EvaluatorMUpdate` as a state change in `EvaluatorM`.
instance DefuncState EvaluatorMUpdate EvaluatorM where
  update msg = case msg of
    -- Forwarded to the underlying incremental server.
    UpdateDagEU dagUpdate -> EvaluatorM $ update dagUpdate
    -- These two write fields of `EvaluatorState` directly.
    UpdateCurJob status   -> EvaluatorM $ lift $ modify \st -> st { curRunningJob = status }
    UpdateEnvs envs       -> EvaluatorM $ lift $ modify \st -> st { prevEnvs = envs }
    -- Appends one environment to the end of the stored list.
    AppendEnv env         -> do
      envs <- getl PrevEnvs
      update $ UpdateEnvs (envs ++ [env])
    -- A single-cell change is a DAG update touching only that node.
    UpdateCellState nodeId cellUpdate ->
      update $ UpdateDagEU $ NodeListUpdate mempty $
        MapUpdate (M.singleton nodeId (Update cellUpdate))
-- Reads the piece of evaluator state named by each `EvaluatorMLabel`.
instance LabelReader EvaluatorMLabel EvaluatorM where
  getl label = case label of
    -- These two read from the incremental server's current `CellsState`.
    NodeListEM      -> EvaluatorM $ orderedNodes <$> getl It
    NodeInfo nodeId -> EvaluatorM $ M.lookup nodeId . nodeMap <$> getl It
    -- The rest read fields of `EvaluatorState`.
    PrevEnvs        -> EvaluatorM $ lift $ gets prevEnvs
    CurRunningJob   -> EvaluatorM $ lift $ gets curRunningJob
    EvalCfg         -> EvaluatorM $ lift $ gets evaluatorCfg
-- State changes that can be requested through `update` in `EvaluatorM`.
data EvaluatorMUpdate
  = UpdateDagEU (NodeListUpdate CellState CellUpdate)  -- update the cells' node list
  | UpdateCellState NodeId CellUpdate                  -- update a single cell
  | UpdateCurJob CurJobStatus                          -- set the running-job status
  | UpdateEnvs [TopStateEx]                            -- replace the list of environments
  | AppendEnv TopStateEx                               -- append one environment
-- Labels for the pieces of evaluator state readable through `getl`; the type
-- index is the type of the value read.
data EvaluatorMLabel a where
  NodeListEM    :: EvaluatorMLabel [NodeId]                   -- ordered node ids
  NodeInfo      :: NodeId -> EvaluatorMLabel (Maybe CellState)  -- state of one node, if present
  PrevEnvs      :: EvaluatorMLabel [TopStateEx]               -- stored environments
  CurRunningJob :: EvaluatorMLabel CurJobStatus               -- currently running job, if any
  EvalCfg       :: EvaluatorMLabel EvalConfig                 -- evaluation config
-- It's redundant to have both NodeId and ThreadId, but it defends against
-- possible GHC reuse of ThreadId (I don't know if that can actually happen).
type JobId = (ThreadId, NodeId)
-- `Nothing` when no job is running; otherwise the running job's id together
-- with the index of the cell it is evaluating.
type CurJobStatus = Maybe (JobId, CellIndex)
-- Mutable state of the evaluator, kept alongside the incremental `CellsState`.
data EvaluatorState = EvaluatorState
  { evaluatorCfg  :: EvalConfig    -- config handed to each evaluation job
  , prevEnvs      :: [TopStateEx]  -- starts as the initial env; one env is appended
                                   -- per completed or inert cell
  , curRunningJob :: CurJobStatus  -- the job currently running, if any
  }
-- Evaluation status of a single cell.
data CellStatus
  = Waiting             -- initial status of a cell that is not inert
  | Running             -- TODO: split into compiling/running
  | Complete            -- completed without errors
  | CompleteWithErrors  -- completed, but the job reported a failure
  | Inert               -- doesn't require running at all
  deriving (Show, Generic)
-- Full state of a cell: its source block (with id), its status, and the
-- outputs accumulated so far.
data CellState =
  CellState SourceBlockWithId CellStatus Outputs
  deriving (Show, Generic)
-- Incremental update to a `CellState`: an optional status overwrite plus
-- outputs to append (see the `IncState CellState CellUpdate` instance).
data CellUpdate = CellUpdate (Overwrite CellStatus) Outputs deriving (Show, Generic)
-- State of all cells, as a `NodeList` of per-cell states.
type CellsState = NodeList CellState
-- Incremental update to `CellsState`.
type CellsUpdate = NodeListUpdate CellState CellUpdate
type CellIndex = Int -- index in the list of cells, not the NodeId
-- What an evaluation job reports back to the evaluator.
data JobUpdate
  = PartialJobUpdate Outputs
    -- ^ outputs produced while the job is still running
  | JobComplete (ExitStatus, TopStateEx)
    -- ^ the job finished, with its exit status and resulting environment
  deriving (Show)
-- Messages handled by the evaluator actor.
data EvaluatorMsg
  = SourceUpdate (DagUpdate SourceBlock)
    -- ^ the parsed source changed
  | JobUpdate JobId JobUpdate
    -- ^ report from the job with the given id
  | Subscribe_E (SubscribeMsg CellsState CellsUpdate)
    -- ^ subscription request, passed to `handleSubscribeMsg`
  deriving (Show)
-- Initial evaluator state: the given config, a single starting environment,
-- and no running job.
initEvaluatorState :: EvalConfig -> TopStateEx -> EvaluatorState
initEvaluatorState cfg s = EvaluatorState
  { evaluatorCfg  = cfg
  , prevEnvs      = [s]
  , curRunningJob = Nothing }
-- Launches the evaluator actor, starting from an empty node list and the
-- initial evaluator state, and returns its mailbox narrowed to subscription
-- messages.
launchDagEvaluator :: MonadIO m => EvalConfig -> CellParser -> TopStateEx -> m Evaluator
launchDagEvaluator cfg cellParser env =
  sliceMailbox Subscribe_E <$> launchActor do
    -- Unwrap the EvaluatorM layers from the outside in; the final
    -- (result, state) pair is discarded.
    let evaluator = runEvaluatorM' (dagEvaluatorImpl cellParser)
    void $ runStateT (runIncServerT emptyNodeList evaluator) (initEvaluatorState cfg env)
-- Body of the evaluator actor. It subscribes to the cell parser, installs
-- the initial DAG (which also starts evaluation), and then serves messages:
-- subscriptions, source updates from the parser, and reports from jobs.
dagEvaluatorImpl :: CellParser -> EvaluatorM ()
dagEvaluatorImpl cellParser = do
  initDag <- subscribe SourceUpdate cellParser
  -- `processDagUpdate` itself calls `launchNextJob` when no job is running,
  -- so we must not call `launchNextJob` again here. Doing so would fork a
  -- second thread for the same cell and overwrite `CurRunningJob`, leaving
  -- the first thread as a zombie and evaluating the cell twice.
  processDagUpdate (nodeListAsUpdate initDag) >> flushDiffs
  messageLoop \case
    Subscribe_E msg -> handleSubscribeMsg msg
    SourceUpdate dagUpdate -> do
      processDagUpdate dagUpdate
      flushDiffs
    JobUpdate jobId jobUpdate -> do
      processJobUpdate jobId jobUpdate
      flushDiffs
-- Handles a report from a job. Reports are ignored unless they come from
-- the job currently recorded as running; anything else is a zombie.
-- A partial update appends outputs to the cell. A completion records the
-- final status, clears the running job, stores the new environment and
-- moves on to the next cell.
processJobUpdate :: JobId -> JobUpdate -> EvaluatorM ()
processJobUpdate jobId jobUpdate = do
  curJob <- getl CurRunningJob
  case curJob of
    Just (runningJobId, _) | jobId == runningJobId -> applyJobUpdate
    _ -> return ()  -- this job is a zombie
  where
    nodeId = snd jobId

    applyJobUpdate :: EvaluatorM ()
    applyJobUpdate = case jobUpdate of
      PartialJobUpdate result ->
        update $ UpdateCellState nodeId $ CellUpdate NoChange result
      JobComplete (exitStatus, newEnv) -> do
        let newStatus = finalStatus exitStatus
        update $ UpdateCellState nodeId $ CellUpdate (OverwriteWith newStatus) mempty
        update $ UpdateCurJob Nothing
        update $ AppendEnv newEnv
        launchNextJob
        flushDiffs

    finalStatus :: ExitStatus -> CellStatus
    finalStatus = \case
      ExitSuccess -> Complete
      ExitFailure -> CompleteWithErrors
-- Index of the next cell to evaluate: one less than the number of stored
-- environments (the list starts with the initial environment and grows by
-- one per finished or inert cell).
nextCellIndex :: EvaluatorM Int
nextCellIndex = subtract 1 . length <$> getl PrevEnvs
-- Starts evaluating the next cell, if there is one. Inert cells are skipped
-- by reusing the current environment as their result and recursing; any
-- other cell is handed to `launchJob`.
launchNextJob :: EvaluatorM ()
launchNextJob = do
  cellIndex <- nextCellIndex
  nodeIds   <- getl NodeListEM
  unless (cellIndex >= length nodeIds) do -- otherwise we're all done
    envs <- getl PrevEnvs
    let curEnv = envs !! cellIndex
        nodeId = nodeIds !! cellIndex
    CellState source _ _ <- fromJust <$> getl (NodeInfo nodeId)
    case isInert (sourceBlockWithoutId source) of
      True  -> update (AppendEnv curEnv) >> launchNextJob
      False -> launchJob cellIndex nodeId curEnv
-- Marks the cell as `Running`, forks a thread that evaluates its source
-- block in `env`, and records that thread as the current job. The thread
-- reports partial outputs and its final result to this actor's own mailbox,
-- tagged with its job id.
launchJob :: CellIndex -> NodeId -> TopStateEx -> EvaluatorM ()
launchJob cellIndex nodeId env = do
  cfg <- getl EvalCfg
  CellState source _ _ <- fromJust <$> getl (NodeInfo nodeId)
  let block = sourceBlockWithoutId source
  mailbox <- selfMailbox id
  update $ UpdateCellState nodeId $ CellUpdate (OverwriteWith Running) mempty
  jobThread <- liftIO $ forkIO do
    -- The job id is rebuilt inside the thread from its own ThreadId.
    self <- myThreadId
    let thisJob  = (self, nodeId)
        partials = sliceMailbox (JobUpdate thisJob . PartialJobUpdate) mailbox
    outcome <- sourceBlockEvalFun cfg partials env block
    send mailbox $ JobUpdate thisJob $ JobComplete outcome
  update $ UpdateCurJob $ Just ((jobThread, nodeId), cellIndex)
-- Number of current cells that survive the given tail update: the current
-- cell count minus `numDropped` of the update.
computeNumValidCells :: TailUpdate NodeId -> EvaluatorM Int
computeNumValidCells tailUpdate =
  getl NodeListEM <&> \nodeIds -> length nodeIds - numDropped tailUpdate
-- Applies a source DAG update to the evaluator. Environments of cells that
-- are no longer valid are discarded and the update is translated into a
-- cell-state update. If no job is running, the next one is launched. If the
-- running job belongs to an invalidated cell, it is killed and evaluation
-- restarts from the first invalid cell.
processDagUpdate :: DagUpdate SourceBlock -> EvaluatorM ()
processDagUpdate (NodeListUpdate tailUpdate mapUpdate) = do
  -- Must be computed before the node list itself is updated below.
  nValid <- computeNumValidCells tailUpdate
  -- Keep the initial environment plus one per still-valid cell.
  update . UpdateEnvs . take (nValid + 1) =<< getl PrevEnvs
  update $ UpdateDagEU $ NodeListUpdate tailUpdate $
    mapUpdateMapWithKey mapUpdate
      (\cellId (Unchanging source) -> initCellState cellId source)
      (\_ () -> mempty)
  curJob <- getl CurRunningJob
  case curJob of
    Nothing -> launchNextJob
    Just ((threadId, _), cellIndex) ->
      -- A job on a still-valid cell is fine and is left to continue.
      when (cellIndex >= nValid) do
        -- Current job is no longer valid. Kill it and restart.
        liftIO $ killThread threadId
        update $ UpdateCurJob Nothing
        launchNextJob
-- Whether a source block needs no evaluation at all. Prose, comment lines,
-- empty lines and unparseable blocks are inert; everything else must run.
-- Every constructor is listed explicitly (no wildcard).
isInert :: SourceBlock -> Bool
isInert sb = case sbContents sb of
  TopDecl _                        -> False
  Command _ _                      -> False
  DeclareForeign _ _ _             -> False
  DeclareCustomLinearization _ _ _ -> False
  Misc (GetNameType _)             -> False
  Misc (ImportModule _)            -> False
  Misc (QueryEnv _)                -> False
  Misc (ProseBlock _)              -> True
  Misc CommentLine                 -> True
  Misc EmptyLines                  -> True
  UnParseable _ _                  -> True
-- Initial state for a newly created cell: no outputs yet, and status `Inert`
-- or `Waiting` depending on whether the block needs to run.
initCellState :: NodeId -> SourceBlock -> CellState
initCellState cellId source =
  CellState (SourceBlockWithId cellId source) status mempty
  where
    status
      | isInert source = Inert
      | otherwise      = Waiting
-- === ToJSON ===
-- All of these instances have empty bodies and so use the class's default
-- method implementations.
instance ToJSON CellState
instance ToJSON CellStatus
instance ToJSON CellUpdate
instance (ToJSON s, ToJSON d) => ToJSON (NodeListUpdate s d)
|