装饰

本文展示了如何在 Telegram 机器人中实现管理员回复功能,同时加强机器人安全性。通过消息处理与过滤机制,防止垃圾信息和恶意攻击(如 XSS 和 SQL 注入),确保机器人的高效和安全运行。此外,结合长轮询技术优化消息响应速度,让机器人能够及时处理用户的请求。所有的一切准备,首先查看聊天机器人的初始文章:设计一个telegram聊天机器人(初代机)




1. 一些更新(没提到的还是初代的配置)

1.1 项目结构

创建以下目录结构(未变):

/opt/telegram_bot/
├── bot.py # 主脚本
├── requirements.txt # 依赖库
├── logs/ # 日志目录
└── bot.log # 日志文件

1.2 编写主脚本

以下是 bot.py 更新后的内容:

import os
import logging
import sys
import time
import re
import bleach
import html
from dotenv import load_dotenv
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, MessageHandler, CallbackQueryHandler, ConversationHandler, CommandHandler, filters
from collections import defaultdict

# 设置全局异常处理器,捕获所有未处理的异常
def exception_handler(type, value, tb):
logger.error(f"Unhandled exception: {type} {value}")
logger.error("".join(traceback.format_tb(tb)))

# 设置全局异常处理
sys.excepthook = exception_handler

# 用于填充的示例变量
TOKEN = "your_bot_token" # 填入实际的 Bot Token
OWNER_ID = "your_owner_id" # 填入实际的管理员ID

# 只接收警告及以上的信息
logging.getLogger("httpx").setLevel(logging.WARNING)
# 配置日志记录(只显示警告及以上信息)
logging.basicConfig(
filename="/opt/telegram_bot/logs/bot.log",
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=logging.WARNING,
)
logger = logging.getLogger(__name__)

# 定义状态
WAITING_REPLY = 1

# 存储每个用户的消息时间戳(限制消息频率)
user_message_times = {}

# 存储已通过机器人的用户(限制用户数量)
allowed_users = set()

# 限制用户消息发送频率(秒)
MESSAGE_INTERVAL = 15 # 每个用户最少间隔 15 秒发送一条消息

# 限制允许的最大用户数量
MAX_USERS = 100

# 存储用户消息历史(最多存储 5 条消息)
user_message_history = {}

# 限制输入的最大长度
MAX_REPLY_LENGTH = 500 # 最大回复长度
MAX_MESSAGE_LENGTH = 500 # 最大消息长度

# 管理员操作频率限制
admin_last_action = {}

# 防止输入的XSS攻击和恶意命令
def is_safe_input(text):
allowed_tags = ['b', 'i', 'u', 'strong', 'em', 'a'] # 允许的 HTML 标签
allowed_attributes = {'a': ['href', 'title']} # 允许的 HTML 属性
# 使用 bleach 清理 HTML 内容
clean_text = bleach.clean(text, tags=allowed_tags, attributes=allowed_attributes)
return clean_text

def escape_user_input(text):
# 使用 html.escape 转义用户输入中的特殊字符
return html.escape(text)

# 防止 SQL 注入
def is_valid_command(command):
# 限制命令长度,避免恶意命令过长
if len(command) > 50:
return False
# 使用正则表达式进一步限制命令格式
return re.match(r'^[a-zA-Z0-9_]+$', command) is not None

def is_safe_url(url):
# 检查 URL 是否包含不安全的协议
unsafe_protocols = ["javascript:", "data:", "file:"]
if any(url.startswith(protocol) for protocol in unsafe_protocols):
return False
return True

# 防止暴力破解
user_attempts = defaultdict(int)
user_last_attempt = defaultdict(float)

def check_brute_force(user_id):
# 限制每个用户 5 次尝试/秒
current_time = time.time()
if current_time - user_last_attempt[user_id] < 1:
user_attempts[user_id] += 1
else:
user_attempts[user_id] = 1
user_last_attempt[user_id] = current_time

