Compare commits

...

17 Commits

Author SHA1 Message Date
taylorxie
249f225045 修复错误代码 2026-03-10 14:27:49 +08:00
taylorxie
448a95134f 修复错误代码 2026-03-10 14:01:41 +08:00
taylorxie
6b2fc9ea07 修复错误代码 2026-03-10 13:51:13 +08:00
taylorxie
96373a9c14 修复错误代码 2026-03-10 11:30:36 +08:00
taylorxie
70ea0a6916 修复错误代码 2026-03-10 11:01:44 +08:00
taylorxie
98a4dba820 修复错误代码 2026-03-10 10:15:22 +08:00
taylorxie
a5e43190f4 修复错误代码 2026-03-10 09:48:18 +08:00
taylorxie
d5d634bf14 增加流式功能 2026-03-09 23:51:18 +08:00
taylorxie
bf4d2a444c 增加流式功能 2026-03-09 23:44:39 +08:00
taylorxie
89160ce482 add 2026-03-09 23:29:50 +08:00
taylorxie
11e37a157d add 2026-03-09 23:21:30 +08:00
taylorxie
caddfb61f1 add 2026-03-09 23:15:54 +08:00
taylorxie
1070cf517f add 2026-03-09 23:06:06 +08:00
taylorxie
87d9ab789c add 2026-03-09 23:01:29 +08:00
taylorxie
300a7fbfd6 add 2026-03-09 22:53:24 +08:00
taylorxie
9f25fdab12 add 2026-03-09 22:43:02 +08:00
taylorxie
b53a918aaa add 2026-03-09 22:34:58 +08:00
5 changed files with 543 additions and 59 deletions

View File

@@ -31,35 +31,31 @@ platforms:
max_tokens: 2000 max_tokens: 2000
max_words: 1000 max_words: 1000
max_context_messages: 20 max_context_messages: 20
# 是否开启流式输出(开启后 Element 中消息会逐步更新)
streaming: false
qwen:
# 国内: https://dashscope.aliyuncs.com
# 海外: https://dashscope-intl.aliyuncs.com
url: https://dashscope.aliyuncs.com
api_key:
model: qwen-plus
temperature: 0.7
top_p: 0.8
max_tokens: 2000
max_words: 1000
max_context_messages: 20
# 是否开启深度思考模式(仅 qwq 系列支持)
enable_thinking: false
# 是否开启流式输出(开启后 Element 中消息会逐步更新)
streaming: false
deepseek: deepseek:
url: https://api.deepseek.com url: https://api.deepseek.com
api_key: api_key:
model: model:
max_words: 1000 max_words: 1000
max_context_messages: 20 max_context_messages: 20
openai: # 是否开启流式输出(开启后 Element 中消息会逐步更新)
url: https://api.openai.com streaming: false
api_key:
model: gpt-4o-mini
max_tokens: 2000
max_words: 1000
max_context_messages: 20
temperature: 1
anthropic:
url: https://api.anthropic.com
api_key:
model: claude-3-5-sonnet-20240620
max_words: 1000
max_tokens: 2000
max_context_messages: 20
xai:
url: https://api.x.ai
api_key:
model: grok-beta
temperature: 1
max_tokens: 1000
max_words: 2000
max_context_messages: 20
gemini: gemini:
url: https://generativelanguage.googleapis.com url: https://generativelanguage.googleapis.com
api_key: api_key:
@@ -68,6 +64,38 @@ platforms:
max_tokens: 2000 max_tokens: 2000
max_words: 1000 max_words: 1000
max_context_messages: 20 max_context_messages: 20
# 是否开启流式输出(开启后 Element 中消息会逐步更新)
streaming: false
openai:
url: https://api.openai.com
api_key:
model: gpt-4o-mini
max_tokens: 2000
max_words: 1000
max_context_messages: 20
temperature: 1
# 是否开启流式输出(开启后 Element 中消息会逐步更新)
streaming: false
anthropic:
url: https://api.anthropic.com
api_key:
model: claude-3-5-sonnet-20240620
max_words: 1000
max_tokens: 2000
max_context_messages: 20
# 是否开启流式输出(开启后 Element 中消息会逐步更新)
streaming: false
xai:
url: https://api.x.ai
api_key:
model: grok-beta
temperature: 1
max_tokens: 1000
max_words: 2000
max_context_messages: 20
# 是否开启流式输出(开启后 Element 中消息会逐步更新)
streaming: false
# additional prompt # additional prompt
additional_prompt: additional_prompt:

View File

@@ -1,3 +1,4 @@
import asyncio
import re import re
from typing import Type from typing import Type
@@ -10,7 +11,7 @@ from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
from maubot_llmplus.local_paltform import Ollama, LmStudio from maubot_llmplus.local_paltform import Ollama, LmStudio
from maubot_llmplus.platforms import Platform from maubot_llmplus.platforms import Platform
from maubot_llmplus.plugin import AbsExtraConfigPlugin, Config from maubot_llmplus.plugin import AbsExtraConfigPlugin, Config
from maubot_llmplus.thrid_platform import OpenAi, Anthropic, XAi, Deepseek, Gemini from maubot_llmplus.thrid_platform import OpenAi, Anthropic, XAi, Deepseek, Gemini, Qwen
class AiBotPlugin(AbsExtraConfigPlugin): class AiBotPlugin(AbsExtraConfigPlugin):
@@ -124,15 +125,16 @@ class AiBotPlugin(AbsExtraConfigPlugin):
await event.mark_read() await event.mark_read()
await self.client.set_typing(event.room_id, timeout=99999) await self.client.set_typing(event.room_id, timeout=99999)
platform = self.get_ai_platform() platform = self.get_ai_platform()
if platform.is_streaming_enabled():
await self._handle_streaming(event, platform)
return
chat_completion = await platform.create_chat_completion(self, event) chat_completion = await platform.create_chat_completion(self, event)
self.log.debug( self.log.debug(
f"发送结果 {chat_completion.message}, {chat_completion.model}, {chat_completion.finish_reason}") f"发送结果 {chat_completion.message}, {chat_completion.model}, {chat_completion.finish_reason}")
# ai gpt调用
# 关闭typing提示
await self.client.set_typing(event.room_id, timeout=0) await self.client.set_typing(event.room_id, timeout=0)
# 打开typing提示
if chat_completion.result: if chat_completion.result:
# if hasattr(chat_completion.message, 'content'):
resp_content = chat_completion.message['content'] resp_content = chat_completion.message['content']
response = TextMessageEventContent(msgtype=MessageType.TEXT, body=resp_content, format=Format.HTML, response = TextMessageEventContent(msgtype=MessageType.TEXT, body=resp_content, format=Format.HTML,
formatted_body=markdown.render(resp_content)) formatted_body=markdown.render(resp_content))
@@ -150,6 +152,74 @@ class AiBotPlugin(AbsExtraConfigPlugin):
return None return None
async def _handle_streaming(self, evt: MessageEvent, platform) -> None:
    """Stream a chat completion into the room by editing a placeholder message.

    Sends an empty placeholder first, accumulates chunks from the platform's
    stream, and periodically edits the placeholder so the user watches the
    reply grow. A final edit writes the complete text (or an error notice).
    """
    # Send the initial placeholder (on_message already set typing=on;
    # typing is switched off once the first chunk arrives).
    placeholder = TextMessageEventContent(
        msgtype=MessageType.TEXT, body="", format=Format.HTML, formatted_body=""
    )
    response_event_id = await evt.respond(placeholder, in_thread=self.config['reply_in_thread'])
    self.log.debug("Streaming: placeholder sent")

    accumulated = ""
    last_edit_len = 0
    first_chunk = True
    EDIT_THRESHOLD = 100  # minimum number of new characters between edits

    async def send_edit(content: TextMessageEventContent) -> None:
        # shield() keeps send_task alive if wait_for times out, so mautrix's
        # internal send locks are not left behind by a cancellation.
        send_task = asyncio.ensure_future(self.client.send_message(evt.room_id, content))
        try:
            await asyncio.wait_for(asyncio.shield(send_task), timeout=8.0)
        except asyncio.TimeoutError:
            self.log.debug("Streaming: edit timed out, waiting naturally")
            await send_task
        except Exception as e:
            self.log.warning(f"Streaming: edit error: {e}")
            if not send_task.done():
                await send_task

    try:
        async for chunk in platform.create_chat_completion_stream(self, evt):
            if first_chunk:
                # Only stop the typing indicator once the first chunk arrives,
                # so the user sees "typing" while waiting for the model.
                await self.client.set_typing(evt.room_id, timeout=0)
                first_chunk = False
            accumulated += chunk
            if len(accumulated) - last_edit_len >= EDIT_THRESHOLD:
                # (previously `accumulated + ""` — a no-op concatenation,
                # removed)
                display = accumulated
                new_content = TextMessageEventContent(
                    msgtype=MessageType.TEXT,
                    body=display,
                    format=Format.HTML,
                    formatted_body=markdown.render(display)
                )
                new_content.set_edit(response_event_id)
                self.log.debug(f"Streaming: mid-edit, accumulated={len(accumulated)}")
                await send_edit(new_content)
                last_edit_len = len(accumulated)
    except Exception as e:
        # Surface the failure in the message itself if nothing streamed yet.
        self.log.exception(f"Streaming error: {e}")
        if not accumulated:
            accumulated = f"Streaming error: {e}"
    finally:
        if first_chunk:
            # No chunk ever arrived — make sure typing is switched off.
            await self.client.set_typing(evt.room_id, timeout=0)

    self.log.debug(f"Streaming: loop done, total={len(accumulated)}")
    if not accumulated:
        accumulated = "(无响应)"
    final_content = TextMessageEventContent(
        msgtype=MessageType.TEXT,
        body=accumulated,
        format=Format.HTML,
        formatted_body=markdown.render(accumulated)
    )
    final_content.set_edit(response_event_id)
    self.log.debug("Streaming: sending final edit")
    await send_edit(final_content)
    self.log.debug("Streaming: final edit done")
