Source code for bluesky.plans

import collections
import inspect
import os
import sys
import time
from collections import defaultdict
from functools import partial
from itertools import chain, zip_longest
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple, Union

import numpy as np
from cycler import Cycler

try:
    # cytools is a drop-in replacement for toolz, implemented in Cython
    from cytools import partition
except ImportError:
    from toolz import partition

from . import plan_patterns, utils
from . import plan_stubs as bps
from . import preprocessors as bpp
from .protocols import Flyable, Movable, NamedMovable, Readable
from .utils import (
    CustomPlanMetadata,
    Msg,
    MsgGenerator,
    ScalarOrIterableFloat,
    get_hinted_fields,
)

#: Plan function that can be used for each shot in a detector acquisition involving no actuation
PerShot = Callable[[Sequence[Readable], Optional[bps.TakeReading]], MsgGenerator]

#: Plan function that can be used for each step in a scan
PerStep1D = Callable[
    [Sequence[Readable], Movable, Any, Optional[bps.TakeReading]],
    MsgGenerator,
]
PerStepND = Callable[
    [
        Sequence[Readable],
        Mapping[Movable, Any],
        Dict[Movable, Any],
        Optional[bps.TakeReading],
    ],
    MsgGenerator,
]
PerStep = Union[PerStep1D, PerStepND]


def _check_detectors_type_input(detectors):
    if not isinstance(detectors, Sequence):
        raise TypeError("The input argument must be either as a list or a tuple of Readable objects.")


def derive_default_hints(motors: List[Any]) -> Dict[str, Sequence]:
    x_fields = [field for motor in motors for field in get_hinted_fields(motor)]

    default_dimensions = [(x_fields, "primary")] if x_fields else []

    return {"dimensions": default_dimensions} if default_dimensions else {}


