Source code for koapy.backend.kiwoom_open_api_plus.utils.pyside2.QDialogHandler
import re
from typing import Dict, List, Optional, Sequence, Set
from koapy.compat.pyside2.QtCore import QObject, Signal
from koapy.compat.pywinauto import Desktop, WindowSpecification
from koapy.compat.pywinauto.findwindows import ElementNotFoundError
from koapy.compat.pywinauto.timings import TimeoutError as PywinautoTimeoutError
from koapy.utils.logging.pyside2.QThreadLogging import QThreadLogging
[docs]class QDialogHandler(QThreadLogging):
[docs] readyDialog = Signal(WindowSpecification)
[docs] notReadyDialog = Signal(WindowSpecification)
def __init__(
self,
specifications: Optional[Sequence[WindowSpecification]] = None,
parent: Optional[QObject] = None,
):
QThreadLogging.__init__(self, parent)
self._specifications: List[WindowSpecification] = (
specifications and list(self._specifications) or []
)
self._dialogs_not_ready: Set[WindowSpecification] = set(self._specifications)
self._dialogs_ready: Set[WindowSpecification] = set()
self._text_by_dialog: Dict[WindowSpecification, str] = {}
self._should_stop = False
self._timeout = 1 / len(self._dialogs_not_ready)
self._retry_interval = self._timeout / 5
assert self._timeout >= self._retry_interval * 2
@classmethod
[docs] def from_titles(cls, titles: Sequence[str], allow_magic_lookup: bool = False):
titles_escaped = [re.escape(title) for title in titles]
desktop = Desktop(allow_magic_lookup=allow_magic_lookup)
specifications = [
desktop.window(title_re=title_re) for title_re in titles_escaped
]
return cls(specifications)
[docs] def get_text_of_dialog(
self,
dialog: WindowSpecification,
default: Optional[str] = None,
):
if dialog in self._text_by_dialog:
dialog_text = self._text_by_dialog[dialog]
else:
try:
dialog_text = dialog.wrapper_object().window_text()
except ElementNotFoundError:
dialog_text = default
else:
self._text_by_dialog[dialog] = dialog_text
return dialog_text
[docs] def run(self):
while not self._should_stop:
for dialog in list(self._dialogs_not_ready):
try:
dialog.wait("ready", self._timeout, self._retry_interval)
except (PywinautoTimeoutError, ElementNotFoundError):
continue
else:
dialog_text = self.get_text_of_dialog(dialog)
self.logger.debug("Dialog found: %s", dialog_text)
self._dialogs_not_ready.remove(dialog)
self._dialogs_ready.add(dialog)
self.readyDialog.emit(dialog)
for dialog in list(self._dialogs_ready):
try:
dialog.wait_not("ready", self._timeout, self._retry_interval)
except (PywinautoTimeoutError, ElementNotFoundError):
continue
else:
dialog_text = self.get_text_of_dialog(dialog) or "(Unknown)"
self.logger.debug("Dialog closed: %s", dialog_text)
self._dialogs_ready.remove(dialog)
self._dialogs_not_ready.add(dialog)
self.notReadyDialog.emit(dialog)
[docs] def stop(self):
self._should_stop = True
[docs] def wait_for_termination(self, timeout=None):
if timeout is not None:
return self.wait(timeout)
else:
return self.wait()
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.stop()
self.wait_for_termination()