Files
UltraChat/huySeek.py
sShemet c623b8c2f9 fixes
2026-01-09 17:40:45 +05:00

339 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import configparser
import random
import keyboard
import json
import os
import sys
import threading
import asyncio
import socket
import emoji
from datetime import datetime, timedelta
import time
import tzlocal
from zoneinfo import ZoneInfo
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib import parse
from datetime import timezone
# Чтение конфигурации
config = configparser.ConfigParser()
config.read("config.ini")
# Инициализация модулей на основе конфигурации
chat_connectors = {}
chat_comments = {}
# Telegram
if config.getboolean('Telegram', 'enabled', fallback=False):
import telegram
bot = telegram.Bot(config['Telegram']['bot_secret'])
chat_connectors['telegram'] = bot
chat_comments['tg'] = []
# YouTube
if config.getboolean('YouTube', 'enabled', fallback=False):
print('Youtube enabled')
import pytchat
video_id = config['YouTube']['video_id']
chat = pytchat.create(video_id=video_id)
chat_connectors['youtube'] = chat
chat_comments['yt'] = []
# Twitch
TWITCH_CHANNEL = None
if config.getboolean('Twitch', 'enabled', fallback=False):
sys.path.append('twitchchatirc')
import twitch
twitch_channel = config['Twitch']['channel']
twitch_socket = twitch.TwitchChatIRC(twitch_channel)
chat_connectors['twitch'] = twitch_socket
TWITCH_CHANNEL = config['Twitch']['channel']
chat_comments['tw'] = []
# Функции для работы с датами
class DateTimeEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, datetime):
return o.isoformat()
if isinstance(o, bytes):
return list(o)
return json.JSONEncoder.default(self, o)
LOCAL_TIMEZONE = tzlocal.get_localzone_name()
tz = ZoneInfo('Asia/Yekaterinburg')
all_comments = []
all_old_comments = []
is_changed = False
overallcount = 0
last_tg_update_id = 0
# Получение комментариев из Telegram
async def get_telegram_comments():
global chat_comments
global last_tg_update_id
chat_comments['tg'] = []
if 'telegram' not in chat_connectors:
return
async with bot:
try:
updates = await bot.get_updates(
offset=last_tg_update_id + 1,
allowed_updates=['message', 'edited_message'],
timeout=None
)
except telegram.error.TimedOut:
print('TG connection timeout')
time.sleep(5)
return
except Exception as e:
print(f'TG connection error: {e}')
return
if not updates:
return
last_tg_update_id = updates[-1].update_id
for upd in updates:
msg = upd.message
if upd.edited_message:
msg = upd.edited_message
if not hasattr(msg, 'text') or not msg.text:
continue
sendr = msg.from_user.first_name
if msg.from_user.last_name:
sendr = sendr + ' ' + msg.from_user.last_name
if sendr == "Group":
sendr = 'Админ'
try:
netdatetime = msg.date.replace(tzinfo=timezone.utc).astimezone(tz)
comm = {
'id': msg.message_id,
'type': 'tg',
'date': netdatetime,
'sendr': sendr,
'msg': msg.text
}
chat_comments['tg'].append(comm)
except Exception as e:
print(f"Ошибка преобразования времени: {e}")
# Получение комментариев из YouTube
def get_youtube_comments():
global chat_comments
if 'youtube' not in chat_connectors:
return
chat = chat_connectors['youtube']
itms = chat.get()
if not hasattr(itms, 'items'):
print('YT has no items attribute! (Empty?). Reconnecting...')
time.sleep(5)
video_id = config['YouTube']['video_id']
chat_connectors['youtube'] = pytchat.create(video_id=video_id)
return
for c in itms.items:
dt = datetime.fromisoformat(c.datetime).replace(tzinfo=tz)
comm = {
'id': c.id,
'type': 'yt',
'date': dt,
'sendr': c.author.name,
'msg': emoji.emojize(c.message)
}
chat_comments['yt'].append(comm)
# Получение комментариев из Twitch
def get_twitch_comments():
global chat_comments
if 'twitch' not in chat_connectors:
return
chat_comments['tw'] = chat_connectors['twitch'].all_messages
def sort_by_date(e):
return e['date']
def update_all_comments():
global all_comments
global chat_comments
# Временный список для новых комментариев
new_comments = []
# Собираем новые комментарии из всех источников
for source in ['tg', 'yt', 'tw']:
if source in chat_comments and chat_comments[source]:
# Добавляем все новые комментарии из этого источника
new_comments.extend(chat_comments[source])
# Очищаем буфер этого источника
chat_comments[source] = []
# Добавляем новые комментарии к существующим
all_comments.extend(new_comments)
# Сортируем по дате
all_comments.sort(key=sort_by_date)
# Удаляем дубликаты по ID (если они есть)
seen_ids = set()
unique_comments = []
for comment in all_comments:
comment_id = (comment['id'], comment['type']) # ID уникален в рамках типа
if comment_id not in seen_ids:
seen_ids.add(comment_id)
unique_comments.append(comment)
# Ограничиваем количество и сохраняем
all_comments = unique_comments[-150:]
def make_json_object():
global all_comments
global chat_comments
global overallcount
# Получаем комментарии из всех включенных источников
if 'youtube' in chat_connectors:
get_youtube_comments()
if 'twitch' in chat_connectors:
get_twitch_comments()
if 'telegram' in chat_connectors:
try:
asyncio.run(get_telegram_comments())
except Exception as e:
print(f'TG error: {e}')
time.sleep(2)
# Ограничиваем буферы для каждого источника
for source in ['yt', 'tg', 'tw']:
if source in chat_comments and len(chat_comments[source]) > 15:
# Сортируем ВСЕ источники по дате (старые -> новые)
chat_comments[source].sort(key=lambda x: x['date'])
# Берём последние 15 (самые новые)
chat_comments[source] = chat_comments[source][-15:]
# Добавляем приветственное сообщение каждые 10 минут
current_second = int(time.time())
if current_second % 600 <= 5:
# Проверяем, есть ли приветственное сообщение от Eikichi-bot в последних 20 сообщениях
has_recent_hello = any(
comment.get('sendr') == 'Eikichi-bot' and comment.get('type') == 'hello'
for comment in all_comments[-20:] # Последние 20 сообщений
)
if not has_recent_hello:
dt = datetime.now(ZoneInfo('UTC')).astimezone(tz)
hello_msg = {
'id': random.randint(100000, 999999),
'type': 'hello',
'date': dt,
'sendr': 'Eikichi-bot',
'msg': '🔥 Спасибо всем на стриме за интерес к переводу и поддержку! 🔥'
}
all_comments.append(hello_msg)
print(f"{datetime.now().strftime('%H:%M:%S')} Добавлено приветственное сообщение")
# Обновляем общий список комментариев
update_all_comments()
overallcount += 1
print(f"{datetime.now().strftime('%H:%M:%S')} Проверка чатов... {len(all_comments)} элементов ({overallcount})")
def print_all_comments():
global all_comments
global chat_comments
print('-' * 40)
print(all_comments)
print('-' * 40)
print(chat_comments)
# HTTP сервер
class ChatServer(BaseHTTPRequestHandler):
def _set_headers(self):
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
def do_HEAD(self):
self._set_headers()
def log_message(self, format, *args):
# Отключаем логи сервера
return
def do_GET(self):
if self.path == '/':
self._set_headers()
self.wfile.write(json.dumps(all_comments, cls=DateTimeEncoder).encode('utf-8'))
else:
self.send_error(404, "Not Found")
def run_server(port=8008):
server_address = ('', port)
httpd = HTTPServer(server_address, ChatServer)
print(f'Starting HTTP сервер на http://127.0.0.1:{port} ...')
print()
try:
httpd.serve_forever()
except KeyboardInterrupt:
pass
httpd.server_close()
sys.exit(1)
# Основная часть программы
print()
print('--- Парсер чатов Telegram/YouTube/Twitch --- Sergey Shemet (C) 2025 ---')
print()
# Запускаем Twitch в отдельном потоке если включен
if 'twitch' in chat_connectors:
twitch_thread = threading.Thread(target=twitch_socket.listen, name='twitchirc')
twitch_thread.daemon = True
twitch_thread.start()
# Запускаем HTTP сервер в отдельном потоке
server_thread = threading.Thread(target=run_server, name='HTTPServer')
server_thread.daemon = True
server_thread.start()
print('Запуск подсистем чатов...')
print('...Удерживайте Ctrl+Alt+Shift+C для выхода и C для очистки консоли...')
# print('Загрузка 7TV эмодзи...')
# emotes_cache = get_7tv_emotes() if TWITCH_CHANNEL else {}
poll_time = 2 # Интервал проверки в секундах
while True:
# Обработка горячих клавиш
if keyboard.is_pressed('Ctrl+Shift+Alt+c'):
sys.exit(1)
if keyboard.is_pressed('Ctrl+Shift+Alt+z'):
print_all_comments()
if keyboard.is_pressed('c'):
os.system("cls")
# Периодическая проверка чатов
this_second = int(time.time())
if this_second % poll_time == 0:
make_json_object()
time.sleep(1)