import copy
import typing
import numpy as np
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO
from ophyd.utils.epics_pvs import _wait_for_value
[docs]
def add_components_to_device(
    obj: Device,
    components: typing.Iterable[tuple[str, Component]],
    *,
    for_each_sig: typing.Optional[typing.Callable] = None,
):
    """
    Add a collection of components to a device, after it has been initialized.

    The first time a given device class is seen, a deep copy of its
    ``_sig_attrs`` is stored on that class as ``_old_sig_attrs``. Every call
    then gives ``obj`` its own deep copy of that snapshot before registering
    and instantiating the new components on it.

    Parameters
    ----------
    obj : Device
        The device to which the components will be added.
    components : iterable of (component name, component) tuples
        The components that will be added to `obj`.
    for_each_sig : callable, optional
        Callback that is called on each signal addition, with signature
        (name: str, sig: Signal) -> Any. It is invoked with keyword arguments
        ``name`` and ``sig``.
        By default, it does nothing.
        One common usage is to call setattr of the signal to its parent.

    Examples
    --------
    Add four signals named ``channel_{x}`` to ``obj``, each with prefix ``CH{x}``:

    .. code-block:: python

        components = (
            (
                f"channel_{i}",
                Component(EpicsSignal, f"CH{i}:"),
            )
            for i in range(1, 5)
        )
        add_components_to_device(
            obj, components, for_each_sig=lambda name, sig: setattr(obj, name, sig)
        )

    Add an arbitrary number of signals named ``scale_{x}`` to ``self``
    (e.g. from inside the device's ``__init__``), each with prefix ``SC{x}``,
    while also adding them to a list ``scales``:

    .. code-block:: python

        def for_each_sig(name, sig):
            setattr(self, name, sig)
            self.scales.append(sig)

        n_scales = 8
        components = (
            (f"scale_{i}", Component(Scale, f"SC{i}:")) for i in range(n_scales)
        )
        add_components_to_device(self, components, for_each_sig=for_each_sig)
    """
    device_cls = obj.__class__

    # Inspect only the class's own namespace. A plain hasattr() would also be
    # satisfied by a snapshot inherited from a parent class, and the subclass
    # instance would then be given the parent's component table, silently
    # dropping the components the subclass itself declares.
    if "_old_sig_attrs" not in vars(device_cls):
        device_cls._old_sig_attrs = copy.deepcopy(obj._sig_attrs)

    # Instance-level copy, so the additions below do not touch the snapshot
    # stored on the class.
    obj._sig_attrs = copy.deepcopy(device_cls._old_sig_attrs)

    for component_name, component in components:
        # The component itself is passed in the "owner" position.
        component.__set_name__(component, component_name)
        obj._sig_attrs[component_name] = component
        obj._component_kinds[component_name] = component.kind
        obj._instantiate_component(component_name)

        if for_each_sig is not None:
            for_each_sig(name=component_name, sig=obj._signals[component_name])
