Source code for mxcubeweb.core.adapter.sample_changer_adapter

import json
import logging
from typing import ClassVar

import gevent
from markupsafe import escape
from mxcubecore import HardwareRepository as HWR
from mxcubecore import queue_entry
from mxcubecore.HardwareObjects.abstract.AbstractSampleChanger import (
    SampleChanger,
    SampleChangerState,
)

from mxcubeweb.core.adapter.adapter_base import AdapterBase
from mxcubeweb.core.models.adaptermodels import (
    SampleChangerCommandInputModel,
    SampleInputModel,
)
from mxcubeweb.core.models.configmodels import ResourceHandlerConfigModel

logger = logging.getLogger("MX3.HWR")

resource_handler_config = ResourceHandlerConfigModel(
    name="sample_changer",
    url_prefix="/mxcube/api/v0.1/sample_changer",
    attributes=[
        "loaded_sample",
        "get_contents",
        "get_value",
        "sync_with_crims",
    ],
    commands=[
        "select_location",
        "scan_location",
        "unmount_current",
        "mount_sample",
        "send_command",
    ],
)


[docs]class SampleChangerAdapter(AdapterBase): """Adapter for AbstractSampleChanger.""" SUPPORTED_TYPES: ClassVar[list[object]] = [SampleChanger] def __init__(self, ho, role, app): super().__init__(ho, role, app, resource_handler_config) self.app = app sc = HWR.beamline.sample_changer # Initialize hwobj signals sc.connect("stateChanged", self._sc_state_changed) sc.connect("isCollisionSafe", self._is_collision_safe) sc.connect("loadedSampleChanged", self._loaded_sample_changed) sc.connect("infoChanged", self._sample_state_update) if HWR.beamline.sample_changer_maintenance is not None: HWR.beamline.sample_changer_maintenance.connect( "globalStateChanged", self._sc_maintenance_update ) HWR.beamline.sample_changer_maintenance.connect( "gripperChanged", self._gripper_changed ) def _sc_state_changed(self, *args): new_state = args[0] state_str = SampleChangerState.STATE_DESC.get(new_state, "Unknown").upper() self.app.server.emit("sc_state", state_str, namespace="/hwr") def _is_collision_safe(self, *args): new_state = args[0] # we are only interested when it becames true if new_state: msg = { "signal": "isCollisionSafe", "message": "Sample moved to safe area", } self.app.server.emit("sc", msg, namespace="/hwr") def _loaded_sample_changed(self, sample): if hasattr(sample, "get_address"): address = sample.get_address() barcode = sample.get_id() else: address = "" barcode = "" logging.getLogger("HWR").info("Loaded sample changed: %s", address) try: sample_id = address if HWR.beamline.sample_changer.has_loaded_sample(): self.app.lims.set_current_sample(sample_id) else: sample = HWR.beamline.sample_changer.get_loaded_sample() address = sample.get_address() if sample else None self.app.lims.set_current_sample(address) self.app.server.emit( "loaded_sample_changed", {"address": address, "barcode": barcode}, namespace="/hwr", ) self._sc_load_ready(address) except Exception: logging.getLogger("HWR").exception("Error setting loaded sample") def _sc_load_ready(self, location): msg = { "signal": "loadReady", "location": location, "message": "Sample changer, loaded sample", } self.app.server.emit("sc", msg, namespace="/hwr") def _sc_unload(self, location): msg = { "signal": "operatingSampleChanger", "location": location, "message": "Please wait, unloading sample", } self.app.server.emit("sc", msg, namespace="/hwr") def _sample_state_update(self, sample=None): if sample: sample_data = { "sampleID": sample.get_address(), "location": sample.get_address(), "state": sample.state, } self.app.server.emit( "sc_sample_state_update", {"sample": sample_data}, namespace="/hwr", ) def _sc_maintenance_update(self, *args): if len(args) == 3: # Restore the `global_state` parameter removed in this commit 337efd37 global_state, cmd_state, message = args else: # Be backward compatible with HW objects which are emitting signal with # 2 arguments global_state = {} cmd_state, message = args try: self.app.server.emit( "sc_maintenance_update", { "global_state": global_state, "commands_state": json.dumps(cmd_state), "message": message, }, namespace="/hwr", ) except Exception: logging.getLogger("HWR").exception("error sending message") def _gripper_changed(self): self.app.queue.queue_clear() self.app.server.emit( "queue", {"Signal": "update", "message": "all"}, namespace="/hwr" ) def _mount_sample(self, sample: SampleInputModel): # Clear sample view from shapes beore mounting the new sample, to avoid # unnecessary updates to shapes while the sample is being un-mounted. HWR.beamline.sample_view.clear_all() sc = HWR.beamline.sample_changer res = False try: msg = { "signal": "operatingSampleChanger", "location": sample.location, "message": "Please wait, loading sample", } self.app.server.emit("sc", msg, namespace="/hwr") sid = self.app.lims.get_current_sample().get("sampleID", False) current_queue = self.app.queue.queue_to_dict() if sample.location != "Manual": msg = f"Mounting sample: {sample.location} ({sample.sample_name})" logging.getLogger("user_level_log").info(msg) if ( not sc.get_loaded_sample() or sc.get_loaded_sample().get_address() != sample.location ): res = sc.load(sample.sample_id, wait=True) if ( res and HWR.beamline.queue_manager.centring_method == queue_entry.CENTRING_METHOD.LOOP and not HWR.beamline.diffractometer.in_plate_mode and not self.app.harvester.mount_from_harvester() ): HWR.beamline.sample_view.reject_centring() msg = "Starting autoloop centring ..." logging.getLogger("MX3.HWR").info(msg) HWR.beamline.sample_view.start_auto_centring() elif HWR.beamline.diffractometer.in_plate_mode: msg = "Starting autoloop Focusing ..." logging.getLogger("MX3.HWR").info(msg) sc.move_to_crystal_position(None) else: msg = f"Mounting sample: {sample.sample_name}" logging.getLogger("user_level_log").info(msg) self.app.lims.set_current_sample(sample.sample_id) res = True except Exception as ex: logging.getLogger("MX3.HWR").exception("[SC] sample could not be mounted") raise RuntimeError(str(ex)) from ex else: if sid and current_queue.get(sid, False): # We remove the current sample from the queue, if we are moving # from one sample to another and the current sample is in the queue node_id = current_queue[sid]["queueID"] self.app.queue.set_enabled_entry(node_id, False) # noqa: FBT003 self.app.queue.queue_toggle_sample(self.app.queue.get_entry(node_id)[1]) finally: self._sc_load_ready(sample.location) return res def _unmount_sample(self, location): try: self._sc_unload(location) if location != "Manual": HWR.beamline.sample_changer.unload(location, wait=False) else: self.app.lims.set_current_sample(None) self._sc_load_ready(location) msg = f"[SC] unmounted {location}" logging.getLogger("MX3.HWR").info(msg) except Exception: msg = "[SC] sample could not be mounted" logging.getLogger("MX3.HWR").exception(msg) raise else: HWR.beamline.queue_model.mounted_sample = "" HWR.beamline.sample_view.clear_all() def get_value(self) -> dict: if HWR.beamline.sample_changer_maintenance is not None: global_state, cmdstate, msg = self.get_global_state() cmds = HWR.beamline.sample_changer_maintenance.get_cmd_info() else: global_state = {} cmdstate = "SC maintenance controller not defined" cmds = [] msg = "" contents = self._ho.get_contents_as_dict() address, barcode = self.get_loaded_sample() loaded_sample = {"address": address, "barcode": barcode} try: state = HWR.beamline.sample_changer.get_status().upper() except Exception: logging.getLogger("MX3.HWR").exception("") state = "OFFLINE" return { "state": state, "loaded_sample": loaded_sample, "contents": contents, "global_state": { "global_state": global_state, "commands_state": cmdstate, }, "cmds": {"cmds": cmds}, "msg": msg, "plate_mode": HWR.beamline.diffractometer.in_plate_mode, }
[docs] def state(self): return "READY" if self._ho.is_ready() else "BUSY"
def loaded_sample(self) -> dict: if self._ho.has_loaded_sample(): address, barcode = self._ho.get_loaded_sample() else: address, barcode = "", "" return {"address": address, "barcode": barcode} def get_contents(self): return self._ho.get_contents_as_dict() def get_loaded_sample(self): try: sample = HWR.beamline.sample_changer.get_loaded_sample() except Exception: logging.getLogger("MX3.HWR").exception("") sample = None if sample is not None: address = sample.get_address() barcode = sample.get_id() else: address = "" barcode = "" return address, barcode def get_global_state(self): try: return HWR.beamline.sample_changer_maintenance.get_global_state() except Exception: logging.getLogger("MX3.HWR").exception("Could not get sc global state") return ( {}, "SC maintenance controller doesn't response", "Can't retrieve the global state", ) def select_location(self, loc: str): self._ho.select(loc) return self._ho.get_contents_as_dict() def scan_location(self, loc: str): loc = None if loc == "" else loc self._ho.scan(loc, recursive=True) return self._ho.get_contents_as_dict() def mount_sample(self, sample: SampleInputModel, wait=True): # noqa: FBT002 if wait: self._mount_sample(sample) else: gevent.spawn(self._mount_sample, sample) return HWR.beamline.sample_changer.get_contents_as_dict() def unmount_current(self): sc_sample = HWR.beamline.sample_changer.get_loaded_sample() if sc_sample: location = sc_sample.get_address() self._unmount_sample(location) else: self._unmount_sample("Manual") return HWR.beamline.sample_changer.get_contents_as_dict() def send_command(self, command: SampleChangerCommandInputModel): try: return { "response": HWR.beamline.sample_changer_maintenance.send_command( command.cmd, command.arguments ) } except Exception as _ex: logger.exception("SC cannot execute command %s", command.cmd) msg = f"Cannot execute command {escape(command.cmd)}" raise RuntimeError(msg) from _ex
[docs] def sync_with_crims(self): """Synchronize with Crims. To be used mostly when diffractometer is in plate mode. This returns a list of crystal dicts available in Crims that have been harvested. With this, the user can visualize more easily where the crystals are in the plate GUI. """ xtal_list = [] try: processing_plan = HWR.beamline.sample_changer.sync_with_crims() for x in processing_plan.plate.xtal_list: response = { "crystal_uuid": x.crystal_uuid, "row": x.row, "column": x.column, "shelf": x.shelf, "offset_x": x.offset_x, "offset_y": x.offset_y, "image_url": x.image_url, "image_date": x.image_date, "sample": x.sample, } xtal_list.append(response) except Exception: logging.getLogger("MX3.HWR").exception("Could not get crystal List") return {"xtal_list": xtal_list}