Files
UltraChat/twitch.py
sShemet ade2833df7 init
2025-12-22 14:03:10 +05:00

88 lines
3.3 KiB
Python

############################################### twitchSockets
import re
from zoneinfo import *
from datetime import datetime
import socket
# from huy import tw_comments
class TwitchChatIRC():
__HOST = 'irc.chat.twitch.tv'
__NICK = 'justinfan67420'
__PASS = 'SCHMOOPIIE'
__PORT = 6667
all_messages = []
__PATTERN = re.compile(r'@(.+?(?=\s+:)).*PRIVMSG[^:]*:([^\r\n]*)')
def __init__(self, channel):
self.__SOCKET = socket.socket()
self.__SOCKET.connect((self.__HOST, self.__PORT))
print('Connected to twitch socket:',self.__HOST,'on port',self.__PORT)
self.__send_raw('CAP REQ :twitch.tv/tags')
self.__send_raw('PASS ' + self.__PASS)
self.__send_raw('NICK ' + self.__NICK)
self.__send_raw('JOIN #{}'.format(channel))
self.__SOCKET.settimeout(2.0)
def __send_raw(self, string):
self.__SOCKET.send((string+'\r\n').encode('utf-8'))
def __recvall(self,buffer_size):
data = b''
while True:
part =self.__SOCKET.recv(buffer_size)
data += part
if len(part) < buffer_size:
break
return data.decode('utf-8')
def close_connection(__SOCKET):
__SOCKET.close()
print('Connection closed')
def listen(self, timeout=None, message_timeout=1.0, on_message = None, buffer_size = 4096, message_limit = None, output=None):
readbuffer = ''
print('Starting twitch thread...')
try:
while True:
try:
new_info = self.__recvall(buffer_size)
readbuffer += new_info
if('PING :tmi.twitch.tv' in readbuffer):
self.__send_raw('PONG :tmi.twitch.tv')
matches = list(self.__PATTERN.finditer(readbuffer))
if(matches):
if(len(matches) > 1):
matches = matches[:-1] # assume last one is incomplete
last_index = matches[-1].span()[1]
readbuffer = readbuffer[last_index:]
for match in matches:
data = {}
for item in match.group(1).split(';'):
keys = item.split('=',1)
data[keys[0]]=keys[1]
data['message'] = match.group(2)
dt = datetime.fromtimestamp(int(data['tmi-sent-ts'])/1000).replace(tzinfo=ZoneInfo('Asia/Yekaterinburg'))
my_data = dict({'id': data['id'], 'type': 'tw', 'date': dt, 'sendr': data['display-name'], 'msg': data['message'].encode('utf-8').decode('utf-8','ignore')})
# print(data)
self.all_messages.append(my_data)
except socket.timeout:
if(timeout != None):
print('timeout!')
except KeyboardInterrupt:
print('Interrupted by user.')
# except Exception as e:
# print('Unknown Error:',e)
# raise e
# return messages
########################################################################################