Files
maubot-llmplus/maubot_llmplus/thrid_platform.py
2026-03-09 23:51:18 +08:00

637 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
from collections import deque
from typing import List
from aiohttp import ClientSession
from mautrix.types import MessageEvent
from mautrix.util.config import BaseProxyConfig
import maubot_llmplus.platforms
from maubot_llmplus.platforms import Platform, ChatCompletion
from maubot_llmplus.plugin import AbsExtraConfigPlugin
async def _read_openai_sse(response):
    """Read an OpenAI-compatible SSE stream and yield each delta content.

    Args:
        response: aiohttp response whose ``content`` exposes ``readline()``
            over an SSE body of ``data: <json>`` lines.

    Yields:
        str: each non-empty ``choices[0].delta.content`` fragment, in order.

    The loop ends on end-of-stream, a ``[DONE]`` sentinel, or 60 s of
    read silence (treated as end of stream, not raised as an error).
    """
    while True:
        try:
            # Guard against a stalled upstream: stop after 60 s without data.
            line_bytes = await asyncio.wait_for(response.content.readline(), timeout=60.0)
        except asyncio.TimeoutError:
            break
        if not line_bytes:
            # EOF — server closed the connection.
            break
        line = line_bytes.decode("utf-8").strip()
        # Per the SSE spec the space after "data:" is optional; accept both.
        if not line.startswith("data:"):
            continue
        data_str = line[5:].lstrip()
        if data_str == "[DONE]":
            break
        try:
            data = json.loads(data_str)
            choices = data.get("choices", [])
            if choices:
                content = choices[0].get("delta", {}).get("content", "")
                if content:
                    yield content
        except json.JSONDecodeError:
            # Ignore keep-alives / partial or malformed lines.
            pass
class Deepseek(Platform):
    """DeepSeek platform, driven through its OpenAI-compatible chat API."""

    def __init__(self, config: BaseProxyConfig, http: ClientSession):
        super().__init__(config, http)
        # Streaming is opt-in via config; absent key means non-streaming.
        self.streaming = self.config.get('streaming', False)

    def is_streaming_enabled(self) -> bool:
        return self.streaming

    async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
        """Run a blocking (non-stream) completion and wrap it in ChatCompletion."""
        messages = list(await maubot_llmplus.platforms.get_context(plugin, self, evt))
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }
        payload = {"model": self.model, "messages": messages}
        endpoint = f"{self.url}/chat/completions"
        async with self.http.post(endpoint, headers=headers, data=json.dumps(payload)) as response:
            if response.status != 200:
                # Surface the upstream error text through finish_reason.
                return ChatCompletion(
                    result=False,
                    message={},
                    finish_reason=f"Error: {await response.text()}",
                    model=None
                )
            body = await response.json()
            first = body["choices"][0]
            return ChatCompletion(
                result=True,
                message=first["message"],
                finish_reason=first["finish_reason"],
                model=body.get("model", None)
            )

    async def create_chat_completion_stream(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent):
        """Yield incremental content chunks from a streaming completion."""
        messages = list(await maubot_llmplus.platforms.get_context(plugin, self, evt))
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }
        payload = {"model": self.model, "messages": messages, "stream": True}
        endpoint = f"{self.url}/chat/completions"
        async with self.http.post(endpoint, headers=headers, data=json.dumps(payload)) as response:
            if response.status != 200:
                raise ValueError(f"Error: {await response.text()}")
            async for piece in _read_openai_sse(response):
                yield piece

    async def list_models(self) -> List[str]:
        """DeepSeek exposes no model-list endpoint here; return known names."""
        return [f"- {name}" for name in ("deepseek-chat", "deepseek-reasoner")]

    def get_type(self) -> str:
        return "deepseek"
