From fe49b255b3d1a77aec001dfd809863df1a35f73f Mon Sep 17 00:00:00 2001 From: Soulter <905617992@qq.com> Date: Mon, 9 Feb 2026 15:10:49 +0800 Subject: [PATCH] feat: enhance Dingtalk adapter with active push message and image, video, audio message type --- .../sources/dingtalk/dingtalk_adapter.py | 489 +++++++++++++++--- .../sources/dingtalk/dingtalk_event.py | 139 +---- astrbot/core/utils/media_utils.py | 90 ++++ 3 files changed, 525 insertions(+), 193 deletions(-) diff --git a/astrbot/core/platform/sources/dingtalk/dingtalk_adapter.py b/astrbot/core/platform/sources/dingtalk/dingtalk_adapter.py index 41f4aa611..fd0be3f1c 100644 --- a/astrbot/core/platform/sources/dingtalk/dingtalk_adapter.py +++ b/astrbot/core/platform/sources/dingtalk/dingtalk_adapter.py @@ -1,8 +1,9 @@ import asyncio -import os +import json import threading import uuid -from typing import NoReturn, cast +from pathlib import Path +from typing import Literal, NoReturn, cast import aiohttp import dingtalk_stream @@ -10,7 +11,7 @@ from astrbot import logger from astrbot.api.event import MessageChain -from astrbot.api.message_components import At, Image, Plain +from astrbot.api.message_components import At, Image, Plain, Record, Video from astrbot.api.platform import ( AstrBotMessage, MessageMember, @@ -18,9 +19,16 @@ Platform, PlatformMetadata, ) +from astrbot.core import sp from astrbot.core.platform.astr_message_event import MessageSesion from astrbot.core.utils.astrbot_path import get_astrbot_data_path from astrbot.core.utils.io import download_file +from astrbot.core.utils.media_utils import ( + convert_audio_format, + convert_video_format, + extract_video_cover, + get_media_duration, +) from ...register import register_platform_adapter from .dingtalk_event import DingtalkMessageEvent @@ -75,8 +83,6 @@ async def process(self, message: dingtalk_stream.CallbackMessage): ) self.client_ = client # 用于 websockets 的 client self._shutdown_event: threading.Event | None = None - self.card_template_id = platform_config.get("card_template_id") - self.card_instance_id_dict = {} def _id_to_sid(self, dingtalk_id: str | None) -> str: if not dingtalk_id: @@ -91,7 +97,44 @@ async def send_by_session( session: MessageSesion, message_chain: MessageChain, ) -> None: - raise NotImplementedError("钉钉机器人适配器不支持 send_by_session") + robot_code = self.client_id + + if session.message_type == MessageType.GROUP_MESSAGE: + open_conversation_id = session.session_id + await self.send_message_chain_to_group( + open_conversation_id=open_conversation_id, + robot_code=robot_code, + message_chain=message_chain, + ) + else: + staff_id = await self._get_sender_staff_id(session) + if not staff_id: + logger.warning( + "钉钉私聊会话缺少 staff_id 映射,回退使用 session_id 作为 userId 发送", + ) + staff_id = session.session_id + await self.send_message_chain_to_user( + staff_id=staff_id, + robot_code=robot_code, + message_chain=message_chain, + ) + + await super().send_by_session(session, message_chain) + + async def send_with_session( + self, + session: MessageSesion, + message_chain: MessageChain, + ) -> None: + await self.send_by_session(session, message_chain) + + async def send_with_sesison( + self, + session: MessageSesion, + message_chain: MessageChain, + ) -> None: + # backward typo compatibility + await self.send_by_session(session, message_chain) def meta(self) -> PlatformMetadata: return PlatformMetadata( @@ -99,67 +142,9 @@ def meta(self) -> PlatformMetadata: description="钉钉机器人官方 API 适配器", id=cast(str, self.config.get("id")), support_streaming_message=True, - support_proactive_message=False, + support_proactive_message=True, ) - async def create_message_card( - self, message_id: str, incoming_message: dingtalk_stream.ChatbotMessage - ) -> bool | None: - if not self.card_template_id: - return False - - card_instance = dingtalk_stream.AICardReplier(self.client_, incoming_message) - card_data = {"content": ""} # Initial content empty - - try: - card_instance_id = await card_instance.async_create_and_deliver_card( - self.card_template_id, - card_data, - ) - self.card_instance_id_dict[message_id] = (card_instance, card_instance_id) - return True - except Exception as e: - logger.error(f"创建钉钉卡片失败: {e}") - return False - - async def send_card_message( - self, message_id: str, content: str, is_final: bool - ) -> None: - if message_id not in self.card_instance_id_dict: - return - - card_instance, card_instance_id = self.card_instance_id_dict[message_id] - content_key = "content" - - try: - # 钉钉卡片流式更新 - - await card_instance.async_streaming( - card_instance_id, - content_key=content_key, - content_value=content, - append=False, - finished=is_final, - failed=False, - ) - except Exception as e: - logger.error(f"发送钉钉卡片消息失败: {e}") - # Try to report failure - try: - await card_instance.async_streaming( - card_instance_id, - content_key=content_key, - content_value=content, # Keep existing content - append=False, - finished=True, - failed=True, - ) - except Exception: - pass - - if is_final: - self.card_instance_id_dict.pop(message_id, None) - async def convert_msg( self, message: dingtalk_stream.ChatbotMessage, @@ -217,8 +202,35 @@ async def convert_msg( case "audio": pass + await self._remember_sender_binding(message, abm) return abm # 别忘了返回转换后的消息对象 + async def _remember_sender_binding( + self, + message: dingtalk_stream.ChatbotMessage, + abm: AstrBotMessage, + ) -> None: + try: + if abm.type == MessageType.FRIEND_MESSAGE: + sender_id = abm.sender.user_id + sender_staff_id = cast(str, message.sender_staff_id or "") + if sender_staff_id: + umo = str( + MessageSesion( + platform_name=self.meta().id, + message_type=abm.type, + session_id=sender_id, + ) + ) + await sp.put_async( + "global", + umo, + "dingtalk_staffid", + sender_staff_id, + ) + except Exception as e: + logger.warning(f"保存钉钉会话映射失败: {e}") + async def download_ding_file( self, download_code: str, @@ -241,8 +253,9 @@ async def download_ding_file( "downloadCode": download_code, "robotCode": robot_code, } - temp_dir = os.path.join(get_astrbot_data_path(), "temp") - f_path = os.path.join(temp_dir, f"dingtalk_file_{uuid.uuid4()}.{ext}") + temp_dir = Path(get_astrbot_data_path()) / "temp" + temp_dir.mkdir(parents=True, exist_ok=True) + f_path = temp_dir / f"dingtalk_file_{uuid.uuid4()}.{ext}" async with ( aiohttp.ClientSession() as session, session.post( @@ -258,14 +271,21 @@ async def download_ding_file( return "" resp_data = await resp.json() download_url = resp_data["data"]["downloadUrl"] - await download_file(download_url, f_path) - return f_path + await download_file(download_url, str(f_path)) + return str(f_path) async def get_access_token(self) -> str: - payload = { - "appKey": self.client_id, - "appSecret": self.client_secret, - } + try: + access_token = await asyncio.get_event_loop().run_in_executor( + None, + self.client_.get_access_token, + ) + if access_token: + return access_token + except Exception as e: + logger.warning(f"通过 dingtalk_stream 获取 access_token 失败: {e}") + + payload = {"appKey": self.client_id, "appSecret": self.client_secret} async with aiohttp.ClientSession() as session: async with session.post( "https://api.dingtalk.com/v1.0/oauth2/accessToken", @@ -276,7 +296,328 @@ async def get_access_token(self) -> str: f"获取钉钉机器人 access_token 失败: {resp.status}, {await resp.text()}", ) return "" - return (await resp.json())["data"]["accessToken"] + data = await resp.json() + return cast(str, data.get("data", {}).get("accessToken", "")) + + async def _get_sender_staff_id(self, session: MessageSesion) -> str: + try: + staff_id = await sp.get_async( + "global", + str(session), + "dingtalk_staffid", + "", + ) + return cast(str, staff_id or "") + except Exception as e: + logger.warning(f"读取钉钉 staff_id 映射失败: {e}") + return "" + + async def _send_group_message( + self, + open_conversation_id: str, + robot_code: str, + msg_key: str, + msg_param: dict, + ) -> None: + access_token = await self.get_access_token() + if not access_token: + logger.error("钉钉群消息发送失败: access_token 为空") + return + + payload = { + "msgKey": msg_key, + "msgParam": json.dumps(msg_param, ensure_ascii=False), + "openConversationId": open_conversation_id, + "robotCode": robot_code, + } + headers = { + "Content-Type": "application/json", + "x-acs-dingtalk-access-token": access_token, + } + async with aiohttp.ClientSession() as session: + async with session.post( + "https://api.dingtalk.com/v1.0/robot/groupMessages/send", + headers=headers, + json=payload, + ) as resp: + if resp.status != 200: + logger.error( + f"钉钉群消息发送失败: {resp.status}, {await resp.text()}", + ) + + async def _send_private_message( + self, + staff_id: str, + robot_code: str, + msg_key: str, + msg_param: dict, + ) -> None: + access_token = await self.get_access_token() + if not access_token: + logger.error("钉钉私聊消息发送失败: access_token 为空") + return + + payload = { + "robotCode": robot_code, + "userIds": [staff_id], + "msgKey": msg_key, + "msgParam": json.dumps(msg_param, ensure_ascii=False), + } + headers = { + "Content-Type": "application/json", + "x-acs-dingtalk-access-token": access_token, + } + async with aiohttp.ClientSession() as session: + async with session.post( + "https://api.dingtalk.com/v1.0/robot/oToMessages/batchSend", + headers=headers, + json=payload, + ) as resp: + if resp.status != 200: + logger.error( + f"钉钉私聊消息发送失败: {resp.status}, {await resp.text()}", + ) + + def _safe_remove_file(self, file_path: str | None) -> None: + if not file_path: + return + try: + p = Path(file_path) + if p.exists() and p.is_file(): + p.unlink() + except Exception as e: + logger.warning(f"清理临时文件失败: {file_path}, {e}") + + async def _prepare_voice_for_dingtalk(self, input_path: str) -> tuple[str, bool]: + """优先转换为 OGG(Opus),不可用时回退 AMR。""" + lower_path = input_path.lower() + if lower_path.endswith((".amr", ".ogg")): + return input_path, False + + try: + converted = await convert_audio_format(input_path, "ogg") + return converted, converted != input_path + except Exception as e: + logger.warning(f"钉钉语音转 OGG 失败,回退 AMR: {e}") + converted = await convert_audio_format(input_path, "amr") + return converted, converted != input_path + + async def upload_media(self, file_path: str, media_type: str) -> str: + media_file_path = Path(file_path) + access_token = await self.get_access_token() + if not access_token: + logger.error("钉钉媒体上传失败: access_token 为空") + return "" + + form = aiohttp.FormData() + form.add_field( + "media", + media_file_path.read_bytes(), + filename=media_file_path.name, + content_type="application/octet-stream", + ) + async with aiohttp.ClientSession() as session: + async with session.post( + f"https://oapi.dingtalk.com/media/upload?access_token={access_token}&type={media_type}", + data=form, + ) as resp: + if resp.status != 200: + logger.error( + f"钉钉媒体上传失败: {resp.status}, {await resp.text()}" + ) + return "" + data = await resp.json() + if data.get("errcode") != 0: + logger.error(f"钉钉媒体上传失败: {data}") + return "" + return cast(str, data.get("media_id", "")) + + async def upload_image(self, image: Image) -> str: + image_file_path = await image.convert_to_file_path() + return await self.upload_media(image_file_path, "image") + + async def _send_message_chain( + self, + target_type: Literal["group", "user"], + target_id: str, + robot_code: str, + message_chain: MessageChain, + at_str: str = "", + ) -> None: + async def send_message(msg_key: str, msg_param: dict) -> None: + if target_type == "group": + await self._send_group_message( + open_conversation_id=target_id, + robot_code=robot_code, + msg_key=msg_key, + msg_param=msg_param, + ) + else: + await self._send_private_message( + staff_id=target_id, + robot_code=robot_code, + msg_key=msg_key, + msg_param=msg_param, + ) + + for segment in message_chain.chain: + if isinstance(segment, Plain): + text = segment.text.strip() + if not text and not at_str: + continue + await send_message( + msg_key="sampleMarkdown", + msg_param={ + "title": "AstrBot", + "text": f"{at_str} {text}".strip(), + }, + ) + elif isinstance(segment, Image): + photo_url = segment.file or segment.url or "" + if photo_url.startswith(("http://", "https://")): + pass + else: + photo_url = await self.upload_image(segment) + if not photo_url: + continue + await send_message( + msg_key="sampleImageMsg", + msg_param={"photoURL": photo_url}, + ) + elif isinstance(segment, Record): + converted_audio = None + try: + audio_path = await segment.convert_to_file_path() + ( + audio_path, + converted_audio, + ) = await self._prepare_voice_for_dingtalk(audio_path) + media_id = await self.upload_media(audio_path, "voice") + if not media_id: + continue + duration_ms = await get_media_duration(audio_path) + await send_message( + msg_key="sampleAudio", + msg_param={ + "mediaId": media_id, + "duration": str(duration_ms or 1000), + }, + ) + except Exception as e: + logger.warning(f"钉钉语音发送失败: {e}") + continue + finally: + if converted_audio: + self._safe_remove_file(audio_path) + elif isinstance(segment, Video): + converted_video = False + cover_path = None + try: + source_video_path = await segment.convert_to_file_path() + video_path = source_video_path + if not video_path.lower().endswith(".mp4"): + video_path = await convert_video_format(video_path, "mp4") + converted_video = video_path != source_video_path + cover_path = await extract_video_cover(video_path) + video_media_id = await self.upload_media(video_path, "file") + pic_media_id = await self.upload_media(cover_path, "image") + if not video_media_id or not pic_media_id: + continue + duration_ms = await get_media_duration(video_path) + duration_sec = max(1, int((duration_ms or 1000) / 1000)) + await send_message( + msg_key="sampleVideo", + msg_param={ + "duration": str(duration_sec), + "videoMediaId": video_media_id, + "videoType": "mp4", + "picMediaId": pic_media_id, + }, + ) + except Exception as e: + logger.warning(f"钉钉视频发送失败: {e}") + continue + finally: + self._safe_remove_file(cover_path) + if converted_video: + self._safe_remove_file(video_path) + + async def send_message_chain_to_group( + self, + open_conversation_id: str, + robot_code: str, + message_chain: MessageChain, + at_str: str = "", + ) -> None: + await self._send_message_chain( + target_type="group", + target_id=open_conversation_id, + robot_code=robot_code, + message_chain=message_chain, + at_str=at_str, + ) + + async def send_message_chain_to_user( + self, + staff_id: str, + robot_code: str, + message_chain: MessageChain, + at_str: str = "", + ) -> None: + await self._send_message_chain( + target_type="user", + target_id=staff_id, + robot_code=robot_code, + message_chain=message_chain, + at_str=at_str, + ) + + async def send_message_chain_with_incoming( + self, + incoming_message: dingtalk_stream.ChatbotMessage, + message_chain: MessageChain, + ) -> None: + robot_code = self.client_id + + # at_list: list[str] = [] + sender_id = cast(str, incoming_message.sender_id or "") + sender_staff_id = cast(str, incoming_message.sender_staff_id or "") + normalized_sender_id = self._id_to_sid(sender_id) + # 现在用的发消息接口不支持 at + # for segment in message_chain.chain: + # if isinstance(segment, At): + # if ( + # str(segment.qq) in {sender_id, normalized_sender_id} + # and sender_staff_id + # ): + # at_list.append(f"@{sender_staff_id}") + # else: + # at_list.append(f"@{segment.qq}") + # at_str = " ".join(at_list) + + if incoming_message.conversation_type == "2": + await self.send_message_chain_to_group( + open_conversation_id=cast(str, incoming_message.conversation_id), + robot_code=robot_code, + message_chain=message_chain, + # at_str=at_str, + ) + else: + session = MessageSesion( + platform_name=self.meta().id, + message_type=MessageType.FRIEND_MESSAGE, + session_id=normalized_sender_id, + ) + staff_id = sender_staff_id or await self._get_sender_staff_id(session) + if not staff_id: + logger.error("钉钉私聊回复失败: 缺少 sender_staff_id") + return + await self.send_message_chain_to_user( + staff_id=staff_id, + robot_code=robot_code, + message_chain=message_chain, + # at_str=at_str, + ) async def handle_msg(self, abm: AstrBotMessage) -> None: event = DingtalkMessageEvent( diff --git a/astrbot/core/platform/sources/dingtalk/dingtalk_event.py b/astrbot/core/platform/sources/dingtalk/dingtalk_event.py index 6c7d2ff4c..3331c5147 100644 --- a/astrbot/core/platform/sources/dingtalk/dingtalk_event.py +++ b/astrbot/core/platform/sources/dingtalk/dingtalk_event.py @@ -1,9 +1,5 @@ -import asyncio -from typing import Any, cast +from typing import Any -import dingtalk_stream - -import astrbot.api.message_components as Comp from astrbot import logger from astrbot.api.event import AstrMessageEvent, MessageChain @@ -15,128 +11,33 @@ def __init__( message_obj, platform_meta, session_id, - client: dingtalk_stream.ChatbotHandler, + client: Any = None, adapter: "Any" = None, ) -> None: super().__init__(message_str, message_obj, platform_meta, session_id) self.client = client self.adapter = adapter - async def send_with_client( - self, - client: dingtalk_stream.ChatbotHandler, - message: MessageChain, - ) -> None: - icm = cast(dingtalk_stream.ChatbotMessage, self.message_obj.raw_message) - ats = [] - # fixes: #4218 - # 钉钉 at 机器人需要使用 sender_staff_id 而不是 sender_id - for i in message.chain: - if isinstance(i, Comp.At): - print(i.qq, icm.sender_id, icm.sender_staff_id) - if str(i.qq) in str(icm.sender_id or ""): - # 适配器会将开头的 $:LWCP_v1:$ 去掉,因此我们用 in 判断 - ats.append(f"@{icm.sender_staff_id}") - else: - ats.append(f"@{i.qq}") - at_str = " ".join(ats) - - for segment in message.chain: - if isinstance(segment, Comp.Plain): - segment.text = segment.text.strip() - await asyncio.get_event_loop().run_in_executor( - None, - client.reply_markdown, - segment.text, - f"{at_str} {segment.text}".strip(), - cast(dingtalk_stream.ChatbotMessage, self.message_obj.raw_message), - ) - elif isinstance(segment, Comp.Image): - markdown_str = "" - - try: - if not segment.file: - logger.warning("钉钉图片 segment 缺少 file 字段,跳过") - continue - if segment.file.startswith(("http://", "https://")): - image_url = segment.file - else: - image_url = await segment.register_to_file_service() - - markdown_str = f"![image]({image_url})\n\n" - - ret = await asyncio.get_event_loop().run_in_executor( - None, - client.reply_markdown, - "😄", - markdown_str, - cast( - dingtalk_stream.ChatbotMessage, self.message_obj.raw_message - ), - ) - logger.debug(f"send image: {ret}") - - except Exception as e: - logger.warning(f"钉钉图片处理失败: {e}, 跳过图片发送") - continue - async def send(self, message: MessageChain) -> None: - await self.send_with_client(self.client, message) + if not self.adapter: + logger.error("钉钉消息发送失败: 缺少 adapter") + return + await self.adapter.send_message_chain_with_incoming( + incoming_message=self.message_obj.raw_message, + message_chain=message, + ) await super().send(message) async def send_streaming(self, generator, use_fallback: bool = False): - if not self.adapter or not self.adapter.card_template_id: - logger.warning( - f"DingTalk streaming is enabled, but 'card_template_id' is not configured for platform '{self.platform_meta.id}'. Falling back to text streaming." - ) - # Fallback to default behavior (buffer and send) - buffer = None - async for chain in generator: - if not buffer: - buffer = chain - else: - buffer.chain.extend(chain.chain) - if not buffer: - return None - buffer.squash_plain() - await self.send(buffer) - return await super().send_streaming(generator, use_fallback) - - # Create card - msg_id = self.message_obj.message_id - incoming_msg = self.message_obj.raw_message - created = await self.adapter.create_message_card(msg_id, incoming_msg) - - if not created: - # Fallback to default behavior (buffer and send) - buffer = None - async for chain in generator: - if not buffer: - buffer = chain - else: - buffer.chain.extend(chain.chain) + # 钉钉统一回退为缓冲发送:最终发送仍使用新的 HTTP 消息接口。 + buffer = None + async for chain in generator: if not buffer: - return None - buffer.squash_plain() - await self.send(buffer) - return await super().send_streaming(generator, use_fallback) - - full_content = "" - seq = 0 - try: - async for chain in generator: - for segment in chain.chain: - if isinstance(segment, Comp.Plain): - full_content += segment.text - - seq += 1 - if seq % 2 == 0: # Update every 2 chunks to be more responsive than 8 - await self.adapter.send_card_message( - msg_id, full_content, is_final=False - ) - - await self.adapter.send_card_message(msg_id, full_content, is_final=True) - except Exception as e: - logger.error(f"DingTalk streaming error: {e}") - # Try to ensure final state is sent or cleaned up? - await self.adapter.send_card_message(msg_id, full_content, is_final=True) + buffer = chain + else: + buffer.chain.extend(chain.chain) + if not buffer: + return None + buffer.squash_plain() + await self.send(buffer) + return await super().send_streaming(generator, use_fallback) diff --git a/astrbot/core/utils/media_utils.py b/astrbot/core/utils/media_utils.py index d6287373d..48f302984 100644 --- a/astrbot/core/utils/media_utils.py +++ b/astrbot/core/utils/media_utils.py @@ -7,6 +7,7 @@ import os import subprocess import uuid +from pathlib import Path from astrbot import logger from astrbot.core.utils.astrbot_path import get_astrbot_data_path @@ -205,3 +206,92 @@ async def convert_video_format( except Exception as e: logger.error(f"[Media Utils] 转换视频格式时出错: {e}") raise + + +async def convert_audio_format( + audio_path: str, + output_format: str = "amr", + output_path: str | None = None, +) -> str: + """使用ffmpeg将音频转换为指定格式。 + + Args: + audio_path: 原始音频文件路径 + output_format: 目标格式,例如 amr / ogg + output_path: 输出文件路径,如果为None则自动生成 + + Returns: + 转换后的音频文件路径 + """ + if audio_path.lower().endswith(f".{output_format}"): + return audio_path + + if output_path is None: + temp_dir = Path(get_astrbot_data_path()) / "temp" + temp_dir.mkdir(parents=True, exist_ok=True) + output_path = str(temp_dir / f"{uuid.uuid4()}.{output_format}") + + args = ["ffmpeg", "-y", "-i", audio_path] + if output_format == "amr": + args.extend(["-ac", "1", "-ar", "8000", "-ab", "12.2k"]) + elif output_format == "ogg": + args.extend(["-acodec", "libopus", "-ac", "1", "-ar", "16000"]) + args.append(output_path) + + try: + process = await asyncio.create_subprocess_exec( + *args, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + _, stderr = await process.communicate() + if process.returncode != 0: + if output_path and os.path.exists(output_path): + try: + os.remove(output_path) + except OSError as e: + logger.warning(f"[Media Utils] 清理失败的音频输出文件时出错: {e}") + error_msg = stderr.decode() if stderr else "未知错误" + raise Exception(f"ffmpeg conversion failed: {error_msg}") + logger.debug(f"[Media Utils] 音频转换成功: {audio_path} -> {output_path}") + return output_path + except FileNotFoundError: + raise Exception("ffmpeg not found") + + +async def extract_video_cover( + video_path: str, + output_path: str | None = None, +) -> str: + """从视频中提取封面图(JPG)。""" + if output_path is None: + temp_dir = Path(get_astrbot_data_path()) / "temp" + temp_dir.mkdir(parents=True, exist_ok=True) + output_path = str(temp_dir / f"{uuid.uuid4()}.jpg") + + try: + process = await asyncio.create_subprocess_exec( + "ffmpeg", + "-y", + "-i", + video_path, + "-ss", + "00:00:00", + "-frames:v", + "1", + output_path, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + _, stderr = await process.communicate() + if process.returncode != 0: + if output_path and os.path.exists(output_path): + try: + os.remove(output_path) + except OSError as e: + logger.warning(f"[Media Utils] 清理失败的视频封面文件时出错: {e}") + error_msg = stderr.decode() if stderr else "未知错误" + raise Exception(f"ffmpeg extract cover failed: {error_msg}") + return output_path + except FileNotFoundError: + raise Exception("ffmpeg not found")