Source code for koapy.backend.kiwoom_open_api_plus.grpc.event.KiwoomOpenApiPlusConditionEventHandler

from koapy.backend.kiwoom_open_api_plus.core.KiwoomOpenApiPlusError import (
    KiwoomOpenApiPlusError,
)
from koapy.backend.kiwoom_open_api_plus.core.KiwoomOpenApiPlusTrInfo import (
    KiwoomOpenApiPlusTrInfo,
)
from koapy.backend.kiwoom_open_api_plus.grpc import KiwoomOpenApiPlusService_pb2
from koapy.backend.kiwoom_open_api_plus.grpc.event.KiwoomOpenApiPlusEventHandlerForGrpc import (
    KiwoomOpenApiPlusEventHandlerForGrpc,
)
from koapy.backend.kiwoom_open_api_plus.utils.list_conversion import string_to_list
from koapy.utils.logging.Logging import Logging


[docs]class KiwoomOpenApiPlusConditionEventHandler( KiwoomOpenApiPlusEventHandlerForGrpc, Logging ): def __init__(self, control, request, context, screen_manager): super().__init__(control, context) self._request = request self._screen_manager = screen_manager self._screen_no = request.screen_no self._condition_name = request.condition_name self._condition_index = request.condition_index self._search_type = request.search_type self._request_name = request.request_name or "관심종목정보요청" self._with_info = request.flags.with_info self._is_future_option = request.flags.is_future_option self._type_flag = 3 if self._is_future_option else 0 self._trcode = {0: "OPTKWFID", 3: "OPTFOFID"}[self._type_flag] self._trinfo = KiwoomOpenApiPlusTrInfo.get_trinfo_by_code(self._trcode) if self._trinfo is None: self.logger.error("Cannot find names for trcode %s", self._trinfo) self._single_names = self._trinfo.get_single_output_names() self._multi_names = self._trinfo.get_multi_output_names()
[docs] def on_enter(self): self.control.EnsureConditionLoaded() condition_names = self.control.GetConditionNameListAsList() assert (self._condition_index, self._condition_name) in condition_names self._screen_no = self._screen_manager.borrow_screen(self._screen_no) self.add_callback(self._screen_manager.return_screen, self._screen_no) self.add_callback(self.control.DisconnectRealData, self._screen_no) self.add_callback( self.control.SendConditionStop, self._screen_no, self._condition_name, self._condition_index, ) KiwoomOpenApiPlusError.try_or_raise_boolean( self.control.RateLimitedSendCondition.async_call( self._screen_no, self._condition_name, self._condition_index, self._search_type, ), "Failed to send condition", except_callback=self.observer.on_error, )
[docs] def OnReceiveTrCondition( self, scrnno, codelist, condition_name, condition_index, prevnext ): if (scrnno, condition_name, condition_index) == ( self._screen_no, self._condition_name, self._condition_index, ): response = KiwoomOpenApiPlusService_pb2.ListenResponse() response.name = "OnReceiveTrCondition" # pylint: disable=no-member response.arguments.add().string_value = scrnno # pylint: disable=no-member response.arguments.add().string_value = ( codelist # pylint: disable=no-member ) response.arguments.add().string_value = ( condition_name # pylint: disable=no-member ) response.arguments.add().long_value = ( condition_index # pylint: disable=no-member ) response.arguments.add().long_value = prevnext # pylint: disable=no-member self.observer.on_next(response) # pylint: disable=no-member if self._with_info: if "^" in codelist: items = string_to_list(codelist, sep=";") items = [string_to_list(item, sep="^") for item in items] items = [tuple(item) for item in items] codes = [item[0] for item in items] else: codes = string_to_list(codelist, sep=";") KiwoomOpenApiPlusError.try_or_raise( self.control.RateLimitedCommKwRqData.async_call( codelist, 0, len(codes), self._type_flag, self._request_name, self._screen_no, ), except_callback=self.observer.on_error, ) should_continue = str(prevnext) not in ["", "0"] should_not_complete = ( self._search_type == 1 or self._with_info or should_continue ) should_complete = not should_not_complete if should_complete: self.observer.on_completed() return elif should_continue: try: raise KiwoomOpenApiPlusError("Should not reach here") self.control.RateLimitedSendCondition.async_call( self._screen_no, self._condition_name, self._condition_index, int(prevnext), ) # pylint: disable=unreachable except KiwoomOpenApiPlusError as e: self.observer.on_error(e) return
[docs] def OnReceiveRealCondition( self, code, condition_type, condition_name, condition_index ): if (condition_name, int(condition_index)) == ( self._condition_name, self._condition_index, ): response = KiwoomOpenApiPlusService_pb2.ListenResponse() response.name = "OnReceiveRealCondition" # pylint: disable=no-member response.arguments.add().string_value = code # pylint: disable=no-member response.arguments.add().string_value = ( condition_type # pylint: disable=no-member ) response.arguments.add().string_value = ( condition_name # pylint: disable=no-member ) response.arguments.add().string_value = ( condition_index # pylint: disable=no-member ) self.observer.on_next(response) # pylint: disable=no-member if self._with_info: codelist = code codes = [code] KiwoomOpenApiPlusError.try_or_raise( self.control.RateLimitedCommKwRqData.async_call( codelist, 0, len(codes), self._type_flag, self._request_name, self._screen_no, ), except_callback=self.observer.on_error, )
[docs] def OnReceiveTrData( self, scrnno, rqname, trcode, recordname, prevnext, _datalength, _errorcode, _message, _splmmsg, ): if (scrnno, rqname) == (self._screen_no, self._request_name): response = KiwoomOpenApiPlusService_pb2.ListenResponse() response.name = "OnReceiveTrData" # pylint: disable=no-member response.arguments.add().string_value = scrnno # pylint: disable=no-member response.arguments.add().string_value = rqname # pylint: disable=no-member response.arguments.add().string_value = trcode # pylint: disable=no-member response.arguments.add().string_value = ( recordname # pylint: disable=no-member ) response.arguments.add().string_value = ( prevnext # pylint: disable=no-member ) should_continue = str(prevnext) not in ["", "0"] should_not_complete = self._search_type == 1 or should_continue should_complete = not should_not_complete repeat_cnt = self.control.GetRepeatCnt(trcode, recordname) assert trcode.upper() == self._trcode if repeat_cnt > 0: if len(self._multi_names) == 0: self.logger.warning( "Repeat count greater than 0, but no multi data names available, fallback to sigle data names" ) multi_names = self._multi_names self._multi_names = self._single_names self._single_name = multi_names if len(self._multi_names) > 0: rows = [ [ self.control.GetCommData( trcode, recordname, i, name ).strip() for name in self._multi_names ] for i in range(repeat_cnt) ] response.multi_data.names.extend( self._multi_names ) # pylint: disable=no-member for row in rows: response.multi_data.values.add().values.extend( row ) # pylint: disable=no-member if len(self._single_names) > 0: values = [ self.control.GetCommData(trcode, recordname, 0, name).strip() for name in self._single_names ] response.single_data.names.extend( self._single_names ) # pylint: disable=no-member response.single_data.values.extend(values) # pylint: disable=no-member self.observer.on_next(response) # pylint: disable=no-member if should_complete: self.observer.on_completed() return elif should_continue: try: raise KiwoomOpenApiPlusError("Should not reach here") KiwoomOpenApiPlusError.try_or_raise( self.control.RateLimitedCommKwRqData.async_call( self._codelist, int(prevnext), len(self._codes), 3 if self._is_future_option else 0, self._request_name, self._screen_no, ), except_callback=self.observer.on_error, ) # pylint: disable=unreachable except KiwoomOpenApiPlusError as e: self.observer.on_error(e) return