class OpenAi(Platform):
    """OpenAI platform (chat completions under /v1 of the configured base URL)."""

    max_tokens: int      # None when not configured; then omitted from requests
    temperature: float   # None when not configured; then omitted from requests

    def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None:
        super().__init__(config, http)
        self.max_tokens = int(self.config['max_tokens']) if self.config.get('max_tokens') else None
        self.temperature = float(self.config['temperature']) if self.config.get('temperature') is not None else None
        self.streaming = self.config.get('streaming', False)

    def is_streaming_enabled(self) -> bool:
        return self.streaming

    def _build_payload(self, full_context: list, stream: bool) -> dict:
        """Build the chat-completions body, applying optional tuning knobs."""
        data = {
            "model": self.model,
            "messages": full_context,
        }
        if stream:
            data["stream"] = True
        if 'max_tokens' in self.config and self.max_tokens:
            # gpt-5 models take max_completion_tokens; earlier models
            # use the legacy max_tokens field.
            if 'gpt-5' in self.model:
                data["max_completion_tokens"] = self.max_tokens
            else:
                data["max_tokens"] = self.max_tokens
        if 'temperature' in self.config and self.temperature:
            data["temperature"] = self.temperature
        return data

    async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
        """Run a blocking (non-stream) completion and wrap it in ChatCompletion."""
        full_context = list(await maubot_llmplus.platforms.get_context(plugin, self, evt))
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }
        data = self._build_payload(full_context, stream=False)
        endpoint = f"{self.url}/v1/chat/completions"
        async with self.http.post(
                endpoint, headers=headers, data=json.dumps(data)
        ) as response:
            if response.status != 200:
                return ChatCompletion(
                    result=False,
                    message={},
                    finish_reason=f"Error: {await response.text()}",
                    model=None
                )
            response_json = await response.json()
            choice = response_json["choices"][0]
            return ChatCompletion(
                result=True,
                message=choice["message"],
                finish_reason=choice["finish_reason"],
                # Fix: "model" is a top-level response field, not part of the
                # choice object — choice.get("model") was always None.
                model=response_json.get("model", None)
            )

    async def create_chat_completion_stream(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent):
        """Yield incremental content chunks from a streaming completion."""
        full_context = list(await maubot_llmplus.platforms.get_context(plugin, self, evt))
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }
        data = self._build_payload(full_context, stream=True)
        endpoint = f"{self.url}/v1/chat/completions"
        async with self.http.post(endpoint, headers=headers, data=json.dumps(data)) as response:
            if response.status != 200:
                raise ValueError(f"Error: {await response.text()}")
            async for chunk in _read_openai_sse(response):
                yield chunk

    async def list_models(self) -> List[str]:
        """Fetch available model ids from /v1/models; empty list on error."""
        full_url = f"{self.url}/v1/models"
        headers = {'Authorization': f"Bearer {self.api_key}"}
        async with self.http.get(full_url, headers=headers) as response:
            if response.status != 200:
                return []
            response_data = await response.json()
            return [f"- {m['id']}" for m in response_data["data"]]

    def get_type(self) -> str:
        return "openai"
class Anthropic(Platform):
    """Anthropic platform, speaking the Messages API (/v1/messages)."""

    max_tokens: int
    streaming: bool

    def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None:
        super().__init__(config, http)
        # NOTE(review): when the config omits max_tokens this stays None and
        # is sent as JSON null — confirm the config always supplies it, since
        # the Messages API treats max_tokens as required.
        self.max_tokens = int(self.config['max_tokens']) if self.config.get('max_tokens') else None
        self.streaming = self.config.get('streaming', False)

    def is_streaming_enabled(self) -> bool:
        return self.streaming

    def _build_request(self, full_chat_context: list) -> tuple:
        """Return (endpoint, headers, body) for a /v1/messages request."""
        return (
            f"{self.url}/v1/messages",
            {
                "x-api-key": self.api_key,
                "anthropic-version": "2023-06-01",
                "content-type": "application/json",
            },
            {
                "model": self.model,
                "max_tokens": self.max_tokens,
                "system": self.system_prompt,
                "messages": full_chat_context,
            },
        )

    async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
        """Run a non-streaming message request and adapt it to ChatCompletion."""
        system_context = deque()
        chat_context = await maubot_llmplus.platforms.get_chat_context(system_context, plugin, self, evt)
        endpoint, headers, req_body = self._build_request(list(chat_context))
        async with self.http.post(endpoint, headers=headers, data=json.dumps(req_body)) as response:
            if response.status != 200:
                return ChatCompletion(
                    result=False,
                    message={},
                    finish_reason=f"Error: {await response.text()}",
                    model=None
                )
            payload = await response.json()
            # The response carries a list of content blocks; join their text.
            joined = "\n\n".join(block["text"] for block in payload["content"])
            return ChatCompletion(
                result=True,
                message=dict(role="assistant", content=joined),
                finish_reason=payload['stop_reason'],
                model=payload['model']
            )

    async def create_chat_completion_stream(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent):
        """Yield text deltas parsed from the Anthropic SSE event stream."""
        system_context = deque()
        chat_context = await maubot_llmplus.platforms.get_chat_context(system_context, plugin, self, evt)
        endpoint, headers, req_body = self._build_request(list(chat_context))
        req_body["stream"] = True
        async with self.http.post(endpoint, headers=headers, data=json.dumps(req_body)) as response:
            if response.status != 200:
                raise ValueError(f"Error: {await response.text()}")
            while True:
                try:
                    # Stop on 60 s of silence instead of hanging forever.
                    raw = await asyncio.wait_for(response.content.readline(), timeout=60.0)
                except asyncio.TimeoutError:
                    break
                if not raw:
                    break
                line = raw.decode("utf-8").strip()
                if not line.startswith("data: "):
                    continue
                try:
                    event = json.loads(line[6:])
                except json.JSONDecodeError:
                    continue
                kind = event.get("type")
                if kind == "message_stop":
                    break
                if kind == "content_block_delta":
                    delta = event.get("delta", {})
                    if delta.get("type") == "text_delta":
                        yield delta.get("text", "")

    async def list_models(self) -> List[str]:
        """Fetch available model ids from /v1/models; empty list on error."""
        headers = {
            'anthropic-version': "2023-06-01",
            'X-Api-Key': f"{self.api_key}"
        }
        async with self.http.get(f"{self.url}/v1/models", headers=headers) as response:
            if response.status != 200:
                return []
            response_data = await response.json()
            return [f"- {m['id']}" for m in response_data['data']]

    def get_type(self) -> str:
        return "anthropic"
