Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tweak: dont enqueue filesystem events #5631

Open
wants to merge 6 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion unison-cli/package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ library:
- aeson >= 2.0.0.0
- aeson-pretty
- ansi-terminal
- async
- bytestring
- cmark
- co-log-core
Expand Down
226 changes: 86 additions & 140 deletions unison-cli/src/Unison/Codebase/Watch.hs
Original file line number Diff line number Diff line change
@@ -1,153 +1,99 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE TypeApplications #-}

module Unison.Codebase.Watch where

import Control.Concurrent
( forkIO,
killThread,
threadDelay,
module Unison.Codebase.Watch
( watchDirectory,
)
where

import Control.Concurrent (threadDelay)
import Control.Concurrent.STM qualified as STM
import Control.Exception (MaskingState (..))
import Data.IORef (newIORef, readIORef, writeIORef)
import Data.Map qualified as Map
import Data.Time.Clock
( UTCTime,
diffUTCTime,
)
import Data.Time.Clock (UTCTime, diffUTCTime)
import GHC.Conc (registerDelay)
import GHC.IO (unsafeUnmask)
import Ki qualified
import System.FSNotify (Event (Added, Modified))
import System.FSNotify qualified as FSNotify
import Unison.Prelude
import Unison.Util.TQueue (TQueue)
import Unison.Util.TQueue qualified as TQueue
import UnliftIO.Exception (catch)
import UnliftIO.IORef
( newIORef,
readIORef,
writeIORef,
)
import UnliftIO.MVar
( newEmptyMVar,
putMVar,
takeMVar,
tryPutMVar,
tryTakeMVar,
)
import UnliftIO.Exception (finally, tryAny)
import UnliftIO.STM (atomically)

untilJust :: (Monad m) => m (Maybe a) -> m a
untilJust act = act >>= maybe (untilJust act) return
watchDirectory :: Ki.Scope -> FSNotify.WatchManager -> FilePath -> (FilePath -> Bool) -> IO (IO (FilePath, Text))
watchDirectory scope mgr dir allow = do
eventQueue <- forkDirWatcherThread scope mgr dir allow

-- Await an event from the event queue with the following simple debounce logic, which is intended to work around the
-- tendency for modern editors to create a flurry of rapid filesystem events when a file is saved:
--
-- 1. Block until an event arrives.
-- 2. Keep consuming events until 50ms elapse without an event.
-- 3. Return only the last event.
--
-- Note we don't have any smarts here for a flurry of events that are related to more than one file; we just throw
-- everything away except the last event. In practice, this has seemed to work fine.
let awaitEvent0 :: IO (FilePath, UTCTime)
awaitEvent0 = do
let go :: (FilePath, UTCTime) -> IO (FilePath, UTCTime)
go event0 = do
var <- registerDelay 50_000
(join . atomically . asum)
[ do
event1 <- STM.readTQueue eventQueue
pure (go event1),
do
STM.readTVar var >>= STM.check
pure (pure event0)
]
event <- atomically (STM.readTQueue eventQueue)
go event

-- Enhance the previous "await event" action with a small file cache that serves as a second debounce implementation.
-- We keep in memory the file contents of previously-saved files, so that we can avoid emitting events for files that
-- last changed less than 500ms ago, and whose contents haven't changed.
previousFilesRef <- newIORef Map.empty
let awaitEvent1 :: IO (FilePath, Text)
awaitEvent1 = do
(file, t) <- awaitEvent0
tryAny (readUtf8 file) >>= \case
-- Somewhat-expected read error from a file that was just written. Just ignore the event and try again.
Left _ -> awaitEvent1
Right contents -> do
previousFiles <- readIORef previousFilesRef
case Map.lookup file previousFiles of
Just (contents0, t0) | contents == contents0 && (t `diffUTCTime` t0) < 0.5 -> awaitEvent1
_ -> do
writeIORef previousFilesRef $! Map.insert file (contents, t) previousFiles
pure (file, contents)

-- Enhance the previous "await" event action by first clearing the whole event queue (tossing old filesystem events
-- we may have accumulated while e.g. running a long-running IO action), and *then* waiting.
let awaitEvent2 :: IO (FilePath, Text)
awaitEvent2 = do
_ <- STM.atomically (STM.flushTQueue eventQueue)
awaitEvent1

pure awaitEvent2

-- | `forkDirWatcherThread scope mgr dir allow` forks a background thread into `scope` that, using "file watcher
-- manager" `mgr` (just a boilerplate argument the caller is responsible for creating), watches directory `dir` for
-- all "added" and "modified" filesystem events that occur on files that pass the `allow` predicate. It returns a queue
-- of such event that is (obviously) meant to be read or flushed, never written.
forkDirWatcherThread :: Ki.Scope -> FSNotify.WatchManager -> FilePath -> (FilePath -> Bool) -> IO (STM.TQueue (FilePath, UTCTime))
forkDirWatcherThread scope mgr dir allow = do
queue <- STM.newTQueueIO

watchDirectory' ::
forall m. (MonadIO m) => FilePath -> m (IO (), IO (FilePath, UTCTime))
watchDirectory' d = do
mvar <- newEmptyMVar
let handler :: Event -> IO ()
handler e = case e of
Added fp t FSNotify.IsFile -> doIt fp t
Modified fp t FSNotify.IsFile -> doIt fp t
handler = \case
Added fp t FSNotify.IsFile | allow fp -> atomically (STM.writeTQueue queue (fp, t))
Modified fp t FSNotify.IsFile | allow fp -> atomically (STM.writeTQueue queue (fp, t))
_ -> pure ()
where
doIt fp t = do
_ <- tryTakeMVar mvar
putMVar mvar (fp, t)
-- janky: used to store the cancellation action returned
-- by `watchDir`, which is created asynchronously
cleanupRef <- newEmptyMVar
-- we don't like FSNotify's debouncing (it seems to drop later events)
-- so we will be doing our own instead
let config = FSNotify.defaultConfig
cancel <- liftIO $
forkIO $
FSNotify.withManagerConf config $ \mgr -> do
cancelInner <- FSNotify.watchDir mgr d (const True) handler <|> (pure (pure ()))
putMVar cleanupRef $ liftIO cancelInner
forever $ threadDelay 1000000
let cleanup :: IO ()
cleanup = join (takeMVar cleanupRef) >> killThread cancel
pure (cleanup, takeMVar mvar)

collectUntilPause :: forall a. TQueue a -> Int -> IO [a]
collectUntilPause queue minPauseµsec = do
-- 1. wait for at least one element in the queue
void . atomically $ TQueue.peek queue

let go :: IO [a]
go = do
before <- atomically $ TQueue.enqueueCount queue
threadDelay minPauseµsec
after <- atomically $ TQueue.enqueueCount queue
-- if nothing new is on the queue, then return the contents
if before == after
then atomically $ TQueue.flush queue
else go
go
-- A bit of a "one too many threads" situation but there's not much we can easily do about it. The `fsnotify` API
-- doesn't expose any synchronous API; the only option is to fork a background thread with a callback. So, we spawn
-- a thread that spawns *that* thread, then waits forever. The purpose here is to simply leverage `ki` exception
-- propagation machinery to ensure that the `fsnotify` thread is properly cleaned up.
Ki.forkWith_ scope Ki.defaultThreadOptions {Ki.maskingState = MaskedUninterruptible} do
stopListening <- FSNotify.watchDir mgr dir (const True) handler <|> pure (pure ())
unsafeUnmask (forever (threadDelay maxBound)) `finally` stopListening

watchDirectory ::
forall m.
(MonadIO m) =>
FilePath ->
(FilePath -> Bool) ->
m (IO (), IO (FilePath, Text))
watchDirectory dir allow = do
previousFiles <- newIORef Map.empty
(cancelWatch, watcher) <- watchDirectory' dir
let process :: FilePath -> UTCTime -> IO (Maybe (FilePath, Text))
process file t =
if allow file
then
let handle :: IOException -> IO ()
handle _e =
-- Sometimes we notice a change and try to read a file while it's being written.
-- This typically occurs when UCM is writing to the scratch file and can be
-- ignored anyways.
pure ()
go :: IO (Maybe (FilePath, Text))
go = liftIO $ do
contents <- readUtf8 file
prevs <- readIORef previousFiles
case Map.lookup file prevs of
-- if the file's content's haven't changed and less than .5s
-- have elapsed, wait for the next update
Just (contents0, t0)
| contents == contents0 && (t `diffUTCTime` t0) < 0.5 ->
return Nothing
_ ->
Just (file, contents)
<$ writeIORef previousFiles (Map.insert file (contents, t) prevs)
in catch go (\e -> Nothing <$ handle e)
else return Nothing
queue <- TQueue.newIO
gate <- liftIO newEmptyMVar
-- We spawn a separate thread to siphon the file change events
-- into a queue, which can be debounced using `collectUntilPause`
enqueuer <- liftIO . forkIO $ do
takeMVar gate -- wait until gate open before starting
forever $ do
event@(file, _) <- watcher
when (allow file) $
STM.atomically $
TQueue.enqueue queue event
pending <- newIORef []
let await :: IO (FilePath, Text)
await =
untilJust $
readIORef pending >>= \case
[] -> do
-- open the gate
_ <- tryPutMVar gate ()
-- this debounces the events, waits for 50ms pause
-- in file change events
events <- collectUntilPause queue 50000
-- traceM $ "Collected file change events" <> show events
case events of
[] -> pure Nothing
-- we pick the last of the events within the 50ms window
-- TODO: consider enqueing other events if there are
-- multiple events for different files
_ -> uncurry process $ last events
((file, t) : rest) -> do
writeIORef pending rest
process file t
cancel = cancelWatch >> killThread enqueuer
pure (cancel, await)
pure queue
15 changes: 1 addition & 14 deletions unison-cli/src/Unison/CommandLine.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@ module Unison.CommandLine
parseInput,
prompt,
reportParseFailure,
watchFileSystem,
)
where

