Files
maubot-llmplus/maubot_llmplus/thrid_platform.py
taylorxie caddfb61f1 add
2026-03-09 23:15:54 +08:00

445 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
from collections import deque
from typing import List
from aiohttp import ClientSession
from mautrix.types import MessageEvent
from mautrix.util.config import BaseProxyConfig
import maubot_llmplus.platforms
from maubot_llmplus.platforms import Platform, ChatCompletion
from maubot_llmplus.plugin import AbsExtraConfigPlugin
class Deepseek(Platform):
    """Platform backend that posts chat requests to ``{url}/chat/completions``."""

    def __init__(self, config: BaseProxyConfig, http: ClientSession):
        super().__init__(config, http)

    async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
        """Send the conversation context for ``evt`` and wrap the reply.

        Returns a ``ChatCompletion`` with ``result=True`` carrying the first
        choice on HTTP 200; otherwise ``result=False`` with the response body
        embedded in ``finish_reason``.
        """
        history = await maubot_llmplus.platforms.get_context(plugin, self, evt)
        request_headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }
        payload = json.dumps({
            "model": self.model,
            "messages": list(history),
        })
        target = f"{self.url}/chat/completions"

        async with self.http.post(target, headers=request_headers, data=payload) as response:
            if response.status == 200:
                body = await response.json()
                first_choice = body["choices"][0]
                return ChatCompletion(
                    result=True,
                    message=first_choice["message"],
                    finish_reason=first_choice["finish_reason"],
                    model=body.get("model", None)
                )
            # Any non-200 status is reported back to the caller as a failure.
            return ChatCompletion(
                result=False,
                message={},
                finish_reason=f"Error: {await response.text()}",
                model=None
            )

    async def list_models(self) -> List[str]:
        """Return the fixed list of model names as ``- name`` bullet lines."""
        known_models = ("deepseek-chat", "deepseek-reasoner")
        return ["- " + name for name in known_models]

    def get_type(self) -> str:
        """Return the identifier string of this platform."""
        return "deepseek"
class OpenAi(Platform):
    """Platform backend that posts chat requests to ``{url}/v1/chat/completions``."""

    # Upper bound on generated tokens, read from the ``max_tokens`` config key.
    max_tokens: int
    # Sampling temperature, read from the ``temperature`` config key.
    temperature: float

    def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None:
        super().__init__(config, http)
        self.max_tokens = self.config['max_tokens']
        self.temperature = self.config['temperature']

    async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
        """Send the conversation context for ``evt`` and wrap the reply.

        Returns a ``ChatCompletion`` with ``result=True`` carrying the first
        choice on HTTP 200; otherwise ``result=False`` with the response body
        embedded in ``finish_reason``.
        """
        context = await maubot_llmplus.platforms.get_context(plugin, self, evt)
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }
        data = {
            "model": self.model,
            "messages": list(context),
        }

        if 'max_tokens' in self.config and self.max_tokens:
            # gpt-5 models take the limit as ``max_completion_tokens``;
            # earlier models take it as ``max_tokens``.
            if 'gpt-5' in self.model:
                data["max_completion_tokens"] = self.max_tokens
            else:
                data["max_tokens"] = self.max_tokens

        # Compare against None rather than truthiness so that an explicitly
        # configured temperature of 0 is still sent.
        if 'temperature' in self.config and self.temperature is not None:
            data["temperature"] = self.temperature

        endpoint = f"{self.url}/v1/chat/completions"
        async with self.http.post(
                endpoint, headers=headers, data=json.dumps(data)
        ) as response:
            if response.status != 200:
                return ChatCompletion(
                    result=False,
                    message={},
                    finish_reason=f"Error: {await response.text()}",
                    model=None
                )
            response_json = await response.json()
            choice = response_json["choices"][0]
            return ChatCompletion(
                result=True,
                message=choice["message"],
                finish_reason=choice["finish_reason"],
                # The model name is a top-level field of the response, not a
                # field of the individual choice.
                model=response_json.get("model", None)
            )

    async def list_models(self) -> List[str]:
        """Fetch ``{url}/v1/models`` and return the ids as ``- id`` bullet lines.

        Returns an empty list when the endpoint does not answer with HTTP 200.
        """
        full_url = f"{self.url}/v1/models"
        headers = {'Authorization': f"Bearer {self.api_key}"}
        async with self.http.get(full_url, headers=headers) as response:
            if response.status != 200:
                return []
            response_data = await response.json()
            return [f"- {m['id']}" for m in response_data["data"]]

    def get_type(self) -> str:
        """Return the identifier string of this platform."""
        return "openai"
