I don’t know if it’s any help, but I tried to implement Iain’s suggestion and made a variant of mergeSources' that stops as soon as any of the sources does:

    mergeSources' :: (MonadIO m, MonadBaseControl IO m)
                  => [Source (ResourceT m) a] -- ^ The sources to merge.
                  -> Int -- ^ The bound of the intermediate channel.
                  -> ResourceT m (Source (ResourceT m) a)
    mergeSources' sx bound = do
        c <- liftSTM $ newTBMChan bound
        mapM_ (\s -> resourceForkIO $
                        s $$ chanSink c writeTBMChan closeTBMChan) sx
        return $ sourceTBMChan c

(This simple addition is available here).
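To make the stopping behaviour concrete without pulling in conduit, here is a rough sketch that uses only the stm package. The names BoundedChan, feed, drain and mergeLists are made up for this illustration. BoundedChan is a hand-rolled stand-in for a TBMChan (a TBQueue plus a closed flag), not the stm-chans type, and plain lists play the role of the sources.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (when)

-- | Hypothetical stand-in for a TBMChan: a bounded queue plus a closed flag.
data BoundedChan a = BoundedChan (TBQueue a) (TVar Bool)

newBoundedChan :: Int -> IO (BoundedChan a)
newBoundedChan bound =
  BoundedChan <$> newTBQueueIO (fromIntegral bound) <*> newTVarIO False

-- | Write unless the channel is closed; retries (blocks) while the queue is full.
writeBounded :: BoundedChan a -> a -> STM Bool
writeBounded (BoundedChan q closed) x = do
  isClosed <- readTVar closed
  if isClosed then return False else writeTBQueue q x >> return True

closeBounded :: BoundedChan a -> STM ()
closeBounded (BoundedChan _ closed) = writeTVar closed True

-- | Nothing once the channel is both closed and drained.
readBounded :: BoundedChan a -> STM (Maybe a)
readBounded (BoundedChan q closed) = do
  next <- tryReadTBQueue q
  case next of
    Just x  -> return (Just x)
    Nothing -> do
      isClosed <- readTVar closed
      if isClosed then return Nothing else retry

-- | Push one list into the channel. The channel is closed when the list
-- ends; the producer gives up if somebody else closed it first.
feed :: BoundedChan a -> [a] -> IO ()
feed c []       = atomically (closeBounded c)
feed c (x : xs) = do
  ok <- atomically (writeBounded c x)
  when ok (feed c xs)

-- | Read until the channel reports that it is closed and empty.
drain :: BoundedChan a -> IO [a]
drain c = do
  next <- atomically (readBounded c)
  case next of
    Nothing -> return []
    Just x  -> (x :) <$> drain c

-- | Merge several lists; the result ends once any one of them ends.
mergeLists :: Int -> [[a]] -> IO [a]
mergeLists bound xss = do
  c <- newBoundedChan bound
  mapM_ (forkIO . feed c) xss
  drain c
```

For example, mergeLists 4 [[1, 2, 3], [100 ..]] terminates even though the second list is infinite. The result contains 1, 2, 3 in that order, mixed with some prefix of the second list; the exact interleaving depends on thread scheduling. With an empty list of inputs nobody ever closes the channel, so drain would block.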
Some comments on your version of mergeSources (take them with a grain of salt; I may have misunderstood something):
- Using ...TMChan instead of ...TBMChan seems dangerous. If the writers are faster than the reader, your heap will blow up. Looking at your diagram, it seems this can easily happen if your TCP peer doesn’t read data fast enough. So I’d definitely use ...TBMChan with a perhaps large, but limited, bound.
- You don’t need the MonadSTM m constraint. All the STM stuff is wrapped into IO with liftSTM = liftIO . atomically. Maybe this will help you slightly when using mergeSources' in serverApp.
- Just a cosmetic issue: I found

      liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn

  very hard to read due to its use of liftA2 on the (->) r monad. I’d say

      do
          c <- liftSTM newTMChan
          fsrc sx c
          retn c

  would be longer, but much easier to read.
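To illustrate the first point about bounded channels, here is a small sketch using TBQueue from the stm package as a stand-in for TBMChan (an assumption made to keep the example dependency-free). tryWrite and acceptedWrites are names invented here. The idea is that a write to a full bounded queue retries, which is exactly what throttles a fast writer, whereas an unbounded channel would accept every item and keep growing.

```haskell
import Control.Concurrent.STM

-- | Try to enqueue without blocking. False means the queue was full,
-- i.e. a plain writeTBQueue would have retried and blocked the writer.
tryWrite :: TBQueue a -> a -> STM Bool
tryWrite q x = (writeTBQueue q x >> return True) `orElse` return False

-- | Offer ten items to a queue bounded at three while nobody is reading.
acceptedWrites :: IO [Bool]
acceptedWrites = do
  q <- newTBQueueIO 3
  mapM (atomically . tryWrite q) [1 :: Int .. 10]
```

Only the first three writes are accepted; the remaining seven are refused because no reader has made room. A real writer using a blocking write would simply wait at that point instead of filling the heap.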
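The second and third points can be sketched together. In the snippet below, startWriter and readBack are hypothetical placeholders for fsrc sx and retn, and a TVar stands in for the channel; only liftSTM is spelled the way it is described above. It shows that MonadIO alone is enough to run STM actions, and that the liftA2 version and the do version do the same thing.

```haskell
import Control.Applicative (liftA2)
import Control.Concurrent.STM
import Control.Monad.IO.Class (MonadIO, liftIO)

-- | Running STM needs nothing beyond MonadIO.
liftSTM :: MonadIO m => STM a -> m a
liftSTM = liftIO . atomically

-- | Placeholder for "fsrc sx": some action that uses the shared variable.
startWriter :: MonadIO m => TVar Int -> m ()
startWriter v = liftSTM (modifyTVar' v (+ 1))

-- | Placeholder for "retn": produce a result from the shared variable.
readBack :: MonadIO m => TVar Int -> m Int
readBack v = liftSTM (readTVar v)

-- | Point-free spelling: liftA2 on functions feeds the same argument
-- to both startWriter and readBack and combines the results with (>>).
pointFree :: MonadIO m => m Int
pointFree = liftSTM (newTVar 0) >>= liftA2 (>>) startWriter readBack

-- | The same thing with the variable named explicitly.
explicit :: MonadIO m => m Int
explicit = do
  v <- liftSTM (newTVar 0)
  startWriter v
  readBack v
```

Both pointFree and explicit create a fresh variable, bump it once and read it back, so they yield the same value. The do version just makes the sharing of v visible.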
Could you perhaps create a self-contained project where it would be possible to play with serverApp?