I don’t know if it’s any help, but I tried to implement Iain’s suggestion and made a variant of mergeSources' that stops as soon as any of the channels does:
mergeSources' :: (MonadIO m, MonadBaseControl IO m)
=> [Source (ResourceT m) a] -- ^ The sources to merge.
-> Int -- ^ The bound of the intermediate channel.
-> ResourceT m (Source (ResourceT m) a)
mergeSources' sx bound = do
c <- liftSTM $ newTBMChan bound
mapM_ (\s -> resourceForkIO $
s $$ chanSink c writeTBMChan closeTBMChan) sx
return $ sourceTBMChan c
(This simple addition is available here).
Some comments to your version of mergeSources (take them with a grain of salt, it can be I didn’t understand something well):
- Using
...TMChaninstead of...TBMChanseems dangerous. If the writers are faster than the reader, your heap will blow. Looking at your diagram it seems that this can easily happen, if your TCP peer doesn’t read data fast enough. So I’d definitely use...TBMChanwith perhaps large but limited bound. -
You don’t need the
MonadSTM mconstraint. All STM stuff is wrapped intoIOwithliftSTM = liftIO . atomicallyMaybe this will help you slightly when using
mergeSources'inserverApp. -
Just a cosmetic issue, I found
liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retnvery hard to read due to its use of
liftA2on the(->) rmonad. I’d saydo c <- liftSTM newTMChan fsrc sx c retn cwould be longer, but much easier to read.
Could you perhaps create a self-contained project where it would be possible to play with serverApp?