Plugins

In addition to using @Stream.register_api() decorator, custom stream nodes can be added to Streamz by installing 3rd-party Python packages.

Known plugins

Extras

These plugins are supported by the Streamz community and can be installed as extras, e.g. pip install streamz[kafka].

There are no plugins here yet, but hopefully soon there will be.

Entry points

Plugins register themselves with Streamz by using entry_points argument in setup.py:

# setup.py

from setuptools import setup

setup(
    name="streamz_example_plugin",
    version="0.0.1",
    entry_points={
        "streamz.nodes": [
            "repeat = streamz_example_plugin:RepeatNode"
        ]
    }
)

In this example, RepeatNode class will be imported from streamz_example_plugin package and will be available as Stream.repeat. In practice, class name and entry point name (the part before = in entry point definition) are usually the same, but they can be different.

Different kinds of add-ons go into different entry point groups:

Node type

Required parent class

Entry point group

Source

streamz.Source

streamz.sources

Node

streamz.Stream

streamz.nodes

Sink

streamz.Sink

streamz.sinks

Lazy loading

Streamz will attach methods from existing plugins to the Stream class when it’s imported, but actual classes will be loaded only when the corresponding Stream method is first called. Streamz will also validate the loaded class before attaching it and will raise an appropriate exception if validation fails.

Reference implementation

Let’s look at how stream nodes can be implemented.

# __init__.py

from tornado import gen
from streamz import Stream


class RepeatNode(Stream):

    def __init__(self, upstream, n, **kwargs):
        super().__init__(upstream, ensure_io_loop=True, **kwargs)
        self._n = n

    @gen.coroutine
    def update(self, x, who=None, metadata=None):
        for _ in range(self._n):
            yield self._emit(x, metadata=metadata)

As you can see, implementation is the same as usual, but there’s no @Stream.register_api() — Streamz will take care of that when loading the plugin. It will still work if you add the decorator, but you don’t have to.