"""HTTP adapters that forward chat requests to third-party LLM providers.

Each adapter subclasses ``Platform`` and converts the conversation context
into the provider's request format, then wraps the reply in a
``ChatCompletion``. The attributes ``self.url``, ``self.api_key``,
``self.model``, ``self.config``, ``self.http`` (and ``self.system_prompt`` for
Anthropic) are presumably set by ``Platform.__init__`` -- confirm against
``maubot_llmplus.platforms``.
"""
import asyncio
import json
from collections import deque
from typing import AsyncIterator, List

from aiohttp import ClientResponse, ClientSession
from mautrix.types import MessageEvent
from mautrix.util.config import BaseProxyConfig

import maubot_llmplus.platforms
from maubot_llmplus.platforms import Platform, ChatCompletion
from maubot_llmplus.plugin import AbsExtraConfigPlugin


async def _error_completion(response: ClientResponse) -> ChatCompletion:
    """Build the failed ``ChatCompletion`` returned for a non-200 response.

    The raw response body is embedded in ``finish_reason`` so the caller can
    show the provider's error message.
    """
    return ChatCompletion(
        result=False,
        message={},
        finish_reason=f"Error: {await response.text()}",
        model=None,
    )


class Deepseek(Platform):
    """Adapter that posts to ``{url}/chat/completions``."""

    def __init__(self, config: BaseProxyConfig, http: ClientSession):
        super().__init__(config, http)

    async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
        """Send the conversation context and return the first choice.

        Returns a ``ChatCompletion`` with ``result=False`` when the HTTP
        status is not 200.
        """
        context = await maubot_llmplus.platforms.get_context(plugin, self, evt)

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
        }
        data = {
            "model": self.model,
            "messages": list(context),
        }
        endpoint = f"{self.url}/chat/completions"

        async with self.http.post(endpoint, headers=headers, data=json.dumps(data)) as response:
            if response.status != 200:
                return await _error_completion(response)
            response_json = await response.json()
            choice = response_json["choices"][0]
            return ChatCompletion(
                result=True,
                message=choice["message"],
                finish_reason=choice["finish_reason"],
                model=response_json.get("model", None),
            )

    async def list_models(self) -> List[str]:
        """Return the hard-coded model names as markdown bullet lines."""
        models = ["deepseek-chat", "deepseek-reasoner"]
        return [f"- {m}" for m in models]

    def get_type(self) -> str:
        """Return the platform identifier ``"deepseek"``."""
        return "deepseek"


class OpenAi(Platform):
    """Adapter that posts to ``{url}/v1/chat/completions``."""

    max_tokens: int
    temperature: float

    def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None:
        super().__init__(config, http)
        self.max_tokens = self.config['max_tokens']
        self.temperature = self.config['temperature']

    async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
        """Send the conversation context and return the first choice.

        Returns a ``ChatCompletion`` with ``result=False`` when the HTTP
        status is not 200.
        """
        context = await maubot_llmplus.platforms.get_context(plugin, self, evt)

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
        }
        data = {
            "model": self.model,
            "messages": list(context),
        }

        if 'max_tokens' in self.config and self.max_tokens:
            if 'gpt-5' in self.model:
                # GPT-5 models take the limit as max_completion_tokens.
                data["max_completion_tokens"] = self.max_tokens
            else:
                # GPT-4 and earlier models take the limit as max_tokens.
                data["max_tokens"] = self.max_tokens

        # Compare against None so an explicit temperature of 0 is still sent.
        if 'temperature' in self.config and self.temperature is not None:
            data["temperature"] = self.temperature

        endpoint = f"{self.url}/v1/chat/completions"
        async with self.http.post(endpoint, headers=headers, data=json.dumps(data)) as response:
            if response.status != 200:
                return await _error_completion(response)
            response_json = await response.json()
            choice = response_json["choices"][0]
            return ChatCompletion(
                result=True,
                message=choice["message"],
                finish_reason=choice["finish_reason"],
                # The model name is a top-level response field, not part of
                # the individual choice.
                model=response_json.get("model", None),
            )

    async def list_models(self) -> List[str]:
        """Fetch ``{url}/v1/models`` and return the ids as bullet lines.

        Returns an empty list when the HTTP status is not 200.
        """
        full_url = f"{self.url}/v1/models"
        headers = {'Authorization': f"Bearer {self.api_key}"}
        async with self.http.get(full_url, headers=headers) as response:
            if response.status != 200:
                return []
            response_data = await response.json()
            return [f"- {m['id']}" for m in response_data["data"]]

    def get_type(self) -> str:
        """Return the platform identifier ``"openai"``."""
        return "openai"


