Collections API

Collections

Streaming([stream, example, stream_type])

Superclass for streaming collections

Streaming.map_partitions(func, *args, **kwargs)

Map a function across all batch elements of this stream

Streaming.accumulate_partitions(func, *args, ...)

Accumulate a function with state across batch elements

Streaming.verify(x)

Verify elements that pass through this stream

Batch

Batch([stream, example])

A Stream of tuples or lists

Batch.filter(predicate)

Filter elements by a predicate

Batch.map(func, **kwargs)

Map a function across all elements

Batch.pluck(ind)

Pick a field out of all elements

Batch.to_dataframe()

Convert to a streaming dataframe

Batch.to_stream()

Concatenate batches and return base Stream

Dataframes

DataFrame(*args, **kwargs)

A Streaming Dataframe

DataFrame.groupby(other)

Groupby aggregations

DataFrame.rolling(window[, min_periods, ...])

Compute rolling aggregations

DataFrame.assign(**kwargs)

Assign new columns to this dataframe

DataFrame.sum([start])

Sum frame.

DataFrame.mean([start])

Average frame

DataFrame.cumsum()

Cumulative sum

DataFrame.cumprod()

Cumulative product

DataFrame.cummin()

Cumulative minimum

DataFrame.cummax()

Cumulative maximum

GroupBy(root, grouper[, index])

Groupby aggregations on streaming dataframes

GroupBy.count([start])

Groupby-count

GroupBy.mean([with_state, start])

Groupby-mean

GroupBy.size()

Groupby-size

GroupBy.std([ddof])

Groupby-std

GroupBy.sum([start])

Groupby-sum

GroupBy.var([ddof])

Groupby-variance

Rolling(sdf, window, min_periods, ...)

Rolling aggregations

Rolling.aggregate(*args, **kwargs)

Rolling aggregation

Rolling.count(*args, **kwargs)

Rolling count

Rolling.max()

Rolling maximum

Rolling.mean()

Rolling mean

Rolling.median()

Rolling median

Rolling.min()

Rolling minimum

Rolling.quantile(*args, **kwargs)

Rolling quantile

Rolling.std(*args, **kwargs)

Rolling standard deviation

Rolling.sum()

Rolling sum

Rolling.var(*args, **kwargs)

Rolling variance

DataFrame.window([n, value, with_state, start])

Sliding window operations

Window.apply(func)

Apply an arbitrary function over each window of data

Window.count()

Count elements within window

Window.groupby(other)

Groupby-aggregations within window

Window.sum()

Sum elements within window

Window.size

Number of elements within window

Window.std([ddof])

Compute standard deviation of elements within window

Window.var([ddof])

Compute variance of elements within window

PeriodicDataFrame([datafn, interval, dask, ...])

A streaming dataframe using the asyncio ioloop to poll a callback fn

Random([freq, interval, dask, start, datafn])

PeriodicDataFrame providing random values by default

Details

class streamz.collection.Streaming(stream=None, example=None, stream_type=None)

Superclass for streaming collections

Do not create this class directly; use one of the subclasses instead.

Parameters
stream: streamz.Stream
example: object

An object to represent an example element of this stream

See also

streamz.dataframe.StreamingDataFrame
streamz.dataframe.StreamingBatch
Attributes
current_value

Methods

accumulate_partitions(func, *args, **kwargs)

Accumulate a function with state across batch elements

map_partitions(func, *args, **kwargs)

Map a function across all batch elements of this stream

register_api([modifier, attribute_name])

Add callable to Stream API

verify(x)

Verify elements that pass through this stream

emit

register_plugin_entry_point

start

stop

accumulate_partitions(func, *args, **kwargs)

Accumulate a function with state across batch elements
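
As a rough illustration of what accumulating with state across batches means, the plain-Python sketch below folds a function over a sequence of batches, carries the state from one batch to the next, and emits one result per batch. It does not use streamz; the helper name accumulate_batches and the use of lists as batches are assumptions made for illustration.

```python
def accumulate_batches(func, batches, start):
    """Yield the running state after each batch.

    func takes (state, batch) and returns the new state.
    """
    state = start
    for batch in batches:
        state = func(state, batch)
        yield state


def add_batch_total(state, batch):
    # New state is the old running total plus this batch's sum.
    return state + sum(batch)


batches = [[1, 2, 3], [4, 5], [6]]
running_totals = list(accumulate_batches(add_batch_total, batches, start=0))
print(running_totals)  # [6, 15, 21]
```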

static map_partitions(func, *args, **kwargs)

Map a function across all batch elements of this stream

The output stream type is determined by applying that function to the example.
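
One way to picture how the output type follows from the example: apply the function once to the example element and inspect what comes back. The sketch below is plain Python with a hypothetical helper (infer_output_kind) and hypothetical labels ("batch", "scalar"); it is not the streamz implementation.

```python
def infer_output_kind(func, example):
    """Apply func to an example element and classify the result."""
    result = func(example)
    if isinstance(result, (list, tuple)):
        return "batch", result
    return "scalar", result


example = [{"x": 1, "y": 2}]  # example element: a list of records

# A function that keeps the list structure yields a batch-like output.
kind_a, _ = infer_output_kind(lambda recs: [r["x"] for r in recs], example)

# A function that reduces the list yields a scalar-like output.
kind_b, _ = infer_output_kind(lambda recs: sum(r["x"] for r in recs), example)

print(kind_a, kind_b)  # batch scalar
```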

verify(x)

Verify elements that pass through this stream

class streamz.batch.Batch(stream=None, example=None)

A Stream of tuples or lists

This streaming collection manages batches of Python objects such as lists of text or dictionaries. By batching many elements together we reduce overhead from Python.

This collection is typically used at the early stages of data ingestion, before handing off to streaming dataframes.

Examples

>>> text = Streaming.from_file(myfile)  
>>> b = text.partition(100).map(json.loads)  
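
The batching idea can be sketched without streamz: group raw lines into fixed-size batches, then parse and pluck per batch rather than per element. The helper partition below is a hypothetical stand-in written for this sketch; unlike a stream partitioner, it also keeps a trailing partial batch.

```python
import json
from itertools import islice


def partition(items, n):
    """Group an iterable into tuples of up to n elements."""
    it = iter(items)
    while True:
        chunk = tuple(islice(it, n))
        if not chunk:
            return
        yield chunk


lines = ['{"x": 1}', '{"x": 2}', '{"x": 3}', '{"x": 4}', '{"x": 5}']

# Parse each batch in one go, then pick a field out of every element.
parsed = [[json.loads(line) for line in batch] for batch in partition(lines, 2)]
xs = [[rec["x"] for rec in batch] for batch in parsed]
print(xs)  # [[1, 2], [3, 4], [5]]
```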
Attributes
current_value

Methods

accumulate_partitions(func, *args, **kwargs)

Accumulate a function with state across batch elements

filter(predicate)

Filter elements by a predicate

map(func, **kwargs)

Map a function across all elements

map_partitions(func, *args, **kwargs)

Map a function across all batch elements of this stream

pluck(ind)

Pick a field out of all elements

register_api([modifier, attribute_name])

Add callable to Stream API

sum()

Sum elements

to_dataframe()

Convert to a streaming dataframe

to_stream()

Concatenate batches and return base Stream

verify(x)

Verify elements that pass through this stream

emit

register_plugin_entry_point

start

stop

accumulate_partitions(func, *args, **kwargs)

Accumulate a function with state across batch elements

See also

Streaming.map_partitions
filter(predicate)

Filter elements by a predicate

map(func, **kwargs)

Map a function across all elements

static map_partitions(func, *args, **kwargs)

Map a function across all batch elements of this stream

The output stream type is determined by applying that function to the example.

See also

Streaming.accumulate_partitions
pluck(ind)

Pick a field out of all elements

classmethod register_api(modifier=<function identity>, attribute_name=None)

Add callable to Stream API

This allows you to register a new method onto this class. You can use it as a decorator:

>>> @Stream.register_api()
... class foo(Stream):
...     ...

>>> Stream().foo(...)  # this works now

It attaches the callable as a normal attribute to the class object. In doing so it respects inheritance (all subclasses of Stream will also get the foo attribute).

By default callables are assumed to be instance methods. If you like you can include modifiers to apply before attaching to the class as in the following case where we construct a staticmethod.

>>> @Stream.register_api(staticmethod)
... class foo(Stream):
...     ...
>>> Stream.foo(...)  # Foo operates as a static method

You can also provide an optional attribute_name argument to control the name of the attribute your callable will be attached as.

>>> @Stream.register_api(attribute_name="bar")
... class foo(Stream):
...     ...

>>> Stream().bar(...)  # foo was actually attached as bar
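
The registration mechanism described above can be mimicked on a toy class. This is a minimal sketch, not the streamz source: Node, Child, and the registered functions are hypothetical, and it registers plain functions rather than Stream subclasses to keep the example short.

```python
def identity(x):
    return x


class Node:
    """Toy base class standing in for a stream class."""

    @classmethod
    def register_api(cls, modifier=identity, attribute_name=None):
        def decorator(func):
            name = attribute_name or func.__name__
            # Attach to the class; subclasses see it through inheritance.
            setattr(cls, name, modifier(func))
            return func
        return decorator


class Child(Node):
    pass


@Node.register_api()
def double(self, x):
    return 2 * x


@Node.register_api(staticmethod)
def triple(x):
    return 3 * x


@Node.register_api(attribute_name="bar")
def foo(self, x):
    return x + 1


print(Child().double(2), Node.triple(2), Child().bar(2))  # 4 6 3
```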

sum()

Sum elements

to_dataframe()

Convert to a streaming dataframe

This calls pd.DataFrame on all list-elements of this stream

to_stream()

Concatenate batches and return base Stream

Returned stream will be composed of single elements
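
In plain Python terms, concatenating batches into single elements amounts to flattening one level of nesting. The sketch below uses lists in place of a stream.

```python
from itertools import chain

batches = [[1, 2, 3], [4, 5], [6]]

# Flatten one level: each batch contributes its elements in order.
elements = list(chain.from_iterable(batches))
print(elements)  # [1, 2, 3, 4, 5, 6]
```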

verify(x)

Verify elements that pass through this stream

class streamz.dataframe.DataFrame(*args, **kwargs)

A Streaming Dataframe

This is a logical collection over a stream of Pandas dataframes. Operations on this object will translate to the appropriate operations on the underlying Pandas dataframes.

See also

Series
Attributes
columns
current_value
dtypes
index
plot
size

size of frame

Methods

accumulate_partitions(func, *args, **kwargs)

Accumulate a function with state across batch elements

assign(**kwargs)

Assign new columns to this dataframe

count([start])

Count of frame

cummax()

Cumulative maximum

cummin()

Cumulative minimum

cumprod()

Cumulative product

cumsum()

Cumulative sum

from_periodic

groupby(other)

Groupby aggregations

map_partitions(func, *args, **kwargs)

Map a function across all batch elements of this stream

mean([start])

Average frame

random

register_api([modifier, attribute_name])

Add callable to Stream API

reset_index()

Reset Index

rolling(window[, min_periods, with_state, start])

Compute rolling aggregations

round([decimals])

Round elements in frame

set_index(index, **kwargs)

Set Index

sum([start])

Sum frame.

tail([n])

Last n rows of frame

to_frame()

Convert to a streaming dataframe

verify(x)

Verify consistency of elements that pass through this stream

window([n, value, with_state, start])

Sliding window operations

aggregate

astype

emit

ewm

expanding

map

query

register_plugin_entry_point

start

stop

accumulate_partitions(func, *args, **kwargs)

Accumulate a function with state across batch elements

See also

Streaming.map_partitions
assign(**kwargs)

Assign new columns to this dataframe

Alternatively use setitem syntax

Examples

>>> sdf = sdf.assign(z=sdf.x + sdf.y)  
>>> sdf['z'] = sdf.x + sdf.y  
count(start=None)

Count of frame

Parameters
start: None or resulting Python object type from the operation

Accepts a valid start state.
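
A sketch of what a start state can be used for, assuming it plays the role of an initial accumulator value: a running count either begins from nothing or continues from a previously saved count. The numbers and variable names are made up for illustration, and plain lists stand in for dataframes.

```python
from itertools import accumulate

batches = [[1, 2, 3], [4, 5]]
counts = [len(b) for b in batches]

# Without a start state the running count begins at zero.
fresh = list(accumulate(counts))

# With a start state of 10 (say, rows counted before a restart),
# the running count continues from there.
resumed = list(accumulate(counts, initial=10))[1:]

print(fresh, resumed)  # [3, 5] [13, 15]
```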

cummax()

Cumulative maximum

cummin()

Cumulative minimum

cumprod()

Cumulative product

cumsum()

Cumulative sum
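
Cumulative operations on a stream have to continue across batch boundaries. The plain-Python sketch below shows the idea for a cumulative sum; streaming_cumsum is a hypothetical helper and lists stand in for dataframes.

```python
from itertools import accumulate


def streaming_cumsum(batches):
    """Yield the cumulative sum of each batch, continuing from earlier batches."""
    total = 0
    for batch in batches:
        out = [total + s for s in accumulate(batch)]
        if out:
            total = out[-1]  # carry the last value into the next batch
        yield out


result = list(streaming_cumsum([[1, 2, 3], [4, 5], [6]]))
print(result)  # [[1, 3, 6], [10, 15], [21]]
```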

from_periodic = <function PeriodicDataFrame>
groupby(other)

Groupby aggregations

static map_partitions(func, *args, **kwargs)

Map a function across all batch elements of this stream

The output stream type is determined by applying that function to the example.

See also

Streaming.accumulate_partitions
mean(start=None)

Average frame

Parameters
start: None or resulting Python object type from the operation

Accepts a valid start state.

random = <function Random>
classmethod register_api(modifier=<function identity>, attribute_name=None)

Add callable to Stream API

This allows you to register a new method onto this class. You can use it as a decorator:

>>> @Stream.register_api()
... class foo(Stream):
...     ...

>>> Stream().foo(...)  # this works now

It attaches the callable as a normal attribute to the class object. In doing so it respects inheritance (all subclasses of Stream will also get the foo attribute).

By default callables are assumed to be instance methods. If you like you can include modifiers to apply before attaching to the class as in the following case where we construct a staticmethod.

>>> @Stream.register_api(staticmethod)
... class foo(Stream):
...     ...
>>> Stream.foo(...)  # Foo operates as a static method

You can also provide an optional attribute_name argument to control the name of the attribute your callable will be attached as.

>>> @Stream.register_api(attribute_name="bar")
... class foo(Stream):
...     ...

>>> Stream().bar(...)  # foo was actually attached as bar

reset_index()

Reset Index

rolling(window, min_periods=1, with_state=False, start=())

Compute rolling aggregations

When followed by an aggregation method like sum, mean, or std this produces a new Streaming dataframe whose values are aggregated over that window.

The window parameter can be either a number of rows or a timedelta like "2 minutes", in which case the index should be a datetime index.

This operates by keeping enough of a backlog of records to maintain an accurate stream. It performs a copy at every added dataframe. Because of this it may be slow if the rolling window is much larger than the average stream element.

Parameters
window: int or timedelta

Window over which to roll

with_state: bool (False)

Whether to return the state along with the result as a tuple (state, result). State may be needed downstream for a number of reasons like checkpointing.

start: () or resulting Python object type from the operation

Accepts a valid start state.

Returns
Rolling object

See also

DataFrame.window

more generic window operations
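
The backlog idea behind row-based rolling can be sketched in plain Python: keep the last window - 1 rows from earlier batches, prepend them to each new batch, and compute one output per new row. This is an illustrative sketch with a hypothetical helper (rolling_sum) over lists, assuming min_periods=1; it is not the streamz implementation.

```python
from collections import deque


def rolling_sum(batches, window):
    """Yield, for each batch, the rolling sum ending at each of its rows."""
    backlog = deque(maxlen=window - 1)  # rows kept from earlier batches
    for batch in batches:
        rows = list(backlog) + list(batch)  # copy backlog plus new rows
        start = len(backlog)
        out = [
            sum(rows[max(0, i - window + 1): i + 1])
            for i in range(start, len(rows))
        ]
        backlog.extend(batch)
        yield out


result = list(rolling_sum([[1, 2, 3], [4, 5], [6]], window=2))
print(result)  # [[1, 3, 5], [7, 9], [11]]
```

The copy of backlog plus new rows on every batch is what makes a window much larger than the typical batch expensive.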

round(decimals=0)

Round elements in frame

set_index(index, **kwargs)

Set Index

property size

size of frame

sum(start=None)

Sum frame.

Parameters
start: None or resulting Python object type from the operation

Accepts a valid start state.

tail(n=5)

Last n rows of frame

to_frame()

Convert to a streaming dataframe

verify(x)

Verify consistency of elements that pass through this stream

window(n=None, value=None, with_state=False, start=None)

Sliding window operations

Windowed operations are defined over a sliding window of data, either with a fixed number of elements:

>>> df.window(n=10).sum()  # sum of the last ten elements

or over an index value range (index must be monotonic):

>>> df.window(value='2h').mean()  # average over the last two hours

Windowed dataframes support all normal arithmetic, aggregations, and groupby-aggregations.

Parameters
n: int

Window of number of elements over which to roll

value: str

Window of time over which to roll

with_state: bool (False)

Whether to return the state along with the result as a tuple (state, result). State may be needed downstream for a number of reasons like checkpointing.

start: None or resulting Python object type from the operation

Accepts a valid start state.

See also

DataFrame.rolling

mimics Pandas rolling aggregations

Examples

>>> df.window(n=10).std()
>>> df.window(value='2h').count()
>>> w = df.window(n=100)
>>> w.groupby(w.name).amount.sum()
>>> w.groupby(w.x % 10).y.var()
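
A fixed-size window and the with_state option can be pictured with a bounded deque. The sketch below is plain Python over lists; window_sum is a hypothetical helper, and emitting one result per incoming batch is an assumption of the sketch.

```python
from collections import deque


def window_sum(batches, n, with_state=False):
    """After each batch, emit the sum over the last n elements seen."""
    window = deque(maxlen=n)  # state: the most recent n elements
    for batch in batches:
        window.extend(batch)
        result = sum(window)
        # Optionally hand the state downstream, e.g. for checkpointing.
        yield (list(window), result) if with_state else result


batches = [[1, 2, 3], [4, 5], [6]]
sums = list(window_sum(batches, n=4))
stateful = list(window_sum(batches, n=4, with_state=True))
print(sums)          # [6, 14, 18]
print(stateful[-1])  # ([3, 4, 5, 6], 18)
```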
class streamz.dataframe.Rolling(sdf, window, min_periods, with_state, start)

Rolling aggregations

This intermediate class enables rolling aggregations across either a fixed number of rows or a time window.

Examples

>>> sdf.rolling(10).x.mean()  
>>> sdf.rolling('100ms').x.mean()  

Methods

aggregate(*args, **kwargs)

Rolling aggregation

count(*args, **kwargs)

Rolling count

max()

Rolling maximum

mean()

Rolling mean

median()

Rolling median

min()

Rolling minimum

quantile(*args, **kwargs)

Rolling quantile

std(*args, **kwargs)

Rolling standard deviation

sum()

Rolling sum

var(*args, **kwargs)

Rolling variance

aggregate(*args, **kwargs)

Rolling aggregation

count(*args, **kwargs)

Rolling count

max()

Rolling maximum

mean()

Rolling mean

median()

Rolling median

min()

Rolling minimum

quantile(*args, **kwargs)

Rolling quantile

std(*args, **kwargs)

Rolling standard deviation

sum()

Rolling sum

var(*args, **kwargs)

Rolling variance

class streamz.dataframe.Window(sdf, n=None, value=None, with_state=False, start=None)

Windowed aggregations

This provides a set of aggregations that can be applied over a sliding window of data.

See also

DataFrame.window

contains full docstring

Attributes
columns
dtypes
example
index
size

Number of elements within window

Methods

apply(func)

Apply an arbitrary function over each window of data

count()

Count elements within window

groupby(other)

Groupby-aggregations within window

mean()

Average elements within window

std([ddof])

Compute standard deviation of elements within window

sum()

Sum elements within window

value_counts()

Count groups of elements within window

var([ddof])

Compute variance of elements within window

aggregate

full

map_partitions

reset_index

apply(func)

Apply an arbitrary function over each window of data

count()

Count elements within window

groupby(other)

Groupby-aggregations within window

mean()

Average elements within window

property size

Number of elements within window

std(ddof=1)

Compute standard deviation of elements within window

sum()

Sum elements within window

value_counts()

Count groups of elements within window

var(ddof=1)

Compute variance of elements within window
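
The ddof argument follows the usual "delta degrees of freedom" convention, where the divisor is the number of elements minus ddof. A small plain-Python sketch of that convention, using a hypothetical helper and made-up window contents:

```python
import math


def variance(values, ddof=1):
    """Variance with divisor len(values) - ddof."""
    n = len(values)
    mean = sum(values) / n
    return sum((v - mean) ** 2 for v in values) / (n - ddof)


window = [2.0, 4.0, 4.0, 6.0]
sample_var = variance(window)              # ddof=1, divisor n - 1
population_var = variance(window, ddof=0)  # divisor n
sample_std = math.sqrt(sample_var)
print(sample_var, population_var, sample_std)
```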

class streamz.dataframe.GroupBy(root, grouper, index=None)

Groupby aggregations on streaming dataframes

Methods

count([start])

Groupby-count

mean([with_state, start])

Groupby-mean

size()

Groupby-size

std([ddof])

Groupby-std

sum([start])

Groupby-sum

var([ddof])

Groupby-variance

count(start=None)

Groupby-count

Parameters
start: None or resulting Python object type from the operation

Accepts a valid start state.

mean(with_state=False, start=None)

Groupby-mean

Parameters
start: None or resulting Python object type from the operation

Accepts a valid start state.

size()

Groupby-size

std(ddof=1)

Groupby-std

sum(start=None)

Groupby-sum

Parameters
start: None or resulting Python object type from the operation

Accepts a valid start state.

var(ddof=1)

Groupby-variance
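
A streaming groupby aggregation keeps per-group state and updates it as each batch arrives. The sketch below shows this for a groupby-sum in plain Python; streaming_groupby_sum, the dictionaries-as-rows layout, and the sample data are assumptions made for illustration.

```python
from collections import defaultdict


def streaming_groupby_sum(batches, key, value):
    """After each batch, yield the per-group totals seen so far."""
    totals = defaultdict(int)  # state carried across batches
    for batch in batches:
        for row in batch:
            totals[row[key]] += row[value]
        yield dict(totals)


batches = [
    [{"name": "Alice", "amount": 100}, {"name": "Bob", "amount": 200}],
    [{"name": "Alice", "amount": 50}],
]
result = list(streaming_groupby_sum(batches, "name", "amount"))
print(result[-1])  # {'Alice': 150, 'Bob': 200}
```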

class streamz.dataframe.Random(freq='100ms', interval='500ms', dask=False, start=True, datafn=<function random_datablock>)

PeriodicDataFrame providing random values by default

Accepts the same parameters as PeriodicDataFrame, plus freq, a string that is converted to a pd.Timedelta and passed to datafn.

Useful mainly for examples and docs.
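
The polling pattern behind a periodic source can be sketched with asyncio alone: call a data function at a fixed interval and push each result downstream. Everything here is hypothetical (poll, the datafn signature, the sink); the real callback signature is not documented in this section, so treat this only as a picture of the control flow.

```python
import asyncio


async def poll(datafn, interval, n_polls, sink):
    """Call datafn every `interval` seconds and pass each result to sink."""
    for i in range(n_polls):
        sink(datafn(i))
        await asyncio.sleep(interval)


def datafn(i):
    # Hypothetical callback: returns one small batch of rows per poll.
    return [{"poll": i, "value": i * i}]


collected = []
asyncio.run(poll(datafn, interval=0.01, n_polls=3, sink=collected.append))
print(collected)
```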

Attributes
columns
current_value
dtypes
index
plot
size

size of frame

Methods

accumulate_partitions(func, *args, **kwargs)

Accumulate a function with state across batch elements

assign(**kwargs)

Assign new columns to this dataframe

count([start])

Count of frame

cummax()

Cumulative maximum

cummin()

Cumulative minimum

cumprod()

Cumulative product

cumsum()

Cumulative sum

from_periodic

groupby(other)

Groupby aggregations

map_partitions(func, *args, **kwargs)

Map a function across all batch elements of this stream

mean([start])

Average frame

random

register_api([modifier, attribute_name])

Add callable to Stream API

reset_index()

Reset Index

rolling(window[, min_periods, with_state, start])

Compute rolling aggregations

round([decimals])

Round elements in frame

set_index(index, **kwargs)

Set Index

sum([start])

Sum frame.

tail([n])

Last n rows of frame

to_frame()

Convert to a streaming dataframe

verify(x)

Verify consistency of elements that pass through this stream

window([n, value, with_state, start])

Sliding window operations

aggregate

astype

emit

ewm

expanding

map

query

register_plugin_entry_point

start

stop