One processing conduit, 2 IO sources of the same type

I don’t know if it’s any help, but I tried to implement Iain’s suggestion and made a variant of mergeSources' that stops as soon as any of the channels does:

mergeSources' :: (MonadIO m, MonadBaseControl IO m)
              => [Source (ResourceT m) a] -- ^ The sources to merge.
              -> Int -- ^ The bound of the intermediate channel.
              -> ResourceT m (Source (ResourceT m) a)
mergeSources' sx bound = do
    c <- liftSTM $ newTBMChan bound
    mapM_ (\s -> resourceForkIO $
                    s $$ chanSink c writeTBMChan closeTBMChan) sx
    return $ sourceTBMChan c

(This simple addition is available here).
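
To see what the bound argument buys you in isolation, here is a minimal sketch that uses a plain TBQueue from the stm package instead of the TBMChan from stm-chans (a swapped-in, simpler type; tryWrite and demo are hypothetical names made up for this illustration). A write to a full bounded queue retries, which is the back-pressure that would keep fast writers from outrunning a slow reader:

```haskell
import Control.Concurrent.STM

-- | Try to enqueue without blocking: True on success, False if the queue is full.
-- (Hypothetical helper; a plain writeTBQueue would block instead of returning False.)
tryWrite :: TBQueue a -> a -> IO Bool
tryWrite q x = atomically $
    (writeTBQueue q x >> return True) `orElse` return False

-- | Attempt three writes to a queue of capacity 2, then free one slot and retry.
demo :: IO [Bool]
demo = do
    q  <- newTBQueueIO 2
    r1 <- tryWrite q 'a'
    r2 <- tryWrite q 'b'
    r3 <- tryWrite q 'c'              -- queue is full, so this write is refused
    _  <- atomically (readTBQueue q)  -- the reader consumes one element
    r4 <- tryWrite q 'c'              -- now there is room again
    return [r1, r2, r3, r4]
```

Running demo should give [True,True,False,True]: the third write is refused until the reader has taken an element out.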

Some comments on your version of mergeSources (take them with a grain of salt; I may have misunderstood something):

  • Using ...TMChan instead of ...TBMChan seems dangerous. If the writers are faster than the reader, the unbounded channel grows without limit and your heap will blow up. Judging from your diagram, this can easily happen if your TCP peer doesn’t read data fast enough. So I’d definitely use ...TBMChan with a perhaps large but limited bound.
  • You don’t need the MonadSTM m constraint. All the STM operations are lifted into IO with

    liftSTM = liftIO . atomically
    

    Maybe this will help you slightly when using mergeSources' in serverApp.

  • Just a cosmetic issue: I found

    liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
    

    very hard to read due to its use of liftA2 on the (->) r monad. I’d say

    do
        c <- liftSTM newTMChan
        fsrc sx c
        retn c
    

    would be longer, but much easier to read.
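
To check that the two spellings in the last point really mean the same thing, here is a small sketch using only base. The names newChan', startWriters and makeSource are hypothetical stand-ins for liftSTM newTMChan, fsrc sx and retn, and the logging tuple monad stands in for the real monad so that the order of effects is visible:

```haskell
import Control.Applicative (liftA2)

-- A tiny logging monad from base: the first component accumulates a log.
type Logged = (,) [String]

-- Hypothetical stand-in for `liftSTM newTMChan`; the "channel" is just an Int.
newChan' :: Logged Int
newChan' = (["create channel"], 42)

-- Hypothetical stand-in for `fsrc sx`.
startWriters :: Int -> Logged ()
startWriters c = (["fork writers on " ++ show c], ())

-- Hypothetical stand-in for `retn`.
makeSource :: Int -> Logged String
makeSource c = (["build source"], "source of " ++ show c)

-- Terse version: liftA2 runs in the function applicative ((->) r), so
-- liftA2 (>>) f g  ==  \c -> f c >> g c
terse :: Logged String
terse = newChan' >>= liftA2 (>>) startWriters makeSource

-- Expanded version with explicit do-notation.
explicit :: Logged String
explicit = do
    c <- newChan'
    startWriters c
    makeSource c
```

Both terse and explicit evaluate to the same pair, with the log entries in the order create, fork, build, so the do-block is purely a readability change.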

Could you perhaps create a self-contained project where it would be possible to play with serverApp?
