commit df8078b3807f17b9703d63c2fec4f251def80873 Author: Kris Date: Sun Dec 8 20:02:53 2024 +0100 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d9d5a68 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +# .gitignore +.env +users.db +.DS_Store \ No newline at end of file diff --git a/LICENCE b/LICENCE new file mode 100644 index 0000000..c6f20d8 --- /dev/null +++ b/LICENCE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2020 naiba + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100755 index 0000000..3be9e3b --- /dev/null +++ b/README.md @@ -0,0 +1,123 @@ +**Nezha Telegram Bot - NextGen V1** 是一个基于 Nezha 监控 API,用于监控服务器状态的 Telegram 机器人。通过简单的命令,您可以实时查看服务器的运行状态、资源使用情况、执行计划任务以及监控服务可用性。 + +## 📋 特性 + +- **账号绑定**:安全绑定您的 Nezha 账户,确保数据隐私。 +- **服务器概览**:查看所有服务器的在线状态、内存使用、交换空间、磁盘使用、网络流量等统计信息。 +- **单台服务器状态**:获取单个服务器的详细状态信息,包括负载、CPU 使用率、内存、磁盘、网络流量等。 +- **计划任务管理**:查看并执行预设的计划任务,自动化管理服务器。 +- **服务可用性监测**:监控服务的可用性和平均延迟,确保服务稳定运行。 +- **数据刷新**:实时刷新数据,确保您获取到最新的服务器状态。 + +## 🚀 快速开始 + +### 📦 前提条件 + +在开始之前,请确保您已经具备以下条件: + +- Python 3.7 或更高版本 +- Telegram 账号 +- 已安装哪吒监控 Dashboard 并完成配置 +- Telegram 机器人 Token(通过 [BotFather](https://t.me/BotFather) 获取) + +### 🔧 安装步骤 + +1. **克隆仓库** + + ```bash + git clone https://github.com/yourusername/nezha-telegram-bot.git + cd nezha-telegram-bot + ``` + +2. **创建虚拟环境** + + 推荐使用 `venv` 创建虚拟环境: + + ```bash + python3 -m venv venv + source venv/bin/activate # 对于 Windows 用户使用 venv\Scripts\activate + ``` + +3. **安装依赖** + + ```bash + pip install -r requirements.txt + ``` + +4. **配置环境变量** + + 为了安全起见,建议使用环境变量存储敏感信息。创建一个 `.env` 文件并添加以下内容: + + ```env + TELEGRAM_TOKEN=your_telegram_bot_token + ``` + +5. **初始化数据库** + + 数据库会在首次运行时自动创建。 + +6. **运行机器人** + + ```bash + python bot.py + ``` + + 您应该会看到类似以下的日志输出,表示机器人已成功启动: + + ``` + 2024-12-08 17:50:39,139 - telegram.ext.Application - INFO - Application started + ``` + +## 🛠️ 使用指南 + +### 📌 绑定账号 + +为了确保您的数据安全,绑定账号仅支持私聊中进行操作。如果在群组中尝试绑定,机器人会提示您需在私聊中执行。 + +1. **私聊中发送 `/bind` 命令**。 + +2. **按照提示依次输入**: + - 用户名 + - 密码 + - Dashboard 地址(例如:https://dashboard.example.com) + +3. **绑定成功后**,您可以开始使用机器人的各项功能。 + +### 📜 可用命令 + +- `/start` - 启动机器人并显示欢迎信息。 +- `/help` - 获取可用命令列表和简要说明。 +- `/bind` - 绑定您的 Nezha 账户。 +- `/unbind` - 解绑您的 Nezha 账户。 +- `/overview` - 查看所有服务器的状态总览。 +- `/server` - 查看单台服务器的详细状态。 +- `/cron` - 执行计划任务。 +- `/services` - 查看服务状态总览。 + +### 📊 服务器概览 + +使用 `/overview` 命令,可以查看所有绑定服务器的统计信息,包括在线状态、内存使用、交换空间、磁盘使用、网络流量等。您还可以通过点击“刷新”按钮实时更新数据。 + +### 🖥️ 单台服务器状态 + +使用 `/server` 命令,输入服务器名称进行搜索,并选择相应的服务器查看详细状态信息。包括负载、CPU 使用率、内存、磁盘、网络流量等数据。 + +### ⏰ 计划任务管理 + +使用 `/cron` 命令,可以查看并执行预设的计划任务。点击相应任务名称进行确认执行或取消操作。 + +### 🌐 服务可用性监测 + +使用 `/services` 命令,可以查看服务的可用性信息,包括可用率、当前状态、平均延迟和剩余流量等。 + + +## 🙏 致谢 + +- [python-telegram-bot](https://github.com/python-telegram-bot/python-telegram-bot) - 用于 Telegram 机器人的开发。 +- [aiohttp](https://github.com/aio-libs/aiohttp) - 异步 HTTP 客户端/服务器框架。 +- [aiosqlite](https://github.com/jreese/aiosqlite) - 异步 SQLite 连接库。 +- [哪吒监控](https://nezha.wiki) - 哪吒服务器监控。 +- [ChatGPT](https://chat.openai.com) - 本项目采用“面向 ChatGPT 编程”的理念,完成了包括本文档在内的 90% 的代码。 +--- + +**免责声明**:使用本机器人时,请确保遵守相关法律法规。开发者不对因使用本机器人导致的任何损失承担责任。 \ No newline at end of file diff --git a/bot.py b/bot.py new file mode 100755 index 0000000..5473a4b --- /dev/null +++ b/bot.py @@ -0,0 +1,672 @@ +import asyncio +import logging +import math +import time +from datetime import datetime, timezone +from dateutil import parser +from dotenv import load_dotenv +import os + +from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup +from telegram.ext import ( + ApplicationBuilder, CommandHandler, MessageHandler, CallbackQueryHandler, + ConversationHandler, ContextTypes, filters +) + +from nezha_api import NezhaAPI +from database import Database + +# 配置日志 +logging.basicConfig( + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + level=logging.INFO +) +logger = logging.getLogger(__name__) + +load_dotenv() + +# 定义常量和配置 +TELEGRAM_TOKEN = os.getenv('TELEGRAM_TOKEN') +DATABASE_PATH = 'users.db' + +# 定义阶段 +BIND_USERNAME, BIND_PASSWORD, BIND_DASHBOARD = range(3) +SEARCH_SERVER = range(1) + +# 初始化数据库 +db = Database(DATABASE_PATH) + +# 添加 format_bytes 函数 +def format_bytes(size_in_bytes): + if size_in_bytes == 0: + return "0B" + units = ['B', 'KB', 'MB', 'GB', 'TB'] + power = int(math.floor(math.log(size_in_bytes, 1024))) + power = min(power, len(units) - 1) # 防止超过单位列表的范围 + size = size_in_bytes / (1024 ** power) + formatted_size = f"{size:.2f}{units[power]}" + return formatted_size + +def is_online(server): + """根据last_active判断服务器是否在线,如果最后活跃时间在10秒内则为在线。""" + now_utc = datetime.now(timezone.utc) + last_active_str = server.get('last_active') + if not last_active_str: + return False + try: + last_active_dt = parser.isoparse(last_active_str) + except ValueError: + return False + last_active_utc = last_active_dt.astimezone(timezone.utc) + diff = now_utc - last_active_utc + is_on = diff.total_seconds() < 10 + logger.info("Checking online: diff=%s now=%s last=%s is_online=%s", + diff, now_utc, last_active_utc, is_on) + return is_on + +# 添加 IP 地址掩码函数 +def mask_ipv4(ipv4_address): + if ipv4_address == '未知' or ipv4_address == '❌': + return ipv4_address + parts = ipv4_address.split('.') + if len(parts) != 4: + return ipv4_address # 非法的 IPv4 地址,直接返回 + # 将后两部分替换为 'xx' + masked_ip = f"{parts[0]}.{parts[1]}.xx.xx" + return masked_ip + +def mask_ipv6(ipv6_address): + if ipv6_address == '未知' or ipv6_address == '❌': + return ipv6_address + parts = ipv6_address.split(':') + if len(parts) < 3: + return ipv6_address # 非法的 IPv6 地址,直接返回 + # 只显示前两个部分,后面用 'xx' 替代 + masked_ip = ':'.join(parts[:2]) + ':xx:xx:xx:xx' + return masked_ip + +async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): + await update.message.reply_text( + "欢迎使用 Nezha 监控机器人!\n请使用 /bind 命令绑定您的账号。\n请注意,使用公共机器人有安全风险,用户名密码将会被记录用以鉴权,解绑删除。" + ) + +async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE): + await update.message.reply_text(""" +可用命令: +/bind - 绑定账号 +/unbind - 解绑账号 +/overview - 查看服务器状态总览 +/server - 查看单台服务器状态 +/cron - 执行计划任务 +/services - 查看服务状态总览 +/help - 获取帮助 + """) + +async def bind_start(update: Update, context: ContextTypes.DEFAULT_TYPE): + # 检查当前对话类型 + if update.effective_chat.type != "private": + await update.message.reply_text("请与机器人私聊进行绑定操作,\n避免机密信息泄露。") + return ConversationHandler.END + + user = await db.get_user(update.effective_user.id) + if user: + await update.message.reply_text("您已绑定账号,如需重新绑定,请先使用 /unbind 命令解绑。") + return ConversationHandler.END + else: + await update.message.reply_text("请输入您的用户名:") + return BIND_USERNAME + +async def bind_username(update: Update, context: ContextTypes.DEFAULT_TYPE): + context.user_data['username'] = update.message.text.strip() + await update.message.reply_text("请输入您的密码:") + return BIND_PASSWORD + +async def bind_password(update: Update, context: ContextTypes.DEFAULT_TYPE): + context.user_data['password'] = update.message.text.strip() + await update.message.reply_text("请输入您的 Dashboard 地址(例如:https://nezha.example.com):") + return BIND_DASHBOARD + +async def bind_dashboard(update: Update, context: ContextTypes.DEFAULT_TYPE): + dashboard_url = update.message.text.strip() + context.user_data['dashboard_url'] = dashboard_url + telegram_id = update.effective_user.id + username = context.user_data['username'] + password = context.user_data['password'] + + # 测试连接 + try: + api = NezhaAPI(dashboard_url, username, password) + await api.authenticate() + await api.close() + except Exception as e: + await update.message.reply_text(f"绑定失败:{e}\n请检查您的信息并重新绑定。") + return ConversationHandler.END + + # 保存到数据库 + await db.add_user(telegram_id, username, password, dashboard_url) + await update.message.reply_text("绑定成功!您现在可以使用机器人的功能了。") + return ConversationHandler.END + +async def unbind(update: Update, context: ContextTypes.DEFAULT_TYPE): + user = await db.get_user(update.effective_user.id) + if user: + await db.delete_user(update.effective_user.id) + await update.message.reply_text("已解绑。") + else: + await update.message.reply_text("您尚未绑定账号。") + +async def overview(update: Update, context: ContextTypes.DEFAULT_TYPE): + user = await db.get_user(update.effective_user.id) + if not user: + await update.message.reply_text("请先使用 /bind 命令绑定您的账号。") + return + + api = NezhaAPI(user['dashboard_url'], user['username'], user['password']) + try: + data = await api.get_overview() + except Exception as e: + await update.message.reply_text(f"获取数据失败:{e}") + await api.close() + return + # print("返回的服务数据:", data) + + if data and data.get('success'): + servers = data['data'] + online_servers = sum(1 for s in servers if is_online(s)) + total_servers = len(servers) + total_mem = sum(s['host'].get('mem_total', 0) for s in servers if s.get('host')) + used_mem = sum(s['state'].get('mem_used', 0) for s in servers if s.get('state')) + total_swap = sum(s['host'].get('swap_total', 0) for s in servers if s.get('host')) + used_swap = sum(s['state'].get('swap_used', 0) for s in servers if s.get('state')) + total_disk = sum(s['host'].get('disk_total', 0) for s in servers if s.get('host')) + used_disk = sum(s['state'].get('disk_used', 0) for s in servers if s.get('state')) + net_in_speed = sum(s['state'].get('net_in_speed', 0) for s in servers if s.get('state')) + net_out_speed = sum(s['state'].get('net_out_speed', 0) for s in servers if s.get('state')) + net_in_transfer = sum(s['state'].get('net_in_transfer', 0) for s in servers if s.get('state')) + net_out_transfer = sum(s['state'].get('net_out_transfer', 0) for s in servers if s.get('state')) + transfer_ratio = (net_out_transfer / net_in_transfer * 100) if net_in_transfer else 0 + + response = f"""📊 **统计信息** +=========================== +**服务器数量**: {total_servers} +**在线服务器**: {online_servers} +**内存**: {used_mem / total_mem * 100 if total_mem else 0:.1f}% [{format_bytes(used_mem)}/{format_bytes(total_mem)}] +**交换**: {used_swap / total_swap * 100 if total_swap else 0:.1f}% [{format_bytes(used_swap)}/{format_bytes(total_swap)}] +**磁盘**: {used_disk / total_disk * 100 if total_disk else 0:.1f}% [{format_bytes(used_disk)}/{format_bytes(total_disk)}] +**下行速度**: ↓{format_bytes(net_in_speed)}/s +**上行速度**: ↑{format_bytes(net_out_speed)}/s +**下行流量**: ↓{format_bytes(net_in_transfer)} +**上行流量**: ↑{format_bytes(net_out_transfer)} +**流量对等性**: {transfer_ratio:.1f}% + +**更新于**: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')} UTC +""" + keyboard = [[InlineKeyboardButton("刷新", callback_data="refresh_overview")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await update.message.reply_text(response, parse_mode='Markdown', reply_markup=reply_markup) + else: + await update.message.reply_text("获取服务器信息失败。") + await api.close() + +async def server_status(update: Update, context: ContextTypes.DEFAULT_TYPE): + user = await db.get_user(update.effective_user.id) + if not user: + await update.message.reply_text("请先使用 /bind 命令绑定您的账号。") + return + + await update.message.reply_text("请输入要查询的服务器名称(支持模糊搜索):") + return SEARCH_SERVER + +async def search_server(update: Update, context: ContextTypes.DEFAULT_TYPE): + query_text = update.message.text.strip() + user = await db.get_user(update.effective_user.id) + api = NezhaAPI(user['dashboard_url'], user['username'], user['password']) + try: + results = await api.search_servers(query_text) + except Exception as e: + await update.message.reply_text(f"搜索失败:{e}") + await api.close() + return ConversationHandler.END + + if not results: + await update.message.reply_text("未找到匹配的服务器。") + await api.close() + return ConversationHandler.END + + keyboard = [ + [InlineKeyboardButton(s['name'], callback_data=f"server_detail_{s['id']}")] + for s in results + ] + reply_markup = InlineKeyboardMarkup(keyboard) + await update.message.reply_text("请选择服务器:", reply_markup=reply_markup) + await api.close() + return ConversationHandler.END + +async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE): + query = update.callback_query + data = query.data + + user = await db.get_user(query.from_user.id) + if not user: + await query.answer("请先使用 /bind 命令绑定您的账号。", show_alert=True) + return + + # 实现刷新频率限制 + last_refresh_time = context.user_data.get('last_refresh_time', 0) + current_time = time.time() + if data.startswith('refresh_'): + if current_time - last_refresh_time < 1: + await query.answer("刷新太频繁,请稍后再试。", show_alert=True) + return + else: + context.user_data['last_refresh_time'] = current_time + + await query.answer() + + api = NezhaAPI(user['dashboard_url'], user['username'], user['password']) + + if data.startswith('server_detail_'): + server_id = int(data.split('_')[-1]) + try: + server = await api.get_server_detail(server_id) + except Exception as e: + await query.edit_message_text(f"获取服务器详情失败:{e}") + await api.close() + return + + await api.close() + + if not server: + await query.edit_message_text("未找到该服务器。") + return + + name = server.get('name', '未知') + online_status = is_online(server) + status = "❇️在线" if online_status else "❌离线" + ipv4 = server.get('geoip', {}).get('ip', {}).get('ipv4_addr', '未知') + ipv6 = server.get('geoip', {}).get('ip', {}).get('ipv6_addr', '❌') + + # 对 IP 地址进行掩码处理 + ipv4 = mask_ipv4(ipv4) + ipv6 = mask_ipv6(ipv6) + + platform = server.get('host', {}).get('platform', '未知') + cpu_info = ', '.join(server.get('host', {}).get('cpu', [])) if server.get('host') else '未知' + uptime_seconds = server.get('state', {}).get('uptime', 0) + uptime_days = uptime_seconds // 86400 + uptime_hours = (uptime_seconds % 86400) // 3600 + load_1 = server.get('state', {}).get('load_1', 0) + load_5 = server.get('state', {}).get('load_5', 0) + load_15 = server.get('state', {}).get('load_15', 0) + cpu_usage = server.get('state', {}).get('cpu', 0) + mem_used = server.get('state', {}).get('mem_used', 0) + mem_total = server.get('host', {}).get('mem_total', 1) + swap_used = server.get('state', {}).get('swap_used', 0) + swap_total = server.get('host', {}).get('swap_total', 1) + disk_used = server.get('state', {}).get('disk_used', 0) + disk_total = server.get('host', {}).get('disk_total', 1) + net_in_transfer = server.get('state', {}).get('net_in_transfer', 0) + net_out_transfer = server.get('state', {}).get('net_out_transfer', 0) + net_in_speed = server.get('state', {}).get('net_in_speed', 0) + net_out_speed = server.get('state', {}).get('net_out_speed', 0) + arch = server.get('host', {}).get('arch', '') + + response = f"""**{name}** {status} +========================== +**ID**: {server.get('id', '未知')} +**IPv4**: {ipv4} +**IPv6**: {ipv6} +**平台**: {platform} +**CPU 信息**: {cpu_info} +**运行时间**: {uptime_days} 天 {uptime_hours} 小时 +**负载**: {load_1:.2f} {load_5:.2f} {load_15:.2f} +**CPU**: {cpu_usage:.2f}% [{arch}] +**内存**: {mem_used / mem_total * 100 if mem_total else 0:.1f}% [{format_bytes(mem_used)}/{format_bytes(mem_total)}] +**交换**: {swap_used / swap_total * 100 if swap_total else 0:.1f}% [{format_bytes(swap_used)}/{format_bytes(swap_total)}] +**磁盘**: {disk_used / disk_total * 100 if disk_total else 0:.1f}% [{format_bytes(disk_used)}/{format_bytes(disk_total)}] +**流量**: ↓{format_bytes(net_in_transfer)} ↑{format_bytes(net_out_transfer)} +**网速**: ↓{format_bytes(net_in_speed)}/s ↑{format_bytes(net_out_speed)}/s + +**更新于**: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')} UTC +""" + # 添加刷新按钮 + keyboard = [[InlineKeyboardButton("刷新", callback_data=f"refresh_server_{server_id}")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await query.edit_message_text(response, parse_mode='Markdown', reply_markup=reply_markup) + + elif data.startswith('refresh_server_'): + server_id = int(data.split('_')[-1]) + # 重新获取服务器详情,与上面相同的代码 + try: + server = await api.get_server_detail(server_id) + except Exception as e: + await query.edit_message_text(f"获取服务器详情失败:{e}") + await api.close() + return + + await api.close() + + if not server: + await query.edit_message_text("未找到该服务器。") + return + + # 同上,构建响应和刷新按钮 + name = server.get('name', '未知') + online_status = is_online(server) + status = "❇️在线" if online_status else "❌离线" + ipv4 = server.get('geoip', {}).get('ip', {}).get('ipv4_addr', '未知') + ipv6 = server.get('geoip', {}).get('ip', {}).get('ipv6_addr', '❌') + + # 对 IP 地址进行掩码处理 + ipv4 = mask_ipv4(ipv4) + ipv6 = mask_ipv6(ipv6) + + platform = server.get('host', {}).get('platform', '未知') + cpu_info = ', '.join(server.get('host', {}).get('cpu', [])) if server.get('host') else '未知' + uptime_seconds = server.get('state', {}).get('uptime', 0) + uptime_days = uptime_seconds // 86400 + uptime_hours = (uptime_seconds % 86400) // 3600 + load_1 = server.get('state', {}).get('load_1', 0) + load_5 = server.get('state', {}).get('load_5', 0) + load_15 = server.get('state', {}).get('load_15', 0) + cpu_usage = server.get('state', {}).get('cpu', 0) + mem_used = server.get('state', {}).get('mem_used', 0) + mem_total = server.get('host', {}).get('mem_total', 1) + swap_used = server.get('state', {}).get('swap_used', 0) + swap_total = server.get('host', {}).get('swap_total', 1) + disk_used = server.get('state', {}).get('disk_used', 0) + disk_total = server.get('host', {}).get('disk_total', 1) + net_in_transfer = server.get('state', {}).get('net_in_transfer', 0) + net_out_transfer = server.get('state', {}).get('net_out_transfer', 0) + net_in_speed = server.get('state', {}).get('net_in_speed', 0) + net_out_speed = server.get('state', {}).get('net_out_speed', 0) + arch = server.get('host', {}).get('arch', '') + + response = f"""**{name}** {status} +========================== +**ID**: {server.get('id', '未知')} +**IPv4**: {ipv4} +**IPv6**: {ipv6} +**平台**: {platform} +**CPU 信息**: {cpu_info} +**运行时间**: {uptime_days} 天 {uptime_hours} 小时 +**负载**: {load_1:.2f} {load_5:.2f} {load_15:.2f} +**CPU**: {cpu_usage:.2f}% [{arch}] +**内存**: {mem_used / mem_total * 100 if mem_total else 0:.1f}% [{format_bytes(mem_used)}/{format_bytes(mem_total)}] +**交换**: {swap_used / swap_total * 100 if swap_total else 0:.1f}% [{format_bytes(swap_used)}/{format_bytes(swap_total)}] +**磁盘**: {disk_used / disk_total * 100 if disk_total else 0:.1f}% [{format_bytes(disk_used)}/{format_bytes(disk_total)}] +**流量**: ↓{format_bytes(net_in_transfer)} ↑{format_bytes(net_out_transfer)} +**网速**: ↓{format_bytes(net_in_speed)}/s ↑{format_bytes(net_out_speed)}/s + +**更新于**: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')} UTC +""" + keyboard = [[InlineKeyboardButton("刷新", callback_data=f"refresh_server_{server_id}")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await query.edit_message_text(response, parse_mode='Markdown', reply_markup=reply_markup) + + elif data == 'refresh_overview': + # 重新获取概览数据,与 overview 函数类似 + try: + data = await api.get_overview() + except Exception as e: + await query.edit_message_text(f"获取数据失败:{e}") + await api.close() + return + + if data and data.get('success'): + servers = data['data'] + total_servers = len(servers) + online_servers = sum(1 for s in servers if is_online(s)) + total_mem = sum(s['host'].get('mem_total', 0) for s in servers if s.get('host')) + used_mem = sum(s['state'].get('mem_used', 0) for s in servers if s.get('state')) + total_swap = sum(s['host'].get('swap_total', 0) for s in servers if s.get('host')) + used_swap = sum(s['state'].get('swap_used', 0) for s in servers if s.get('state')) + total_disk = sum(s['host'].get('disk_total', 0) for s in servers if s.get('host')) + used_disk = sum(s['state'].get('disk_used', 0) for s in servers if s.get('state')) + net_in_speed = sum(s['state'].get('net_in_speed', 0) for s in servers if s.get('state')) + net_out_speed = sum(s['state'].get('net_out_speed', 0) for s in servers if s.get('state')) + net_in_transfer = sum(s['state'].get('net_in_transfer', 0) for s in servers if s.get('state')) + net_out_transfer = sum(s['state'].get('net_out_transfer', 0) for s in servers if s.get('state')) + transfer_ratio = (net_out_transfer / net_in_transfer * 100) if net_in_transfer else 0 + + response = f"""📊 **统计信息** +=========================== +**服务器数量**: {total_servers} +**在线服务器**: {online_servers} +**内存**: {used_mem / total_mem * 100 if total_mem else 0:.1f}% [{format_bytes(used_mem)}/{format_bytes(total_mem)}] +**交换**: {used_swap / total_swap * 100 if total_swap else 0:.1f}% [{format_bytes(used_swap)}/{format_bytes(total_swap)}] +**磁盘**: {used_disk / total_disk * 100 if total_disk else 0:.1f}% [{format_bytes(used_disk)}/{format_bytes(total_disk)}] +**下行速度**: ↓{format_bytes(net_in_speed)}/s +**上行速度**: ↑{format_bytes(net_out_speed)}/s +**下行流量**: ↓{format_bytes(net_in_transfer)} +**上行流量**: ↑{format_bytes(net_out_transfer)} +**流量对等性**: {transfer_ratio:.1f}% + +**更新于**: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')} UTC +""" + keyboard = [[InlineKeyboardButton("刷新", callback_data="refresh_overview")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await query.edit_message_text(response, parse_mode='Markdown', reply_markup=reply_markup) + else: + await query.edit_message_text("获取服务器信息失败。") + await api.close() + + elif data.startswith('cron_job_'): + cron_id = int(data.split('_')[-1]) + keyboard = [ + [InlineKeyboardButton("确认执行", callback_data=f"confirm_cron_{cron_id}")], + [InlineKeyboardButton("取消", callback_data="cancel")] + ] + reply_markup = InlineKeyboardMarkup(keyboard) + await query.edit_message_text("您确定要执行此计划任务吗?", reply_markup=reply_markup) + + elif data.startswith('confirm_cron_'): + cron_id = int(data.split('_')[-1]) + try: + result = await api.run_cron_job(cron_id) + except Exception as e: + await query.edit_message_text(f"执行失败:{e}") + await api.close() + return + + await api.close() + + if result and result.get('success'): + await query.edit_message_text("计划任务已执行。") + else: + await query.edit_message_text("执行失败。") + + elif data == 'cancel': + await query.edit_message_text("操作已取消。") + + elif data == 'view_loop_traffic': + await view_loop_traffic(query, context, api) + + elif data == 'refresh_loop_traffic': + await view_loop_traffic(query, context, api) + + elif data == 'view_availability': + await view_availability(query, context, api) + + elif data == 'refresh_availability': + await view_availability(query, context, api) + +async def view_loop_traffic(query, context, api): + # 获取服务状态 + try: + services_data = await api.get_services_status() + except Exception as e: + await query.edit_message_text(f"获取服务信息失败:{e}") + await api.close() + return + + if services_data and services_data.get('success'): + cycle_stats = services_data['data'].get('cycle_transfer_stats', {}) + if not cycle_stats: + await query.edit_message_text("暂无循环流量信息。") + await api.close() + return + + response = "**循环流量信息总览**\n==========================\n" + for stat_name, stats in cycle_stats.items(): + rule_name = stats.get('name', '未知规则') + server_names = stats.get('server_name', {}) + transfers = stats.get('transfer', {}) + max_transfer = stats.get('max', 1) # 最大流量(字节) + + response += f"**规则:{rule_name}**\n" + for server_id_str, transfer_value in transfers.items(): + server_id = str(server_id_str) + server_name = server_names.get(server_id, f"服务器ID {server_id}") + transfer_formatted = format_bytes(transfer_value) + max_transfer_formatted = format_bytes(max_transfer) + percentage = (transfer_value / max_transfer * 100) if max_transfer else 0 + response += f"服务器 **{server_name}**:已使用 {transfer_formatted} / {max_transfer_formatted},已使用 {percentage:.2f}%\n" + response += "--------------------------\n" + + response += f"**更新于**: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')} UTC" + + # 添加刷新按钮 + keyboard = [[InlineKeyboardButton("刷新", callback_data="refresh_loop_traffic")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await query.edit_message_text(response, parse_mode='Markdown', reply_markup=reply_markup) + else: + await query.edit_message_text("获取循环流量信息失败。") + await api.close() + +async def view_availability(query, context, api): + # 获取服务状态 + try: + services_data = await api.get_services_status() + except Exception as e: + await query.edit_message_text(f"获取服务信息失败:{e}") + await api.close() + return + # print("返回的服务数据:", services_data) + + if services_data and services_data.get('success'): + services = services_data['data'].get('services', {}) + if not services: + await query.edit_message_text("暂无可用性监测信息。") + await api.close() + return + + response = "**可用性监测信息总览**\n==========================\n" + for service_id, service_info in services.items(): + service = service_info.get('service', {}) + name = service_info.get('service_name', '未知') + total_up = service_info.get('total_up', 0) + total_down = service_info.get('total_down', 0) + total = total_up + total_down + availability = (total_up / total * 100) if total else 0 + status = "🟢 UP" if service_info.get('current_up', 0) else "🔴 DOWN" + # 计算平均延迟 + delays = service_info.get('delay', []) + if delays: + avg_delay = sum(delays) / len(delays) + else: + avg_delay = None + if avg_delay is not None: + delay_text = f",平均延迟 {avg_delay:.2f}ms" + else: + delay_text = "" + response += f"**{name}**:可用率 {availability:.2f}%,状态 {status}{delay_text}\n------------------\n" + response += f"\n**更新于**: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')} UTC" + + # 添加刷新按钮 + keyboard = [[InlineKeyboardButton("刷新", callback_data="refresh_availability")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await query.edit_message_text(response, parse_mode='Markdown', reply_markup=reply_markup) + else: + await query.edit_message_text("获取可用性监测信息失败。") + await api.close() + +async def cron_jobs(update: Update, context: ContextTypes.DEFAULT_TYPE): + user = await db.get_user(update.effective_user.id) + if not user: + await update.message.reply_text("请先使用 /bind 命令绑定您的账号。") + return + + api = NezhaAPI(user['dashboard_url'], user['username'], user['password']) + try: + data = await api.get_cron_jobs() + except Exception as e: + await update.message.reply_text(f"获取计划任务失败:{e}") + await api.close() + return + + if data and data.get('success'): + cron_jobs = data['data'] + if not cron_jobs: + await update.message.reply_text("暂无计划任务。") + await api.close() + return + + keyboard = [ + [InlineKeyboardButton(job['name'], callback_data=f"cron_job_{job['id']}")] + for job in cron_jobs + ] + reply_markup = InlineKeyboardMarkup(keyboard) + await update.message.reply_text("请选择要执行的计划任务:", reply_markup=reply_markup) + else: + await update.message.reply_text("获取计划任务失败。") + await api.close() + +async def services_overview(update: Update, context: ContextTypes.DEFAULT_TYPE): + user = await db.get_user(update.effective_user.id) + if not user: + await update.message.reply_text("请先使用 /bind 命令绑定您的账号。") + return + + keyboard = [ + [InlineKeyboardButton("查看循环流量信息", callback_data="view_loop_traffic")], + [InlineKeyboardButton("查看可用性监测信息", callback_data="view_availability")] + ] + reply_markup = InlineKeyboardMarkup(keyboard) + await update.message.reply_text("请选择要查看的服务信息:", reply_markup=reply_markup) + +def main(): + application = ApplicationBuilder().token(TELEGRAM_TOKEN).build() + + # 初始化数据库 + loop = asyncio.get_event_loop() + loop.run_until_complete(db.initialize()) + + # 回调查询处理(放在最前面) + application.add_handler(CallbackQueryHandler(button_handler)) + + # 命令处理 + application.add_handler(CommandHandler('start', start)) + application.add_handler(CommandHandler('help', help_command)) + application.add_handler(CommandHandler('unbind', unbind)) + application.add_handler(CommandHandler('overview', overview)) + application.add_handler(CommandHandler('cron', cron_jobs)) + application.add_handler(CommandHandler('services', services_overview)) + + # 绑定命令的会话处理 + bind_handler = ConversationHandler( + entry_points=[CommandHandler('bind', bind_start)], + states={ + BIND_USERNAME: [MessageHandler(filters.TEXT & ~filters.COMMAND, bind_username)], + BIND_PASSWORD: [MessageHandler(filters.TEXT & ~filters.COMMAND, bind_password)], + BIND_DASHBOARD: [MessageHandler(filters.TEXT & ~filters.COMMAND, bind_dashboard)], + }, + fallbacks=[] + ) + application.add_handler(bind_handler) + + # 查看单台服务器状态的会话处理 + server_handler = ConversationHandler( + entry_points=[CommandHandler('server', server_status)], + states={ + SEARCH_SERVER: [MessageHandler(filters.TEXT & ~filters.COMMAND, search_server)], + }, + fallbacks=[] + ) + application.add_handler(server_handler) + + # 在 run_polling 中指定 allowed_updates + application.run_polling(allowed_updates=['message', 'callback_query']) + +if __name__ == '__main__': + main() diff --git a/database.py b/database.py new file mode 100755 index 0000000..89a0f41 --- /dev/null +++ b/database.py @@ -0,0 +1,41 @@ +# database.py + +import aiosqlite + +class Database: + def __init__(self, db_path): + self.db_path = db_path + + async def initialize(self): + async with aiosqlite.connect(self.db_path) as db: + await db.execute(''' + CREATE TABLE IF NOT EXISTS users ( + telegram_id INTEGER PRIMARY KEY, + username TEXT NOT NULL, + password TEXT NOT NULL, + dashboard_url TEXT NOT NULL + ) + ''') + await db.commit() + + async def add_user(self, telegram_id, username, password, dashboard_url): + async with aiosqlite.connect(self.db_path) as db: + await db.execute(''' + INSERT OR REPLACE INTO users (telegram_id, username, password, dashboard_url) + VALUES (?, ?, ?, ?) + ''', (telegram_id, username, password, dashboard_url)) + await db.commit() + + async def get_user(self, telegram_id): + async with aiosqlite.connect(self.db_path) as db: + async with db.execute('SELECT username, password, dashboard_url FROM users WHERE telegram_id = ?', (telegram_id,)) as cursor: + row = await cursor.fetchone() + if row: + return {'username': row[0], 'password': row[1], 'dashboard_url': row[2]} + else: + return None + + async def delete_user(self, telegram_id): + async with aiosqlite.connect(self.db_path) as db: + await db.execute('DELETE FROM users WHERE telegram_id = ?', (telegram_id,)) + await db.commit() diff --git a/nezha_api.py b/nezha_api.py new file mode 100755 index 0000000..56d334b --- /dev/null +++ b/nezha_api.py @@ -0,0 +1,100 @@ +import aiohttp +import asyncio +import logging + +class NezhaAPI: + def __init__(self, dashboard_url, username, password): + self.base_url = dashboard_url.rstrip('/') + '/api/v1' + self.username = username + self.password = password + self.token = None + self.session = aiohttp.ClientSession() + self.lock = asyncio.Lock() + + async def close(self): + await self.session.close() + + async def authenticate(self): + async with self.lock: + if self.token is not None: + return + login_url = f'{self.base_url}/login' + payload = { + 'username': self.username, + 'password': self.password + } + async with self.session.post(login_url, json=payload) as resp: + data = await resp.json() + if data.get('success'): + self.token = data['data']['token'] + else: + raise Exception('认证失败,请检查用户名和密码。') + + async def request(self, method, endpoint, **kwargs): + await self.authenticate() + url = f'{self.base_url}{endpoint}' + headers = kwargs.get('headers', {}) + headers['Authorization'] = f'Bearer {self.token}' + kwargs['headers'] = headers + + async with self.session.request(method, url, **kwargs) as resp: + if resp.status == 401: + self.token = None + return await self.request(method, endpoint, **kwargs) + elif resp.status == 200: + return await resp.json() + else: + logging.error(f'API 请求失败:{resp.status}') + return None + + async def get_overview(self): + data = await self.request('GET', '/server') + return data + + async def get_services(self): + data = await self.request('GET', '/service') + return data + + async def get_servers(self): + data = await self.request('GET', '/server') + return data + + async def get_cron_jobs(self): + data = await self.request('GET', '/cron') + return data + + async def run_cron_job(self, cron_id): + endpoint = f'/cron/{cron_id}/manual' + data = await self.request('GET', endpoint) + return data + + async def search_servers(self, query): + servers = await self.get_servers() + if servers and servers.get('success'): + result = [] + for server in servers['data']: + if query.lower() in server['name'].lower(): + result.append(server) + return result + return [] + + async def get_server_detail(self, server_id): + servers = await self.get_servers() + if servers and servers.get('success'): + for server in servers['data']: + if server['id'] == server_id: + return server + return None + + async def get_services_status(self): + data = await self.request('GET', '/service') + return data + + async def get_service_histories(self, server_id): + endpoint = f'/service/{server_id}' + data = await self.request('GET', endpoint) + return data + + async def get_alert_rules(self): + data = await self.request('GET', '/alert-rule') + return data diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..43c3d9a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +python-telegram-bot==20.3 +aiohttp==3.8.1 +aiosqlite==0.19.0 +python-dateutil==2.8.2 +httpx==0.24.0 +python-dotenv==0.21.0 \ No newline at end of file