def get_ai_platform(self) -> Platform: def get_ai_platform(self) -> Platform:
use_platform = self.config.cur_platform use_platform = self.config.cur_platform
if use_platform == 'openai': if use_platform == 'openai':
@@ -162,6 +232,8 @@ class AiBotPlugin(AbsExtraConfigPlugin):
return Deepseek(self.config, self.http) return Deepseek(self.config, self.http)
if use_platform == 'gemini': if use_platform == 'gemini':
return Gemini(self.config, self.http) return Gemini(self.config, self.http)
if use_platform == 'qwen':
return Qwen(self.config, self.http)
if use_platform == 'local_ai#ollama': if use_platform == 'local_ai#ollama':
return Ollama(self.config, self.http) return Ollama(self.config, self.http)
if use_platform == 'local_ai#lmstudio': if use_platform == 'local_ai#lmstudio':
@@ -300,7 +372,7 @@ class AiBotPlugin(AbsExtraConfigPlugin):
self.config.cur_model = self.config['platforms'][argus.split("#")[0]]['model'] self.config.cur_model = self.config['platforms'][argus.split("#")[0]]['model']
await event.react("") await event.react("")
# 如果是openai或者是claude # 如果是openai或者是claude
elif argus == 'openai' or argus == 'anthropic' or argus == 'xai' or argus == 'deepseek' or argus == 'gemini': elif argus == 'openai' or argus == 'anthropic' or argus == 'xai' or argus == 'deepseek' or argus == 'gemini' or argus == 'qwen':
if argus == self.config.cur_platform: if argus == self.config.cur_platform:
await event.reply(f"current ai platform has be {argus}") await event.reply(f"current ai platform has be {argus}")
pass pass

View File

