Chunk random_days_panel instead of historical_data, and apply the lookup per chunk with da.map_blocks:
import numpy as np
import dask.array as da


def dask_way(sim_count, sim_days, hist_days):
    # shared historical data
    # on a cluster you'd load this on each worker, e.g. from an NPZ file
    historical_data = np.random.normal(111.51, 10, size=hist_days)

    # chunked automatically ('auto'); each chunk becomes one task
    random_days_panel = da.random.randint(
        1, hist_days, size=(1, 1, sim_count, sim_days)
    )

    # each worker indexes its chunk into the historical data
    future_panel = da.map_blocks(
        lambda chunk: historical_data[chunk], random_days_panel, dtype=float
    )

    return future_panel.compute()
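As written, compute() runs on whatever scheduler dask picks by default; to actually fan the work out to workers, create a dask.distributed Client first, which registers itself as the default scheduler. A minimal sketch, assuming dask.distributed is installed (the scheduler address in the comment is a placeholder, not a real host):

from dask.distributed import Client

# spin up a local test cluster; on a real cluster you would instead pass
# the scheduler address, e.g. Client("tcp://scheduler:8786") (placeholder)
client = Client()

panel = dask_way(sim_count=1000, sim_days=100_000, hist_days=1_000_000)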
This delegates all the work to the workers, and it becomes faster than pure numpy once the problem is large enough to amortize the initial (constant) cost of bringing up the scheduler and distributing the context:
hist_days = int(1e6)
sim_days = int(1e5)
sim_count = 1000
# dask_way(sim_count, sim_days, hist_days)
# (code in this answer)
532 ms ± 53.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# numpy_way(sim_count, sim_days, hist_days)
# (code in the question)
4.47 s ± 79.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# dask_way_1d(sim_count, sim_days, hist_days)
# (code in the question)
5.76 s ± 91.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
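The format of these numbers matches IPython's %timeit. Outside IPython, a rough equivalent harness, assuming the functions above (and numpy_way / dask_way_1d from the question) are in scope, would be:

import timeit

# 7 runs of 1 loop each, mirroring %timeit's defaults for slow calls
best = min(timeit.repeat(
    lambda: dask_way(sim_count, sim_days, hist_days),
    repeat=7, number=1,
))
print(f"dask_way best of 7: {best:.2f} s")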