import collections
import inspect
import os
import sys
import time
from collections import defaultdict
from functools import partial
from itertools import chain, zip_longest
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple, Union
import numpy as np
from cycler import Cycler
try:
# cytools is a drop-in replacement for toolz, implemented in Cython
from cytools import partition
except ImportError:
from toolz import partition
from . import plan_patterns, utils
from . import plan_stubs as bps
from . import preprocessors as bpp
from .protocols import Flyable, Movable, NamedMovable, Readable
from .utils import (
CustomPlanMetadata,
Msg,
MsgGenerator,
ScalarOrIterableFloat,
get_hinted_fields,
)
#: Plan function that can be used for each shot in a detector acquisition involving no actuation
PerShot = Callable[[Sequence[Readable], Optional[bps.TakeReading]], MsgGenerator]
#: Plan function that can be used for each step in a scan
#: (one-dimensional form: detectors, a single motor, the target value for that
#: motor, and an optional ``bps.TakeReading`` hook)
PerStep1D = Callable[
    [Sequence[Readable], Movable, Any, Optional[bps.TakeReading]],
    MsgGenerator,
]
#: N-dimensional form of the per-step plan function. ``scan_nd`` calls it as
#: ``per_step(detectors, step, pos_cache)``; the two mapping arguments are
#: keyed by motor.
PerStepND = Callable[
    [
        Sequence[Readable],
        Mapping[Movable, Any],
        Dict[Movable, Any],
        Optional[bps.TakeReading],
    ],
    MsgGenerator,
]
#: Either of the two accepted per-step plan signatures
PerStep = Union[PerStep1D, PerStepND]
def _check_detectors_type_input(detectors):
if not isinstance(detectors, Sequence):
raise TypeError("The input argument must be either as a list or a tuple of Readable objects.")
def derive_default_hints(motors: List[Any]) -> Dict[str, Sequence]:
    """
    Build the default ``hints`` metadata for a set of motors.

    The hinted fields of every motor (as reported by ``get_hinted_fields``)
    are gathered, in order, into a single dimension on the "primary" stream.

    Returns
    -------
    dict
        ``{"dimensions": [(fields, "primary")]}`` when at least one hinted
        field was found, otherwise an empty dict.
    """
    hinted_fields = []
    for motor in motors:
        hinted_fields.extend(get_hinted_fields(motor))
    if not hinted_fields:
        return {}
    return {"dimensions": [(hinted_fields, "primary")]}