@@ -1,3 +1,4 @@
import asyncio
import json import json
from typing import List from typing import List
@@ -11,12 +12,17 @@ import maubot_llmplus
import maubot_llmplus.platforms import maubot_llmplus.platforms
from maubot_llmplus.platforms import Platform, ChatCompletion from maubot_llmplus.platforms import Platform, ChatCompletion
from maubot_llmplus.plugin import AbsExtraConfigPlugin from maubot_llmplus.plugin import AbsExtraConfigPlugin
from maubot_llmplus.thrid_platform import _read_openai_sse
class Ollama(Platform): class Ollama(Platform):
def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None: def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None:
super().__init__(config, http) super().__init__(config, http)
self.streaming = self.config.get('streaming', False)
def is_streaming_enabled(self) -> bool:
return self.streaming
async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion: async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
full_context = [] full_context = []
@@ -27,20 +33,52 @@ class Ollama(Platform):
req_body = {'model': self.model, 'messages': full_context, 'stream': False} req_body = {'model': self.model, 'messages': full_context, 'stream': False}
headers = {'Content-Type': 'application/json'} headers = {'Content-Type': 'application/json'}
async with self.http.post(endpoint, headers=headers, json=req_body) as response: async with self.http.post(endpoint, headers=headers, json=req_body) as response:
# plugin.log.debug(f"响应内容:{response.status}, {await response.json()}")
if response.status != 200: if response.status != 200:
return ChatCompletion( return ChatCompletion(
result=False,
message={}, message={},
finish_reason=f"http status {response.status}", finish_reason=f"http status {response.status}",
model=None model=None
) )
response_json = await response.json() response_json = await response.json()
return ChatCompletion( return ChatCompletion(
result=True,
message=response_json['message'], message=response_json['message'],
finish_reason='success', finish_reason='success',
model=response_json['model'] model=response_json['model']
) )
async def create_chat_completion_stream(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent):
# Stream chunks from Ollama's /api/chat endpoint.
# Ollama streams one JSON object per line (NDJSON, not SSE); each object
# carries an incremental message.content, and a "done" flag ends the stream.
full_context = []
context = await maubot_llmplus.platforms.get_context(plugin, self, evt)
full_context.extend(list(context))
endpoint = f"{self.url}/api/chat"
req_body = {'model': self.model, 'messages': full_context, 'stream': True}
headers = {'Content-Type': 'application/json'}
async with self.http.post(endpoint, headers=headers, json=req_body) as response:
if response.status != 200:
raise ValueError(f"Error: http status {response.status}")
while True:
try:
# Guard against a stalled connection: give up after 60s of silence.
line_bytes = await asyncio.wait_for(response.content.readline(), timeout=60.0)
except asyncio.TimeoutError:
break
if not line_bytes:
# Empty bytes => connection closed by the server.
break
line = line_bytes.decode("utf-8").strip()
if not line:
continue
try:
data = json.loads(line)
if data.get("done"):
break
content = data.get("message", {}).get("content", "")
if content:
yield content
except json.JSONDecodeError:
# Skip malformed lines rather than aborting the whole stream.
pass
async def list_models(self) -> List[str]: async def list_models(self) -> List[str]:
full_url = f"{self.url}/api/tags" full_url = f"{self.url}/api/tags"
async with self.http.get(full_url) as response: async with self.http.get(full_url) as response:
@@ -58,8 +96,11 @@ class LmStudio(Platform) :
def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None: def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None:
super().__init__(config, http) super().__init__(config, http)
self.temperature = self.config['temperature'] self.temperature = float(self.config['temperature']) if self.config.get('temperature') is not None else None
pass self.streaming = self.config.get('streaming', False)
def is_streaming_enabled(self) -> bool:
return self.streaming
async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion: async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
full_context = [] full_context = []
@@ -72,9 +113,9 @@ class LmStudio(Platform) :
async with self.http.post( async with self.http.post(
endpoint, headers=headers, data=json.dumps(req_body) endpoint, headers=headers, data=json.dumps(req_body)
) as response: ) as response:
# plugin.log.debug(f"响应内容:{response.status}, {await response.json()}")
if response.status != 200: if response.status != 200:
return ChatCompletion( return ChatCompletion(
result=False,
message={}, message={},
finish_reason=f"Error: {await response.text()}", finish_reason=f"Error: {await response.text()}",
model=None model=None
@@ -82,11 +123,26 @@ class LmStudio(Platform) :
response_json = await response.json() response_json = await response.json()
choice = response_json["choices"][0] choice = response_json["choices"][0]
return ChatCompletion( return ChatCompletion(
result=True,
message=choice["message"], message=choice["message"],
finish_reason=choice["finish_reason"], finish_reason=choice["finish_reason"],
model=choice.get("model", None) model=choice.get("model", None)
) )
async def create_chat_completion_stream(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent):
# Stream chunks from LM Studio's OpenAI-compatible /v1/chat/completions
# endpoint; SSE parsing is delegated to the shared _read_openai_sse helper.
full_context = []
context = await maubot_llmplus.platforms.get_context(plugin, self, evt)
full_context.extend(list(context))
endpoint = f"{self.url}/v1/chat/completions"
headers = {"content-type": "application/json"}
req_body = {"model": self.model, "messages": full_context, "temperature": self.temperature, "stream": True}
async with self.http.post(endpoint, headers=headers, data=json.dumps(req_body)) as response:
if response.status != 200:
raise ValueError(f"Error: {await response.text()}")
async for chunk in _read_openai_sse(response):
yield chunk
async def list_models(self) -> List[str]: async def list_models(self) -> List[str]:
full_url = f"{self.url}/v1/models" full_url = f"{self.url}/v1/models"
async with self.http.get(full_url) as response: async with self.http.get(full_url) as response:

View File

@@ -1,7 +1,7 @@
import json import json
from collections import deque from collections import deque
from datetime import datetime from datetime import datetime
from typing import Optional, List, Generator from typing import Optional, List, Generator, AsyncIterator
from aiohttp import ClientSession from aiohttp import ClientSession
from maubot import Plugin from maubot import Plugin
@@ -55,6 +55,12 @@ class Platform:
async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion: async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
raise NotImplementedError() raise NotImplementedError()
async def create_chat_completion_stream(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> AsyncIterator[str]:
raise NotImplementedError()
def is_streaming_enabled(self) -> bool:
return False
async def list_models(self) -> List[str]: async def list_models(self) -> List[str]:
raise NotImplementedError() raise NotImplementedError()

View File

@@ -1,4 +1,6 @@
import asyncio
import json import json
import logging
from collections import deque from collections import deque
from typing import List from typing import List
@@ -12,10 +14,41 @@ from maubot_llmplus.platforms import Platform, ChatCompletion
from maubot_llmplus.plugin import AbsExtraConfigPlugin from maubot_llmplus.plugin import AbsExtraConfigPlugin
async def _read_openai_sse(response):
    """Read an OpenAI-compatible SSE stream, yielding each delta's content.

    Tolerates both ``data: `` and ``data:`` field prefixes (the SSE spec
    makes the space after the colon optional; the Qwen parser in this file
    already handles both). Stops on ``[DONE]``, a closed connection, or
    60 seconds of silence.
    """
    while True:
        try:
            # Guard against a stalled connection.
            line_bytes = await asyncio.wait_for(response.content.readline(), timeout=60.0)
        except asyncio.TimeoutError:
            break
        if not line_bytes:
            break  # connection closed by the server
        line = line_bytes.decode("utf-8").strip()
        if not line.startswith("data:"):
            # Comments (": keep-alive") and blank lines are skipped.
            continue
        data_str = line[5:].strip()
        if data_str == "[DONE]":
            break
        try:
            data = json.loads(data_str)
            choices = data.get("choices", [])
            if choices:
                content = choices[0].get("delta", {}).get("content", "")
                if content:
                    yield content
        except json.JSONDecodeError:
            # Skip malformed/partial lines rather than aborting the stream.
            pass
class Deepseek(Platform): class Deepseek(Platform):
def __init__(self, config: BaseProxyConfig, http: ClientSession): def __init__(self, config: BaseProxyConfig, http: ClientSession):
super().__init__(config, http) super().__init__(config, http)
self.streaming = self.config.get('streaming', False)
def is_streaming_enabled(self) -> bool:
return self.streaming
async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion: async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
full_context = [] full_context = []
@@ -51,6 +84,28 @@ class Deepseek(Platform):
model=response_json.get("model", None) model=response_json.get("model", None)
) )
async def create_chat_completion_stream(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent):
# Stream chunks from DeepSeek's OpenAI-compatible chat completions API.
# NOTE(review): endpoint is "{url}/chat/completions" (no /v1 prefix, unlike
# the OpenAi/XAi classes) — presumably matches the non-streaming path; verify.
full_context = []
context = await maubot_llmplus.platforms.get_context(plugin, self, evt)
full_context.extend(list(context))
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}"
}
data = {
"model": self.model,
"messages": full_context,
"stream": True,
}
endpoint = f"{self.url}/chat/completions"
async with self.http.post(endpoint, headers=headers, data=json.dumps(data)) as response:
if response.status != 200:
raise ValueError(f"Error: {await response.text()}")
async for chunk in _read_openai_sse(response):
yield chunk
async def list_models(self) -> List[str]: async def list_models(self) -> List[str]:
models = ["deepseek-chat", "deepseek-reasoner"] models = ["deepseek-chat", "deepseek-reasoner"]
return [f"- {m}" for m in models] return [f"- {m}" for m in models]
@@ -60,12 +115,16 @@ class Deepseek(Platform):
class OpenAi(Platform): class OpenAi(Platform):
max_tokens: int max_tokens: int
temperature: int temperature: float
def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None: def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None:
super().__init__(config, http) super().__init__(config, http)
self.max_tokens = self.config['max_tokens'] self.max_tokens = int(self.config['max_tokens']) if self.config.get('max_tokens') else None
self.temperature = self.config['temperature'] self.temperature = float(self.config['temperature']) if self.config.get('temperature') is not None else None
self.streaming = self.config.get('streaming', False)
def is_streaming_enabled(self) -> bool:
return self.streaming
async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion: async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
full_context = [] full_context = []
@@ -96,7 +155,6 @@ class OpenAi(Platform):
async with self.http.post( async with self.http.post(
endpoint, headers=headers, data=json.dumps(data) endpoint, headers=headers, data=json.dumps(data)
) as response: ) as response:
# plugin.log.debug(f"响应内容:{response.status}, {await response.json()}")
if response.status != 200: if response.status != 200:
return ChatCompletion( return ChatCompletion(
result=False, result=False,
@@ -113,6 +171,37 @@ class OpenAi(Platform):
model=choice.get("model", None) model=choice.get("model", None)
) )
async def create_chat_completion_stream(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent):
# Stream chunks from OpenAI's /v1/chat/completions endpoint via SSE.
full_context = []
context = await maubot_llmplus.platforms.get_context(plugin, self, evt)
full_context.extend(list(context))
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}"
}
data = {
"model": self.model,
"messages": full_context,
"stream": True,
}
# gpt-5 family rejects "max_tokens" and requires "max_completion_tokens".
if 'max_tokens' in self.config and self.max_tokens:
if 'gpt-5' in self.model:
data["max_completion_tokens"] = self.max_tokens
else:
data["max_tokens"] = self.max_tokens
# NOTE: truthiness check means temperature=0 is never sent — intentional?
if 'temperature' in self.config and self.temperature:
data["temperature"] = self.temperature
endpoint = f"{self.url}/v1/chat/completions"
async with self.http.post(endpoint, headers=headers, data=json.dumps(data)) as response:
if response.status != 200:
raise ValueError(f"Error: {await response.text()}")
async for chunk in _read_openai_sse(response):
yield chunk
async def list_models(self) -> List[str]: async def list_models(self) -> List[str]:
# 调用openai接口获取模型列表 # 调用openai接口获取模型列表
full_url = f"{self.url}/v1/models" full_url = f"{self.url}/v1/models"
@@ -129,10 +218,22 @@ class OpenAi(Platform):
class Anthropic(Platform): class Anthropic(Platform):
max_tokens: int max_tokens: int
streaming: bool
def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None: def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None:
super().__init__(config, http) super().__init__(config, http)
self.max_tokens = self.config['max_tokens'] self.max_tokens = int(self.config['max_tokens']) if self.config.get('max_tokens') else None
self.streaming = self.config.get('streaming', False)
def is_streaming_enabled(self) -> bool:
return self.streaming
def _build_request(self, full_chat_context: list) -> tuple:
    """Assemble (endpoint, headers, body) for the Anthropic Messages API.

    Shared by the blocking and streaming completion paths so the two
    requests cannot drift apart.
    """
    endpoint = f"{self.url}/v1/messages"
    headers = {
        "x-api-key": self.api_key,
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }
    req_body = {
        "model": self.model,
        "max_tokens": self.max_tokens,
        "system": self.system_prompt,
        "messages": full_chat_context,
    }
    return endpoint, headers, req_body
