Nezha-telegram-Bot-V1/database.py

165 lines
6.7 KiB
Python
Raw Normal View History

2024-12-09 03:02:53 +08:00
# database.py
import aiosqlite
class Database:
def __init__(self, db_path):
self.db_path = db_path
async def initialize(self):
async with aiosqlite.connect(self.db_path) as db:
2024-12-09 22:31:23 +08:00
# 创建用户表
2024-12-09 03:02:53 +08:00
await db.execute('''
CREATE TABLE IF NOT EXISTS users (
telegram_id INTEGER PRIMARY KEY,
2024-12-09 22:31:23 +08:00
default_dashboard_id INTEGER,
FOREIGN KEY (default_dashboard_id) REFERENCES dashboards (id)
)
''')
# 创建 dashboard 表
await db.execute('''
CREATE TABLE IF NOT EXISTS dashboards (
id INTEGER PRIMARY KEY AUTOINCREMENT,
telegram_id INTEGER,
2024-12-09 03:02:53 +08:00
username TEXT NOT NULL,
password TEXT NOT NULL,
2024-12-09 22:31:23 +08:00
dashboard_url TEXT NOT NULL,
alias TEXT,
FOREIGN KEY (telegram_id) REFERENCES users (telegram_id)
2024-12-09 03:02:53 +08:00
)
''')
await db.commit()
2024-12-09 22:31:23 +08:00
async def add_user(self, telegram_id, username, password, dashboard_url, alias=None):
async with aiosqlite.connect(self.db_path) as db:
# 首先确保用户存在
await db.execute('INSERT OR IGNORE INTO users (telegram_id) VALUES (?)', (telegram_id,))
# 如果没有提供别名,使用 URL 的第一部分作为默认别名
if not alias:
try:
alias = dashboard_url.split('://')[1].split('.')[0].upper()
except:
alias = "NEZHA"
# 添加 dashboard
cursor = await db.execute('''
INSERT INTO dashboards (telegram_id, username, password, dashboard_url, alias)
VALUES (?, ?, ?, ?, ?)
RETURNING id
''', (telegram_id, username, password, dashboard_url, alias))
dashboard_id = (await cursor.fetchone())[0]
# 如果用户还没有默认 dashboard设置这个为默认
await db.execute('''
UPDATE users
SET default_dashboard_id = COALESCE(default_dashboard_id, ?)
WHERE telegram_id = ?
''', (dashboard_id, telegram_id))
await db.commit()
return dashboard_id
async def update_alias(self, dashboard_id, alias):
2024-12-09 03:02:53 +08:00
async with aiosqlite.connect(self.db_path) as db:
await db.execute('''
2024-12-09 22:31:23 +08:00
UPDATE dashboards
SET alias = ?
WHERE id = ?
''', (alias, dashboard_id))
2024-12-09 03:02:53 +08:00
await db.commit()
async def get_user(self, telegram_id):
async with aiosqlite.connect(self.db_path) as db:
2024-12-09 22:31:23 +08:00
# 获取用户的默认 dashboard
async with db.execute('''
SELECT d.username, d.password, d.dashboard_url, d.alias
FROM users u
JOIN dashboards d ON d.id = u.default_dashboard_id
WHERE u.telegram_id = ?
''', (telegram_id,)) as cursor:
2024-12-09 03:02:53 +08:00
row = await cursor.fetchone()
if row:
2024-12-09 22:31:23 +08:00
return {'username': row[0], 'password': row[1], 'dashboard_url': row[2], 'alias': row[3]}
return None
async def get_all_dashboards(self, telegram_id):
async with aiosqlite.connect(self.db_path) as db:
async with db.execute('''
SELECT d.id, d.username, d.password, d.dashboard_url, d.alias,
CASE WHEN u.default_dashboard_id = d.id THEN 1 ELSE 0 END as is_default
FROM dashboards d
LEFT JOIN users u ON u.telegram_id = d.telegram_id
WHERE d.telegram_id = ?
''', (telegram_id,)) as cursor:
rows = await cursor.fetchall()
return [
{
'id': row[0],
'username': row[1],
'password': row[2],
'dashboard_url': row[3],
'alias': row[4],
'is_default': bool(row[5])
}
for row in rows
]
async def set_default_dashboard(self, telegram_id, dashboard_id):
async with aiosqlite.connect(self.db_path) as db:
await db.execute('''
UPDATE users
SET default_dashboard_id = ?
WHERE telegram_id = ?
''', (dashboard_id, telegram_id))
await db.commit()
async def delete_dashboard(self, telegram_id, dashboard_id):
async with aiosqlite.connect(self.db_path) as db:
# 检查是否是默认面板
async with db.execute('''
SELECT default_dashboard_id
FROM users
WHERE telegram_id = ?
''', (telegram_id,)) as cursor:
row = await cursor.fetchone()
is_default = row and row[0] == dashboard_id
# 删除 dashboard
await db.execute('''
DELETE FROM dashboards
WHERE id = ? AND telegram_id = ?
''', (dashboard_id, telegram_id))
# 检查是否还有其他面板
async with db.execute('''
SELECT id
FROM dashboards
WHERE telegram_id = ?
ORDER BY id ASC
''', (telegram_id,)) as cursor:
remaining_dashboards = await cursor.fetchall()
if not remaining_dashboards:
# 如果没有面板了,删除用户
await db.execute('DELETE FROM users WHERE telegram_id = ?', (telegram_id,))
elif is_default and remaining_dashboards:
# 如果删除的是默认面板且还有其他面板,设置第一个面板为默认
new_default_id = remaining_dashboards[0][0]
await db.execute('''
UPDATE users
SET default_dashboard_id = ?
WHERE telegram_id = ?
''', (new_default_id, telegram_id))
await db.commit()
return bool(remaining_dashboards) # 返回是否还有其他面板
2024-12-09 03:02:53 +08:00
async def delete_user(self, telegram_id):
async with aiosqlite.connect(self.db_path) as db:
2024-12-09 22:31:23 +08:00
# 删除用户的所有 dashboard
await db.execute('DELETE FROM dashboards WHERE telegram_id = ?', (telegram_id,))
# 删除用户
2024-12-09 03:02:53 +08:00
await db.execute('DELETE FROM users WHERE telegram_id = ?', (telegram_id,))
await db.commit()