From 89160ce4827d0065ea42f4dc4fb8944e75758419 Mon Sep 17 00:00:00 2001 From: taylorxie Date: Mon, 9 Mar 2026 23:29:50 +0800 Subject: [PATCH] add --- maubot_llmplus/aibot.py | 53 ++++++++++++++++++++--------------------- 1 file changed, 26 insertions(+), 27 deletions(-) diff --git a/maubot_llmplus/aibot.py b/maubot_llmplus/aibot.py index 88bf237..e9cdb7e 100644 --- a/maubot_llmplus/aibot.py +++ b/maubot_llmplus/aibot.py @@ -164,30 +164,36 @@ class AiBotPlugin(AbsExtraConfigPlugin): accumulated = "" last_edit_len = 0 EDIT_THRESHOLD = 100 # 每积累100个字符更新一次消息 - pending_edit: asyncio.Task = None # 追踪正在发送的编辑任务 + + async def send_edit(content: TextMessageEventContent) -> None: + """顺序发送编辑消息:用shield确保send_message不被cancel,保护mautrix内部锁""" + send_task = asyncio.ensure_future(self.client.send_message(evt.room_id, content)) + try: + # shield防止wait_for超时cancel send_task本身,避免mautrix锁残留 + await asyncio.wait_for(asyncio.shield(send_task), timeout=8.0) + except asyncio.TimeoutError: + self.log.debug("Streaming: edit wait_for timed out, awaiting task completion") + await send_task # send_task仍在运行,等待自然完成 + except Exception as e: + self.log.warning(f"Streaming: edit error: {e}") + if not send_task.done(): + await send_task try: async for chunk in platform.create_chat_completion_stream(self, evt): accumulated += chunk - # 只有在没有正在进行的编辑任务时才发送中间更新 if len(accumulated) - last_edit_len >= EDIT_THRESHOLD: - if pending_edit is None or pending_edit.done(): - display = accumulated + " ▌" - new_content = TextMessageEventContent( - msgtype=MessageType.TEXT, - body=display, - format=Format.HTML, - formatted_body=markdown.render(display) - ) - new_content.set_edit(response_event_id) - # 使用fire-and-forget,不等待,不取消,避免mautrix内部锁死锁 - pending_edit = asyncio.ensure_future( - self.client.send_message(evt.room_id, new_content) - ) - last_edit_len = len(accumulated) - self.log.debug(f"Streaming: mid-edit fired, accumulated={len(accumulated)}") - else: - self.log.debug(f"Streaming: skipping mid-edit, previous still in flight, accumulated={len(accumulated)}") + display = accumulated + " ▌" + new_content = TextMessageEventContent( + msgtype=MessageType.TEXT, + body=display, + format=Format.HTML, + formatted_body=markdown.render(display) + ) + new_content.set_edit(response_event_id) + self.log.debug(f"Streaming: mid-edit, accumulated={len(accumulated)}") + await send_edit(new_content) + last_edit_len = len(accumulated) except Exception as e: self.log.exception(f"Streaming error: {e}") if not accumulated: @@ -195,13 +201,6 @@ class AiBotPlugin(AbsExtraConfigPlugin): self.log.debug(f"Streaming: loop done, total={len(accumulated)}") - # 等待最后一个中间编辑完成(不取消,避免锁问题) - if pending_edit and not pending_edit.done(): - try: - await asyncio.wait_for(asyncio.shield(pending_edit), timeout=10.0) - except Exception: - pass - # 输出最终完整内容 if not accumulated: accumulated = "(无响应)" @@ -213,7 +212,7 @@ class AiBotPlugin(AbsExtraConfigPlugin): ) final_content.set_edit(response_event_id) self.log.debug("Streaming: sending final edit") - await self.client.send_message(evt.room_id, final_content) + await send_edit(final_content) self.log.debug("Streaming: final edit done") def get_ai_platform(self) -> Platform: