Components and Concepts
Session and Config
Zenoh supports two paradigms of communication: Publish/Subscribe and Query/Reply. The entities
that perform communication (for example, publishers, subscribers, queriers, and queryables) are
declared through a zenoh.Session. A session is created by the zenoh.open() function,
which takes a zenoh.Config as an argument.
The configuration is stored in a JSON file and can be read with zenoh.Config.from_file().
The file format is documented in the Zenoh Rust API
Config reference.
Important
The recommended way to create a session is using a context manager (with statement).
If a session is neither closed explicitly nor managed by a context manager, object
finalizers may run at interpreter exit after the library thread has already been killed,
which can cause the script to hang.
Either use a context manager (recommended) or explicitly call zenoh.Session.close()
before your script exits. See examples in the Quick Start Examples section.
Example: Creating a session with context manager
import zenoh

# Recommended: Using context manager
# The session is automatically closed when exiting the 'with' block
with zenoh.open(zenoh.Config()) as session:
    # Use the session
    session.put("demo/example/hello", "Hello World!")
Example: Creating a session with explicit close
# Alternative: Explicit open and close
# You must explicitly close the session before script exit
session = zenoh.open(zenoh.Config())
try:
    # Use the session
    session.put("demo/example/hello", "Hello World!")
finally:
    # Always close the session
    session.close()
Key Expressions
Key expressions are Zenoh’s address space.
In Zenoh, data is associated with keys in the form of a slash-separated path, e.g., robot/sensor/temp. The
requesting side uses key expressions to address the data of interest. Key expressions can contain
wildcards:
* matches any chunk (a chunk is a sequence of characters between / separators)
** matches any number of chunks (including zero chunks)
For example:
robot/sensor/* matches robot/sensor/temp, robot/sensor/humidity, etc.
robot/** matches robot/sensor/temp, robot/actuator/motor, robot/status, etc.
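The matching rule for the two wildcards can be sketched in plain Python. This is only an illustration of the semantics described above, not the library's implementation: the helper name key_matches is hypothetical, and the sketch ignores any other wildcard forms or validation that zenoh.KeyExpr may apply.

```python
def key_matches(pattern: str, key: str) -> bool:
    """Return True if the concrete key is matched by the wildcard pattern."""

    def match(p: list[str], k: list[str]) -> bool:
        if not p:
            # Pattern exhausted: match only if the key is exhausted too
            return not k
        head, rest = p[0], p[1:]
        if head == "**":
            # '**' may absorb zero or more chunks of the key
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        # '*' matches exactly one chunk; any other chunk must be equal
        return (head == "*" or head == k[0]) and match(rest, k[1:])

    return match(pattern.split("/"), key.split("/"))


print(key_matches("robot/sensor/*", "robot/sensor/temp"))
print(key_matches("robot/sensor/*", "robot/sensor/temp/raw"))
print(key_matches("robot/**", "robot"))
```

In real code, prefer zenoh.KeyExpr.includes() and zenoh.KeyExpr.intersects(), which also handle wildcards on both sides.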
The zenoh.KeyExpr class provides validation and operations on key
expressions. The zenoh.KeyExpr constructor validates the syntax of the provided string
and raises a zenoh.ZError exception if the syntax is invalid (e.g., it contains spaces, other illegal characters, or has empty chunks like foo//bar or /foo).
The zenoh.KeyExpr constructor also raises an exception for key expressions that are valid but not in
canonical form.
For example, robot/sensor/**/* is valid, but its canonical form is robot/sensor/*/**.
The zenoh.KeyExpr.autocanonize() method can accept such key expressions and
convert them to their canonical form.
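To make the idea of a canonical form concrete, here is a rough pure-Python sketch of the one rewrite mentioned above: within a run of wildcard chunks, every * is moved in front and repeated ** chunks collapse into one. The function name canonize is hypothetical, and the real zenoh.KeyExpr.autocanonize() may apply further rules that this sketch does not cover.

```python
def canonize(key_expr: str) -> str:
    """Reorder '*' before '**' and collapse repeated '**' chunks."""
    out = []
    pending_double = False  # a '**' has been seen but not yet emitted
    for chunk in key_expr.split("/"):
        if chunk == "**":
            # Remember it; emit once, after any following '*' chunks
            pending_double = True
        elif chunk == "*":
            out.append("*")
        else:
            if pending_double:
                out.append("**")
                pending_double = False
            out.append(chunk)
    if pending_double:
        out.append("**")
    return "/".join(out)


print(canonize("robot/sensor/**/*"))
print(canonize("robot/sensor/**/*/**/**"))
```

Both calls print robot/sensor/*/**, which is the canonical form given in the text.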
Example: Validating key expressions
import zenoh
from zenoh import KeyExpr

try:
    # Valid key expressions
    valid_ke = KeyExpr("robot/sensor/temperature")
    assert str(valid_ke) == "robot/sensor/temperature"
    canonized_ke = KeyExpr.autocanonize("robot/sensor/**/*/**/**")
    assert str(canonized_ke) == "robot/sensor/*/**"
    # Invalid key expression (empty chunk)
    invalid_ke = KeyExpr("robot/sensor//*")
    assert False, "This line should not be reached"
except zenoh.ZError as e:
    print(f"Validation error: {e}")
Key expressions support operations such as intersection and inclusion (see
zenoh.KeyExpr.intersects() and zenoh.KeyExpr.includes()), which
help determine how different expressions relate to each other.
Example: Performing operations on key expressions
# Create a key expression with validation
sensor_ke = KeyExpr("robot/sensor")
assert str(sensor_ke) == "robot/sensor"
# Join with another segment
temp_ke = sensor_ke.join("temp")
assert str(temp_ke) == "robot/sensor/temp"
# Create a wildcard expression
all_sensors = sensor_ke.join("**")
assert str(all_sensors) == "robot/sensor/**"
# Check inclusion
assert all_sensors.includes(temp_ke)
assert not temp_ke.includes(all_sensors)
# Check intersection
assert all_sensors.intersects(temp_ke)
assert not sensor_ke.intersects(KeyExpr("robot/actuator"))
Key expressions can also be declared with the session to optimize routing and network usage:
Example: Declaring key expressions
# Declare a key expression for optimized routing
declared_ke = session.declare_keyexpr("robot/sensor/temperature")
# Use the declared key expression
publisher = session.declare_publisher(declared_ke)
Publish/Subscribe
Data is published via a zenoh.Publisher, which is declared using
zenoh.Session.declare_publisher(). The publisher exposes two primary operations:
zenoh.Publisher.put() and zenoh.Publisher.delete(). Publishing can also be performed
directly from the session via zenoh.Session.put() and zenoh.Session.delete().
Published data is received as zenoh.Sample instances by a zenoh.Subscriber,
which is declared using zenoh.Session.declare_subscriber(). The samples are delivered to the
callback or channel (see Channels and callbacks).
Publishing can express two different semantics:
producing a sequence of values
updating a single value associated with a key expression
In the second case, it is necessary to indicate that a key is no longer associated
with any value; the zenoh.Publisher.delete() operation is used for this.
On the receiving side, the subscriber distinguishes between
zenoh.SampleKind.PUT and zenoh.SampleKind.DELETE using the
zenoh.Sample.kind field in the zenoh.Sample structure.
The delete operation lets a subscriber work together with a zenoh.Queryable
that caches the values associated with key expressions: a DELETE sample tells
the cache to drop the value stored for that key.
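As an illustration of the "single value per key" semantics, the sketch below applies PUT and DELETE events to an in-memory cache. It is deliberately independent of Zenoh: Kind is a hypothetical stand-in for zenoh.SampleKind, and apply_sample is an assumed helper name, not part of the library.

```python
from enum import Enum, auto


class Kind(Enum):
    """Hypothetical stand-in for zenoh.SampleKind."""
    PUT = auto()
    DELETE = auto()


def apply_sample(cache: dict, key: str, kind: Kind, value=None) -> None:
    """Update the cache so it always holds the latest value per key."""
    if kind is Kind.PUT:
        cache[key] = value
    else:
        # DELETE: the key is no longer associated with any value
        cache.pop(key, None)


cache = {}
apply_sample(cache, "robot/sensor/temp", Kind.PUT, "21.5")
apply_sample(cache, "robot/sensor/humidity", Kind.PUT, "40")
apply_sample(cache, "robot/sensor/temp", Kind.DELETE)
print(cache)  # {'robot/sensor/humidity': '40'}
```

In a real application, the key, kind and value would come from the fields of each received zenoh.Sample.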
Example: Declaring a publisher and publishing data
# Declare a publisher and publish data
publisher = session.declare_publisher("key/expression")
publisher.put("value")
Example: Declaring a subscriber and receiving data
# Declare a subscriber and receive data
subscriber = session.declare_subscriber("key/expression")
for sample in subscriber:
    print(f">> Received {sample.payload.to_string()}")
Example: Using session methods directly
# Direct put operation
session.put("key/expression", "value")
# Direct delete operation
session.delete("key/expression")
Query/Reply
In the query/reply paradigm, data is made available by a zenoh.Queryable and
requested by a zenoh.Querier or directly via zenoh.Session.get().
A zenoh.Queryable is declared using zenoh.Session.declare_queryable().
It serves zenoh.Query requests via a callback or channel
(see Channels and callbacks).
The zenoh.Query provides the zenoh.Query.reply() method to reply with a
data sample of the zenoh.SampleKind.PUT kind, and
zenoh.Query.reply_del() to send a zenoh.SampleKind.DELETE reply.
See Publish/Subscribe for more details on the difference between the
two sample kinds. There is also the zenoh.Query.reply_err() method
which can be used to send a reply containing error information.
Data is requested from queryables via zenoh.Session.get() or via a
zenoh.Querier object. Each request returns zero or more
zenoh.Reply structures — one per queryable that matches the request.
Each reply contains either a zenoh.Sample from reply and reply_del
or a zenoh.ReplyError from reply_err.
Example: Declaring a queryable
# Queryable that replies with temperature data for a given day
queryable = session.declare_queryable("room/temperature/history")
query_count = 0
for query in queryable:
    if "day" in query.selector.parameters:
        day = query.selector.parameters["day"]
        if day in temperature_data:
            query.reply("room/temperature/history", temperature_data[day])
        else:
            query.reply_del("room/temperature/history")
    else:
        query.reply_err("missing day parameter")
Example: Requesting data using Session.get
# Request temperature for a specific day
replies = session.get("room/temperature/history?day=2023-03-15")
for reply in replies:
    if reply.ok:
        print(f">> Temperature is {reply.ok.payload.to_string()}")
    else:
        print(f">> Error: {reply.err.payload.to_string()}")
Example: Using a Querier
# Declare a querier for multiple queries
querier = session.declare_querier("room/temperature/history")
# Send a query with parameters
replies = querier.get(parameters="day=2023-03-15")
for reply in replies:
    if reply.ok:
        print(f">> Temperature is {reply.ok.payload.to_string()}")
    else:
        print(f">> Error: {reply.err.payload.to_string()}")
Query Parameters
The query/reply API allows specifying additional parameters for the request.
A zenoh.Selector object is passed to the zenoh.Session.get() operation.
It combines a key expression with optional parameters and can be constructed from these
elements or by parsing a selector string. The selector string has
a syntax similar to a URL: it is a key expression followed by a question mark
and a list of parameters in the format “name=value”, separated by ;.
For example: key/expression?param1=value1;param2=value2.
Alternatively, parameters can be constructed programmatically using the
zenoh.Parameters class, which accepts a dictionary, and then combined
with a key expression to create a zenoh.Selector.
On the receiving side, queryables can access these parameters via
zenoh.Query.parameters.
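The selector string syntax can be illustrated with a small parser written in plain Python. This is a sketch of the format described above, not how zenoh.Selector is implemented; parse_selector is a hypothetical helper, and it ignores escaping and duplicate names.

```python
def parse_selector(selector: str) -> tuple[str, dict[str, str]]:
    """Split 'key/expr?name=value;name2=value2' into its two parts."""
    # Everything before the first '?' is the key expression
    key_expr, _, raw_params = selector.partition("?")
    params = {}
    for item in raw_params.split(";"):
        if not item:
            continue  # no parameters, or a stray ';'
        name, _, value = item.partition("=")
        params[name] = value
    return key_expr, params


print(parse_selector("key/expression?param1=value1;param2=value2"))
# ('key/expression', {'param1': 'value1', 'param2': 'value2'})
```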
Example: Constructing a Selector from dictionary
# Create parameters from a dictionary
params = zenoh.Parameters({"day": "2023-03-15", "format": "celsius"})
# Create a selector from key expression and parameters
selector = zenoh.Selector("room/temperature/history", params)
# Request data using the selector
replies = session.get(selector)
for reply in replies:
    if reply.ok:
        print(f">> {reply.ok.payload.to_string()}")
Data representation
Data is received as zenoh.Sample objects, which contain the
zenoh.Sample.payload and associated metadata like zenoh.Sample.timestamp,
zenoh.Sample.encoding, and zenoh.Sample.kind. Additionally, optional
user-defined metadata can be attached via zenoh.Sample.attachment.
Both zenoh.Sample.payload and zenoh.Sample.attachment are of type
zenoh.ZBytes, which represents raw byte data.
Example: Using zenoh.ZBytes
# Raw bytes
payload = zenoh.ZBytes(b"Hello, World!")
data = payload.to_bytes()
assert isinstance(data, bytes)
assert data == b"Hello, World!"
# String data
payload = zenoh.ZBytes("Hello, World!")
text = payload.to_string()
assert isinstance(text, str)
assert text == "Hello, World!"
Serialization and deserialization of basic types and structures is provided in the zenoh.ext
module via zenoh.ext.z_serialize() and zenoh.ext.z_deserialize().
Example: Data serialization
# Using zenoh.ext for serialization
from zenoh.ext import z_deserialize, z_serialize
# Serialize a dictionary
data = {"temperature": 25.5, "humidity": 60.0}
payload = z_serialize(data)
assert isinstance(payload, zenoh.ZBytes)
# Deserialize back
received = z_deserialize(dict[str, float], payload)
assert isinstance(received, dict)
assert received == {"temperature": 25.5, "humidity": 60.0}
Encoding
Zenoh uses zenoh.Encoding to indicate how data should be interpreted by the application. An encoding has a similar role to Content-Type in HTTP and is represented as a string in MIME-like format: type/subtype[;schema].
To optimize network usage, Zenoh internally maps some predefined encoding strings to integer identifiers. These encodings are provided as class attributes of the zenoh.Encoding class, such as zenoh.Encoding.ZENOH_BYTES, zenoh.Encoding.APPLICATION_JSON, etc. This internal mapping is not exposed to the application layer, but using these predefined encodings is more efficient than custom strings.
The Zenoh protocol does not impose any encoding value and does not operate on it. It can be seen as optional metadata that is carried over by Zenoh, allowing applications to perform different operations depending on the encoding value.
Additionally, a schema can be associated with the encoding. The convention is to use the ; separator if an encoding is created from a string. Alternatively, zenoh.Encoding.with_schema() can be used to add a schema to one of the predefined class attributes.
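Because Zenoh only carries the encoding, interpreting it is up to the application. The following sketch shows one way a receiver might dispatch on the encoding string. It works on plain strings and bytes rather than zenoh.Encoding and zenoh.ZBytes; decode_payload is a hypothetical helper, and treating the schema as a charset name is an assumption made for this example only.

```python
import json


def decode_payload(encoding: str, payload: bytes):
    """Pick a decoder based on a 'type/subtype[;schema]' encoding string."""
    base, _, schema = encoding.partition(";")
    if base == "application/json":
        return json.loads(payload)
    if base == "text/plain":
        # Assumption: treat the schema, when present, as the charset name
        return payload.decode(schema or "utf-8")
    # Unknown encodings are passed through untouched
    return payload


print(decode_payload("text/plain;utf-8", b"hello"))
print(decode_payload("application/json", b'{"temperature": 25.5}'))
```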
Example: Creating a zenoh.Encoding from a string and vice versa
encoding = zenoh.Encoding("text/plain")
text = str(encoding)
assert text == "text/plain"
Example: Using the schema
encoding1 = zenoh.Encoding("text/plain;utf-8")
encoding2 = zenoh.Encoding.TEXT_PLAIN.with_schema("utf-8")
assert encoding1 == encoding2
assert str(encoding1) == "text/plain;utf-8"
assert str(encoding2) == "text/plain;utf-8"
Scouting
Scouting is the process of discovering Zenoh nodes on the network. The scouting process depends on the transport layer and the Zenoh configuration. Note that it is not necessary to explicitly discover other nodes to publish, subscribe, or query data.
Scouting is performed using the zenoh.scout() function, which returns a
zenoh.Scout object that yields zenoh.Hello messages for each
discovered Zenoh node.
Scouting is different from liveliness requesting and monitoring. Liveliness
works on the Zenoh protocol logical level and allows getting information about resources in terms of
key expressions. On the other hand, scouting is about discovering Zenoh nodes visible
to the local node on the network. The result of scouting is a list of zenoh.Hello messages,
each containing information about a discovered Zenoh node:
unique node identifier (zenoh.Hello.zid)
node type (zenoh.Hello.whatami)
list of node’s network addresses (zenoh.Hello.locators)
See the scouting documentation for more details.
Example: Scouting for Zenoh nodes
import threading

scout = zenoh.scout(what="peer|router")
# Stop scouting after one second so that the loop below terminates
threading.Timer(1.0, lambda: scout.stop()).start()
for hello in scout:
    print(hello)
Liveliness
Zenoh supports liveliness monitoring to notify when a specified resource appears or disappears on the network.
Sometimes it is necessary to know whether a Zenoh node is available. This can be achieved by declaring special publishers and queryables, but the dedicated liveliness API is more convenient and efficient.
The zenoh.Liveliness object is created by calling zenoh.Session.liveliness().
It allows a node to declare a zenoh.LivelinessToken associated with a key expression.
To declare the token, use zenoh.Liveliness.declare_token().
Other nodes can query this key expression using zenoh.Liveliness.get().
They can also subscribe using zenoh.Liveliness.declare_subscriber() to be notified when the token appears or disappears.
The history parameter of
zenoh.Liveliness.declare_subscriber() allows immediate receipt of tokens
that are already present on the network.
Example: Declaring a liveliness token
# Declare a liveliness token
token = session.liveliness().declare_token("node/A")
Example: Getting currently present liveliness tokens
# Get currently present liveliness tokens
replies = session.liveliness().get("node/A", timeout=5)
for reply in replies:
    if reply.ok:
        print(f"Alive token ('{reply.ok.key_expr}')")
    else:
        print(f"Received (ERROR: '{reply.err.payload.to_string()}')")
Example: Checking if a liveliness token is present and subscribing to changes
# Check if a liveliness token is present and subscribe to changes
subscriber = session.liveliness().declare_subscriber("node/A", history=True)
for sample in subscriber:
    if sample.kind == zenoh.SampleKind.PUT:
        print(f"Alive token ('{sample.key_expr}')")
    elif sample.kind == zenoh.SampleKind.DELETE:
        print(f"Dropped token ('{sample.key_expr}')")
Matching
The matching API lets the active side of communication (publisher or querier) learn whether there are interested parties on the other side (subscriber or queryable). This information can save bandwidth and CPU resources.
Declare a zenoh.MatchingListener via
zenoh.Publisher.declare_matching_listener() or
zenoh.Querier.declare_matching_listener().
The matching listener behaves like a subscriber, but instead of producing data
samples it yields zenoh.MatchingStatus instances whenever the matching
status changes — for example, when the first matching subscriber or queryable
appears or when the last one disappears.
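One way to turn the matching status into saved work is to gate the expensive part of publishing on it. The sketch below is plain Python with no Zenoh calls: MatchGate is a hypothetical helper, update() stands for whatever code consumes zenoh.MatchingStatus values, and put stands for a call such as zenoh.Publisher.put().

```python
class MatchGate:
    """Remembers the latest matching status and skips work when nobody listens."""

    def __init__(self):
        self.matching = False

    def update(self, matching: bool) -> None:
        # Called whenever a new matching status is observed
        self.matching = matching

    def publish(self, make_payload, put) -> bool:
        if not self.matching:
            return False  # skip both the computation and the send
        put(make_payload())
        return True


sent = []
gate = MatchGate()
gate.publish(lambda: "expensive-1", sent.append)  # skipped: no match yet
gate.update(True)
gate.publish(lambda: "expensive-2", sent.append)  # sent
gate.update(False)
gate.publish(lambda: "expensive-3", sent.append)  # skipped again
print(sent)  # ['expensive-2']
```

If the status is updated from another thread, the flag would need appropriate synchronization; that is left out here for brevity.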
Example: Declaring a matching listener for a publisher
# Declare a matching listener for a publisher
publisher = session.declare_publisher("key/expression")
listener = publisher.declare_matching_listener()
for status in listener:
    if status.matching:
        print(">> Publisher has at least one matching subscriber")
    else:
        print(">> Publisher has no matching subscribers")
Example: Declaring a matching listener for a querier
# Declare a matching listener for a querier
querier = session.declare_querier("service/endpoint")
listener = querier.declare_matching_listener()
for status in listener:
    if status.matching:
        print(">> Querier has at least one matching queryable")
    else:
        print(">> Querier has no matching queryables")
Channels and callbacks
There are two ways to receive sequential data from Zenoh primitives (for
example, a series of zenoh.Sample objects from a
zenoh.Subscriber or zenoh.Reply objects from a
zenoh.Querier): by channel or by callback.
This behavior is controlled by the handler parameter of the declare
methods (for example, zenoh.Session.declare_subscriber() and
zenoh.Session.declare_queryable()). The parameter can be either a callable
(a function or a method) or a channel type (blocking
zenoh.handlers.FifoChannel or non-blocking zenoh.handlers.RingChannel).
By default, the handler parameter is None, which uses
zenoh.handlers.DefaultHandler (a FIFO channel with default capacity).
Channels
When constructed with a zenoh.handlers.FifoChannel or zenoh.handlers.RingChannel
as handler (or using the default one), the returned object is iterable
and can be used in a for loop to receive data sequentially. It also provides explicit
methods such as zenoh.Subscriber.recv() to wait for data and
zenoh.Subscriber.try_recv() to attempt a non-blocking receive. The
subscriber (or queryable) is automatically undeclared when the object goes out of scope
or when zenoh.Subscriber.undeclare() is explicitly called.
# Default channel
subscriber_default = session.declare_subscriber("key/expr")
# Explicit FIFO channel with custom capacity
subscriber_fifo = session.declare_subscriber(
    "key/expr", zenoh.handlers.FifoChannel(100)
)
# Ring channel (drops oldest when full)
subscriber_ring = session.declare_subscriber("key/expr", zenoh.handlers.RingChannel(50))
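The difference between the two channel types when they fill up can be imitated with standard-library containers. This is only an analogy, not how the Zenoh handlers are built: a bounded deque stands in for the ring channel, and a bounded queue.Queue stands in for the FIFO channel, on the assumption that "blocking" means a full FIFO makes the producer wait.

```python
import queue
from collections import deque

# Ring behaviour: a bounded deque silently discards the oldest item when full
ring = deque(maxlen=3)
for i in range(5):
    ring.append(i)
print(list(ring))  # [2, 3, 4]

# FIFO behaviour: a bounded queue accepts nothing more until an item is consumed
fifo = queue.Queue(maxsize=3)
for i in range(3):
    fifo.put_nowait(i)
try:
    fifo.put_nowait(3)  # a blocking put() would wait here instead
    overflowed = False
except queue.Full:
    overflowed = True
first = fifo.get_nowait()
print(overflowed, first)  # True 0
```

A ring suits "latest value wins" data such as sensor readings, while a FIFO suits streams where every item matters.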
Callbacks
Caution
Calling Zenoh API functions or performing any blocking operation from within a callback is disallowed. Even if it appears to work in particular cases, it is unsafe and may lead to deadlocks or crashes at any time, or after future updates of the library.
It is possible to pass a callable object as handler. This callable is invoked for each received
zenoh.Sample or zenoh.Reply. This also means the subscriber or queryable runs in
background mode, i.e., it remains active even if the returned object
goes out of scope. This allows declaring a subscriber without managing the
returned object’s lifetime.
def on_sample(sample):
    print(sample.payload.to_string())
# Subscriber runs in background mode
subscriber = session.declare_subscriber("key/expr", on_sample)
# The subscriber remains active even if 'subscriber' variable is not used
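Given the restriction on blocking inside callbacks, a common pattern is to have the callback only enqueue the item and let a separate worker thread do the slow part. The sketch below uses only the standard library and simulates deliveries with plain strings; in real use, on_sample would be passed as the handler and would receive zenoh.Sample objects. The names work, worker and results are chosen for this example.

```python
import queue
import threading

work = queue.Queue()  # unbounded, so put_nowait never blocks or raises
results = []


def on_sample(item):
    # Callback body: only enqueue, never block or call back into the library
    work.put_nowait(item)


def worker():
    while True:
        item = work.get()
        if item is None:  # sentinel: stop the worker
            break
        results.append(item.upper())  # stand-in for slow processing


thread = threading.Thread(target=worker)
thread.start()

# Simulate three deliveries; a real subscriber would invoke on_sample itself
for text in ("a", "b", "c"):
    on_sample(text)
work.put(None)
thread.join()
print(results)  # ['A', 'B', 'C']
```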
For more advanced callback handling, you can use zenoh.handlers.Callback
to create a callback handler with cleanup functionality.
def on_sample(sample):
    print(sample.payload.to_string())

def on_cleanup():
    print("Subscriber undeclared")
callback = zenoh.handlers.Callback(on_sample, drop=on_cleanup)
subscriber = session.declare_subscriber("key/expr", callback)
# The subscriber remains active even if 'subscriber' variable is not used
Custom channel implementation
For advanced use cases, you can implement your own custom channel in Python and pass
it in the tuple form (callback, handler) where callback is a callable and handler
is your custom Python object. This solution has the same performance penalties as the
callback API, but it can be useful in some scenarios.
The callback is invoked for each received item and stores the data in the custom channel,
which is accessible via the zenoh.Subscriber.handler property, in the same way
as with built-in channels.
Custom channel with priority queue
import queue

import zenoh

class PriorityChannel:
    def __init__(self, maxsize=100):
        self.queue: queue.PriorityQueue = queue.PriorityQueue(maxsize)
        # Counter to preserve FIFO order for samples with same priority
        self._counter = 0

    def recv(self) -> zenoh.Sample:
        return self.queue.get()[2]

    def send(self, sample: zenoh.Sample):
        self.queue.put((sample.priority, self._counter, sample))
        self._counter += 1
Usage of the custom channel
with zenoh.open(zenoh.Config()) as session:
    channel = PriorityChannel(maxsize=50)
    subscriber = session.declare_subscriber("key/expression", (channel.send, channel))
    sample = subscriber.handler.recv()
    print(f">> Received: {sample.payload.to_string()}")