import configparser import random import keyboard import json import os import sys import threading import asyncio from datetime import datetime, timedelta import time from zoneinfo import ZoneInfo from http.server import BaseHTTPRequestHandler, HTTPServer import pytchat import telegram from donationalerts import DonationAlertsAPI, Scope from urllib import parse import emoji sys.path.append('twitchchatirc') import twitch # ==================== # Конфигурация # ==================== class Config: def __init__(self): self.config = configparser.ConfigParser() self.config.read("config.ini") # Настройки self.timezone = ZoneInfo('Asia/Yekaterinburg') self.poll_interval = 2 # seconds self.max_comments_per_source = 15 self.max_total_comments = 150 self.hello_message_interval = 600 # seconds # API keys self.alerts_app_id = self.config['Alerts']['app_id'] self.alerts_api_key = self.config['Alerts']['api_key'] self.telegram_enabled = self.config['Telegram']['enabled'] self.telegram_bot_secret = self.config['Telegram']['bot_secret'] self.youtube_video_id = self.config['Youtube']['video_id'] self.twitch_channel = self.config['Twitch']['channel'] # ==================== # Модели данных # ==================== class Comment: def __init__(self, comment_id, comment_type, sender, message, date, amount=None): self.id = comment_id self.type = comment_type self.sender = sender self.message = message self.date = date self.amount = amount def to_dict(self): return { 'id': self.id, 'type': self.type, 'sender': self.sender, 'message': emoji.emojize(self.message), 'date': self.date, 'amount': self.amount } # ==================== # Сервисы # ==================== class DonationAlertsService: def __init__(self, config): self.config = config self.api = DonationAlertsAPI( config.alerts_app_id, config.alerts_api_key, "http://127.0.0.1:8008/login", [Scope.OAUTH_USER_SHOW, Scope.OAUTH_DONATION_INDEX] ) self.access_token = None def get_donations(self): if not self.access_token: print('Waiting for Alerts auth...') return [] try: donations = self.api.donations.get(self.access_token.access_token) return self._process_donations(donations.items) except Exception as e: print(f'Alerts error: {str(e)}') return [] def _process_donations(self, items): result = [] for item in items: if not item.is_shown: continue sender = item.username or 'Аноним' message = item.message or '---' amount = self._format_amount(item.amount, item.currency) date = self._parse_donation_date(item.shown_at) result.append( Comment( comment_id=item.id, comment_type='donate', sender=sender, message=message, date=date, amount=amount ) ) return result def _format_amount(self, amount, currency): if amount != int(amount): return f"{format(amount, '.2f')} {currency}" return f"{int(amount)} {currency}" def _parse_donation_date(self, date_str): dt = datetime.fromisoformat(date_str).astimezone(self.config.timezone) # Фильтр донатов за последний месяц if dt > datetime.now(self.config.timezone) - timedelta(days=30): return dt return None class TelegramService: def __init__(self, config): self.config = config self.bot = telegram.Bot(config.telegram_bot_secret) if config.telegram_enabled else None async def get_messages(self): if not self.bot: return [] try: updates = await self.bot.get_updates( allowed_updates=['message', 'edited_message'], timeout=None ) return self._process_updates(updates) except Exception as e: print(f'Telegram error: {str(e)}') return [] def _process_updates(self, updates): messages = [] for upd in updates: msg = upd.edited_message if upd.edited_message else upd.message if not hasattr(msg, 'text') or not msg.text: continue sender = self._get_sender_name(msg.from_user) date = msg.date.replace(tzinfo=self.config.timezone) messages.append( Comment( comment_id=msg.message_id, comment_type='tg', sender=sender, message=msg.text, date=date ) ) return messages def _get_sender_name(self, user): if user.first_name and user.last_name: return f"{user.first_name} {user.last_name}" return user.first_name or "Админ" class YouTubeService: def __init__(self, config): self.config = config self.chat = pytchat.create(video_id=config.youtube_video_id) def get_messages(self): try: items = self.chat.get() if not hasattr(items, 'items'): print('YT reconnecting...') self.chat = pytchat.create(video_id=self.config.youtube_video_id) return [] return self._process_messages(items.items) except Exception as e: print(f'YouTube error: {str(e)}') return [] def _process_messages(self, items): messages = [] for item in items: date = datetime.fromisoformat(item.datetime).replace(tzinfo=self.config.timezone) messages.append( Comment( comment_id=item.id, comment_type='yt', sender=item.author.name, message=item.message, date=date ) ) return messages class TwitchService: def __init__(self, config): self.config = config self.socket = twitch.TwitchChatIRC(config.twitch_channel) def get_messages(self): # Предполагаем, что сообщения добавляются в self.socket.all_messages # в отдельном потоке (как в оригинальном коде) return getattr(self.socket, 'all_messages', []) # ==================== # Ядро приложения # ==================== class ChatAggregator: def __init__(self): self.config = Config() self.services = { 'alerts': DonationAlertsService(self.config), 'telegram': TelegramService(self.config), 'youtube': YouTubeService(self.config), 'twitch': TwitchService(self.config) } self.comments = [] self.lock = threading.Lock() self.hello_message = Comment( comment_id=random.randint(100000, 999999), comment_type='hello', sender='Eikichi-bot', message='🔥 Спасибо всем на стриме за интерес к переводу и поддержку! 🔥', date=datetime.now(self.config.timezone) ) def run(self): # Запуск сервера в отдельном потоке server_thread = threading.Thread(target=self._run_server) server_thread.daemon = True server_thread.start() # Запуск Twitch в отдельном потоке twitch_thread = threading.Thread(target=self.services['twitch'].socket.listen) twitch_thread.daemon = True twitch_thread.start() print('System started. Press Ctrl+Shift+Alt+C to exit.') # Главный цикл while True: if keyboard.is_pressed('Ctrl+Shift+Alt+C'): sys.exit(0) if int(time.time()) % self.config.poll_interval == 0: self.update_comments() time.sleep(1) def update_comments(self): # Сбор сообщений со всех сервисов new_comments = [] # DonationAlerts new_comments.extend(self.services['alerts'].get_donations()) # YouTube new_comments.extend(self.services['youtube'].get_messages()) # Telegram if self.config.telegram_enabled: try: telegram_comments = asyncio.run(self.services['telegram'].get_messages()) new_comments.extend(telegram_comments) except Exception as e: print(f'Telegram error: {e}') # Twitch new_comments.extend(self.services['twitch'].get_messages()) # Добавление приветственного сообщения if int(time.time()) % self.config.hello_message_interval == 0: new_comments.append(self.hello_message) # Обновление основного списка комментариев with self.lock: # Добавление новых уникальных комментариев existing_ids = {c.id for c in self.comments} self.comments.extend( c for c in new_comments if c.id not in existing_ids ) # Сортировка по дате self.comments.sort(key=lambda x: x.date) # Ограничение общего количества self.comments = self.comments[-self.config.max_total_comments:] print(f"{datetime.now().strftime('%H:%M:%S')} Updated. Total comments: {len(self.comments)}") def get_comments_json(self): with self.lock: return json.dumps( [c.to_dict() for c in self.comments], cls=DateTimeEncoder, ensure_ascii=False ) def _run_server(self): class Handler(BaseHTTPRequestHandler): def __init__(self, *args, **kwargs): self.aggregator = kwargs.pop('aggregator') super().__init__(*args, **kwargs) def _set_headers(self): self.send_response(200) self.send_header('Content-type', 'application/json; charset=utf-8') self.send_header("Access-Control-Allow-Origin", "*") self.end_headers() def do_GET(self): if self.path == '/alert_auth': self.send_response(301) self.send_header('Location', self.aggregator.services['alerts'].api.authorize.login()) self.end_headers() elif self.path.startswith('/login'): self._handle_alerts_login() else: self._set_headers() self.wfile.write(self.aggregator.get_comments_json().encode('utf-8')) def _handle_alerts_login(self): url = dict(parse.parse_qsl(parse.urlsplit(self.path).query)) code = url.get('code') if code: try: self.aggregator.services['alerts'].access_token = ( self.aggregator.services['alerts'].api.authorize.get_access_token(code) ) print("DonationAlerts authorized successfully!") self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write(b'Authorization successful! You can close this page.') except Exception as e: print(f'Alerts auth error: {str(e)}') self.send_error(500, 'Authorization failed') else: self.send_error(400, 'Missing code parameter') def log_message(self, format, *args): return server_address = ('', 8008) httpd = HTTPServer(server_address, lambda *args: Handler(*args, aggregator=self)) print(f'Starting HTTP server on port 8008...') httpd.serve_forever() # ==================== # Вспомогательные классы # ==================== class DateTimeEncoder(json.JSONEncoder): def default(self, o): if isinstance(o, datetime): return o.isoformat() return super().default(o) # ==================== # Точка входа # ==================== if __name__ == '__main__': print('\n--- Telegram/YouTube/Twitch Chat Aggregator ---\n') print('For DonationAlerts authorization visit: http://127.0.0.1:8008/alert_auth') print('Press Ctrl+Shift+Alt+C to exit\n') aggregator = ChatAggregator() aggregator.run()