The streamz.dask module contains a Dask-powered implementation of the core Stream object. This is a drop-in implementation, but uses Dask for execution and so can scale to a multicore machine or a distributed cluster.

## Quickstart

### Installation

```
conda install dask
```

or

```
pip install dask[complete]
```

You may also want to install Bokeh for web diagnostics:

```
conda install -c bokeh bokeh
```

or

```
pip install bokeh
```

Then start a local Dask cluster:

```python
from dask.distributed import Client

client = Client()
```


This operates on local processes or threads. If you have Bokeh installed then this will also start a diagnostics web server at http://localhost:8787/status, which you may want to open to get a real-time view of execution.
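If you want more control over the local cluster, Client also forwards sizing options to the underlying LocalCluster. A minimal sketch (the particular values here are arbitrary):

```python
from dask.distributed import Client

# Size the local cluster explicitly: four worker processes,
# one thread each (values chosen only for illustration).
client = Client(n_workers=4, threads_per_worker=1)

# With Bokeh installed, this prints the URL of the diagnostics page.
print(client.dashboard_link)
```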

### Sequential Execution

| Operation | Description |
| --- | --- |
| `Stream.emit(x[, asynchronous])` | Push data into the stream at this point |
| `map(upstream, func, *args, **kwargs)` | Apply a function to every element in the stream |
| `sink(upstream, func, *args, **kwargs)` | Apply a function on every element |

Before we build a parallel stream, let's build a sequential stream that maps a simple function across data and then prints those results. We use the core Stream object.

```python
from time import sleep

def inc(x):
    sleep(1)  # simulate actual work
    return x + 1

from streamz import Stream

source = Stream()
source.map(inc).sink(print)

for i in range(10):
    source.emit(i)
```


This should take ten seconds because we call the inc function ten times sequentially.
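If you want to verify that yourself, one way is to time the emit loop. A minimal sketch reusing the source defined above (emitting again will also print another round of results):

```python
from time import time

start = time()
for i in range(10):
    source.emit(i)  # blocks while inc runs, one second per element
print(time() - start)  # roughly ten seconds in total
```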

### Parallel Execution

| Operation | Description |
| --- | --- |
| `scatter(*args, **kwargs)` | Convert local stream to Dask Stream |
| `buffer(upstream, n, **kwargs)` | Allow results to pile up at this point in the stream |
| `gather([upstream, upstreams, stream_name, …])` | Wait on and gather results from DaskStream to local Stream |

That example ran sequentially under normal execution. Now we use .scatter() to convert our stream into a DaskStream and .gather() to convert back.

```python
source = Stream()
source.scatter().map(inc).buffer(8).gather().sink(print)

for i in range(10):
    source.emit(i)
```


You may want to look at http://localhost:8787/status during execution to get a sense of the parallel execution.

This should have run much more quickly, depending on how many cores you have on your machine. We added a few extra nodes to our stream; let's look at what they did.

• scatter: Converted our Stream into a DaskStream. The elements that we emitted into our source were sent to the Dask client, and the subsequent map call used that client's cores to perform the computations.
• gather: Converted our DaskStream back into a Stream, pulling data from our Dask client back to our local stream.
• buffer(8): Normally gather would exert back-pressure so that the source would not accept new data until results finished and were pulled back to the local stream. This back-pressure would limit parallelism. To counteract this we add a buffer of size eight, allowing eight unfinished futures to build up in the pipeline before we start to apply back-pressure to source.emit (see the sketch after this list).
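To make the effect of the buffer concrete, here is a minimal sketch contrasting the two pipelines; it assumes the inc function and the Dask client from earlier in this section:

```python
from streamz import Stream

# Without a buffer, gather's back-pressure makes each emit wait until its
# result has been computed and pulled back before accepting the next element.
unbuffered = Stream()
unbuffered.scatter().map(inc).gather().sink(print)

# With buffer(8), up to eight unfinished futures can accumulate in the
# pipeline, so work overlaps across the cluster's cores.
buffered = Stream()
buffered.scatter().map(inc).buffer(8).gather().sink(print)
```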