# Source code for mxcubeweb.core.adapter.adapter_base

import contextlib
import inspect
import logging
import traceback
import typing
from typing import Any, ClassVar

import gevent
from pydantic import (
    Field,
    ValidationError,
    create_model,
)

from mxcubeweb.core.models.adaptermodels import (
    HOActuatorModel,
    HOModel,
)
from mxcubeweb.core.models.configmodels import ResourceHandlerConfigModel
from mxcubeweb.core.server.resource_handler import (
    ResourceHandlerFactory,
)

# Default resource handler configuration: a single command ("set_value")
# and two attributes ("data" and "get_value").
default_resource_handler_config = ResourceHandlerConfigModel(
    commands=["set_value"],
    attributes=["data", "get_value"],
)


class AdapterBase:
    """Hardware Object Adapter Base class.

    An adapter wraps a hardware object (``ho``) and mediates between it and
    the web application: it registers itself in ``ADAPTER_DICT``, optionally
    creates a resource handler for it, runs commands in gevent greenlets and
    emits changes to clients on the ``/hwr`` socketIO namespace.
    """

    # List of supported HO base classes (or callable for more advanced matching)
    SUPPORTED_TYPES: ClassVar[list[object]] = []

    # Dictionary of supported types and their adapters
    SUPPORTED_TYPES_TO_ADAPTERS = {}

    ATTRIBUTES = []
    METHODS = []

    # Registry shared by all adapters: {handler/class name: {ho.name: adapter}}
    ADAPTER_DICT = {}

    def __init__(self, ho, role, app, resource_handler_config=None):
        """Initialize.

        Args:
            ho (object): Hardware object to mediate for.
            role (str): The name of the object.
            app (object): Application object; ``app.server`` and
                ``app.mxcubecore`` are used by the adapter.
            resource_handler_config (object): Optional resource handler
                configuration. When given, a resource handler is created
                (or fetched) for this adapter.
        """
        self.app = app
        self._ho = ho
        self._name = role
        self._available = True
        self._read_only = False
        self._type = type(self).__name__.replace("Adapter", "").upper()
        self._unique = True
        self._msg = ""

        # The registry key is the configured handler name when there is one,
        # otherwise the lower-cased class name.
        registry_key = type(self).__name__.lower()
        if resource_handler_config:
            registry_key = resource_handler_config.name or registry_key

        handler_dict = self.ADAPTER_DICT.setdefault(registry_key, {})

        if ho is not None:
            handler_dict[ho.name] = self

        if resource_handler_config:
            url_suffix = resource_handler_config.name or self._type.lower()
            ResourceHandlerFactory.create_or_get(
                name=registry_key,
                url_prefix="/mxcube/api/v0.1/hwobj/" + url_suffix,
                handler_dict=handler_dict,
                app=self.app,
                exports=resource_handler_config.exports,
                commands=resource_handler_config.commands,
                attributes=resource_handler_config.attributes,
            )

    @classmethod
    def can_adapt(cls, ho):
        """Return True if ``ho`` is an instance of any of ``SUPPORTED_TYPES``."""
        # NOTE(review): the SUPPORTED_TYPES comment mentions callables, but
        # isinstance() only accepts classes - confirm how callables are
        # expected to be matched.
        return any(isinstance(ho, t) for t in cls.SUPPORTED_TYPES)

    def __init_subclass__(cls, **kwargs):
        """Register the subclass as the adapter for each of its supported types.

        Raises:
            ValueError: When an adapter is already registered for one of the
                supported types.
        """
        for ho_class in cls.SUPPORTED_TYPES:
            if ho_class in AdapterBase.SUPPORTED_TYPES_TO_ADAPTERS:
                # Build a real message; previously an unformatted tuple was
                # passed to ValueError.
                error_msg = f"Adapter for class {ho_class.__name__} already exists"
                raise ValueError(error_msg)

            AdapterBase.SUPPORTED_TYPES_TO_ADAPTERS[ho_class] = cls

        super().__init_subclass__(**kwargs)

    @classmethod
    def get_resource_handler(cls):
        """Return the resource handler associated with this adapter class.

        Returns:
            The handler, or whatever the lookup yields when none is registered.
        """
        # RESOURCE_HANDLER_DICT is not defined by this class, so reading it
        # unconditionally raised AttributeError. Honour it if something else
        # has set it, otherwise look the handler up the same way the rest of
        # this class does.
        registry = getattr(AdapterBase, "RESOURCE_HANDLER_DICT", None)
        if registry is not None:
            return registry.get(cls.__name__, None)

        return ResourceHandlerFactory.get_handler(cls.__name__.lower())

    def get_adapter_id(self, ho=None):
        """Return the adapter id for ``ho``, or for the wrapped object if falsy."""
        ho = ho if ho else self._ho
        return self.app.mxcubecore._get_adapter_id(ho)

    def _add_adapter(self, attr_name, ho, adapter_cls):
        """Create a child adapter for ``ho`` and attach it as ``attr_name``."""
        _id = f"{self.get_adapter_id()}.{attr_name}"
        adapter_instance = adapter_cls(ho, _id, self.app)
        self.app.mxcubecore._add_adapter(_id, adapter_cls, ho, adapter_instance)
        setattr(self, attr_name, adapter_instance)

    def execute_command(self, cmd_name, args):
        """Validate ``args`` and run the command ``cmd_name`` in a greenlet.

        Args:
            cmd_name (str): Name of the command to execute.
            args (dict): Keyword arguments for the command.
        """
        try:
            self._pydantic_model_for_command(cmd_name).validate(args)
        except ValidationError:
            # A validation failure is only logged; the command is still
            # spawned below.
            logging.getLogger("MX3.HWR").exception(
                f"Error when validating input {args} for command {cmd_name}"
            )

        task = gevent.spawn(self._execute_command, cmd_name, args)
        # Stored on the greenlet so that _command_success can read it back.
        task.call_args = {"cmd_name": cmd_name, "value": args}
        task.link_value(self._command_success)

    def _execute_command(self, cmd_name, args):
        """Call ``cmd_name`` on the adapter if it has it, else on the hardware object.

        Exceptions are reported through ``_command_exception`` and logged,
        not re-raised.
        """
        _cmd = getattr(self, cmd_name, None)
        log = logging.getLogger("MX3.HWR")
        log.info(f"Calling {self._name}.{cmd_name} with {args}")

        try:
            if _cmd:
                return _cmd(**args)

            return self._ho.execute_exported_command(cmd_name, args)
        except Exception as ex:
            self._command_exception(cmd_name, ex)
            log.exception("")

    def _command_success(self, t):
        """Greenlet callback: validate and emit the return value of a command.

        Args:
            t: The finished greenlet, carrying ``value`` and ``call_args``.
        """
        value = t.value
        cmd_name = t.call_args["cmd_name"]
        # NOTE(review): the return type is taken from the hardware object's
        # attribute even when the command ran on the adapter - confirm.
        attr = getattr(self._ho, cmd_name)
        model = self._model_from_typehint(attr)
        log = logging.getLogger("MX3.HWR")

        try:
            model["return"].validate({"return": value})
        except ValidationError:
            log.exception(f"Return value of {self._name}.{cmd_name} is of wrong type")
        else:
            log.info(f"{self._name}.{cmd_name} returned {value}")

            if value:
                self._msg = value

            self.app.server.emit(
                "hardware_object_command_return",
                {"cmd_name": cmd_name, "value": value},
                namespace="/hwr",
            )

            self.emit_ho_changed(self.state())

    def _command_exception(self, cmd_name, ex):
        """Store the current traceback as message and emit a command error."""
        self._msg = traceback.format_exc()
        self.app.server.emit(
            "hardware_object_command_error",
            {"cmd_name": cmd_name, "value": str(ex)},
            namespace="/hwr",
        )
        self.emit_ho_changed(self.state())

    @property
    def adapter_type(self):
        """Adapter type.

        Returns:
            (str): The upper-cased class name with "Adapter" removed.
        """
        return self._type

    @property
    def ho(self):
        """Underlying HardwareObject.

        Returns:
            (object): HardwareObject
        """
        return self._ho

    # Abstract method
    def state(self):
        """Retrieve the state of the underlying hardware object as a string.

        Retrieves the state of the underlying hardware object and converts it
        to a str that can be used by the javascript front end.

        Returns:
            (str): The state
        """
        return self._ho.get_state().name

    # Abstract method
    def msg(self):
        """Return a message describing the current state.

        Should be used to communicate details of the state to the user.

        Returns:
            (str): The message string.
        """
        return self._msg

    def read_only(self):
        """Return true if the hardware object is read only.

        Return true if the hardware object is read only and ``set_value``
        can not be called.

        Returns:
            (bool): True if read only.
        """
        return self._read_only

    def available(self):
        """Check if the hardware object is considered to be available/online/enabled.

        Returns:
            (bool): True if available.
        """
        return self._available

    def _model_from_typehint(self, attr):
        """Build pydantic models from the type hints of ``attr``.

        Returns:
            (dict): ``"args"`` (model of the parameters), ``"return"`` (model
            of the return value) and ``"signature"`` (list of parameter names).
        """
        input_dict = {}
        output_dict = {}

        for _n, _t in typing.get_type_hints(attr).items():
            if _n != "return":
                input_dict[_n] = (_t, Field(alias=_n))
            else:
                # Return hints that are not classes are replaced by their
                # own class.
                if not inspect.isclass(_t):
                    _t = _t.__class__
                output_dict[_n] = (_t, Field(alias=_n))

        return {
            "args": create_model(attr.__name__, **input_dict),
            "return": create_model(attr.__name__, **output_dict),
            "signature": list(input_dict.keys()),
        }

    def _pydantic_model_for_command(self, cmd_name):
        """Return the pydantic model describing the arguments of ``cmd_name``."""
        if cmd_name in self.METHODS:
            return self._model_from_typehint(getattr(self, cmd_name, None))["args"]

        return self._ho.pydantic_model[cmd_name]

    def _exported_methods(self):
        """Describe the adapter methods listed as commands by the resource handler.

        Returns:
            (dict): Per command name: ``signature``, ``schema`` and ``display``.
        """
        exported_methods = {}

        # Get exported attributes from underlying HardwareObject
        # and only set display True for those methods that have
        # been explicitly configured to be exported. The method also
        # needs to be defined as a command in the ResourceHandlerConfigModel
        # to be exported.
        configured_exported = self._ho.exported_attributes.keys()
        rh = ResourceHandlerFactory.get_handler(self.__class__.__name__.lower())

        if rh:
            for export in rh.commands:
                attr = getattr(self, export["attr"], None)

                if inspect.ismethod(attr):
                    model = self._model_from_typehint(attr)
                    exported_methods[export["attr"]] = {
                        "signature": model["signature"],
                        "schema": model["args"].schema_json(),
                        "display": export["attr"] in configured_exported,
                    }

        return exported_methods

    def commands(self):
        """Return the exported commands, see ``_exported_methods``."""
        return self._exported_methods()

    def attributes(self):
        """Return the values of the attributes exported by the resource handler.

        ``data`` and ``get_value`` are skipped. A value that fails validation
        against the getter's return type hint is replaced by an empty dict.

        Returns:
            (dict): Attribute name to value.
        """
        _attributes = {}
        rh = ResourceHandlerFactory.get_handler(self.__class__.__name__.lower())

        if rh:
            for export in rh.attributes:
                if export["attr"] in ["data", "get_value"]:
                    continue

                attr = getattr(self, export["attr"], None)

                if attr:
                    model = self._model_from_typehint(attr)
                    value = attr()

                    try:
                        model["return"].validate({"return": value})
                    except ValidationError:
                        logging.getLogger("MX3.HWR").exception(
                            "Return value of"
                            f" {self._name}.{export['attr']} is of wrong"
                            " type"
                        )
                        _attributes[export["attr"]] = {}
                    else:
                        # Publish the value that was validated instead of
                        # calling the getter a second time.
                        _attributes[export["attr"]] = value

        return _attributes

    def emit_ho_attribute_changed(
        self, attribute: str, value: Any, operation: str = "SET"
    ):
        """Emit ``hardware_object_attribute_changed`` for this object."""
        self.app.server.emit(
            "hardware_object_attribute_changed",
            {
                "name": self._name,
                "attribute": attribute,
                "value": value,
                "operation": operation.upper(),
            },
            namespace="/hwr",
        )

    def emit_ho_value_changed(self, value: Any):
        """Emit ``hardware_object_value_changed`` for this object."""
        self.app.server.emit(
            "hardware_object_value_changed",
            {"name": self._name, "value": value},
            namespace="/hwr",
        )

    def emit_ho_changed(self, state, **kwargs):
        """Signal handler to send entire object to the client via socketIO."""
        data = self.data().dict()

        if hasattr(state, "name"):
            data["state"] = state.name
        else:
            logging.getLogger("MX3.HWR").info(
                f"emit_ho_changed with {state} for {self._ho.name}"
            )

        self.app.server.emit("hardware_object_changed", data, namespace="/hwr")

    def state_change(self, state, **kwargs):
        """Signal handler to send the state to the client via socketIO."""
        self.emit_ho_changed(state)

    def _dict_repr(self):
        """Dictionary representation of the hardware object.

        Returns:
            (dict): The dictionary. If any value cannot be retrieved, the
            adapter is marked unavailable and a default representation is
            returned.
        """
        try:
            data = {
                "name": self._name,
                "state": self.state(),
                "msg": self.msg(),
                "type": self._type,
                "available": self.available(),
                "readonly": self.read_only(),
                "commands": self.commands(),
                "attributes": self.attributes(),
            }
        except Exception as ex:
            # Return a default representation if there is a problem retrieving
            # any of the attributes
            self._available = False

            data = {
                "name": self._name,
                "state": "UNKNOWN",
                "msg": f"Exception: {ex}",
                "type": "FLOAT",
                "available": self.available(),
                "readonly": False,
                "commands": {},
                "attributes": {},
            }

            logging.getLogger("MX3.HWR").exception(
                f"Failed to get dictionary representation of {self._name}"
            )

        return data

    def data(self) -> HOModel:
        """Return the ``HOModel`` built from ``_dict_repr``."""
        return HOModel(**self._dict_repr())