class Anthropic(Platform):
    """Platform backend that posts chat requests to ``{url}/v1/messages``."""

    # Upper bound on generated tokens, read from the ``max_tokens`` config key.
    max_tokens: int
    # Whether streaming replies are enabled (``streaming`` config key, default False).
    streaming: bool

    def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None:
        super().__init__(config, http)
        self.max_tokens = self.config['max_tokens']
        self.streaming = self.config.get('streaming', False)

    def is_streaming_enabled(self) -> bool:
        """Return the configured ``streaming`` flag."""
        return self.streaming

    def _build_request(self, full_chat_context: list) -> tuple:
        """Return ``(endpoint, headers, body)`` for a messages request."""
        endpoint = f"{self.url}/v1/messages"
        headers = {"x-api-key": self.api_key, "anthropic-version": "2023-06-01", "content-type": "application/json"}
        req_body = {"model": self.model, "max_tokens": self.max_tokens, "system": self.system_prompt,
                    "messages": full_chat_context}
        return endpoint, headers, req_body

    async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
        """Send the chat context for ``evt`` and wrap the reply.

        Returns a ``ChatCompletion`` with ``result=True`` on HTTP 200, whose
        message content is the text of all returned content blocks joined by
        blank lines; otherwise ``result=False`` with the response body
        embedded in ``finish_reason``.
        """
        system_context = deque()
        chat_context = await maubot_llmplus.platforms.get_chat_context(system_context, plugin, self, evt)
        full_chat_context = list(chat_context)

        endpoint, headers, req_body = self._build_request(full_chat_context)
        async with self.http.post(endpoint, headers=headers, data=json.dumps(req_body)) as response:
            if response.status != 200:
                return ChatCompletion(
                    result=False,
                    message={},
                    finish_reason=f"Error: {await response.text()}",
                    model=None
                )
            response_json = await response.json()
            # Skip content blocks that carry no "text" key instead of
            # failing with KeyError on them.
            text = "\n\n".join(
                block["text"] for block in response_json["content"] if "text" in block
            )
            return ChatCompletion(
                result=True,
                message=dict(role="assistant", content=text),
                finish_reason=response_json['stop_reason'],
                model=response_json['model']
            )

    async def create_chat_completion_stream(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent):
        """Async generator yielding text fragments of a streamed reply.

        Sends the same request as ``create_chat_completion`` with
        ``"stream": True`` and reads the response line by line. Only lines
        starting with ``data: `` are parsed; ``text_delta`` payloads of
        ``content_block_delta`` events are yielded.

        The generator ends on a ``message_stop`` event, on end of stream, or
        when no line arrives within 60 seconds.

        Raises:
            ValueError: if the server does not answer with HTTP 200.
        """
        system_context = deque()
        chat_context = await maubot_llmplus.platforms.get_chat_context(system_context, plugin, self, evt)
        full_chat_context = list(chat_context)

        endpoint, headers, req_body = self._build_request(full_chat_context)
        req_body["stream"] = True

        async with self.http.post(endpoint, headers=headers, data=json.dumps(req_body)) as response:
            if response.status != 200:
                raise ValueError(f"Error: {await response.text()}")

            while True:
                try:
                    line_bytes = await asyncio.wait_for(response.content.readline(), timeout=60.0)
                except asyncio.TimeoutError:
                    # Stop streaming when the server stays silent for too long.
                    break
                if not line_bytes:
                    # An empty read marks the end of the stream.
                    break

                line = line_bytes.decode("utf-8").strip()
                if not line.startswith("data: "):
                    continue

                try:
                    data = json.loads(line[6:])
                except json.JSONDecodeError:
                    # Ignore payloads that are not valid JSON.
                    continue

                event_type = data.get("type")
                if event_type == "message_stop":
                    break
                if event_type == "content_block_delta":
                    delta = data.get("delta", {})
                    if delta.get("type") == "text_delta":
                        yield delta.get("text", "")

    async def list_models(self) -> List[str]:
        """Fetch ``{url}/v1/models`` and return the ids as ``- id`` bullet lines.

        Returns an empty list when the endpoint does not answer with HTTP 200.
        """
        full_url = f"{self.url}/v1/models"
        headers = {
            'anthropic-version': "2023-06-01",
            'X-Api-Key': f"{self.api_key}"
        }
        async with self.http.get(full_url, headers=headers) as response:
            if response.status != 200:
                return []
            response_data = await response.json()
            return [f"- {m['id']}" for m in response_data['data']]

    def get_type(self) -> str:
        """Return the identifier string of this platform."""
        return "anthropic"
