API

Stream

Stream([upstream, upstreams, stream_name, ...])

A Stream is an infinite sequence of data.

Stream.connect(downstream)

Connect this stream to a downstream element.

Stream.destroy([streams])

Disconnect this stream from any upstream sources

Stream.disconnect(downstream)

Disconnect this stream from a downstream element.

Stream.visualize([filename])

Render the computation of this object's task graph using graphviz.

accumulate(upstream, func[, start, ...])

Accumulate results with previous state

buffer(upstream, n, **kwargs)

Allow results to pile up at this point in the stream

collect(upstream[, cache, metadata_cache])

Hold elements in a cache and emit them as a collection when flushed.

combine_latest(*upstreams, **kwargs)

Combine multiple streams together to a stream of tuples

delay(upstream, interval, **kwargs)

Add a time delay to results

filter(upstream, predicate, *args, **kwargs)

Only pass through elements that satisfy the predicate

flatten([upstream, upstreams, stream_name, ...])

Flatten streams of lists or iterables into a stream of elements

map(upstream, func, *args, **kwargs)

Apply a function to every element in the stream

partition(upstream, n[, timeout, key])

Partition stream into tuples of equal size

rate_limit(upstream, interval, **kwargs)

Limit the flow of data

scatter(*args, **kwargs)

Convert local stream to Dask Stream

sink(upstream, func, *args, **kwargs)

Apply a function on every element

sink_to_textfile(upstream, file[, end, mode])

Write elements to a plain text file, one element per line.

slice(upstream[, start, end, step])

Get only some events in a stream by position.

sliding_window(upstream, n[, return_partial])

Produce overlapping tuples of size n

starmap(upstream, func, *args, **kwargs)

Apply a function to every element in the stream, splayed out

timed_window(upstream, interval, **kwargs)

Emit a tuple of collected results every interval

union(*upstreams, **kwargs)

Combine multiple streams into one

unique(upstream[, maxsize, key, hashable])

Avoid sending through repeated elements

pluck(upstream, pick, **kwargs)

Select elements from elements in the stream.

zip(*upstreams, **kwargs)

Combine streams together into a stream of tuples

zip_latest(lossless, *upstreams, **kwargs)

Combine multiple streams together to a stream of tuples

Stream.connect(downstream)

Connect this stream to a downstream element.

Parameters
downstream: Stream

The downstream stream to connect to
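
Examples

As a minimal sketch, connect can wire together two streams that were created independently; map and sink behave as documented elsewhere on this page:

>>> from streamz import Stream
>>> source = Stream()
>>> downstream = Stream()
>>> downstream.sink(print)
>>> source.connect(downstream)  # elements emitted on source now also flow to downstream
>>> source.emit(1)
1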

Stream.disconnect(downstream)

Disconnect this stream from a downstream element.

Parameters
downstream: Stream

The downstream stream to disconnect from

Stream.destroy(streams=None)

Disconnect this stream from any upstream sources

Stream.emit(x, asynchronous=False, metadata=None)

Push data into the stream at this point

This is typically done only at source Streams but can theoretically be done at any point

Parameters
x: any

an element of data

asynchronous: bool

Whether to emit asynchronously

metadata: list[dict], optional

Various types of metadata associated with the data element in x.

ref: RefCounter

A reference counter used to check when data is done
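
Examples

As a sketch, emit can be called on an intermediate node as well as on a source, bypassing the stages upstream of that node:

>>> from streamz import Stream
>>> source = Stream()
>>> mapped = source.map(lambda x: x * 10)
>>> mapped.sink(print)
>>> source.emit(1)   # flows through the map node
10
>>> mapped.emit(2)   # injected after the map node, so not multiplied
2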

Stream.frequencies(**kwargs)

Count occurrences of elements
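
Examples

A minimal sketch; frequencies keeps a running count per element and emits the updated mapping on every event:

>>> from streamz import Stream
>>> source = Stream()
>>> source.frequencies().sink(print)
>>> for x in ['a', 'b', 'a']:
...     source.emit(x)
{'a': 1}
{'a': 1, 'b': 1}
{'a': 2, 'b': 1}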

classmethod Stream.register_api(modifier=<function identity>, attribute_name=None)

Add callable to Stream API

This allows you to register a new method onto this class. You can use it as a decorator:

>>> @Stream.register_api()
... class foo(Stream):
...     ...

>>> Stream().foo(...)  # this works now

It attaches the callable as a normal attribute to the class object. In doing so it respects inheritance (all subclasses of Stream will also get the foo attribute).

By default callables are assumed to be instance methods. If you like you can include modifiers to apply before attaching to the class as in the following case where we construct a staticmethod.

>>> @Stream.register_api(staticmethod)
... class foo(Stream):
...     ...
>>> Stream.foo(...)  # Foo operates as a static method

You can also provide an optional attribute_name argument to control the name of the attribute your callable will be attached as.

>>> @Stream.register_api(attribute_name="bar")
... class foo(Stream):
...     ...

>>> Stream().bar(...)  # foo was actually attached as bar

Stream.sink(func, *args, **kwargs)

Apply a function on every element

Parameters
func: callable

A function that will be applied on every element.

args:

Positional arguments that will be passed to func after the incoming element.

kwargs:

Stream-specific arguments will be passed to Stream.__init__, the rest of them will be passed to func.

Examples

>>> source = Stream()
>>> L = list()
>>> source.sink(L.append)
>>> source.sink(print)
>>> source.sink(print)
>>> source.emit(123)
123
123
>>> L
[123]
Stream.sink_to_list()

Append all elements of a stream to a list as they come in

Examples

>>> source = Stream()
>>> L = source.map(lambda x: 10 * x).sink_to_list()
>>> for i in range(5):
...     source.emit(i)
>>> L
[0, 10, 20, 30, 40]
Stream.sink_to_textfile(file, end='\n', mode='a', **kwargs)

Write elements to a plain text file, one element per line.

Type of elements must be str.

Parameters
file: str or file-like

File to write the elements to. str is treated as a file name to open. If file-like, descriptor must be open in text mode. Note that the file descriptor will be closed when this sink is destroyed.

end: str, optional

This value will be written to the file after each element. Defaults to newline character.

mode: str, optional

If file is str, file will be opened in this mode. Defaults to "a" (append mode).

Examples

>>> source = Stream()
>>> source.map(str).sink_to_textfile("test.txt")
>>> source.emit(0)
>>> source.emit(1)
>>> print(open("test.txt", "r").read())
0
1
Stream.to_websocket(uri, ws_kwargs=None, **kwargs)

Write bytes data to websocket

The websocket will be opened on first call, and kept open. Should it close at some point, future writes will fail.

Requires the websockets package.

Parameters
uri: str

Something like "ws://host:port". Use "wss:" to allow TLS.

ws_kwargs: dict

Further kwargs to pass to websockets.connect; please read its documentation.

kwargs:

Passed to superclass
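
Examples

A hedged sketch, assuming a websocket server is already listening at the (hypothetical) address ws://localhost:8765:

>>> from streamz import Stream
>>> source = Stream()
>>> source.map(str.encode).to_websocket("ws://localhost:8765")
>>> source.emit("hello")   # sent to the server as bytes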

Stream.to_mqtt(host, port, topic, keepalive=60, client_kwargs=None, **kwargs)

Send data to MQTT broker

See also sources.from_mqtt.

Requires paho.mqtt

Parameters
host: str
port: int
topic: str
keepalive: int

Maximum period in seconds between communications with the broker, used to keep the connection alive (see the MQTT docs).

client_kwargs: dict

Passed to the client's connect() method
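
Examples

A hedged sketch, assuming an MQTT broker is reachable at localhost:1883 (hypothetical address):

>>> from streamz import Stream
>>> source = Stream()
>>> source.to_mqtt("localhost", 1883, "streamz/test")
>>> source.emit(b"payload")   # published to the streamz/test topic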

Stream.update(x, who=None, metadata=None)

Accept an element x from the upstream node who and pass it on downstream; for a source Stream this behaves like emit.

Stream.visualize(filename='mystream.png', **kwargs)

Render the computation of this object’s task graph using graphviz.

Requires graphviz and networkx to be installed.

Parameters
filename: str, optional

The name of the file to write to disk.

kwargs:

Graph attributes to pass to graphviz like rankdir="LR"
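
Examples

A minimal sketch; this writes a PNG of the stream graph to the working directory (requires graphviz and networkx):

>>> from streamz import Stream
>>> source = Stream()
>>> source.map(lambda x: x + 1).sink(print)
>>> source.visualize("pipeline.png")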

Sources

from_iterable(iterable, **kwargs)

Emits items from an iterable.

filenames(path[, poll_interval])

Stream over filenames in a directory

from_kafka(topics, consumer_params[, ...])

Accepts messages from Kafka

from_kafka_batched(topic, consumer_params[, ...])

Get messages and keys (optional) from Kafka in batches

from_mqtt(host, port, topic[, keepalive, ...])

Read from MQTT source

from_process(cmd[, open_kwargs, ...])

Messages from a running external process

from_websocket(host, port[, serve_kwargs])

Read binary data from a websocket

from_textfile(f[, poll_interval, delimiter, ...])

Stream data from a text file

from_tcp(port[, delimiter, server_kwargs])

Creates events by reading from a socket using tornado TCPServer

from_http_server(port[, path, server_kwargs])

Listen for HTTP POSTs on given port

DaskStream

DaskStream(*args, **kwargs)

A Parallel stream using Dask

gather([upstream, upstreams, stream_name, ...])

Wait on and gather results from DaskStream to local Stream

Definitions

streamz.accumulate(upstream, func, start='--no-default--', returns_state=False, **kwargs)

Accumulate results with previous state

This performs running or cumulative reductions, applying the function to the previous total and the new element. The function should take two arguments, the previous accumulated state and the next element, and return a new accumulated state:

state = func(previous_state, new_value) (returns_state=False)
state, result = func(previous_state, new_value) (returns_state=True)

where the new state is passed to the next invocation. In the first case the state is emitted downstream; in the second, the result is emitted.

Parameters
func: callable
start: object

Initial value, passed as the value of previous_state on the first invocation. Defaults to the first submitted element

returns_state: boolean

If true then func should return both the state and the value to emit. If false then both values are the same, and func returns one value

**kwargs:

Keyword arguments to pass to func

Examples

A running total, producing triangular numbers

>>> source = Stream()
>>> source.accumulate(lambda acc, x: acc + x).sink(print)
>>> for i in range(5):
...     source.emit(i)
0
1
3
6
10

A count of number of events (including the current one)

>>> source = Stream()
>>> source.accumulate(lambda acc, x: acc + 1, start=0).sink(print)
>>> for _ in range(5):
...     source.emit(0)
1
2
3
4
5

Like the builtin “enumerate”.

>>> source = Stream()
>>> source.accumulate(lambda acc, x: ((acc[0] + 1, x), (acc[0], x)),
...                   start=(0, 0), returns_state=True
...                   ).sink(print)
>>> for i in range(3):
...     source.emit(0)
(0, 0)
(1, 0)
(2, 0)
streamz.buffer(upstream, n, **kwargs)

Allow results to pile up at this point in the stream

This allows results to buffer in place at various points in the stream. This can help to smooth flow through the system when backpressure is applied.
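
Examples

A hedged sketch of the common pattern: placing a buffer between asynchronous stages (here a Dask scatter/gather pipeline, assuming a dask.distributed Client as in the DaskStream example below) so that up to n unfinished results can queue up instead of blocking the emitter:

>>> from dask.distributed import Client
>>> client = Client()
>>> from streamz import Stream
>>> source = Stream()
>>> (source.scatter()             # move elements onto the Dask cluster
...        .map(lambda x: x + 1)  # runs on workers, produces futures
...        .buffer(8)             # allow up to 8 unfinished futures to pile up
...        .gather()              # bring finished results back to the local stream
...        .sink(print))
>>> for i in range(3):
...     source.emit(i)            # results print as they complete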

streamz.collect(upstream, cache=None, metadata_cache=None, **kwargs)

Hold elements in a cache and emit them as a collection when flushed.

Examples

>>> source1 = Stream()
>>> source2 = Stream()
>>> collector = collect(source1)
>>> collector.sink(print)
>>> source2.sink(collector.flush)
>>> source1.emit(1)
>>> source1.emit(2)
>>> source2.emit('anything')  # flushes collector
...
[1, 2]
streamz.combine_latest(*upstreams, **kwargs)

Combine multiple streams together to a stream of tuples

This will emit a new tuple of all of the most recent elements seen from any stream.

Parameters
emit_on: stream or list of streams or None

only emit upon update of the streams listed. If None, emit on update from any stream

See also

zip
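
Examples

A minimal sketch; nothing is emitted until every upstream has produced at least one element, after which any update triggers a new tuple:

>>> from streamz import Stream
>>> a = Stream()
>>> b = Stream()
>>> a.combine_latest(b).sink(print)
>>> a.emit(1)      # no output yet, b has not produced anything
>>> b.emit('x')
(1, 'x')
>>> a.emit(2)
(2, 'x')
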
streamz.delay(upstream, interval, **kwargs)

Add a time delay to results

streamz.filter(upstream, predicate, *args, **kwargs)

Only pass through elements that satisfy the predicate

Parameters
predicate: function

The predicate. Should return True or False, where True means that the predicate is satisfied.

*args

The arguments to pass to the predicate.

**kwargs:

Keyword arguments to pass to predicate

Examples

>>> source = Stream()
>>> source.filter(lambda x: x % 2 == 0).sink(print)
>>> for i in range(5):
...     source.emit(i)
0
2
4
streamz.flatten(upstream=None, upstreams=None, stream_name=None, loop=None, asynchronous=None, ensure_io_loop=False)

Flatten streams of lists or iterables into a stream of elements

See also

partition

Examples

>>> source = Stream()
>>> source.flatten().sink(print)
>>> for x in [[1, 2, 3], [4, 5], [6, 7, 7]]:
...     source.emit(x)
1
2
3
4
5
6
7
streamz.map(upstream, func, *args, **kwargs)

Apply a function to every element in the stream

Parameters
func: callable
*args

The arguments to pass to the function.

**kwargs:

Keyword arguments to pass to func

Examples

>>> source = Stream()
>>> source.map(lambda x: 2*x).sink(print)
>>> for i in range(5):
...     source.emit(i)
0
2
4
6
8
streamz.partition(upstream, n, timeout=None, key=None, **kwargs)

Partition stream into tuples of equal size

Parameters
n: int

Maximum partition size

timeout: int or float, optional

Number of seconds after which a partition will be emitted, even if its size is less than n. If None (default), a partition will be emitted only when its size reaches n.

key: hashable or callable, optional

Emit items with the same key together as a separate partition. If key is callable, partition will be identified by key(x), otherwise by x[key]. Defaults to None.

Examples

>>> source = Stream()
>>> source.partition(3).sink(print)
>>> for i in range(10):
...     source.emit(i)
(0, 1, 2)
(3, 4, 5)
(6, 7, 8)
>>> source = Stream()
>>> source.partition(2, key=lambda x: x % 2).sink(print)
>>> for i in range(4):
...     source.emit(i)
(0, 2)
(1, 3)
>>> from time import sleep
>>> source = Stream()
>>> source.partition(5, timeout=1).sink(print)
>>> for i in range(3):
...     source.emit(i)
>>> sleep(1)
(0, 1, 2)
streamz.rate_limit(upstream, interval, **kwargs)

Limit the flow of data

This prevents two elements from streaming through within an interval shorter than the provided value.

Parameters
interval: float

Time in seconds
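
Examples

A minimal sketch; with interval=1 the sink sees at most one element per second, so the loop below takes several seconds to drain:

>>> from streamz import Stream
>>> source = Stream()
>>> source.rate_limit(1.0).sink(print)
>>> for i in range(3):
...     source.emit(i)   # throttled on its way downstream
0
1
2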

streamz.sink(upstream, func, *args, **kwargs)

Apply a function on every element

Parameters
func: callable

A function that will be applied on every element.

args:

Positional arguments that will be passed to func after the incoming element.

kwargs:

Stream-specific arguments will be passed to Stream.__init__, the rest of them will be passed to func.

Examples

>>> source = Stream()
>>> L = list()
>>> source.sink(L.append)
>>> source.sink(print)
>>> source.sink(print)
>>> source.emit(123)
123
123
>>> L
[123]
streamz.sink_to_textfile(upstream, file, end='\n', mode='a', **kwargs)

Write elements to a plain text file, one element per line.

Type of elements must be str.

Parameters
file: str or file-like

File to write the elements to. str is treated as a file name to open. If file-like, descriptor must be open in text mode. Note that the file descriptor will be closed when this sink is destroyed.

end: str, optional

This value will be written to the file after each element. Defaults to newline character.

mode: str, optional

If file is str, file will be opened in this mode. Defaults to "a" (append mode).

Examples

>>> source = Stream()
>>> source.map(str).sink_to_textfile("test.txt")
>>> source.emit(0)
>>> source.emit(1)
>>> print(open("test.txt", "r").read())
0
1
streamz.sliding_window(upstream, n, return_partial=True, **kwargs)

Produce overlapping tuples of size n

Parameters
return_partial: bool

If True, yield tuples as soon as any events come in, each tuple being smaller or equal to the window size. If False, only start yielding tuples once a full window has accrued.

Examples

>>> source = Stream()
>>> source.sliding_window(3, return_partial=False).sink(print)
>>> for i in range(8):
...     source.emit(i)
(0, 1, 2)
(1, 2, 3)
(2, 3, 4)
(3, 4, 5)
(4, 5, 6)
(5, 6, 7)
streamz.Stream(upstream=None, upstreams=None, stream_name=None, loop=None, asynchronous=None, ensure_io_loop=False)

A Stream is an infinite sequence of data.

Streams subscribe to each other passing and transforming data between them. A Stream object listens for updates from upstream, reacts to these updates, and then emits more data to flow downstream to all Stream objects that subscribe to it. Downstream Stream objects may connect at any point of a Stream graph to get a full view of the data coming off of that point to do with as they will.

Parameters
stream_name: str or None

This is the name of the stream.

asynchronous: boolean or None

Whether or not this stream will be used in asynchronous functions or normal Python functions. Leave as None if you don't know. True will cause operations like emit to return awaitable Futures. False will use an event loop in another thread (starting it if necessary).

ensure_io_loop: boolean

Ensure that some IOLoop will be created. If asynchronous is None or False then this will be in a separate thread, otherwise it will be IOLoop.current

Examples

>>> def inc(x):
...     return x + 1
>>> source = Stream()  # Create a stream object
>>> s = source.map(inc).map(str)  # Subscribe to make new streams
>>> s.sink(print)  # take an action whenever an element reaches the end
>>> L = list()
>>> s.sink(L.append)  # or take multiple actions (streams can branch)
>>> for i in range(5):
...     source.emit(i)  # push data in at the source
'1'
'2'
'3'
'4'
'5'
>>> L  # and the actions happen at the sinks
['1', '2', '3', '4', '5']
streamz.timed_window(upstream, interval, **kwargs)

Emit a tuple of collected results every interval

Every interval seconds this emits a tuple of all of the results seen so far. This can help to batch data coming off of a high-volume stream.
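
Examples

A hedged sketch; roughly every interval seconds the node emits a tuple of whatever arrived since the previous emission (possibly an empty tuple):

>>> from time import sleep
>>> from streamz import Stream
>>> source = Stream()
>>> source.timed_window(0.5).sink(print)
>>> for i in range(3):
...     source.emit(i)
>>> sleep(1)   # after about half a second a batch such as (0, 1, 2) is printed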

streamz.union(*upstreams, **kwargs)

Combine multiple streams into one

Every element from any of the upstream streams will immediately flow into the output stream. They will not be combined with elements from other streams.

See also

Stream.zip
Stream.combine_latest
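
Examples

A minimal sketch; elements from either stream pass through unchanged and in arrival order:

>>> from streamz import Stream
>>> a = Stream()
>>> b = Stream()
>>> a.union(b).sink(print)
>>> a.emit(1)
1
>>> b.emit(2)
2
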
streamz.unique(upstream, maxsize=None, key=<function identity>, hashable=True, **kwargs)

Avoid sending through repeated elements

This deduplicates a stream so that only new elements pass through. You can control how much of a history is stored with the maxsize= parameter. For example setting maxsize=1 avoids sending through elements when one is repeated right after the other.

Parameters
maxsize: int or None, optional

number of stored unique values to check against

key: function, optional

Function which returns a representation of the incoming data. For example key=lambda x: x['a'] could be used to allow only pieces of data with unique 'a' values to pass through.

hashable: bool, optional

If True then data is assumed to be hashable, else it is not. This is used to determine how to cache the history: if hashable, either dicts or LRU caches are used; otherwise a deque is used. Defaults to True.

Examples

>>> source = Stream()
>>> source.unique(maxsize=1).sink(print)
>>> for x in [1, 1, 2, 2, 2, 1, 3]:
...     source.emit(x)
1
2
1
3
streamz.pluck(upstream, pick, **kwargs)

Select elements from elements in the stream.

Parameters
pick: object or list

The element(s) to pick from the incoming element in the stream. If an instance of list, multiple elements will be picked.

Examples

>>> source = Stream()
>>> source.pluck([0, 3]).sink(print)
>>> for x in [[1, 2, 3, 4], [4, 5, 6, 7], [8, 9, 10, 11]]:
...     source.emit(x)
(1, 4)
(4, 7)
(8, 11)
>>> source = Stream()
>>> source.pluck('name').sink(print)
>>> for x in [{'name': 'Alice', 'x': 123}, {'name': 'Bob', 'x': 456}]:
...     source.emit(x)
'Alice'
'Bob'
streamz.zip(*upstreams, **kwargs)

Combine streams together into a stream of tuples

We emit a new tuple once every upstream stream has produced a new element.
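
Examples

A minimal sketch; each upstream buffers its elements and a tuple is emitted as soon as every upstream has one buffered element available:

>>> from streamz import Stream
>>> a = Stream()
>>> b = Stream()
>>> a.zip(b).sink(print)
>>> a.emit(1)       # held until b produces an element
>>> b.emit('x')
(1, 'x')
>>> b.emit('y')     # held until a produces another element
>>> a.emit(2)
(2, 'y')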

streamz.zip_latest(lossless, *upstreams, **kwargs)

Combine multiple streams together to a stream of tuples

The stream on which this is called is lossless. All elements from the lossless stream are emitted regardless of when they arrive. This will emit a new tuple consisting of an element from the lossless stream paired with the latest elements from the other streams. Elements are only emitted when an element on the lossless stream is received, similar to combine_latest with the emit_on flag.

See also

Stream.combine_latest
Stream.zip
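
Examples

A minimal sketch; only elements on the lossless stream trigger output, each paired with the most recent value seen on the other stream:

>>> from streamz import Stream
>>> lossless = Stream()
>>> other = Stream()
>>> lossless.zip_latest(other).sink(print)
>>> other.emit('a')
>>> lossless.emit(1)
(1, 'a')
>>> lossless.emit(2)
(2, 'a')
>>> other.emit('b')    # no output: only the lossless stream triggers emission
>>> lossless.emit(3)
(3, 'b')
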
streamz.from_iterable(iterable, **kwargs)

Emits items from an iterable.

Parameters
iterable: iterable

An iterable to emit messages from.

Examples

>>> source = Stream.from_iterable(range(3))
>>> L = source.sink_to_list()
>>> source.start()
>>> L
[0, 1, 2]
streamz.filenames(path, poll_interval=0.1, **kwargs)

Stream over filenames in a directory

Parameters
path: string

Directory path or globstring over which to search for files

poll_interval: Number

Seconds between checking path

start: bool (False)

Whether to start running immediately; otherwise call stream.start() explicitly.

Examples

>>> source = Stream.filenames('path/to/dir')  
>>> source = Stream.filenames('path/to/*.csv', poll_interval=0.500)  
streamz.from_kafka(topics, consumer_params, poll_interval=0.1, **kwargs)

Accepts messages from Kafka

Uses the confluent-kafka library, https://docs.confluent.io/current/clients/confluent-kafka-python/

Parameters
topics: list of str

Labels of Kafka topics to consume from

consumer_params: dict

Settings to set up the stream, see https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md Examples: bootstrap.servers, Connection string(s) (host:port) by which to reach Kafka; group.id, Identity of the consumer. If multiple sources share the same group, each message will be passed to only one of them.

poll_interval: number

Seconds that elapse between polling Kafka for new messages

start: bool (False)

Whether to start polling upon instantiation

Examples

>>> source = Stream.from_kafka(['mytopic'],
...           {'bootstrap.servers': 'localhost:9092',
...            'group.id': 'streamz'})  
streamz.from_kafka_batched(topic, consumer_params, poll_interval='1s', npartitions=None, refresh_partitions=False, start=False, dask=False, max_batch_size=10000, keys=False, engine=None, **kwargs)

Get messages and keys (optional) from Kafka in batches

Uses the confluent-kafka library, https://docs.confluent.io/current/clients/confluent-kafka-python/

This source will emit lists of messages for each partition of a single given topic per time interval, if there is new data. If using dask, one future will be produced per partition per time-step, if there is data.

Checkpointing is achieved through the use of reference counting. A reference counter is emitted downstream for each batch of data. A callback is triggered when the reference count reaches zero and the offsets are committed back to Kafka. When this source starts, the previously committed offsets are fetched from Kafka and reading begins from there. This guarantees at-least-once semantics.

Parameters
topic: str

Kafka topic to consume from

consumer_params: dict

Settings to set up the stream, see
https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
Examples: bootstrap.servers, Connection string(s) (host:port) by which to reach Kafka; group.id, Identity of the consumer. If multiple sources share the same group, each message will be passed to only one of them.

poll_interval: number

Seconds that elapse between polling Kafka for new messages

npartitions: int (None)

Number of partitions in the topic. If None, streamz will poll Kafka to get the number of partitions.

refresh_partitions: bool (False)

Useful if the user expects to increase the number of topic partitions on the fly, perhaps to handle spikes in load. Streamz polls Kafka in every batch to determine the current number of partitions. If partitions have been added, streamz will automatically start reading data from the new partitions as well. If set to False, streamz will not accommodate adding partitions on the fly. It is recommended to restart the stream after decreasing the number of partitions.

start: bool (False)

Whether to start polling upon instantiation

max_batch_size: int

The maximum number of messages per partition to be consumed per batch

keys: bool (False)

Whether to extract keys along with the messages. If True, each message is yielded as a dict: {'key': msg.key(), 'value': msg.value()}

engine: str (None)

If engine is set to "cudf", streamz reads data (messages must be JSON) from Kafka in an accelerated manner directly into cuDF (GPU) dataframes. This is done using the RAPIDS custreamz library; please refer to the RAPIDS cudf API documentation. Those interested in trying out custreamz would benefit from this accelerated Kafka reader. If one does not want to use GPUs, streamz can be used as is, with the default engine=None. To use this option, one must install custreamz (use the appropriate CUDA version recipe and Python version) using a command like the one below, which will install all GPU dependencies and streamz itself:

conda install -c rapidsai-nightly -c nvidia -c conda-forge -c defaults custreamz=0.15 python=3.7 cudatoolkit=10.2

More information at: https://rapids.ai/start.html

Important Kafka configurations: by default, a stream will start reading from the latest offsets available. Set 'auto.offset.reset': 'earliest' in the consumer configs if the stream needs to start processing from the earliest offsets.

Examples

>>> source = Stream.from_kafka_batched('mytopic',
...           {'bootstrap.servers': 'localhost:9092',
...            'group.id': 'streamz'})  
streamz.from_textfile(f, poll_interval=0.1, delimiter='\n', from_end=False, **kwargs)

Stream data from a text file

Parameters
f: file or string

Source of the data. If string, will be opened.

poll_interval: Number

Interval to poll file for new data in seconds

delimiter: str

Character(s) to use to split the data into parts

start: bool

Whether to start running immediately; otherwise call stream.start() explicitly.

from_end: bool

Whether to begin streaming from the end of the file (i.e., only emit lines appended after the stream starts).

Returns
Stream

Examples

>>> source = Stream.from_textfile('myfile.json')  
>>> source.map(json.loads).pluck('value').sum().sink(print)  
>>> source.start()  
streamz.dask.DaskStream(*args, **kwargs)

A Parallel stream using Dask

This object is fully compliant with the streamz.core.Stream object but uses a Dask client for execution. Operations like map and accumulate submit functions to run on the Dask instance using dask.distributed.Client.submit and pass around Dask futures. Time-based operations like timed_window, buffer, and so on operate as normal.

Typically one transfers between normal Stream and DaskStream objects using the Stream.scatter() and DaskStream.gather() methods.

See also

dask.distributed.Client

Examples

>>> from dask.distributed import Client
>>> client = Client()
>>> from streamz import Stream
>>> source = Stream()
>>> source.scatter().map(func).accumulate(binop).gather().sink(...)
streamz.dask.gather(upstream=None, upstreams=None, stream_name=None, loop=None, asynchronous=None, ensure_io_loop=False)

Wait on and gather results from DaskStream to local Stream

This waits on every result in the stream and then gathers that result back to the local stream. Warning, this can restrict parallelism. It is common to combine a gather() node with a buffer() to allow unfinished futures to pile up.

See also

buffer
scatter

Examples

>>> local_stream = dask_stream.buffer(20).gather()