working TG YT TW

This commit is contained in:
sShemet
2026-01-07 02:22:34 +05:00
parent 62263b713a
commit 7561e6f411
7 changed files with 592 additions and 534 deletions

View File

@@ -6,373 +6,281 @@ import os
import sys
import threading
import asyncio
import socket
import emoji
from datetime import datetime, timedelta
import time
import tzlocal
from zoneinfo import ZoneInfo
from http.server import BaseHTTPRequestHandler, HTTPServer
import pytchat
import telegram
from donationalerts import DonationAlertsAPI, Scopes
from urllib import parse
import emoji
sys.path.append('twitchchatirc')
import twitch
# ====================
# Конфигурация
# ====================
class Config:
def __init__(self):
self.config = configparser.ConfigParser()
self.config.read("config.ini")
# Настройки
self.timezone = ZoneInfo('Asia/Yekaterinburg')
self.poll_interval = 2 # seconds
self.max_comments_per_source = 15
self.max_total_comments = 150
self.hello_message_interval = 600 # seconds
# API keys
self.alerts_app_id = self.config['Alerts']['app_id']
self.alerts_api_key = self.config['Alerts']['api_key']
self.telegram_enabled = self.config['Telegram']['enabled']
self.telegram_bot_secret = self.config['Telegram']['bot_secret']
self.youtube_video_id = self.config['Youtube']['video_id']
self.twitch_channel = self.config['Twitch']['channel']
# Чтение конфигурации
config = configparser.ConfigParser()
config.read("config.ini")
# ====================
# Модели данных
# ====================
class Comment:
def __init__(self, comment_id, comment_type, sender, message, date, amount=None):
self.id = comment_id
self.type = comment_type
self.sender = sender
self.message = message
self.date = date
self.amount = amount
def to_dict(self):
return {
'id': self.id,
'type': self.type,
'sender': self.sender,
'message': emoji.emojize(self.message),
'date': self.date,
'amount': self.amount
}
# Инициализация модулей на основе конфигурации
chat_connectors = {}
chat_comments = {}
# ====================
# Сервисы
# ====================
class DonationAlertsService:
def __init__(self, config):
self.config = config
self.api = DonationAlertsAPI(
config.alerts_app_id,
config.alerts_api_key,
"http://127.0.0.1:8008/login",
[Scopes.USER_SHOW]
)
self.access_token = None
def get_donations(self):
if not self.access_token:
print('Waiting for Alerts auth...')
return []
try:
donations = self.api.donations.get(self.access_token.access_token)
return self._process_donations(donations.items)
except Exception as e:
print(f'Alerts error: {str(e)}')
return []
def _process_donations(self, items):
result = []
for item in items:
if not item.is_shown:
continue
sender = item.username or 'Аноним'
message = item.message or '---'
amount = self._format_amount(item.amount, item.currency)
date = self._parse_donation_date(item.shown_at)
result.append(
Comment(
comment_id=item.id,
comment_type='donate',
sender=sender,
message=message,
date=date,
amount=amount
)
)
return result
def _format_amount(self, amount, currency):
if amount != int(amount):
return f"{format(amount, '.2f')} {currency}"
return f"{int(amount)} {currency}"
def _parse_donation_date(self, date_str):
dt = datetime.fromisoformat(date_str).astimezone(self.config.timezone)
# Фильтр донатов за последний месяц
if dt > datetime.now(self.config.timezone) - timedelta(days=30):
return dt
return None
# Telegram (опционально)
if config.getboolean('Telegram', 'enabled', fallback=False):
import telegram
bot = telegram.Bot(config['Telegram']['bot_secret'])
chat_connectors['telegram'] = bot
chat_comments['tg'] = []
class TelegramService:
def __init__(self, config):
self.config = config
self.bot = telegram.Bot(config.telegram_bot_secret) if config.telegram_enabled else None
async def get_messages(self):
if not self.bot:
return []
try:
updates = await self.bot.get_updates(
allowed_updates=['message', 'edited_message'],
timeout=None
)
return self._process_updates(updates)
except Exception as e:
print(f'Telegram error: {str(e)}')
return []
def _process_updates(self, updates):
messages = []
for upd in updates:
msg = upd.edited_message if upd.edited_message else upd.message
if not hasattr(msg, 'text') or not msg.text:
continue
sender = self._get_sender_name(msg.from_user)
date = msg.date.replace(tzinfo=self.config.timezone)
messages.append(
Comment(
comment_id=msg.message_id,
comment_type='tg',
sender=sender,
message=msg.text,
date=date
)
)
return messages
def _get_sender_name(self, user):
if user.first_name and user.last_name:
return f"{user.first_name} {user.last_name}"
return user.first_name or "Админ"
# YouTube (опционально)
if config.getboolean('YouTube', 'enabled', fallback=False):
print('Youtube enabled')
import pytchat
video_id = config['YouTube']['video_id']
chat = pytchat.create(video_id=video_id)
chat_connectors['youtube'] = chat
chat_comments['yt'] = []
class YouTubeService:
def __init__(self, config):
self.config = config
self.chat = pytchat.create(video_id=config.youtube_video_id)
def get_messages(self):
try:
items = self.chat.get()
if not hasattr(items, 'items'):
print('YT reconnecting...')
self.chat = pytchat.create(video_id=self.config.youtube_video_id)
return []
return self._process_messages(items.items)
except Exception as e:
print(f'YouTube error: {str(e)}')
return []
def _process_messages(self, items):
messages = []
for item in items:
date = datetime.fromisoformat(item.datetime).replace(tzinfo=self.config.timezone)
messages.append(
Comment(
comment_id=item.id,
comment_type='yt',
sender=item.author.name,
message=item.message,
date=date
)
)
return messages
# Twitch (опционально)
if config.getboolean('Twitch', 'enabled', fallback=False):
sys.path.append('twitchchatirc')
import twitch
twitch_channel = config['Twitch']['channel']
twitch_socket = twitch.TwitchChatIRC(twitch_channel)
chat_connectors['twitch'] = twitch_socket
chat_comments['tw'] = []
class TwitchService:
def __init__(self, config):
self.config = config
self.socket = twitch.TwitchChatIRC(config.twitch_channel)
def get_messages(self):
# Предполагаем, что сообщения добавляются в self.socket.all_messages
# в отдельном потоке (как в оригинальном коде)
return getattr(self.socket, 'all_messages', [])
# ====================
# Ядро приложения
# ====================
class ChatAggregator:
def __init__(self):
self.config = Config()
self.services = {
'alerts': DonationAlertsService(self.config),
'telegram': TelegramService(self.config),
# 'youtube': YouTubeService(self.config),
'twitch': TwitchService(self.config)
}
self.comments = []
self.lock = threading.Lock()
self.hello_message = Comment(
comment_id=random.randint(100000, 999999),
comment_type='hello',
sender='Eikichi-bot',
message='🔥 Спасибо всем на стриме за интерес к переводу и поддержку! 🔥',
date=datetime.now(self.config.timezone)
)
def run(self):
# Запуск сервера в отдельном потоке
server_thread = threading.Thread(target=self._run_server)
server_thread.daemon = True
server_thread.start()
# Запуск Twitch в отдельном потоке
twitch_thread = threading.Thread(target=self.services['twitch'].socket.listen)
twitch_thread.daemon = True
twitch_thread.start()
print('System started. Press Ctrl+Shift+Alt+C to exit.')
# Главный цикл
while True:
if keyboard.is_pressed('Ctrl+Shift+Alt+C'):
sys.exit(0)
if int(time.time()) % self.config.poll_interval == 0:
self.update_comments()
time.sleep(1)
def update_comments(self):
# Сбор сообщений со всех сервисов
new_comments = []
# DonationAlerts
new_comments.extend(self.services['alerts'].get_donations())
# YouTube
# new_comments.extend(self.services['youtube'].get_messages())
# Telegram
if self.config.telegram_enabled:
try:
telegram_comments = asyncio.run(self.services['telegram'].get_messages())
new_comments.extend(telegram_comments)
except Exception as e:
print(f'Telegram error: {e}')
# Twitch
new_comments.extend(self.services['twitch'].get_messages())
# Добавление приветственного сообщения
if int(time.time()) % self.config.hello_message_interval == 0:
new_comments.append(self.hello_message)
# Обновление основного списка комментариев
with self.lock:
# Добавление новых уникальных комментариев
existing_ids = {c.id for c in self.comments}
self.comments.extend(
c for c in new_comments
if c.id not in existing_ids
)
# Сортировка по дате
self.comments.sort(key=lambda x: x.date)
# Ограничение общего количества
self.comments = self.comments[-self.config.max_total_comments:]
print(f"{datetime.now().strftime('%H:%M:%S')} Updated. Total comments: {len(self.comments)}")
def get_comments_json(self):
with self.lock:
return json.dumps(
[c.to_dict() for c in self.comments],
cls=DateTimeEncoder,
ensure_ascii=False
)
def _run_server(self):
class Handler(BaseHTTPRequestHandler):
def __init__(self, *args, **kwargs):
self.aggregator = kwargs.pop('aggregator')
super().__init__(*args, **kwargs)
def _set_headers(self):
self.send_response(200)
self.send_header('Content-type', 'application/json; charset=utf-8')
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
def do_GET(self):
if self.path == '/alert_auth':
self.send_response(301)
self.send_header('Location', self.aggregator.services['alerts'].api.authorize.login())
self.end_headers()
elif self.path.startswith('/login'):
self._handle_alerts_login()
else:
self._set_headers()
self.wfile.write(self.aggregator.get_comments_json().encode('utf-8'))
def _handle_alerts_login(self):
url = dict(parse.parse_qsl(parse.urlsplit(self.path).query))
code = url.get('code')
if code:
try:
self.aggregator.services['alerts'].access_token = (
self.aggregator.services['alerts'].api.authorize.get_access_token(code)
)
print("DonationAlerts authorized successfully!")
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(b'Authorization successful! You can close this page.')
except Exception as e:
print(f'Alerts auth error: {str(e)}')
self.send_error(500, 'Authorization failed')
else:
self.send_error(400, 'Missing code parameter')
def log_message(self, format, *args):
return
server_address = ('127.0.0.1', 8008)
httpd = HTTPServer(server_address, lambda *args: Handler(*args, aggregator=self))
print(f'Starting HTTP server on port 8008...')
httpd.serve_forever()
# ====================
# Вспомогательные классы
# ====================
# Функции для работы с датами (оставлены как есть)
class DateTimeEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, datetime):
return o.isoformat()
return super().default(o)
if isinstance(o, bytes):
return list(o)
return json.JSONEncoder.default(self, o)
# ====================
# Точка входа
# ====================
if __name__ == '__main__':
print('\n--- Telegram/YouTube/Twitch Chat Aggregator ---\n')
print('For DonationAlerts authorization visit: http://127.0.0.1:8008/alert_auth')
print('Press Ctrl+Shift+Alt+C to exit\n')
LOCAL_TIMEZONE = tzlocal.get_localzone_name()
tz = ZoneInfo('Asia/Yekaterinburg')
all_comments = []
all_old_comments = []
is_changed = False
overallcount = 0
# Получение комментариев из Telegram
async def get_telegram_comments():
global chat_comments
chat_comments['tg'] = []
if 'telegram' not in chat_connectors:
return
async with bot:
try:
updates = await bot.get_updates(
allowed_updates=['message', 'edited_message'],
timeout=None
)
except telegram.error.TimedOut:
print('TG connection timeout')
time.sleep(5)
return
except Exception as e:
print(f'TG connection error: {e}')
return
for upd in updates:
msg = upd.message
if upd.edited_message:
msg = upd.edited_message
if not hasattr(msg, 'text') or not msg.text:
continue
sendr = msg.from_user.first_name
if msg.from_user.last_name:
sendr = sendr + ' ' + msg.from_user.last_name
if sendr == "Group":
sendr = 'Админ'
netdatetime = msg.date.replace(tzinfo=tz)
comm = {
'id': msg.message_id,
'type': 'tg',
'date': netdatetime,
'sendr': sendr,
'msg': msg.text
}
chat_comments['tg'].append(comm)
# Получение комментариев из YouTube
def get_youtube_comments():
global chat_comments
aggregator = ChatAggregator()
aggregator.run()
if 'youtube' not in chat_connectors:
return
chat = chat_connectors['youtube']
itms = chat.get()
if not hasattr(itms, 'items'):
print('YT has no items attribute! (Empty?). Reconnecting...')
time.sleep(5)
video_id = config['YouTube']['video_id']
chat_connectors['youtube'] = pytchat.create(video_id=video_id)
return
for c in itms.items:
dt = datetime.fromisoformat(c.datetime).replace(tzinfo=tz)
comm = {
'id': c.id,
'type': 'yt',
'date': dt,
'sendr': c.author.name,
'msg': emoji.emojize(c.message)
}
chat_comments['yt'].append(comm)
# Получение комментариев из Twitch
def get_twitch_comments():
global chat_comments
if 'twitch' not in chat_connectors:
return
chat_comments['tw'] = chat_connectors['twitch'].all_messages
def sort_by_date(e):
return e['date']
def update_all_comments():
global all_comments
global chat_comments
# Добавляем новые комментарии из всех источников
for source in ['tg', 'yt', 'tw']:
if source in chat_comments:
for comment in chat_comments[source]:
if comment not in all_comments:
all_comments.append(comment)
# Сортируем по дате и ограничиваем количество
all_comments.sort(key=sort_by_date)
all_comments = all_comments[-150:]
def make_json_object():
global all_comments
global chat_comments
global overallcount
# Получаем комментарии из всех включенных источников
if 'youtube' in chat_connectors:
get_youtube_comments()
if 'twitch' in chat_connectors:
get_twitch_comments()
if 'telegram' in chat_connectors:
try:
asyncio.run(get_telegram_comments())
except Exception as e:
print(f'TG error: {e}')
time.sleep(2)
# Ограничиваем буферы для каждого источника
for source in ['yt', 'tg', 'tw']:
if source in chat_comments and len(chat_comments[source]) > 15:
chat_comments[source] = chat_comments[source][-15:]
# Добавляем приветственное сообщение каждые 10 минут
if int(time.time()) % 600 == 0:
dt = datetime.now(ZoneInfo('UTC')).astimezone(tz)
hello_msg = {
'id': random.randint(100000, 999999),
'type': 'hello',
'date': dt,
'sendr': 'Eikichi-bot',
'msg': '🔥 Спасибо всем на стриме за интерес к переводу и поддержку! 🔥'
}
all_comments.append(hello_msg)
# Обновляем общий список комментариев
update_all_comments()
overallcount += 1
print(f"{datetime.now().strftime('%H:%M:%S')} Проверка чатов... {len(all_comments)} элементов ({overallcount})")
def print_all_comments():
global all_comments
print('-' * 40)
print(all_comments)
print('-' * 40)
# HTTP сервер
class ChatServer(BaseHTTPRequestHandler):
def _set_headers(self):
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
def do_HEAD(self):
self._set_headers()
def log_message(self, format, *args):
# Отключаем логи сервера
return
def do_GET(self):
if self.path == '/':
self._set_headers()
self.wfile.write(json.dumps(all_comments, cls=DateTimeEncoder).encode('utf-8'))
else:
self.send_error(404, "Not Found")
def run_server(port=8008):
server_address = ('', port)
httpd = HTTPServer(server_address, ChatServer)
print(f'Starting HTTP сервер на http://127.0.0.1:{port} ...')
print()
try:
httpd.serve_forever()
except KeyboardInterrupt:
pass
httpd.server_close()
sys.exit(1)
# Основная часть программы
print()
print('--- Парсер чатов Telegram/YouTube/Twitch --- Sergey Shemet (C) 2025 ---')
print()
# Запускаем Twitch в отдельном потоке если включен
if 'twitch' in chat_connectors:
twitch_thread = threading.Thread(target=twitch_socket.listen, name='twitchirc')
twitch_thread.daemon = True
twitch_thread.start()
# Запускаем HTTP сервер в отдельном потоке
server_thread = threading.Thread(target=run_server, name='HTTPServer')
server_thread.daemon = True
server_thread.start()
print('Запуск подсистем чатов...')
print('...Удерживайте Ctrl+Alt+Shift+C для выхода и C для очистки консоли...')
poll_time = 2 # Интервал проверки в секундах
while True:
# Обработка горячих клавиш
if keyboard.is_pressed('Ctrl+Shift+Alt+c'):
sys.exit(1)
if keyboard.is_pressed('Ctrl+Shift+Alt+z'):
print_all_comments()
if keyboard.is_pressed('c'):
os.system("cls")
# Периодическая проверка чатов
this_second = int(time.time())
if this_second % poll_time == 0:
make_json_object()
time.sleep(1)