async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion: async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
full_chat_context = [] full_chat_context = []
@@ -140,13 +241,9 @@ class Anthropic(Platform):
chat_context = await maubot_llmplus.platforms.get_chat_context(system_context, plugin, self, evt) chat_context = await maubot_llmplus.platforms.get_chat_context(system_context, plugin, self, evt)
full_chat_context.extend(list(chat_context)) full_chat_context.extend(list(chat_context))
endpoint = f"{self.url}/v1/messages" endpoint, headers, req_body = self._build_request(full_chat_context)
headers = {"x-api-key": self.api_key, "anthropic-version": "2023-06-01", "content-type": "application/json"}
req_body = {"model": self.model, "max_tokens": self.max_tokens, "system": self.system_prompt,
"messages": full_chat_context}
async with self.http.post(endpoint, headers=headers, data=json.dumps(req_body)) as response: async with self.http.post(endpoint, headers=headers, data=json.dumps(req_body)) as response:
# plugin.log.debug(f"响应内容:{response.status}, {await response.json()}")
if response.status != 200: if response.status != 200:
return ChatCompletion( return ChatCompletion(
result=False, result=False,
@@ -162,10 +259,42 @@ class Anthropic(Platform):
finish_reason=response_json['stop_reason'], finish_reason=response_json['stop_reason'],
model=response_json['model'] model=response_json['model']
) )
async def create_chat_completion_stream(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent):
# Stream text deltas from the Anthropic Messages API (SSE).
# Anthropic's event stream differs from OpenAI's: events are typed
# ("content_block_delta", "message_stop") instead of using [DONE].
full_chat_context = []
system_context = deque()
chat_context = await maubot_llmplus.platforms.get_chat_context(system_context, plugin, self, evt)
full_chat_context.extend(list(chat_context))
endpoint, headers, req_body = self._build_request(full_chat_context)
req_body["stream"] = True
async with self.http.post(endpoint, headers=headers, data=json.dumps(req_body)) as response:
if response.status != 200:
raise ValueError(f"Error: {await response.text()}")
while True:
try:
# Guard against a stalled connection: give up after 60s of silence.
line_bytes = await asyncio.wait_for(response.content.readline(), timeout=60.0)
except asyncio.TimeoutError:
break
if not line_bytes:
break
line = line_bytes.decode("utf-8").strip()
if not line.startswith("data: "):
continue
data_str = line[6:]
try:
data = json.loads(data_str)
if data.get("type") == "message_stop":
break
if data.get("type") == "content_block_delta":
delta = data.get("delta", {})
if delta.get("type") == "text_delta":
yield delta.get("text", "")
except json.JSONDecodeError:
pass pass
async def list_models(self) -> List[str]: async def list_models(self) -> List[str]:
# 调用openai接口获取模型列表
full_url = f"{self.url}/v1/models" full_url = f"{self.url}/v1/models"
headers = { headers = {
'anthropic-version': "2023-06-01", 'anthropic-version': "2023-06-01",
@@ -187,12 +316,14 @@ class Gemini(Platform):
def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None: def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None:
super().__init__(config, http) super().__init__(config, http)
self.max_tokens = self.config['max_tokens'] self.max_tokens = int(self.config['max_tokens']) if self.config.get('max_tokens') else None
self.temperature = self.config['temperature'] self.temperature = float(self.config['temperature']) if self.config.get('temperature') is not None else None
self.streaming = self.config.get('streaming', False)
async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion: def is_streaming_enabled(self) -> bool:
context = await maubot_llmplus.platforms.get_context(plugin, self, evt) return self.streaming
def _build_gemini_request(self, context) -> tuple:
system_parts = [] system_parts = []
contents = [] contents = []
for msg in context: for msg in context:
@@ -219,12 +350,17 @@ class Gemini(Platform):
if self.temperature: if self.temperature:
request_body["generationConfig"]["temperature"] = self.temperature request_body["generationConfig"]["temperature"] = self.temperature
endpoint = f"{self.url}/v1beta/models/{self.model}:generateContent"
headers = { headers = {
"Content-Type": "application/json", "Content-Type": "application/json",
"x-goog-api-key": self.api_key "x-goog-api-key": self.api_key
} }
return request_body, headers
async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
context = await maubot_llmplus.platforms.get_context(plugin, self, evt)
request_body, headers = self._build_gemini_request(context)
endpoint = f"{self.url}/v1beta/models/{self.model}:generateContent"
async with self.http.post(endpoint, headers=headers, data=json.dumps(request_body)) as response: async with self.http.post(endpoint, headers=headers, data=json.dumps(request_body)) as response:
if response.status != 200: if response.status != 200:
return ChatCompletion( return ChatCompletion(
@@ -243,6 +379,47 @@ class Gemini(Platform):
model=response_json.get("modelVersion", self.model) model=response_json.get("modelVersion", self.model)
) )
async def create_chat_completion_stream(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent):
# Stream text parts from Gemini's streamGenerateContent endpoint (SSE).
context = await maubot_llmplus.platforms.get_context(plugin, self, evt)
request_body, headers = self._build_gemini_request(context)
endpoint = f"{self.url}/v1beta/models/{self.model}:streamGenerateContent?alt=sse"
async with self.http.post(endpoint, headers=headers, data=json.dumps(request_body)) as response:
if response.status != 200:
raise ValueError(f"Error: {await response.text()}")
# Consistent with the Anthropic implementation: inline while loop to
# avoid proxying through two layers of async generators.
while True:
try:
# Guard against a stalled connection: give up after 60s of silence.
line_bytes = await asyncio.wait_for(response.content.readline(), timeout=60.0)
except asyncio.TimeoutError:
break
if not line_bytes:
break
line = line_bytes.decode("utf-8").strip()
if not line.startswith("data: "):
continue
data_str = line[6:]
try:
data = json.loads(data_str)
candidates = data.get("candidates", [])
if not candidates:
continue
candidate = candidates[0]
# Yield the text first, then check for completion — the final
# chunk can carry both text and finishReason (mirrors the
# OpenAI [DONE] handling order).
parts = candidate.get("content", {}).get("parts", [])
for part in parts:
text = part.get("text", "")
if text:
yield text
finish_reason = candidate.get("finishReason")
if finish_reason:
logging.getLogger("instance/aibot").debug(
f"Gemini stream finished: finishReason={finish_reason}"
)
break
except json.JSONDecodeError:
pass
async def list_models(self) -> List[str]: async def list_models(self) -> List[str]:
full_url = f"{self.url}/v1beta/models" full_url = f"{self.url}/v1beta/models"
headers = {"x-goog-api-key": self.api_key} headers = {"x-goog-api-key": self.api_key}
@@ -266,8 +443,12 @@ class XAi(Platform):
def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None: def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None:
super().__init__(config, http) super().__init__(config, http)
self.temperature = self.config['temperature'] self.temperature = float(self.config['temperature']) if self.config.get('temperature') is not None else None
self.max_tokens = self.config['max_tokens'] self.max_tokens = int(self.config['max_tokens']) if self.config.get('max_tokens') else None
self.streaming = self.config.get('streaming', False)
def is_streaming_enabled(self) -> bool:
return self.streaming
async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion: async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
full_context = [] full_context = []
@@ -292,7 +473,6 @@ class XAi(Platform):
endpoint = f"{self.url}/v1/chat/completions" endpoint = f"{self.url}/v1/chat/completions"
async with self.http.post(url=endpoint, data=json.dumps(request_body), headers=headers) as response: async with self.http.post(url=endpoint, data=json.dumps(request_body), headers=headers) as response:
# plugin.log.debug(f"响应内容:{response.status}, {await response.json()}")
if response.status != 200: if response.status != 200:
return ChatCompletion( return ChatCompletion(
result=False, result=False,
@@ -309,10 +489,35 @@ class XAi(Platform):
model=response_json["model"] model=response_json["model"]
) )
pass async def create_chat_completion_stream(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent):
# Stream chunks from xAI's OpenAI-compatible /v1/chat/completions endpoint.
full_context = []
context = await maubot_llmplus.platforms.get_context(plugin, self, evt)
full_context.extend(list(context))
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}"
}
request_body = {
"messages": full_context,
"model": self.model,
"stream": True,
}
if 'max_tokens' in self.config and self.max_tokens:
request_body["max_tokens"] = self.max_tokens
# NOTE: truthiness check means temperature=0 is never sent — intentional?
if 'temperature' in self.config and self.temperature:
request_body["temperature"] = self.temperature
endpoint = f"{self.url}/v1/chat/completions"
async with self.http.post(url=endpoint, data=json.dumps(request_body), headers=headers) as response:
if response.status != 200:
raise ValueError(f"Error: {await response.text()}")
async for chunk in _read_openai_sse(response):
yield chunk
async def list_models(self) -> List[str]: async def list_models(self) -> List[str]:
# 调用openai接口获取模型列表
full_url = f"{self.url}/v1/models" full_url = f"{self.url}/v1/models"
headers = {'Content-Type': 'application/json', 'Authorization': f"Bearer {self.api_key}"} headers = {'Content-Type': 'application/json', 'Authorization': f"Bearer {self.api_key}"}
async with self.http.get(full_url, headers=headers) as response: async with self.http.get(full_url, headers=headers) as response:
@@ -320,7 +525,124 @@ class XAi(Platform):
return [] return []
response_data = await response.json() response_data = await response.json()
return [f"- {m['id']}" for m in response_data["data"]] return [f"- {m['id']}" for m in response_data["data"]]
pass
def get_type(self) -> str: def get_type(self) -> str:
return "xai" return "xai"
class Qwen(Platform):
    """Alibaba Cloud DashScope (Qwen) platform, using the native
    text-generation API (not the OpenAI-compatible endpoint)."""

    # Optional tuning parameters; None when absent from the config.
    max_tokens: int
    temperature: float
    top_p: float
    enable_thinking: bool

    def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None:
        super().__init__(config, http)
        self.max_tokens = int(self.config['max_tokens']) if self.config.get('max_tokens') else None
        self.temperature = float(self.config['temperature']) if self.config.get('temperature') is not None else None
        self.top_p = float(self.config['top_p']) if self.config.get('top_p') is not None else None
        # Fix: use .get() with a default like every other optional field —
        # self.config['enable_thinking'] raised KeyError on configs written
        # before the key existed.
        self.enable_thinking = bool(self.config.get('enable_thinking', False))
        self.streaming = self.config.get('streaming', False)

    def is_streaming_enabled(self) -> bool:
        """Whether streaming output is enabled in the platform config."""
        return self.streaming

    def _build_qwen_request(self, full_context: list) -> tuple:
        """Build (endpoint, headers, body) for DashScope text-generation.

        Only parameters that are actually configured are sent, so the
        service applies its own defaults otherwise.
        """
        parameters = {
            "result_format": "message"
        }
        if self.max_tokens:
            parameters["max_tokens"] = self.max_tokens
        if self.temperature is not None:
            parameters["temperature"] = self.temperature
        if self.top_p is not None:
            parameters["top_p"] = self.top_p
        if self.enable_thinking:
            parameters["enable_thinking"] = True
        request_body = {
            "model": self.model,
            "input": {
                "messages": full_context
            },
            "parameters": parameters
        }
        endpoint = f"{self.url}/api/v1/services/aigc/text-generation/generation"
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }
        return endpoint, headers, request_body

    async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
        """Run a blocking (non-streaming) completion and wrap the result."""
        full_context = []
        context = await maubot_llmplus.platforms.get_context(plugin, self, evt)
        full_context.extend(list(context))
        endpoint, headers, request_body = self._build_qwen_request(full_context)
        async with self.http.post(endpoint, headers=headers, data=json.dumps(request_body)) as response:
            if response.status != 200:
                return ChatCompletion(
                    result=False,
                    message={},
                    finish_reason=f"Error: {await response.text()}",
                    model=None
                )
            response_json = await response.json()
            choice = response_json["output"]["choices"][0]
            return ChatCompletion(
                result=True,
                message=choice["message"],
                finish_reason=choice.get("finish_reason", "stop"),
                model=response_json.get("model", self.model)
            )

    async def create_chat_completion_stream(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent):
        """Stream incremental content chunks from DashScope via SSE."""
        full_context = []
        context = await maubot_llmplus.platforms.get_context(plugin, self, evt)
        full_context.extend(list(context))
        endpoint, headers, request_body = self._build_qwen_request(full_context)
        # DashScope SSE streaming: requires the SSE header, and
        # incremental_output so each event carries only the delta rather
        # than the whole message accumulated so far.
        headers["X-DashScope-SSE"] = "enable"
        request_body["parameters"]["incremental_output"] = True
        async with self.http.post(endpoint, headers=headers, data=json.dumps(request_body)) as response:
            if response.status != 200:
                raise ValueError(f"Error: {await response.text()}")
            while True:
                try:
                    # Guard against a stalled connection (60s of silence).
                    line_bytes = await asyncio.wait_for(response.content.readline(), timeout=60.0)
                except asyncio.TimeoutError:
                    break
                if not line_bytes:
                    break
                line = line_bytes.decode("utf-8").strip()
                # DashScope may emit "data:" without a trailing space.
                if not line.startswith("data:"):
                    continue
                data_str = line[5:].strip()
                try:
                    data = json.loads(data_str)
                    choices = data.get("output", {}).get("choices", [])
                    if choices:
                        content = choices[0].get("message", {}).get("content", "")
                        if content:
                            yield content
                except json.JSONDecodeError:
                    # Skip malformed lines rather than aborting the stream.
                    pass

    async def list_models(self) -> List[str]:
        """Return a static, markdown-formatted list of common Qwen models
        (DashScope has no public model-listing endpoint for this API)."""
        models = [
            "qwen-max", "qwen-max-latest",
            "qwen-plus", "qwen-plus-latest",
            "qwen-turbo", "qwen-turbo-latest",
            "qwen-long",
            "qwen3-235b-a22b", "qwen3-30b-a3b",
            "qwq-plus", "qwq-plus-latest",
        ]
        return [f"- {m}" for m in models]

    def get_type(self) -> str:
        """Platform identifier used by the plugin's platform switch."""
        return "qwen"