diff --git a/maubot_llmplus/aibot.py b/maubot_llmplus/aibot.py index 22b88f2..a5b1ffb 100644 --- a/maubot_llmplus/aibot.py +++ b/maubot_llmplus/aibot.py @@ -153,7 +153,7 @@ class AiBotPlugin(AbsExtraConfigPlugin): return None async def _handle_streaming(self, evt: MessageEvent, platform) -> None: - # 发送初始占位消息(typing 保持 on,让用户知道正在处理) + # 发送初始占位消息;on_message 已设 typing=on,等收到第一个 chunk 再关掉 placeholder = TextMessageEventContent( msgtype=MessageType.TEXT, body="▌", format=Format.HTML, formatted_body="▌" ) @@ -163,58 +163,25 @@ class AiBotPlugin(AbsExtraConfigPlugin): accumulated = "" last_edit_len = 0 first_chunk = True - EDIT_THRESHOLD = 100 # 每积累100个字符更新一次消息 - heartbeat_stop = asyncio.Event() - # 显式锁:序列化心跳和主流的 send_message,防止 mautrix 内部锁冲突 - send_lock = asyncio.Lock() + EDIT_THRESHOLD = 100 async def send_edit(content: TextMessageEventContent) -> None: - """顺序发送编辑消息:加用户级锁 + shield,保护 mautrix 内部锁不被 cancel 破坏""" - async with send_lock: - send_task = asyncio.ensure_future(self.client.send_message(evt.room_id, content)) - try: - # shield防止wait_for超时cancel send_task本身,避免mautrix锁残留 - await asyncio.wait_for(asyncio.shield(send_task), timeout=8.0) - except asyncio.TimeoutError: - self.log.debug("Streaming: edit wait_for timed out, awaiting task completion") - await send_task # send_task仍在运行,等待自然完成 - except Exception as e: - self.log.warning(f"Streaming: edit error: {e}") - if not send_task.done(): - await send_task - - async def _heartbeat(): - """等待首个 chunk 期间每 5 秒更新占位消息,给用户可见的进度反馈""" - dots = 0 - while True: - try: - await asyncio.wait_for(heartbeat_stop.wait(), timeout=5.0) - break # stop 信号到了,退出 - except asyncio.TimeoutError: - pass - dots = (dots % 3) + 1 - indicator = "▌" * dots - hb = TextMessageEventContent( - msgtype=MessageType.TEXT, body=indicator, - format=Format.HTML, formatted_body=indicator - ) - hb.set_edit(response_event_id) - try: - await send_edit(hb) # 与主流共用同一把锁,保证串行 - except Exception as e: - self.log.debug(f"Streaming: heartbeat error: {e}") - - heartbeat_task = asyncio.ensure_future(_heartbeat()) + # shield 防止 wait_for 超时时 cancel send_task,保护 mautrix 内部锁不残留 + send_task = asyncio.ensure_future(self.client.send_message(evt.room_id, content)) + try: + await asyncio.wait_for(asyncio.shield(send_task), timeout=8.0) + except asyncio.TimeoutError: + self.log.debug("Streaming: edit timed out, waiting naturally") + await send_task + except Exception as e: + self.log.warning(f"Streaming: edit error: {e}") + if not send_task.done(): + await send_task try: async for chunk in platform.create_chat_completion_stream(self, evt): if first_chunk: - # 收到第一个 chunk:停止心跳、关掉 typing - heartbeat_stop.set() - try: - await heartbeat_task # 等心跳当前轮次完成,避免并发 - except BaseException: - pass + # 收到第一个 chunk 才关掉 typing,等待期间用户可见 typing 指示器 await self.client.set_typing(evt.room_id, timeout=0) first_chunk = False accumulated += chunk @@ -235,19 +202,11 @@ class AiBotPlugin(AbsExtraConfigPlugin): if not accumulated: accumulated = f"Streaming error: {e}" finally: - # 确保心跳和 typing 无论如何都会停止;BaseException 防止 CancelledError 跳过最终编辑 - heartbeat_stop.set() - if not heartbeat_task.done(): - try: - await heartbeat_task - except BaseException: - pass if first_chunk: await self.client.set_typing(evt.room_id, timeout=0) self.log.debug(f"Streaming: loop done, total={len(accumulated)}") - # 输出最终完整内容 if not accumulated: accumulated = "(无响应)" final_content = TextMessageEventContent(