Source code for sophys.common.devices.crio

import threading

from ophyd import (
    Component,
    FormattedComponent as FormattedCpt,
    Device,
    EpicsSignal,
    EpicsSignalRO,
    EpicsSignalWithRBV,
    Kind,
)
from ophyd.device import create_device_from_components
from ophyd.status import Status

# Useful resources:
# https://cnpemcamp.sharepoint.com/sites/lnls/groups/sol/SitePages/CRIO-PVs.aspx


class _BaseCRIO(Device):
    pv_averaging_time = FormattedCpt(
        EpicsSignal, "{global_prefix}PvAvgTime", kind=Kind.config
    )
    file_averaging_time = FormattedCpt(
        EpicsSignal, "{global_prefix}FileAvgTime", kind=Kind.config
    )
    saving_to_file = FormattedCpt(
        EpicsSignalRO, "{global_prefix}AnalogSaving2File", kind=Kind.omitted
    )
    disable_file_close = FormattedCpt(
        EpicsSignal, "{global_prefix}DisableFileClose", kind=Kind.omitted
    )

    def __init__(self, prefix: str, **kwargs):
        # Get the prefix before the second to last ':' (without the card information)
        self.global_prefix = prefix[:-1].rpartition(":")[0] + ":"

        super().__init__(prefix, **kwargs)

        self._seen_devices = set()

    def cached_trigger(self, calling_child):
        """Call `trigger` only once per `trigger_and_read` operation."""
        # Only go forward if seen_devices is empty (first time here), or if
        # calling_child is in seen_devices (another trigger_and_read pair).
        # NOTE: This is so that we only trigger once for each point in a scan.
        if len(self._seen_devices) != 0 and calling_child not in self._seen_devices:
            self._seen_devices.add(calling_child)

            sts = Status()
            sts.set_finished()
            return sts

        self._seen_devices.clear()
        self._seen_devices.add(calling_child)

        return self.trigger()

    def trigger(self):
        sts = super().trigger()

        # Restart averaging window
        avg_time = self.pv_averaging_time.get()
        # If too small for the firmware, it will use the smallest it can.
        self.pv_averaging_time.set(0.001).wait()
        self.pv_averaging_time.set(avg_time).wait()

        status = Status()

        threading.Timer(avg_time, status.set_finished).start()

        return sts & status


class BaseAnalogInput(EpicsSignalRO):
    def trigger(self):
        sts = super().trigger()
        sts &= self.parent.cached_trigger(self)
        return sts


def __createAnalogIn(num: int) -> dict:
    components = {}
    for i in range(num):
        components.update(
            {
                f"ai{i}": Component(BaseAnalogInput, f"ai{i}", kind=Kind.hinted),
                f"ai{i}_offset": Component(
                    EpicsSignal, f"ai{i}_Offset", kind=Kind.config
                ),
                f"ai{i}_scale_factor": Component(
                    EpicsSignal, f"ai{i}_SF", kind=Kind.config
                ),
            }
        )
    return components


__CRIO_9215_docstring = """
An Ophyd device for the NI-9215 CompactRIO device, with 4 analog inputs.
"""

__CRIO_9220_docstring = """
An Ophyd device for the NI-9220 CompactRIO device, with 16 analog inputs.
"""

__CRIO_9223_docstring = """
An Ophyd device for the NI-9223 CompactRIO device, with 4 analog inputs.
"""


CRIO_9215 = create_device_from_components(
    "CRIO_9215",
    base_class=_BaseCRIO,
    docstring=__CRIO_9215_docstring,
    **__createAnalogIn(4),
)
CRIO_9220 = create_device_from_components(
    "CRIO_9220",
    base_class=_BaseCRIO,
    docstring=__CRIO_9220_docstring,
    **__createAnalogIn(16),
)
CRIO_9223 = create_device_from_components(
    "CRIO_9223",
    base_class=_BaseCRIO,
    docstring=__CRIO_9223_docstring,
    **__createAnalogIn(4),
)


[docs] class CRIO_9403(_BaseCRIO): """ An Ophyd device for the NI-9403 CompactRIO device. Commonly used as a :ref:`tatu`, this device contains low-level debug signals for such usages. """ bi0 = Component(EpicsSignalRO, "bi0") bi1 = Component(EpicsSignalRO, "bi1") bi2 = Component(EpicsSignalRO, "bi2") bi3 = Component(EpicsSignalRO, "bi3") bo4 = Component(EpicsSignalWithRBV, "bo4") bo5 = Component(EpicsSignalWithRBV, "bo5") bo6 = Component(EpicsSignalWithRBV, "bo6") bo7 = Component(EpicsSignalWithRBV, "bo7") bi8 = Component(EpicsSignalRO, "bi8") bi9 = Component(EpicsSignalRO, "bi9") bi10 = Component(EpicsSignalRO, "bi10") bi11 = Component(EpicsSignalRO, "bi11") bo12 = Component(EpicsSignalWithRBV, "bo12") bo13 = Component(EpicsSignalWithRBV, "bo13") bo14 = Component(EpicsSignalWithRBV, "bo14") bo15 = Component(EpicsSignalWithRBV, "bo15") bi16 = Component(EpicsSignalRO, "bi16") bi17 = Component(EpicsSignalRO, "bi17") bi18 = Component(EpicsSignalRO, "bi18") bi19 = Component(EpicsSignalRO, "bi19") bo20 = Component(EpicsSignalWithRBV, "bo20") bo21 = Component(EpicsSignalWithRBV, "bo21") bo22 = Component(EpicsSignalWithRBV, "bo22") bo23 = Component(EpicsSignalWithRBV, "bo23")