class Gemini(Platform):
    """Google Gemini platform (generativelanguage v1beta API)."""

    max_tokens: int      # None when not configured
    temperature: float   # None when not configured

    def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None:
        super().__init__(config, http)
        self.max_tokens = int(self.config['max_tokens']) if self.config.get('max_tokens') else None
        self.temperature = float(self.config['temperature']) if self.config.get('temperature') is not None else None
        self.streaming = self.config.get('streaming', False)

    def is_streaming_enabled(self) -> bool:
        return self.streaming

    def _build_gemini_request(self, context) -> tuple:
        """Translate OpenAI-style messages into a Gemini request.

        Returns (request_body, headers). System messages become
        system_instruction parts; 'assistant' maps to Gemini's 'model' role,
        anything else to 'user'.
        """
        system_parts = []
        contents = []
        for msg in context:
            role = msg['role']
            content = msg['content']
            if role == 'system':
                system_parts.append({"text": content})
            elif role == 'assistant':
                contents.append({"role": "model", "parts": [{"text": content}]})
            else:
                contents.append({"role": "user", "parts": [{"text": content}]})
        request_body = {
            "contents": contents,
            "generationConfig": {}
        }
        if system_parts:
            request_body["system_instruction"] = {"parts": system_parts}
        if self.max_tokens:
            request_body["generationConfig"]["maxOutputTokens"] = self.max_tokens
        if self.temperature:
            request_body["generationConfig"]["temperature"] = self.temperature
        headers = {
            "Content-Type": "application/json",
            "x-goog-api-key": self.api_key
        }
        return request_body, headers

    async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
        """Run a non-streaming generateContent call, adapted to ChatCompletion."""
        context = await maubot_llmplus.platforms.get_context(plugin, self, evt)
        request_body, headers = self._build_gemini_request(context)
        endpoint = f"{self.url}/v1beta/models/{self.model}:generateContent"
        async with self.http.post(endpoint, headers=headers, data=json.dumps(request_body)) as response:
            if response.status != 200:
                return ChatCompletion(
                    result=False,
                    message={},
                    finish_reason=f"Error: {await response.text()}",
                    model=None
                )
            response_json = await response.json()
            candidate = response_json["candidates"][0]
            text = "".join(part["text"] for part in candidate["content"]["parts"])
            return ChatCompletion(
                result=True,
                message={"role": "assistant", "content": text},
                finish_reason=candidate.get("finishReason", "STOP"),
                model=response_json.get("modelVersion", self.model)
            )

    async def create_chat_completion_stream(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent):
        """Yield text chunks from a streaming generateContent call."""
        context = await maubot_llmplus.platforms.get_context(plugin, self, evt)
        request_body, headers = self._build_gemini_request(context)
        # Fix: without ?alt=sse, streamGenerateContent returns a single JSON
        # array (no "data: " lines), so the SSE parser below yielded nothing.
        endpoint = f"{self.url}/v1beta/models/{self.model}:streamGenerateContent?alt=sse"
        async with self.http.post(endpoint, headers=headers, data=json.dumps(request_body)) as response:
            if response.status != 200:
                raise ValueError(f"Error: {await response.text()}")
            while True:
                try:
                    # Stop after 60 s of silence instead of hanging.
                    line_bytes = await asyncio.wait_for(response.content.readline(), timeout=60.0)
                except asyncio.TimeoutError:
                    break
                if not line_bytes:
                    break
                line = line_bytes.decode("utf-8").strip()
                if not line.startswith("data: "):
                    continue
                data_str = line[6:]
                try:
                    data = json.loads(data_str)
                    candidates = data.get("candidates", [])
                    if candidates:
                        parts = candidates[0].get("content", {}).get("parts", [])
                        for part in parts:
                            text = part.get("text", "")
                            if text:
                                yield text
                except json.JSONDecodeError:
                    pass

    async def list_models(self) -> List[str]:
        """List models that support generateContent; empty list on error."""
        full_url = f"{self.url}/v1beta/models"
        headers = {"x-goog-api-key": self.api_key}
        async with self.http.get(full_url, headers=headers) as response:
            if response.status != 200:
                return []
            response_data = await response.json()
            return [
                f"- {m['name'].replace('models/', '')}"
                for m in response_data.get("models", [])
                if "generateContent" in m.get("supportedGenerationMethods", [])
            ]

    def get_type(self) -> str:
        return "gemini"
