Converting numpy solution into dask (numpy indexing doesn’t work in dask)

Chunk random_days_panel instead of historical_data and use da.map_blocks:

def dask_way(sim_count, sim_days, hist_days):
    # shared historical data
    # on a cluster you'd load this on each worker, e.g. from a NPZ file
    historical_data = np.random.normal(111.51, 10, size=hist_days)

    random_days_panel = da.random.randint(
        1, hist_days, size=(1, 1, sim_count, sim_days)
    )
    future_panel = da.map_blocks(
        lambda chunk: historical_data[chunk], random_days_panel, dtype=float
    )

    future_panel.compute()

    return future_panel

This will delegate all the work to the workers and will become faster than pure numpy once your problem gets large enough to amortize the initial (constant) cost for bringing up the scheduler and distributing the context:

hist_days = int(1e6)
sim_days = int(1e5)
sim_count = int(1000)

# dask_way(sim_count, sim_days, hist_days)
# (code in this answer)
532 ms ± 53.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# numpy_way(sim_count, sim_days, hist_days)
# (code in the question)
4.47 s ± 79.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# dask_way_1d(sim_count, sim_days, hist_days)
# (code in the question)
5.76 s ± 91.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Leave a Comment

Hata!: SQLSTATE[HY000] [1045] Access denied for user 'divattrend_liink'@'localhost' (using password: YES)