import asyncio
import qth
from yarp import NoValue, Value
from .version import __version__
__names__ = [
"set_property",
"get_property",
"watch_event",
"send_event",
"run_forever",
"set_default_qth_client",
"get_default_qth_client",
]
_default_client = None
[docs]def set_default_qth_client(client):
"""
Set the default :py:class:`qth.Client` object to be used by qth_yarp.
"""
global _default_client
_default_client = client
[docs]def get_default_qth_client():
"""
Get the default :py:class:`qth.Client` object.
One will be created with the name ``qth_yarp_based_client`` if no client
has been provided by :py:func:`set_default_client`.
"""
global _default_client
if _default_client is None:
_default_client = qth.Client("qth_yarp_based_client",
"A qth_yarp based client.")
return _default_client
[docs]def get_property(topic, initial_value=None,
register=False, description=None, one_to_many=False,
delete_on_unregister=True,
qth_client=None, **kwargs):
"""
Return a (continuous) :py:class:`yarp.Value` containing the value of a qth property.
Parameters
----------
topic : str
The Qth topic name. (Not verified for existance).
initial_value : object
The initial value to assign to the returned :py:class:`yarp.Value`
while waiting for the property to arrive from Qth. Defaults to ``None``
but you may wish to choose an alternative dummy value which won't crash
later processing.
If (and only if) ``register`` is True, also sets the Qth property to
this value.
register : bool
If True, registers this property with Qth. The 'description' argument
must also be provided if this is True.
description : str
If ``register`` is True, the description of the property to include
with the registration.
one_to_many : bool
If ``register`` is True, is this a one-to-many (True) or many-to-one
(False) property. Defaults to many-to-one (False).
on_unregister : value
(Keyword-only argument). The value to set this property to when this
Qth client disconnects. If not provided, no change is made. If this
argument is used, set ``delete_on_unregister`` to False since it
defaults to ``True`` and will conflict with this argument.
delete_on_unregister : bool
(Keyword-only argument). Should this value be deleted when this Qth
client disconnects? Defaults to ``True``.
qth_client : :py:class:`qth.Client`
The Qth :py:class:`qth.Client` object to use. If not provided, uses the
client returned by :py:func:`get_default_qth_client`.
"""
qth_client = qth_client or get_default_qth_client()
output_value = Value(initial_value)
def set_value(topic, value):
output_value.value = value
async def bind_value():
if register:
await qth_client.register(
topic,
qth.PROPERTY_ONE_TO_MANY
if one_to_many else
qth.PROPERTY_MANY_TO_ONE,
description,
delete_on_unregister=delete_on_unregister,
**kwargs)
await qth_client.set_property(topic, initial_value)
await qth_client.watch_property(topic, set_value)
asyncio.create_task(bind_value())
return output_value
[docs]def watch_event(topic,
register=False, description=None, one_to_many=False,
qth_client=None, **kwargs):
"""
Return an (instantaneous) :py:class:`yarp.Value` representing a qth event.
Parameters
----------
topic : str
The Qth topic name. (Not verified for existance).
register : bool
If True, registers this event with Qth. The 'description' argument
must also be provided if this is True.
description : str
If ``register`` is True, the description of the event to include with
the registration.
one_to_many : bool
If ``register`` is True, is this a one-to-many (True) or many-to-one
(False) event. Defaults to many-to-one (False).
on_unregister : value
(Keyword-only argument). The value send for this event to when this
Qth client disconnects. If not provided, event is sent.
qth_client : :py:class:`qth.Client`
The Qth :py:class:`qth.Client` object to use. If not provided, uses the
client returned by :py:func:`get_default_qth_client`.
"""
qth_client = qth_client or get_default_qth_client()
output_value = Value()
def set_value(topic, value):
output_value.set_instantaneous_value(value)
async def bind_value():
if register:
await qth_client.register(
topic,
qth.EVENT_ONE_TO_MANY
if one_to_many else
qth.EVENT_MANY_TO_ONE,
description,
**kwargs)
await qth_client.watch_event(topic, set_value)
asyncio.create_task(bind_value())
return output_value
[docs]def set_property(topic, value,
register=False, description=None, one_to_many=True,
delete_on_unregister=True,
ignore_no_value=True,
qth_client=None, **kwargs):
"""
Set a Qth property to the value of a continuous :py:class:`yarp.Value`.
Parameters
----------
topic : str
The Qth topic name. (Not verified for existance).
value : :py:class:`Value`
The :py:class:`yarp.Value` whose value will be written to the specified
qth property.
register : bool
If True, registers this property with Qth. The 'description' argument
must also be provided if this is True.
description : str
If ``register`` is True, the description of the property to include
with the registration.
one_to_many : bool
If ``register`` is True, is this a one-to-many (True) or many-to-one
(False) property. Defaults to one-to-many (True).
on_unregister : value
(Keyword-only argument). The value to set this property to when this
Qth client disconnects. If not provided, no change is made. If this
argument is used, set ``delete_on_unregister`` to False since it
defaults to ``True`` and will conflict with this argument.
delete_on_unregister : bool
(Keyword-only argument). Should this value be deleted when this Qth
client disconnects? Defaults to ``True``.
ignore_no_value : bool
(Keyword-only argument). Should the property not be written when
'NoValue' is set. Defaults to ``True``. If ``False``, the property will be
deleted when set to 'NoValue'.
qth_client : :py:class:`qth.Client`
The Qth :py:class:`qth.Client` object to use. If not provided, uses the
client returned by :py:func:`get_default_qth_client`.
"""
qth_client = qth_client or get_default_qth_client()
@value.on_value_changed
def update_property(new_value):
if new_value is NoValue:
if ignore_no_value:
pass
else:
asyncio.create_task(qth_client.delete_property(topic))
else:
asyncio.create_task(qth_client.set_property(topic, new_value))
async def bind_value():
if register:
await qth_client.register(
topic,
qth.PROPERTY_ONE_TO_MANY
if one_to_many else
qth.PROPERTY_MANY_TO_ONE,
description,
delete_on_unregister=delete_on_unregister,
**kwargs)
update_property(value.value)
asyncio.create_task(bind_value())
[docs]def send_event(topic, value,
register=False, description=None, one_to_many=True,
qth_client=None, **kwargs):
"""
Return an (instantaneous) :py:class:`yarp.Value` representing a qth event.
Parameters
----------
topic : str
The Qth topic name. (Not verified for existance).
value : :py:class:`yarp.Value`
An instantaneous :py:class:`yarp.Value` whose changes will be turned
into qth events.
register : bool
If True, registers this event with Qth. The 'description' argument
must also be provided if this is True.
description : str
If ``register`` is True, the description of the event to include with
the registration.
one_to_many : bool
If ``register`` is True, is this a one-to-many (True) or many-to-one
(False) event. Defaults to one-to-many (True).
on_unregister : value
(Keyword-only argument). The value send for this event to when this
Qth client disconnects. If not provided, event is sent.
qth_client : :py:class:`qth.Client`
The Qth :py:class:`qth.Client` object to use. If not provided, uses the
client returned by :py:func:`get_default_qth_client`.
"""
qth_client = qth_client or get_default_qth_client()
@value.on_value_changed
def update_event(new_value):
if new_value is not NoValue:
asyncio.create_task(qth_client.send_event(topic, new_value))
async def bind_value():
if register:
await qth_client.register(
topic,
qth.EVENT_ONE_TO_MANY
if one_to_many else
qth.EVENT_MANY_TO_ONE,
description,
**kwargs)
asyncio.create_task(bind_value())
[docs]def run_forever():
"""
Run the main event loop forever.
This function is a convenience method which is exactly equivalent to
calling::
import asyncio
loop = asyncio.get_event_loop()
loop.run_forever()
"""
asyncio.get_event_loop().run_forever()