class XAi(Platform):
    """xAI (Grok) platform — OpenAI-compatible chat completions API."""

    max_tokens: int
    # Fix: was annotated int but holds a float (or None when unset).
    temperature: float

    def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None:
        super().__init__(config, http)
        self.temperature = float(self.config['temperature']) if self.config.get('temperature') is not None else None
        self.max_tokens = int(self.config['max_tokens']) if self.config.get('max_tokens') else None
        self.streaming = self.config.get('streaming', False)

    def is_streaming_enabled(self) -> bool:
        return self.streaming

    def _build_body(self, full_context: list, stream: bool) -> dict:
        """Build the chat-completions request body with optional tuning."""
        request_body = {
            "messages": full_context,
            "model": self.model,
            "stream": stream
        }
        if 'max_tokens' in self.config and self.max_tokens:
            request_body["max_tokens"] = self.max_tokens
        if 'temperature' in self.config and self.temperature:
            request_body["temperature"] = self.temperature
        return request_body

    async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
        """Run a blocking (non-stream) completion and wrap it in ChatCompletion."""
        full_context = list(await maubot_llmplus.platforms.get_context(plugin, self, evt))
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }
        request_body = self._build_body(full_context, stream=False)
        endpoint = f"{self.url}/v1/chat/completions"
        async with self.http.post(url=endpoint, data=json.dumps(request_body), headers=headers) as response:
            if response.status != 200:
                return ChatCompletion(
                    result=False,
                    message={},
                    finish_reason=f"Error: {await response.text()}",
                    model=None
                )
            response_json = await response.json()
            choice = response_json["choices"][0]
            return ChatCompletion(
                result=True,
                message=choice["message"],
                finish_reason=choice["finish_reason"],
                # Fix: tolerate a missing "model" field instead of raising
                # KeyError — consistent with the Deepseek handler.
                model=response_json.get("model", None)
            )

    async def create_chat_completion_stream(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent):
        """Yield incremental content chunks from a streaming completion."""
        full_context = list(await maubot_llmplus.platforms.get_context(plugin, self, evt))
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }
        request_body = self._build_body(full_context, stream=True)
        endpoint = f"{self.url}/v1/chat/completions"
        async with self.http.post(url=endpoint, data=json.dumps(request_body), headers=headers) as response:
            if response.status != 200:
                raise ValueError(f"Error: {await response.text()}")
            async for chunk in _read_openai_sse(response):
                yield chunk

    async def list_models(self) -> List[str]:
        """Fetch available model ids from /v1/models; empty list on error."""
        full_url = f"{self.url}/v1/models"
        headers = {'Content-Type': 'application/json', 'Authorization': f"Bearer {self.api_key}"}
        async with self.http.get(full_url, headers=headers) as response:
            if response.status != 200:
                return []
            response_data = await response.json()
            return [f"- {m['id']}" for m in response_data["data"]]

    def get_type(self) -> str:
        return "xai"
