summaryrefslogtreecommitdiff
path: root/src/lib/Live/Eval.hs
blob: 77d9c472d5bf2046c4c08c1eb05a277aa5493c80 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
-- Copyright 2023 Google LLC
--
-- Use of this source code is governed by a BSD-style
-- license that can be found in the LICENSE file or at
-- https://developers.google.com/open-source/licenses/bsd

{-# LANGUAGE UndecidableInstances #-}

module Live.Eval (
  watchAndEvalFile, EvalServer, CellsState, CellsUpdate,
  NodeList (..), NodeListUpdate (..), subscribeIO, cellsStateAsUpdate) where

import Control.Concurrent
import Control.Monad
import Control.Monad.State.Strict
import Control.Monad.Writer.Strict
import qualified Data.Map.Strict as M
import Data.Aeson (ToJSON)
import Data.Functor ((<&>))
import Data.Maybe (fromJust)
import Data.Text (Text)
import Prelude hiding (span)
import GHC.Generics

import Actor
import IncState
import Types.Source
import TopLevel
import ConcreteSyntax
import MonadUtil
import RenderHtml

-- === Top-level interface ===

-- | Server through which clients observe the evaluated cells of a file.
type EvalServer = StateServer CellsState CellsUpdate

-- | Watch @fname@, split its contents into source blocks whenever the watcher
-- reports new contents, and evaluate those blocks incrementally starting from
-- @env@.
--
-- The returned server is the channel by which a client subscribes, by sending
-- a write-only view of its own input channel.
watchAndEvalFile :: FilePath -> EvalConfig -> TopStateEx -> IO EvalServer
watchAndEvalFile fname opts env = do
  fileWatcher <- launchFileWatcher fname
  cellParser  <- launchCellParser fileWatcher splitBlocks
  launchDagEvaluator opts cellParser env
  where
    splitBlocks source = uModuleSourceBlocks (parseUModule Main source)

-- | Evaluate one source block in @env@, forwarding every log message emitted
-- during evaluation to @resultChan@. Yields the exit status together with the
-- resulting top-level state.
sourceBlockEvalFun :: EvalConfig -> Mailbox Outputs -> TopStateEx -> SourceBlock -> IO (ExitStatus, TopStateEx)
sourceBlockEvalFun cfg resultChan env block = evalSourceBlockIO loggingCfg env block
  where loggingCfg = cfg { cfgLogAction = send resultChan }

-- | Express a complete cells state as an update that builds it from scratch:
-- nothing is dropped and every node is created.
cellsStateAsUpdate :: CellsState -> CellsUpdate
cellsStateAsUpdate cells = nodeListAsUpdate cells

-- === DAG diff state ===

-- We intend to make this an arbitrary DAG at some point, but for now we simply
-- assume that dependencies are given by the top-to-bottom ordering of blocks
-- within the file.

-- | Identifier of a node (cell). Fresh ids are drawn with 'freshName' in
-- 'buildNodeList'.
type NodeId = Int

-- | An ordered collection of nodes: 'orderedNodes' gives the top-to-bottom
-- order of ids and 'nodeMap' holds the payload of each id. Every id in
-- 'orderedNodes' is expected to have an entry in 'nodeMap' ('nodeListVals'
-- relies on this via 'fromJust').
data NodeList a = NodeList
  { orderedNodes :: [NodeId]
  , nodeMap      :: M.Map NodeId a }
  deriving (Show, Generic, Functor)

-- | Incremental update to a 'NodeList': a 'TailUpdate' for the id order and a
-- 'MapUpdate' for the payload map, where @d@ is the per-node diff type.
data NodeListUpdate s d = NodeListUpdate
  { orderedNodesUpdate :: TailUpdate NodeId
  , nodeMapUpdate      :: MapUpdate NodeId s d }
  deriving (Show, Generic)

-- | Updates combine field-wise: order updates with order updates, map updates
-- with map updates.
instance IncState s d => Semigroup (NodeListUpdate s d) where
  NodeListUpdate orderA nodesA <> NodeListUpdate orderB nodesB =
    NodeListUpdate (orderA <> orderB) (nodesA <> nodesB)

instance IncState s d => Monoid (NodeListUpdate s d) where
  mempty = NodeListUpdate mempty mempty

-- | Apply the order update to the id list and the map update to the node map.
instance IncState s d => IncState (NodeList s) (NodeListUpdate s d) where
  applyDiff (NodeList order nodes) (NodeListUpdate orderDiff nodesDiff) =
    NodeList (applyDiff order orderDiff) (applyDiff nodes nodesDiff)

-- | A DAG of immutable payloads. For now it is just a 'NodeList': each node
-- implicitly depends on every node above it. Payloads are wrapped in
-- 'Unchanging' and the per-node diff type is @()@, so a node's payload is never
-- updated in place.
type Dag       a = NodeList (Unchanging a)
type DagUpdate a = NodeListUpdate (Unchanging a) ()

-- | Turn a whole node list into an update that recreates it from nothing:
-- zero nodes are dropped, all ids are appended, and every node is created.
nodeListAsUpdate :: NodeList s -> NodeListUpdate s d
nodeListAsUpdate (NodeList order nodes) =
  NodeListUpdate { orderedNodesUpdate = TailUpdate 0 order
                 , nodeMapUpdate      = MapUpdate (Create <$> nodes) }

-- | A node list containing no nodes.
emptyNodeList :: NodeList a
emptyNodeList = NodeList { orderedNodes = [], nodeMap = M.empty }

-- | Give each value a fresh 'NodeId' (drawn in list order) and collect the
-- values into a 'NodeList' that preserves their order.
buildNodeList :: FreshNames NodeId m => [a] -> m (NodeList a)
buildNodeList vals = do
  ids <- mapM (const freshName) vals
  return $ NodeList ids $ M.fromList $ zip ids vals

-- | Length of the longest common prefix of two lists. Comparison stops at the
-- first mismatch or at the end of the shorter list.
commonPrefixLength :: Eq a => [a] -> [a] -> Int
commonPrefixLength xs ys = length (takeWhile id (zipWith (==) xs ys))

-- | The node payloads in node order. A node id with no entry in 'nodeMap' is a
-- 'fromJust' error.
nodeListVals :: NodeList a -> [a]
nodeListVals nodes =
  [fromJust (M.lookup nodeId (nodeMap nodes)) | nodeId <- orderedNodes nodes]

-- | Diff the current node list against a new list of payloads.
--
-- The longest common prefix of payloads is kept as-is. Every old node after it
-- is deleted, and every new value after it becomes a freshly named node. The
-- order update drops the old tail and appends the new ids.
computeNodeListUpdate :: (Eq s, FreshNames NodeId m) => NodeList s -> [s] -> m (NodeListUpdate s d)
computeNodeListUpdate nodes newVals = do
  let keep     = commonPrefixLength (nodeListVals nodes) newVals
      staleIds = drop keep (orderedNodes nodes)
  NodeList freshIds freshNodes <- buildNodeList (drop keep newVals)
  let creations = Create <$> freshNodes
      deletions = M.fromList [(staleId, Delete) | staleId <- staleIds]
      mapDiff   = MapUpdate (creations <> deletions)
      tailDiff  = TailUpdate (length staleIds) freshIds
  return (NodeListUpdate tailDiff mapDiff)

-- === Cell parser ===

-- This coarsely parses the full file into blocks and forms a DAG (for now a
-- trivial one assuming all top-to-bottom dependencies) of the results.

-- | State server holding the parsed cells as a 'Dag' of source blocks; updates
-- are expressed as 'DagUpdate's.
type CellParser = StateServer (Dag SourceBlock) (DagUpdate SourceBlock)

-- | Messages handled by the cell-parser actor: subscription requests from
-- clients, and new file contents from the file watcher it subscribes to.
data CellParserMsg =
    Subscribe_CP (SubscribeMsg (Dag SourceBlock) (DagUpdate SourceBlock))
  | Update_CP (Overwrite Text)
  deriving (Show)

-- | Start the cell-parser actor and return the subscription side of its
-- mailbox.
launchCellParser :: MonadIO m => FileWatcher -> (Text -> [SourceBlock]) -> m CellParser
launchCellParser fileWatcher parseCells = do
  mailbox <- launchActor (cellParserImpl fileWatcher parseCells)
  return (sliceMailbox Subscribe_CP mailbox)

-- | Body of the cell-parser actor.
--
-- Parses the file's initial contents into cells. Then, whenever the watcher
-- reports new contents, it re-parses them and publishes the diff against the
-- current cells. Subscription requests are served throughout.
cellParserImpl :: FileWatcher -> (Text -> [SourceBlock]) -> ActorM CellParserMsg ()
cellParserImpl fileWatcher parseCells = runFreshNameT do
  Overwritable initContents <- subscribe Update_CP fileWatcher
  initNodeList <- buildNodeList (toCells initContents)
  runIncServerT initNodeList $ messageLoop \case
    Subscribe_CP msg -> handleSubscribeMsg msg
    Update_CP (OverwriteWith newContents) -> do
      curNodeList <- getl It
      cellDiff <- computeNodeListUpdate curNodeList (toCells newContents)
      update cellDiff
      flushDiffs
    Update_CP NoChange -> return ()
  where
    toCells contents = Unchanging <$> parseCells contents

-- === Dag evaluator ===

-- This is where we track the state of evaluation and decide what needs to be
-- run and what needs to be killed.

-- | State server exposing the evaluator's 'CellsState' and its 'CellsUpdate's.
type Evaluator = StateServer CellsState CellsUpdate
-- | Monad the DAG evaluator runs in: an incremental server over the published
-- cells state, on top of a 'StateT' holding the evaluator's private
-- 'EvaluatorState', running inside the evaluator actor.
newtype EvaluatorM a =
  EvaluatorM { runEvaluatorM' ::
                 IncServerT CellsState CellsUpdate
                   (StateT EvaluatorState
                      (ActorM EvaluatorMsg)) a }
  deriving (Functor, Applicative, Monad, MonadIO, Actor (EvaluatorMsg))
-- Reuse the underlying 'IncServerT' instance for the published cells state.
deriving instance IncServer CellsState CellsUpdate EvaluatorM

-- | Cell updates combine component-wise: status overwrites via their own
-- 'Semigroup', outputs by concatenation.
instance Semigroup CellUpdate where
  CellUpdate statusA outsA <> CellUpdate statusB outsB =
    CellUpdate (statusA <> statusB) (outsA <> outsB)

instance Monoid CellUpdate where
  mempty = CellUpdate mempty mempty

-- | Applying a 'CellUpdate' overwrites the status (when requested) and appends
-- the new outputs. The cell's source is left untouched.
instance IncState CellState CellUpdate where
  applyDiff (CellState src oldStatus oldOuts) (CellUpdate statusDiff newOuts) =
    CellState src newStatus (oldOuts <> newOuts)
    where newStatus = fromOverwritable $ applyDiff (Overwritable oldStatus) statusDiff

-- | Interpret 'EvaluatorMUpdate's. DAG and cell updates go to the incremental
-- cells server. Job and environment bookkeeping goes to the private
-- 'EvaluatorState'.
instance DefuncState EvaluatorMUpdate EvaluatorM where
  update upd = case upd of
      UpdateDagEU dag -> EvaluatorM $ update dag
      UpdateCellState nodeId cellUpdate ->
        update $ UpdateDagEU $ NodeListUpdate mempty $
          MapUpdate $ M.singleton nodeId $ Update cellUpdate
      UpdateCurJob status -> modifyEvalState \s -> s { curRunningJob = status }
      UpdateEnvs envs     -> modifyEvalState \s -> s { prevEnvs = envs }
      AppendEnv env -> do
        envs <- getl PrevEnvs
        update $ UpdateEnvs $ envs ++ [env]
    where
      modifyEvalState f = EvaluatorM $ lift $ modify f

-- | Read evaluator state by label. Node order and per-node cell state come
-- from the published cells state. Environments, the running job and the config
-- come from the private 'EvaluatorState'.
instance LabelReader EvaluatorMLabel EvaluatorM where
  getl NodeListEM        = EvaluatorM $ orderedNodes <$> getl It
  getl (NodeInfo nodeId) = EvaluatorM $ M.lookup nodeId . nodeMap <$> getl It
  getl PrevEnvs          = EvaluatorM $ lift $ gets prevEnvs
  getl CurRunningJob     = EvaluatorM $ lift $ gets curRunningJob
  getl EvalCfg           = EvaluatorM $ lift $ gets evaluatorCfg

-- | State changes the evaluator can make; interpreted by the 'DefuncState'
-- instance for 'EvaluatorM'.
data EvaluatorMUpdate =
   UpdateDagEU (NodeListUpdate CellState CellUpdate)  -- ^ raw update to the published cells state
 | UpdateCellState NodeId CellUpdate                   -- ^ update a single cell by node id
 | UpdateCurJob CurJobStatus                           -- ^ replace the current-job record
 | UpdateEnvs   [TopStateEx]                           -- ^ replace the list of stored environments
 | AppendEnv TopStateEx                                -- ^ append one environment to that list

-- | Labels for reading evaluator state with 'getl'; each label's type index is
-- the type of the value it reads.
data EvaluatorMLabel a where
  NodeListEM    ::           EvaluatorMLabel [NodeId]
  NodeInfo      :: NodeId -> EvaluatorMLabel (Maybe CellState)
  PrevEnvs      ::           EvaluatorMLabel [TopStateEx]
  CurRunningJob ::           EvaluatorMLabel (CurJobStatus)
  EvalCfg       ::           EvaluatorMLabel EvalConfig

-- | Identifies a running job. Pairing the NodeId with the ThreadId is
-- redundant, but it defends against possible GHC reuse of a ThreadId (we don't
-- know whether that can actually happen).
type JobId = (ThreadId, NodeId)
-- | The currently running job, if any, paired with the index of its cell.
type CurJobStatus = Maybe (JobId, CellIndex)

-- | Private bookkeeping of the evaluator actor.
--
-- @prevEnvs@ starts out as just the initial environment. Element @i@ is the
-- environment cell @i@ is evaluated in, so its length minus one is the index of
-- the next cell to evaluate.
data EvaluatorState = EvaluatorState
  { evaluatorCfg  :: EvalConfig
  , prevEnvs      :: [TopStateEx]
  , curRunningJob :: CurJobStatus }

-- | Evaluation status of a single cell.
data CellStatus =
    Waiting            -- not started yet
  | Running            -- TODO: split into compiling/running
  | Complete           -- completed without errors
  | CompleteWithErrors -- finished with 'ExitFailure'
  | Inert              -- doesn't require running at all
    deriving (Show, Generic)

-- | A cell: its source block (tagged with its node id), its current status,
-- and the outputs it has produced so far.
data CellState  = CellState SourceBlockWithId CellStatus Outputs
     deriving (Show, Generic)

-- | A change to a cell: an optional new status, plus outputs to append to the
-- cell's existing outputs.
data CellUpdate = CellUpdate (Overwrite CellStatus) Outputs deriving (Show, Generic)

-- | The published state of all cells, and updates to it.
type CellsState  = NodeList       CellState
type CellsUpdate = NodeListUpdate CellState CellUpdate

type CellIndex = Int -- index in the list of cells, not the NodeId

-- | Messages sent from a job's worker thread back to the evaluator.
data JobUpdate =
    PartialJobUpdate   Outputs                   -- output produced while running
  | JobComplete        (ExitStatus, TopStateEx)  -- final exit status and resulting environment
    deriving (Show)

-- | Messages handled by the evaluator actor: source updates from the cell
-- parser, updates from job threads, and subscription requests from clients.
data EvaluatorMsg =
   SourceUpdate (DagUpdate SourceBlock)
 | JobUpdate JobId JobUpdate
 | Subscribe_E (SubscribeMsg CellsState CellsUpdate)
   deriving (Show)

-- | Evaluator state before any cell has run: only the initial environment is
-- stored, and no job is running.
initEvaluatorState :: EvalConfig -> TopStateEx -> EvaluatorState
initEvaluatorState cfg s = EvaluatorState
  { evaluatorCfg  = cfg
  , prevEnvs      = [s]
  , curRunningJob = Nothing }

-- | Start the DAG evaluator actor, fed by @cellParser@ and starting from the
-- environment @env@, and return the subscription side of its mailbox.
launchDagEvaluator :: MonadIO m => EvalConfig -> CellParser -> TopStateEx -> m Evaluator
launchDagEvaluator cfg cellParser env =
  sliceMailbox Subscribe_E <$> launchActor actorBody
  where
    startState = initEvaluatorState cfg env
    evaluator  = runEvaluatorM' (dagEvaluatorImpl cellParser)
    actorBody  = void $ runStateT (runIncServerT emptyNodeList evaluator) startState

-- | Body of the evaluator actor.
--
-- Subscribes to the cell parser and starts evaluating the initial cells. It
-- then serves subscription requests, source updates and job updates forever,
-- flushing pending diffs after each source or job update.
dagEvaluatorImpl :: CellParser -> EvaluatorM ()
dagEvaluatorImpl cellParser = do
  initDag <- subscribe SourceUpdate cellParser
  -- Do not call `launchNextJob` separately here. No job is running yet, so
  -- `processDagUpdate` launches the first job itself. A second `launchNextJob`
  -- would see the same (still unfinished) cell index and fork a duplicate job
  -- for that cell. The first worker thread would keep running unkilled, with
  -- all of its messages ignored because its JobId no longer matches.
  processDagUpdate (nodeListAsUpdate initDag)
  flushDiffs
  messageLoop \case
    Subscribe_E msg        -> handleSubscribeMsg msg
    SourceUpdate dagUpdate -> do
      processDagUpdate dagUpdate
      flushDiffs
    JobUpdate jobId jobUpdate -> do
      processJobUpdate jobId jobUpdate
      flushDiffs

-- | Handle a message from a job's worker thread.
--
-- Messages from any job other than the currently running one (for example a
-- job killed after a source change) are ignored. Partial outputs are appended
-- to the job's cell. On completion, the cell's final status and the resulting
-- environment are recorded and the next job is launched.
processJobUpdate :: JobId -> JobUpdate -> EvaluatorM ()
processJobUpdate jobId jobUpdate = do
  current <- getl CurRunningJob
  when (fmap fst current == Just jobId) $ case jobUpdate of
    PartialJobUpdate outs ->
      update $ UpdateCellState nodeId $ CellUpdate NoChange outs
    JobComplete (exitStatus, newEnv) -> do
      update $ UpdateCellState nodeId $ CellUpdate (OverwriteWith (finalStatus exitStatus)) mempty
      update $ UpdateCurJob Nothing
      update $ AppendEnv newEnv
      launchNextJob
      flushDiffs
  where
    nodeId = snd jobId
    finalStatus ExitSuccess = Complete
    finalStatus ExitFailure = CompleteWithErrors

-- | Index of the next cell to evaluate. One environment is stored per finished
-- or skipped cell, on top of the initial one, so this is the number of stored
-- environments minus one.
nextCellIndex :: EvaluatorM Int
nextCellIndex = subtract 1 . length <$> getl PrevEnvs

-- | Launch the job for the next cell that still needs evaluating, if any.
-- Inert cells are not run: the current environment is passed on unchanged to
-- the following cell, and the search continues.
launchNextJob :: EvaluatorM ()
launchNextJob = do
  cellIndex <- nextCellIndex
  order <- getl NodeListEM
  unless (cellIndex >= length order) do -- otherwise every cell is done
    let nodeId = order !! cellIndex
    curEnv <- (!! cellIndex) <$> getl PrevEnvs
    CellState source _ _ <- fromJust <$> getl (NodeInfo nodeId)
    if not (isInert (sourceBlockWithoutId source))
      then launchJob cellIndex nodeId curEnv
      else do
        update $ AppendEnv curEnv
        launchNextJob

-- | Start evaluating cell @cellIndex@ (node @nodeId@) in environment @env@ on
-- a new thread.
--
-- The cell is marked 'Running' and recorded as the current job. The worker
-- sends its partial outputs and, finally, its exit status and resulting
-- environment to this actor's own mailbox, tagged with the job's id.
launchJob :: CellIndex -> NodeId -> TopStateEx -> EvaluatorM ()
launchJob cellIndex nodeId env = do
  cfg <- getl EvalCfg
  CellState source _ _ <- fromJust <$> getl (NodeInfo nodeId)
  self <- selfMailbox id
  update $ UpdateCellState nodeId $ CellUpdate (OverwriteWith Running) mempty
  let jobIdFor tid = (tid, nodeId)
      worker = do
        jid <- jobIdFor <$> myThreadId
        let partials = sliceMailbox (JobUpdate jid . PartialJobUpdate) self
        outcome <- sourceBlockEvalFun cfg partials env (sourceBlockWithoutId source)
        send self $ JobUpdate jid $ JobComplete outcome
  tid <- liftIO $ forkIO worker
  update $ UpdateCurJob $ Just (jobIdFor tid, cellIndex)

-- | Number of cells that survive @tailUpdate@: the current node count minus the
-- number of nodes the update drops. It reads the current node list, so it is
-- meaningful only before the update is applied.
computeNumValidCells :: TailUpdate NodeId -> EvaluatorM Int
computeNumValidCells tailUpdate =
  subtract (numDropped tailUpdate) . length <$> getl NodeListEM

-- | Apply a source DAG update coming from the cell parser.
--
-- Cells in the dropped tail are discarded together with the environments they
-- produced, and newly created cells get their initial state. Evaluation is
-- then (re)started as needed: the running job is killed if its cell was
-- dropped, and a new job is launched whenever none is running.
processDagUpdate :: DagUpdate SourceBlock -> EvaluatorM ()
processDagUpdate (NodeListUpdate tailUpdate mapUpdate) = do
  -- Must read the node list before the update below is applied.
  nValid <- computeNumValidCells tailUpdate
  envs <- getl PrevEnvs
  -- Keep the initial environment plus those produced by surviving cells.
  update $ UpdateEnvs $ take (nValid + 1) envs
  -- Created nodes get their initial cell state; per-node '()' updates become
  -- empty cell updates.
  update $ UpdateDagEU $ NodeListUpdate tailUpdate $ mapUpdateMapWithKey mapUpdate
    (\cellId (Unchanging source) -> initCellState cellId source)
    (\_ () -> mempty)
  getl CurRunningJob >>= \case
    Nothing -> launchNextJob
    Just ((threadId, _), cellIndex)
      | (cellIndex >= nValid) -> do
          -- Current job is no longer valid. Kill it and restart.
          liftIO $ killThread threadId
          update $ UpdateCurJob Nothing
          launchNextJob
      | otherwise -> return () -- Current job is fine. Let it continue.

-- | Whether a block needs no evaluation at all. Prose, comment lines, empty
-- lines and unparseable blocks are inert. Declarations, commands and queries
-- are not.
isInert :: SourceBlock -> Bool
isInert sb =
    case sbContents sb of
      Misc misc                        -> isInertMisc misc
      UnParseable _ _                  -> True
      TopDecl _                        -> False
      Command _ _                      -> False
      DeclareForeign _ _ _             -> False
      DeclareCustomLinearization _ _ _ -> False
  where
    isInertMisc = \case
      ProseBlock _   -> True
      CommentLine    -> True
      EmptyLines     -> True
      GetNameType _  -> False
      ImportModule _ -> False
      QueryEnv _     -> False

-- | Initial state of a freshly created cell. It has no outputs yet; its status
-- is 'Inert' if the block needs no evaluation and 'Waiting' otherwise.
initCellState :: NodeId -> SourceBlock -> CellState
initCellState cellId source =
  CellState (SourceBlockWithId cellId source) initialStatus mempty
  where
    initialStatus
      | isInert source = Inert
      | otherwise      = Waiting

-- === ToJSON ===

-- JSON encodings use aeson's default (Generic-based) methods; each of these
-- types derives 'Generic'. Renaming constructors or fields changes the JSON.
instance ToJSON CellState where
instance ToJSON CellStatus
instance ToJSON CellUpdate
instance (ToJSON s, ToJSON d) => ToJSON (NodeListUpdate s d)