Collections API¶
Collections¶
Streaming([stream, example, stream_type]) | Superclass for streaming collections
Streaming.map_partitions(func, *args, **kwargs) | Map a function across all batch elements of this stream
Streaming.accumulate_partitions(func, *args, …) | Accumulate a function with state across batch elements
Streaming.verify(x) | Verify elements that pass through this stream
Batch¶
Batch([stream, example]) | A Stream of tuples or lists
Batch.filter(predicate) | Filter elements by a predicate
Batch.map(func, **kwargs) | Map a function across all elements
Batch.pluck(ind) | Pick a field out of all elements
Batch.to_dataframe() | Convert to a streaming dataframe
Batch.to_stream() | Concatenate batches and return base Stream
Dataframes¶
DataFrame(*args, **kwargs) | A Streaming Dataframe
DataFrame.groupby(other) | Groupby aggregations
DataFrame.rolling(window[, min_periods, …]) | Compute rolling aggregations
DataFrame.assign(**kwargs) | Assign new columns to this dataframe
DataFrame.sum([start]) | Sum frame
DataFrame.mean([start]) | Average frame
DataFrame.cumsum() | Cumulative sum
DataFrame.cumprod() | Cumulative product
DataFrame.cummin() | Cumulative minimum
DataFrame.cummax() | Cumulative maximum
GroupBy(root, grouper[, index]) | Groupby aggregations on streaming dataframes
GroupBy.count([start]) | Groupby-count
GroupBy.mean([with_state, start]) | Groupby-mean
GroupBy.size() | Groupby-size
GroupBy.std([ddof]) | Groupby-std
GroupBy.sum([start]) | Groupby-sum
GroupBy.var([ddof]) | Groupby-variance
Rolling(sdf, window, min_periods, …) | Rolling aggregations
Rolling.aggregate(*args, **kwargs) | Rolling aggregation
Rolling.count(*args, **kwargs) | Rolling count
Rolling.max() | Rolling maximum
Rolling.mean() | Rolling mean
Rolling.median() | Rolling median
Rolling.min() | Rolling minimum
Rolling.quantile(*args, **kwargs) | Rolling quantile
Rolling.std(*args, **kwargs) | Rolling standard deviation
Rolling.sum() | Rolling sum
Rolling.var(*args, **kwargs) | Rolling variance
DataFrame.window([n, value, with_state, start]) | Sliding window operations
Window.apply(func) | Apply an arbitrary function over each window of data
Window.count() | Count elements within window
Window.groupby(other) | Groupby-aggregations within window
Window.sum() | Sum elements within window
Window.size | Number of elements within window
Window.std([ddof]) | Compute standard deviation of elements within window
Window.var([ddof]) | Compute variance of elements within window
PeriodicDataFrame([datafn, interval, dask, …]) | A streaming dataframe using the asyncio ioloop to poll a callback fn
Random([freq, interval, dask, start, datafn]) | PeriodicDataFrame providing random values by default
Details¶
-
class streamz.collection.Streaming(stream=None, example=None, stream_type=None)¶ Superclass for streaming collections
Do not create this class directly, use one of the subclasses instead.
Parameters: - stream: streamz.Stream
- example: object
An object to represent an example element of this stream
See also
streamz.dataframe.StreamingDataFrame
streamz.dataframe.StreamingBatch
Attributes: - current_value
Methods
accumulate_partitions(func, *args, **kwargs) | Accumulate a function with state across batch elements
map_partitions(func, *args, **kwargs) | Map a function across all batch elements of this stream
register_api([modifier, attribute_name]) | Add callable to Stream API
verify(x) | Verify elements that pass through this stream
emit, register_plugin_entry_point, start, stop
-
accumulate_partitions
(func, *args, **kwargs)¶ Accumulate a function with state across batch elements
See also
Streaming.map_partitions
-
static
map_partitions
(func, *args, **kwargs)¶ Map a function across all batch elements of this stream
The output stream type will be determined by the action of that function on the example
See also
Streaming.accumulate_partitions
-
verify
(x)¶ Verify elements that pass through this stream
-
class streamz.batch.Batch(stream=None, example=None)¶ A Stream of tuples or lists
This streaming collection manages batches of Python objects such as lists of text or dictionaries. By batching many elements together we reduce overhead from Python.
This library is typically used at the early stages of data ingestion, before handing off to streaming dataframes.
Examples
>>> text = Streaming.from_file(myfile)  # doctest: +SKIP
>>> b = text.partition(100).map(json.loads)  # doctest: +SKIP
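The partition-and-parse pattern above can be illustrated without a live stream: each batch is simply a Python list, `map` applies a function to every element of the batch, and `pluck` picks one field out of each element. A minimal pure-Python sketch of those per-batch semantics (not the streamz API itself):

```python
import json

# One "batch" as it would flow through a Batch stream: a list of raw records
raw_batch = ['{"name": "alice", "amount": 100}',
             '{"name": "bob", "amount": 200}']

# Batch.map(json.loads) applies the function to every element of the batch
parsed = [json.loads(line) for line in raw_batch]

# Batch.pluck("amount") picks one field out of every element
amounts = [record["amount"] for record in parsed]

print(amounts)  # [100, 200]
```

In the real library the same operations are chained onto the stream and run once per arriving batch.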
Attributes: - current_value
Methods
accumulate_partitions(func, *args, **kwargs) | Accumulate a function with state across batch elements
filter(predicate) | Filter elements by a predicate
map(func, **kwargs) | Map a function across all elements
map_partitions(func, *args, **kwargs) | Map a function across all batch elements of this stream
pluck(ind) | Pick a field out of all elements
register_api([modifier, attribute_name]) | Add callable to Stream API
sum() | Sum elements
to_dataframe() | Convert to a streaming dataframe
to_stream() | Concatenate batches and return base Stream
verify(x) | Verify elements that pass through this stream
emit, register_plugin_entry_point, start, stop
-
accumulate_partitions
(func, *args, **kwargs)¶ Accumulate a function with state across batch elements
See also
Streaming.map_partitions
-
filter
(predicate)¶ Filter elements by a predicate
-
map
(func, **kwargs)¶ Map a function across all elements
-
static
map_partitions
(func, *args, **kwargs)¶ Map a function across all batch elements of this stream
The output stream type will be determined by the action of that function on the example
See also
Streaming.accumulate_partitions
-
pluck
(ind)¶ Pick a field out of all elements
-
classmethod
register_api
(modifier=<function identity>, attribute_name=None)¶ Add callable to Stream API
This allows you to register a new method onto this class. You can use it as a decorator:
>>> @Stream.register_api()
... class foo(Stream):
...     ...
>>> Stream().foo(...)  # this works now
It attaches the callable as a normal attribute to the class object. In doing so it respects inheritance (all subclasses of Stream will also get the foo attribute).
By default callables are assumed to be instance methods. If you like you can include modifiers to apply before attaching to the class, as in the following case where we construct a staticmethod:
>>> @Stream.register_api(staticmethod)
... class foo(Stream):
...     ...
>>> Stream.foo(...)  # foo operates as a static method
You can also provide an optional attribute_name argument to control the name of the attribute your callable will be attached as:
>>> @Stream.register_api(attribute_name="bar")
... class foo(Stream):
...     ...
>>> Stream().bar(...)  # foo was actually attached as bar
-
sum
()¶ Sum elements
-
to_dataframe
()¶ Convert to a streaming dataframe
This calls pd.DataFrame on all list-elements of this stream
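As a sketch of what this conversion does per batch (assuming pandas is installed): each list-element of the stream, such as a list of dicts, is passed to pd.DataFrame:

```python
import pandas as pd

# A single batch element: a list of records, as produced by e.g.
# text.partition(100).map(json.loads)
batch = [{"name": "alice", "amount": 100},
         {"name": "bob", "amount": 200}]

# to_dataframe() effectively calls pd.DataFrame on each such list
df = pd.DataFrame(batch)
print(df.columns.tolist())  # ['name', 'amount']
```

The stream's elements must therefore be something pd.DataFrame accepts, and each batch should produce the same columns.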
-
to_stream
()¶ Concatenate batches and return base Stream
Returned stream will be composed of single elements
-
verify
(x)¶ Verify elements that pass through this stream
-
class streamz.dataframe.DataFrame(*args, **kwargs)¶ A Streaming Dataframe
This is a logical collection over a stream of Pandas dataframes. Operations on this object will translate to the appropriate operations on the underlying Pandas dataframes.
See also
Series
Attributes: - columns
- current_value
- dtypes
- index
- plot
- size: size of frame
Methods
accumulate_partitions(func, *args, **kwargs) | Accumulate a function with state across batch elements
assign(**kwargs) | Assign new columns to this dataframe
count([start]) | Count of frame
cummax() | Cumulative maximum
cummin() | Cumulative minimum
cumprod() | Cumulative product
cumsum() | Cumulative sum
from_periodic([datafn, interval, dask, start]) | A streaming dataframe using the asyncio ioloop to poll a callback fn
groupby(other) | Groupby aggregations
map_partitions(func, *args, **kwargs) | Map a function across all batch elements of this stream
mean([start]) | Average frame
random([freq, interval, dask, start, datafn]) | PeriodicDataFrame providing random values by default
register_api([modifier, attribute_name]) | Add callable to Stream API
reset_index() | Reset Index
rolling(window[, min_periods, with_state, start]) | Compute rolling aggregations
round([decimals]) | Round elements in frame
set_index(index, **kwargs) | Set Index
sum([start]) | Sum frame
tail([n]) | Return the last n rows of the frame
to_frame() | Convert to a streaming dataframe
verify(x) | Verify consistency of elements that pass through this stream
window([n, value, with_state, start]) | Sliding window operations
aggregate, astype, emit, map, query, register_plugin_entry_point, start, stop
-
accumulate_partitions
(func, *args, **kwargs)¶ Accumulate a function with state across batch elements
See also
Streaming.map_partitions
-
assign
(**kwargs)¶ Assign new columns to this dataframe
Alternatively use setitem syntax
Examples
>>> sdf = sdf.assign(z=sdf.x + sdf.y)  # doctest: +SKIP
>>> sdf['z'] = sdf.x + sdf.y  # doctest: +SKIP
-
count
(start=None)¶ Count of frame
Parameters: - start: None or resulting Python object type from the operation
Accepts a valid start state.
-
cummax
()¶ Cumulative maximum
-
cummin
()¶ Cumulative minimum
-
cumprod
()¶ Cumulative product
-
cumsum
()¶ Cumulative sum
-
static
from_periodic
(datafn=<function random_datablock>, interval='500ms', dask=False, start=True, **kwargs)¶ A streaming dataframe using the asyncio ioloop to poll a callback fn
Parameters: - datafn: callable
Callback function accepting **kwargs and returning a pd.DataFrame. kwargs will include at least ‘last’ (pd.Timestamp.now() when datafn was last invoked), and ‘now’ (current pd.Timestamp.now()).
- interval: timedelta
The time interval between new dataframes.
- dask: boolean
If true, uses a DaskStream instead of a regular Source.
- **kwargs:
Optional keyword arguments to be passed into the callback function.
By default, returns a three-column random pd.DataFrame generated by the ‘random_datablock’ function.
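The callback contract can be demonstrated by calling a conforming `datafn` directly, outside the ioloop. The function below is a hypothetical example callback, not part of streamz; it only shows the `last`/`now` keyword interface and the pd.DataFrame return type:

```python
import pandas as pd

def datafn(last, now, **kwargs):
    # Build one row per invocation, indexed by the current timestamp.
    # 'last' could instead be used to generate rows covering (last, now].
    return pd.DataFrame({"elapsed_s": [(now - last).total_seconds()]},
                        index=[now])

# PeriodicDataFrame would call this roughly every `interval`; here we
# simulate a single 500ms tick by hand.
now = pd.Timestamp.now()
df = datafn(last=now - pd.Timedelta("500ms"), now=now)
print(df["elapsed_s"].iloc[0])  # 0.5
```

Each returned frame is emitted onto the streaming dataframe, so successive calls should produce frames with identical columns and dtypes.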
-
groupby
(other)¶ Groupby aggregations
-
static
map_partitions
(func, *args, **kwargs)¶ Map a function across all batch elements of this stream
The output stream type will be determined by the action of that function on the example
See also
Streaming.accumulate_partitions
-
mean
(start=None)¶ Average frame
Parameters: - start: None or resulting Python object type from the operation
Accepts a valid start state.
-
static
random
(freq='100ms', interval='500ms', dask=False, start=True, datafn=<function random_datablock>)¶ PeriodicDataFrame providing random values by default
Accepts same parameters as PeriodicDataFrame, plus freq, a string that will be converted to a pd.Timedelta and passed to the ‘datafn’.
Useful mainly for examples and docs.
-
classmethod
register_api
(modifier=<function identity>, attribute_name=None)¶ Add callable to Stream API
This allows you to register a new method onto this class. You can use it as a decorator:
>>> @Stream.register_api()
... class foo(Stream):
...     ...
>>> Stream().foo(...)  # this works now
It attaches the callable as a normal attribute to the class object. In doing so it respects inheritance (all subclasses of Stream will also get the foo attribute).
By default callables are assumed to be instance methods. If you like you can include modifiers to apply before attaching to the class, as in the following case where we construct a staticmethod:
>>> @Stream.register_api(staticmethod)
... class foo(Stream):
...     ...
>>> Stream.foo(...)  # foo operates as a static method
You can also provide an optional attribute_name argument to control the name of the attribute your callable will be attached as:
>>> @Stream.register_api(attribute_name="bar")
... class foo(Stream):
...     ...
>>> Stream().bar(...)  # foo was actually attached as bar
-
reset_index
()¶ Reset Index
-
rolling
(window, min_periods=1, with_state=False, start=())¶ Compute rolling aggregations
When followed by an aggregation method like sum, mean, or std, this produces a new streaming dataframe whose values are aggregated over that window.
The window parameter can be either a number of rows or a timedelta like "2 minutes", in which case the index should be a datetime index.
This operates by keeping enough of a backlog of records to maintain an accurate stream. It performs a copy at every added dataframe. Because of this it may be slow if the rolling window is much larger than the average stream element.
Parameters: - window: int or timedelta
Window over which to roll
- with_state: bool (False)
Whether to return the state along with the result as a tuple (state, result). State may be needed downstream for a number of reasons like checkpointing.
- start: () or resulting Python object type from the operation
Accepts a valid start state.
Returns: - Rolling object
See also
DataFrame.window
- more generic window operations
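The streaming rolling semantics mirror pandas' own rolling aggregations, so both window forms can be previewed on a static frame. A sketch assuming pandas is installed, not the streaming implementation itself:

```python
import pandas as pd

df = pd.DataFrame({"x": [1, 2, 3, 4]},
                  index=pd.date_range("2021-01-01", periods=4, freq="min"))

# Row-count window: sum over the last 2 rows (first value is NaN until
# the window is full, since min_periods defaults to the window size)
by_rows = df["x"].rolling(2).sum()
print(by_rows.tolist())  # [nan, 3.0, 5.0, 7.0]

# Time-based window: sum over the last 2 minutes; needs a datetime index
by_time = df["x"].rolling("2min").sum()
print(by_time.tolist())  # [1.0, 3.0, 5.0, 7.0]
```

In streamz the same aggregation is maintained incrementally as new rows arrive, with a backlog kept so that rows can drop out of the window.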
-
round
(decimals=0)¶ Round elements in frame
-
set_index
(index, **kwargs)¶ Set Index
-
size
¶ size of frame
-
sum
(start=None)¶ Sum frame.
Parameters: - start: None or resulting Python object type from the operation
Accepts a valid start state.
-
tail
(n=5)¶ Return the last n rows of the frame
-
to_frame
()¶ Convert to a streaming dataframe
-
verify
(x)¶ Verify consistency of elements that pass through this stream
-
window
(n=None, value=None, with_state=False, start=None)¶ Sliding window operations
Windowed operations are defined over a sliding window of data, either with a fixed number of elements:
>>> df.window(n=10).sum() # sum of the last ten elements
or over an index value range (index must be monotonic):
>>> df.window(value='2h').mean() # average over the last two hours
Windowed dataframes support all normal arithmetic, aggregations, and groupby-aggregations.
Parameters: - n: int
Window of number of elements over which to roll
- value: str
Window of time over which to roll
- with_state: bool (False)
Whether to return the state along with the result as a tuple (state, result). State may be needed downstream for a number of reasons like checkpointing.
- start: None or resulting Python object type from the operation
Accepts a valid start state.
See also
DataFrame.rolling
- mimics Pandas rolling aggregations
Examples
>>> df.window(n=10).std()
>>> df.window(value='2h').count()
>>> w = df.window(n=100)
>>> w.groupby(w.name).amount.sum()
>>> w.groupby(w.x % 10).y.var()
-
class streamz.dataframe.Rolling(sdf, window, min_periods, with_state, start)¶ Rolling aggregations
This intermediate class enables rolling aggregations across either a fixed number of rows or a time window.
Examples
>>> sdf.rolling(10).x.mean()  # doctest: +SKIP
>>> sdf.rolling('100ms').x.mean()  # doctest: +SKIP
Methods
aggregate(*args, **kwargs) | Rolling aggregation
count(*args, **kwargs) | Rolling count
max() | Rolling maximum
mean() | Rolling mean
median() | Rolling median
min() | Rolling minimum
quantile(*args, **kwargs) | Rolling quantile
std(*args, **kwargs) | Rolling standard deviation
sum() | Rolling sum
var(*args, **kwargs) | Rolling variance
-
aggregate
(*args, **kwargs)¶ Rolling aggregation
-
count
(*args, **kwargs)¶ Rolling count
-
max
()¶ Rolling maximum
-
mean
()¶ Rolling mean
-
median
()¶ Rolling median
-
min
()¶ Rolling minimum
-
quantile
(*args, **kwargs)¶ Rolling quantile
-
std
(*args, **kwargs)¶ Rolling standard deviation
-
sum
()¶ Rolling sum
-
var
(*args, **kwargs)¶ Rolling variance
-
class streamz.dataframe.Window(sdf, n=None, value=None, with_state=False, start=None)¶ Windowed aggregations
This provides a set of aggregations that can be applied over a sliding window of data.
See also
DataFrame.window
- contains full docstring
Attributes: - columns
- dtypes
- example
- index
- size: Number of elements within window
Methods
apply(func) | Apply an arbitrary function over each window of data
count() | Count elements within window
groupby(other) | Groupby-aggregations within window
mean() | Average elements within window
std([ddof]) | Compute standard deviation of elements within window
sum() | Sum elements within window
value_counts() | Count groups of elements within window
var([ddof]) | Compute variance of elements within window
aggregate, full, map_partitions, reset_index
-
apply
(func)¶ Apply an arbitrary function over each window of data
-
count
()¶ Count elements within window
-
groupby
(other)¶ Groupby-aggregations within window
-
mean
()¶ Average elements within window
-
size
¶ Number of elements within window
-
std
(ddof=1)¶ Compute standard deviation of elements within window
-
sum
()¶ Sum elements within window
-
value_counts
()¶ Count groups of elements within window
-
var
(ddof=1)¶ Compute variance of elements within window
-
class streamz.dataframe.GroupBy(root, grouper, index=None)¶ Groupby aggregations on streaming dataframes
Methods
count([start]) | Groupby-count
mean([with_state, start]) | Groupby-mean
size() | Groupby-size
std([ddof]) | Groupby-std
sum([start]) | Groupby-sum
var([ddof]) | Groupby-variance
-
count
(start=None)¶ Groupby-count
Parameters: - start: None or resulting Python object type from the operation
Accepts a valid start state.
-
mean
(with_state=False, start=None)¶ Groupby-mean
Parameters: - start: None or resulting Python object type from the operation
Accepts a valid start state.
-
size
()¶ Groupby-size
-
std
(ddof=1)¶ Groupby-std
-
sum
(start=None)¶ Groupby-sum
Parameters: - start: None or resulting Python object type from the operation
Accepts a valid start state.
-
var
(ddof=1)¶ Groupby-variance
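The stateful accumulation behind a streaming groupby-sum can be sketched in plain Python: the state is a per-group running total that each incoming batch folds into. `groupby_sum_update` below is a hypothetical illustration of the idea, not the streamz internals:

```python
def groupby_sum_update(state, batch):
    """Fold one batch of (key, value) pairs into per-group running sums."""
    for key, value in batch:
        state[key] = state.get(key, 0) + value
    return state

# Two batches arrive over time; the state persists between them
state = {}
state = groupby_sum_update(state, [("a", 1), ("b", 2)])
state = groupby_sum_update(state, [("a", 3)])
print(state)  # {'a': 4, 'b': 2}
```

This persistent per-group state is also why methods like GroupBy.mean accept `with_state`: downstream consumers may need the state for checkpointing.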
-
class streamz.dataframe.Random(freq='100ms', interval='500ms', dask=False, start=True, datafn=<function random_datablock>)¶ PeriodicDataFrame providing random values by default
Accepts same parameters as PeriodicDataFrame, plus freq, a string that will be converted to a pd.Timedelta and passed to the ‘datafn’.
Useful mainly for examples and docs.
Attributes: - columns
- current_value
- dtypes
- index
- plot
- size: size of frame
Methods
accumulate_partitions(func, *args, **kwargs) | Accumulate a function with state across batch elements
assign(**kwargs) | Assign new columns to this dataframe
count([start]) | Count of frame
cummax() | Cumulative maximum
cummin() | Cumulative minimum
cumprod() | Cumulative product
cumsum() | Cumulative sum
from_periodic([datafn, interval, dask, start]) | A streaming dataframe using the asyncio ioloop to poll a callback fn
groupby(other) | Groupby aggregations
map_partitions(func, *args, **kwargs) | Map a function across all batch elements of this stream
mean([start]) | Average frame
random([freq, interval, dask, start, datafn]) | PeriodicDataFrame providing random values by default
register_api([modifier, attribute_name]) | Add callable to Stream API
reset_index() | Reset Index
rolling(window[, min_periods, with_state, start]) | Compute rolling aggregations
round([decimals]) | Round elements in frame
set_index(index, **kwargs) | Set Index
sum([start]) | Sum frame
tail([n]) | Return the last n rows of the frame
to_frame() | Convert to a streaming dataframe
verify(x) | Verify consistency of elements that pass through this stream
window([n, value, with_state, start]) | Sliding window operations
aggregate, astype, emit, map, query, register_plugin_entry_point, start, stop