[docs] def count( detectors: Sequence[Readable], num: Optional[int] = 1, delay: ScalarOrIterableFloat = 0.0, *, per_shot: Optional[PerShot] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """ Take one or more readings from detectors. Parameters ---------- detectors : list or tuple list of 'readable' objects num : integer, optional number of readings to take; default is 1 If None, capture data until canceled delay : iterable or scalar, optional Time delay in seconds between successive readings; default is 0. per_shot : callable, optional hook for customizing action of inner loop (messages per step) Expected signature :: def f(detectors: Iterable[OphydObj]) -> Generator[Msg]: ... md : dict, optional metadata Notes ----- If ``delay`` is an iterable, it must have at least ``num - 1`` entries or the plan will raise a ``ValueError`` during iteration. """ _check_detectors_type_input(detectors) if num is None: num_intervals = None else: num_intervals = num - 1 _md = { "detectors": [det.name for det in detectors], "num_points": num, "num_intervals": num_intervals, "plan_args": {"detectors": list(map(repr, detectors)), "num": num, "delay": delay}, "plan_name": "count", "hints": {}, } _md.update(md or {}) _md["hints"].setdefault("dimensions", [(("time",), "primary")]) # type: ignore # per_shot might define a different stream, so do not predeclare primary predeclare = per_shot is None and os.environ.get("BLUESKY_PREDECLARE", False) msg_per_step: PerShot = per_shot if per_shot else bps.one_shot @bpp.stage_decorator(detectors) @bpp.run_decorator(md=_md) def inner_count() -> MsgGenerator[str]: if predeclare: yield from bps.declare_stream(*detectors, name="primary") return (yield from bps.repeat(partial(msg_per_step, detectors), num=num, delay=delay)) return (yield from inner_count())
[docs] def list_scan( detectors: Sequence[Readable], *args: Tuple[Union[Movable, Any], List[Any]], per_step: Optional[PerStep] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """ Scan over one or more variables in steps simultaneously (inner product). Parameters ---------- detectors : list or tuple list of 'readable' objects *args : For one dimension, ``motor, [point1, point2, ....]``. In general: .. code-block:: python motor1, [point1, point2, ...], motor2, [point1, point2, ...], ..., motorN, [point1, point2, ...] Motors can be any 'settable' object (motor, temp controller, etc.) per_step : callable, optional hook for customizing action of inner loop (messages per step) Expected signature: ``f(detectors, motor, step) -> plan (a generator)`` md : dict, optional metadata See Also -------- :func:`bluesky.plans.rel_list_scan` :func:`bluesky.plans.list_grid_scan` :func:`bluesky.plans.rel_list_grid_scan` """ _check_detectors_type_input(detectors) if len(args) % 2 != 0: raise ValueError("The list of arguments must contain a list of points for each defined motor") md = md or {} # reset md if it is None. # set some variables and check that all lists are the same length lengths = {} motors: List[Any] = [] pos_lists = [] length = None for motor, pos_list in partition(2, args): pos_list = list(pos_list) # Ensure list (accepts any finite iterable). lengths[motor.name] = len(pos_list) if not length: length = len(pos_list) motors.append(motor) pos_lists.append(pos_list) length_check = all(elem == list(lengths.values())[0] for elem in list(lengths.values())) if not length_check: raise ValueError( "The lengths of all lists in *args must be the same. " "However the lengths in args are : " f"{lengths}" ) md_args = list(chain(*((repr(motor), pos_list) for motor, pos_list in partition(2, args)))) motor_names = list(lengths.keys()) num_intervals: int = (length or 1) - 1 _md = { "detectors": [det.name for det in detectors], "motors": motor_names, "num_points": length, "num_intervals": num_intervals, "plan_args": {"detectors": list(map(repr, detectors)), "args": md_args, "per_step": repr(per_step)}, "plan_name": "list_scan", "plan_pattern": "inner_list_product", "plan_pattern_module": plan_patterns.__name__, "plan_pattern_args": dict(args=md_args), # noqa: C408 "hints": {}, } _md.update(md or {}) # add the motor-based hints _md["hints"] = derive_default_hints(motors) assert isinstance(_md["hints"], dict), "Hints must be a dictionary" # override any hints from the original md (if exists) _md["hints"].update(md.get("hints", {})) full_cycler = plan_patterns.inner_list_product(args) return (yield from scan_nd(detectors, full_cycler, per_step=per_step, md=_md))
[docs] def rel_list_scan( detectors: Sequence[Readable], *args: Union[Movable, Any], per_step: Optional[PerStep] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """ Scan over one variable in steps relative to current position. Parameters ---------- detectors : list list of 'readable' objects *args : For one dimension, ``motor, [point1, point2, ....]``. In general: .. code-block:: python motor1, [point1, point2, ...], motor2, [point1, point2, ...], ..., motorN, [point1, point2, ...] Motors can be any 'settable' object (motor, temp controller, etc.) point1, point2 etc are relative to the current location. motor : object any 'settable' object (motor, temp controller, etc.) steps : list list of positions relative to current position per_step : callable, optional hook for customizing action of inner loop (messages per step) Expected signature: ``f(detectors, motor, step)`` md : dict, optional metadata See Also -------- :func:`bluesky.plans.list_scan` :func:`bluesky.plans.list_grid_scan` :func:`bluesky.plans.rel_list_grid_scan` """ # TODO read initial positions (redundantly) so they can be put in md here _md = {"plan_name": "rel_list_scan"} _md.update(md or {}) motors = [motor for motor, pos_list in partition(2, args)] @bpp.reset_positions_decorator(motors) @bpp.relative_set_decorator(motors) def inner_relative_list_scan(): return (yield from list_scan(detectors, *args, per_step=per_step, md=_md)) return (yield from inner_relative_list_scan())
[docs] def list_grid_scan( detectors: Sequence[Readable], *args: Union[Movable, Any], snake_axes: bool = False, per_step: Optional[PerStep] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """ Scan over a mesh; each motor is on an independent trajectory. Parameters ---------- detectors: list or tuple list of 'readable' objects args: list patterned like (``motor1, position_list1,`` ``motor2, position_list2,`` ``motor3, position_list3,`` ``...,`` ``motorN, position_listN``) The first motor is the "slowest", the outer loop. ``position_list``'s are lists of positions, all lists must have the same length. Motors can be any 'settable' object (motor, temp controller, etc.). snake_axes: boolean or iterable, optional which axes should be snaked, either ``False`` (do not snake any axes), ``True`` (snake all axes) or a list of axes to snake. "Snaking" an axis is defined as following snake-like, winding trajectory instead of a simple left-to-right trajectory.The elements of the list are motors that are listed in `args`. The list must not contain the slowest (first) motor, since it can't be snaked. per_step: callable, optional hook for customizing action of inner loop (messages per step). See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default) for details. md: dict, optional metadata See Also -------- :func:`bluesky.plans.rel_list_grid_scan` :func:`bluesky.plans.list_scan` :func:`bluesky.plans.rel_list_scan` """ full_cycler = plan_patterns.outer_list_product(args, snake_axes) md_args = [] motor_names = [] motors = [] for i, (motor, pos_list) in enumerate(partition(2, args)): # noqa: B007 md_args.extend([repr(motor), pos_list]) motor_names.append(motor.name) motors.append(motor) _md = { "shape": tuple(len(pos_list) for motor, pos_list in partition(2, args)), "extents": tuple([min(pos_list), max(pos_list)] for motor, pos_list in partition(2, args)), "snake_axes": repr(snake_axes), "plan_args": {"detectors": list(map(repr, detectors)), "args": md_args, "per_step": repr(per_step)}, "plan_name": "list_grid_scan", "plan_pattern": "outer_list_product", "plan_pattern_args": dict(args=md_args, snake_axes=repr(snake_axes)), # noqa: C408 "plan_pattern_module": plan_patterns.__name__, "motors": tuple(motor_names), "hints": {}, } _md.update(md or {}) # type: ignore try: motor_hints = [(m.hints["fields"], "primary") for m in motors] assert isinstance(_md["hints"], dict), "Hints must be a dictionary" _md["hints"].setdefault("dimensions", motor_hints) except (AttributeError, KeyError): ... return (yield from scan_nd(detectors, full_cycler, per_step=per_step, md=_md))
[docs] def rel_list_grid_scan( detectors: Sequence[Readable], *args: Union[Movable, Any], snake_axes: bool = False, per_step: Optional[PerStep] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """ Scan over a mesh; each motor is on an independent trajectory. Each point is relative to the current position. Parameters ---------- detectors : list or tuple list of 'readable' objects args patterned like (``motor1, position_list1,`` ``motor2, position_list2,`` ``motor3, position_list3,`` ``...,`` ``motorN, position_listN``) The first motor is the "slowest", the outer loop. ``position_list``'s are lists of positions, all lists must have the same length. Motors can be any 'settable' object (motor, temp controller, etc.). snake_axes : boolean or Iterable, optional which axes should be snaked, either ``False`` (do not snake any axes), ``True`` (snake all axes) or a list of axes to snake. "Snaking" an axis is defined as following snake-like, winding trajectory instead of a simple left-to-right trajectory.The elements of the list are motors that are listed in `args`. The list must not contain the slowest (first) motor, since it can't be snaked. per_step : callable, optional hook for customizing action of inner loop (messages per step). See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default) for details. md : dict, optional metadata See Also -------- :func:`bluesky.plans.list_grid_scan` :func:`bluesky.plans.list_scan` :func:`bluesky.plans.rel_list_scan` """ _check_detectors_type_input(detectors) _md = {"plan_name": "rel_list_grid_scan"} _md.update(md or {}) motors = [motor for motor, pos_list in partition(2, args)] @bpp.reset_positions_decorator(motors) @bpp.relative_set_decorator(motors) def inner_relative_list_grid_scan(): return (yield from list_grid_scan(detectors, *args, snake_axes=snake_axes, per_step=per_step, md=_md)) return (yield from inner_relative_list_grid_scan())
def _scan_1d( detectors: Sequence[Readable], motor: NamedMovable, start: float, stop: float, num: int, *, per_step: Optional[PerStep] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """ Scan over one variable in equally spaced steps. Parameters ---------- detectors : list or tuple list of 'readable' objects motor : object any 'settable' object (motor, temp controller, etc.) start : float starting position of motor stop : float ending position of motor num : int number of steps per_step : callable, optional hook for customizing action of inner loop (messages per step) Expected signature: ``f(detectors, motor, step)`` md : dict, optional metadata See Also -------- :func:`bluesky.plans.rel_scan` """ _check_detectors_type_input(detectors) _md: CustomPlanMetadata = { "detectors": [det.name for det in detectors], "motors": [motor.name], "num_points": num, "num_intervals": num - 1, "plan_args": { "detectors": list(map(repr, detectors)), "num": num, "motor": repr(motor), "start": start, "stop": stop, "per_step": repr(per_step), }, "plan_name": "scan", "plan_pattern": "linspace", "plan_pattern_module": "numpy", "plan_pattern_args": dict(start=start, stop=stop, num=num), # noqa: C408 "hints": {}, } _md.update(md or {}) try: dimensions = [(motor.hints["fields"], "primary")] except (AttributeError, KeyError): pass else: _md["hints"].setdefault("dimensions", dimensions) # type: ignore if per_step is None: per_step = bps.one_1d_step steps = np.linspace(**_md["plan_pattern_args"]) @bpp.stage_decorator(list(detectors) + [motor]) @bpp.run_decorator(md=_md) def inner_scan(): for step in steps: yield from per_step(detectors, motor, step) return (yield from inner_scan()) def _rel_scan_1d( detectors: Sequence[Readable], motor: Movable, start: float, stop: float, num: int, *, per_step: Optional[PerStep] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """ Scan over one variable in equally spaced steps relative to current positon. Parameters ---------- detectors : list or tuple list of 'readable' objects motor : object any 'settable' object (motor, temp controller, etc.) start : float starting position of motor stop : float ending position of motor num : int number of steps per_step : callable, optional hook for customizing action of inner loop (messages per step) Expected signature: ``f(detectors, motor, step)`` md : dict, optional metadata See Also -------- :func:`bluesky.plans.scan` """ _md = {"plan_name": "rel_scan"} _md.update(md or {}) # TODO read initial positions (redundantly) so they can be put in md here @bpp.reset_positions_decorator([motor]) @bpp.relative_set_decorator([motor]) def inner_relative_scan(): return (yield from _scan_1d(detectors, motor, start, stop, num, per_step=per_step, md=_md)) return (yield from inner_relative_scan())
[docs] def log_scan( detectors: Sequence[Readable], motor: NamedMovable, start: float, stop: float, num: int, *, per_step=None, md=None, ) -> MsgGenerator[str]: """ Scan over one variable in log-spaced steps. Parameters ---------- detectors : list or tuple list of 'readable' objects motor : object any 'settable' object (motor, temp controller, etc.) start : float starting position of motor stop : float ending position of motor num : int number of steps per_step : callable, optional hook for customizing action of inner loop (messages per step) Expected signature: ``f(detectors, motor, step)`` md : dict, optional metadata See Also -------- :func:`bluesky.plans.rel_log_scan` """ _check_detectors_type_input(detectors) _md: CustomPlanMetadata = { "detectors": [det.name for det in detectors], "motors": [motor.name], "num_points": num, "num_intervals": num - 1, "plan_args": { "detectors": list(map(repr, detectors)), "num": num, "start": start, "stop": stop, "motor": repr(motor), "per_step": repr(per_step), }, "plan_name": "log_scan", "plan_pattern": "logspace", "plan_pattern_module": "numpy", "plan_pattern_args": dict(start=start, stop=stop, num=num), # noqa: C408 "hints": {}, } _md.update(md or {}) try: dimensions = [(motor.hints["fields"], "primary")] except (AttributeError, KeyError): pass else: _md["hints"].setdefault("dimensions", dimensions) # type: ignore predeclare = per_step is None and os.environ.get("BLUESKY_PREDECLARE", False) if per_step is None: per_step = bps.one_1d_step steps = np.logspace(**_md["plan_pattern_args"]) @bpp.stage_decorator(list(detectors) + [motor]) @bpp.run_decorator(md=_md) def inner_log_scan(): if predeclare: yield from bps.declare_stream(motor, *detectors, name="primary") for step in steps: yield from per_step(detectors, motor, step) return (yield from inner_log_scan())
[docs] def rel_log_scan( detectors: Sequence[Readable], motor: Movable, start: float, stop: float, num: int, *, per_step: Optional[PerStep] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """ Scan over one variable in log-spaced steps relative to current position. Parameters ---------- detectors : list or tuple list of 'readable' objects motor : object any 'settable' object (motor, temp controller, etc.) start : float starting position of motor stop : float ending position of motor num : int number of steps per_step : callable, optional hook for customizing action of inner loop (messages per step) Expected signature: ``f(detectors, motor, step)`` md : dict, optional metadata See Also -------- :func:`bluesky.plans.log_scan` """ # TODO read initial positions (redundantly) so they can be put in md here _md = {"plan_name": "rel_log_scan"} _md.update(md or {}) @bpp.reset_positions_decorator([motor]) @bpp.relative_set_decorator([motor]) def inner_relative_log_scan(): return (yield from log_scan(detectors, motor, start, stop, num, per_step=per_step, md=_md)) return (yield from inner_relative_log_scan())
[docs] def adaptive_scan( detectors: Sequence[Readable], target_field: str, motor: NamedMovable, start: float, stop: float, min_step: float, max_step: float, target_delta: float, backstep: bool, threshold: Optional[float] = 0.8, *, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """ Scan over one variable with adaptively tuned step size. Parameters ---------- detectors : list or tuple list of 'readable' objects target_field : string data field whose output is the focus of the adaptive tuning motor : object any 'settable' object (motor, temp controller, etc.) start : float starting position of motor stop : float ending position of motor min_step : float smallest step for fast-changing regions max_step : float largest step for slow-chaning regions target_delta : float desired fractional change in detector signal between steps backstep : bool whether backward steps are allowed -- this is concern with some motors threshold : float, optional threshold for going backward and rescanning a region, default is 0.8 md : dict, optional metadata See Also -------- :func:`bluesky.plans.rel_adaptive_scan` """ _check_detectors_type_input(detectors) if not 0 < min_step < max_step: raise ValueError("min_step and max_step must meet condition of max_step > min_step > 0") _md = { "detectors": [det.name for det in detectors], "motors": [motor.name], "plan_args": { "detectors": list(map(repr, detectors)), "motor": repr(motor), "start": start, "stop": stop, "min_step": min_step, "max_step": max_step, "target_delta": target_delta, "backstep": backstep, "threshold": threshold, }, "plan_name": "adaptive_scan", "hints": {}, } _md.update(md or {}) try: dimensions = [(motor.hints["fields"], "primary")] except (AttributeError, KeyError): pass else: _md["hints"].setdefault("dimensions", dimensions) # type: ignore @bpp.stage_decorator(list(detectors) + [motor]) @bpp.run_decorator(md=_md) def adaptive_core(): next_pos = start step = (max_step - min_step) / 2 past_I = None cur_I = None cur_det = {} if stop >= start: direction_sign = 1 else: direction_sign = -1 devices = tuple(utils.separate_devices(detectors + [motor])) if os.environ.get("BLUESKY_PREDECLARE", False): yield from bps.declare_stream(*devices, name="primary") while next_pos * direction_sign < stop * direction_sign: yield Msg("checkpoint") yield from bps.mv(motor, next_pos) yield Msg("create", None, name="primary") for det in detectors: yield Msg("trigger", det, group="B") yield Msg("wait", None, "B") for det in devices: cur_det = yield Msg("read", det) if target_field in cur_det: cur_I = cur_det[target_field]["value"] yield Msg("save") # special case first first loop if past_I is None: past_I = cur_I next_pos += step * direction_sign continue dI = np.abs(cur_I - past_I) slope = dI / step if slope: new_step = np.clip(target_delta / slope, min_step, max_step) else: new_step = np.min([step * 1.1, max_step]) # if we over stepped, go back and try again if backstep and (new_step < step * threshold): next_pos -= step step = new_step else: past_I = cur_I step = 0.2 * new_step + 0.8 * step next_pos += step * direction_sign return (yield from adaptive_core())
[docs] def rel_adaptive_scan( detectors: Sequence[Readable], target_field: str, motor: Movable, start: float, stop: float, min_step: float, max_step: float, target_delta: float, backstep: bool, threshold: Optional[float] = 0.8, *, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """ Relative scan over one variable with adaptively tuned step size. Parameters ---------- detectors : list or tuple list of 'readable' objects target_field : string data field whose output is the focus of the adaptive tuning motor : object any 'settable' object (motor, temp controller, etc.) start : float starting position of motor stop : float ending position of motor min_step : float smallest step for fast-changing regions max_step : float largest step for slow-chaning regions target_delta : float desired fractional change in detector signal between steps backstep : bool whether backward steps are allowed -- this is concern with some motors threshold : float, optional threshold for going backward and rescanning a region, default is 0.8 md : dict, optional metadata See Also -------- :func:`bluesky.plans.adaptive_scan` """ _md = {"plan_name": "rel_adaptive_scan"} _md.update(md or {}) @bpp.reset_positions_decorator([motor]) @bpp.relative_set_decorator([motor]) def inner_relative_adaptive_scan(): return ( yield from adaptive_scan( detectors, target_field, motor, start, stop, min_step, max_step, target_delta, backstep, threshold, md=_md, ) ) return (yield from inner_relative_adaptive_scan())
[docs] def tune_centroid( detectors: Sequence[Readable], signal: str, motor: NamedMovable, start: float, stop: float, min_step: float, num: int = 10, step_factor: float = 3.0, snake: bool = False, *, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: r""" plan: tune a motor to the centroid of signal(motor) Initially, traverse the range from start to stop with the number of points specified. Repeat with progressively smaller step size until the minimum step size is reached. Rescans will be centered on the signal centroid (for $I(x)$, centroid$= \sum{I}/\sum{x*I}$) with original scan range reduced by ``step_factor``. Set ``snake=True`` if your positions are reproducible moving from either direction. This will not necessarily decrease the number of traversals required to reach convergence. Snake motion reduces the total time spent on motion to reset the positioner. For some positioners, such as those with hysteresis, snake scanning may not be appropriate. For such positioners, always approach the positions from the same direction. Note: Ideally the signal has only one peak in the range to be scanned. It is assumed the signal is not polymodal between ``start`` and ``stop``. Parameters ---------- detectors : list or tuple list of 'readable' objects signal : string detector field whose output is to maximize motor : object any 'settable' object (motor, temp controller, etc.) start : float start of range stop : float end of range, note: start < stop min_step : float smallest step size to use. num : int, optional number of points with each traversal, default = 10 step_factor : float, optional used in calculating new range after each pass note: step_factor > 1.0, default = 3 snake : bool, optional if False (default), always scan from start to stop md : dict, optional metadata Examples -------- Find the center of a peak using synthetic hardware. >>> from ophyd.sim import SynAxis, SynGauss >>> motor = SynAxis(name='motor') >>> det = SynGauss(name='det', motor, 'motor', ... center=-1.3, Imax=1e5, sigma=0.05) >>> RE(tune_centroid([det], "det", motor, -1.5, -0.5, 0.01, 10)) """ _check_detectors_type_input(detectors) if min_step <= 0: raise ValueError("min_step must be positive") if step_factor <= 1.0: raise ValueError("step_factor must be greater than 1.0") try: (motor_name,) = motor.hints["fields"] except (AttributeError, ValueError): motor_name = motor.name _md = { "detectors": [det.name for det in detectors], "motors": [motor.name], "plan_args": { "detectors": list(map(repr, detectors)), "motor": repr(motor), "start": start, "stop": stop, "num": num, "min_step": min_step, }, "plan_name": "tune_centroid", "hints": {}, } _md.update(md or {}) try: dimensions = [(motor.hints["fields"], "primary")] except (AttributeError, KeyError): pass else: _md["hints"].setdefault("dimensions", dimensions) # type: ignore low_limit = min(start, stop) high_limit = max(start, stop) @bpp.stage_decorator(list(detectors) + [motor]) @bpp.run_decorator(md=_md) def _tune_core(start: float, stop: float, num: int, signal: str): next_pos = start step = (stop - start) / (num - 1) peak_position = None cur_I = None sum_I = 0 # for peak centroid calculation, I(x) sum_xI = 0 if os.environ.get("BLUESKY_PREDECLARE", False): yield from bps.declare_stream(motor, *detectors, name="primary") # type: ignore while abs(step) >= min_step and low_limit <= next_pos <= high_limit: yield Msg("checkpoint") yield from bps.mv(motor, next_pos) # type: ignore # Movable ret = yield from bps.trigger_and_read(list(detectors) + [motor]) # type: ignore cur_I = ret[signal]["value"] sum_I += cur_I position = ret[motor_name]["value"] sum_xI += position * cur_I next_pos += step in_range = min(start, stop) <= next_pos <= max(start, stop) if not in_range: if sum_I == 0: return peak_position = sum_xI / sum_I # centroid sum_I, sum_xI = 0, 0 # reset for next pass new_scan_range = (stop - start) / step_factor start = np.clip(peak_position - new_scan_range / 2, low_limit, high_limit) stop = np.clip(peak_position + new_scan_range / 2, low_limit, high_limit) if snake: start, stop = stop, start step = (stop - start) / (num - 1) next_pos = start # print("peak position = {}".format(peak_position)) # print("start = {}".format(start)) # print("stop = {}".format(stop)) # finally, move to peak position if peak_position is not None: # improvement: report final peak_position # print("final position = {}".format(peak_position)) yield from bps.mv(motor, peak_position) # type: ignore # Movable return (yield from _tune_core(start, stop, num, signal))
[docs] def scan_nd( detectors: Sequence[Readable], cycler: Cycler, *, per_step: Optional[PerStep] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """ Scan over an arbitrary N-dimensional trajectory. Parameters ---------- detectors : list or tuple cycler : Cycler cycler.Cycler object mapping movable interfaces to positions per_step : callable, optional hook for customizing action of inner loop (messages per step). See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default) for details. md : dict, optional metadata See Also -------- :func:`bluesky.plans.inner_product_scan` :func:`bluesky.plans.grid_scan` Examples -------- >>> from cycler import cycler >>> cy = cycler(motor1, [1, 2, 3]) * cycler(motor2, [4, 5, 6]) >>> scan_nd([sensor], cy) """ _check_detectors_type_input(detectors) _md = { "detectors": [det.name for det in detectors], "motors": [motor.name for motor in cycler.keys], "num_points": len(cycler), "num_intervals": len(cycler) - 1, "plan_args": { "detectors": list(map(repr, detectors)), "cycler": repr(cycler), "per_step": repr(per_step), }, "plan_name": "scan_nd", "hints": {}, } _md.update(md or {}) try: dimensions = [(motor.hints["fields"], "primary") for motor in cycler.keys] except (AttributeError, KeyError): # Not all motors provide a 'fields' hint, so we have to skip it. pass else: # We know that hints exists. Either: # - the user passed it in and we are extending it # - the user did not pass it in and we got the default {} # If the user supplied hints includes a dimension entry, do not # change it, else set it to the one generated above _md["hints"].setdefault("dimensions", dimensions) # type: ignore predeclare = per_step is None and os.environ.get("BLUESKY_PREDECLARE", False) if per_step is None: per_step = bps.one_nd_step else: # Ensure that the user-defined per-step has the expected signature. sig = inspect.signature(per_step) def _verify_1d_step(sig): if len(sig.parameters) < 3: return False for name, (p_name, p) in zip_longest(["detectors", "motor", "step"], sig.parameters.items()): # this is one of the first 3 positional arguements, check that the name matches if name is not None: if name != p_name: return False # if there are any extra arguments, check that they have a default else: if p.kind is p.VAR_KEYWORD or p.kind is p.VAR_POSITIONAL: continue if p.default is p.empty: return False return True def _verify_nd_step(sig): if len(sig.parameters) < 3: return False for name, (p_name, p) in zip_longest(["detectors", "step", "pos_cache"], sig.parameters.items()): # this is one of the first 3 positional arguements, check that the name matches if name is not None: if name != p_name: return False # if there are any extra arguments, check that they have a default else: if p.kind is p.VAR_KEYWORD or p.kind is p.VAR_POSITIONAL: continue if p.default is p.empty: return False return True if sig == inspect.signature(bps.one_nd_step): pass elif _verify_nd_step(sig): # check other signature for back-compatibility pass elif _verify_1d_step(sig): # Accept this signature for back-compat reasons (because # inner_product_scan was renamed scan). dims = len(list(cycler.keys)) if dims != 1: raise TypeError("Signature of per_step assumes 1D trajectory " f"but {dims} motors are specified.") (motor,) = cycler.keys user_per_step = per_step def adapter(detectors, step, pos_cache): # one_nd_step 'step' parameter is a dict; one_id_step 'step' # parameter is a value (step,) = step.values() return (yield from user_per_step(detectors, motor, step)) per_step = adapter # type: ignore else: raise TypeError( "per_step must be a callable with the signature \n " "<Signature (detectors, step, pos_cache)> or " "<Signature (detectors, motor, step)>. \n" f"per_step signature received: {sig}" ) pos_cache: Dict = defaultdict(lambda: None) # where last position is stashed cycler = utils.merge_cycler(cycler) motors = list(cycler.keys) @bpp.stage_decorator(list(detectors) + motors) @bpp.run_decorator(md=_md) def inner_scan_nd(): if predeclare: yield from bps.declare_stream(*motors, *detectors, name="primary") for step in list(cycler): yield from per_step(detectors, step, pos_cache) return (yield from inner_scan_nd())
def inner_product_scan( detectors: Sequence[Readable], num: int, *args: Union[Movable, Any], per_step: Optional[PerStep] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[None]: # For scan, num is the _last_ positional arg instead of the first one. # Notice the swapped order here. md = md or {} md.setdefault("plan_name", "inner_product_scan") yield from scan(detectors, *args, num, per_step=None, md=md)
[docs] def scan( detectors: Sequence[Readable], *args: Union[Movable, Any], num: Optional[int] = None, per_step: Optional[PerStep] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """ Scan over one multi-motor trajectory. Parameters ---------- detectors : list or tuple list of 'readable' objects *args : For one dimension, ``motor, start, stop``. In general: .. code-block:: python motor1, start1, stop1, motor2, start2, stop2, ..., motorN, startN, stopN Motors can be any 'settable' object (motor, temp controller, etc.) num : integer number of points per_step : callable, optional hook for customizing action of inner loop (messages per step). See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default) for details. md : dict, optional metadata See Also -------- :func:`bluesky.plans.relative_inner_product_scan` :func:`bluesky.plans.grid_scan` :func:`bluesky.plans.scan_nd` """ _check_detectors_type_input(detectors) # For back-compat reasons, we accept 'num' as the last positional argument: # scan(detectors, motor, -1, 1, 3) # or by keyword: # scan(detectors, motor, -1, 1, num=3) # ... which requires some special processing. if num is None: if len(args) % 3 != 1: raise ValueError( "The number of points to scan must be provided " "as the last positional argument or as keyword " "argument 'num'." ) num = args[-1] # type: ignore args = args[:-1] elif not (float(num).is_integer() and num > 0.0): raise ValueError( f"The parameter `num` is expected to be a number of " f"steps (not step size!) It must therefore be a " f"whole number. The given value was {num}." ) md_args = list(chain(*((repr(motor), start, stop) for motor, start, stop in partition(3, args)))) motor_names = tuple(motor.name for motor, start, stop in partition(3, args)) md = md or {} _md = { "plan_args": { "detectors": list(map(repr, detectors)), "num": num, "args": md_args, "per_step": repr(per_step), }, "plan_name": "scan", "plan_pattern": "inner_product", "plan_pattern_module": plan_patterns.__name__, "plan_pattern_args": dict(num=num, args=md_args), # noqa: C408 "motors": motor_names, } _md.update(md) # get hints for best effort callback motors = [motor for motor, start, stop in partition(3, args)] # Give a hint that the motors all lie along the same axis # [(['motor1', 'motor2', ...], 'primary'), ] is 1D (this case) # [ ('motor1', 'primary'), ('motor2', 'primary'), ... ] is 2D for example # call x_fields because these are meant to be the x (independent) axis x_fields = [] for motor in motors: x_fields.extend(get_hinted_fields(motor)) default_dimensions = [(x_fields, "primary")] default_hints: Dict[str, Sequence] = {} if len(x_fields) > 0: default_hints.update(dimensions=default_dimensions) # now add default_hints and override any hints from the original md (if # exists) _md["hints"] = default_hints _md["hints"].update(md.get("hints", {}) or {}) # type: ignore full_cycler = plan_patterns.inner_product(num=num, args=args) return (yield from scan_nd(detectors, full_cycler, per_step=per_step, md=_md))
[docs] def grid_scan( detectors: Sequence[Readable], *args, snake_axes: Optional[Union[Iterable, bool]] = None, per_step: Optional[PerStep] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """ Scan over a mesh; each motor is on an independent trajectory. Parameters ---------- detectors: list or tuple list of 'readable' objects ``*args`` patterned like (``motor1, start1, stop1, num1,`` ``motor2, start2, stop2, num2,`` ``motor3, start3, stop3, num3,`` ... ``motorN, startN, stopN, numN``) The first motor is the "slowest", the outer loop. For all motors except the first motor, there is a "snake" argument: a boolean indicating whether to following snake-like, winding trajectory or a simple left-to-right trajectory. snake_axes: boolean or iterable, optional which axes should be snaked, either ``False`` (do not snake any axes), ``True`` (snake all axes) or a list of axes to snake. "Snaking" an axis is defined as following snake-like, winding trajectory instead of a simple left-to-right trajectory. The elements of the list are motors that are listed in `args`. The list must not contain the slowest (first) motor, since it can't be snaked. per_step: callable, optional hook for customizing action of inner loop (messages per step). See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default) for details. md: dict, optional metadata See Also -------- :func:`bluesky.plans.rel_grid_scan` :func:`bluesky.plans.inner_product_scan` :func:`bluesky.plans.scan_nd` """ # Notes: (not to be included in the documentation) # The deprecated function call with no 'snake_axes' argument and 'args' # patterned like (``motor1, start1, stop1, num1,`` # ``motor2, start2, stop2, num2, snake2,`` # ``motor3, start3, stop3, num3, snake3,`` ... # ``motorN, startN, stopN, numN, snakeN``) # The first motor is the "slowest", the outer loop. For all motors # except the first motor, there is a "snake" argument: a boolean # indicating whether to following snake-like, winding trajectory or a # simple left-to-right trajectory. # Ideally, deprecated and new argument lists should not be mixed. # The function will still accept `args` in the old format even if `snake_axes` is # supplied, but if `snake_axes` is not `None` (the default value), it overrides # any values of `snakeX` in `args`. _check_detectors_type_input(detectors) args_pattern = plan_patterns.classify_outer_product_args_pattern(args) if (snake_axes is not None) and (args_pattern == plan_patterns.OuterProductArgsPattern.PATTERN_2): raise ValueError( "Mixing of deprecated and new API interface is not allowed: " "the parameter 'snake_axes' can not be used if snaking is " "set as part of 'args'" ) # For consistency, set 'snake_axes' to False if new API call is detected if (snake_axes is None) and (args_pattern != plan_patterns.OuterProductArgsPattern.PATTERN_2): snake_axes = False chunk_args = list(plan_patterns.chunk_outer_product_args(args, args_pattern)) # 'chunk_args' is a list of tuples of the form: (motor, start, stop, num, snake) # If the function is called using deprecated pattern for arguments, then # 'snake' may be set True for some motors, otherwise the 'snake' is always False. # The list of controlled motors motors = [_[0] for _ in chunk_args] # Check that the same motor is not listed multiple times. This indicates an error in the script. if len(set(motors)) != len(motors): raise ValueError(f"Some motors are listed multiple times in the argument list 'args': " f"'{motors}'") if snake_axes is not None: def _set_snaking(chunk, value): """Returns the tuple `chunk` with modified 'snake' value""" _motor, _start, _stop, _num, _snake = chunk return _motor, _start, _stop, _num, value if isinstance(snake_axes, collections.abc.Iterable) and not isinstance(snake_axes, str): # Always convert to a tuple (in case a `snake_axes` is an iterator). snake_axes = tuple(snake_axes) # Check if the list of axes (motors) contains repeated entries. if len(set(snake_axes)) != len(snake_axes): raise ValueError(f"The list of axes 'snake_axes' contains repeated elements: " f"'{snake_axes}'") # Check if the snaking is enabled for the slowest motor. if len(motors) and (motors[0] in snake_axes): raise ValueError(f"The list of axes 'snake_axes' contains the slowest motor: " f"'{snake_axes}'") # Check that all motors in the chunk_args are controlled in the scan. # It is very likely that the script running the plan has a bug. if any([_ not in motors for _ in snake_axes]): # noqa: C419 raise ValueError( f"The list of axes 'snake_axes' contains motors " f"that are not controlled during the scan: " f"'{snake_axes}'" ) # Enable snaking for the selected axes. # If the argument `snake_axes` is specified (not None), then # any `snakeX` values that could be specified in `args` are ignored. for n, chunk in enumerate(chunk_args): if n > 0: # The slowest motor is never snaked motor = chunk[0] if motor in snake_axes: chunk_args[n] = _set_snaking(chunk, True) else: chunk_args[n] = _set_snaking(chunk, False) elif snake_axes is True: # 'snake_axes' has boolean value `True` # Set all 'snake' values except for the slowest motor chunk_args = [_set_snaking(_, True) if n > 0 else _ for n, _ in enumerate(chunk_args)] elif snake_axes is False: # 'snake_axes' has boolean value `True` # Set all 'snake' values chunk_args = [_set_snaking(_, False) for _ in chunk_args] else: raise ValueError( f"Parameter 'snake_axes' is not iterable, boolean or None: " f"'{snake_axes}', type: {type(snake_axes)}" ) # Prepare the argument list for the `outer_product` function args_modified = [] for n, chunk in enumerate(chunk_args): if n == 0: args_modified.extend(chunk[:-1]) else: args_modified.extend(chunk) full_cycler = plan_patterns.outer_product(args=args_modified) md_args = [] motor_names = [] motors = [] for i, (motor, start, stop, num, snake) in enumerate(chunk_args): md_args.extend([repr(motor), start, stop, num]) if i > 0: # snake argument only shows up after the first motor md_args.append(snake) motor_names.append(motor.name) motors.append(motor) _md = { "shape": tuple(num for motor, start, stop, num, snake in chunk_args), "extents": tuple([start, stop] for motor, start, stop, num, snake in chunk_args), "snaking": tuple(snake for motor, start, stop, num, snake in chunk_args), # 'num_points': inserted by scan_nd "plan_args": {"detectors": list(map(repr, detectors)), "args": md_args, "per_step": repr(per_step)}, "plan_name": "grid_scan", "plan_pattern": "outer_product", "plan_pattern_args": dict(args=md_args), # noqa: C408 "plan_pattern_module": plan_patterns.__name__, "motors": tuple(motor_names), "hints": {}, } _md.update(md or {}) _md["hints"].setdefault("gridding", "rectilinear") # type: ignore try: _md["hints"].setdefault("dimensions", [(m.hints["fields"], "primary") for m in motors]) # type: ignore except (AttributeError, KeyError): ... return (yield from scan_nd(detectors, full_cycler, per_step=per_step, md=_md))
[docs] def rel_grid_scan( detectors: Sequence[Readable], *args: Union[Movable, Any], snake_axes: Optional[Union[Iterable, bool]] = None, per_step: Optional[PerStep] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """ Scan over a mesh relative to current position. Parameters ---------- detectors: list list of 'readable' objects ``*args`` patterned like (``motor1, start1, stop1, num1,`` ``motor2, start2, stop2, num2,`` ``motor3, start3, stop3, num3,`` ... ``motorN, startN, stopN, numN``) The first motor is the "slowest", the outer loop. For all motors except the first motor, there is a "snake" argument: a boolean indicating whether to following snake-like, winding trajectory or a simple left-to-right trajectory. snake_axes: boolean or iterable, optional which axes should be snaked, either ``False`` (do not snake any axes), ``True`` (snake all axes) or a list of axes to snake. "Snaking" an axis is defined as following snake-like, winding trajectory instead of a simple left-to-right trajectory. The elements of the list are motors that are listed in `args`. The list must not contain the slowest (first) motor, since it can't be snaked. per_step: callable, optional hook for customizing action of inner loop (messages per step). See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default) for details. md: dict, optional metadata See Also -------- :func:`bluesky.plans.relative_inner_product_scan` :func:`bluesky.plans.grid_scan` :func:`bluesky.plans.scan_nd` """ # Notes: the deprecated function call is also supported. See the notes # following the docstring for 'grid_scan' function _md = {"plan_name": "rel_grid_scan"} _md.update(md or {}) motors = [m[0] for m in plan_patterns.chunk_outer_product_args(args)] @bpp.reset_positions_decorator(motors) @bpp.relative_set_decorator(motors) def inner_rel_grid_scan(): return (yield from grid_scan(detectors, *args, snake_axes=snake_axes, per_step=per_step, md=_md)) return (yield from inner_rel_grid_scan())
def relative_inner_product_scan( # type: ignore detectors: Sequence[Readable], num: int, *args: Union[Movable, Any], per_step: Optional[PerStep] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: # For rel_scan, num is the _last_ positional arg instead of the first one. # Notice the swapped order here. md = md or {} md.setdefault("plan_name", "relative_inner_product_scan") yield from rel_scan(detectors, *args, num, per_step=per_step, md=md)
[docs] def rel_scan( detectors: Sequence[Readable], *args: Union[Movable, Any], num=None, per_step: Optional[PerStep] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """ Scan over one multi-motor trajectory relative to current position. Parameters ---------- detectors : list list of 'readable' objects *args : For one dimension, ``motor, start, stop``. In general: .. code-block:: python motor1, start1, stop1, motor2, start2, start2, ..., motorN, startN, stopN, Motors can be any 'settable' object (motor, temp controller, etc.) num : integer number of points per_step : callable, optional hook for customizing action of inner loop (messages per step). See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default) for details. md : dict, optional metadata See Also -------- :func:`bluesky.plans.rel_grid_scan` :func:`bluesky.plans.inner_product_scan` :func:`bluesky.plans.scan_nd` """ _check_detectors_type_input(detectors) _md = {"plan_name": "rel_scan"} md = md or {} _md.update(md) motors = [motor for motor, start, stop in partition(3, args)] @bpp.reset_positions_decorator(motors) @bpp.relative_set_decorator(motors) def inner_rel_scan(): return (yield from scan(detectors, *args, num=num, per_step=per_step, md=_md)) return (yield from inner_rel_scan())
[docs] def tweak( detector: Readable, target_field: str, motor: NamedMovable, step: float, *, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """ Move and motor and read a detector with an interactive prompt. Parameters ---------- detetector : Device target_field : string data field whose output is the focus of the adaptive tuning motor : Device step : float initial suggestion for step size md : dict, optional metadata """ prompt_str = "{0}, {1:.3}, {2:.3}, ({3}) " _md = { "detectors": [detector.name], "motors": [motor.name], "plan_args": { "detector": repr(detector), "target_field": target_field, "motor": repr(motor), "step": step, }, "plan_name": "tweak", "hints": {}, } try: dimensions = [(motor.hints["fields"], "primary")] except (AttributeError, KeyError): pass else: _md["hints"].update({"dimensions": dimensions}) # type: ignore _md.update(md or {}) d = detector try: from IPython.display import clear_output except ImportError: # Define a no-op for clear_output. def clear_output(wait=False): pass @bpp.stage_decorator([detector, motor]) @bpp.run_decorator(md=_md) def tweak_core(): nonlocal step while True: yield Msg("create", None, name="primary") ret_mot = yield Msg("read", motor) if ret_mot is None: return key = list(ret_mot.keys())[0] pos = ret_mot[key]["value"] yield Msg("trigger", d, group="A") yield Msg("wait", None, "A") reading = yield Msg("read", d) val = reading[target_field]["value"] yield Msg("save") prompt = prompt_str.format(motor.name, float(pos), float(val), step) new_step = yield Msg("input", prompt=prompt) if new_step: try: step = float(new_step) except ValueError: break yield Msg("set", motor, pos + step, group="A") print("Motor moving...") sys.stdout.flush() yield Msg("wait", None, "A") clear_output(wait=True) # stackoverflow.com/a/12586667/380231 print("\x1b[1A\x1b[2K\x1b[1A") return (yield from tweak_core())
[docs] def spiral_fermat( detectors: Sequence[Readable], x_motor: NamedMovable, y_motor: NamedMovable, x_start: float, y_start: float, x_range: float, y_range: float, dr: float, factor: float, *, dr_y: Optional[float] = None, tilt: Optional[float] = 0.0, per_step: Optional[PerStep] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """Absolute fermat spiral scan, centered around (x_start, y_start) Parameters ---------- detectors : list list of 'readable' objects x_motor : object any 'settable' object (motor, temp controller, etc.) y_motor : object any 'settable' object (motor, temp controller, etc.) x_start : float x center y_start : float y center x_range : float x width of spiral y_range : float y width of spiral dr : float delta radius factor : float radius gets divided by this dr_y : float, optional Delta radius along the major axis of the ellipse, if not specifed defaults to dr. tilt : float, optional Tilt angle in radians, default 0.0 per_step : callable, optional hook for customizing action of inner loop (messages per step). See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default) for details. md : dict, optional metadata See Also -------- :func:`bluesky.plans.spiral` :func:`bluesky.plans.rel_spiral` :func:`bluesky.plans.rel_spiral_fermat` """ pattern_args = dict( # noqa: C408 x_motor=x_motor, y_motor=y_motor, x_start=x_start, y_start=y_start, x_range=x_range, y_range=y_range, dr=dr, factor=factor, dr_y=dr_y, tilt=tilt, ) cyc = plan_patterns.spiral_fermat(**pattern_args) # Before including pattern_args in metadata, replace objects with reprs. pattern_args["x_motor"] = repr(x_motor) pattern_args["y_motor"] = repr(y_motor) _md = { "plan_args": { "detectors": list(map(repr, detectors)), "x_motor": repr(x_motor), "y_motor": repr(y_motor), "x_start": x_start, "y_start": y_start, "x_range": x_range, "y_range": y_range, "dr": dr, "factor": factor, "dr_y": dr_y, "tilt": tilt, "per_step": repr(per_step), }, "extents": tuple([[x_start - x_range, x_start + x_range], [y_start - y_range, y_start + y_range]]), # noqa: C409 "plan_name": "spiral_fermat", "plan_pattern": "spiral_fermat", "plan_pattern_module": plan_patterns.__name__, "plan_pattern_args": pattern_args, "hints": {}, } try: dimensions = [(x_motor.hints["fields"], "primary"), (y_motor.hints["fields"], "primary")] except (AttributeError, KeyError): pass else: _md["hints"].update({"dimensions": dimensions}) # type: ignore _md.update(md or {}) return (yield from scan_nd(detectors, cyc, per_step=per_step, md=_md))
[docs] def rel_spiral_fermat( detectors: Sequence[Readable], x_motor: Movable, y_motor: Movable, x_range: float, y_range: float, dr: float, factor: float, *, dr_y: Optional[float] = None, tilt: Optional[float] = 0.0, per_step: Optional[PerStep] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """Relative fermat spiral scan Parameters ---------- detectors : list list of 'readable' objects x_motor : object any 'settable' object (motor, temp controller, etc.) y_motor : object any 'settable' object (motor, temp controller, etc.) x_range : float x width of spiral y_range : float y width of spiral dr : float delta radius factor : float radius gets divided by this dr_y : float, optional Delta radius along the major axis of the ellipse, if not specifed defaults to dr tilt : float, optional Tilt angle in radians, default 0.0 per_step : callable, optional hook for customizing action of inner loop (messages per step). See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default) for details. md : dict, optional metadata See Also -------- :func:`bluesky.plans.spiral` :func:`bluesky.plans.rel_spiral` :func:`bluesky.plans.spiral_fermat` """ _md = {"plan_name": "rel_spiral_fermat"} _md.update(md or {}) @bpp.reset_positions_decorator([x_motor, y_motor]) @bpp.relative_set_decorator([x_motor, y_motor]) def inner_relative_spiral_fermat(): return ( yield from spiral_fermat( detectors, x_motor, y_motor, 0, 0, x_range, y_range, dr, factor, dr_y=dr_y, tilt=tilt, per_step=per_step, md=_md, ) ) return (yield from inner_relative_spiral_fermat())
[docs] def spiral( detectors: Sequence[Readable], x_motor: NamedMovable, y_motor: NamedMovable, x_start: float, y_start: float, x_range: float, y_range: float, dr: float, nth: float, *, dr_y: Optional[float] = None, tilt: Optional[float] = 0.0, per_step: Optional[PerStep] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """Spiral scan, centered around (x_start, y_start) Parameters ---------- x_motor : object any 'settable' object (motor, temp controller, etc.) y_motor : object any 'settable' object (motor, temp controller, etc.) x_start : float x center y_start : float y center x_range : float x width of spiral y_range : float y width of spiral dr : float Delta radius along the minor axis of the ellipse. dr_y : float, optional Delta radius along the major axis of the ellipse. If None, defaults to dr. nth : float Number of theta steps tilt : float, optional Tilt angle in radians, default 0.0 per_step : callable, optional hook for customizing action of inner loop (messages per step). See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default) for details. md : dict, optional metadata See Also -------- :func:`bluesky.plans.rel_spiral` :func:`bluesky.plans.spiral_fermat` :func:`bluesky.plans.rel_spiral_fermat` """ pattern_args = dict( # noqa: C408 x_motor=x_motor, y_motor=y_motor, x_start=x_start, y_start=y_start, x_range=x_range, y_range=y_range, dr=dr, nth=nth, dr_y=dr_y, tilt=tilt, ) cyc = plan_patterns.spiral(**pattern_args) # Before including pattern_args in metadata, replace objects with reprs. pattern_args["x_motor"] = repr(x_motor) pattern_args["y_motor"] = repr(y_motor) _md = { "plan_args": { "detectors": list(map(repr, detectors)), "x_motor": repr(x_motor), "y_motor": repr(y_motor), "x_start": x_start, "y_start": y_start, "x_range": x_range, "y_range": y_range, "dr": dr, "dr_y": dr_y, "nth": nth, "tilt": tilt, "per_step": repr(per_step), }, "extents": tuple([[x_start - x_range, x_start + x_range], [y_start - y_range, y_start + y_range]]), # noqa: C409 "plan_name": "spiral", "plan_pattern": "spiral", "plan_pattern_args": pattern_args, "plan_pattern_module": plan_patterns.__name__, "hints": {}, } try: dimensions = [(x_motor.hints["fields"], "primary"), (y_motor.hints["fields"], "primary")] except (AttributeError, KeyError): pass else: _md["hints"].update({"dimensions": dimensions}) # type: ignore _md.update(md or {}) # type: ignore return (yield from scan_nd(detectors, cyc, per_step=per_step, md=_md))
[docs] def rel_spiral( detectors: Sequence[Readable], x_motor: Movable, y_motor: Movable, x_range: float, y_range: float, dr: float, nth: float, *, dr_y: Optional[float] = None, tilt: float = 0.0, per_step: Optional[PerStep] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """Relative spiral scan Parameters ---------- x_motor : object any 'settable' object (motor, temp controller, etc.) y_motor : object any 'settable' object (motor, temp controller, etc.) x_range : float x width of spiral y_range : float y width of spiral dr : float Delta radius along the minor axis of the ellipse. dr_y : float, optional Delta radius along the major axis of the ellipse. If None, it defaults to dr. nth : float Number of theta steps tilt : float, optional Tilt angle in radians, default 0.0 per_step : callable, optional hook for customizing action of inner loop (messages per step). See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default) for details. md : dict, optional metadata See Also -------- :func:`bluesky.plans.spiral` :func:`bluesky.plans.spiral_fermat` """ _md = {"plan_name": "rel_spiral"} _md.update(md or {}) @bpp.reset_positions_decorator([x_motor, y_motor]) @bpp.relative_set_decorator([x_motor, y_motor]) def inner_relative_spiral(): return ( yield from spiral( detectors, x_motor, y_motor, 0, 0, x_range, y_range, dr, nth, dr_y=dr_y, tilt=tilt, per_step=per_step, md=_md, ) ) return (yield from inner_relative_spiral())
[docs] def spiral_square( detectors: Sequence[Readable], x_motor: NamedMovable, y_motor: NamedMovable, x_center: float, y_center: float, x_range: float, y_range: float, x_num: float, y_num: float, *, per_step: Optional[PerStep] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """Absolute square spiral scan, centered around (x_center, y_center) Parameters ---------- detectors : list list of 'readable' objects x_motor : object any 'settable' object (motor, temp controller, etc.) y_motor : object any 'settable' object (motor, temp controller, etc.) x_center : float x center y_center : float y center x_range : float x width of spiral y_range : float y width of spiral x_num : float number of x axis points y_num : float Number of y axis points. per_step : callable, optional hook for cutomizing action of inner loop (messages per step). See docstring of :func:`bluesky.plans.one_nd_step` (the default) for details. md : dict, optional metadata See Also -------- :func:`bluesky.plans.relative_spiral_square` :func:`bluesky.plans.spiral` :func:`bluesky.plans.relative_spiral` :func:`bluesky.plans.spiral_fermat` :func:`bluesky.plans.relative_spiral_fermat` """ pattern_args = dict( # noqa: C408 x_motor=x_motor, y_motor=y_motor, x_center=x_center, y_center=y_center, x_range=x_range, y_range=y_range, x_num=x_num, y_num=y_num, ) cyc = plan_patterns.spiral_square_pattern(**pattern_args) # Before including pattern_args in metadata, replace objects with reprs. pattern_args["x_motor"] = repr(x_motor) pattern_args["y_motor"] = repr(y_motor) _md = { "plan_args": { "detectors": list(map(repr, detectors)), "x_motor": repr(x_motor), "y_motor": repr(y_motor), "x_center": x_center, "y_center": y_center, "x_range": x_range, "y_range": y_range, "x_num": x_num, "y_num": y_num, "per_step": repr(per_step), }, "plan_name": "spiral_square", "plan_pattern": "spiral_square", "shape": (y_num, x_num), "extents": ( (y_center - y_range / 2, y_center + y_range / 2), (x_center - x_range / 2, x_center + x_range / 2), ), "hints": {}, } _md.update(md or {}) _md["hints"].setdefault("gridding", "rectilinear_nonsequential") # type: ignore try: _md["hints"].setdefault("dimensions", [(m.hints["fields"], "primary") for m in [y_motor, x_motor]]) # type: ignore except (AttributeError, KeyError): ... return (yield from scan_nd(detectors, cyc, per_step=per_step, md=_md))
[docs] def rel_spiral_square( detectors: Sequence[Readable], x_motor: Movable, y_motor: Movable, x_range: float, y_range: float, x_num: float, y_num: float, *, per_step: Optional[PerStep] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """Relative square spiral scan, centered around current (x, y) position. Parameters ---------- detectors : list list of 'readable' objects x_motor : object any 'settable' object (motor, temp controller, etc.) y_motor : object any 'settable' object (motor, temp controller, etc.) x_range : float x width of spiral y_range : float y width of spiral x_num : float number of x axis points y_num : float Number of y axis points. per_step : callable, optional hook for cutomizing action of inner loop (messages per step). See docstring of :func:`bluesky.plans.one_nd_step` (the default) for details. md : dict, optional metadata See Also -------- :func:`bluesky.plans.spiral_square` :func:`bluesky.plans.spiral` :func:`bluesky.plans.relative_spiral` :func:`bluesky.plans.spiral_fermat` :func:`bluesky.plans.relative_spiral_fermat` """ _md = {"plan_name": "rel_spiral_square"} _md.update(md or {}) @bpp.reset_positions_decorator([x_motor, y_motor]) @bpp.relative_set_decorator([x_motor, y_motor]) def inner_relative_spiral(): return ( yield from spiral_square( detectors, x_motor, y_motor, 0, 0, x_range, y_range, x_num, y_num, per_step=per_step, md=_md, ) ) return (yield from inner_relative_spiral())
def ramp_plan( go_plan: MsgGenerator, monitor_sig: Readable, inner_plan_func: Callable[[], MsgGenerator], take_pre_data: bool = True, timeout: Optional[float] = None, period: Optional[float] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """Take data while ramping one or more positioners. The pseudo code for this plan is :: sts = (yield from go_plan) yield from open_run() yield from inner_plan_func() while not st.done: yield from inner_plan_func() yield from inner_plan_func() yield from close_run() Parameters ---------- go_plan : generator plan to start the ramp. This will be run inside of a open/close run. This plan must return a `ophyd.StatusBase` object. monitor_sig : readable signal to be monitored inner_plan_func : generator function generator which takes no input This will be called for every data point. This should create one or more events. take_pre_data: Bool, optional If True, add a pre data at beginning timeout : float, optional If not None, the maximum time the ramp can run. In seconds period : float, optional If not None, take data no faster than this. If None, take data as fast as possible If running the inner plan takes longer than `period` than take data with no dead time. In seconds. """ _md = {"plan_name": "ramp_plan"} _md.update(md or {}) @bpp.monitor_during_decorator((monitor_sig,)) @bpp.run_decorator(md=_md) def polling_plan(): fail_time = None if timeout is not None: # sort out if we should watch the clock fail_time = time.time() + timeout # take a 'pre' data point if take_pre_data: yield from inner_plan_func() # start the ramp status = yield from go_plan while not status.done: start_time = time.time() yield from inner_plan_func() if fail_time is not None: if time.time() > fail_time: raise utils.RampFail() if period is not None: cur_time = time.time() wait_time = (start_time + period) - cur_time if wait_time > 0: yield from bps.sleep(wait_time) # take a 'post' data point yield from inner_plan_func() return (yield from polling_plan())
[docs] def fly( flyers: List[Flyable], *, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """ Perform a fly scan with one or more 'flyers'. Parameters ---------- flyers : collection objects that support the flyer interface md : dict, optional metadata Yields ------ msg : Msg 'kickoff', 'wait', 'complete, 'wait', 'collect' messages See Also -------- :func:`bluesky.preprocessors.fly_during_wrapper` :func:`bluesky.preprocessors.fly_during_decorator` """ uid = yield from bps.open_run(md) for flyer in flyers: yield from bps.kickoff(flyer, wait=True) for flyer in flyers: yield from bps.complete(flyer, wait=True) for flyer in flyers: yield from bps.collect(flyer) yield from bps.close_run() return uid
def x2x_scan( detectors: Sequence[Readable], motor1: NamedMovable, motor2: NamedMovable, start: float, stop: float, num: int, *, per_step: Optional[PerStep] = None, md: Optional[CustomPlanMetadata] = None, ) -> MsgGenerator[str]: """ Relatively scan over two motors in a 2:1 ratio This is a generalized version of a theta2theta scan Parameters ---------- detectors : list or tuple list of 'readable' objects motor1, motor2 : Positioner The second motor will move half as much as the first start, stop : float The relative limits of the first motor. The second motor will move between ``start / 2`` and ``stop / 2`` num : int number of steps in the scan per_step : callable, optional hook for cutomizing action of inner loop (messages per step). See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default) for details. md : dict, optional metadata """ _md = { "plan_name": "x2x_scan", "plan_args": { "detectors": list(map(repr, detectors)), "motor1": motor1.name, "motor2": motor2.name, "start": start, "stop": stop, "num": num, "per_step": repr(per_step), }, } _md.update(md or {}) return ( yield from relative_inner_product_scan( detectors, num, motor1, start, stop, motor2, start / 2, stop / 2, per_step=per_step, md=_md ) ) relative_list_scan = rel_list_scan # back-compat relative_scan = rel_scan # back-compat relative_log_scan = rel_log_scan # back-compat relative_adaptive_scan = rel_adaptive_scan # back-compat outer_product_scan = grid_scan # back-compat relative_outer_product_scan = rel_grid_scan # back-compat relative_spiral_fermat = rel_spiral_fermat # back-compat relative_spiral = rel_spiral # back-compat