class Anthropic(Platform):
    """Adapter that posts to ``{url}/v1/messages``, with optional streaming."""

    max_tokens: int
    streaming: bool

    def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None:
        super().__init__(config, http)
        self.max_tokens = self.config['max_tokens']
        self.streaming = self.config.get('streaming', False)

    def is_streaming_enabled(self) -> bool:
        """Return the ``streaming`` flag read from the config."""
        return self.streaming

    def _build_request(self, full_chat_context: list) -> tuple:
        """Return ``(endpoint, headers, body)`` for a messages request."""
        endpoint = f"{self.url}/v1/messages"
        headers = {
            "x-api-key": self.api_key,
            "anthropic-version": "2023-06-01",
            "content-type": "application/json",
        }
        req_body = {
            "model": self.model,
            "max_tokens": self.max_tokens,
            "system": self.system_prompt,
            "messages": full_chat_context,
        }
        return endpoint, headers, req_body

    async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
        """Send the chat context and return the joined text of the reply.

        Returns a ``ChatCompletion`` with ``result=False`` when the HTTP
        status is not 200.
        """
        system_context = deque()
        chat_context = await maubot_llmplus.platforms.get_chat_context(system_context, plugin, self, evt)
        endpoint, headers, req_body = self._build_request(list(chat_context))

        async with self.http.post(endpoint, headers=headers, data=json.dumps(req_body)) as response:
            if response.status != 200:
                return await _error_completion(response)
            response_json = await response.json()
            # Skip content blocks that carry no "text" key instead of
            # raising KeyError on them.
            text = "\n\n".join(
                block["text"] for block in response_json["content"] if "text" in block
            )
            return ChatCompletion(
                result=True,
                message=dict(role="assistant", content=text),
                finish_reason=response_json['stop_reason'],
                model=response_json['model'],
            )

    async def create_chat_completion_stream(self, plugin: AbsExtraConfigPlugin,
                                            evt: MessageEvent) -> AsyncIterator[str]:
        """Yield text fragments from a streamed messages request.

        Reads the response line by line and yields the ``text`` of every
        ``content_block_delta`` event whose delta type is ``text_delta``.
        Iteration ends on a ``message_stop`` event, on end of stream, or when
        no line arrives within 60 seconds.

        Raises:
            ValueError: if the HTTP status is not 200.
        """
        system_context = deque()
        chat_context = await maubot_llmplus.platforms.get_chat_context(system_context, plugin, self, evt)
        endpoint, headers, req_body = self._build_request(list(chat_context))
        req_body["stream"] = True

        async with self.http.post(endpoint, headers=headers, data=json.dumps(req_body)) as response:
            if response.status != 200:
                raise ValueError(f"Error: {await response.text()}")

            while True:
                try:
                    line_bytes = await asyncio.wait_for(response.content.readline(), timeout=60.0)
                except asyncio.TimeoutError:
                    # Best effort: a stalled stream ends the iteration.
                    break
                if not line_bytes:
                    break

                line = line_bytes.decode("utf-8").strip()
                if not line.startswith("data: "):
                    continue

                try:
                    data = json.loads(line[6:])
                except json.JSONDecodeError:
                    # Ignore payloads that are not valid JSON.
                    continue

                event_type = data.get("type")
                if event_type == "message_stop":
                    break
                if event_type == "content_block_delta":
                    delta = data.get("delta", {})
                    if delta.get("type") == "text_delta":
                        yield delta.get("text", "")

    async def list_models(self) -> List[str]:
        """Fetch ``{url}/v1/models`` and return the ids as bullet lines.

        Returns an empty list when the HTTP status is not 200.
        """
        full_url = f"{self.url}/v1/models"
        headers = {
            'anthropic-version': "2023-06-01",
            'X-Api-Key': f"{self.api_key}",
        }
        async with self.http.get(full_url, headers=headers) as response:
            if response.status != 200:
                return []
            response_data = await response.json()
            return [f"- {m['id']}" for m in response_data['data']]

    def get_type(self) -> str:
        """Return the platform identifier ``"anthropic"``."""
        return "anthropic"


