streamz.dask module contains a Dask-powered implementation of the
core Stream object. This is a drop-in implementation, but uses Dask for
execution and so can scale to a multicore machine or a distributed cluster.
First install dask and dask.distributed:
conda install dask or pip install dask[complete] --upgrade
You may also want to install Bokeh for web diagnostics:
conda install -c bokeh bokeh or pip install bokeh --upgrade
Start Local Dask Client¶
Then start a local Dask cluster
from dask.distributed import Client client = Client()
This operates on a local processes or threads. If you have Bokeh installed then this will also start a diagnostics web server at http://localhost:8787/status which you may want to open to get a real-time view of execution.
||Push data into the stream at this point|
||Apply a function to every element in the stream|
||Apply a function on every element|
Before we build a parallel stream, lets build a sequential stream that maps a
simple function across data, and then prints those results. We use the core
from time import sleep def inc(x): sleep(1) # simulate actual work return x + 1 from streamz import Stream source = Stream() source.map(inc).sink(print) for i in range(10): source.emit(i)
This should take ten seconds we call the
inc function ten times
||Convert local stream to Dask Stream|
||Allow results to pile up at this point in the stream|
||Wait on and gather results from DaskStream to local Stream|
That example ran sequentially under normal execution, now we use
to convert our stream into a DaskStream and
.gather() to convert back.
source = Stream() source.scatter().map(inc).buffer(8).gather().sink(print) for i in range(10): source.emit(i)
You may want to look at http://localhost:8787/status during execution to get a sense of the parallel execution.
This should have run much more quickly depending on how many cores you have on your machine. We added a few extra nodes to our stream, lets look at what they did.
scatter: Converted our Stream into a DaskStream. The elements that we emitted into our source were sent to the Dask client, and the subsequent
mapcall used that client’s cores to perform the computations.
gather: Converted our DaskStream back into a Stream, pulling data on our Dask client back to our local stream
buffer(5): Normally gather would exert back pressure so that the source would not accept new data until results finished and were pulled back to the local stream. This back-pressure would limit parallelism. To counter-act this we add a buffer of size eight to allow eight unfinished futures to build up in the pipeline before we start to apply back-pressure to
An important gotcha with
DaskStream is that it is a subclass
Stream, and so can be used as an input
to any function expecting a
Stream. If there is no intervening
.gather(), then the downstream node will
receive Dask futures instead of the data they represent:
source = Stream() source2 = Stream() a = source.scatter().map(inc) b = source2.combine_latest(a)
In this case, the combine operation will get real values from
source2, and Dask futures.
Downstream nodes would be free to operate on the futures, but more likely, the line should be:
b = source2.combine_latest(a.gather())