Dask Integration
The streamz.dask module contains a Dask-powered implementation of the
core Stream object. This is a drop-in implementation, but uses Dask for
execution and so can scale to a multicore machine or a distributed cluster.
Quickstart
Installation
First install dask and dask.distributed:
conda install dask
or
pip install "dask[complete]" --upgrade
You may also want to install Bokeh for web diagnostics:
conda install -c bokeh bokeh
or
pip install bokeh --upgrade
Start Local Dask Client
Then start a local Dask cluster:
from dask.distributed import Client
client = Client()
This runs on local processes or threads. If Bokeh is installed, it also starts a diagnostics web server at http://localhost:8787/status, which you may want to open for a real-time view of execution.
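Client() with no arguments picks defaults for you. If you want more control, Client also accepts LocalCluster keyword arguments such as processes, n_workers and threads_per_worker. The sketch below shows one possible configuration, not a required one: it keeps everything in threads inside the current process and reads the dashboard address from the client instead of assuming port 8787 is free.

```python
from dask.distributed import Client

# Threads-only local cluster in this process; the worker and thread counts
# are illustrative choices, not requirements.
client = Client(processes=False, n_workers=1, threads_per_worker=4)

link = client.dashboard_link  # actual diagnostics URL (port may differ from 8787)
n_workers = len(client.scheduler_info()["workers"])
print("dashboard:", link)
print("workers:", n_workers)

client.close()  # shut the local cluster down when finished
```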
Sequential Execution
emit: Push data into the stream at this point.
map: Apply a function to every element in the stream.
sink: Apply a function on every element.
Before we build a parallel stream, let’s build a sequential stream that maps a
simple function across data, and then prints those results. We use the core
Stream object.
from time import sleep
def inc(x):
    sleep(1)  # simulate actual work
    return x + 1
from streamz import Stream
source = Stream()
source.map(inc).sink(print)
for i in range(10):
    source.emit(i)
This should take ten seconds because we call the inc function ten times
sequentially.
Parallel Execution
scatter: Convert local stream to Dask Stream.
buffer: Allow results to pile up at this point in the stream.
gather: Wait on and gather results from DaskStream to local Stream.
That example ran sequentially under normal execution. Now we use .scatter()
to convert our stream into a DaskStream and .gather() to convert it back.
source = Stream()
source.scatter().map(inc).buffer(8).gather().sink(print)
for i in range(10):
    source.emit(i)
You may want to look at http://localhost:8787/status during execution to get a sense of the parallel execution.
This should have run much more quickly, depending on how many cores your machine has. We added a few extra nodes to our stream; let's look at what they did.
scatter: Converted our Stream into a DaskStream. The elements that we emitted into our source were sent to the Dask client, and the subsequent map call used that client's cores to perform the computations.
gather: Converted our DaskStream back into a Stream, pulling data on our Dask client back to our local stream.
buffer(8): Normally gather would exert back pressure so that the source would not accept new data until results finished and were pulled back to the local stream. This back pressure would limit parallelism. To counteract it, we add a buffer of size eight, which allows eight unfinished futures to build up in the pipeline before back pressure is applied to source.emit.
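The effect of a bounded buffer can be illustrated without Dask. The sketch below is an analogy built on Python's standard concurrent.futures and a semaphore, not how streamz implements buffer: the producer loop blocks once eight submitted items are unfinished, which mirrors the point at which back pressure reaches source.emit. MAX_UNFINISHED and task are hypothetical names for this illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_UNFINISHED = 8  # plays the role of buffer(8) in this analogy

def inc(x):
    time.sleep(0.05)  # simulate work (shortened delay)
    return x + 1

slots = threading.BoundedSemaphore(MAX_UNFINISHED)
lock = threading.Lock()
running = 0  # tasks currently executing
peak = 0     # highest value of `running` observed

def task(x):
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    try:
        return inc(x)
    finally:
        with lock:
            running -= 1
        slots.release()  # free a slot so the producer may continue

futures = []
with ThreadPoolExecutor(max_workers=16) as pool:
    for i in range(20):
        slots.acquire()  # back pressure: blocks while 8 items are unfinished
        futures.append(pool.submit(task, i))

results = [f.result() for f in futures]  # "gather" in emission order
print(results)
print("peak concurrent tasks:", peak)
```

Even though the pool has 16 threads, no more than eight tasks ever run at once, because the producer cannot submit a ninth until one finishes.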
Gotchas
An important gotcha with DaskStream is that it is a subclass of
Stream, and so can be used as an input to any function expecting a
Stream. If there is no intervening .gather(), then the
downstream node will receive Dask futures instead of the data they
represent:
source = Stream()
source2 = Stream()
a = source.scatter().map(inc)
b = source2.combine_latest(a)
In this case, the combine operation will get real values from
source2 but Dask futures from a. Downstream nodes would be free to
operate on the futures, but more likely, the line should be:
b = source2.combine_latest(a.gather())