from dataclasses import dataclass, fields
from io import IOBase
import sys
from typing import Union
from bluesky import RunEngine
from ophyd.areadetector.plugins import HDF5Plugin
from ophyd.areadetector.filestore_mixins import FileStoreHDF5IterativeWrite
from .callbacks import * # noqa: F403
from .registry import * # noqa: F403
from .signals import * # noqa: F403
[docs]
class HDF5PluginWithFileStore(HDF5Plugin, FileStoreHDF5IterativeWrite):
warmup_acquire_time: float = 1.0
warmup_acquire_period: float = 1.0
warmup_signal_timeout: float = 5.0
def __init__(
self,
prefix: str,
*,
override_path: bool = False,
write_path_template="/tmp",
read_attrs=[],
**kwargs,
):
"""
A simple HDF5 plugin class, with some additional behavior (see Parameters and Attributes below).
Parameters
----------
prefix : str
Prefix of this plugin (e.g. 'HDF1:')
override_path : bool, optional
Whether to use the common override logic of HDF5 ophyd plugins.
By default, it is disabled (the currently configured PV values will be used)
write_path_template : str, optional
Used when `override_path` is `True`. By default it is `/tmp`.
Attributes
----------
warmup_acquire_time : float
The value to set AcquireTime when doing a warmup. Defaults to 1.0.
warmup_acquire_period : float
The value to set AcquirePeriod when doing a warmup. Defaults to 1.0.
warmup_signal_timeout : float
The timeout to pass to each set().wait() call in the warmup. Defaults to 5.0.
"""
super().__init__(
prefix,
write_path_template=write_path_template,
read_attrs=read_attrs,
**kwargs,
)
self._override_path = override_path
[docs]
def warmup(self):
"""
A convenience method for 'priming' the plugin.
The plugin has to 'see' one acquisition before it is ready to capture.
This sets the array size, etc.
This method is an override of the similar-named method in HDF5Plugin.
"""
self.enable.set(1).wait()
from collections import OrderedDict
sigs = OrderedDict(
[
(self.parent.cam.array_callbacks, 1),
(self.parent.cam.image_mode, "Single"),
(self.parent.cam.trigger_mode, "Internal"),
(self.parent.cam.acquire_time, self.warmup_acquire_time),
(self.parent.cam.acquire_period, self.warmup_acquire_period),
(self.parent.cam.acquire, 1),
]
)
original_vals = {sig: sig.get() for sig in sigs}
for sig, val in sigs.items():
sig.set(val).wait(timeout=self.warmup_signal_timeout)
import time
while self.parent.cam.acquire.get() != 0:
time.sleep(0.01)
for sig, val in reversed(list(original_vals.items())):
sig.set(val).wait(timeout=self.warmup_signal_timeout)
[docs]
def make_filename(self):
if self._override_path:
return super().make_filename()
filename = self.file_name.get()
read_path = self.file_path.get()
write_path = read_path
return filename, read_path, write_path
[docs]
def stage(self):
if not self._override_path:
# stage_sigs will be ran after the override takes place.
self.stage_sigs["file_path"] = self.file_path.get()
self.stage_sigs["file_name"] = self.file_name.get()
self.stage_sigs["file_number"] = self.file_number.get()
# NOTE: Since stage_sigs is a OrderedDict, the insertion order matters.
# Here, we need to set file_number before starting the capture, so the
# file name sees such change, so capture must go last.
del self.stage_sigs["capture"]
self.stage_sigs["capture"] = 1
if not self.file_path_exists.get():
raise IOError(
"Path %s does not exist on IOC." "" % self.file_path.get()
)
super().stage()
[docs]
def unstage(self):
auto_increment = False
if not self._override_path:
# The super().unstage call will override our values to
# the dumb defaults. Before that, however, we have the
# updated values.
file_path = self.file_path.get()
file_name = self.file_name.get()
file_number = self.file_number.get()
auto_increment = self.auto_increment.get()
super().unstage()
if not self._override_path:
self.file_path.set(file_path).wait()
self.file_name.set(file_name).wait()
self.file_number.set(file_number + (1 if auto_increment else 0)).wait()
[docs]
class HDF5PluginWithFileStoreV34(HDF5PluginWithFileStore):
file_number_sync = None
file_number_write = None
pool_max_buffers = None
[docs]
@dataclass
class DebugOptions:
file: Union[IOBase, str, None] = sys.stdout
"""Where to save the log."""
datefmt: str = "%H:%M:%S"
"""What formatting to use for the date of an event."""
color: bool = True
"""Whether to use colored output in the logs."""
level: Union[str, int] = "DEBUG"
"""What is the minimum level of logging that will be processed."""
def asdict(self):
# https://docs.python.org/3/library/dataclasses.html#dataclasses.asdict
# Workaround for deepcopy failing on IOBase subclasses
return dict((field.name, getattr(self, field.name)) for field in fields(self))
[docs]
@staticmethod
def info_level():
"""Create a `DebugOptions` object with no `DEBUG` messages processed."""
return DebugOptions(level="INFO")
[docs]
def set_debug_mode(
run_engine: RunEngine,
bluesky_debug: Union[bool, DebugOptions, None] = None,
ophyd_debug: Union[bool, DebugOptions, None] = None,
mock_commands: bool = True,
print_documents: bool = False,
print_messages: bool = False,
) -> dict:
"""
Enables / disables debugging facilities for bluesky / ophyd.
Parameters
----------
run_engine : RunEngine
The RunEngine on which to set the debug options.
Note that the bluesky and ophyd debug options are global, so they affect all RunEngines.
bluesky_debug : DebugOptions or bool, optional
Whether to enable / disable debug logging, or options to pass to bluesky's logging configuration.
Passing None will leave the configurations unchanged.
ophyd_debug : DebugOptions or bool, optional
Whether to enable / disable debug logging, options to pass to ophyd's logging configuration.
Passing None will leave the configurations unchanged.
mock_commands : bool, optional
Whether to mock all of 'run_engine's commands, replacing the default commands for dummy ones
that only print to stdout what it would have done. Defaults to True.
If it is set, the return's "old_commands" key will be set to a dictionary, in which
each command name (key) will be associated with the old command function (value).
print_documents : bool, optional
Whether to subscribe 'run_engine' to a callback that prints every document generated. Defaults to False.
If it is set, the return's "print_sub_id" key will be set to the subscription ID returned by the RunEngine.
print_messages : bool, optional
Whether to override 'run_engine's 'msg_hook' attribute to print every generated Msg object. Defaults to False.
If it is set, the return's "old_msg_hook" key will be set to the last function in 'msg_hook'.
"""
return_dict = {}
from bluesky.log import config_bluesky_logging
from ophyd.log import config_ophyd_logging
if bluesky_debug is not None:
if bluesky_debug is True:
bluesky_debug = DebugOptions()
elif bluesky_debug is False:
bluesky_debug = DebugOptions.info_level()
config_bluesky_logging(**bluesky_debug.asdict())
if ophyd_debug is not None:
if ophyd_debug is True:
ophyd_debug = DebugOptions()
elif ophyd_debug is False:
ophyd_debug = DebugOptions.info_level()
config_ophyd_logging(**ophyd_debug.asdict())
if mock_commands:
def mock_command(command):
async def __inner(msg):
print(" --- {}: {}".format(command, msg))
if msg.obj is not None:
return {msg.obj: None}
__inner.__doc__ = "RunEngine '{}' mock implementation.".format(command)
return __inner
return_dict["old_commands"] = dict()
for command in run_engine.commands:
return_dict["old_commands"][command] = run_engine._command_registry[command]
run_engine.register_command(command, mock_command(command))
if print_documents:
def pretty_doc_print(name, doc):
if name == "start":
print()
print("[{}] - {}".format(name, str(doc)))
return_dict["print_sub_id"] = run_engine.subscribe(pretty_doc_print)
if print_messages:
def pretty_msg_print(msg):
print("[{}] {}".format(msg.command, repr(msg)))
return_dict["old_msg_hook"] = run_engine.msg_hook
run_engine.msg_hook = pretty_msg_print
return return_dict
[docs]
def is_in_queueserver() -> bool:
"""
Returns whether the current code is running in a queueserver environment (e.g. in the startup sequence), or not.
This is very similar to queueserver's :func:`bluesky_queueserver.is_re_worker_active`, but without a hard dependency on queueserver.
"""
try:
from bluesky_queueserver import is_re_worker_active
except ImportError:
is_re_worker_active = lambda: False # noqa: E731
return is_re_worker_active()