if user_attempts[user_id] > 5:
return False
return True

# 处理消息的函数
async def handle_message(update, context):
user_id = update.message.chat_id
username = update.message.from_user.username
first_name = update.message.from_user.first_name or ""
last_name = update.message.from_user.last_name or ""
message_text = update.message.text
message_time = update.message.date.strftime("%Y-%m-%d %H:%M:%S")

logger.info(f"处理消息: 用户 {user_id}, 内容: {message_text}") # 用户相关操作日志为警告

# 限制消息长度
if len(message_text) > MAX_MESSAGE_LENGTH:
await update.message.reply_text(f"您的消息内容超出最大长度限制,请确保消息长度不超过 {MAX_MESSAGE_LENGTH} 个字符。")
return

# 根据是否有用户名生成用户链接
if username:
user_link = f"[{username}](https://t.me/{username})"
else:
user_link = f"[{first_name} {last_name}](tg://user?id={user_id})"

# 清理用户消息中的不安全输入
clean_message = is_safe_input(message_text)
if clean_message != message_text:
await update.message.reply_text("您的消息包含不安全的内容,已被清除。")
return

logger.warning(f"收到消息: {message_text} 来自用户: {username or user_id}") # 用户相关操作日志为警告

# 检测垃圾关键词
scam_keywords = ["Spam Info Bot", "点击链接", "账号验证", "永久停用", "恢复功能", "免费领取"]
if any(keyword in message_text for keyword in scam_keywords):
# 如果检测到垃圾信息,删除消息并记录
await update.message.delete()
logger.warning(f"垃圾消息被拦截: {username or user_id} 内容: {message_text}")
return

# 限制用户消息发送频率
current_time = time.time()
last_message_time = user_message_times.get(user_id, 0)

# 如果用户发送消息的间隔太短,阻止消息
if current_time - last_message_time < MESSAGE_INTERVAL:
await update.message.reply_text("请稍等一会儿再发送消息(15秒)。🤖")
return

# 更新用户的消息时间戳
user_message_times[user_id] = current_time

# 限制用户数量
if len(allowed_users) >= MAX_USERS:
await update.message.reply_text("当前消息接收已满,您无法通过机器人发送消息。稍后重试。🤖")
return

# 如果是新用户,允许他们使用机器人
if user_id not in allowed_users:
allowed_users.add(user_id)
# 发送“收到”通知
await update.message.reply_text("❤️❤️感谢联系,我会尽快联系你!❤️❤️")

# 记录用户消息历史
if user_id not in user_message_history:
user_message_history[user_id] = []

# 保持最多 5 条历史消息
user_message_history[user_id].append(message_text)
if len(user_message_history[user_id]) > 5:
user_message_history[user_id].pop(0)

# 获取用户的消息历史
user_history = "\n".join([f"{i+1}. {msg}" for i, msg in enumerate(user_message_history[user_id])])

