The streamz.dask module contains a Dask-powered implementation of the core Stream object. This is a drop-in implementation, but uses Dask for execution and so can scale to a multicore machine or a distributed cluster.

## Quickstart

### Installation

```
conda install dask
```

or

```
pip install dask[complete]
```

You may also want to install Bokeh for web diagnostics:

```
conda install -c bokeh bokeh
```

or

```
pip install bokeh
```

Then start a local Dask cluster:

```python
from dask.distributed import Client

client = Client()
```


This operates on local processes or threads. If you have Bokeh installed then this will also start a diagnostics web server at http://localhost:8787/status, which you may want to open to get a real-time view of execution.
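If you want more control over the local cluster, Client also forwards sizing options to the underlying LocalCluster. A minimal sketch (the particular values here are arbitrary):

```python
from dask.distributed import Client

# Size the local cluster explicitly: four worker processes,
# one thread each (values chosen only for illustration).
client = Client(n_workers=4, threads_per_worker=1)

# With Bokeh installed, this prints the URL of the diagnostics page.
print(client.dashboard_link)
```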

### Sequential Execution

| Operation | Description |
| --- | --- |
| `Stream.emit(x[, asynchronous])` | Push data into the stream at this point |
| `map(upstream, func, *args, **kwargs)` | Apply a function to every element in the stream |
| `sink(upstream, func, *args, **kwargs)` | Apply a function on every element |

Before we build a parallel stream, let's build a sequential stream that maps a simple function across data and then prints those results. We use the core Stream object.

```python
from time import sleep

def inc(x):
    sleep(1)  # simulate actual work
    return x + 1

from streamz import Stream

source = Stream()
source.map(inc).sink(print)

for i in range(10):
    source.emit(i)
```


This should take ten seconds because we call the inc function ten times sequentially.
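If you want to verify that yourself, one way is to time the emit loop. A minimal sketch reusing the source defined above (emitting again will also print another round of results):

```python
from time import time

start = time()
for i in range(10):
    source.emit(i)  # blocks while inc runs, one second per element
print(time() - start)  # roughly ten seconds in total
```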

### Parallel Execution

| Operation | Description |
| --- | --- |
| `scatter(*args, **kwargs)` | Convert local stream to Dask Stream |
| `buffer(upstream, n, **kwargs)` | Allow results to pile up at this point in the stream |
| `gather([upstream, upstreams, stream_name, …])` | Wait on and gather results from DaskStream to local Stream |

That example ran sequentially under normal execution. Now we use .scatter() to convert our stream into a DaskStream and .gather() to convert back.

```python
source = Stream()
source.scatter().map(inc).buffer(8).gather().sink(print)

for i in range(10):
    source.emit(i)
```


You may want to look at http://localhost:8787/status during execution to get a sense of the parallel execution.

This should have run much more quickly, depending on how many cores you have on your machine. We added a few extra nodes to our stream; let's look at what they did.

• scatter: Converted our Stream into a DaskStream. The elements that we emitted into our source were sent to the Dask client, and the subsequent map call used that client's cores to perform the computations.
• gather: Converted our DaskStream back into a Stream, pulling data from our Dask client back to our local stream.
• buffer(8): Normally gather would exert back-pressure so that the source would not accept new data until results finished and were pulled back to the local stream. This back-pressure would limit parallelism. To counteract this we add a buffer of size eight, allowing eight unfinished futures to build up in the pipeline before we start to apply back-pressure to source.emit (see the sketch after this list).
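To make the effect of the buffer concrete, here is a minimal sketch contrasting the two pipelines; it assumes the inc function and the Dask client from earlier in this section:

```python
from streamz import Stream

# Without a buffer, gather's back-pressure makes each emit wait until its
# result has been computed and pulled back before accepting the next element.
unbuffered = Stream()
unbuffered.scatter().map(inc).gather().sink(print)

# With buffer(8), up to eight unfinished futures can accumulate in the
# pipeline, so work overlaps across the cluster's cores.
buffered = Stream()
buffered.scatter().map(inc).buffer(8).gather().sink(print)
```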