From 96373a9c14a97b5e02b839860259e5487a3969d9 Mon Sep 17 00:00:00 2001 From: taylorxie Date: Tue, 10 Mar 2026 11:30:36 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E9=94=99=E8=AF=AF=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- maubot_llmplus/aibot.py | 66 ++++++++++++++++++++++++++++++++--------- 1 file changed, 52 insertions(+), 14 deletions(-) diff --git a/maubot_llmplus/aibot.py b/maubot_llmplus/aibot.py index 851aacb..22b88f2 100644 --- a/maubot_llmplus/aibot.py +++ b/maubot_llmplus/aibot.py @@ -164,25 +164,57 @@ class AiBotPlugin(AbsExtraConfigPlugin): last_edit_len = 0 first_chunk = True EDIT_THRESHOLD = 100 # 每积累100个字符更新一次消息 + heartbeat_stop = asyncio.Event() + # 显式锁:序列化心跳和主流的 send_message,防止 mautrix 内部锁冲突 + send_lock = asyncio.Lock() async def send_edit(content: TextMessageEventContent) -> None: - """顺序发送编辑消息:用shield确保send_message不被cancel,保护mautrix内部锁""" - send_task = asyncio.ensure_future(self.client.send_message(evt.room_id, content)) - try: - # shield防止wait_for超时cancel send_task本身,避免mautrix锁残留 - await asyncio.wait_for(asyncio.shield(send_task), timeout=8.0) - except asyncio.TimeoutError: - self.log.debug("Streaming: edit wait_for timed out, awaiting task completion") - await send_task # send_task仍在运行,等待自然完成 - except Exception as e: - self.log.warning(f"Streaming: edit error: {e}") - if not send_task.done(): - await send_task + """顺序发送编辑消息:加用户级锁 + shield,保护 mautrix 内部锁不被 cancel 破坏""" + async with send_lock: + send_task = asyncio.ensure_future(self.client.send_message(evt.room_id, content)) + try: + # shield防止wait_for超时cancel send_task本身,避免mautrix锁残留 + await asyncio.wait_for(asyncio.shield(send_task), timeout=8.0) + except asyncio.TimeoutError: + self.log.debug("Streaming: edit wait_for timed out, awaiting task completion") + await send_task # send_task仍在运行,等待自然完成 + except Exception as e: + self.log.warning(f"Streaming: edit error: {e}") + if not send_task.done(): + await send_task + + async def _heartbeat(): + """等待首个 chunk 期间每 5 秒更新占位消息,给用户可见的进度反馈""" + dots = 0 + while True: + try: + await asyncio.wait_for(heartbeat_stop.wait(), timeout=5.0) + break # stop 信号到了,退出 + except asyncio.TimeoutError: + pass + dots = (dots % 3) + 1 + indicator = "▌" * dots + hb = TextMessageEventContent( + msgtype=MessageType.TEXT, body=indicator, + format=Format.HTML, formatted_body=indicator + ) + hb.set_edit(response_event_id) + try: + await send_edit(hb) # 与主流共用同一把锁,保证串行 + except Exception as e: + self.log.debug(f"Streaming: heartbeat error: {e}") + + heartbeat_task = asyncio.ensure_future(_heartbeat()) try: async for chunk in platform.create_chat_completion_stream(self, evt): if first_chunk: - # 收到第一个 chunk 才关掉 typing,此前 typing 持续显示(解决高 TTFT 卡顿感) + # 收到第一个 chunk:停止心跳、关掉 typing + heartbeat_stop.set() + try: + await heartbeat_task # 等心跳当前轮次完成,避免并发 + except BaseException: + pass await self.client.set_typing(evt.room_id, timeout=0) first_chunk = False accumulated += chunk @@ -203,7 +235,13 @@ class AiBotPlugin(AbsExtraConfigPlugin): if not accumulated: accumulated = f"Streaming error: {e}" finally: - # 确保无论如何 typing 都会关掉(含未收到任何 chunk 的情况) + # 确保心跳和 typing 无论如何都会停止;BaseException 防止 CancelledError 跳过最终编辑 + heartbeat_stop.set() + if not heartbeat_task.done(): + try: + await heartbeat_task + except BaseException: + pass if first_chunk: await self.client.set_typing(evt.room_id, timeout=0)