class ActuatorAdapterBase(AdapterBase):
    """Adapter base class for hardware objects that have a value and limits."""

    def __init__(self, ho, role, app, resource_handler_config=None):
        """Initialize.

        Args:
            ho (object): Hardware object to mediate for.
            role (str): The name of the object.
            app (object): Application object, passed on to ``AdapterBase``.
            resource_handler_config (object): Optional resource handler
                configuration, passed on to ``AdapterBase``.
        """
        super().__init__(ho, role, app, resource_handler_config)

        self._unique = False

        # Keep the default when the hardware object has no ``read_only``.
        with contextlib.suppress(AttributeError):
            self._read_only = ho.read_only

    # Don't limit rate this method with utils.LimitRate, all subclasses
    # will share this method thus all methods will be effected if limit rated.
    # Rather LimitRate the function calling this one.
    def value_change(self, *args, **kwargs):
        """Signal handler to send values to the client via socketIO."""
        self.emit_ho_value_changed(args[0])

    # Abstract method
    def set_value(self, value) -> str:
        """Sets a value on underlying hardware object.

        Args:
            value(float): Value to be set.

        Returns:
            (str): The actual value set as str.

        Raises:
            ValueError: When conversion or treatment of value fails.
            StopIteration: When a value change was interrupted (abort/cancel).

        Emits:
            hardware_object_value_changed with values over websocket
        """

    # Abstract method
    def get_value(self):
        """Retrieve value from underlying hardware object.

        Returns:
            (str): The value.

        Raises:
            ValueError: When value for any reason can't be retrieved.
        """
        return self._get_value().value

    # Abstract method
    def stop(self):
        """Stop an action/movement."""

    def limits(self):
        """Read the limits.

        Returns:
            (tuple): Two floats (min, max).

        Raises:
            ValueError: When limits for any reason can't be retrieved.
        """
        try:
            # Read the limits once so the check and the returned value agree.
            limits = self._ho.get_limits()
            # Limits are None when not configured, convert them to (0, 0)
            # as we are returning floats
            return (0, 0) if None in limits else limits
        except (AttributeError, TypeError) as ex:
            msg = "Could not get limits"
            raise ValueError(msg) from ex

    def _dict_repr(self):
        """Dictionary representation of the hardware object.

        Extends the base representation with ``value`` and ``limits``. If
        either cannot be retrieved, the adapter is marked unavailable and
        default values are used instead.

        Returns:
            (dict): The dictionary.
        """
        data = super()._dict_repr()

        try:
            value = self.get_value()
            # ``get_value`` of this class already returns the plain value,
            # while an override may return an object carrying ``.value``;
            # unwrap only when the attribute is present.
            value = getattr(value, "value", value)
            data.update({"value": value, "limits": self.limits()})
        except Exception as ex:
            log = logging.getLogger("MX3.HWR")
            log.exception(
                f"Could not get dictionary representation of {self._ho.name}"
            )
            log.error(
                f"Check status of {self._ho.name}, object is"
                " offline, in fault or returns unexpected value !"
            )

            self._available = False

            data.update(
                {
                    "value": 0,
                    "limits": (0, 0),
                    "type": "FLOAT",
                    "msg": f"Exception {ex}",
                }
            )

        return data

    def data(self) -> HOActuatorModel:
        """Return the ``HOActuatorModel`` built from ``_dict_repr``."""
        return HOActuatorModel(**self._dict_repr())