[docs]
class EpicsSignalWithCustomReadout(EpicsSignal):
    """
    EpicsSignal variant whose 'set' is validated against a configurable readout.

    Use it when the value read back after a 'set' is not the value that was
    written, for instance when a command is sent to an IOC and a "Done"
    string is expected in return.

    Parameters
    ----------
    enforce_type : type, optional
        Common type used to compare the expected readout with the readback.
        When left as None, no conversion is attempted and the expected
        readout is compared as-is (the default EpicsSignal behavior).
    """

    def __init__(self, read_pv, write_pv, enforce_type=None, **kwargs):
        super().__init__(read_pv=read_pv, write_pv=write_pv, **kwargs)
        self._enforce_type = enforce_type
        # Filled in by every call to set(); consumed by _set_and_wait().
        self._expected_readout = None

    # FIXME: Use the default value for timeout
    def set(self, value, *, expected_readout=None, timeout=5.0, settle_time=None):
        """
        Set the value of the Signal and return a Status object.

        The value expected on the readback is recorded before delegating to
        ``EpicsSignal.set``. When put completion is not used, the readback
        is polled until it equals 'expected_readout'; if 'enforce_type' was
        given to the constructor, the comparison is done after casting both
        sides to that type, and a failed conversion raises an Exception.

        Parameters
        ----------
        value : any
            Value written to the signal.
        expected_readout : any, optional
            Value the 'read_pv' is expected to show once the set succeeded.
            Falls back to 'value' when None.
        timeout : float, optional
            Maximum time to wait.
        settle_time: float, optional
            Delay after the set() has completed to indicate completion
            to the caller

        Returns
        -------
        st : Status

        See Also
        --------
        EpicsSignal.set
        """
        if expected_readout is None:
            expected_readout = value
        self._expected_readout = expected_readout

        if self._enforce_type is not None:
            # Wrap the target so comparisons go through the enforced type.
            self._expected_readout = create_loose_comparator(
                self._enforce_type, expected_readout
            )

        return super().set(value=value, timeout=timeout, settle_time=settle_time)

    def _set_and_wait(self, value, timeout, **kwargs):
        """Write ``value`` and wait for the readback to match the expected readout."""
        self.put(value, **kwargs)

        if is_loose_comparator(self._expected_readout):
            if self.tolerance or self.rtolerance:
                # Hand the tolerances over to the comparator, then clear them
                # on the signal itself.
                self._expected_readout.atol = self.tolerance
                self._expected_readout.rtol = self.rtolerance
                self.tolerance = None
                self.rtolerance = None

        target = self._expected_readout
        absolute_tolerance = self.tolerance
        relative_tolerance = self.rtolerance
        _wait_for_value(
            self,
            target,
            poll_time=0.01,
            timeout=timeout,
            atol=absolute_tolerance,
            rtol=relative_tolerance,
        )
[docs]
class EpicsSignalWithCustomReadoutRBV(EpicsSignalWithCustomReadout):
    """EpicsSignalWithCustomReadout whose read_pv is derived as 'write_pv + _RBV'."""

    def __init__(self, write_pv, **kwargs):
        readback_pv = write_pv + "_RBV"
        super().__init__(read_pv=readback_pv, write_pv=write_pv, **kwargs)
[docs]
class EpicsSignalMon(EpicsSignalRO):
    """
    A read-only EpicsSignal with -Mon added to prefix.
    """

    def __init__(self, prefix, **kwargs):
        monitor_pv = prefix + "-Mon"
        super().__init__(monitor_pv, **kwargs)
[docs]
class EpicsSignalWithRBSP(EpicsSignal):
    """
    Signal following the same idea as EpicsSignalWithRBV, except that the
    read PV is the prefix plus -RB and the write PV is the prefix plus -SP.
    """

    def __init__(self, prefix, **kwargs):
        readback_pv = prefix + "-RB"
        setpoint_pv = prefix + "-SP"
        super().__init__(readback_pv, write_pv=setpoint_pv, **kwargs)
[docs]
class EpicsSignalCmd(EpicsSignal):
    """
    An EpicsSignal whose PV name is the prefix with -Cmd appended.
    """

    def __init__(self, prefix, **kwargs):
        command_pv = prefix + "-Cmd"
        super().__init__(command_pv, **kwargs)
def create_loose_comparator(common_type, readout):
    """
    Build a loose comparator holding ``readout``, compared through ``common_type``.

    Parameters
    ----------
    common_type : type
        Type both sides of a comparison are cast to.
    readout : any
        Value stored in the comparator's ``_value`` attribute.

    Returns
    -------
    A wrapper object produced by ``_LooseComparator(common_type)``.
    """
    make_wrapper = _LooseComparator(common_type)
    # Calling the factory with an argument stores it as the wrapper's _value.
    return make_wrapper(readout)
def is_loose_comparator(obj):
    """
    Tell whether ``obj`` carries the ``_loose_comparator`` marker attribute.

    Only the presence of the attribute matters, not its value.
    """
    absent = object()
    marker = getattr(obj, "_loose_comparator", absent)
    return marker is not absent
