API¶
Stream¶
Stream([upstream, upstreams, stream_name, …]) | A Stream is an infinite sequence of data
Stream.connect(self, downstream) | Connect this stream to a downstream element.
Stream.destroy(self[, streams]) | Disconnect this stream from any upstream sources
Stream.disconnect(self, downstream) | Disconnect this stream from a downstream element.
Stream.visualize(self[, filename]) | Render the computation of this object’s task graph using graphviz.
accumulate(upstream, func[, start, …]) | Accumulate results with previous state
buffer(upstream, n, **kwargs) | Allow results to pile up at this point in the stream
collect(upstream[, cache]) | Hold elements in a cache and emit them as a collection when flushed.
combine_latest(*upstreams, **kwargs) | Combine multiple streams together to a stream of tuples
delay(upstream, interval, **kwargs) | Add a time delay to results
filter(upstream, predicate, *args, **kwargs) | Only pass through elements that satisfy the predicate
flatten([upstream, upstreams, stream_name, …]) | Flatten streams of lists or iterables into a stream of elements
map(upstream, func, *args, **kwargs) | Apply a function to every element in the stream
partition(upstream, n, **kwargs) | Partition stream into tuples of equal size
rate_limit(upstream, interval, **kwargs) | Limit the flow of data
scatter(*args, **kwargs) | Convert local stream to Dask Stream
sink(upstream, func, *args, **kwargs) | Apply a function on every element
slice(upstream[, start, end, step]) | Get only some events in a stream by position.
sliding_window(upstream, n[, return_partial]) | Produce overlapping tuples of size n
starmap(upstream, func, *args, **kwargs) | Apply a function to every element in the stream, splayed out
timed_window(upstream, interval, **kwargs) | Emit a tuple of collected results every interval
union(*upstreams, **kwargs) | Combine multiple streams into one
unique(upstream[, maxsize, key, hashable]) | Avoid sending through repeated elements
pluck(upstream, pick, **kwargs) | Select elements from elements in the stream.
zip(*upstreams, **kwargs) | Combine streams together into a stream of tuples
zip_latest(lossless, *upstreams, **kwargs) | Combine multiple streams together to a stream of tuples
Stream.connect(self, downstream)¶
Connect this stream to a downstream element.
Parameters: - downstream: Stream
The downstream stream to connect to
Stream.disconnect(self, downstream)¶
Disconnect this stream from a downstream element.
Parameters: - downstream: Stream
The downstream stream to disconnect from
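A minimal sketch of wiring two streams together by hand (most pipelines create these links implicitly through operators such as map):
>>> source = Stream()
>>> printer = Stream()
>>> printer.sink(print)
>>> source.connect(printer)
>>> source.emit(1)
1
>>> source.disconnect(printer)
>>> source.emit(2)  # no longer reaches printer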
Stream.destroy(self, streams=None)¶
Disconnect this stream from any upstream sources
Stream.emit(self, x, asynchronous=False)¶
Push data into the stream at this point
This is typically done only at source Streams but can theoretically be done at any point
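For example, pushing a value into a bare source with a print sink:
>>> source = Stream()
>>> source.sink(print)
>>> source.emit(123)
123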
Stream.frequencies(self, **kwargs)¶
Count occurrences of elements
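A small sketch of the expected behaviour; the running counts are emitted after every element:
>>> source = Stream()
>>> source.frequencies().sink(print)
>>> for x in ['a', 'b', 'a']:
...     source.emit(x)
{'a': 1}
{'a': 1, 'b': 1}
{'a': 2, 'b': 1}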
classmethod Stream.register_api(modifier=<function identity>)¶
Add callable to Stream API
This allows you to register a new method onto this class. You can use it as a decorator:
>>> @Stream.register_api()
... class foo(Stream):
...     ...
>>> Stream().foo(...)  # this works now
It attaches the callable as a normal attribute to the class object. In doing so it respects inheritance (all subclasses of Stream will also get the foo attribute).
By default callables are assumed to be instance methods. If you like you can include modifiers to apply before attaching to the class, as in the following case where we construct a staticmethod.
>>> @Stream.register_api(staticmethod)
... class foo(Stream):
...     ...
>>> Stream.foo(...)  # Foo operates as a static method
Stream.sink_to_list(self)¶
Append all elements of a stream to a list as they come in
Examples
>>> source = Stream()
>>> L = source.map(lambda x: 10 * x).sink_to_list()
>>> for i in range(5):
...     source.emit(i)
>>> L
[0, 10, 20, 30, 40]
Stream.update(self, x, who=None)¶
Stream.visualize(self, filename='mystream.png', **kwargs)¶
Render the computation of this object’s task graph using graphviz.
Requires graphviz to be installed.
Parameters: - filename : str, optional
The name of the file to write to disk.
- kwargs:
Graph attributes to pass to graphviz like rankdir="LR"
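For example, writing a small pipeline’s graph to disk (the filename here is illustrative):
>>> source = Stream()
>>> source.map(lambda x: x + 1).sink(print)
>>> source.visualize('pipeline.png')  # doctest: +SKIP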
Sources¶
filenames(path[, poll_interval, start]) | Stream over filenames in a directory
from_kafka(topics, consumer_params[, …]) | Accepts messages from Kafka
from_kafka_batched(topic, consumer_params[, …]) | Get messages from Kafka in batches
from_process(cmd[, open_kwargs, …]) | Messages from a running external process
from_textfile(f[, poll_interval, delimiter, …]) | Stream data from a text file
from_tcp(port[, delimiter, start, server_kwargs]) | Creates events by reading from a socket using tornado TCPServer
from_http_server(port[, path, start, …]) | Listen for HTTP POSTs on given port
DaskStream¶
DaskStream(*args, **kwargs) | A Parallel stream using Dask
gather([upstream, upstreams, stream_name, …]) | Wait on and gather results from DaskStream to local Stream
Definitions¶
streamz.accumulate(upstream, func, start='--no-default--', returns_state=False, **kwargs)¶
Accumulate results with previous state
This performs running or cumulative reductions, applying the function to the previous total and the new element. The function should take two arguments, the previous accumulated state and the next element, and it should return a new accumulated state:
- state = func(previous_state, new_value)  (returns_state=False)
- state, result = func(previous_state, new_value)  (returns_state=True)
where the new state is passed to the next invocation. The state or result is emitted downstream for the two cases.
Parameters: - func: callable
- start: object
Initial value, passed as the value of previous_state on the first invocation. Defaults to the first submitted element
- returns_state: boolean
If true then func should return both the state and the value to emit. If false then both values are the same, and func returns one value
- **kwargs:
Keyword arguments to pass to func
Examples
A running total, producing triangular numbers
>>> source = Stream()
>>> source.accumulate(lambda acc, x: acc + x).sink(print)
>>> for i in range(5):
...     source.emit(i)
0
1
3
6
10
A count of number of events (including the current one)
>>> source = Stream()
>>> source.accumulate(lambda acc, x: acc + 1, start=0).sink(print)
>>> for _ in range(5):
...     source.emit(0)
1
2
3
4
5
Like the builtin “enumerate”.
>>> source = Stream()
>>> source.accumulate(lambda acc, x: ((acc[0] + 1, x), (acc[0], x)),
...                   start=(0, 0), returns_state=True
...                   ).sink(print)
>>> for i in range(3):
...     source.emit(0)
(0, 0)
(1, 0)
(2, 0)
streamz.buffer(upstream, n, **kwargs)¶
Allow results to pile up at this point in the stream
This allows results to buffer in place at various points in the stream. This can help to smooth flow through the system when backpressure is applied.
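A sketch of where a buffer typically sits; in an asynchronous context, emit returns a future that resolves once the buffer has room, so a fast producer can run ahead of slow downstream work (rate_limit here just stands in for a slow stage):
>>> source = Stream(asynchronous=True)  # doctest: +SKIP
>>> source.buffer(100).rate_limit(0.5).sink(print)  # doctest: +SKIP
>>> await source.emit(1)  # returns quickly while results queue up  # doctest: +SKIP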
streamz.collect(upstream, cache=None, **kwargs)¶
Hold elements in a cache and emit them as a collection when flushed.
Examples
>>> source1 = Stream()
>>> source2 = Stream()
>>> collector = collect(source1)
>>> collector.sink(print)
>>> source2.sink(collector.flush)
>>> source1.emit(1)
>>> source1.emit(2)
>>> source2.emit('anything')  # flushes collector
...
[1, 2]
streamz.combine_latest(*upstreams, **kwargs)¶
Combine multiple streams together to a stream of tuples
This will emit a new tuple of all of the most recent elements seen from any stream.
Parameters: - emit_on : stream or list of streams or None
only emit upon update of the streams listed. If None, emit on update from any stream
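A small sketch of the behaviour; nothing is emitted until every upstream has produced at least one element:
>>> a = Stream()
>>> b = Stream()
>>> a.combine_latest(b).sink(print)
>>> a.emit(1)       # nothing yet; b has not emitted
>>> b.emit('x')
(1, 'x')
>>> a.emit(2)
(2, 'x')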
streamz.delay(upstream, interval, **kwargs)¶
Add a time delay to results
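A sketch; the delay runs on the stream’s event loop, so exact timing depends on the loop:
>>> source = Stream()
>>> source.delay(5).sink(print)  # doctest: +SKIP
>>> source.emit('late')  # printed roughly five seconds later  # doctest: +SKIP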
streamz.filter(upstream, predicate, *args, **kwargs)¶
Only pass through elements that satisfy the predicate
Parameters: - predicate : function
The predicate. Should return True or False, where True means that the predicate is satisfied.
- *args :
The arguments to pass to the predicate.
- **kwargs:
Keyword arguments to pass to predicate
Examples
>>> source = Stream()
>>> source.filter(lambda x: x % 2 == 0).sink(print)
>>> for i in range(5):
...     source.emit(i)
0
2
4
streamz.flatten(upstream=None, upstreams=None, stream_name=None, loop=None, asynchronous=None, ensure_io_loop=False)¶
Flatten streams of lists or iterables into a stream of elements
Examples
>>> source = Stream()
>>> source.flatten().sink(print)
>>> for x in [[1, 2, 3], [4, 5], [6, 7, 7]]:
...     source.emit(x)
1
2
3
4
5
6
7
streamz.map(upstream, func, *args, **kwargs)¶
Apply a function to every element in the stream
Parameters: - func: callable
- *args :
The arguments to pass to the function.
- **kwargs:
Keyword arguments to pass to func
Examples
>>> source = Stream()
>>> source.map(lambda x: 2*x).sink(print)
>>> for i in range(5):
...     source.emit(i)
0
2
4
6
8
streamz.partition(upstream, n, **kwargs)¶
Partition stream into tuples of equal size
Examples
>>> source = Stream()
>>> source.partition(3).sink(print)
>>> for i in range(10):
...     source.emit(i)
(0, 1, 2)
(3, 4, 5)
(6, 7, 8)
streamz.rate_limit(upstream, interval, **kwargs)¶
Limit the flow of data
This stops two elements from streaming through in an interval shorter than the provided value.
Parameters: - interval: float
Time in seconds
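A sketch; the interval is enforced via backpressure on the event loop, so exact behaviour depends on how the stream is driven:
>>> source = Stream()
>>> source.rate_limit(0.5).sink(print)  # doctest: +SKIP
>>> for i in range(3):
...     source.emit(i)  # downstream sees at most one element per half second  # doctest: +SKIP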
streamz.sink(upstream, func, *args, **kwargs)¶
Apply a function on every element
Examples
>>> source = Stream()
>>> L = list()
>>> source.sink(L.append)
>>> source.sink(print)
>>> source.sink(print)
>>> source.emit(123)
123
123
>>> L
[123]
streamz.sliding_window(upstream, n, return_partial=True, **kwargs)¶
Produce overlapping tuples of size n
Parameters: - return_partial : bool
If True, yield tuples as soon as any events come in, each tuple being smaller or equal to the window size. If False, only start yielding tuples once a full window has accrued.
Examples
>>> source = Stream()
>>> source.sliding_window(3, return_partial=False).sink(print)
>>> for i in range(8):
...     source.emit(i)
(0, 1, 2)
(1, 2, 3)
(2, 3, 4)
(3, 4, 5)
(4, 5, 6)
(5, 6, 7)
streamz.Stream(upstream=None, upstreams=None, stream_name=None, loop=None, asynchronous=None, ensure_io_loop=False)¶
A Stream is an infinite sequence of data
Streams subscribe to each other passing and transforming data between them. A Stream object listens for updates from upstream, reacts to these updates, and then emits more data to flow downstream to all Stream objects that subscribe to it. Downstream Stream objects may connect at any point of a Stream graph to get a full view of the data coming off of that point to do with as they will.
Parameters: - asynchronous: boolean or None
Whether or not this stream will be used in asynchronous functions or normal Python functions. Leave as None if you don’t know. True will cause operations like emit to return awaitable Futures. False will use an Event loop in another thread (starts it if necessary)
- ensure_io_loop: boolean
Ensure that some IOLoop will be created. If asynchronous is None or False then this will be in a separate thread, otherwise it will be IOLoop.current
Examples
>>> def inc(x):
...     return x + 1
>>> source = Stream()  # Create a stream object
>>> s = source.map(inc).map(str)  # Subscribe to make new streams
>>> s.sink(print)  # take an action whenever an element reaches the end
>>> L = list()
>>> s.sink(L.append)  # or take multiple actions (streams can branch)
>>> for i in range(5):
...     source.emit(i)  # push data in at the source
'1'
'2'
'3'
'4'
'5'
>>> L  # and the actions happen at the sinks
['1', '2', '3', '4', '5']
streamz.timed_window(upstream, interval, **kwargs)¶
Emit a tuple of collected results every interval
Every interval seconds this emits a tuple of all of the results seen so far. This can help to batch data coming off of a high-volume stream.
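A sketch of typical use; exact batching depends on event-loop timing, and empty windows may also be emitted:
>>> source = Stream()
>>> source.timed_window(0.2).sink(print)  # doctest: +SKIP
>>> for i in range(3):
...     source.emit(i)  # a batch containing 0, 1, 2 is emitted after ~0.2 s  # doctest: +SKIP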
streamz.union(*upstreams, **kwargs)¶
Combine multiple streams into one
Every element from any of the upstream streams will immediately flow into the output stream. They will not be combined with elements from other streams.
See also
Stream.zip
Stream.combine_latest
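A small sketch of the behaviour:
>>> a = Stream()
>>> b = Stream()
>>> a.union(b).sink(print)
>>> a.emit(1)
1
>>> b.emit(2)
2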
streamz.unique(upstream, maxsize=None, key=<function identity>, hashable=True, **kwargs)¶
Avoid sending through repeated elements
This deduplicates a stream so that only new elements pass through. You can control how much of a history is stored with the maxsize= parameter. For example setting maxsize=1 avoids sending through elements when one is repeated right after the other.
Parameters: - maxsize: int or None, optional
number of stored unique values to check against
- key : function, optional
Function which returns a representation of the incoming data. For example key=lambda x: x['a'] could be used to allow only pieces of data with unique 'a' values to pass through.
- hashable : bool, optional
If True then data is assumed to be hashable, else it is not. This is used for determining how to cache the history, if hashable then either dicts or LRU caches are used, otherwise a deque is used. Defaults to True.
Examples
>>> source = Stream()
>>> source.unique(maxsize=1).sink(print)
>>> for x in [1, 1, 2, 2, 2, 1, 3]:
...     source.emit(x)
1
2
1
3
streamz.pluck(upstream, pick, **kwargs)¶
Select elements from elements in the stream.
Parameters: - pick : object, list
The element(s) to pick from the incoming element in the stream. If an instance of list, will pick multiple elements.
Examples
>>> source = Stream()
>>> source.pluck([0, 3]).sink(print)
>>> for x in [[1, 2, 3, 4], [4, 5, 6, 7], [8, 9, 10, 11]]:
...     source.emit(x)
(1, 4)
(4, 7)
(8, 11)
>>> source = Stream()
>>> source.pluck('name').sink(print)
>>> for x in [{'name': 'Alice', 'x': 123}, {'name': 'Bob', 'x': 456}]:
...     source.emit(x)
'Alice'
'Bob'
streamz.zip(*upstreams, **kwargs)¶
Combine streams together into a stream of tuples
We emit a new tuple once all streams have produced a new element.
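A small sketch of the behaviour; elements are held until a partner arrives on every other stream:
>>> a = Stream()
>>> b = Stream()
>>> a.zip(b).sink(print)
>>> a.emit(1)   # held until b produces something
>>> b.emit('x')
(1, 'x')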
streamz.zip_latest(lossless, *upstreams, **kwargs)¶
Combine multiple streams together to a stream of tuples
The stream which this is called from is lossless. All elements from the lossless stream are emitted regardless of when they came in. This will emit a new tuple consisting of an element from the lossless stream paired with the latest elements from the other streams. Elements are only emitted when an element on the lossless stream is received, similar to combine_latest with the emit_on flag.
See also
Stream.combine_latest
Stream.zip
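A small sketch of the behaviour, with the stream the method is called on acting as the lossless stream:
>>> lossless = Stream()
>>> other = Stream()
>>> lossless.zip_latest(other).sink(print)
>>> other.emit('x')
>>> lossless.emit(1)
(1, 'x')
>>> lossless.emit(2)
(2, 'x')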
streamz.filenames(path, poll_interval=0.1, start=False, **kwargs)¶
Stream over filenames in a directory
Parameters: - path: string
Directory path or globstring over which to search for files
- poll_interval: Number
Seconds between checking path
- start: bool (False)
Whether to start running immediately; otherwise call stream.start() explicitly.
Examples
>>> source = Stream.filenames('path/to/dir')  # doctest: +SKIP
>>> source = Stream.filenames('path/to/*.csv', poll_interval=0.500)  # doctest: +SKIP
streamz.from_kafka(topics, consumer_params, poll_interval=0.1, start=False, **kwargs)¶
Accepts messages from Kafka
Uses the confluent-kafka library, https://docs.confluent.io/current/clients/confluent-kafka-python/
Parameters: - topics: list of str
Labels of Kafka topics to consume from
- consumer_params: dict
Settings to set up the stream, see https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md Examples: bootstrap.servers, Connection string(s) (host:port) by which to reach Kafka; group.id, Identity of the consumer. If multiple sources share the same group, each message will be passed to only one of them.
- poll_interval: number
Seconds that elapse between polling Kafka for new messages
- start: bool (False)
Whether to start polling upon instantiation
Examples
>>> source = Stream.from_kafka(['mytopic'],
...                            {'bootstrap.servers': 'localhost:9092',
...                             'group.id': 'streamz'})  # doctest: +SKIP
streamz.from_textfile(f, poll_interval=0.1, delimiter='\n', start=False, from_end=False, **kwargs)¶
Stream data from a text file
Parameters: - f: file or string
Source of the data. If string, will be opened.
- poll_interval: Number
Interval to poll file for new data in seconds
- delimiter: str
Character(s) to use to split the data into parts
- start: bool
Whether to start running immediately; otherwise call stream.start() explicitly.
- from_end: bool
Whether to begin streaming from the end of the file (i.e., only emit lines appended after the stream starts).
Returns: - Stream
Examples
>>> source = Stream.from_textfile('myfile.json')  # doctest: +SKIP
>>> source.map(json.loads).pluck('value').sum().sink(print)  # doctest: +SKIP
>>> source.start()  # doctest: +SKIP
streamz.dask.DaskStream(*args, **kwargs)¶
A Parallel stream using Dask
This object is fully compliant with the streamz.core.Stream object but uses a Dask client for execution. Operations like map and accumulate submit functions to run on the Dask instance using dask.distributed.Client.submit and pass around Dask futures. Time-based operations like timed_window, buffer, and so on operate as normal.
Typically one transfers between normal Stream and DaskStream objects using the Stream.scatter() and DaskStream.gather() methods.
See also
dask.distributed.Client
Examples
>>> from dask.distributed import Client
>>> client = Client()
>>> from streamz import Stream
>>> source = Stream()
>>> source.scatter().map(func).accumulate(binop).gather().sink(...)
streamz.dask.gather(upstream=None, upstreams=None, stream_name=None, loop=None, asynchronous=None, ensure_io_loop=False)¶
Wait on and gather results from DaskStream to local Stream
This waits on every result in the stream and then gathers that result back to the local stream. Warning, this can restrict parallelism. It is common to combine a gather() node with a buffer() to allow unfinished futures to pile up.
See also
buffer
scatter
Examples
>>> local_stream = dask_stream.buffer(20).gather()