[docs]
def count(
    detectors: Sequence[Readable],
    num: Optional[int] = 1,
    delay: ScalarOrIterableFloat = 0.0,
    *,
    per_shot: Optional[PerShot] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """
    Acquire one or more readings from a set of detectors, without moving anything.

    Parameters
    ----------
    detectors : list or tuple
        the 'readable' objects to read on every shot
    num : integer, optional
        how many readings to take; default is 1.
        ``None`` means keep acquiring until the plan is canceled.
    delay : iterable or scalar, optional
        time in seconds to wait between successive readings; default is 0.
    per_shot : callable, optional
        replacement for the default per-shot plan (``bps.one_shot``).
        Expected signature ::

            def f(detectors: Iterable[OphydObj]) -> Generator[Msg]:
                ...
    md : dict, optional
        metadata merged over the defaults recorded by this plan

    Notes
    -----
    If ``delay`` is an iterable, it must have at least ``num - 1`` entries or
    the plan will raise a ``ValueError`` during iteration.
    """
    _check_detectors_type_input(detectors)
    interval_count = None if num is None else num - 1

    _md = {
        "detectors": [det.name for det in detectors],
        "num_points": num,
        "num_intervals": interval_count,
        "plan_args": {"detectors": list(map(repr, detectors)), "num": num, "delay": delay},
        "plan_name": "count",
        "hints": {},
    }
    _md.update(md or {})
    # Only supply the default "time" dimension if the caller provided none.
    _md["hints"].setdefault("dimensions", [(("time",), "primary")])  # type: ignore

    # A custom per_shot might write to a different stream, so "primary" is
    # predeclared only for the default per-shot plan, and only when the
    # BLUESKY_PREDECLARE environment variable is set to a non-empty value.
    predeclare = per_shot is None and os.environ.get("BLUESKY_PREDECLARE", False)
    shot_plan: PerShot = per_shot or bps.one_shot

    @bpp.stage_decorator(detectors)
    @bpp.run_decorator(md=_md)
    def _count_run() -> MsgGenerator[str]:
        if predeclare:
            yield from bps.declare_stream(*detectors, name="primary")
        take_shot = partial(shot_plan, detectors)
        return (yield from bps.repeat(take_shot, num=num, delay=delay))

    return (yield from _count_run())
[docs]
def list_scan(
    detectors: Sequence[Readable],
    *args: Tuple[Union[Movable, Any], List[Any]],
    per_step: Optional[PerStep] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """
    Scan over one or more variables in steps simultaneously (inner product).

    Parameters
    ----------
    detectors : list or tuple
        list of 'readable' objects
    *args :
        For one dimension, ``motor, [point1, point2, ....]``.
        In general:

        .. code-block:: python

            motor1, [point1, point2, ...],
            motor2, [point1, point2, ...],
            ...,
            motorN, [point1, point2, ...]

        Motors can be any 'settable' object (motor, temp controller, etc.)
        Each list of points may be any finite iterable; all of them must
        contain the same number of points.
    per_step : callable, optional
        hook for customizing action of inner loop (messages per step)
        Expected signature:
        ``f(detectors, motor, step) -> plan (a generator)``
    md : dict, optional
        metadata

    Raises
    ------
    TypeError
        If ``detectors`` is not a sequence.
    ValueError
        If ``args`` does not consist of (motor, points) pairs, or if the
        lists of points do not all have the same length.

    See Also
    --------
    :func:`bluesky.plans.rel_list_scan`
    :func:`bluesky.plans.list_grid_scan`
    :func:`bluesky.plans.rel_list_grid_scan`
    """
    _check_detectors_type_input(detectors)
    if len(args) % 2 != 0:
        raise ValueError("The list of arguments must contain a list of points for each defined motor")

    md = md or {}  # reset md if it is None.

    # Materialize every position iterable exactly once, and check that all
    # of them have the same length.
    lengths = {}
    motors: List[Any] = []
    pos_lists = []
    for motor, pos_list in partition(2, args):
        pos_list = list(pos_list)  # Ensure list (accepts any finite iterable).
        lengths[motor.name] = len(pos_list)
        motors.append(motor)
        pos_lists.append(pos_list)

    if len({len(pos_list) for pos_list in pos_lists}) > 1:
        raise ValueError(
            "The lengths of all lists in *args must be the same. "
            "However the lengths in args are : "
            f"{lengths}"
        )
    # All lists share one length at this point; None when no motor was given.
    length = len(pos_lists[0]) if pos_lists else None

    # The metadata records the position containers exactly as the caller
    # passed them.
    md_args = list(chain(*((repr(motor), pos_list) for motor, pos_list in partition(2, args))))
    motor_names = list(lengths.keys())
    num_intervals: int = (length or 1) - 1

    _md = {
        "detectors": [det.name for det in detectors],
        "motors": motor_names,
        "num_points": length,
        "num_intervals": num_intervals,
        "plan_args": {"detectors": list(map(repr, detectors)), "args": md_args, "per_step": repr(per_step)},
        "plan_name": "list_scan",
        "plan_pattern": "inner_list_product",
        "plan_pattern_module": plan_patterns.__name__,
        "plan_pattern_args": dict(args=md_args),  # noqa: C408
        "hints": {},
    }
    _md.update(md)

    # Start from the motor-based hints, then let any hints supplied in the
    # caller's md override them.
    hints = derive_default_hints(motors)
    hints.update(md.get("hints", {}))
    _md["hints"] = hints

    # Build the trajectory from the materialized lists, not from the raw
    # ``args``: a one-shot iterator in ``args`` was already consumed above and
    # would otherwise yield an empty trajectory.
    materialized_args = tuple(chain.from_iterable(zip(motors, pos_lists)))
    full_cycler = plan_patterns.inner_list_product(materialized_args)

    return (yield from scan_nd(detectors, full_cycler, per_step=per_step, md=_md))
[docs]
def rel_list_scan(
    detectors: Sequence[Readable],
    *args: Union[Movable, Any],
    per_step: Optional[PerStep] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """
    Scan over one or more variables in steps relative to the current position.

    The work is delegated to :func:`bluesky.plans.list_scan`, wrapped so that
    the motors are set relatively and their positions are reset afterwards.

    Parameters
    ----------
    detectors : list
        list of 'readable' objects
    *args :
        For one dimension, ``motor, [point1, point2, ....]``.
        In general:

        .. code-block:: python

            motor1, [point1, point2, ...],
            motor2, [point1, point2, ...],
            ...,
            motorN, [point1, point2, ...]

        Motors can be any 'settable' object (motor, temp controller, etc.)
        point1, point2 etc are relative to the current location.
    per_step : callable, optional
        hook for customizing action of inner loop (messages per step)
        Expected signature: ``f(detectors, motor, step)``
    md : dict, optional
        metadata

    See Also
    --------
    :func:`bluesky.plans.list_scan`
    :func:`bluesky.plans.list_grid_scan`
    :func:`bluesky.plans.rel_list_grid_scan`
    """
    # TODO read initial positions (redundantly) so they can be put in md here
    _md = {"plan_name": "rel_list_scan"}
    _md.update(md or {})

    # ``args`` alternates motor, points, motor, points, ...
    motors = [pair[0] for pair in partition(2, args)]

    @bpp.reset_positions_decorator(motors)
    @bpp.relative_set_decorator(motors)
    def _relative_plan():
        return (yield from list_scan(detectors, *args, per_step=per_step, md=_md))

    return (yield from _relative_plan())
[docs]
def list_grid_scan(
    detectors: Sequence[Readable],
    *args: Union[Movable, Any],
    snake_axes: Union[bool, Iterable[Any]] = False,
    per_step: Optional[PerStep] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """
    Scan over a mesh; each motor is on an independent trajectory.

    Parameters
    ----------
    detectors: list or tuple
        list of 'readable' objects
    args: list
        patterned like (``motor1, position_list1,``
                        ``motor2, position_list2,``
                        ``motor3, position_list3,``
                        ``...,``
                        ``motorN, position_listN``)

        The first motor is the "slowest", the outer loop. Each
        ``position_list`` holds the positions visited along that motor's
        axis; its length, minimum and maximum are recorded in the metadata.
        Motors can be any 'settable' object (motor, temp controller, etc.).
    snake_axes: boolean or iterable, optional
        which axes should be snaked, either ``False`` (do not snake any axes),
        ``True`` (snake all axes) or a list of axes to snake. "Snaking" an axis
        is defined as following snake-like, winding trajectory instead of a
        simple left-to-right trajectory. The elements of the list are motors
        that are listed in `args`. The list must not contain the slowest
        (first) motor, since it can't be snaked.
    per_step: callable, optional
        hook for customizing action of inner loop (messages per step).
        See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default)
        for details.
    md: dict, optional
        metadata

    Raises
    ------
    TypeError
        If ``detectors`` is not a sequence.

    See Also
    --------
    :func:`bluesky.plans.rel_list_grid_scan`
    :func:`bluesky.plans.list_scan`
    :func:`bluesky.plans.rel_list_scan`
    """
    # Validate up front, like the other plans in this module, so a bad
    # ``detectors`` argument fails before any metadata is assembled.
    _check_detectors_type_input(detectors)

    full_cycler = plan_patterns.outer_list_product(args, snake_axes)

    md_args = []
    motor_names = []
    motors = []
    for motor, pos_list in partition(2, args):
        md_args.extend([repr(motor), pos_list])
        motor_names.append(motor.name)
        motors.append(motor)

    _md = {
        "shape": tuple(len(pos_list) for motor, pos_list in partition(2, args)),
        "extents": tuple([min(pos_list), max(pos_list)] for motor, pos_list in partition(2, args)),
        "snake_axes": repr(snake_axes),
        "plan_args": {"detectors": list(map(repr, detectors)), "args": md_args, "per_step": repr(per_step)},
        "plan_name": "list_grid_scan",
        "plan_pattern": "outer_list_product",
        "plan_pattern_args": dict(args=md_args, snake_axes=repr(snake_axes)),  # noqa: C408
        "plan_pattern_module": plan_patterns.__name__,
        "motors": tuple(motor_names),
        "hints": {},
    }
    _md.update(md or {})  # type: ignore

    try:
        motor_hints = [(m.hints["fields"], "primary") for m in motors]
    except (AttributeError, KeyError):
        # Not all motors provide a 'fields' hint, so we have to skip it.
        pass
    else:
        assert isinstance(_md["hints"], dict), "Hints must be a dictionary"
        # Keep a caller-supplied "dimensions" hint if there is one.
        _md["hints"].setdefault("dimensions", motor_hints)

    return (yield from scan_nd(detectors, full_cycler, per_step=per_step, md=_md))
[docs]
def rel_list_grid_scan(
    detectors: Sequence[Readable],
    *args: Union[Movable, Any],
    snake_axes: bool = False,
    per_step: Optional[PerStep] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """
    Scan over a mesh of positions given relative to the current position.

    Each motor is on an independent trajectory. The work is delegated to
    :func:`bluesky.plans.list_grid_scan`, wrapped so that the motors are set
    relatively and their positions are reset afterwards.

    Parameters
    ----------
    detectors : list or tuple
        list of 'readable' objects
    args
        patterned like (``motor1, position_list1,``
                        ``motor2, position_list2,``
                        ``motor3, position_list3,``
                        ``...,``
                        ``motorN, position_listN``)

        The first motor is the "slowest", the outer loop. ``position_list``'s
        are lists of positions, all lists must have the same length. Motors
        can be any 'settable' object (motor, temp controller, etc.).
    snake_axes : boolean or Iterable, optional
        which axes should be snaked, either ``False`` (do not snake any axes),
        ``True`` (snake all axes) or a list of axes to snake. "Snaking" an axis
        is defined as following snake-like, winding trajectory instead of a
        simple left-to-right trajectory. The elements of the list are motors
        that are listed in `args`. The list must not contain the slowest
        (first) motor, since it can't be snaked.
    per_step : callable, optional
        hook for customizing action of inner loop (messages per step).
        See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default)
        for details.
    md : dict, optional
        metadata

    See Also
    --------
    :func:`bluesky.plans.list_grid_scan`
    :func:`bluesky.plans.list_scan`
    :func:`bluesky.plans.rel_list_scan`
    """
    _check_detectors_type_input(detectors)

    _md = {"plan_name": "rel_list_grid_scan"}
    _md.update(md or {})

    # ``args`` alternates motor, position_list, motor, position_list, ...
    motors = [pair[0] for pair in partition(2, args)]

    @bpp.reset_positions_decorator(motors)
    @bpp.relative_set_decorator(motors)
    def _relative_plan():
        grid_plan = list_grid_scan(detectors, *args, snake_axes=snake_axes, per_step=per_step, md=_md)
        return (yield from grid_plan)

    return (yield from _relative_plan())
def _scan_1d(
    detectors: Sequence[Readable],
    motor: NamedMovable,
    start: float,
    stop: float,
    num: int,
    *,
    per_step: Optional[PerStep] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """
    Scan a single motor over equally spaced positions.

    The positions are generated with ``numpy.linspace`` from the
    ``plan_pattern_args`` entry of the final metadata.

    Parameters
    ----------
    detectors : list or tuple
        list of 'readable' objects
    motor : object
        any 'settable' object (motor, temp controller, etc.)
    start : float
        starting position of motor
    stop : float
        ending position of motor
    num : int
        number of points passed to ``numpy.linspace``
    per_step : callable, optional
        hook for customizing action of inner loop (messages per step)
        Expected signature: ``f(detectors, motor, step)``
    md : dict, optional
        metadata

    See Also
    --------
    :func:`bluesky.plans.rel_scan`
    """
    _check_detectors_type_input(detectors)

    plan_args = {
        "detectors": list(map(repr, detectors)),
        "num": num,
        "motor": repr(motor),
        "start": start,
        "stop": stop,
        "per_step": repr(per_step),
    }
    _md: CustomPlanMetadata = {
        "detectors": [det.name for det in detectors],
        "motors": [motor.name],
        "num_points": num,
        "num_intervals": num - 1,
        "plan_args": plan_args,
        "plan_name": "scan",
        "plan_pattern": "linspace",
        "plan_pattern_module": "numpy",
        "plan_pattern_args": dict(start=start, stop=stop, num=num),  # noqa: C408
        "hints": {},
    }
    _md.update(md or {})

    try:
        dimensions = [(motor.hints["fields"], "primary")]
    except (AttributeError, KeyError):
        # The motor does not advertise hinted fields; leave the hints alone.
        pass
    else:
        # Keep a caller-supplied "dimensions" hint if there is one.
        _md["hints"].setdefault("dimensions", dimensions)  # type: ignore

    step_plan = bps.one_1d_step if per_step is None else per_step
    positions = np.linspace(**_md["plan_pattern_args"])

    @bpp.stage_decorator(list(detectors) + [motor])
    @bpp.run_decorator(md=_md)
    def _linear_run():
        for position in positions:
            yield from step_plan(detectors, motor, position)

    return (yield from _linear_run())
def _rel_scan_1d(
    detectors: Sequence[Readable],
    motor: Movable,
    start: float,
    stop: float,
    num: int,
    *,
    per_step: Optional[PerStep] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """
    Scan a single motor over equally spaced steps relative to its current position.

    The work is delegated to ``_scan_1d``, wrapped so that the motor is set
    relatively and its position is reset afterwards.

    Parameters
    ----------
    detectors : list or tuple
        list of 'readable' objects
    motor : object
        any 'settable' object (motor, temp controller, etc.)
    start : float
        starting position of motor
    stop : float
        ending position of motor
    num : int
        number of steps
    per_step : callable, optional
        hook for customizing action of inner loop (messages per step)
        Expected signature: ``f(detectors, motor, step)``
    md : dict, optional
        metadata

    See Also
    --------
    :func:`bluesky.plans.scan`
    """
    _md = {"plan_name": "rel_scan"}
    _md.update(md or {})
    # TODO read initial positions (redundantly) so they can be put in md here

    moved = [motor]

    @bpp.reset_positions_decorator(moved)
    @bpp.relative_set_decorator(moved)
    def _relative_plan():
        absolute_plan = _scan_1d(detectors, motor, start, stop, num, per_step=per_step, md=_md)
        return (yield from absolute_plan)

    return (yield from _relative_plan())
[docs]
def log_scan(
    detectors: Sequence[Readable],
    motor: NamedMovable,
    start: float,
    stop: float,
    num: int,
    *,
    per_step: Optional[PerStep] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """
    Scan over one variable in log-spaced steps.

    The positions are generated by ``numpy.logspace(start=start, stop=stop,
    num=num)``, so ``start`` and ``stop`` are *exponents*: with numpy's
    default base of 10 the motor visits ``10**start`` ... ``10**stop``.

    Parameters
    ----------
    detectors : list or tuple
        list of 'readable' objects
    motor : object
        any 'settable' object (motor, temp controller, etc.)
    start : float
        exponent of the first motor position (see above)
    stop : float
        exponent of the last motor position (see above)
    num : int
        number of points
    per_step : callable, optional
        hook for customizing action of inner loop (messages per step)
        Expected signature: ``f(detectors, motor, step)``
    md : dict, optional
        metadata

    Raises
    ------
    TypeError
        If ``detectors`` is not a sequence.

    See Also
    --------
    :func:`bluesky.plans.rel_log_scan`
    """
    _check_detectors_type_input(detectors)

    _md: CustomPlanMetadata = {
        "detectors": [det.name for det in detectors],
        "motors": [motor.name],
        "num_points": num,
        "num_intervals": num - 1,
        "plan_args": {
            "detectors": list(map(repr, detectors)),
            "num": num,
            "start": start,
            "stop": stop,
            "motor": repr(motor),
            "per_step": repr(per_step),
        },
        "plan_name": "log_scan",
        "plan_pattern": "logspace",
        "plan_pattern_module": "numpy",
        "plan_pattern_args": dict(start=start, stop=stop, num=num),  # noqa: C408
        "hints": {},
    }
    _md.update(md or {})

    try:
        dimensions = [(motor.hints["fields"], "primary")]
    except (AttributeError, KeyError):
        # The motor does not advertise hinted fields; leave the hints alone.
        pass
    else:
        # Keep a caller-supplied "dimensions" hint if there is one.
        _md["hints"].setdefault("dimensions", dimensions)  # type: ignore

    # Only predeclare the "primary" stream for the default per-step plan, and
    # only when BLUESKY_PREDECLARE is set to a non-empty value.
    predeclare = per_step is None and os.environ.get("BLUESKY_PREDECLARE", False)
    if per_step is None:
        per_step = bps.one_1d_step

    steps = np.logspace(**_md["plan_pattern_args"])

    @bpp.stage_decorator(list(detectors) + [motor])
    @bpp.run_decorator(md=_md)
    def inner_log_scan():
        if predeclare:
            yield from bps.declare_stream(motor, *detectors, name="primary")
        for step in steps:
            yield from per_step(detectors, motor, step)

    return (yield from inner_log_scan())
[docs]
def rel_log_scan(
    detectors: Sequence[Readable],
    motor: Movable,
    start: float,
    stop: float,
    num: int,
    *,
    per_step: Optional[PerStep] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """
    Scan a single motor in log-spaced steps relative to its current position.

    The work is delegated to :func:`bluesky.plans.log_scan`, wrapped so that
    the motor is set relatively and its position is reset afterwards.

    Parameters
    ----------
    detectors : list or tuple
        list of 'readable' objects
    motor : object
        any 'settable' object (motor, temp controller, etc.)
    start : float
        starting position of motor
    stop : float
        ending position of motor
    num : int
        number of steps
    per_step : callable, optional
        hook for customizing action of inner loop (messages per step)
        Expected signature: ``f(detectors, motor, step)``
    md : dict, optional
        metadata

    See Also
    --------
    :func:`bluesky.plans.log_scan`
    """
    # TODO read initial positions (redundantly) so they can be put in md here
    _md = {"plan_name": "rel_log_scan"}
    _md.update(md or {})

    moved = [motor]

    @bpp.reset_positions_decorator(moved)
    @bpp.relative_set_decorator(moved)
    def _relative_plan():
        absolute_plan = log_scan(detectors, motor, start, stop, num, per_step=per_step, md=_md)
        return (yield from absolute_plan)

    return (yield from _relative_plan())
[docs]
def adaptive_scan(
    detectors: Sequence[Readable],
    target_field: str,
    motor: NamedMovable,
    start: float,
    stop: float,
    min_step: float,
    max_step: float,
    target_delta: float,
    backstep: bool,
    threshold: Optional[float] = 0.8,
    *,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """
    Scan over one variable with adaptively tuned step size.

    Parameters
    ----------
    detectors : list or tuple
        list of 'readable' objects
    target_field : string
        data field whose output is the focus of the adaptive tuning
    motor : object
        any 'settable' object (motor, temp controller, etc.)
    start : float
        starting position of motor
    stop : float
        ending position of motor; may be smaller than ``start``, in which
        case the scan runs in the negative direction
    min_step : float
        smallest step for fast-changing regions
    max_step : float
        largest step for slow-changing regions
    target_delta : float
        desired fractional change in detector signal between steps
    backstep : bool
        whether backward steps are allowed -- this is concern with some motors
    threshold : float, optional
        threshold for going backward and rescanning a region, default is 0.8
    md : dict, optional
        metadata

    Raises
    ------
    TypeError
        If ``detectors`` is not a sequence.
    ValueError
        If ``0 < min_step < max_step`` does not hold.

    See Also
    --------
    :func:`bluesky.plans.rel_adaptive_scan`
    """
    _check_detectors_type_input(detectors)
    if not 0 < min_step < max_step:
        raise ValueError("min_step and max_step must meet condition of max_step > min_step > 0")

    _md = {
        "detectors": [det.name for det in detectors],
        "motors": [motor.name],
        "plan_args": {
            "detectors": list(map(repr, detectors)),
            "motor": repr(motor),
            "start": start,
            "stop": stop,
            "min_step": min_step,
            "max_step": max_step,
            "target_delta": target_delta,
            "backstep": backstep,
            "threshold": threshold,
        },
        "plan_name": "adaptive_scan",
        "hints": {},
    }
    _md.update(md or {})
    try:
        dimensions = [(motor.hints["fields"], "primary")]
    except (AttributeError, KeyError):
        # The motor does not advertise hinted fields; leave the hints alone.
        pass
    else:
        # Keep a caller-supplied "dimensions" hint if there is one.
        _md["hints"].setdefault("dimensions", dimensions)  # type: ignore

    @bpp.stage_decorator(list(detectors) + [motor])
    @bpp.run_decorator(md=_md)
    def adaptive_core():
        next_pos = start
        step = (max_step - min_step) / 2
        past_I = None
        cur_I = None
        direction_sign = 1 if stop >= start else -1
        # ``detectors`` may be a tuple, so convert before concatenating.
        devices = tuple(utils.separate_devices(list(detectors) + [motor]))
        if os.environ.get("BLUESKY_PREDECLARE", False):
            yield from bps.declare_stream(*devices, name="primary")
        # Multiplying both sides by the sign makes one comparison serve both
        # scan directions.
        while next_pos * direction_sign < stop * direction_sign:
            yield Msg("checkpoint")
            yield from bps.mv(motor, next_pos)
            yield Msg("create", None, name="primary")
            for det in detectors:
                yield Msg("trigger", det, group="B")
            yield Msg("wait", None, "B")
            for det in devices:
                cur_det = yield Msg("read", det)
                if target_field in cur_det:
                    cur_I = cur_det[target_field]["value"]
            yield Msg("save")

            # special case: first pass through the loop has nothing to compare to
            if past_I is None:
                past_I = cur_I
                next_pos += step * direction_sign
                continue

            dI = np.abs(cur_I - past_I)

            slope = dI / step
            if slope:
                new_step = np.clip(target_delta / slope, min_step, max_step)
            else:
                new_step = np.min([step * 1.1, max_step])

            # if we over stepped, go back and try again
            if backstep and (new_step < step * threshold):
                # Undo the last advance, which was ``step * direction_sign``.
                next_pos -= step * direction_sign
                step = new_step
            else:
                past_I = cur_I
                step = 0.2 * new_step + 0.8 * step
            next_pos += step * direction_sign

    return (yield from adaptive_core())
[docs]
def rel_adaptive_scan(
    detectors: Sequence[Readable],
    target_field: str,
    motor: Movable,
    start: float,
    stop: float,
    min_step: float,
    max_step: float,
    target_delta: float,
    backstep: bool,
    threshold: Optional[float] = 0.8,
    *,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """
    Adaptive-step scan of one variable, relative to its current position.

    The work is delegated to :func:`bluesky.plans.adaptive_scan`, wrapped so
    that the motor is set relatively and its position is reset afterwards.

    Parameters
    ----------
    detectors : list or tuple
        list of 'readable' objects
    target_field : string
        data field whose output is the focus of the adaptive tuning
    motor : object
        any 'settable' object (motor, temp controller, etc.)
    start : float
        starting position of motor
    stop : float
        ending position of motor
    min_step : float
        smallest step for fast-changing regions
    max_step : float
        largest step for slow-changing regions
    target_delta : float
        desired fractional change in detector signal between steps
    backstep : bool
        whether backward steps are allowed -- this is concern with some motors
    threshold : float, optional
        threshold for going backward and rescanning a region, default is 0.8
    md : dict, optional
        metadata

    See Also
    --------
    :func:`bluesky.plans.adaptive_scan`
    """
    _md = {"plan_name": "rel_adaptive_scan"}
    _md.update(md or {})

    moved = [motor]

    @bpp.reset_positions_decorator(moved)
    @bpp.relative_set_decorator(moved)
    def _relative_plan():
        absolute_plan = adaptive_scan(
            detectors,
            target_field,
            motor,
            start,
            stop,
            min_step,
            max_step,
            target_delta,
            backstep,
            threshold,
            md=_md,
        )
        return (yield from absolute_plan)

    return (yield from _relative_plan())
[docs]
def tune_centroid(
    detectors: Sequence[Readable],
    signal: str,
    motor: NamedMovable,
    start: float,
    stop: float,
    min_step: float,
    num: int = 10,
    step_factor: float = 3.0,
    snake: bool = False,
    *,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    r"""
    plan: tune a motor to the centroid of signal(motor)

    Initially, traverse the range from start to stop with
    the number of points specified.  Repeat with progressively
    smaller step size until the minimum step size is reached.
    Rescans will be centered on the signal centroid
    (for $I(x)$, centroid$= \sum{x*I}/\sum{I}$)
    with original scan range reduced by ``step_factor``.

    Set ``snake=True`` if your positions are reproducible
    moving from either direction.  This will not necessarily
    decrease the number of traversals required to reach convergence.
    Snake motion reduces the total time spent on motion
    to reset the positioner.  For some positioners, such as
    those with hysteresis, snake scanning may not be appropriate.
    For such positioners, always approach the positions from the
    same direction.

    Note:  Ideally the signal has only one peak in the range to
    be scanned.  It is assumed the signal is not polymodal
    between ``start`` and ``stop``.

    Parameters
    ----------
    detectors : list or tuple
        list of 'readable' objects
    signal : string
        detector field whose output is to maximize
    motor : object
        any 'settable' object (motor, temp controller, etc.)
    start : float
        start of range
    stop : float
        end of range, note: start < stop
    min_step : float
        smallest step size to use.
    num : int, optional
        number of points with each traversal, default = 10;
        must be at least 2
    step_factor : float, optional
        used in calculating new range after each pass

        note: step_factor > 1.0, default = 3
    snake : bool, optional
        if False (default), always scan from start to stop
    md : dict, optional
        metadata

    Raises
    ------
    TypeError
        If ``detectors`` is not a sequence.
    ValueError
        If ``min_step <= 0``, ``step_factor <= 1.0`` or ``num < 2``.

    Examples
    --------
    Find the center of a peak using synthetic hardware.

    >>> from ophyd.sim import SynAxis, SynGauss
    >>> motor = SynAxis(name='motor')
    >>> det = SynGauss('det', motor, 'motor',
    ...                center=-1.3, Imax=1e5, sigma=0.05)
    >>> RE(tune_centroid([det], "det", motor, -1.5, -0.5, 0.01, 10))
    """
    _check_detectors_type_input(detectors)
    if min_step <= 0:
        raise ValueError("min_step must be positive")
    if step_factor <= 1.0:
        raise ValueError("step_factor must be greater than 1.0")
    if num < 2:
        # The step size is (stop - start) / (num - 1); fail before the run
        # is opened instead of dividing by zero in the middle of it.
        raise ValueError("num must be at least 2")
    try:
        (motor_name,) = motor.hints["fields"]
    except (AttributeError, KeyError, ValueError):
        # No hints, no "fields" entry, or not exactly one hinted field:
        # fall back to the motor's own name as the data key.
        motor_name = motor.name
    _md = {
        "detectors": [det.name for det in detectors],
        "motors": [motor.name],
        "plan_args": {
            "detectors": list(map(repr, detectors)),
            "motor": repr(motor),
            "start": start,
            "stop": stop,
            "num": num,
            "min_step": min_step,
        },
        "plan_name": "tune_centroid",
        "hints": {},
    }
    _md.update(md or {})
    try:
        dimensions = [(motor.hints["fields"], "primary")]
    except (AttributeError, KeyError):
        pass
    else:
        # Keep a caller-supplied "dimensions" hint if there is one.
        _md["hints"].setdefault("dimensions", dimensions)  # type: ignore

    # Every pass is confined to the originally requested range.
    low_limit = min(start, stop)
    high_limit = max(start, stop)

    @bpp.stage_decorator(list(detectors) + [motor])
    @bpp.run_decorator(md=_md)
    def _tune_core(start: float, stop: float, num: int, signal: str):
        next_pos = start
        step = (stop - start) / (num - 1)
        peak_position = None
        cur_I = None
        sum_I = 0  # for peak centroid calculation, I(x)
        sum_xI = 0

        if os.environ.get("BLUESKY_PREDECLARE", False):
            yield from bps.declare_stream(motor, *detectors, name="primary")  # type: ignore

        while abs(step) >= min_step and low_limit <= next_pos <= high_limit:
            yield Msg("checkpoint")
            yield from bps.mv(motor, next_pos)  # type: ignore  # Movable
            ret = yield from bps.trigger_and_read(list(detectors) + [motor])  # type: ignore
            cur_I = ret[signal]["value"]
            sum_I += cur_I
            position = ret[motor_name]["value"]
            sum_xI += position * cur_I

            next_pos += step
            in_range = min(start, stop) <= next_pos <= max(start, stop)

            if not in_range:
                # End of a pass: recenter a narrower range on the centroid.
                if sum_I == 0:
                    return
                peak_position = sum_xI / sum_I  # centroid
                sum_I, sum_xI = 0, 0  # reset for next pass
                new_scan_range = (stop - start) / step_factor
                start = np.clip(peak_position - new_scan_range / 2, low_limit, high_limit)
                stop = np.clip(peak_position + new_scan_range / 2, low_limit, high_limit)
                if snake:
                    start, stop = stop, start
                step = (stop - start) / (num - 1)
                next_pos = start

        # finally, move to peak position
        if peak_position is not None:
            # improvement: report final peak_position
            yield from bps.mv(motor, peak_position)  # type: ignore  # Movable

    return (yield from _tune_core(start, stop, num, signal))
[docs]
def scan_nd(
    detectors: Sequence[Readable],
    cycler: Cycler,
    *,
    per_step: Optional[PerStep] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """
    Scan over an arbitrary N-dimensional trajectory.

    Parameters
    ----------
    detectors : list or tuple
    cycler : Cycler
        cycler.Cycler object mapping movable interfaces to positions
    per_step : callable, optional
        hook for customizing action of inner loop (messages per step).
        See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default)
        for details. A callable whose first three parameters are named
        ``(detectors, motor, step)`` is also accepted when the trajectory has
        exactly one motor.
    md : dict, optional
        metadata

    Raises
    ------
    TypeError
        If ``detectors`` is not a sequence, if ``per_step`` has an
        unsupported signature, or if a 1D ``per_step`` is combined with a
        trajectory over more than one motor.

    See Also
    --------
    :func:`bluesky.plans.inner_product_scan`
    :func:`bluesky.plans.grid_scan`

    Examples
    --------
    >>> from cycler import cycler
    >>> cy = cycler(motor1, [1, 2, 3]) * cycler(motor2, [4, 5, 6])
    >>> scan_nd([sensor], cy)
    """
    _check_detectors_type_input(detectors)

    plan_args = {
        "detectors": list(map(repr, detectors)),
        "cycler": repr(cycler),
        "per_step": repr(per_step),
    }
    _md = {
        "detectors": [det.name for det in detectors],
        "motors": [motor.name for motor in cycler.keys],
        "num_points": len(cycler),
        "num_intervals": len(cycler) - 1,
        "plan_args": plan_args,
        "plan_name": "scan_nd",
        "hints": {},
    }
    _md.update(md or {})

    try:
        dimensions = [(motor.hints["fields"], "primary") for motor in cycler.keys]
    except (AttributeError, KeyError):
        # Not all motors provide a 'fields' hint, so we have to skip it.
        pass
    else:
        # "hints" is either what the user passed in or the default {}.
        # A user-supplied "dimensions" entry is left untouched; otherwise
        # the one generated above is used.
        _md["hints"].setdefault("dimensions", dimensions)  # type: ignore

    # A custom per_step might write elsewhere, so only the default per-step
    # plan gets a predeclared "primary" stream.
    predeclare = per_step is None and os.environ.get("BLUESKY_PREDECLARE", False)

    def _signature_matches(sig, leading_names):
        """Check the first three parameter names; extras must be optional."""
        params = list(sig.parameters.items())
        if len(params) < 3:
            return False
        if [p_name for p_name, _ in params[:3]] != list(leading_names):
            return False
        for _, p in params[3:]:
            # *args / **kwargs never need a value from the caller
            if p.kind is p.VAR_KEYWORD or p.kind is p.VAR_POSITIONAL:
                continue
            # any other extra argument must have a default
            if p.default is p.empty:
                return False
        return True

    if per_step is None:
        per_step = bps.one_nd_step
    else:
        # Ensure that the user-defined per-step has the expected signature.
        sig = inspect.signature(per_step)
        nd_compatible = sig == inspect.signature(bps.one_nd_step) or _signature_matches(
            sig, ("detectors", "step", "pos_cache")
        )
        if not nd_compatible:
            if not _signature_matches(sig, ("detectors", "motor", "step")):
                raise TypeError(
                    "per_step must be a callable with the signature \n "
                    "<Signature (detectors, step, pos_cache)> or "
                    "<Signature (detectors, motor, step)>. \n"
                    f"per_step signature received: {sig}"
                )
            # Accept the 1D signature for back-compat reasons (because
            # inner_product_scan was renamed scan).
            dims = len(list(cycler.keys))
            if dims != 1:
                raise TypeError(f"Signature of per_step assumes 1D trajectory but {dims} motors are specified.")
            (motor,) = cycler.keys
            user_per_step = per_step

            def adapter(detectors, step, pos_cache):
                # The N-d 'step' argument is a dict; the 1D per_step expects
                # the single value it holds.
                (step,) = step.values()
                return (yield from user_per_step(detectors, motor, step))

            per_step = adapter  # type: ignore

    pos_cache: Dict = defaultdict(lambda: None)  # where last position is stashed
    cycler = utils.merge_cycler(cycler)
    motors = list(cycler.keys)

    @bpp.stage_decorator(list(detectors) + motors)
    @bpp.run_decorator(md=_md)
    def _nd_run():
        if predeclare:
            yield from bps.declare_stream(*motors, *detectors, name="primary")
        for point in list(cycler):
            yield from per_step(detectors, point, pos_cache)

    return (yield from _nd_run())
def inner_product_scan(
    detectors: Sequence[Readable],
    num: int,
    *args: Union[Movable, Any],
    per_step: Optional[PerStep] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[None]:
    """
    Run ``scan`` with ``num`` given as the first positional argument after ``detectors``.

    Parameters
    ----------
    detectors : list or tuple
        list of 'readable' objects
    num : int
        number of points
    *args :
        forwarded to ``scan`` ahead of ``num``
    per_step : callable, optional
        hook for customizing action of inner loop, forwarded to ``scan``
    md : dict, optional
        metadata; ``plan_name`` defaults to ``"inner_product_scan"`` unless
        the caller supplies one
    """
    # Build a fresh dict so the caller's ``md`` is never mutated; a
    # caller-supplied "plan_name" still takes precedence over the default.
    _md = {"plan_name": "inner_product_scan"}
    _md.update(md or {})
    # For scan, num is the _last_ positional arg instead of the first one.
    # Notice the swapped order here.
    yield from scan(detectors, *args, num, per_step=per_step, md=_md)
[docs]
def scan(
    detectors: Sequence[Readable],
    *args: Union[Movable, Any],
    num: Optional[int] = None,
    per_step: Optional[PerStep] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """
    Scan over one multi-motor trajectory.

    Parameters
    ----------
    detectors : list or tuple
        list of 'readable' objects
    *args :
        For one dimension, ``motor, start, stop``.
        In general:

        .. code-block:: python

            motor1, start1, stop1,
            motor2, start2, stop2,
            ...,
            motorN, startN, stopN

        Motors can be any 'settable' object (motor, temp controller, etc.)
        For back-compat, the number of points may also be given as one extra
        trailing positional argument instead of the keyword ``num``.
    num : integer
        number of points
    per_step : callable, optional
        hook for customizing action of inner loop (messages per step).
        See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default)
        for details.
    md : dict, optional
        metadata

    Raises
    ------
    TypeError
        If ``detectors`` is not a sequence.
    ValueError
        If ``num`` is given neither by keyword nor as the last positional
        argument, or if it is not a positive whole number.

    See Also
    --------
    :func:`bluesky.plans.relative_inner_product_scan`
    :func:`bluesky.plans.grid_scan`
    :func:`bluesky.plans.scan_nd`
    """
    _check_detectors_type_input(detectors)
    # For back-compat reasons, we accept 'num' as the last positional argument:
    # scan(detectors, motor, -1, 1, 3)
    # or by keyword:
    # scan(detectors, motor, -1, 1, num=3)
    # ... which requires some special processing.
    if num is None:
        if len(args) % 3 != 1:
            raise ValueError(
                "The number of points to scan must be provided "
                "as the last positional argument or as keyword "
                "argument 'num'."
            )
        num = args[-1]  # type: ignore
        args = args[:-1]

    # Validate 'num' no matter how it was supplied (positional or keyword).
    if not (float(num).is_integer() and num > 0.0):  # type: ignore
        raise ValueError(
            f"The parameter `num` is expected to be a number of "
            f"steps (not step size!) It must therefore be a "
            f"whole number. The given value was {num}."
        )
    # Integer-valued floats such as 3.0 pass the check above; normalize them so
    # the pattern generator and the metadata always see a true int.
    num = int(num)  # type: ignore

    # Split the flat argument list into (motor, start, stop) triples once.
    triples = list(partition(3, args))
    motors = [motor for motor, _start, _stop in triples]
    md_args = list(chain.from_iterable((repr(motor), start, stop) for motor, start, stop in triples))
    motor_names = tuple(motor.name for motor in motors)

    md = md or {}
    _md = {
        "plan_args": {
            "detectors": list(map(repr, detectors)),
            "num": num,
            "args": md_args,
            "per_step": repr(per_step),
        },
        "plan_name": "scan",
        "plan_pattern": "inner_product",
        "plan_pattern_module": plan_patterns.__name__,
        "plan_pattern_args": dict(num=num, args=md_args),  # noqa: C408
        "motors": motor_names,
    }
    _md.update(md)

    # get hints for best effort callback
    # Give a hint that the motors all lie along the same axis
    # [(['motor1', 'motor2', ...], 'primary'), ] is 1D (this case)
    # [ ('motor1', 'primary'), ('motor2', 'primary'), ... ] is 2D for example
    # call x_fields because these are meant to be the x (independent) axis
    x_fields = [field for motor in motors for field in get_hinted_fields(motor)]
    default_hints: Dict[str, Sequence] = {}
    if x_fields:
        default_hints["dimensions"] = [(x_fields, "primary")]

    # now add default_hints and override any hints from the original md (if
    # exists)
    _md["hints"] = default_hints
    _md["hints"].update(md.get("hints", {}) or {})  # type: ignore

    full_cycler = plan_patterns.inner_product(num=num, args=args)

    return (yield from scan_nd(detectors, full_cycler, per_step=per_step, md=_md))
[docs]
def grid_scan(
    detectors: Sequence[Readable],
    *args,
    snake_axes: Optional[Union[Iterable, bool]] = None,
    per_step: Optional[PerStep] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """
    Scan over a mesh; each motor is on an independent trajectory.

    Parameters
    ----------
    detectors: list or tuple
        list of 'readable' objects
    ``*args``
        patterned like (``motor1, start1, stop1, num1,``
                        ``motor2, start2, stop2, num2,``
                        ``motor3, start3, stop3, num3,`` ...
                        ``motorN, startN, stopN, numN``)

        The first motor is the "slowest", the outer loop. Snaking is selected
        with ``snake_axes``. A deprecated pattern that carries an extra boolean
        "snake" argument after every motor except the first is still accepted,
        but must not be combined with ``snake_axes``.
    snake_axes: boolean or iterable, optional
        which axes should be snaked, either ``False`` (do not snake any axes),
        ``True`` (snake all axes) or a list of axes to snake. "Snaking" an axis
        is defined as following snake-like, winding trajectory instead of a
        simple left-to-right trajectory. The elements of the list are motors
        that are listed in `args`. The list must not contain the slowest
        (first) motor, since it can't be snaked.
    per_step: callable, optional
        hook for customizing action of inner loop (messages per step).
        See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default)
        for details.
    md: dict, optional
        metadata

    Raises
    ------
    TypeError
        If ``detectors`` is not a sequence.
    ValueError
        If ``snake_axes`` is combined with the deprecated argument pattern, if
        a motor is listed more than once, or if ``snake_axes`` is invalid
        (repeated entries, contains the slowest motor, contains a motor that
        is not scanned, or is neither iterable, boolean nor None).

    See Also
    --------
    :func:`bluesky.plans.rel_grid_scan`
    :func:`bluesky.plans.inner_product_scan`
    :func:`bluesky.plans.scan_nd`
    """
    # Notes: (not to be included in the documentation)
    #   The deprecated function call with no 'snake_axes' argument and 'args'
    #         patterned like (``motor1, start1, stop1, num1,``
    #                         ``motor2, start2, stop2, num2, snake2,``
    #                         ``motor3, start3, stop3, num3, snake3,`` ...
    #                         ``motorN, startN, stopN, numN, snakeN``)
    #         The first motor is the "slowest", the outer loop. For all motors
    #         except the first motor, there is a "snake" argument: a boolean
    #         indicating whether to following snake-like, winding trajectory or a
    #         simple left-to-right trajectory.
    #   Deprecated and new argument lists must not be mixed: supplying
    #   `snake_axes` together with the deprecated pattern raises ValueError.

    def _set_snaking(chunk, value):
        """Returns the tuple `chunk` with modified 'snake' value"""
        _motor, _start, _stop, _num, _snake = chunk
        return _motor, _start, _stop, _num, value

    _check_detectors_type_input(detectors)

    args_pattern = plan_patterns.classify_outer_product_args_pattern(args)
    deprecated_pattern = args_pattern == plan_patterns.OuterProductArgsPattern.PATTERN_2
    if (snake_axes is not None) and deprecated_pattern:
        raise ValueError(
            "Mixing of deprecated and new API interface is not allowed: "
            "the parameter 'snake_axes' can not be used if snaking is "
            "set as part of 'args'"
        )

    # For consistency, set 'snake_axes' to False if new API call is detected
    if (snake_axes is None) and not deprecated_pattern:
        snake_axes = False

    chunk_args = list(plan_patterns.chunk_outer_product_args(args, args_pattern))
    # 'chunk_args' is a list of tuples of the form: (motor, start, stop, num, snake)
    # If the function is called using deprecated pattern for arguments, then
    # 'snake' may be set True for some motors, otherwise the 'snake' is always False.

    # The list of controlled motors
    motors = [chunk[0] for chunk in chunk_args]

    # Check that the same motor is not listed multiple times. This indicates an error in the script.
    if len(set(motors)) != len(motors):
        raise ValueError(f"Some motors are listed multiple times in the argument list 'args': " f"'{motors}'")

    if snake_axes is not None:
        if isinstance(snake_axes, collections.abc.Iterable) and not isinstance(snake_axes, str):
            # Always convert to a tuple (in case a `snake_axes` is an iterator).
            snake_axes = tuple(snake_axes)

            # Check if the list of axes (motors) contains repeated entries.
            if len(set(snake_axes)) != len(snake_axes):
                raise ValueError(f"The list of axes 'snake_axes' contains repeated elements: " f"'{snake_axes}'")

            # Check if the snaking is enabled for the slowest motor.
            if len(motors) and (motors[0] in snake_axes):
                raise ValueError(f"The list of axes 'snake_axes' contains the slowest motor: " f"'{snake_axes}'")

            # Check that all motors in 'snake_axes' are controlled in the scan.
            #   Otherwise it is very likely that the script running the plan has a bug.
            if any(axis not in motors for axis in snake_axes):
                raise ValueError(
                    f"The list of axes 'snake_axes' contains motors "
                    f"that are not controlled during the scan: "
                    f"'{snake_axes}'"
                )

            # Enable snaking for the selected axes only.
            for n, chunk in enumerate(chunk_args):
                if n > 0:  # The slowest motor is never snaked
                    chunk_args[n] = _set_snaking(chunk, chunk[0] in snake_axes)

        elif snake_axes is True:  # 'snake_axes' has boolean value `True`
            # Set all 'snake' values except for the slowest motor
            chunk_args = [_set_snaking(chunk, True) if n > 0 else chunk for n, chunk in enumerate(chunk_args)]

        elif snake_axes is False:  # 'snake_axes' has boolean value `False`
            # Clear all 'snake' values
            chunk_args = [_set_snaking(chunk, False) for chunk in chunk_args]

        else:
            raise ValueError(
                f"Parameter 'snake_axes' is not iterable, boolean or None: "
                f"'{snake_axes}', type: {type(snake_axes)}"
            )

    # Prepare the argument list for the `outer_product` function:
    # the first (slowest) motor carries no 'snake' element.
    args_modified = []
    for n, chunk in enumerate(chunk_args):
        args_modified.extend(chunk[:-1] if n == 0 else chunk)
    full_cycler = plan_patterns.outer_product(args=args_modified)

    md_args = []
    for i, (motor, start, stop, num, snake) in enumerate(chunk_args):
        md_args.extend([repr(motor), start, stop, num])
        if i > 0:
            # snake argument only shows up after the first motor
            md_args.append(snake)
    motor_names = tuple(motor.name for motor in motors)

    _md = {
        "shape": tuple(num for motor, start, stop, num, snake in chunk_args),
        "extents": tuple([start, stop] for motor, start, stop, num, snake in chunk_args),
        "snaking": tuple(snake for motor, start, stop, num, snake in chunk_args),
        # 'num_points': inserted by scan_nd
        "plan_args": {"detectors": list(map(repr, detectors)), "args": md_args, "per_step": repr(per_step)},
        "plan_name": "grid_scan",
        "plan_pattern": "outer_product",
        "plan_pattern_args": dict(args=md_args),  # noqa: C408
        "plan_pattern_module": plan_patterns.__name__,
        "motors": motor_names,
        "hints": {},
    }
    _md.update(md or {})

    # Fill in default hints on a copy, so that a 'hints' dict supplied by the
    # caller in `md` is not modified in place; a 'hints' value of None is
    # treated as empty.
    hints = dict(_md.get("hints") or {})  # type: ignore
    hints.setdefault("gridding", "rectilinear")
    try:
        hints.setdefault("dimensions", [(m.hints["fields"], "primary") for m in motors])
    except (AttributeError, KeyError):
        # At least one motor provides no usable hints: leave 'dimensions' unset.
        pass
    _md["hints"] = hints

    return (yield from scan_nd(detectors, full_cycler, per_step=per_step, md=_md))
[docs]
def rel_grid_scan(
    detectors: Sequence[Readable],
    *args: Union[Movable, Any],
    snake_axes: Optional[Union[Iterable, bool]] = None,
    per_step: Optional[PerStep] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """
    Scan over a mesh relative to current position.

    Runs :func:`grid_scan` with the motors wrapped so that set points are
    relative and positions are reset afterwards.

    Parameters
    ----------
    detectors: list
        list of 'readable' objects
    ``*args``
        patterned like (``motor1, start1, stop1, num1,``
                        ``motor2, start2, stop2, num2,``
                        ``motor3, start3, stop3, num3,`` ...
                        ``motorN, startN, stopN, numN``)

        The first motor is the "slowest", the outer loop. The deprecated
        pattern accepted by :func:`grid_scan` (an extra boolean "snake"
        argument after every motor except the first) is passed through
        unchanged.
    snake_axes: boolean or iterable, optional
        which axes should be snaked, either ``False`` (do not snake any axes),
        ``True`` (snake all axes) or a list of axes to snake. "Snaking" an axis
        is defined as following snake-like, winding trajectory instead of a
        simple left-to-right trajectory. The elements of the list are motors
        that are listed in `args`. The list must not contain the slowest
        (first) motor, since it can't be snaked.
    per_step: callable, optional
        hook for customizing action of inner loop (messages per step).
        See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default)
        for details.
    md: dict, optional
        metadata

    See Also
    --------
    :func:`bluesky.plans.relative_inner_product_scan`
    :func:`bluesky.plans.grid_scan`
    :func:`bluesky.plans.scan_nd`
    """
    # User metadata wins over the default plan name.
    _md = {"plan_name": "rel_grid_scan", **(md or {})}
    # The motor is the first element of every chunk of the argument list.
    motors = [chunk[0] for chunk in plan_patterns.chunk_outer_product_args(args)]

    @bpp.reset_positions_decorator(motors)
    @bpp.relative_set_decorator(motors)
    def relative_grid_plan():
        uid = yield from grid_scan(detectors, *args, snake_axes=snake_axes, per_step=per_step, md=_md)
        return uid

    return (yield from relative_grid_plan())
def relative_inner_product_scan(  # type: ignore
    detectors: Sequence[Readable],
    num: int,
    *args: Union[Movable, Any],
    per_step: Optional[PerStep] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """
    Back-compat spelling of :func:`rel_scan` that takes ``num`` before the motors.

    Parameters
    ----------
    detectors : list or tuple
        list of 'readable' objects
    num : integer
        number of points
    *args :
        ``motor1, start1, stop1, ..., motorN, startN, stopN``; forwarded to
        :func:`rel_scan` unchanged.
    per_step : callable, optional
        hook for customizing action of inner loop (messages per step);
        forwarded to :func:`rel_scan`.
    md : dict, optional
        metadata. ``"plan_name"`` defaults to
        ``"relative_inner_product_scan"`` unless the caller supplies one.

    Returns
    -------
    Whatever :func:`rel_scan` returns, so that callers using
    ``return (yield from relative_inner_product_scan(...))`` receive it.
    """
    # Build a fresh dict so the caller's ``md`` object is never mutated.
    _md = {"plan_name": "relative_inner_product_scan"}
    _md.update(md or {})
    # For rel_scan, num is the _last_ positional arg instead of the first one.
    # Notice the swapped order here.
    return (yield from rel_scan(detectors, *args, num, per_step=per_step, md=_md))
[docs]
def rel_scan(
    detectors: Sequence[Readable],
    *args: Union[Movable, Any],
    num=None,
    per_step: Optional[PerStep] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """
    Scan over one multi-motor trajectory relative to current position.

    Runs :func:`scan` with the motors wrapped so that set points are relative
    and positions are reset afterwards.

    Parameters
    ----------
    detectors : list
        list of 'readable' objects
    *args :
        For one dimension, ``motor, start, stop``.
        In general:

        .. code-block:: python

            motor1, start1, stop1,
            motor2, start2, stop2,
            ...,
            motorN, startN, stopN,

        Motors can be any 'settable' object (motor, temp controller, etc.)
    num : integer
        number of points
    per_step : callable, optional
        hook for customizing action of inner loop (messages per step).
        See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default)
        for details.
    md : dict, optional
        metadata

    See Also
    --------
    :func:`bluesky.plans.rel_grid_scan`
    :func:`bluesky.plans.inner_product_scan`
    :func:`bluesky.plans.scan_nd`
    """
    _check_detectors_type_input(detectors)
    # User metadata wins over the default plan name.
    _md = {"plan_name": "rel_scan", **(md or {})}
    # Only complete (motor, start, stop) triples are considered, so a trailing
    # positional 'num' is ignored here.
    motors = [triple[0] for triple in partition(3, args)]

    @bpp.reset_positions_decorator(motors)
    @bpp.relative_set_decorator(motors)
    def relative_scan_plan():
        uid = yield from scan(detectors, *args, num=num, per_step=per_step, md=_md)
        return uid

    return (yield from relative_scan_plan())
[docs]
def tweak(
    detector: Readable,
    target_field: str,
    motor: NamedMovable,
    step: float,
    *,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """
    Move a motor and read a detector with an interactive prompt.

    Each iteration reads the motor and the detector into one 'primary' event,
    prompts for a step size and moves the motor by that step. An empty answer
    keeps the current step; an answer that cannot be parsed as a float ends
    the loop.

    Parameters
    ----------
    detector : Device
    target_field : string
        data field whose output is the focus of the adaptive tuning
    motor : Device
    step : float
        initial suggestion for step size
    md : dict, optional
        metadata
    """
    prompt_str = "{0}, {1:.3}, {2:.3}, ({3}) "

    _md = {
        "detectors": [detector.name],
        "motors": [motor.name],
        "plan_args": {
            "detector": repr(detector),
            "target_field": target_field,
            "motor": repr(motor),
            "step": step,
        },
        "plan_name": "tweak",
        "hints": {},
    }
    try:
        hinted_fields = motor.hints["fields"]
    except (AttributeError, KeyError):
        # The motor offers no usable hints; leave "hints" empty.
        pass
    else:
        _md["hints"]["dimensions"] = [(hinted_fields, "primary")]  # type: ignore
    _md.update(md or {})

    try:
        from IPython.display import clear_output
    except ImportError:
        # Define a no-op for clear_output.
        def clear_output(wait=False):
            pass

    @bpp.stage_decorator([detector, motor])
    @bpp.run_decorator(md=_md)
    def tweak_core():
        nonlocal step

        while True:
            # Collect one event holding the motor and detector readings.
            yield Msg("create", None, name="primary")
            motor_reading = yield Msg("read", motor)
            if motor_reading is None:
                return
            first_key = list(motor_reading.keys())[0]
            pos = motor_reading[first_key]["value"]
            yield Msg("trigger", detector, group="A")
            yield Msg("wait", None, "A")
            detector_reading = yield Msg("read", detector)
            val = detector_reading[target_field]["value"]
            yield Msg("save")

            # Ask for the next step size.
            prompt = prompt_str.format(motor.name, float(pos), float(val), step)
            answer = yield Msg("input", prompt=prompt)
            if answer:
                try:
                    step = float(answer)
                except ValueError:
                    # Anything that is not a number terminates the loop.
                    break

            yield Msg("set", motor, pos + step, group="A")
            print("Motor moving...")
            sys.stdout.flush()
            yield Msg("wait", None, "A")
            clear_output(wait=True)
            # stackoverflow.com/a/12586667/380231
            print("\x1b[1A\x1b[2K\x1b[1A")

    return (yield from tweak_core())
[docs]
def spiral_fermat(
    detectors: Sequence[Readable],
    x_motor: NamedMovable,
    y_motor: NamedMovable,
    x_start: float,
    y_start: float,
    x_range: float,
    y_range: float,
    dr: float,
    factor: float,
    *,
    dr_y: Optional[float] = None,
    tilt: Optional[float] = 0.0,
    per_step: Optional[PerStep] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """Absolute fermat spiral scan, centered around (x_start, y_start)

    The trajectory comes from ``plan_patterns.spiral_fermat`` and is executed
    with :func:`scan_nd`.

    Parameters
    ----------
    detectors : list
        list of 'readable' objects
    x_motor : object
        any 'settable' object (motor, temp controller, etc.)
    y_motor : object
        any 'settable' object (motor, temp controller, etc.)
    x_start : float
        x center
    y_start : float
        y center
    x_range : float
        x width of spiral
    y_range : float
        y width of spiral
    dr : float
        delta radius
    factor : float
        radius gets divided by this
    dr_y : float, optional
        Delta radius along the major axis of the ellipse, if not specified
        defaults to dr.
    tilt : float, optional
        Tilt angle in radians, default 0.0
    per_step : callable, optional
        hook for customizing action of inner loop (messages per step).
        See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default)
        for details.
    md : dict, optional
        metadata

    See Also
    --------
    :func:`bluesky.plans.spiral`
    :func:`bluesky.plans.rel_spiral`
    :func:`bluesky.plans.rel_spiral_fermat`
    """
    pattern_args = {
        "x_motor": x_motor,
        "y_motor": y_motor,
        "x_start": x_start,
        "y_start": y_start,
        "x_range": x_range,
        "y_range": y_range,
        "dr": dr,
        "factor": factor,
        "dr_y": dr_y,
        "tilt": tilt,
    }
    cyc = plan_patterns.spiral_fermat(**pattern_args)

    # The metadata must not hold the motor objects themselves, only their reprs.
    x_repr = repr(x_motor)
    y_repr = repr(y_motor)
    pattern_args["x_motor"] = x_repr
    pattern_args["y_motor"] = y_repr

    plan_args = {
        "detectors": list(map(repr, detectors)),
        "x_motor": x_repr,
        "y_motor": y_repr,
        "x_start": x_start,
        "y_start": y_start,
        "x_range": x_range,
        "y_range": y_range,
        "dr": dr,
        "factor": factor,
        "dr_y": dr_y,
        "tilt": tilt,
        "per_step": repr(per_step),
    }
    x_extent = [x_start - x_range, x_start + x_range]
    y_extent = [y_start - y_range, y_start + y_range]
    _md = {
        "plan_args": plan_args,
        "extents": (x_extent, y_extent),
        "plan_name": "spiral_fermat",
        "plan_pattern": "spiral_fermat",
        "plan_pattern_module": plan_patterns.__name__,
        "plan_pattern_args": pattern_args,
        "hints": {},
    }

    try:
        dimensions = [(m.hints["fields"], "primary") for m in (x_motor, y_motor)]
    except (AttributeError, KeyError):
        # At least one motor offers no usable hints; leave "hints" empty.
        pass
    else:
        _md["hints"]["dimensions"] = dimensions  # type: ignore
    _md.update(md or {})

    return (yield from scan_nd(detectors, cyc, per_step=per_step, md=_md))
[docs]
def rel_spiral_fermat(
    detectors: Sequence[Readable],
    x_motor: Movable,
    y_motor: Movable,
    x_range: float,
    y_range: float,
    dr: float,
    factor: float,
    *,
    dr_y: Optional[float] = None,
    tilt: Optional[float] = 0.0,
    per_step: Optional[PerStep] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """Relative fermat spiral scan

    Runs :func:`spiral_fermat` centered on (0, 0) with both motors wrapped so
    that set points are relative and positions are reset afterwards.

    Parameters
    ----------
    detectors : list
        list of 'readable' objects
    x_motor : object
        any 'settable' object (motor, temp controller, etc.)
    y_motor : object
        any 'settable' object (motor, temp controller, etc.)
    x_range : float
        x width of spiral
    y_range : float
        y width of spiral
    dr : float
        delta radius
    factor : float
        radius gets divided by this
    dr_y : float, optional
        Delta radius along the major axis of the ellipse, if not specified
        defaults to dr
    tilt : float, optional
        Tilt angle in radians, default 0.0
    per_step : callable, optional
        hook for customizing action of inner loop (messages per step).
        See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default)
        for details.
    md : dict, optional
        metadata

    See Also
    --------
    :func:`bluesky.plans.spiral`
    :func:`bluesky.plans.rel_spiral`
    :func:`bluesky.plans.spiral_fermat`
    """
    # User metadata wins over the default plan name.
    _md = {"plan_name": "rel_spiral_fermat", **(md or {})}
    motors = [x_motor, y_motor]

    @bpp.reset_positions_decorator(motors)
    @bpp.relative_set_decorator(motors)
    def relative_fermat_plan():
        # A center of (0, 0) in relative coordinates is the current position.
        uid = yield from spiral_fermat(
            detectors,
            x_motor,
            y_motor,
            0,
            0,
            x_range,
            y_range,
            dr,
            factor,
            dr_y=dr_y,
            tilt=tilt,
            per_step=per_step,
            md=_md,
        )
        return uid

    return (yield from relative_fermat_plan())
[docs]
def spiral(
    detectors: Sequence[Readable],
    x_motor: NamedMovable,
    y_motor: NamedMovable,
    x_start: float,
    y_start: float,
    x_range: float,
    y_range: float,
    dr: float,
    nth: float,
    *,
    dr_y: Optional[float] = None,
    tilt: Optional[float] = 0.0,
    per_step: Optional[PerStep] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """Spiral scan, centered around (x_start, y_start)

    The trajectory comes from ``plan_patterns.spiral`` and is executed with
    :func:`scan_nd`.

    Parameters
    ----------
    detectors : list
        list of 'readable' objects
    x_motor : object
        any 'settable' object (motor, temp controller, etc.)
    y_motor : object
        any 'settable' object (motor, temp controller, etc.)
    x_start : float
        x center
    y_start : float
        y center
    x_range : float
        x width of spiral
    y_range : float
        y width of spiral
    dr : float
        Delta radius along the minor axis of the ellipse.
    dr_y : float, optional
        Delta radius along the major axis of the ellipse. If None, defaults to
        dr.
    nth : float
        Number of theta steps
    tilt : float, optional
        Tilt angle in radians, default 0.0
    per_step : callable, optional
        hook for customizing action of inner loop (messages per step).
        See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default)
        for details.
    md : dict, optional
        metadata

    See Also
    --------
    :func:`bluesky.plans.rel_spiral`
    :func:`bluesky.plans.spiral_fermat`
    :func:`bluesky.plans.rel_spiral_fermat`
    """
    pattern_args = {
        "x_motor": x_motor,
        "y_motor": y_motor,
        "x_start": x_start,
        "y_start": y_start,
        "x_range": x_range,
        "y_range": y_range,
        "dr": dr,
        "nth": nth,
        "dr_y": dr_y,
        "tilt": tilt,
    }
    cyc = plan_patterns.spiral(**pattern_args)

    # The metadata must not hold the motor objects themselves, only their reprs.
    x_repr = repr(x_motor)
    y_repr = repr(y_motor)
    pattern_args["x_motor"] = x_repr
    pattern_args["y_motor"] = y_repr

    plan_args = {
        "detectors": list(map(repr, detectors)),
        "x_motor": x_repr,
        "y_motor": y_repr,
        "x_start": x_start,
        "y_start": y_start,
        "x_range": x_range,
        "y_range": y_range,
        "dr": dr,
        "dr_y": dr_y,
        "nth": nth,
        "tilt": tilt,
        "per_step": repr(per_step),
    }
    x_extent = [x_start - x_range, x_start + x_range]
    y_extent = [y_start - y_range, y_start + y_range]
    _md = {
        "plan_args": plan_args,
        "extents": (x_extent, y_extent),
        "plan_name": "spiral",
        "plan_pattern": "spiral",
        "plan_pattern_args": pattern_args,
        "plan_pattern_module": plan_patterns.__name__,
        "hints": {},
    }

    try:
        dimensions = [(m.hints["fields"], "primary") for m in (x_motor, y_motor)]
    except (AttributeError, KeyError):
        # At least one motor offers no usable hints; leave "hints" empty.
        pass
    else:
        _md["hints"]["dimensions"] = dimensions  # type: ignore
    _md.update(md or {})  # type: ignore

    return (yield from scan_nd(detectors, cyc, per_step=per_step, md=_md))
[docs]
def rel_spiral(
    detectors: Sequence[Readable],
    x_motor: Movable,
    y_motor: Movable,
    x_range: float,
    y_range: float,
    dr: float,
    nth: float,
    *,
    dr_y: Optional[float] = None,
    tilt: float = 0.0,
    per_step: Optional[PerStep] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """Relative spiral scan

    Runs :func:`spiral` centered on (0, 0) with both motors wrapped so that
    set points are relative and positions are reset afterwards.

    Parameters
    ----------
    detectors : list
        list of 'readable' objects
    x_motor : object
        any 'settable' object (motor, temp controller, etc.)
    y_motor : object
        any 'settable' object (motor, temp controller, etc.)
    x_range : float
        x width of spiral
    y_range : float
        y width of spiral
    dr : float
        Delta radius along the minor axis of the ellipse.
    dr_y : float, optional
        Delta radius along the major axis of the ellipse. If None, it
        defaults to dr.
    nth : float
        Number of theta steps
    tilt : float, optional
        Tilt angle in radians, default 0.0
    per_step : callable, optional
        hook for customizing action of inner loop (messages per step).
        See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default)
        for details.
    md : dict, optional
        metadata

    See Also
    --------
    :func:`bluesky.plans.spiral`
    :func:`bluesky.plans.spiral_fermat`
    """
    # User metadata wins over the default plan name.
    _md = {"plan_name": "rel_spiral", **(md or {})}
    motors = [x_motor, y_motor]

    @bpp.reset_positions_decorator(motors)
    @bpp.relative_set_decorator(motors)
    def relative_spiral_plan():
        # A center of (0, 0) in relative coordinates is the current position.
        uid = yield from spiral(
            detectors,
            x_motor,
            y_motor,
            0,
            0,
            x_range,
            y_range,
            dr,
            nth,
            dr_y=dr_y,
            tilt=tilt,
            per_step=per_step,
            md=_md,
        )
        return uid

    return (yield from relative_spiral_plan())
[docs]
def spiral_square(
    detectors: Sequence[Readable],
    x_motor: NamedMovable,
    y_motor: NamedMovable,
    x_center: float,
    y_center: float,
    x_range: float,
    y_range: float,
    x_num: float,
    y_num: float,
    *,
    per_step: Optional[PerStep] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """Absolute square spiral scan, centered around (x_center, y_center)

    The trajectory comes from ``plan_patterns.spiral_square_pattern`` and is
    executed with :func:`scan_nd`.

    Parameters
    ----------
    detectors : list
        list of 'readable' objects
    x_motor : object
        any 'settable' object (motor, temp controller, etc.)
    y_motor : object
        any 'settable' object (motor, temp controller, etc.)
    x_center : float
        x center
    y_center : float
        y center
    x_range : float
        x width of spiral
    y_range : float
        y width of spiral
    x_num : float
        number of x axis points
    y_num : float
        Number of y axis points.
    per_step : callable, optional
        hook for customizing action of inner loop (messages per step).
        See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default)
        for details.
    md : dict, optional
        metadata

    See Also
    --------
    :func:`bluesky.plans.rel_spiral_square`
    :func:`bluesky.plans.spiral`
    :func:`bluesky.plans.rel_spiral`
    :func:`bluesky.plans.spiral_fermat`
    :func:`bluesky.plans.rel_spiral_fermat`
    """
    pattern_args = dict(  # noqa: C408
        x_motor=x_motor,
        y_motor=y_motor,
        x_center=x_center,
        y_center=y_center,
        x_range=x_range,
        y_range=y_range,
        x_num=x_num,
        y_num=y_num,
    )
    cyc = plan_patterns.spiral_square_pattern(**pattern_args)

    # Before including pattern_args in metadata, replace objects with reprs.
    pattern_args["x_motor"] = repr(x_motor)
    pattern_args["y_motor"] = repr(y_motor)
    _md = {
        "plan_args": {
            "detectors": list(map(repr, detectors)),
            "x_motor": repr(x_motor),
            "y_motor": repr(y_motor),
            "x_center": x_center,
            "y_center": y_center,
            "x_range": x_range,
            "y_range": y_range,
            "x_num": x_num,
            "y_num": y_num,
            "per_step": repr(per_step),
        },
        "plan_name": "spiral_square",
        "plan_pattern": "spiral_square",
        # Record where the pattern lives and how it was called, as the other
        # spiral plans do; this is what `pattern_args` was prepared for.
        "plan_pattern_module": plan_patterns.__name__,
        "plan_pattern_args": pattern_args,
        "shape": (y_num, x_num),
        "extents": (
            (y_center - y_range / 2, y_center + y_range / 2),
            (x_center - x_range / 2, x_center + x_range / 2),
        ),
        "hints": {},
    }
    _md.update(md or {})

    # Fill in default hints on a copy, so that a 'hints' dict supplied by the
    # caller in `md` is not modified in place; a 'hints' value of None is
    # treated as empty.
    hints = dict(_md.get("hints") or {})  # type: ignore
    hints.setdefault("gridding", "rectilinear_nonsequential")
    try:
        hints.setdefault("dimensions", [(m.hints["fields"], "primary") for m in [y_motor, x_motor]])
    except (AttributeError, KeyError):
        # At least one motor provides no usable hints: leave 'dimensions' unset.
        pass
    _md["hints"] = hints

    return (yield from scan_nd(detectors, cyc, per_step=per_step, md=_md))
[docs]
def rel_spiral_square(
    detectors: Sequence[Readable],
    x_motor: Movable,
    y_motor: Movable,
    x_range: float,
    y_range: float,
    x_num: float,
    y_num: float,
    *,
    per_step: Optional[PerStep] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """Relative square spiral scan, centered around current (x, y) position.

    Runs :func:`spiral_square` centered on (0, 0) with both motors wrapped so
    that set points are relative and positions are reset afterwards.

    Parameters
    ----------
    detectors : list
        list of 'readable' objects
    x_motor : object
        any 'settable' object (motor, temp controller, etc.)
    y_motor : object
        any 'settable' object (motor, temp controller, etc.)
    x_range : float
        x width of spiral
    y_range : float
        y width of spiral
    x_num : float
        number of x axis points
    y_num : float
        Number of y axis points.
    per_step : callable, optional
        hook for customizing action of inner loop (messages per step).
        See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default)
        for details.
    md : dict, optional
        metadata

    See Also
    --------
    :func:`bluesky.plans.spiral_square`
    :func:`bluesky.plans.spiral`
    :func:`bluesky.plans.rel_spiral`
    :func:`bluesky.plans.spiral_fermat`
    :func:`bluesky.plans.rel_spiral_fermat`
    """
    # User metadata wins over the default plan name.
    _md = {"plan_name": "rel_spiral_square", **(md or {})}
    motors = [x_motor, y_motor]

    @bpp.reset_positions_decorator(motors)
    @bpp.relative_set_decorator(motors)
    def relative_square_plan():
        # A center of (0, 0) in relative coordinates is the current position.
        uid = yield from spiral_square(
            detectors,
            x_motor,
            y_motor,
            0,
            0,
            x_range,
            y_range,
            x_num,
            y_num,
            per_step=per_step,
            md=_md,
        )
        return uid

    return (yield from relative_square_plan())
def ramp_plan(
    go_plan: MsgGenerator,
    monitor_sig: Readable,
    inner_plan_func: Callable[[], MsgGenerator],
    take_pre_data: bool = True,
    timeout: Optional[float] = None,
    period: Optional[float] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """Take data while ramping one or more positioners.

    The pseudo code for this plan is ::

        yield from open_run()
        yield from inner_plan_func()      # only if take_pre_data
        sts = (yield from go_plan)
        while not sts.done:
            yield from inner_plan_func()
        yield from inner_plan_func()
        yield from close_run()

    Parameters
    ----------
    go_plan : generator
        plan to start the ramp.  This will be run inside of a open/close
        run.

        This plan must return a status object with a ``done`` attribute
        (e.g. `ophyd.StatusBase`).
    monitor_sig : readable
        signal to be monitored
    inner_plan_func : generator function
        generator which takes no input

        This will be called for every data point.  This should create
        one or more events.
    take_pre_data: Bool, optional
        If True, add a pre data at beginning
    timeout : float, optional
        If not None, the maximum time the ramp can run.

        In seconds
    period : float, optional
        If not None, take data no faster than this.  If None, take
        data as fast as possible

        If running the inner plan takes longer than `period` then take
        data with no dead time.

        In seconds.
    md : dict, optional
        metadata

    Raises
    ------
    utils.RampFail
        If ``timeout`` is set and is exceeded while the ramp is still running.
    """
    _md = {"plan_name": "ramp_plan", **(md or {})}

    @bpp.monitor_during_decorator((monitor_sig,))
    @bpp.run_decorator(md=_md)
    def polling_plan():
        # The clock starts before the optional 'pre' data point is taken.
        deadline = None if timeout is None else time.time() + timeout

        # take a 'pre' data point
        if take_pre_data:
            yield from inner_plan_func()

        # start the ramp
        status = yield from go_plan

        while not status.done:
            iteration_start = time.time()
            yield from inner_plan_func()

            if deadline is not None and time.time() > deadline:
                raise utils.RampFail()

            if period is not None:
                # Sleep off whatever is left of this period, if anything.
                remaining = (iteration_start + period) - time.time()
                if remaining > 0:
                    yield from bps.sleep(remaining)

        # take a 'post' data point
        yield from inner_plan_func()

    return (yield from polling_plan())
[docs]
def fly(
    flyers: List[Flyable],
    *,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """
    Perform a fly scan with one or more 'flyers'.

    All flyers are kicked off first, then all are completed, then all are
    collected, inside a single run.

    Parameters
    ----------
    flyers : collection
        objects that support the flyer interface
    md : dict, optional
        metadata

    Yields
    ------
    msg : Msg
        'kickoff', 'wait', 'complete', 'wait', 'collect' messages

    See Also
    --------
    :func:`bluesky.preprocessors.fly_during_wrapper`
    :func:`bluesky.preprocessors.fly_during_decorator`
    """
    run_uid = yield from bps.open_run(md)

    # Each phase finishes for every flyer before the next phase starts.
    for device in flyers:
        yield from bps.kickoff(device, wait=True)
    for device in flyers:
        yield from bps.complete(device, wait=True)
    for device in flyers:
        yield from bps.collect(device)

    yield from bps.close_run()
    return run_uid
def x2x_scan(
    detectors: Sequence[Readable],
    motor1: NamedMovable,
    motor2: NamedMovable,
    start: float,
    stop: float,
    num: int,
    *,
    per_step: Optional[PerStep] = None,
    md: Optional[CustomPlanMetadata] = None,
) -> MsgGenerator[str]:
    """
    Relatively scan over two motors in a 2:1 ratio

    This is a generalized version of a theta2theta scan

    Parameters
    ----------
    detectors : list or tuple
        list of 'readable' objects
    motor1, motor2 : Positioner
        The second motor will move half as much as the first
    start, stop : float
        The relative limits of the first motor.  The second motor
        will move between ``start / 2`` and ``stop / 2``
    num : int
        number of steps in the scan
    per_step : callable, optional
        hook for customizing action of inner loop (messages per step).
        See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default)
        for details.
    md : dict, optional
        metadata
    """
    plan_args = {
        "detectors": list(map(repr, detectors)),
        "motor1": motor1.name,
        "motor2": motor2.name,
        "start": start,
        "stop": stop,
        "num": num,
        "per_step": repr(per_step),
    }
    # User metadata wins over the defaults recorded here.
    _md = {"plan_name": "x2x_scan", "plan_args": plan_args, **(md or {})}

    # motor2 covers half the range of motor1 over the same number of points.
    result = yield from relative_inner_product_scan(
        detectors, num, motor1, start, stop, motor2, start / 2, stop / 2, per_step=per_step, md=_md
    )
    return result
# Older, longer plan names kept as module-level aliases of the current plans
# so that code written against the previous names keeps working.
relative_list_scan = rel_list_scan  # back-compat
relative_scan = rel_scan  # back-compat
relative_log_scan = rel_log_scan  # back-compat
relative_adaptive_scan = rel_adaptive_scan  # back-compat
outer_product_scan = grid_scan  # back-compat
relative_outer_product_scan = rel_grid_scan  # back-compat
relative_spiral_fermat = rel_spiral_fermat  # back-compat
relative_spiral = rel_spiral  # back-compat