class Gemini(Platform):
    """Adapter that posts to ``{url}/v1beta/models/{model}:generateContent``."""

    max_tokens: int
    temperature: float

    def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None:
        super().__init__(config, http)
        self.max_tokens = self.config['max_tokens']
        self.temperature = self.config['temperature']

    async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
        """Send the conversation context and return the first candidate.

        ``system`` messages are collected into ``system_instruction``,
        ``assistant`` messages are sent with role ``model`` and every other
        role is sent as ``user``.

        Returns a ``ChatCompletion`` with ``result=False`` when the HTTP
        status is not 200 or when the response contains no candidates.
        """
        context = await maubot_llmplus.platforms.get_context(plugin, self, evt)

        system_parts = []
        contents = []
        for msg in context:
            role = msg['role']
            content = msg['content']
            if role == 'system':
                system_parts.append({"text": content})
            elif role == 'assistant':
                contents.append({"role": "model", "parts": [{"text": content}]})
            else:
                contents.append({"role": "user", "parts": [{"text": content}]})

        request_body = {
            "contents": contents,
            "generationConfig": {},
        }
        if system_parts:
            request_body["system_instruction"] = {"parts": system_parts}
        if self.max_tokens:
            request_body["generationConfig"]["maxOutputTokens"] = self.max_tokens
        # Compare against None so an explicit temperature of 0 is still sent.
        if self.temperature is not None:
            request_body["generationConfig"]["temperature"] = self.temperature

        endpoint = f"{self.url}/v1beta/models/{self.model}:generateContent"
        headers = {
            "Content-Type": "application/json",
            "x-goog-api-key": self.api_key,
        }

        async with self.http.post(endpoint, headers=headers, data=json.dumps(request_body)) as response:
            if response.status != 200:
                return await _error_completion(response)
            response_json = await response.json()

            candidates = response_json.get("candidates") or []
            if not candidates:
                # A 200 response without candidates would otherwise raise on
                # candidates[0]; report it as a failure with the feedback.
                feedback = json.dumps(response_json.get("promptFeedback", {}))
                return ChatCompletion(
                    result=False,
                    message={},
                    finish_reason=f"Error: no candidates returned: {feedback}",
                    model=None,
                )

            candidate = candidates[0]
            # A candidate may come back without content parts; treat missing
            # pieces as empty text rather than raising KeyError.
            parts = candidate.get("content", {}).get("parts", [])
            text = "".join(part.get("text", "") for part in parts)
            return ChatCompletion(
                result=True,
                message={"role": "assistant", "content": text},
                finish_reason=candidate.get("finishReason", "STOP"),
                model=response_json.get("modelVersion", self.model),
            )

    async def list_models(self) -> List[str]:
        """Fetch ``{url}/v1beta/models`` and return the names as bullet lines.

        Only models listing ``generateContent`` among their supported
        generation methods are kept, and the ``models/`` prefix is removed.
        Returns an empty list when the HTTP status is not 200.
        """
        full_url = f"{self.url}/v1beta/models"
        headers = {"x-goog-api-key": self.api_key}
        async with self.http.get(full_url, headers=headers) as response:
            if response.status != 200:
                return []
            response_data = await response.json()
            return [
                f"- {m['name'].replace('models/', '')}"
                for m in response_data.get("models", [])
                if "generateContent" in m.get("supportedGenerationMethods", [])
            ]

    def get_type(self) -> str:
        """Return the platform identifier ``"gemini"``."""
        return "gemini"


