Components and Concepts

Session and Config

Zenoh supports two paradigms of communication: Publish/Subscribe and Query/Reply. The entities that perform communication (for example, publishers, subscribers, queriers, and queryables) are declared through a zenoh.Session. A session is created by the zenoh.open() function, which takes a zenoh.Config as an argument.

The configuration is stored in a JSON file and can be read with zenoh.Config.from_file(). The file format is documented in the Zenoh Rust API Config reference.

Important

The recommended way to create a session is with a context manager (with statement). If a session is neither explicitly closed nor managed by a context manager, object finalizers may run at interpreter exit after the library's threads have already been killed, which can cause the script to hang.

Either use a context manager (recommended) or explicitly call zenoh.Session.close() before your script exits. See examples in the Quick Start Examples section.

Example: Creating a session with context manager

# Recommended: Using context manager
# The session is automatically closed when exiting the 'with' block
with zenoh.open(zenoh.Config()) as session:
    # Use the session
    session.put("demo/example/hello", "Hello World!")

Example: Creating a session with explicit close

# Alternative: Explicit open and close
# You must explicitly close the session before script exit
session = zenoh.open(zenoh.Config())
try:
    # Use the session
    session.put("demo/example/hello", "Hello World!")
finally:
    # Always close the session
    session.close()

Key Expressions

Key expressions are Zenoh’s address space.

In Zenoh, data is associated with keys in the form of a slash-separated path, e.g., robot/sensor/temp. The requesting side uses key expressions to address the data of interest. Key expressions can contain wildcards:

  • * matches any chunk (a chunk is a sequence of characters between / separators)

  • ** matches any number of chunks (including zero chunks)

For example:

  • robot/sensor/* matches robot/sensor/temp, robot/sensor/humidity, etc.

  • robot/** matches robot/sensor/temp, robot/actuator/motor, robot/status, etc.
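The wildcard rules above can be sketched in plain Python. This is a simplified model for illustration only, not Zenoh's matching algorithm: the `matches` helper is a hypothetical name, it compares a pattern against a concrete key (no wildcards in the key), and it handles only whole-chunk `*` and `**`.

```python
def matches(pattern: str, key: str) -> bool:
    """Return True if a concrete key matches a key expression pattern.

    Simplified model: only whole-chunk `*` and `**` wildcards are handled.
    """
    return _match(pattern.split("/"), key.split("/"))


def _match(pat: list[str], chunks: list[str]) -> bool:
    if not pat:
        # Pattern exhausted: match only if the key is exhausted too
        return not chunks
    head, rest = pat[0], pat[1:]
    if head == "**":
        # `**` absorbs zero chunks, or one chunk and then tries again
        return _match(rest, chunks) or (bool(chunks) and _match(pat, chunks[1:]))
    if not chunks:
        return False
    if head == "*" or head == chunks[0]:
        # `*` matches exactly one chunk; a literal must be equal
        return _match(rest, chunks[1:])
    return False
```

For real applications, zenoh.KeyExpr.intersects() and zenoh.KeyExpr.includes() should be used instead, since they also relate two wildcard expressions to each other.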

The zenoh.KeyExpr class provides validation and operations on key expressions. The zenoh.KeyExpr constructor validates the syntax of the provided string and raises a zenoh.ZError exception if the syntax is invalid (e.g., it contains spaces, other illegal characters, or has empty chunks like foo//bar or /foo).

The zenoh.KeyExpr constructor raises an exception for key expressions that are valid but not in canonical form. For example, robot/sensor/**/* is valid, but its canonical form is robot/sensor/*/**. The zenoh.KeyExpr.autocanonize() method can accept such key expressions and convert them to their canonical form.

Example: Validating key expressions

import zenoh
from zenoh import KeyExpr

try:
    # Valid key expressions
    valid_ke = KeyExpr("robot/sensor/temperature")
    assert str(valid_ke) == "robot/sensor/temperature"
    # autocanonize() accepts a non-canonical expression and canonizes it
    canonized_ke = KeyExpr.autocanonize("robot/sensor/**/*/**/**")
    assert str(canonized_ke) == "robot/sensor/*/**"

    # Invalid key expression (empty chunk): the constructor raises ZError
    invalid_ke = KeyExpr("robot/sensor//*")
    assert False, "This line should not be reached"
except zenoh.ZError as e:
    print(f"Validation error: {e}")
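To make the idea of a canonical form more concrete, the following sketch reorders runs of wildcard chunks so that every `*` comes first and at most one `**` follows. It is an illustrative approximation under stated assumptions: `canonize` is a hypothetical helper, it covers only the `*` and `**` cases mentioned in this section, and it performs no syntax validation. zenoh.KeyExpr.autocanonize() remains the authoritative implementation.

```python
def canonize(key_expr: str) -> str:
    """Approximate canonization of runs of `*` and `**` chunks."""
    out: list[str] = []
    stars = 0        # number of `*` chunks in the current wildcard run
    double = False   # whether the current run contains at least one `**`

    def flush() -> None:
        # Emit the current run as `*`... followed by a single `**`
        nonlocal stars, double
        out.extend(["*"] * stars)
        if double:
            out.append("**")
        stars, double = 0, False

    for chunk in key_expr.split("/"):
        if chunk == "*":
            stars += 1
        elif chunk == "**":
            double = True
        else:
            flush()
            out.append(chunk)
    flush()
    return "/".join(out)
```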

Key expressions support operations such as intersection and inclusion (see zenoh.KeyExpr.intersects() and zenoh.KeyExpr.includes()), which help determine how different expressions relate to each other.

Example: Performing operations on key expressions

from zenoh import KeyExpr

# Create a key expression with validation
sensor_ke = KeyExpr("robot/sensor")
assert str(sensor_ke) == "robot/sensor"

# Join with another segment
temp_ke = sensor_ke.join("temp")
assert str(temp_ke) == "robot/sensor/temp"

# Create a wildcard expression
all_sensors = sensor_ke.join("**")
assert str(all_sensors) == "robot/sensor/**"

# Check inclusion
assert all_sensors.includes(temp_ke)
assert not temp_ke.includes(all_sensors)

# Check intersection
assert all_sensors.intersects(temp_ke)
assert not sensor_ke.intersects(KeyExpr("robot/actuator"))

Key expressions can also be declared with the session to optimize routing and network usage:

Example: Declaring key expressions

# Declare a key expression for optimized routing
declared_ke = session.declare_keyexpr("robot/sensor/temperature")

# Use the declared key expression
publisher = session.declare_publisher(declared_ke)

Publish/Subscribe

Data is published via a zenoh.Publisher, which is declared using zenoh.Session.declare_publisher(). The publisher exposes two primary operations: zenoh.Publisher.put() and zenoh.Publisher.delete(). Publishing can also be performed directly from the session via zenoh.Session.put() and zenoh.Session.delete().

Published data is received as zenoh.Sample instances by a zenoh.Subscriber, which is declared using zenoh.Session.declare_subscriber(). The samples are delivered to a callback or a channel (see Channels and callbacks).

Publishing can express two different semantics:

  • producing a sequence of values

  • updating a single value associated with a key expression

In the second case, it is necessary to indicate that a key is no longer associated with any value; the zenoh.Publisher.delete() operation is used for this.

On the receiving side, the subscriber distinguishes between zenoh.SampleKind.PUT and zenoh.SampleKind.DELETE using the zenoh.Sample.kind field in the zenoh.Sample structure.

The delete operation lets a subscriber keep such values up to date, for example when it feeds a zenoh.Queryable that caches the values associated with key expressions.
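The "single value per key" semantics can be illustrated with a small cache that is updated from incoming samples. This is a sketch only: the `SampleKind` enum below is a local stand-in for zenoh.SampleKind so that the code runs without a session, and `apply_sample` is a hypothetical helper.

```python
from enum import Enum, auto


class SampleKind(Enum):
    """Local stand-in for zenoh.SampleKind (assumption of this sketch)."""

    PUT = auto()
    DELETE = auto()


def apply_sample(
    cache: dict[str, bytes], key: str, kind: SampleKind, payload: bytes = b""
) -> None:
    """Update the cache according to the kind of the received sample."""
    if kind is SampleKind.PUT:
        # PUT associates (or re-associates) a value with the key
        cache[key] = payload
    elif kind is SampleKind.DELETE:
        # DELETE means the key no longer has a value
        cache.pop(key, None)


cache: dict[str, bytes] = {}
apply_sample(cache, "robot/sensor/temp", SampleKind.PUT, b"21.5")
apply_sample(cache, "robot/sensor/temp", SampleKind.PUT, b"22.0")
apply_sample(cache, "robot/sensor/humidity", SampleKind.PUT, b"40")
apply_sample(cache, "robot/sensor/humidity", SampleKind.DELETE)
```

In a subscriber loop, the same function would be called with sample.key_expr, sample.kind, and sample.payload.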

Example: Declaring a publisher and publishing data

# Declare a publisher and publish data
publisher = session.declare_publisher("key/expression")
publisher.put("value")

Example: Declaring a subscriber and receiving data

# Declare a subscriber and receive data
subscriber = session.declare_subscriber("key/expression")
for sample in subscriber:
    print(f">> Received {sample.payload.to_string()}")

Example: Using session methods directly

# Direct put operation
session.put("key/expression", "value")

# Direct delete operation
session.delete("key/expression")

Query/Reply

In the query/reply paradigm, data is made available by a zenoh.Queryable and requested by a zenoh.Querier or directly via zenoh.Session.get().

A zenoh.Queryable is declared using zenoh.Session.declare_queryable(). It serves zenoh.Query requests via a callback or a channel (see Channels and callbacks).

The zenoh.Query provides the zenoh.Query.reply() method to reply with a data sample of the zenoh.SampleKind.PUT kind, and zenoh.Query.reply_del() to send a zenoh.SampleKind.DELETE reply. See Publish/Subscribe for more details on the difference between the two sample kinds. There is also the zenoh.Query.reply_err() method which can be used to send a reply containing error information.

Data is requested from queryables via zenoh.Session.get() or via a zenoh.Querier object. Each request returns zero or more zenoh.Reply structures, one per queryable that matches the request. Each reply contains either a zenoh.Sample (sent with zenoh.Query.reply() or zenoh.Query.reply_del()) or a zenoh.ReplyError (sent with zenoh.Query.reply_err()).

Example: Declaring a queryable

# Queryable that replies with temperature data for a given day.
# temperature_data is assumed to be a dict mapping a day string to a payload.
queryable = session.declare_queryable("room/temperature/history")
for query in queryable:
    if "day" in query.selector.parameters:
        day = query.selector.parameters["day"]
        if day in temperature_data:
            query.reply("room/temperature/history", temperature_data[day])
        else:
            query.reply_del("room/temperature/history")
    else:
        query.reply_err("missing day parameter")

Example: Requesting data using Session.get

# Request temperature for a specific day
replies = session.get("room/temperature/history?day=2023-03-15")
for reply in replies:
    if reply.ok:
        print(f">> Temperature is {reply.ok.payload.to_string()}")
    else:
        print(f">> Error: {reply.err.payload.to_string()}")

Example: Using a Querier

# Declare a querier for multiple queries
querier = session.declare_querier("room/temperature/history")

# Send a query with parameters (no leading "?" when passed separately)
replies = querier.get(parameters="day=2023-03-15")
for reply in replies:
    if reply.ok:
        print(f">> Temperature is {reply.ok.payload.to_string()}")
    else:
        print(f">> Error: {reply.err.payload.to_string()}")

Query Parameters

The query/reply API allows specifying additional parameters for the request. A zenoh.Selector object is passed to the zenoh.Session.get() operation. It combines a key expression with optional parameters and can be constructed from these elements or by parsing a selector string. The selector string has a syntax similar to a URL: it is a key expression followed by a question mark and a list of parameters in the format “name=value”, separated by ;. For example: key/expression?param1=value1;param2=value2.

Alternatively, parameters can be constructed programmatically using the zenoh.Parameters class, which accepts a dictionary, and then combined with a key expression to create a zenoh.Selector.

On the receiving side, queryables can access these parameters via zenoh.Query.parameters.
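The selector string syntax described above can be sketched as a small parser. This is not the parser used by zenoh.Selector; `parse_selector` is a hypothetical helper, and storing a parameter without `=` as an empty string is an assumption of this sketch.

```python
def parse_selector(selector: str) -> tuple[str, dict[str, str]]:
    """Split 'key/expr?name=value;name2=value2' into its two parts."""
    # Everything before the first '?' is the key expression
    key_expr, _, raw_params = selector.partition("?")
    params: dict[str, str] = {}
    for item in raw_params.split(";"):
        if not item:
            continue  # skip empty entries, e.g. when there is no '?' at all
        name, _, value = item.partition("=")
        params[name] = value
    return key_expr, params
```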

Example: Constructing a Selector from dictionary

# Create parameters from a dictionary
params = zenoh.Parameters({"day": "2023-03-15", "format": "celsius"})

# Create a selector from key expression and parameters
selector = zenoh.Selector("room/temperature/history", params)

# Request data using the selector
replies = session.get(selector)
for reply in replies:
    if reply.ok:
        print(f">> {reply.ok.payload.to_string()}")

Data representation

Data is received as zenoh.Sample objects, which contain the zenoh.Sample.payload and associated metadata like zenoh.Sample.timestamp, zenoh.Sample.encoding, and zenoh.Sample.kind. Additionally, optional user-defined metadata can be attached via zenoh.Sample.attachment.

Both zenoh.Sample.payload and zenoh.Sample.attachment are of type zenoh.ZBytes, which represents raw byte data.

Example: Using zenoh.ZBytes

# Raw bytes
payload = zenoh.ZBytes(b"Hello, World!")
data = payload.to_bytes()
assert isinstance(data, bytes)
assert data == b"Hello, World!"

# String data
payload = zenoh.ZBytes("Hello, World!")
text = payload.to_string()
assert isinstance(text, str)
assert text == "Hello, World!"

Serialization and deserialization of basic types and structures is provided in the zenoh.ext module via zenoh.ext.z_serialize() and zenoh.ext.z_deserialize().

Example: Data serialization

# Using zenoh.ext for serialization
from zenoh.ext import z_deserialize, z_serialize

# Serialize a dictionary
data = {"temperature": 25.5, "humidity": 60.0}
payload = z_serialize(data)
assert isinstance(payload, zenoh.ZBytes)

# Deserialize back
received = z_deserialize(dict[str, float], payload)
assert isinstance(received, dict)
assert received == {"temperature": 25.5, "humidity": 60.0}

Encoding

Zenoh uses zenoh.Encoding to indicate how data should be interpreted by the application. An encoding has a similar role to Content-Type in HTTP and is represented as a string in MIME-like format: type/subtype[;schema].

To optimize network usage, Zenoh internally maps some predefined encoding strings to integer identifiers. These encodings are provided as class attributes of the zenoh.Encoding class, such as zenoh.Encoding.ZENOH_BYTES, zenoh.Encoding.APPLICATION_JSON, etc. This internal mapping is not exposed to the application layer, but using these predefined encodings is more efficient than custom strings.

The Zenoh protocol does not impose any encoding value and does not operate on it. It can be seen as optional metadata that is carried over by Zenoh, allowing applications to perform different operations depending on the encoding value.

Additionally, a schema can be associated with the encoding. The convention is to use the ; separator if an encoding is created from a string. Alternatively, zenoh.Encoding.with_schema() can be used to add a schema to one of the predefined class attributes.
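Since Zenoh only carries the encoding, interpreting it is up to the application. The sketch below shows one way a receiver might dispatch on the encoding string. It works on plain `bytes` and `str` values so that it runs without Zenoh; the `decode` helper is hypothetical, and treating the schema of `text/plain` as a character set name is an assumption of this sketch, not a Zenoh rule.

```python
import json


def decode(payload: bytes, encoding: str):
    """Interpret raw bytes according to a 'type/subtype[;schema]' string."""
    # Split off the optional schema after the ';' separator
    base, _, schema = encoding.partition(";")
    if base == "application/json":
        return json.loads(payload)
    if base == "text/plain":
        # Assumption: the schema, if present, names the character set
        return payload.decode(schema or "utf-8")
    # Unknown encoding: hand the raw bytes back unchanged
    return payload
```

With real samples, the inputs would typically come from sample.payload.to_bytes() and str(sample.encoding).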

Example: Creating a zenoh.Encoding from a string and vice versa

encoding = zenoh.Encoding("text/plain")
text = str(encoding)
assert text == "text/plain"

Example: Using the schema

encoding1 = zenoh.Encoding("text/plain;utf-8")
encoding2 = zenoh.Encoding.TEXT_PLAIN.with_schema("utf-8")
assert encoding1 == encoding2
assert str(encoding1) == "text/plain;utf-8"
assert str(encoding2) == "text/plain;utf-8"

Scouting

Scouting is the process of discovering Zenoh nodes on the network. The scouting process depends on the transport layer and the Zenoh configuration. Note that it is not necessary to explicitly discover other nodes to publish, subscribe, or query data.

Scouting is performed using the zenoh.scout() function, which returns a zenoh.Scout object that yields zenoh.Hello messages for each discovered Zenoh node.

Scouting is different from liveliness requesting and monitoring. Liveliness works at the logical level of the Zenoh protocol and provides information about resources in terms of key expressions. Scouting, on the other hand, discovers the Zenoh nodes visible to the local node on the network. The result of scouting is a list of zenoh.Hello messages, each containing information about a discovered Zenoh node.

See the scouting documentation for more details.

Example: Scouting for Zenoh nodes

import threading

scout = zenoh.scout(what="peer|router")
# Stop scouting after one second so the loop below terminates
threading.Timer(1.0, scout.stop).start()
for hello in scout:
    print(hello)

Liveliness

Zenoh supports liveliness monitoring to notify when a specified resource appears or disappears on the network.

Sometimes it is necessary to know whether a Zenoh node is available. This can be achieved by declaring special publishers and queryables, but the dedicated liveliness API is more convenient and efficient.

The zenoh.Liveliness object is created by calling zenoh.Session.liveliness(). It allows a node to declare a zenoh.LivelinessToken associated with a key expression. To declare the token, use zenoh.Liveliness.declare_token().

Other nodes can query this key expression using zenoh.Liveliness.get(). They can also subscribe using zenoh.Liveliness.declare_subscriber() to be notified when the token appears or disappears.

The history parameter of zenoh.Liveliness.declare_subscriber() allows immediate receipt of tokens that are already present on the network.

Example: Declaring a liveliness token

# Declare a liveliness token
token = session.liveliness().declare_token("node/A")

Example: Getting currently present liveliness tokens

# Get currently present liveliness tokens
replies = session.liveliness().get("node/A", timeout=5)
for reply in replies:
    if reply.ok:
        print(f"Alive token ('{reply.ok.key_expr}')")
    else:
        print(f"Received (ERROR: '{reply.err.payload.to_string()}')")

Example: Checking if a liveliness token is present and subscribing to changes

# Check if a liveliness token is present and subscribe to changes
subscriber = session.liveliness().declare_subscriber("node/A", history=True)
for sample in subscriber:
    if sample.kind == zenoh.SampleKind.PUT:
        print(f"Alive token ('{sample.key_expr}')")
    elif sample.kind == zenoh.SampleKind.DELETE:
        print(f"Dropped token ('{sample.key_expr}')")

Matching

The matching API lets the active side of communication (publisher or querier) learn whether there are interested parties on the other side (subscriber or queryable). This information can save bandwidth and CPU resources.

Declare a zenoh.MatchingListener via zenoh.Publisher.declare_matching_listener() or zenoh.Querier.declare_matching_listener().

The matching listener behaves like a subscriber, but instead of producing data samples it yields zenoh.MatchingStatus instances whenever the matching status changes — for example, when the first matching subscriber or queryable appears or when the last one disappears.

Example: Declaring a matching listener for a publisher

# Declare a matching listener for a publisher
publisher = session.declare_publisher("key/expression")
listener = publisher.declare_matching_listener()
for status in listener:
    if status.matching:
        print(">> Publisher has at least one matching subscriber")
    else:
        print(">> Publisher has no matching subscribers")

Example: Declaring a matching listener for a querier

# Declare a matching listener for a querier
querier = session.declare_querier("service/endpoint")
listener = querier.declare_matching_listener()
for status in listener:
    if status.matching:
        print(">> Querier has at least one matching queryable")
    else:
        print(">> Querier has no matching queryables")
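The idea that a status is reported only when matching changes, rather than for every subscriber that comes or goes, can be modeled in a few lines. This is a toy model of the behavior described in this section, not the library's internal logic; `MatchTracker` and its methods are hypothetical names.

```python
class MatchTracker:
    """Toy model: records a status only when the matching state flips."""

    def __init__(self) -> None:
        self.count = 0                # number of currently matching entities
        self.events: list[bool] = []  # reported matching statuses, in order

    def entity_appeared(self) -> None:
        self.count += 1
        if self.count == 1:
            # First matching subscriber or queryable appeared
            self.events.append(True)

    def entity_disappeared(self) -> None:
        self.count -= 1
        if self.count == 0:
            # Last matching subscriber or queryable disappeared
            self.events.append(False)


tracker = MatchTracker()
tracker.entity_appeared()      # first one: status becomes matching
tracker.entity_appeared()      # second one: no status change
tracker.entity_disappeared()   # one still left: no status change
tracker.entity_disappeared()   # last one gone: status becomes not matching
```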

Channels and callbacks

There are two ways to receive sequential data from Zenoh primitives (for example, a series of zenoh.Sample objects from a zenoh.Subscriber or zenoh.Reply objects from a zenoh.Querier): by channel or by callback.

This behavior is controlled by the handler parameter of the declare methods (for example, zenoh.Session.declare_subscriber() and zenoh.Session.declare_queryable()) and of the get operations (zenoh.Session.get() and zenoh.Querier.get()). The parameter can be either a callable (a function or a method) or a channel type (blocking zenoh.handlers.FifoChannel or non-blocking zenoh.handlers.RingChannel). By default, the handler parameter is None, which uses zenoh.handlers.DefaultHandler (a FIFO channel with default capacity).

Channels

When constructed with a zenoh.handlers.FifoChannel or zenoh.handlers.RingChannel as handler (or using the default one), the returned object is iterable and can be used in a for loop to receive data sequentially. It also provides explicit methods such as zenoh.Subscriber.recv() to wait for data and zenoh.Subscriber.try_recv() to attempt a non-blocking receive. The subscriber (or queryable) is automatically undeclared when the object goes out of scope or when zenoh.Subscriber.undeclare() is explicitly called.

# Default channel
subscriber_default = session.declare_subscriber("key/expr")

# Explicit FIFO channel with custom capacity
subscriber_fifo = session.declare_subscriber(
    "key/expr", zenoh.handlers.FifoChannel(100)
)

# Ring channel (drops oldest when full)
subscriber_ring = session.declare_subscriber("key/expr", zenoh.handlers.RingChannel(50))
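The difference between the two channel types when they are full can be illustrated with standard-library containers. This is an analogy under stated assumptions, not how the Zenoh channels are implemented: `queue.Queue` stands in for the FIFO channel and `collections.deque` with `maxlen` for the ring channel. A full FIFO would block the producer; the sketch uses `put_nowait` and records the items that would have blocked so that it does not hang.

```python
import queue
from collections import deque

fifo: queue.Queue = queue.Queue(maxsize=3)  # bounded FIFO: full means "wait"
ring: deque = deque(maxlen=3)               # ring: full means "drop oldest"

would_block = []
for item in range(5):
    try:
        fifo.put_nowait(item)
    except queue.Full:
        # A blocking FIFO would make the producer wait here
        would_block.append(item)
    # Appending to a full deque silently discards the oldest element
    ring.append(item)

# Drain the FIFO to inspect what it kept
fifo_items = [fifo.get_nowait() for _ in range(fifo.qsize())]
```

The FIFO keeps the oldest items and applies back-pressure, while the ring keeps the most recent ones, which is usually preferable when only fresh data matters.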

Callbacks

Caution

Calling Zenoh API functions or performing any blocking operation from within a callback is disallowed. Even if it works in particular cases, it is unsafe and may lead to deadlocks or crashes at any moment or after future updates of the library.

It is possible to pass a callable object as handler. This callable is invoked for each received zenoh.Sample or zenoh.Reply. This also means the subscriber or queryable runs in background mode, i.e., it remains active even if the returned object goes out of scope. This allows declaring a subscriber without managing the returned object’s lifetime.

def on_sample(sample):
    print(sample.payload.to_string())


# Subscriber runs in background mode
subscriber = session.declare_subscriber("key/expr", on_sample)
# The subscriber remains active even if 'subscriber' variable is not used

For more advanced callback handling, you can use zenoh.handlers.Callback to create a callback handler with cleanup functionality.

def on_sample(sample):
    print(sample.payload.to_string())


def on_cleanup():
    print("Subscriber undeclared")


callback = zenoh.handlers.Callback(on_sample, drop=on_cleanup)
subscriber = session.declare_subscriber("key/expr", callback)
# The subscriber remains active even if 'subscriber' variable is not used
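One possible way to keep callbacks free of blocking work is to let the callback only enqueue the item and do the processing in a separate worker thread. The sketch below shows this pattern with the standard library alone; plain strings stand in for samples, the callback is invoked by hand instead of by Zenoh, and the `work` queue, `worker` function, and sentinel convention are choices of this sketch.

```python
import queue
import threading

work: queue.Queue = queue.Queue()  # unbounded, so put_nowait never blocks
results: list[str] = []


def on_sample(sample) -> None:
    # Callback body: a non-blocking enqueue only, no Zenoh calls, no waiting
    work.put_nowait(sample)


def worker() -> None:
    while True:
        item = work.get()
        if item is None:  # sentinel: stop the worker
            break
        # Stand-in for slow or blocking processing
        results.append(item.upper())


thread = threading.Thread(target=worker)
thread.start()

# Simulate the library invoking the callback three times
for fake_sample in ["a", "b", "c"]:
    on_sample(fake_sample)

work.put(None)  # ask the worker to finish
thread.join()
```

With a real subscriber, `on_sample` would be passed as the handler and the worker would be the place to call Zenoh API functions or perform blocking I/O.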

Custom channel implementation

For advanced use cases, you can implement your own custom channel in Python and pass it in the tuple form (callback, handler) where callback is a callable and handler is your custom Python object. This solution has the same performance penalties as the callback API, but it can be useful in some scenarios.

The callback is invoked for each received item and stores the data in the custom channel, which is accessible via the zenoh.Subscriber.handler property, in the same way as with built-in channels.

Custom channel with priority queue

import queue

import zenoh


class PriorityChannel:
    def __init__(self, maxsize=100):
        self.queue: queue.PriorityQueue = queue.PriorityQueue(maxsize)
        # Counter to preserve FIFO order for samples with same priority
        self._counter = 0

    def recv(self) -> zenoh.Sample:
        return self.queue.get()[2]

    def send(self, sample: zenoh.Sample):
        self.queue.put((sample.priority, self._counter, sample))
        self._counter += 1

Usage of the custom channel

with zenoh.open(zenoh.Config()) as session:
    channel = PriorityChannel(maxsize=50)
    subscriber = session.declare_subscriber("key/expression", (channel.send, channel))
    sample = subscriber.handler.recv()
    print(f">> Received: {sample.payload.to_string()}")