import Control.Concurrent (forkIO, killThread)
import Control.Lens hiding (aside)
import Control.Monad.Except
import Control.Monad.Trans.Except
Expand All @@ -31,11 +29,10 @@ import Text.Regex.TDFA ((=~))
import Unison.Codebase (Codebase)
import Unison.Codebase.Branch (Branch0)
import Unison.Codebase.Branch qualified as Branch
import Unison.Codebase.Editor.Input (Event (..), Input (..))
import Unison.Codebase.Editor.Input (Input (..))
import Unison.Codebase.Editor.Output (NumberedArgs)
import Unison.Codebase.Editor.StructuredArgument (StructuredArgument)
import Unison.Codebase.ProjectPath qualified as PP
import Unison.Codebase.Watch qualified as Watch
import Unison.CommandLine.FZFResolvers qualified as FZFResolvers
import Unison.CommandLine.FuzzySelect qualified as Fuzzy
import Unison.CommandLine.Helpers (warn)
Expand All @@ -46,8 +43,6 @@ import Unison.Parser.Ann (Ann)
import Unison.Prelude
import Unison.Symbol (Symbol)
import Unison.Util.Pretty qualified as P
import Unison.Util.TQueue qualified as Q
import UnliftIO.STM
import Prelude hiding (readFile, writeFile)

allow :: FilePath -> Bool
Expand All @@ -56,14 +51,6 @@ allow p =
not (".#" `isPrefixOf` takeFileName p)
&& (isSuffixOf ".u" p || isSuffixOf ".uu" p)

watchFileSystem :: Q.TQueue Event -> FilePath -> IO (IO ())
watchFileSystem q dir = do
(cancel, watcher) <- Watch.watchDirectory dir allow
t <- forkIO . forever $ do
(filePath, text) <- watcher
atomically . Q.enqueue q $ UnisonFileChanged (Text.pack filePath) text
pure (cancel >> killThread t)

data ExpansionFailure
= TooManyArguments (NonEmpty InputPattern.Argument)
| UnexpectedStructuredArgument StructuredArgument
Expand Down
Loading
Loading