class Gemini(Platform):
    """Platform backend that posts to ``{url}/v1beta/models/{model}:generateContent``."""

    # Upper bound on generated tokens, read from the ``max_tokens`` config key.
    max_tokens: int
    # Sampling temperature, read from the ``temperature`` config key.
    temperature: float

    def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None:
        super().__init__(config, http)
        self.max_tokens = self.config['max_tokens']
        self.temperature = self.config['temperature']

    async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
        """Send the conversation context for ``evt`` and wrap the reply.

        Context messages are converted before sending: ``system`` messages
        are collected into ``system_instruction``, ``assistant`` messages are
        sent with role ``model`` and every other message with role ``user``.

        Returns a ``ChatCompletion`` with ``result=True`` and the concatenated
        text parts of the first candidate. Returns ``result=False`` when the
        status is not 200 or when the response contains no candidates.
        """
        context = await maubot_llmplus.platforms.get_context(plugin, self, evt)

        system_parts = []
        contents = []
        for msg in context:
            role = msg['role']
            content = msg['content']
            if role == 'system':
                system_parts.append({"text": content})
            elif role == 'assistant':
                contents.append({"role": "model", "parts": [{"text": content}]})
            else:
                contents.append({"role": "user", "parts": [{"text": content}]})

        request_body = {
            "contents": contents,
            "generationConfig": {}
        }
        if system_parts:
            request_body["system_instruction"] = {"parts": system_parts}
        if self.max_tokens:
            request_body["generationConfig"]["maxOutputTokens"] = self.max_tokens
        # Compare against None rather than truthiness so that an explicitly
        # configured temperature of 0 is still sent.
        if self.temperature is not None:
            request_body["generationConfig"]["temperature"] = self.temperature

        endpoint = f"{self.url}/v1beta/models/{self.model}:generateContent"
        headers = {
            "Content-Type": "application/json",
            "x-goog-api-key": self.api_key
        }

        async with self.http.post(endpoint, headers=headers, data=json.dumps(request_body)) as response:
            if response.status != 200:
                return ChatCompletion(
                    result=False,
                    message={},
                    finish_reason=f"Error: {await response.text()}",
                    model=None
                )
            response_json = await response.json()

            candidates = response_json.get("candidates") or []
            if not candidates:
                # A 200 response without candidates is reported as a failure
                # in the same shape as the HTTP error branch, instead of
                # raising KeyError/IndexError.
                block_reason = response_json.get("promptFeedback", {}).get("blockReason", "unknown")
                return ChatCompletion(
                    result=False,
                    message={},
                    finish_reason=f"Error: no candidates returned (blockReason: {block_reason})",
                    model=None
                )

            candidate = candidates[0]
            # A candidate may come without content or parts, and a part may
            # carry no text; treat all of these as empty text.
            parts = candidate.get("content", {}).get("parts", [])
            text = "".join(part.get("text", "") for part in parts)
            return ChatCompletion(
                result=True,
                message={"role": "assistant", "content": text},
                finish_reason=candidate.get("finishReason", "STOP"),
                model=response_json.get("modelVersion", self.model)
            )

    async def list_models(self) -> List[str]:
        """Fetch ``{url}/v1beta/models`` and return names as ``- name`` bullet lines.

        Only models listing ``generateContent`` among their supported
        generation methods are returned, with the ``models/`` prefix removed.
        Returns an empty list when the endpoint does not answer with HTTP 200.
        """
        full_url = f"{self.url}/v1beta/models"
        headers = {"x-goog-api-key": self.api_key}
        async with self.http.get(full_url, headers=headers) as response:
            if response.status != 200:
                return []
            response_data = await response.json()
            return [
                f"- {m['name'].replace('models/', '')}"
                for m in response_data.get("models", [])
                if "generateContent" in m.get("supportedGenerationMethods", [])
            ]

    def get_type(self) -> str:
        """Return the identifier string of this platform."""
        return "gemini"