class Qwen(Platform):
    """Alibaba Qwen via the DashScope text-generation API."""

    max_tokens: int        # None when not configured
    temperature: float     # None when not configured
    top_p: float           # None when not configured
    enable_thinking: bool  # defaults to False when not configured

    def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None:
        super().__init__(config, http)
        self.max_tokens = int(self.config['max_tokens']) if self.config.get('max_tokens') else None
        self.temperature = float(self.config['temperature']) if self.config.get('temperature') is not None else None
        self.top_p = float(self.config['top_p']) if self.config.get('top_p') is not None else None
        # Fix: direct indexing raised KeyError when 'enable_thinking' was
        # absent from the config; every other option here uses .get().
        self.enable_thinking = self.config.get('enable_thinking', False)
        self.streaming = self.config.get('streaming', False)

    def is_streaming_enabled(self) -> bool:
        return self.streaming

    def _build_qwen_request(self, full_context: list) -> tuple:
        """Return (endpoint, headers, body) for a DashScope generation call."""
        parameters = {
            # "message" makes DashScope return OpenAI-style message objects.
            "result_format": "message"
        }
        if self.max_tokens:
            parameters["max_tokens"] = self.max_tokens
        if self.temperature is not None:
            parameters["temperature"] = self.temperature
        if self.top_p is not None:
            parameters["top_p"] = self.top_p
        if self.enable_thinking:
            parameters["enable_thinking"] = True
        request_body = {
            "model": self.model,
            "input": {
                "messages": full_context
            },
            "parameters": parameters
        }
        endpoint = f"{self.url}/api/v1/services/aigc/text-generation/generation"
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }
        return endpoint, headers, request_body

    async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
        """Run a blocking (non-stream) completion and wrap it in ChatCompletion."""
        full_context = list(await maubot_llmplus.platforms.get_context(plugin, self, evt))
        endpoint, headers, request_body = self._build_qwen_request(full_context)
        async with self.http.post(endpoint, headers=headers, data=json.dumps(request_body)) as response:
            if response.status != 200:
                return ChatCompletion(
                    result=False,
                    message={},
                    finish_reason=f"Error: {await response.text()}",
                    model=None
                )
            response_json = await response.json()
            choice = response_json["output"]["choices"][0]
            return ChatCompletion(
                result=True,
                message=choice["message"],
                finish_reason=choice.get("finish_reason", "stop"),
                model=response_json.get("model", self.model)
            )

    async def create_chat_completion_stream(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent):
        """Yield incremental content chunks from a DashScope SSE stream."""
        full_context = list(await maubot_llmplus.platforms.get_context(plugin, self, evt))
        endpoint, headers, request_body = self._build_qwen_request(full_context)
        # DashScope SSE streaming: enable the header and incremental_output
        # so each event carries only the new delta, not the whole message.
        headers["X-DashScope-SSE"] = "enable"
        request_body["parameters"]["incremental_output"] = True
        async with self.http.post(endpoint, headers=headers, data=json.dumps(request_body)) as response:
            if response.status != 200:
                raise ValueError(f"Error: {await response.text()}")
            while True:
                try:
                    # Stop after 60 s of silence instead of hanging.
                    line_bytes = await asyncio.wait_for(response.content.readline(), timeout=60.0)
                except asyncio.TimeoutError:
                    break
                if not line_bytes:
                    break
                line = line_bytes.decode("utf-8").strip()
                # DashScope may emit "data:" with or without a space.
                if not line.startswith("data:"):
                    continue
                data_str = line[5:].strip()
                try:
                    data = json.loads(data_str)
                    choices = data.get("output", {}).get("choices", [])
                    if choices:
                        content = choices[0].get("message", {}).get("content", "")
                        if content:
                            yield content
                except json.JSONDecodeError:
                    pass

    async def list_models(self) -> List[str]:
        """DashScope has no list endpoint here; return the known model names."""
        models = [
            "qwen-max", "qwen-max-latest",
            "qwen-plus", "qwen-plus-latest",
            "qwen-turbo", "qwen-turbo-latest",
            "qwen-long",
            "qwen3-235b-a22b", "qwen3-30b-a3b",
            "qwq-plus", "qwq-plus-latest",
        ]
        return [f"- {m}" for m in models]

    def get_type(self) -> str:
        return "qwen"