first commit

This commit is contained in:
Kris 2024-12-08 20:02:53 +01:00
commit df8078b380
No known key found for this signature in database
GPG Key ID: 36AFAA97D2910652
7 changed files with 1147 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
# .gitignore
.env
users.db
.DS_Store

201
LICENCE Normal file
View File

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2020 naiba
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

123
README.md Executable file
View File

@ -0,0 +1,123 @@
**Nezha Telegram Bot - NextGen V1** 是一个基于 Nezha 监控 API用于监控服务器状态的 Telegram 机器人。通过简单的命令,您可以实时查看服务器的运行状态、资源使用情况、执行计划任务以及监控服务可用性。
## 📋 特性
- **账号绑定**:安全绑定您的 Nezha 账户,确保数据隐私。
- **服务器概览**:查看所有服务器的在线状态、内存使用、交换空间、磁盘使用、网络流量等统计信息。
- **单台服务器状态**获取单个服务器的详细状态信息包括负载、CPU 使用率、内存、磁盘、网络流量等。
- **计划任务管理**:查看并执行预设的计划任务,自动化管理服务器。
- **服务可用性监测**:监控服务的可用性和平均延迟,确保服务稳定运行。
- **数据刷新**:实时刷新数据,确保您获取到最新的服务器状态。
## 🚀 快速开始
### 📦 前提条件
在开始之前,请确保您已经具备以下条件:
- Python 3.7 或更高版本
- Telegram 账号
- 已安装哪吒监控 Dashboard 并完成配置
- Telegram 机器人 Token通过 [BotFather](https://t.me/BotFather) 获取)
### 🔧 安装步骤
1. **克隆仓库**
```bash
git clone https://github.com/yourusername/nezha-telegram-bot.git
cd nezha-telegram-bot
```
2. **创建虚拟环境**
推荐使用 `venv` 创建虚拟环境:
```bash
python3 -m venv venv
source venv/bin/activate # 对于 Windows 用户使用 venv\Scripts\activate
```
3. **安装依赖**
```bash
pip install -r requirements.txt
```
4. **配置环境变量**
为了安全起见,建议使用环境变量存储敏感信息。创建一个 `.env` 文件并添加以下内容:
```env
TELEGRAM_TOKEN=your_telegram_bot_token
```
5. **初始化数据库**
数据库会在首次运行时自动创建。
6. **运行机器人**
```bash
python bot.py
```
您应该会看到类似以下的日志输出,表示机器人已成功启动:
```
2024-12-08 17:50:39,139 - telegram.ext.Application - INFO - Application started
```
## 🛠️ 使用指南
### 📌 绑定账号
为了确保您的数据安全,绑定账号仅支持私聊中进行操作。如果在群组中尝试绑定,机器人会提示您需在私聊中执行。
1. **私聊中发送 `/bind` 命令**
2. **按照提示依次输入**
- 用户名
- 密码
- Dashboard 地址例如https://dashboard.example.com
3. **绑定成功后**,您可以开始使用机器人的各项功能。
### 📜 可用命令
- `/start` - 启动机器人并显示欢迎信息。
- `/help` - 获取可用命令列表和简要说明。
- `/bind` - 绑定您的 Nezha 账户。
- `/unbind` - 解绑您的 Nezha 账户。
- `/overview` - 查看所有服务器的状态总览。
- `/server` - 查看单台服务器的详细状态。
- `/cron` - 执行计划任务。
- `/services` - 查看服务状态总览。
### 📊 服务器概览
使用 `/overview` 命令,可以查看所有绑定服务器的统计信息,包括在线状态、内存使用、交换空间、磁盘使用、网络流量等。您还可以通过点击“刷新”按钮实时更新数据。
### 🖥️ 单台服务器状态
使用 `/server` 命令输入服务器名称进行搜索并选择相应的服务器查看详细状态信息。包括负载、CPU 使用率、内存、磁盘、网络流量等数据。
### ⏰ 计划任务管理
使用 `/cron` 命令,可以查看并执行预设的计划任务。点击相应任务名称进行确认执行或取消操作。
### 🌐 服务可用性监测
使用 `/services` 命令,可以查看服务的可用性信息,包括可用率、当前状态、平均延迟和剩余流量等。
## 🙏 致谢
- [python-telegram-bot](https://github.com/python-telegram-bot/python-telegram-bot) - 用于 Telegram 机器人的开发。
- [aiohttp](https://github.com/aio-libs/aiohttp) - 异步 HTTP 客户端/服务器框架。
- [aiosqlite](https://github.com/jreese/aiosqlite) - 异步 SQLite 连接库。
- [哪吒监控](https://nezha.wiki) - 哪吒服务器监控。
- [ChatGPT](https://chat.openai.com) - 本项目采用“面向 ChatGPT 编程”的理念,完成了包括本文档在内的 90% 的代码。
---
**免责声明**:使用本机器人时,请确保遵守相关法律法规。开发者不对因使用本机器人导致的任何损失承担责任。

672
bot.py Executable file
View File

@ -0,0 +1,672 @@
import asyncio
import logging
import math
import time
from datetime import datetime, timezone
from dateutil import parser
from dotenv import load_dotenv
import os
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
ApplicationBuilder, CommandHandler, MessageHandler, CallbackQueryHandler,
ConversationHandler, ContextTypes, filters
)
from nezha_api import NezhaAPI
from database import Database
# 配置日志
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
load_dotenv()
# 定义常量和配置
TELEGRAM_TOKEN = os.getenv('TELEGRAM_TOKEN')
DATABASE_PATH = 'users.db'
# 定义阶段
BIND_USERNAME, BIND_PASSWORD, BIND_DASHBOARD = range(3)
SEARCH_SERVER = range(1)
# 初始化数据库
db = Database(DATABASE_PATH)
# 添加 format_bytes 函数
def format_bytes(size_in_bytes):
if size_in_bytes == 0:
return "0B"
units = ['B', 'KB', 'MB', 'GB', 'TB']
power = int(math.floor(math.log(size_in_bytes, 1024)))
power = min(power, len(units) - 1) # 防止超过单位列表的范围
size = size_in_bytes / (1024 ** power)
formatted_size = f"{size:.2f}{units[power]}"
return formatted_size
def is_online(server):
"""根据last_active判断服务器是否在线如果最后活跃时间在10秒内则为在线。"""
now_utc = datetime.now(timezone.utc)
last_active_str = server.get('last_active')
if not last_active_str:
return False
try:
last_active_dt = parser.isoparse(last_active_str)
except ValueError:
return False
last_active_utc = last_active_dt.astimezone(timezone.utc)
diff = now_utc - last_active_utc
is_on = diff.total_seconds() < 10
logger.info("Checking online: diff=%s now=%s last=%s is_online=%s",
diff, now_utc, last_active_utc, is_on)
return is_on
# 添加 IP 地址掩码函数
def mask_ipv4(ipv4_address):
if ipv4_address == '未知' or ipv4_address == '':
return ipv4_address
parts = ipv4_address.split('.')
if len(parts) != 4:
return ipv4_address # 非法的 IPv4 地址,直接返回
# 将后两部分替换为 'xx'
masked_ip = f"{parts[0]}.{parts[1]}.xx.xx"
return masked_ip
def mask_ipv6(ipv6_address):
if ipv6_address == '未知' or ipv6_address == '':
return ipv6_address
parts = ipv6_address.split(':')
if len(parts) < 3:
return ipv6_address # 非法的 IPv6 地址,直接返回
# 只显示前两个部分,后面用 'xx' 替代
masked_ip = ':'.join(parts[:2]) + ':xx:xx:xx:xx'
return masked_ip
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text(
"欢迎使用 Nezha 监控机器人!\n请使用 /bind 命令绑定您的账号。\n请注意,使用公共机器人有安全风险,用户名密码将会被记录用以鉴权,解绑删除。"
)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text("""
可用命令
/bind - 绑定账号
/unbind - 解绑账号
/overview - 查看服务器状态总览
/server - 查看单台服务器状态
/cron - 执行计划任务
/services - 查看服务状态总览
/help - 获取帮助
""")
async def bind_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
# 检查当前对话类型
if update.effective_chat.type != "private":
await update.message.reply_text("请与机器人私聊进行绑定操作,\n避免机密信息泄露。")
return ConversationHandler.END
user = await db.get_user(update.effective_user.id)
if user:
await update.message.reply_text("您已绑定账号,如需重新绑定,请先使用 /unbind 命令解绑。")
return ConversationHandler.END
else:
await update.message.reply_text("请输入您的用户名:")
return BIND_USERNAME
async def bind_username(update: Update, context: ContextTypes.DEFAULT_TYPE):
context.user_data['username'] = update.message.text.strip()
await update.message.reply_text("请输入您的密码:")
return BIND_PASSWORD
async def bind_password(update: Update, context: ContextTypes.DEFAULT_TYPE):
context.user_data['password'] = update.message.text.strip()
await update.message.reply_text("请输入您的 Dashboard 地址例如https://nezha.example.com")
return BIND_DASHBOARD
async def bind_dashboard(update: Update, context: ContextTypes.DEFAULT_TYPE):
dashboard_url = update.message.text.strip()
context.user_data['dashboard_url'] = dashboard_url
telegram_id = update.effective_user.id
username = context.user_data['username']
password = context.user_data['password']
# 测试连接
try:
api = NezhaAPI(dashboard_url, username, password)
await api.authenticate()
await api.close()
except Exception as e:
await update.message.reply_text(f"绑定失败:{e}\n请检查您的信息并重新绑定。")
return ConversationHandler.END
# 保存到数据库
await db.add_user(telegram_id, username, password, dashboard_url)
await update.message.reply_text("绑定成功!您现在可以使用机器人的功能了。")
return ConversationHandler.END
async def unbind(update: Update, context: ContextTypes.DEFAULT_TYPE):
user = await db.get_user(update.effective_user.id)
if user:
await db.delete_user(update.effective_user.id)
await update.message.reply_text("已解绑。")
else:
await update.message.reply_text("您尚未绑定账号。")
async def overview(update: Update, context: ContextTypes.DEFAULT_TYPE):
user = await db.get_user(update.effective_user.id)
if not user:
await update.message.reply_text("请先使用 /bind 命令绑定您的账号。")
return
api = NezhaAPI(user['dashboard_url'], user['username'], user['password'])
try:
data = await api.get_overview()
except Exception as e:
await update.message.reply_text(f"获取数据失败:{e}")
await api.close()
return
# print("返回的服务数据:", data)
if data and data.get('success'):
servers = data['data']
online_servers = sum(1 for s in servers if is_online(s))
total_servers = len(servers)
total_mem = sum(s['host'].get('mem_total', 0) for s in servers if s.get('host'))
used_mem = sum(s['state'].get('mem_used', 0) for s in servers if s.get('state'))
total_swap = sum(s['host'].get('swap_total', 0) for s in servers if s.get('host'))
used_swap = sum(s['state'].get('swap_used', 0) for s in servers if s.get('state'))
total_disk = sum(s['host'].get('disk_total', 0) for s in servers if s.get('host'))
used_disk = sum(s['state'].get('disk_used', 0) for s in servers if s.get('state'))
net_in_speed = sum(s['state'].get('net_in_speed', 0) for s in servers if s.get('state'))
net_out_speed = sum(s['state'].get('net_out_speed', 0) for s in servers if s.get('state'))
net_in_transfer = sum(s['state'].get('net_in_transfer', 0) for s in servers if s.get('state'))
net_out_transfer = sum(s['state'].get('net_out_transfer', 0) for s in servers if s.get('state'))
transfer_ratio = (net_out_transfer / net_in_transfer * 100) if net_in_transfer else 0
response = f"""📊 **统计信息**
===========================
**服务器数量** {total_servers}
**在线服务器** {online_servers}
**内存** {used_mem / total_mem * 100 if total_mem else 0:.1f}% [{format_bytes(used_mem)}/{format_bytes(total_mem)}]
**交换** {used_swap / total_swap * 100 if total_swap else 0:.1f}% [{format_bytes(used_swap)}/{format_bytes(total_swap)}]
**磁盘** {used_disk / total_disk * 100 if total_disk else 0:.1f}% [{format_bytes(used_disk)}/{format_bytes(total_disk)}]
**下行速度** {format_bytes(net_in_speed)}/s
**上行速度** {format_bytes(net_out_speed)}/s
**下行流量** {format_bytes(net_in_transfer)}
**上行流量** {format_bytes(net_out_transfer)}
**流量对等性** {transfer_ratio:.1f}%
**更新于** {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')} UTC
"""
keyboard = [[InlineKeyboardButton("刷新", callback_data="refresh_overview")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(response, parse_mode='Markdown', reply_markup=reply_markup)
else:
await update.message.reply_text("获取服务器信息失败。")
await api.close()
async def server_status(update: Update, context: ContextTypes.DEFAULT_TYPE):
user = await db.get_user(update.effective_user.id)
if not user:
await update.message.reply_text("请先使用 /bind 命令绑定您的账号。")
return
await update.message.reply_text("请输入要查询的服务器名称(支持模糊搜索):")
return SEARCH_SERVER
async def search_server(update: Update, context: ContextTypes.DEFAULT_TYPE):
query_text = update.message.text.strip()
user = await db.get_user(update.effective_user.id)
api = NezhaAPI(user['dashboard_url'], user['username'], user['password'])
try:
results = await api.search_servers(query_text)
except Exception as e:
await update.message.reply_text(f"搜索失败:{e}")
await api.close()
return ConversationHandler.END
if not results:
await update.message.reply_text("未找到匹配的服务器。")
await api.close()
return ConversationHandler.END
keyboard = [
[InlineKeyboardButton(s['name'], callback_data=f"server_detail_{s['id']}")]
for s in results
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text("请选择服务器:", reply_markup=reply_markup)
await api.close()
return ConversationHandler.END
async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
data = query.data
user = await db.get_user(query.from_user.id)
if not user:
await query.answer("请先使用 /bind 命令绑定您的账号。", show_alert=True)
return
# 实现刷新频率限制
last_refresh_time = context.user_data.get('last_refresh_time', 0)
current_time = time.time()
if data.startswith('refresh_'):
if current_time - last_refresh_time < 1:
await query.answer("刷新太频繁,请稍后再试。", show_alert=True)
return
else:
context.user_data['last_refresh_time'] = current_time
await query.answer()
api = NezhaAPI(user['dashboard_url'], user['username'], user['password'])
if data.startswith('server_detail_'):
server_id = int(data.split('_')[-1])
try:
server = await api.get_server_detail(server_id)
except Exception as e:
await query.edit_message_text(f"获取服务器详情失败:{e}")
await api.close()
return
await api.close()
if not server:
await query.edit_message_text("未找到该服务器。")
return
name = server.get('name', '未知')
online_status = is_online(server)
status = "❇️在线" if online_status else "❌离线"
ipv4 = server.get('geoip', {}).get('ip', {}).get('ipv4_addr', '未知')
ipv6 = server.get('geoip', {}).get('ip', {}).get('ipv6_addr', '')
# 对 IP 地址进行掩码处理
ipv4 = mask_ipv4(ipv4)
ipv6 = mask_ipv6(ipv6)
platform = server.get('host', {}).get('platform', '未知')
cpu_info = ', '.join(server.get('host', {}).get('cpu', [])) if server.get('host') else '未知'
uptime_seconds = server.get('state', {}).get('uptime', 0)
uptime_days = uptime_seconds // 86400
uptime_hours = (uptime_seconds % 86400) // 3600
load_1 = server.get('state', {}).get('load_1', 0)
load_5 = server.get('state', {}).get('load_5', 0)
load_15 = server.get('state', {}).get('load_15', 0)
cpu_usage = server.get('state', {}).get('cpu', 0)
mem_used = server.get('state', {}).get('mem_used', 0)
mem_total = server.get('host', {}).get('mem_total', 1)
swap_used = server.get('state', {}).get('swap_used', 0)
swap_total = server.get('host', {}).get('swap_total', 1)
disk_used = server.get('state', {}).get('disk_used', 0)
disk_total = server.get('host', {}).get('disk_total', 1)
net_in_transfer = server.get('state', {}).get('net_in_transfer', 0)
net_out_transfer = server.get('state', {}).get('net_out_transfer', 0)
net_in_speed = server.get('state', {}).get('net_in_speed', 0)
net_out_speed = server.get('state', {}).get('net_out_speed', 0)
arch = server.get('host', {}).get('arch', '')
response = f"""**{name}** {status}
==========================
**ID**: {server.get('id', '未知')}
**IPv4**: {ipv4}
**IPv6**: {ipv6}
**平台** {platform}
**CPU 信息** {cpu_info}
**运行时间** {uptime_days} {uptime_hours} 小时
**负载** {load_1:.2f} {load_5:.2f} {load_15:.2f}
**CPU** {cpu_usage:.2f}% [{arch}]
**内存** {mem_used / mem_total * 100 if mem_total else 0:.1f}% [{format_bytes(mem_used)}/{format_bytes(mem_total)}]
**交换** {swap_used / swap_total * 100 if swap_total else 0:.1f}% [{format_bytes(swap_used)}/{format_bytes(swap_total)}]
**磁盘** {disk_used / disk_total * 100 if disk_total else 0:.1f}% [{format_bytes(disk_used)}/{format_bytes(disk_total)}]
**流量** {format_bytes(net_in_transfer)} {format_bytes(net_out_transfer)}
**网速** {format_bytes(net_in_speed)}/s {format_bytes(net_out_speed)}/s
**更新于** {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')} UTC
"""
# 添加刷新按钮
keyboard = [[InlineKeyboardButton("刷新", callback_data=f"refresh_server_{server_id}")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(response, parse_mode='Markdown', reply_markup=reply_markup)
elif data.startswith('refresh_server_'):
server_id = int(data.split('_')[-1])
# 重新获取服务器详情,与上面相同的代码
try:
server = await api.get_server_detail(server_id)
except Exception as e:
await query.edit_message_text(f"获取服务器详情失败:{e}")
await api.close()
return
await api.close()
if not server:
await query.edit_message_text("未找到该服务器。")
return
# 同上,构建响应和刷新按钮
name = server.get('name', '未知')
online_status = is_online(server)
status = "❇️在线" if online_status else "❌离线"
ipv4 = server.get('geoip', {}).get('ip', {}).get('ipv4_addr', '未知')
ipv6 = server.get('geoip', {}).get('ip', {}).get('ipv6_addr', '')
# 对 IP 地址进行掩码处理
ipv4 = mask_ipv4(ipv4)
ipv6 = mask_ipv6(ipv6)
platform = server.get('host', {}).get('platform', '未知')
cpu_info = ', '.join(server.get('host', {}).get('cpu', [])) if server.get('host') else '未知'
uptime_seconds = server.get('state', {}).get('uptime', 0)
uptime_days = uptime_seconds // 86400
uptime_hours = (uptime_seconds % 86400) // 3600
load_1 = server.get('state', {}).get('load_1', 0)
load_5 = server.get('state', {}).get('load_5', 0)
load_15 = server.get('state', {}).get('load_15', 0)
cpu_usage = server.get('state', {}).get('cpu', 0)
mem_used = server.get('state', {}).get('mem_used', 0)
mem_total = server.get('host', {}).get('mem_total', 1)
swap_used = server.get('state', {}).get('swap_used', 0)
swap_total = server.get('host', {}).get('swap_total', 1)
disk_used = server.get('state', {}).get('disk_used', 0)
disk_total = server.get('host', {}).get('disk_total', 1)
net_in_transfer = server.get('state', {}).get('net_in_transfer', 0)
net_out_transfer = server.get('state', {}).get('net_out_transfer', 0)
net_in_speed = server.get('state', {}).get('net_in_speed', 0)
net_out_speed = server.get('state', {}).get('net_out_speed', 0)
arch = server.get('host', {}).get('arch', '')
response = f"""**{name}** {status}
==========================
**ID**: {server.get('id', '未知')}
**IPv4**: {ipv4}
**IPv6**: {ipv6}
**平台** {platform}
**CPU 信息** {cpu_info}
**运行时间** {uptime_days} {uptime_hours} 小时
**负载** {load_1:.2f} {load_5:.2f} {load_15:.2f}
**CPU** {cpu_usage:.2f}% [{arch}]
**内存** {mem_used / mem_total * 100 if mem_total else 0:.1f}% [{format_bytes(mem_used)}/{format_bytes(mem_total)}]
**交换** {swap_used / swap_total * 100 if swap_total else 0:.1f}% [{format_bytes(swap_used)}/{format_bytes(swap_total)}]
**磁盘** {disk_used / disk_total * 100 if disk_total else 0:.1f}% [{format_bytes(disk_used)}/{format_bytes(disk_total)}]
**流量** {format_bytes(net_in_transfer)} {format_bytes(net_out_transfer)}
**网速** {format_bytes(net_in_speed)}/s {format_bytes(net_out_speed)}/s
**更新于** {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')} UTC
"""
keyboard = [[InlineKeyboardButton("刷新", callback_data=f"refresh_server_{server_id}")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(response, parse_mode='Markdown', reply_markup=reply_markup)
elif data == 'refresh_overview':
# 重新获取概览数据,与 overview 函数类似
try:
data = await api.get_overview()
except Exception as e:
await query.edit_message_text(f"获取数据失败:{e}")
await api.close()
return
if data and data.get('success'):
servers = data['data']
total_servers = len(servers)
online_servers = sum(1 for s in servers if is_online(s))
total_mem = sum(s['host'].get('mem_total', 0) for s in servers if s.get('host'))
used_mem = sum(s['state'].get('mem_used', 0) for s in servers if s.get('state'))
total_swap = sum(s['host'].get('swap_total', 0) for s in servers if s.get('host'))
used_swap = sum(s['state'].get('swap_used', 0) for s in servers if s.get('state'))
total_disk = sum(s['host'].get('disk_total', 0) for s in servers if s.get('host'))
used_disk = sum(s['state'].get('disk_used', 0) for s in servers if s.get('state'))
net_in_speed = sum(s['state'].get('net_in_speed', 0) for s in servers if s.get('state'))
net_out_speed = sum(s['state'].get('net_out_speed', 0) for s in servers if s.get('state'))
net_in_transfer = sum(s['state'].get('net_in_transfer', 0) for s in servers if s.get('state'))
net_out_transfer = sum(s['state'].get('net_out_transfer', 0) for s in servers if s.get('state'))
transfer_ratio = (net_out_transfer / net_in_transfer * 100) if net_in_transfer else 0
response = f"""📊 **统计信息**
===========================
**服务器数量** {total_servers}
**在线服务器** {online_servers}
**内存** {used_mem / total_mem * 100 if total_mem else 0:.1f}% [{format_bytes(used_mem)}/{format_bytes(total_mem)}]
**交换** {used_swap / total_swap * 100 if total_swap else 0:.1f}% [{format_bytes(used_swap)}/{format_bytes(total_swap)}]
**磁盘** {used_disk / total_disk * 100 if total_disk else 0:.1f}% [{format_bytes(used_disk)}/{format_bytes(total_disk)}]
**下行速度** {format_bytes(net_in_speed)}/s
**上行速度** {format_bytes(net_out_speed)}/s
**下行流量** {format_bytes(net_in_transfer)}
**上行流量** {format_bytes(net_out_transfer)}
**流量对等性** {transfer_ratio:.1f}%
**更新于** {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')} UTC
"""
keyboard = [[InlineKeyboardButton("刷新", callback_data="refresh_overview")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(response, parse_mode='Markdown', reply_markup=reply_markup)
else:
await query.edit_message_text("获取服务器信息失败。")
await api.close()
elif data.startswith('cron_job_'):
cron_id = int(data.split('_')[-1])
keyboard = [
[InlineKeyboardButton("确认执行", callback_data=f"confirm_cron_{cron_id}")],
[InlineKeyboardButton("取消", callback_data="cancel")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text("您确定要执行此计划任务吗?", reply_markup=reply_markup)
elif data.startswith('confirm_cron_'):
cron_id = int(data.split('_')[-1])
try:
result = await api.run_cron_job(cron_id)
except Exception as e:
await query.edit_message_text(f"执行失败:{e}")
await api.close()
return
await api.close()
if result and result.get('success'):
await query.edit_message_text("计划任务已执行。")
else:
await query.edit_message_text("执行失败。")
elif data == 'cancel':
await query.edit_message_text("操作已取消。")
elif data == 'view_loop_traffic':
await view_loop_traffic(query, context, api)
elif data == 'refresh_loop_traffic':
await view_loop_traffic(query, context, api)
elif data == 'view_availability':
await view_availability(query, context, api)
elif data == 'refresh_availability':
await view_availability(query, context, api)
async def view_loop_traffic(query, context, api):
# 获取服务状态
try:
services_data = await api.get_services_status()
except Exception as e:
await query.edit_message_text(f"获取服务信息失败:{e}")
await api.close()
return
if services_data and services_data.get('success'):
cycle_stats = services_data['data'].get('cycle_transfer_stats', {})
if not cycle_stats:
await query.edit_message_text("暂无循环流量信息。")
await api.close()
return
response = "**循环流量信息总览**\n==========================\n"
for stat_name, stats in cycle_stats.items():
rule_name = stats.get('name', '未知规则')
server_names = stats.get('server_name', {})
transfers = stats.get('transfer', {})
max_transfer = stats.get('max', 1) # 最大流量(字节)
response += f"**规则:{rule_name}**\n"
for server_id_str, transfer_value in transfers.items():
server_id = str(server_id_str)
server_name = server_names.get(server_id, f"服务器ID {server_id}")
transfer_formatted = format_bytes(transfer_value)
max_transfer_formatted = format_bytes(max_transfer)
percentage = (transfer_value / max_transfer * 100) if max_transfer else 0
response += f"服务器 **{server_name}**:已使用 {transfer_formatted} / {max_transfer_formatted},已使用 {percentage:.2f}%\n"
response += "--------------------------\n"
response += f"**更新于** {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')} UTC"
# 添加刷新按钮
keyboard = [[InlineKeyboardButton("刷新", callback_data="refresh_loop_traffic")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(response, parse_mode='Markdown', reply_markup=reply_markup)
else:
await query.edit_message_text("获取循环流量信息失败。")
await api.close()
async def view_availability(query, context, api):
# 获取服务状态
try:
services_data = await api.get_services_status()
except Exception as e:
await query.edit_message_text(f"获取服务信息失败:{e}")
await api.close()
return
# print("返回的服务数据:", services_data)
if services_data and services_data.get('success'):
services = services_data['data'].get('services', {})
if not services:
await query.edit_message_text("暂无可用性监测信息。")
await api.close()
return
response = "**可用性监测信息总览**\n==========================\n"
for service_id, service_info in services.items():
service = service_info.get('service', {})
name = service_info.get('service_name', '未知')
total_up = service_info.get('total_up', 0)
total_down = service_info.get('total_down', 0)
total = total_up + total_down
availability = (total_up / total * 100) if total else 0
status = "🟢 UP" if service_info.get('current_up', 0) else "🔴 DOWN"
# 计算平均延迟
delays = service_info.get('delay', [])
if delays:
avg_delay = sum(delays) / len(delays)
else:
avg_delay = None
if avg_delay is not None:
delay_text = f",平均延迟 {avg_delay:.2f}ms"
else:
delay_text = ""
response += f"**{name}**:可用率 {availability:.2f}%,状态 {status}{delay_text}\n------------------\n"
response += f"\n**更新于** {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')} UTC"
# 添加刷新按钮
keyboard = [[InlineKeyboardButton("刷新", callback_data="refresh_availability")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(response, parse_mode='Markdown', reply_markup=reply_markup)
else:
await query.edit_message_text("获取可用性监测信息失败。")
await api.close()
async def cron_jobs(update: Update, context: ContextTypes.DEFAULT_TYPE):
user = await db.get_user(update.effective_user.id)
if not user:
await update.message.reply_text("请先使用 /bind 命令绑定您的账号。")
return
api = NezhaAPI(user['dashboard_url'], user['username'], user['password'])
try:
data = await api.get_cron_jobs()
except Exception as e:
await update.message.reply_text(f"获取计划任务失败:{e}")
await api.close()
return
if data and data.get('success'):
cron_jobs = data['data']
if not cron_jobs:
await update.message.reply_text("暂无计划任务。")
await api.close()
return
keyboard = [
[InlineKeyboardButton(job['name'], callback_data=f"cron_job_{job['id']}")]
for job in cron_jobs
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text("请选择要执行的计划任务:", reply_markup=reply_markup)
else:
await update.message.reply_text("获取计划任务失败。")
await api.close()
async def services_overview(update: Update, context: ContextTypes.DEFAULT_TYPE):
user = await db.get_user(update.effective_user.id)
if not user:
await update.message.reply_text("请先使用 /bind 命令绑定您的账号。")
return
keyboard = [
[InlineKeyboardButton("查看循环流量信息", callback_data="view_loop_traffic")],
[InlineKeyboardButton("查看可用性监测信息", callback_data="view_availability")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text("请选择要查看的服务信息:", reply_markup=reply_markup)
def main():
application = ApplicationBuilder().token(TELEGRAM_TOKEN).build()
# 初始化数据库
loop = asyncio.get_event_loop()
loop.run_until_complete(db.initialize())
# 回调查询处理(放在最前面)
application.add_handler(CallbackQueryHandler(button_handler))
# 命令处理
application.add_handler(CommandHandler('start', start))
application.add_handler(CommandHandler('help', help_command))
application.add_handler(CommandHandler('unbind', unbind))
application.add_handler(CommandHandler('overview', overview))
application.add_handler(CommandHandler('cron', cron_jobs))
application.add_handler(CommandHandler('services', services_overview))
# 绑定命令的会话处理
bind_handler = ConversationHandler(
entry_points=[CommandHandler('bind', bind_start)],
states={
BIND_USERNAME: [MessageHandler(filters.TEXT & ~filters.COMMAND, bind_username)],
BIND_PASSWORD: [MessageHandler(filters.TEXT & ~filters.COMMAND, bind_password)],
BIND_DASHBOARD: [MessageHandler(filters.TEXT & ~filters.COMMAND, bind_dashboard)],
},
fallbacks=[]
)
application.add_handler(bind_handler)
# 查看单台服务器状态的会话处理
server_handler = ConversationHandler(
entry_points=[CommandHandler('server', server_status)],
states={
SEARCH_SERVER: [MessageHandler(filters.TEXT & ~filters.COMMAND, search_server)],
},
fallbacks=[]
)
application.add_handler(server_handler)
# 在 run_polling 中指定 allowed_updates
application.run_polling(allowed_updates=['message', 'callback_query'])
if __name__ == '__main__':
main()

41
database.py Executable file
View File

@ -0,0 +1,41 @@
# database.py
import aiosqlite
class Database:
def __init__(self, db_path):
self.db_path = db_path
async def initialize(self):
async with aiosqlite.connect(self.db_path) as db:
await db.execute('''
CREATE TABLE IF NOT EXISTS users (
telegram_id INTEGER PRIMARY KEY,
username TEXT NOT NULL,
password TEXT NOT NULL,
dashboard_url TEXT NOT NULL
)
''')
await db.commit()
async def add_user(self, telegram_id, username, password, dashboard_url):
async with aiosqlite.connect(self.db_path) as db:
await db.execute('''
INSERT OR REPLACE INTO users (telegram_id, username, password, dashboard_url)
VALUES (?, ?, ?, ?)
''', (telegram_id, username, password, dashboard_url))
await db.commit()
async def get_user(self, telegram_id):
async with aiosqlite.connect(self.db_path) as db:
async with db.execute('SELECT username, password, dashboard_url FROM users WHERE telegram_id = ?', (telegram_id,)) as cursor:
row = await cursor.fetchone()
if row:
return {'username': row[0], 'password': row[1], 'dashboard_url': row[2]}
else:
return None
async def delete_user(self, telegram_id):
async with aiosqlite.connect(self.db_path) as db:
await db.execute('DELETE FROM users WHERE telegram_id = ?', (telegram_id,))
await db.commit()

100
nezha_api.py Executable file
View File

@ -0,0 +1,100 @@
import aiohttp
import asyncio
import logging
class NezhaAPI:
def __init__(self, dashboard_url, username, password):
self.base_url = dashboard_url.rstrip('/') + '/api/v1'
self.username = username
self.password = password
self.token = None
self.session = aiohttp.ClientSession()
self.lock = asyncio.Lock()
async def close(self):
await self.session.close()
async def authenticate(self):
async with self.lock:
if self.token is not None:
return
login_url = f'{self.base_url}/login'
payload = {
'username': self.username,
'password': self.password
}
async with self.session.post(login_url, json=payload) as resp:
data = await resp.json()
if data.get('success'):
self.token = data['data']['token']
else:
raise Exception('认证失败,请检查用户名和密码。')
async def request(self, method, endpoint, **kwargs):
await self.authenticate()
url = f'{self.base_url}{endpoint}'
headers = kwargs.get('headers', {})
headers['Authorization'] = f'Bearer {self.token}'
kwargs['headers'] = headers
async with self.session.request(method, url, **kwargs) as resp:
if resp.status == 401:
self.token = None
return await self.request(method, endpoint, **kwargs)
elif resp.status == 200:
return await resp.json()
else:
logging.error(f'API 请求失败:{resp.status}')
return None
async def get_overview(self):
data = await self.request('GET', '/server')
return data
async def get_services(self):
data = await self.request('GET', '/service')
return data
async def get_servers(self):
data = await self.request('GET', '/server')
return data
async def get_cron_jobs(self):
data = await self.request('GET', '/cron')
return data
async def run_cron_job(self, cron_id):
endpoint = f'/cron/{cron_id}/manual'
data = await self.request('GET', endpoint)
return data
async def search_servers(self, query):
servers = await self.get_servers()
if servers and servers.get('success'):
result = []
for server in servers['data']:
if query.lower() in server['name'].lower():
result.append(server)
return result
return []
async def get_server_detail(self, server_id):
servers = await self.get_servers()
if servers and servers.get('success'):
for server in servers['data']:
if server['id'] == server_id:
return server
return None
async def get_services_status(self):
data = await self.request('GET', '/service')
return data
async def get_service_histories(self, server_id):
endpoint = f'/service/{server_id}'
data = await self.request('GET', endpoint)
return data
async def get_alert_rules(self):
data = await self.request('GET', '/alert-rule')
return data

6
requirements.txt Normal file
View File

@ -0,0 +1,6 @@
python-telegram-bot==20.3
aiohttp==3.8.1
aiosqlite==0.19.0
python-dateutil==2.8.2
httpx==0.24.0
python-dotenv==0.21.0