Plugins¶
In addition to using @Stream.register_api()
decorator, custom stream nodes can
be added to Streamz by installing 3rd-party Python packages.
Known plugins¶
Extras¶
These plugins are supported by the Streamz community and can be installed as extras,
e.g. pip install streamz[kafka]
.
There are no plugins here yet, but hopefully soon there will be.
Entry points¶
Plugins register themselves with Streamz by using entry_points
argument
in setup.py
:
# setup.py
from setuptools import setup
setup(
name="streamz_example_plugin",
version="0.0.1",
entry_points={
"streamz.nodes": [
"repeat = streamz_example_plugin:RepeatNode"
]
}
)
In this example, RepeatNode
class will be imported from
streamz_example_plugin
package and will be available as Stream.repeat
.
In practice, class name and entry point name (the part before =
in entry point
definition) are usually the same, but they can be different.
Different kinds of add-ons go into different entry point groups:
Node type |
Required parent class |
Entry point group |
---|---|---|
Source |
|
|
Node |
|
|
Sink |
|
|
Lazy loading¶
Streamz will attach methods from existing plugins to the Stream
class when it’s
imported, but actual classes will be loaded only when the corresponding Stream
method is first called. Streamz will also validate the loaded class before attaching it
and will raise an appropriate exception if validation fails.
Reference implementation¶
Let’s look at how stream nodes can be implemented.
# __init__.py
from tornado import gen
from streamz import Stream
class RepeatNode(Stream):
def __init__(self, upstream, n, **kwargs):
super().__init__(upstream, ensure_io_loop=True, **kwargs)
self._n = n
@gen.coroutine
def update(self, x, who=None, metadata=None):
for _ in range(self._n):
yield self._emit(x, metadata=metadata)
As you can see, implementation is the same as usual, but there’s no
@Stream.register_api()
— Streamz will take care of that when loading the plugin.
It will still work if you add the decorator, but you don’t have to.