修复错误代码
This commit is contained in:
@@ -153,7 +153,7 @@ class AiBotPlugin(AbsExtraConfigPlugin):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
async def _handle_streaming(self, evt: MessageEvent, platform) -> None:
|
async def _handle_streaming(self, evt: MessageEvent, platform) -> None:
|
||||||
# 发送初始占位消息(typing 保持 on,让用户知道正在处理)
|
# 发送初始占位消息;on_message 已设 typing=on,等收到第一个 chunk 再关掉
|
||||||
placeholder = TextMessageEventContent(
|
placeholder = TextMessageEventContent(
|
||||||
msgtype=MessageType.TEXT, body="▌", format=Format.HTML, formatted_body="▌"
|
msgtype=MessageType.TEXT, body="▌", format=Format.HTML, formatted_body="▌"
|
||||||
)
|
)
|
||||||
@@ -163,58 +163,25 @@ class AiBotPlugin(AbsExtraConfigPlugin):
|
|||||||
accumulated = ""
|
accumulated = ""
|
||||||
last_edit_len = 0
|
last_edit_len = 0
|
||||||
first_chunk = True
|
first_chunk = True
|
||||||
EDIT_THRESHOLD = 100 # 每积累100个字符更新一次消息
|
EDIT_THRESHOLD = 100
|
||||||
heartbeat_stop = asyncio.Event()
|
|
||||||
# 显式锁:序列化心跳和主流的 send_message,防止 mautrix 内部锁冲突
|
|
||||||
send_lock = asyncio.Lock()
|
|
||||||
|
|
||||||
async def send_edit(content: TextMessageEventContent) -> None:
|
async def send_edit(content: TextMessageEventContent) -> None:
|
||||||
"""顺序发送编辑消息:加用户级锁 + shield,保护 mautrix 内部锁不被 cancel 破坏"""
|
# shield 防止 wait_for 超时时 cancel send_task,保护 mautrix 内部锁不残留
|
||||||
async with send_lock:
|
|
||||||
send_task = asyncio.ensure_future(self.client.send_message(evt.room_id, content))
|
send_task = asyncio.ensure_future(self.client.send_message(evt.room_id, content))
|
||||||
try:
|
try:
|
||||||
# shield防止wait_for超时cancel send_task本身,避免mautrix锁残留
|
|
||||||
await asyncio.wait_for(asyncio.shield(send_task), timeout=8.0)
|
await asyncio.wait_for(asyncio.shield(send_task), timeout=8.0)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
self.log.debug("Streaming: edit wait_for timed out, awaiting task completion")
|
self.log.debug("Streaming: edit timed out, waiting naturally")
|
||||||
await send_task # send_task仍在运行,等待自然完成
|
await send_task
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.log.warning(f"Streaming: edit error: {e}")
|
self.log.warning(f"Streaming: edit error: {e}")
|
||||||
if not send_task.done():
|
if not send_task.done():
|
||||||
await send_task
|
await send_task
|
||||||
|
|
||||||
async def _heartbeat():
|
|
||||||
"""等待首个 chunk 期间每 5 秒更新占位消息,给用户可见的进度反馈"""
|
|
||||||
dots = 0
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
await asyncio.wait_for(heartbeat_stop.wait(), timeout=5.0)
|
|
||||||
break # stop 信号到了,退出
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
pass
|
|
||||||
dots = (dots % 3) + 1
|
|
||||||
indicator = "▌" * dots
|
|
||||||
hb = TextMessageEventContent(
|
|
||||||
msgtype=MessageType.TEXT, body=indicator,
|
|
||||||
format=Format.HTML, formatted_body=indicator
|
|
||||||
)
|
|
||||||
hb.set_edit(response_event_id)
|
|
||||||
try:
|
|
||||||
await send_edit(hb) # 与主流共用同一把锁,保证串行
|
|
||||||
except Exception as e:
|
|
||||||
self.log.debug(f"Streaming: heartbeat error: {e}")
|
|
||||||
|
|
||||||
heartbeat_task = asyncio.ensure_future(_heartbeat())
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async for chunk in platform.create_chat_completion_stream(self, evt):
|
async for chunk in platform.create_chat_completion_stream(self, evt):
|
||||||
if first_chunk:
|
if first_chunk:
|
||||||
# 收到第一个 chunk:停止心跳、关掉 typing
|
# 收到第一个 chunk 才关掉 typing,等待期间用户可见 typing 指示器
|
||||||
heartbeat_stop.set()
|
|
||||||
try:
|
|
||||||
await heartbeat_task # 等心跳当前轮次完成,避免并发
|
|
||||||
except BaseException:
|
|
||||||
pass
|
|
||||||
await self.client.set_typing(evt.room_id, timeout=0)
|
await self.client.set_typing(evt.room_id, timeout=0)
|
||||||
first_chunk = False
|
first_chunk = False
|
||||||
accumulated += chunk
|
accumulated += chunk
|
||||||
@@ -235,19 +202,11 @@ class AiBotPlugin(AbsExtraConfigPlugin):
|
|||||||
if not accumulated:
|
if not accumulated:
|
||||||
accumulated = f"Streaming error: {e}"
|
accumulated = f"Streaming error: {e}"
|
||||||
finally:
|
finally:
|
||||||
# 确保心跳和 typing 无论如何都会停止;BaseException 防止 CancelledError 跳过最终编辑
|
|
||||||
heartbeat_stop.set()
|
|
||||||
if not heartbeat_task.done():
|
|
||||||
try:
|
|
||||||
await heartbeat_task
|
|
||||||
except BaseException:
|
|
||||||
pass
|
|
||||||
if first_chunk:
|
if first_chunk:
|
||||||
await self.client.set_typing(evt.room_id, timeout=0)
|
await self.client.set_typing(evt.room_id, timeout=0)
|
||||||
|
|
||||||
self.log.debug(f"Streaming: loop done, total={len(accumulated)}")
|
self.log.debug(f"Streaming: loop done, total={len(accumulated)}")
|
||||||
|
|
||||||
# 输出最终完整内容
|
|
||||||
if not accumulated:
|
if not accumulated:
|
||||||
accumulated = "(无响应)"
|
accumulated = "(无响应)"
|
||||||
final_content = TextMessageEventContent(
|
final_content = TextMessageEventContent(
|
||||||
|
|||||||
Reference in New Issue
Block a user