Duplicate exceptions with BroadcastBlock in TPL Dataflow

I’ve written a TPL Dataflow example (https://github.com/squideyes/PodFetch) that takes a slightly different approach to completion and error handling. Here’s the relevant code, from lines 171 to 201 of Program.cs:

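    // Pass non-null links to the fetcher; send everything else to a null
    // target so rejected items don't sit in the scraper's output buffer.
    scraper.LinkTo(fetcher, link => link != null);
    scraper.LinkTo(DataflowBlock.NullTarget<Link>());

    // Forward the scraper's completion (or fault) to the fetcher.
    scraper.HandleCompletion(fetcher);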

    Status.Info.Log("Fetching APOD's archive list");

    links.ForEach(link => scraper.Post(link));

    scraper.Complete();

    try
    {
        await fetcher.Completion;

        Status.Finished.Log("Fetched: {0:N0}, Skipped: {1:N0}, Errors: {2:N0}, Seconds: {3:N2}",
            fetched, skipped, errored, (DateTime.UtcNow - startedOn).TotalMilliseconds / 1000.0);
    }
    catch (AggregateException errors)
    {
        foreach (var error in errors.InnerExceptions)
            Status.Failure.Log(error.Message);
    }
    catch (TaskCanceledException)
    {
        Status.Cancelled.Log("The process was manually cancelled!");
    }
    catch (Exception error)
    {
        Status.Failure.Log(error.Message);
    }

As you can see, I link a couple of dataflow blocks together and then set up completion handling with a HandleCompletion extension method:

    public static void HandleCompletion(
        this IDataflowBlock source, params IDataflowBlock[] targets)
    {
        // When the source finishes, pass its outcome on to every target.
        source.Completion.ContinueWith(
            task =>
            {
                foreach (var target in targets)
                {
                    if (task.IsFaulted)
                        target.Fault(task.Exception); // forward the failure
                    else
                        target.Complete(); // source finished without faulting
                }
            });
    }

Most importantly, I call scraper.Complete() once I’m done posting objects to the first block in the chain. The HandleCompletion extension method then passes completion (or a fault) down the chain. And, since I’m awaiting fetcher (the last block in the chain), it’s easy to catch any resulting errors in a try/catch.
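To see the pattern in isolation, here is a minimal sketch that is separate from the PodFetch code. The names CompletionDemo, RunAsync, parser and sink are hypothetical stand-ins chosen for illustration; HandleCompletion is the same extension method shown above. It should show that a failure in the first block surfaces when awaiting the last one.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionDemo
{
    // Same extension method as in the post: forward the source's outcome to each target.
    public static void HandleCompletion(
        this IDataflowBlock source, params IDataflowBlock[] targets)
    {
        source.Completion.ContinueWith(
            task =>
            {
                foreach (var target in targets)
                {
                    if (task.IsFaulted)
                        target.Fault(task.Exception);
                    else
                        target.Complete();
                }
            });
    }

    // Hypothetical two-block pipeline: "parser" stands in for scraper, "sink" for fetcher.
    public static async Task<string> RunAsync(bool fail)
    {
        var parser = new TransformBlock<string, int>(text =>
        {
            if (fail)
                throw new InvalidOperationException("parse failed");

            return text.Length;
        });

        var sink = new ActionBlock<int>(length => { });

        parser.LinkTo(sink);
        parser.HandleCompletion(sink);

        parser.Post("abc");
        parser.Complete();

        try
        {
            // Awaiting only the last block is enough; faults arrive via HandleCompletion.
            await sink.Completion;

            return "completed";
        }
        catch (AggregateException errors)
        {
            // Fault(task.Exception) hands over an AggregateException, so flatten it.
            return "faulted: " + errors.Flatten().InnerExceptions[0].Message;
        }
        catch (Exception error)
        {
            return "faulted: " + error.Message;
        }
    }
}
```

Calling CompletionDemo.RunAsync(false) takes the normal completion path, while RunAsync(true) takes the faulted one. For comparison, the library's built-in option is to link with new DataflowLinkOptions { PropagateCompletion = true }; the extension method above is simply a hand-rolled alternative to that.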
