Collections API

Collections

Streaming([stream, example, stream_type]) Superclass for streaming collections
Streaming.map_partitions(func, *args, **kwargs) Map a function across all batch elements of this stream
Streaming.accumulate_partitions(func, *args, …) Accumulate a function with state across batch elements
Streaming.verify(x) Verify elements that pass through this stream

Batch

Batch([stream, example]) A Stream of tuples or lists
Batch.filter(predicate) Filter elements by a predicate
Batch.map(func, **kwargs) Map a function across all elements
Batch.pluck(ind) Pick a field out of all elements
Batch.to_dataframe() Convert to a streaming dataframe
Batch.to_stream() Concatenate batches and return base Stream

Dataframes

DataFrame(*args, **kwargs) A Streaming dataframe
DataFrame.groupby(other) Groupby aggregations
DataFrame.rolling(window[, min_periods]) Compute rolling aggregations
DataFrame.assign(**kwargs) Assign new columns to this dataframe
DataFrame.sum() Sum frame
DataFrame.mean() Average
DataFrame.cumsum() Cumulative sum
DataFrame.cumprod() Cumulative product
DataFrame.cummin() Cumulative minimum
DataFrame.cummax() Cumulative maximum
GroupBy(root, grouper[, index]) Groupby aggregations on streaming dataframes
GroupBy.count() Groupby-count
GroupBy.mean() Groupby-mean
GroupBy.size() Groupby-size
GroupBy.std([ddof]) Groupby-std
GroupBy.sum() Groupby-sum
GroupBy.var([ddof]) Groupby-variance
Rolling(sdf, window, min_periods) Rolling aggregations
Rolling.aggregate(*args, **kwargs) Rolling aggregation
Rolling.count(*args, **kwargs) Rolling count
Rolling.max() Rolling maximum
Rolling.mean() Rolling mean
Rolling.median() Rolling median
Rolling.min() Rolling minimum
Rolling.quantile(*args, **kwargs) Rolling quantile
Rolling.std(*args, **kwargs) Rolling standard deviation
Rolling.sum() Rolling sum
Rolling.var(*args, **kwargs) Rolling variance
DataFrame.window([n, value]) Sliding window operations
Window.apply(func) Apply an arbitrary function over each window of data
Window.count() Count elements within window
Window.groupby(other) Groupby-aggregations within window
Window.sum() Sum elements within window
Window.size Number of elements within window
Window.std([ddof]) Compute standard deviation of elements within window
Window.var([ddof]) Compute variance of elements within window
Random([freq, interval, dask]) A streaming dataframe of random data

Details

class streamz.collection.Streaming(stream=None, example=None, stream_type=None)

Superclass for streaming collections

Do not create this class directly, use one of the subclasses instead.

Parameters:
stream: streamz.Stream
example: object

An object to represent an example element of this stream

See also

streamz.dataframe.DataFrame, streamz.batch.Batch

Methods

accumulate_partitions(func, *args, **kwargs) Accumulate a function with state across batch elements
map_partitions(func, *args, **kwargs) Map a function across all batch elements of this stream
verify(x) Verify elements that pass through this stream
emit  
accumulate_partitions(func, *args, **kwargs)

Accumulate a function with state across batch elements
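
For example (a minimal sketch; it assumes b is a Batch of numeric lists and that accumulate_partitions takes a start= keyword, mirroring Stream.accumulate):

>>> running_total = b.accumulate_partitions(lambda acc, part: acc + sum(part), start=0)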

static map_partitions(func, *args, **kwargs)

Map a function across all batch elements of this stream

The output stream type will be determined by the action of that function on the example

verify(x)

Verify elements that pass through this stream

class streamz.batch.Batch(stream=None, example=None)

A Stream of tuples or lists

This streaming collection manages batches of Python objects such as lists of text or dictionaries. By batching many elements together we reduce overhead from Python.

This collection is typically used in the early stages of data ingestion, before handing off to streaming dataframes.

Examples

>>> text = Streaming.from_file(myfile)  
>>> b = text.partition(100).map(json.loads)  

Methods

accumulate_partitions(func, *args, **kwargs) Accumulate a function with state across batch elements
filter(predicate) Filter elements by a predicate
map(func, **kwargs) Map a function across all elements
map_partitions(func, *args, **kwargs) Map a function across all batch elements of this stream
pluck(ind) Pick a field out of all elements
sum() Sum elements
to_dataframe() Convert to a streaming dataframe
to_stream() Concatenate batches and return base Stream
verify(x) Verify elements that pass through this stream
emit  
accumulate_partitions(func, *args, **kwargs)

Accumulate a function with state across batch elements

See also

Streaming.map_partitions

filter(predicate)

Filter elements by a predicate

map(func, **kwargs)

Map a function across all elements

static map_partitions(func, *args, **kwargs)

Map a function across all batch elements of this stream

The output stream type will be determined by the action of that function on the example

See also

Streaming.accumulate_partitions

pluck(ind)

Pick a field out of all elements

sum()

Sum elements

to_dataframe()

Convert to a streaming dataframe

This calls pd.DataFrame on all list-elements of this stream
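
Continuing the example above (a sketch; it assumes the parsed JSON records are flat dictionaries suitable for pd.DataFrame):

>>> sdf = b.to_dataframe()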

to_stream()

Concatenate batches and return base Stream

Returned stream will be composed of single elements

verify(x)

Verify elements that pass through this stream

class streamz.dataframe.DataFrame(*args, **kwargs)

A Streaming dataframe

This is a logical collection over a stream of Pandas dataframes. Operations on this object will translate to the appropriate operations on the underlying Pandas dataframes.
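
A minimal construction sketch (the column names and emitted values here are illustrative):

>>> import pandas as pd
>>> from streamz import Stream
>>> from streamz.dataframe import DataFrame
>>> source = Stream()
>>> example = pd.DataFrame({'x': [], 'y': []})
>>> sdf = DataFrame(stream=source, example=example)
>>> totals = sdf.x.sum().stream.sink_to_list()
>>> source.emit(pd.DataFrame({'x': [1, 2], 'y': [3, 4]}))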

See also

Series

Attributes:
columns
dtypes
index
size

size of frame

Methods

accumulate_partitions(func, *args, **kwargs) Accumulate a function with state across batch elements
assign(**kwargs) Assign new columns to this dataframe
count() Count of frame
cummax() Cumulative maximum
cummin() Cumulative minimum
cumprod() Cumulative product
cumsum() Cumulative sum
groupby(other) Groupby aggregations
map_partitions(func, *args, **kwargs) Map a function across all batch elements of this stream
mean() Average
reset_index() Reset Index
rolling(window[, min_periods]) Compute rolling aggregations
round([decimals]) Round elements in frame
set_index(index, **kwargs) Set Index
sum() Sum frame
tail([n]) Last n rows of the frame
to_frame() Convert to a streaming dataframe
verify(x) Verify consistency of elements that pass through this stream
window([n, value]) Sliding window operations
aggregate  
astype  
emit  
map  
query  
accumulate_partitions(func, *args, **kwargs)

Accumulate a function with state across batch elements

See also

Streaming.map_partitions

assign(**kwargs)

Assign new columns to this dataframe

Alternatively use setitem syntax

Examples

>>> sdf = sdf.assign(z=sdf.x + sdf.y)  
>>> sdf['z'] = sdf.x + sdf.y  

count()

Count of frame

cummax()

Cumulative maximum

cummin()

Cumulative minimum

cumprod()

Cumulative product

cumsum()

Cumulative sum

groupby(other)

Groupby aggregations

static map_partitions(func, *args, **kwargs)

Map a function across all batch elements of this stream

The output stream type will be determined by the action of that function on the example

See also

Streaming.accumulate_partitions

mean()

Average

reset_index()

Reset Index

rolling(window, min_periods=1)

Compute rolling aggregations

When followed by an aggregation method like sum, mean, or std, this produces a new Streaming dataframe whose values are aggregated over that window.

The window parameter can be either a number of rows or a timedelta like "2 minutes", in which case the index should be a datetime index.

This operates by keeping a backlog of just enough records to maintain an accurate stream. It performs a copy on every added dataframe, so it may be slow if the rolling window is much larger than the average stream element.

Parameters:
window: int or timedelta

Window over which to roll

Returns:
Rolling object

See also

DataFrame.window
more generic window operations
round(decimals=0)

Round elements in frame

set_index(index, **kwargs)

Set Index

size

size of frame

sum()

Sum frame

tail(n=5)

Last n rows of the frame

to_frame()

Convert to a streaming dataframe

verify(x)

Verify consistency of elements that pass through this stream

window(n=None, value=None)

Sliding window operations

Windowed operations are defined over a sliding window of data, either with a fixed number of elements:

>>> df.window(n=10).sum()  # sum of the last ten elements

or over an index value range (index must be monotonic):

>>> df.window(value='2h').mean()  # average over the last two hours

Windowed dataframes support all normal arithmetic, aggregations, and groupby-aggregations.

See also

DataFrame.rolling
mimics Pandas rolling aggregations

Examples

>>> df.window(n=10).std()
>>> df.window(value='2h').count()
>>> w = df.window(n=100)
>>> w.groupby(w.name).amount.sum()
>>> w.groupby(w.x % 10).y.var()

class streamz.dataframe.Rolling(sdf, window, min_periods)

Rolling aggregations

This intermediate class enables rolling aggregations across either a fixed number of rows or a time window.

Examples

>>> sdf.rolling(10).x.mean()  
>>> sdf.rolling('100ms').x.mean()  

Methods

aggregate(*args, **kwargs) Rolling aggregation
count(*args, **kwargs) Rolling count
max() Rolling maximum
mean() Rolling mean
median() Rolling median
min() Rolling minimum
quantile(*args, **kwargs) Rolling quantile
std(*args, **kwargs) Rolling standard deviation
sum() Rolling sum
var(*args, **kwargs) Rolling variance
aggregate(*args, **kwargs)

Rolling aggregation

count(*args, **kwargs)

Rolling count

max()

Rolling maximum

mean()

Rolling mean

median()

Rolling median

min()

Rolling minimum

quantile(*args, **kwargs)

Rolling quantile

std(*args, **kwargs)

Rolling standard deviation

sum()

Rolling sum

var(*args, **kwargs)

Rolling variance

class streamz.dataframe.Window(sdf, n=None, value=None)

Windowed aggregations

This provides a set of aggregations that can be applied over a sliding window of data.

See also

DataFrame.window
contains full docstring
Attributes:
columns
dtypes
example
index
size

Number of elements within window

Methods

apply(func) Apply an arbitrary function over each window of data
count() Count elements within window
groupby(other) Groupby-aggregations within window
mean() Average elements within window
std([ddof]) Compute standard deviation of elements within window
sum() Sum elements within window
value_counts() Count groups of elements within window
var([ddof]) Compute variance of elements within window
aggregate  
full  
map_partitions  
reset_index  
apply(func)

Apply an arbitrary function over each window of data
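
For example (a sketch; it assumes each window is passed to func as a single Pandas dataframe):

>>> df.window(n=20).apply(lambda window_df: window_df.describe())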

count()

Count elements within window

groupby(other)

Groupby-aggregations within window

mean()

Average elements within window

size

Number of elements within window

std(ddof=1)

Compute standard deviation of elements within window

sum()

Sum elements within window

value_counts()

Count groups of elements within window

var(ddof=1)

Compute variance of elements within window

class streamz.dataframe.GroupBy(root, grouper, index=None)

Groupby aggregations on streaming dataframes
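
For example (the column names here are illustrative):

>>> sdf.groupby(sdf.name).amount.mean()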

Methods

count() Groupby-count
mean() Groupby-mean
size() Groupby-size
std([ddof]) Groupby-std
sum() Groupby-sum
var([ddof]) Groupby-variance
count()

Groupby-count

mean()

Groupby-mean

size()

Groupby-size

std(ddof=1)

Groupby-std

sum()

Groupby-sum

var(ddof=1)

Groupby-variance

class streamz.dataframe.Random(freq='100ms', interval='500ms', dask=False)

A streaming dataframe of random data

The x column is uniformly distributed. The y column is Poisson distributed. The z column is normally distributed.

This class is experimental and will likely be removed in the future

Parameters:
freq: timedelta

The time interval between records

interval: timedelta

The time interval between new dataframes; this should be significantly larger than freq
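
A minimal usage sketch:

>>> from streamz.dataframe import Random
>>> source = Random(freq='100ms', interval='500ms')
>>> sdf = (source.x - 0.5).cumsum()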

Attributes:
columns
dtypes
index
size

size of frame

Methods

accumulate_partitions(func, *args, **kwargs) Accumulate a function with state across batch elements
assign(**kwargs) Assign new columns to this dataframe
count() Count of frame
cummax() Cumulative maximum
cummin() Cumulative minimum
cumprod() Cumulative product
cumsum() Cumulative sum
groupby(other) Groupby aggregations
map_partitions(func, *args, **kwargs) Map a function across all batch elements of this stream
mean() Average
reset_index() Reset Index
rolling(window[, min_periods]) Compute rolling aggregations
round([decimals]) Round elements in frame
set_index(index, **kwargs) Set Index
sum() Sum frame
tail([n]) Last n rows of the frame
to_frame() Convert to a streaming dataframe
verify(x) Verify consistency of elements that pass through this stream
window([n, value]) Sliding window operations
aggregate  
astype  
emit  
map  
query  
stop