diff --git a/dingtalk_stream/stream.py b/dingtalk_stream/stream.py index 91dc243..8ecd23b 100644 --- a/dingtalk_stream/stream.py +++ b/dingtalk_stream/stream.py @@ -38,6 +38,7 @@ def __init__(self, credential: Credential, logger: logging.Logger = None): self._pre_started = False self._is_event_required = False self._access_token = {} + self._running_tasks = set() def register_all_event_handler(self, handler: EventHandler): handler.dingtalk_client = self @@ -59,37 +60,50 @@ def pre_start(self): async def start(self): self.pre_start() + self._running_tasks = set() while True: + keepalive_task = None try: connection = self.open_connection() if not connection: self.logger.error('open connection failed') - await asyncio.sleep(10) + await asyncio.sleep(5) continue self.logger.info('endpoint is %s', connection) uri = f'{connection["endpoint"]}?ticket={quote_plus(connection["ticket"])}' async with websockets.connect(uri) as websocket: self.websocket = websocket - asyncio.create_task(self.keepalive(websocket)) + keepalive_task = asyncio.create_task(self.keepalive(websocket)) async for raw_message in websocket: - json_message = json.loads(raw_message) - asyncio.create_task(self.background_task(json_message)) + try: + json_message = json.loads(raw_message) + except json.JSONDecodeError as e: + self.logger.error('invalid json message, error=%s, raw=%s', e, raw_message[:200]) + continue + task = asyncio.create_task(self.background_task(json_message)) + self._running_tasks.add(task) + task.add_done_callback(self._task_done_callback) except KeyboardInterrupt as e: break except (asyncio.exceptions.CancelledError, websockets.exceptions.ConnectionClosedError) as e: self.logger.error('[start] network exception, error=%s', e) - await asyncio.sleep(10) + await asyncio.sleep(3) continue except Exception as e: await asyncio.sleep(3) self.logger.exception('unknown exception', e) continue finally: - pass + if keepalive_task: + keepalive_task.cancel() + try: + await keepalive_task + except asyncio.CancelledError: + pass async def keepalive(self, ws, ping_interval=60): while True: @@ -100,12 +114,28 @@ async def keepalive(self, ws, ping_interval=60): break async def background_task(self, json_message): + message_id = json_message.get('headers', {}).get('messageId', 'unknown') try: route_result = await self.route_message(json_message) if route_result == DingTalkStreamClient.TAG_DISCONNECT: await self.websocket.close() except Exception as e: - self.logger.error(f"error processing message: {e}") + self.logger.error( + "error processing message, messageId=%s, type=%s, topic=%s, error=%s", + message_id, + json_message.get('type'), + json_message.get('headers', {}).get('topic'), + e, + exc_info=True, + ) + + def _task_done_callback(self, task): + self._running_tasks.discard(task) + if task.cancelled(): + return + error = task.exception() + if error: + self.logger.error('background task failed, error=%s', error, exc_info=error) async def route_message(self, json_message): result = ''