Source code for koapy.backend.kiwoom_open_api_plus.core.KiwoomOpenApiPlusSignalConnector

from __future__ import annotations

import inspect
import threading

from typing import Any, Callable, Generic, List, Optional, TypeVar

try:
    from typing import ParamSpec
except ImportError:
    from typing_extensions import ParamSpec

from koapy.backend.kiwoom_open_api_plus.core.KiwoomOpenApiPlusEventHandlerSignature import (
    KiwoomOpenApiPlusEventHandlerSignature,
)
from koapy.compat.pyside2 import PYQT5, PYSIDE2, PythonQtError
from koapy.compat.pyside2.QtAxContainer import QAxWidget
from koapy.utils.logging.Logging import Logging

[docs]P = ParamSpec("P")
[docs]R = TypeVar("R")
class KiwoomOpenApiPlusSignalConnector(Logging, Generic[P, R]):
    """Callable object that forwards one named event to registered slots.

    An instance is attached to a control's event with :meth:`connect_to`.
    When the instance is called, every slot previously registered through
    :meth:`connect` is invoked with the same arguments.
    """

    def __init__(self, name: str):
        """Create a connector for the event called ``name``.

        Args:
            name: Event name. It is used to look up the event handler
                signature and, under PyQt5, to fetch the signal attribute
                from the control.
        """
        super().__init__()

        self._name = name
        # Guards every read and write of ``self._slots``.
        self._lock = threading.RLock()
        self._slots: List[Callable[P, R]] = []

        self._signature = KiwoomOpenApiPlusEventHandlerSignature.from_name(self._name)
        self._param_types = [
            (p.annotation) for p in self._signature.parameters.values()
        ]

        # The signal object is only built (and only used) under PySide2;
        # the PyQt5 code paths look the signal up on the control by name.
        if PYSIDE2:
            self._signal = self._signature.to_pyside2_event_signal()

        # NOTE(review): presumably set so that this instance can be
        # introspected like a plain function with the event handler's
        # name and signature -- confirm against the Qt bindings' needs.
        self.__name__ = self._name
        self.__signature__ = self._signature

    def is_valid_slot(self, slot: Callable[..., Any]) -> bool:
        """Return whether ``slot`` can be registered on this connector.

        Args:
            slot: Candidate callable.

        Returns:
            ``True`` when ``slot`` declares exactly as many parameters as
            the event handler signature, ``False`` otherwise.

        Raises:
            TypeError, ValueError: Propagated from ``inspect.signature``
                when ``slot`` cannot be introspected.
        """
        slot_signature = inspect.signature(slot)
        # currently only check parameter length
        return len(self._param_types) == len(slot_signature.parameters)

    def connect_to(self, control: QAxWidget):
        """Attach this connector to the matching event of ``control``.

        Args:
            control: Control whose event should trigger this connector.

        Returns:
            Whatever the underlying Qt ``connect`` call returns.

        Raises:
            PythonQtError: If neither PySide2 nor PyQt5 is the active binding.
        """
        if PYSIDE2:
            return control.connect(self._signal, self)
        elif PYQT5:
            return getattr(control, self._name).connect(self)
        else:
            raise PythonQtError("Unsupported Qt bindings")

    def disconnect_from(self, control: QAxWidget):
        """Detach this connector from the matching event of ``control``.

        Args:
            control: Control this connector was previously attached to.

        Returns:
            Whatever the underlying Qt ``disconnect`` call returns.

        Raises:
            PythonQtError: If neither PySide2 nor PyQt5 is the active binding.
        """
        if PYSIDE2:
            return control.disconnect(self._signal, self)
        elif PYQT5:
            return getattr(control, self._name).disconnect(self)
        else:
            raise PythonQtError("Unsupported Qt bindings")

    def connect(self, slot: Callable[..., Any]):
        """Register ``slot`` to be invoked whenever this connector is called.

        Registering a slot that is already present is a no-op.

        Args:
            slot: Callable to register.

        Raises:
            ValueError: If ``slot`` fails :meth:`is_valid_slot`.
        """
        if not self.is_valid_slot(slot):
            raise ValueError("Tried to connect invalid slot: %s" % slot)
        with self._lock:
            if slot not in self._slots:
                self._slots.append(slot)

    def disconnect(self, slot: Optional[Callable[..., Any]] = None):
        """Unregister ``slot``, or every slot when ``slot`` is ``None``.

        A warning is logged, and nothing is raised, when ``slot`` is not
        currently registered.

        Args:
            slot: Callable to remove, or ``None`` to remove all slots.
        """
        with self._lock:
            if slot is None:
                self._slots.clear()
            elif slot in self._slots:
                self._slots.remove(slot)
            else:
                self.logger.warning("Tried to disconnect a slot that doesn't exist")

    def call(self, *args: P.args, **kwargs: P.kwargs) -> R:
        """Invoke every registered slot with the given arguments.

        Slots run sequentially in registration order. Their return values
        are discarded, so this method itself returns ``None`` despite the
        ``R`` annotation. An exception raised by a slot propagates and
        stops the remaining slots from running.
        """
        # make a copy in order to prevent modification during iteration problem
        with self._lock:
            slots = list(self._slots)
        # use Thread with await/join for concurrency?
        for slot in slots:
            slot(*args, **kwargs)

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R:
        """Make the connector itself callable; delegates to :meth:`call`."""
        return self.call(*args, **kwargs)