# 转发正常消息给管理员,并添加一个“回复”按钮
keyboard = [
[InlineKeyboardButton("回复", callback_data=f"reply_{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)

await context.bot.send_message(
chat_id=OWNER_ID,
text=f"📩 收到消息:\n用户: {user_link}\n时间: {message_time}\n内容: {message_text}\n\n用户的历史消息:\n{user_history}",
parse_mode="Markdown",
reply_markup=reply_markup # 添加按钮
)


# 处理按钮点击事件
async def button(update, context):
query = update.callback_query
data = query.data # 获取回调数据
logger.info(f"按钮被点击,回调数据:{data}") # 管理员操作日志为信息

try:
# 判断是否是“回复”按钮
if data.startswith("reply_"):
_, user_id = data.split('_') # 解析回调数据
user_id = int(user_id)

# 检查用户的操作频率
if not check_brute_force(user_id):
await query.answer("操作太频繁,请稍后再试!😃")
return

# 提示管理员输入自定义消息
await context.bot.send_message(
chat_id=OWNER_ID,
text="回复的消息内容:"
)

# 将用户 ID 存入上下文
context.user_data["user_to_reply"] = user_id
logger.info(f"管理员 {query.from_user.id} 开始回复用户 {user_id}")
return WAITING_REPLY

except Exception as e:
logger.error(f"处理按钮点击事件时发生错误: {e}")
await query.answer("发生了错误,请稍后再试!")

# 处理管理员输入的回复消息
async def reply_message(update, context):
# 获取管理员输入的回复文本
reply_text = update.message.text

# 输入长度限制:避免恶意长文本攻击
if len(reply_text) > MAX_REPLY_LENGTH:
await update.message.reply_text(f"回复内容过长,请输入少于 {MAX_REPLY_LENGTH} 个字符的消息。")
return

# 如果输入的是"/cancel",取消回复
if reply_text.strip().lower() == "/cancel":
logger.info(f"管理员 {update.message.from_user.id} 取消了回复操作")
del context.user_data["user_to_reply"] # 清除用户的回复状态
await update.message.reply_text("已取消回复。")
return ConversationHandler.END

# 检查是否有待回复的用户
if "user_to_reply" in context.user_data:
user_id = context.user_data.pop("user_to_reply")

# 发送自定义回复给用户
await context.bot.send_message(
chat_id=user_id,
text=reply_text # 发送自定义消息
)

# 通知管理员回复已发送
await update.message.reply_text("回复已成功发送!")
return ConversationHandler.END
else:
# 如果没有正在等待的回复
await update.message.reply_text("没有待回复的消息。")
return ConversationHandler.END

# 监听取消动作
async def cancel(update, context):
await update.message.reply_text("已取消回复。")
return ConversationHandler.END

# 初始化机器人
application = Application.builder().token(TOKEN).build()

# 定义对话处理器
conv_handler = ConversationHandler(
entry_points=[CallbackQueryHandler(button)],
states={WAITING_REPLY: [MessageHandler(filters.TEXT & ~filters.COMMAND, reply_message)]},
fallbacks=[CommandHandler('cancel', cancel)],
)

# 添加处理器
application.add_handler(conv_handler)
# 添加消息处理器
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

# 启动机器人
logger.info("机器人已启动...") # 系统相关信息日志为信息
application.run_polling(poll_interval=15) # 15秒轮询一次

1.3 安装依赖

更改 requirements.txt,内容如下:

python-telegram-bot
python-dotenv
bleach
apscheduler

安装依赖:

pip3 install -r /opt/telegram_bot/requirements.txt



2. 重新启动机器人

  • 启动机器人服务:
sudo systemctl restart telegram_bot
  • 设置开机自启:
sudo systemctl enable telegram_bot



3. 配置日志轮转

未变。




4. 总结

重启机器人过后,直接去TG使用即可。长轮询有一个坏处(弊端吧):就是需要配置了机器人的服务器在一段时间后(文中设置为15s)主动询问telegram服务器是否有新消息发送出来,有时候这种询问有一定的延迟,同时回复多个用户时有肉眼可见的速度差异。预计后面(时间未知,随性而为)会更新另一种获取消息的方式webhook服务器:采用这个方式,telegram服务器会主动将新消息发送给你的服务器,这样你的服务器就不用长轮询去试探telegram是否有新消息,节省了服务器的开销,但是当你回复用户和telegram发新消息同时进行时,会产生冲突。还有就是公网传输不是很安全。

ps:我想去商店买一个我特别喜欢的东西,但是恰好没货,长轮询的你就会隔一段时间往商店跑看看有没有货,有时当货到了一段时间过后你才会知道需要的东西已经到货了。采用WebHook的你只需要给商店留个电话,到货的瞬间商店给你打电话让你过来就好了,不过电话容易被监听,双方同时拨打时容易占线。。。。。。。

参考链接

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注