class XAi(Platform):
    """Platform backend that posts chat requests to ``{url}/v1/chat/completions``."""

    # Upper bound on generated tokens, read from the ``max_tokens`` config key.
    max_tokens: int
    # Sampling temperature, read from the ``temperature`` config key.
    temperature: float

    def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None:
        super().__init__(config, http)
        self.temperature = self.config['temperature']
        self.max_tokens = self.config['max_tokens']

    async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
        """Send the conversation context for ``evt`` and wrap the reply.

        Returns a ``ChatCompletion`` with ``result=True`` carrying the first
        choice on HTTP 200; otherwise ``result=False`` with the response body
        embedded in ``finish_reason``.
        """
        context = await maubot_llmplus.platforms.get_context(plugin, self, evt)
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }
        request_body = {
            "messages": list(context),
            "model": self.model,
            "stream": False
        }
        if 'max_tokens' in self.config and self.max_tokens:
            request_body["max_tokens"] = self.max_tokens
        # Compare against None rather than truthiness so that an explicitly
        # configured temperature of 0 is still sent.
        if 'temperature' in self.config and self.temperature is not None:
            request_body["temperature"] = self.temperature

        endpoint = f"{self.url}/v1/chat/completions"
        async with self.http.post(url=endpoint, data=json.dumps(request_body), headers=headers) as response:
            if response.status != 200:
                return ChatCompletion(
                    result=False,
                    message={},
                    finish_reason=f"Error: {await response.text()}",
                    model=None
                )
            response_json = await response.json()
            choice = response_json["choices"][0]
            return ChatCompletion(
                result=True,
                message=choice["message"],
                finish_reason=choice["finish_reason"],
                model=response_json["model"]
            )

    async def list_models(self) -> List[str]:
        """Fetch ``{url}/v1/models`` and return the ids as ``- id`` bullet lines.

        Returns an empty list when the endpoint does not answer with HTTP 200.
        """
        full_url = f"{self.url}/v1/models"
        headers = {'Content-Type': 'application/json', 'Authorization': f"Bearer {self.api_key}"}
        async with self.http.get(full_url, headers=headers) as response:
            if response.status != 200:
                return []
            response_data = await response.json()
            return [f"- {m['id']}" for m in response_data["data"]]

    def get_type(self) -> str:
        """Return the identifier string of this platform."""
        return "xai"
class Qwen(Platform):
    """Platform backend that posts to ``{url}/api/v1/services/aigc/text-generation/generation``."""

    # Values below are read from the config keys of the same name.
    max_tokens: int
    temperature: float
    top_p: float
    enable_thinking: bool

    def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None:
        super().__init__(config, http)
        self.max_tokens = self.config['max_tokens']
        self.temperature = self.config['temperature']
        self.top_p = self.config['top_p']
        self.enable_thinking = self.config['enable_thinking']

    async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
        """Send the conversation context for ``evt`` and wrap the reply.

        Returns a ``ChatCompletion`` with ``result=True`` carrying the first
        output choice on HTTP 200; otherwise ``result=False`` with the
        response body embedded in ``finish_reason``.
        """
        history = await maubot_llmplus.platforms.get_context(plugin, self, evt)

        # Optional generation settings are only included when configured.
        generation_params = {"result_format": "message"}
        if self.max_tokens:
            generation_params["max_tokens"] = self.max_tokens
        for option in ("temperature", "top_p"):
            value = getattr(self, option)
            if value is not None:
                generation_params[option] = value
        if self.enable_thinking:
            generation_params["enable_thinking"] = True

        payload = json.dumps({
            "model": self.model,
            "input": {
                "messages": list(history)
            },
            "parameters": generation_params
        })
        target = f"{self.url}/api/v1/services/aigc/text-generation/generation"
        request_headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }

        async with self.http.post(target, headers=request_headers, data=payload) as response:
            if response.status == 200:
                body = await response.json()
                first_choice = body["output"]["choices"][0]
                return ChatCompletion(
                    result=True,
                    message=first_choice["message"],
                    finish_reason=first_choice.get("finish_reason", "stop"),
                    model=body.get("model", self.model)
                )
            # Any non-200 status is reported back to the caller as a failure.
            return ChatCompletion(
                result=False,
                message={},
                finish_reason=f"Error: {await response.text()}",
                model=None
            )

    async def list_models(self) -> List[str]:
        """Return the fixed list of model names as ``- name`` bullet lines."""
        known_models = (
            "qwen-max", "qwen-max-latest",
            "qwen-plus", "qwen-plus-latest",
            "qwen-turbo", "qwen-turbo-latest",
            "qwen-long",
            "qwen3-235b-a22b", "qwen3-30b-a3b",
            "qwq-plus", "qwq-plus-latest",
        )
        return ["- " + name for name in known_models]

    def get_type(self) -> str:
        """Return the identifier string of this platform."""
        return "qwen"