class _LooseComparator:
    """
    An object that will try to cast itself and other values to a common type in a comparison.

    The common type must be a numerical type, otherwise the behavior is undefined.

    For example, this can be used to automatically cast string variables into float for numerical comparisons.

    An instance of this class is a factory: calling it returns a new wrapper
    object whose ``_value`` attribute holds the wrapped value. Wrapper
    objects carry a ``_loose_comparator`` class attribute as a marker, and
    honour optional ``atol`` / ``rtol`` instance attributes in ``==`` / ``!=``.
    """

    class _WrapperObjectMetaclass(type):
        """Metaclass injecting the casting operator methods into the wrapper class."""

        def __new__(cls, class_name, class_parents, class_attrs, common_type=object):
            class_attrs.setdefault("_value", None)

            def wrapper_repr(self):
                return "Wrapper object: Value={}, Common type={}".format(
                    self._value, common_type
                )

            class_attrs["__repr__"] = wrapper_repr

            # fmt: off
            binary_attrs_list = [
                "__lt__", "__le__", "__gt__", "__ge__",
                "__add__", "__sub__", "__mul__", "__matmul__", "__truediv__", "__floordiv__", "__mod__", "__divmod__", "__pow__", "__lshift__", "__rshift__", "__and__", "__xor__", "__or__",
                "__radd__", "__rsub__", "__rmul__", "__rmatmul__", "__rtruediv__", "__rfloordiv__", "__rmod__", "__rdivmod__", "__rpow__", "__rlshift__", "__rrshift__", "__rand__", "__rxor__", "__ror__",
            ]
            inplace_attrs_list = [
                "__iadd__", "__isub__", "__imul__", "__imatmul__", "__itruediv__", "__ifloordiv__", "__imod__", "__ipow__", "__ilshift__", "__irshift__", "__iand__", "__ixor__", "__ior__",
            ]
            # fmt: on

            def create_binary_attr_impl(attr, fallback=None):
                def attrs_impl(self, other):
                    # Calls 'common_type(self) <attr> common_type(other)'
                    lhs = common_type(self._value)
                    method = getattr(lhs, attr, None)
                    if method is None and fallback is not None:
                        # Immutable numeric types (int, float, ...) do not
                        # define the in-place methods; use the plain binary
                        # one, as the interpreter itself would.
                        method = getattr(lhs, fallback, None)
                    if method is None:
                        # Operator unsupported by the common type: let the
                        # interpreter try the other operand / raise TypeError.
                        return NotImplemented
                    return method(common_type(other))

                return attrs_impl

            # Special case for __eq__ and __ne__ to deal with tolerance
            def eq_close(self, other):
                # Missing or None tolerances are treated as zero.
                atol = getattr(self, "atol", 0) or 0
                rtol = getattr(self, "rtol", 0) or 0
                a = common_type(self._value)
                b = common_type(other)
                return np.isclose(a, b, atol=atol, rtol=rtol)

            def ne_close(self, other):
                return not eq_close(self, other)

            class_attrs["__eq__"] = eq_close
            class_attrs["__ne__"] = ne_close

            class_attrs.update(
                {attr: create_binary_attr_impl(attr) for attr in binary_attrs_list}
            )
            class_attrs.update(
                {
                    # "__iadd__" -> "__add__", "__imatmul__" -> "__matmul__", ...
                    attr: create_binary_attr_impl(attr, fallback="__" + attr[3:])
                    for attr in inplace_attrs_list
                }
            )

            return super().__new__(cls, class_name, class_parents, class_attrs)

    def __init__(self, common_type):
        class _WrapperObject(
            metaclass=_LooseComparator._WrapperObjectMetaclass, common_type=common_type
        ):
            # Marker attribute: only its presence is meaningful.
            _loose_comparator = None

        self.__inner_cls = _WrapperObject

    def __call__(self, arg=None):
        """Return a new wrapper object whose ``_value`` is ``arg``."""
        ret = self.__inner_cls()
        ret._value = arg
        return ret