Merge pull request #1027 from mindofmatthew/separate-busses
Split controller OSC off from SuperDirt handshake OSC
yaxu authored Aug 23, 2023


This commit was signed with the committer’s verified signature.
alexsnaps Alex Snaps
2 parents 077b65f + cf6377e commit fcc4c5d
Showing 1 changed file with 102 additions and 104 deletions.
206 changes: 102 additions & 104 deletions src/Sound/Tidal/Stream.hs
Original file line number Diff line number Diff line change
@@ -65,7 +65,6 @@ import Sound.Tidal.Version
import Sound.Tidal.StreamTypes as Sound.Tidal.Stream

data Stream = Stream {sConfig :: Config,
sBusses :: MVar [Int],
sStateMV :: MVar ValueMap,
-- sOutput :: MVar ControlSignal,
sLink :: Link.AbletonLink,
@@ -80,9 +79,9 @@ data Cx = Cx {cxTarget :: Target,
cxUDP :: O.Udp,
cxOSCs :: [OSC],
cxAddr :: N.AddrInfo,
cxBusAddr :: Maybe N.AddrInfo
cxBusAddr :: Maybe N.AddrInfo,
cxBusses :: Maybe (MVar [Int])
deriving (Show)

data StampStyle = BundleStamp
| MessageStamp
@@ -213,28 +212,27 @@ startStream :: Config -> [(Target, [OSC])] -> IO Stream
startStream config oscmap
= do sMapMV <- newMVar Map.empty
pMapMV <- newMVar Map.empty
bussesMV <- newMVar []
globalFMV <- newMVar id
actionsMV <- newEmptyMVar

tidal_status_string >>= verbose config
verbose config $ "Listening for external controls on " ++ cCtrlAddr config ++ ":" ++ show (cCtrlPort config)
listen <- openListener config

cxs <- mapM (\(target, os) -> do remote_addr <- resolve (oAddress target) (show $ oPort target)
remote_bus_addr <- if isJust $ oBusPort target
then Just <$> resolve (oAddress target) (show $ fromJust $ oBusPort target)
else return Nothing
cxs <- mapM (\(target, os) -> do remote_addr <- resolve (oAddress target) (oPort target)
remote_bus_addr <- sequence (resolve (oAddress target) <$> (oBusPort target))
remote_busses <- sequence (oBusPort target >> (Just $ newMVar []))
let broadcast = if cCtrlBroadcast config then 1 else 0
u <- O.udp_socket (\sock sockaddr -> do N.setSocketOption sock N.Broadcast broadcast
N.connect sock sockaddr
-- N.connect sock sockaddr
) (oAddress target) (oPort target)
return $ Cx {cxUDP = u, cxAddr = remote_addr, cxBusAddr = remote_bus_addr, cxTarget = target, cxOSCs = os}
let cx = Cx {cxUDP = u, cxAddr = remote_addr, cxBusAddr = remote_bus_addr, cxBusses = remote_busses, cxTarget = target, cxOSCs = os}
_ <- forkIO $ handshake cx config
return cx
) oscmap
let bpm = (coerce defaultCps) * 60 * (cBeatsPerCycle config)
abletonLink <- Link.create bpm
let stream = Stream {sConfig = config,
sBusses = bussesMV,
sStateMV = sMapMV,
sLink = abletonLink,
sListen = listen,
@@ -243,7 +241,6 @@ startStream config oscmap
sGlobalFMV = globalFMV,
sCxs = cxs
sendHandshakes stream
let ac = T.ActionHandler {
T.onTick = onTick stream,
T.onSingleTick = onSingleTick stream,
@@ -252,35 +249,51 @@ startStream config oscmap
-- Spawn a thread that acts as the clock
_ <- T.clocked config sMapMV pMapMV actionsMV ac abletonLink
-- Spawn a thread to handle OSC control messages
_ <- forkIO $ ctrlResponder 0 config stream
_ <- forkIO $ ctrlResponder config stream
return stream

-- It only really works to handshake with one target at the moment..
sendHandshakes :: Stream -> IO ()
sendHandshakes stream = mapM_ sendHandshake $ filter (oHandshake . cxTarget) (sCxs stream)
where sendHandshake cx = if (isJust $ sListen stream)
do -- send it _from_ the udp socket we're listening to, so the
-- replies go back there
sendO False (sListen stream) cx $ O.Message "/dirt/handshake" []
hPutStrLn stderr "Can't handshake with SuperCollider without control port."

sendO :: Bool -> (Maybe O.Udp) -> Cx -> O.Message -> IO ()
sendO isBusMsg (Just listen) cx msg = O.sendTo listen (O.Packet_Message msg) (N.addrAddress addr)
handshake :: Cx -> Config -> IO ()
handshake Cx { cxUDP = udp, cxBusses = Just bussesMV, cxAddr = addr } c = sendHandshake >> listen 0
sendHandshake :: IO ()
sendHandshake = O.sendTo udp (O.Packet_Message $ O.Message "/dirt/handshake" []) (N.addrAddress addr)
listen :: Int -> IO ()
listen waits = do ms <- recvMessagesTimeout 2 udp
if (null ms)
then do checkHandshake waits -- there was a timeout, check handshake
listen (waits+1)
else do mapM_ respond ms
listen 0
checkHandshake :: Int -> IO ()
checkHandshake waits = do busses <- readMVar bussesMV
when (null busses) $ do when (waits == 0) $ verbose c $ "Waiting for SuperDirt (v.1.7.2 or higher).."
respond :: O.Message -> IO ()
respond (O.Message "/dirt/hello" _) = sendHandshake
respond (O.Message "/dirt/handshake/reply" xs) = do prev <- swapMVar bussesMV $ bufferIndices xs
-- Only report the first time..
when (null prev) $ verbose c $ "Connected to SuperDirt."
return ()
respond _ = return ()
bufferIndices :: [O.Datum] -> [Int]
bufferIndices [] = []
bufferIndices (x:xs') | x == (O.AsciiString $ O.ascii "&controlBusIndices") = catMaybes $ takeWhile isJust $ map O.datum_integral xs'
| otherwise = bufferIndices xs'
handshake _ _ = return ()

sendO :: Bool -> Cx -> O.Message -> IO ()
sendO isBusMsg cx msg = O.sendTo (cxUDP cx) (O.Packet_Message msg) (N.addrAddress addr)
where addr | isBusMsg && isJust (cxBusAddr cx) = fromJust $ cxBusAddr cx
| otherwise = cxAddr cx
sendO _ Nothing cx msg = O.sendMessage (cxUDP cx) msg

sendBndl :: Bool -> (Maybe O.Udp) -> Cx -> O.Bundle -> IO ()
sendBndl isBusMsg (Just listen) cx bndl = O.sendTo listen (O.Packet_Bundle bndl) (N.addrAddress addr)
sendBndl :: Bool -> Cx -> O.Bundle -> IO ()
sendBndl isBusMsg cx bndl = O.sendTo (cxUDP cx) (O.Packet_Bundle bndl) (N.addrAddress addr)
where addr | isBusMsg && isJust (cxBusAddr cx) = fromJust $ cxBusAddr cx
| otherwise = cxAddr cx
sendBndl _ Nothing cx bndl = O.sendBundle (cxUDP cx) bndl

resolve :: String -> String -> IO N.AddrInfo
resolve :: String -> Int -> IO N.AddrInfo
resolve host port = do let hints = N.defaultHints { N.addrSocketType = N.Stream }
addr:_ <- N.getAddrInfo (Just hints) (Just host) (Just port)
addr:_ <- N.getAddrInfo (Just hints) (Just host) (Just $ show port)
return addr

-- Start an instance of Tidal with superdirt OSC
@@ -348,8 +361,8 @@ playStack pMap = stack $ map pattern activeStates
else not (mute pState)
) $ Map.elems pMap

toOSC :: [Int] -> ProcessedEvent -> OSC -> [(Double, Bool, O.Message)]
toOSC busses pe osc@(OSC _ _)
toOSC :: Maybe [Int] -> ProcessedEvent -> OSC -> [(Double, Bool, O.Message)]
toOSC maybeBusses pe osc@(OSC _ _)
= catMaybes (playmsg:busmsgs)
-- playmap is a ValueMap where the keys don't start with ^ and are not ""
-- busmap is a ValueMap containing the rest of the keys from the event value
@@ -384,8 +397,8 @@ toOSC busses pe osc@(OSC _ _)
O.Message mungedPath vs
| otherwise = Nothing
toBus n | null busses = n
| otherwise = busses !!! n
toBus n | Just busses <- maybeBusses, (not . null) busses = busses !!! n
| otherwise = n
busmsgs = map
(\(('^':k), (VI b)) -> do v <- Map.lookup k playmap
return $ (tsPart,
@@ -516,7 +529,6 @@ doTick stream st ops sMap =
setPreviousPatternOrSilence stream
return sMap) (do
pMap <- readMVar (sPMapMV stream)
busses <- readMVar (sBusses stream)
sGlobalF <- readMVar (sGlobalFMV stream)
bpm <- (T.getTempo ops)
@@ -534,14 +546,15 @@ doTick stream st ops sMap =
(sMap'', es') = resolveState sMap' es
tes <- processCps ops es'
-- For each OSC target
forM_ cxs $ \cx@(Cx target _ oscs _ _) -> do
forM_ cxs $ \cx@(Cx target _ oscs _ _ bussesMV) -> do
busses <- mapM readMVar bussesMV
-- Latency is configurable per target.
-- Latency is only used when sending events live.
let latency = oLatency target
ms = concatMap (\e -> concatMap (toOSC busses e) oscs) tes
-- send the events to the OSC target
forM_ ms $ \ m -> (do
send (sListen stream) cx latency extraLatency m) `E.catch` \ (e :: E.SomeException) -> do
send cx latency extraLatency m) `E.catch` \ (e :: E.SomeException) -> do
hPutStrLn stderr $ "Failed to send. Is the '" ++ oName target ++ "' target running? " ++ show e
sMap'' `seq` return sMap'')

@@ -557,13 +570,13 @@ setPreviousPatternOrSilence stream =
-- Send events early using timestamp in the OSC bundle - used by Superdirt
-- Send events early by adding timestamp to the OSC message - used by Dirt
-- Send events live by delaying the thread
send :: Maybe O.Udp -> Cx -> Double -> Double -> (Double, Bool, O.Message) -> IO ()
send listen cx latency extraLatency (time, isBusMsg, m)
| oSchedule target == Pre BundleStamp = sendBndl isBusMsg listen cx $ O.Bundle timeWithLatency [m]
| oSchedule target == Pre MessageStamp = sendO isBusMsg listen cx $ addtime m
send :: Cx -> Double -> Double -> (Double, Bool, O.Message) -> IO ()
send cx latency extraLatency (time, isBusMsg, m)
| oSchedule target == Pre BundleStamp = sendBndl isBusMsg cx $ O.Bundle timeWithLatency [m]
| oSchedule target == Pre MessageStamp = sendO isBusMsg cx $ addtime m
| otherwise = do _ <- forkOS $ do now <- O.time
threadDelay $ floor $ (timeWithLatency - now) * 1000000
sendO isBusMsg listen cx m
sendO isBusMsg cx m
return ()
where addtime (O.Message mpath params) = O.Message mpath ((O.int32 sec):((O.int32 usec):params))
ut = O.ntpr_to_posix timeWithLatency
@@ -684,66 +697,51 @@ openListener c
catchAny = E.catch

-- Listen to and act on OSC control messages
ctrlResponder :: Int -> Config -> Stream -> IO ()
ctrlResponder waits c (stream@(Stream {sListen = Just sock}))
= do ms <- recvMessagesTimeout 2 sock
if (null ms)
then do checkHandshake -- there was a timeout, check handshake
ctrlResponder (waits+1) c stream
else do mapM_ act ms
ctrlResponder 0 c stream
checkHandshake = do busses <- readMVar (sBusses stream)
when (null busses) $ do when (waits == 0) $ verbose c $ "Waiting for SuperDirt (v.1.7.2 or higher).."
sendHandshakes stream

act (O.Message "/dirt/hello" _) = sendHandshakes stream
act (O.Message "/dirt/handshake/reply" xs) = do prev <- swapMVar (sBusses stream) $ bufferIndices xs
-- Only report the first time..
when (null prev) $ verbose c $ "Connected to SuperDirt."
return ()
bufferIndices [] = []
bufferIndices (x:xs') | x == (O.AsciiString $ O.ascii "&controlBusIndices") = catMaybes $ takeWhile isJust $ map O.datum_integral xs'
| otherwise = bufferIndices xs'
-- External controller commands
act (O.Message "/ctrl" (O.Int32 k:v:[]))
= act (O.Message "/ctrl" [O.string $ show k,v])
act (O.Message "/ctrl" (O.AsciiString k:v@(O.Float _):[]))
= add (O.ascii_to_string k) (VF (fromJust $ O.datum_floating v))
act (O.Message "/ctrl" (O.AsciiString k:O.AsciiString v:[]))
= add (O.ascii_to_string k) (VS (O.ascii_to_string v))
act (O.Message "/ctrl" (O.AsciiString k:O.Int32 v:[]))
= add (O.ascii_to_string k) (VI (fromIntegral v))
-- Stream playback commands
act (O.Message "/mute" (k:[]))
= withID k $ streamMute stream
act (O.Message "/unmute" (k:[]))
= withID k $ streamUnmute stream
act (O.Message "/solo" (k:[]))
= withID k $ streamSolo stream
act (O.Message "/unsolo" (k:[]))
= withID k $ streamUnsolo stream
act (O.Message "/muteAll" [])
= streamMuteAll stream
act (O.Message "/unmuteAll" [])
= streamUnmuteAll stream
act (O.Message "/unsoloAll" [])
= streamUnsoloAll stream
act (O.Message "/hush" [])
= streamHush stream
act (O.Message "/silence" (k:[]))
= withID k $ streamSilence stream
act m = hPutStrLn stderr $ "Unhandled OSC: " ++ show m
add :: String -> Value -> IO ()
add k v = do sMap <- takeMVar (sStateMV stream)
putMVar (sStateMV stream) $ Map.insert k v sMap
return ()
withID :: O.Datum -> (ID -> IO ()) -> IO ()
withID (O.AsciiString k) func = func $ (ID . O.ascii_to_string) k
withID (O.Int32 k) func = func $ (ID . show) k
withID _ _ = return ()
ctrlResponder _ _ _ = return ()
ctrlResponder :: Config -> Stream -> IO ()
ctrlResponder c (stream@(Stream {sListen = Just sock})) = loop
loop :: IO ()
loop = do O.recvMessages sock >>= mapM_ act
-- External controller commands
act :: O.Message -> IO ()
act (O.Message "/ctrl" (O.Int32 k:v:[]))
= act (O.Message "/ctrl" [O.string $ show k,v])
act (O.Message "/ctrl" (O.AsciiString k:v@(O.Float _):[]))
= add (O.ascii_to_string k) (VF (fromJust $ O.datum_floating v))
act (O.Message "/ctrl" (O.AsciiString k:O.AsciiString v:[]))
= add (O.ascii_to_string k) (VS (O.ascii_to_string v))
act (O.Message "/ctrl" (O.AsciiString k:O.Int32 v:[]))
= add (O.ascii_to_string k) (VI (fromIntegral v))
-- Stream playback commands
act (O.Message "/mute" (k:[]))
= withID k $ streamMute stream
act (O.Message "/unmute" (k:[]))
= withID k $ streamUnmute stream
act (O.Message "/solo" (k:[]))
= withID k $ streamSolo stream
act (O.Message "/unsolo" (k:[]))
= withID k $ streamUnsolo stream
act (O.Message "/muteAll" [])
= streamMuteAll stream
act (O.Message "/unmuteAll" [])
= streamUnmuteAll stream
act (O.Message "/unsoloAll" [])
= streamUnsoloAll stream
act (O.Message "/hush" [])
= streamHush stream
act (O.Message "/silence" (k:[]))
= withID k $ streamSilence stream
act m = hPutStrLn stderr $ "Unhandled OSC: " ++ show m
add :: String -> Value -> IO ()
add k v = do sMap <- takeMVar (sStateMV stream)
putMVar (sStateMV stream) $ Map.insert k v sMap
return ()
withID :: O.Datum -> (ID -> IO ()) -> IO ()
withID (O.AsciiString k) func = func $ (ID . O.ascii_to_string) k
withID (O.Int32 k) func = func $ (ID . show) k
withID _ _ = return ()
ctrlResponder _ _ = return ()

verbose :: Config -> String -> IO ()
verbose c s = when (cVerbose c) $ putStrLn s