class XAi(Platform):
    """Adapter that posts to ``{url}/v1/chat/completions``."""

    max_tokens: int
    temperature: float

    def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None:
        super().__init__(config, http)
        self.temperature = self.config['temperature']
        self.max_tokens = self.config['max_tokens']

    async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
        """Send the conversation context and return the first choice.

        Returns a ``ChatCompletion`` with ``result=False`` when the HTTP
        status is not 200.
        """
        context = await maubot_llmplus.platforms.get_context(plugin, self, evt)

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
        }
        request_body = {
            "messages": list(context),
            "model": self.model,
            "stream": False,
        }

        if 'max_tokens' in self.config and self.max_tokens:
            request_body["max_tokens"] = self.max_tokens

        # Compare against None so an explicit temperature of 0 is still sent.
        if 'temperature' in self.config and self.temperature is not None:
            request_body["temperature"] = self.temperature

        endpoint = f"{self.url}/v1/chat/completions"
        async with self.http.post(url=endpoint, data=json.dumps(request_body), headers=headers) as response:
            if response.status != 200:
                return await _error_completion(response)
            response_json = await response.json()
            choice = response_json["choices"][0]
            return ChatCompletion(
                result=True,
                message=choice["message"],
                finish_reason=choice["finish_reason"],
                model=response_json["model"],
            )

    async def list_models(self) -> List[str]:
        """Fetch ``{url}/v1/models`` and return the ids as bullet lines.

        Returns an empty list when the HTTP status is not 200.
        """
        full_url = f"{self.url}/v1/models"
        headers = {'Content-Type': 'application/json', 'Authorization': f"Bearer {self.api_key}"}
        async with self.http.get(full_url, headers=headers) as response:
            if response.status != 200:
                return []
            response_data = await response.json()
            return [f"- {m['id']}" for m in response_data["data"]]

    def get_type(self) -> str:
        """Return the platform identifier ``"xai"``."""
        return "xai"


class Qwen(Platform):
    """Adapter that posts to the ``text-generation/generation`` service."""

    max_tokens: int
    temperature: float
    top_p: float
    enable_thinking: bool

    def __init__(self, config: BaseProxyConfig, http: ClientSession) -> None:
        super().__init__(config, http)
        self.max_tokens = self.config['max_tokens']
        self.temperature = self.config['temperature']
        self.top_p = self.config['top_p']
        self.enable_thinking = self.config['enable_thinking']

    async def create_chat_completion(self, plugin: AbsExtraConfigPlugin, evt: MessageEvent) -> ChatCompletion:
        """Send the conversation context and return the first choice.

        Returns a ``ChatCompletion`` with ``result=False`` when the HTTP
        status is not 200.
        """
        context = await maubot_llmplus.platforms.get_context(plugin, self, evt)

        parameters = {
            "result_format": "message",
        }
        if self.max_tokens:
            parameters["max_tokens"] = self.max_tokens
        if self.temperature is not None:
            parameters["temperature"] = self.temperature
        if self.top_p is not None:
            parameters["top_p"] = self.top_p
        if self.enable_thinking:
            parameters["enable_thinking"] = True

        request_body = {
            "model": self.model,
            "input": {
                "messages": list(context),
            },
            "parameters": parameters,
        }

        endpoint = f"{self.url}/api/v1/services/aigc/text-generation/generation"
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
        }

        async with self.http.post(endpoint, headers=headers, data=json.dumps(request_body)) as response:
            if response.status != 200:
                return await _error_completion(response)
            response_json = await response.json()
            choice = response_json["output"]["choices"][0]
            return ChatCompletion(
                result=True,
                message=choice["message"],
                finish_reason=choice.get("finish_reason", "stop"),
                model=response_json.get("model", self.model),
            )

    async def list_models(self) -> List[str]:
        """Return the hard-coded model names as markdown bullet lines."""
        models = [
            "qwen-max",
            "qwen-max-latest",
            "qwen-plus",
            "qwen-plus-latest",
            "qwen-turbo",
            "qwen-turbo-latest",
            "qwen-long",
            "qwen3-235b-a22b",
            "qwen3-30b-a3b",
            "qwq-plus",
            "qwq-plus-latest",
        ]
        return [f"- {m}" for m in models]

    def get_type(self) -> str:
        """Return the platform identifier ``"qwen"``."""
        return "qwen"