from telegram import (
InlineQueryResultArticle,
InputTextMessageContent,
Update,
InlineKeyboardMarkup,
InlineKeyboardButton
)
from telegram.ext import (
ApplicationBuilder,
CommandHandler,
InlineQueryHandler,
MessageHandler,
ContextTypes,
filters,
CallbackQueryHandler
)
from deep_translator import GoogleTranslator
from uuid import uuid4
import re
BOT_TOKEN = "YOUR_BOT_TOKEN_HERE"
ADMIN_ID = 123456789 # ضع معرفك هنا
CHANNEL_USERNAME = "@your_channel" # ضع اسم قناتك هنا
SUPPORTED_LANGS = {
'ar': ''العربية,
'en': 'English',
'fr': 'Français',
'de': 'Deutsch',
'es': 'Español',
'tr': 'Türkçe'
}
bot_enabled = True
user_set = set()
broadcast_mode = {}
def translate_text(text: str, target: str) -> str:
try:
return GoogleTranslator(source='auto', target=target).translate(text)
except:
return " حدث خطأ أثناء الترجمة،عذًرا."
def detect_language(text: str) -> str:
if re.search(r'[\u0600-\u06FF]', text):
return 'ar'
return 'en'
async def is_user_subscribed(user_id, context):
try:
member = await context.bot.get_chat_member(CHANNEL_USERNAME, user_id)
return member.status in ["member", "creator", "administrator"]
except:
return False
async def send_subscription_prompt(update: Update, context:
ContextTypes.DEFAULT_TYPE):
keyboard = InlineKeyboardMarkup([
[InlineKeyboardButton(""اشترك في القناة,
url=f"https://t.me/{CHANNEL_USERNAME.lstrip('@')}")],
[InlineKeyboardButton(""تحقق من االشتراك,
callback_data="check_subscription")]
])
await update.message.reply_text(" يجب عليك االشتراك في القناة أوًال الستخدام
البوت:", reply_markup=keyboard)
async def check_subscription_callback(update: Update, context:
ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()
if await is_user_subscribed(query.from_user.id, context):
await query.edit_message_text("تم التحقق! يمكنك الآن استخدام البوت.")
else:
await query.edit_message_text(" الرجاء االشتراك والمحاولة.ال تزال غير مشترك
مجدًدا.")
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
if not await is_user_subscribed(update.effective_user.id, context):
await send_subscription_prompt(update, context)
return
# اسم المستخدم
username = update.effective_user.username or ""عزيزي المستخدم
# رسالة الترحيب والشرح
welcome_message = (
f"{ مرحًباusername}!\n\n"
"أهًال وسهًال بك في بوت الترجمة.\n"
"الستخدام البوت يمكنك كتابة النص الذي ترغب في ترجمته بعد الأمر المناسب:\n"
"/ar لترجمة النص إلى العربية.\n"
"/en لترجمة النص إلى الإنجليزية.\n\n"
"كما يمكنك استخدام الوضع الفوري عبر كتابة النص مع إضافة اسم البوت."
)
await update.message.reply_text(welcome_message)
async def translate_command(update: Update, context: ContextTypes.DEFAULT_TYPE,
target_lang: str):
global bot_enabled
user_id = update.effective_user.id
# التأكد من االشتراك في القناة
if not await is_user_subscribed(user_id, context):
await send_subscription_prompt(update, context)
return
# التأكد من أن البوت مفعل
if not bot_enabled:
await update.message.reply_text("البوت حاليًا متوقف.")
return
# أخذ النص من الأوامر
text = ' '.join(context.args)
if not text:
await update.message.reply_text(f"اكتب النص بعد الأمر مثل: /{target_lang}
Your text here")
return
# ترجمة النص
result = translate_text(text, target_lang)
await update.message.reply_text(result)
async def translate_to_ar(update: Update, context: ContextTypes.DEFAULT_TYPE):
await translate_command(update, context, "ar")
async def translate_to_en(update: Update, context: ContextTypes.DEFAULT_TYPE):
await translate_command(update, context, "en")
async def handle_inline_query(update: Update, context: ContextTypes.DEFAULT_TYPE):
global bot_enabled
user_id = update.inline_query.from_user.id
if not await is_user_subscribed(user_id, context):
return
if not bot_enabled:
return
query = update.inline_query.query.strip()
if not query:
return
parts = query.split(" ", 1)
if len(parts) == 2 and parts[0].lower() in SUPPORTED_LANGS:
lang_code, text = parts[0].lower(), parts[1]
else:
text = query
source_lang = detect_language(text)
lang_code = 'en' if source_lang == 'ar' else 'ar'
translated = translate_text(text, lang_code)
language_name = SUPPORTED_LANGS.get(lang_code, lang_code)
result = InlineQueryResultArticle(
id=str(uuid4()),
title=f"{ ترجمة إلىlanguage_name}",
input_message_content=InputTextMessageContent(translated),
description=translated
)
await update.inline_query.answer([result], cache_time=1)
async def admin(update: Update, context: ContextTypes.DEFAULT_TYPE):
if update.effective_user.id != ADMIN_ID:
await update.message.reply_text("غير مصرح لك.")
return
msg = (
f"لوحة التحكم:\n"
f"- حالة البوت: {'✅ 'ُمشّغلif bot_enabled else '❌ \}'متوقفn"
f"- عدد المستخدمين: {len(user_set)}"
)
keyboard = InlineKeyboardMarkup([
[InlineKeyboardButton(""تشغيل, callback_data="admin_enable"),
InlineKeyboardButton(""إيقاف, callback_data="admin_disable")],
[InlineKeyboardButton(""تحديث الإحصائيات, callback_data="admin_refresh")]
])
await update.message.reply_text(msg, reply_markup=keyboard)
async def admin_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
global bot_enabled
query = update.callback_query
await query.answer()
if query.from_user.id != ADMIN_ID:
await query.edit_message_text("غير مصرح لك.")
return
if query.data == "admin_enable":
bot_enabled = True
await query.edit_message_text("✅ تم تشغيل البوت.")
elif query.data == "admin_disable":
bot_enabled = False
await query.edit_message_text("❌ تم إيقاف البوت.")
elif query.data == "admin_refresh":
msg = (
f"لوحة التحكم:\n"
f"- حالة البوت: {'✅ 'ُمشّغلif bot_enabled else '❌ \}'متوقفn"
f"- عدد المستخدمين: {len(user_set)}"
)
keyboard = InlineKeyboardMarkup([
[InlineKeyboardButton(""تشغيل, callback_data="admin_enable"),
InlineKeyboardButton(""إيقاف, callback_data="admin_disable")],
[InlineKeyboardButton(""تحديث الإحصائيات,
callback_data="admin_refresh")]
])
await query.edit_message_text(msg, reply_markup=keyboard)
async def broadcast(update: Update, context: ContextTypes.DEFAULT_TYPE):
if update.effective_user.id != ADMIN_ID:
return
await update.message.reply_text(" أرسل الآن الرسالة أو الوسائط التي تريد بثها
لجميع المستخدمين.")
broadcast_mode[update.effective_user.id] = True
async def handle_broadcast_message(update: Update, context:
ContextTypes.DEFAULT_TYPE):
user_id = update.effective_user.id
if not broadcast_mode.get(user_id):
return
broadcast_mode[user_id] = False
count = 0
for uid in user_set:
try:
if update.message.text:
await context.bot.send_message(chat_id=uid,
text=update.message.text)
elif update.message.photo:
await context.bot.send_photo(chat_id=uid,
photo=update.message.photo[-1].file_id,
caption=update.message.caption or "")
elif update.message.document:
await context.bot.send_document(chat_id=uid,
document=update.message.document.file_id,
caption=update.message.caption or
"")
elif update.message.video:
await context.bot.send_video(chat_id=uid,
video=update.message.video.file_id,
caption=update.message.caption or "")
count += 1
except:
continue
await update.message.reply_text(f"{ تم إرسال الرسالة إلىcount} مستخدم.")
def main():
app = ApplicationBuilder().token(BOT_TOKEN).build()
app.add_handler(CommandHandler("start", start))
app.add_handler(CommandHandler("ar", translate_to_ar))
app.add_handler(CommandHandler("en", translate_to_en))
app.add_handler(CommandHandler("admin", admin))
app.add_handler(CommandHandler("broadcast", broadcast))
app.add_handler(MessageHandler(filters.ALL & filters.User(user_id=ADMIN_ID),
handle_broadcast_message))
app.add_handler(InlineQueryHandler(handle_inline_query))
app.add_handler(CallbackQueryHandler(admin_callback, pattern="admin_.*"))
app.add_handler(CallbackQueryHandler(check_subscription_callback,
pattern="check_subscription"))
print("البوت يعمل...")
app.run_polling()
if __name__ == "__main__":
main()