import os import json as jsond # json import time # sleep before exit import binascii # hex encoding import platform # check platform import subprocess # needed for mac device import qrcode from datetime import datetime, timezone, timedelta from discord_interactions import verify_key # used for signature verification from PIL import Image try: if os.name == 'nt': import win32security # get sid (WIN only) import requests # https requests except ModuleNotFoundError: print("Exception when importing modules") print("Installing necessary modules....") if os.path.isfile("requirements.txt"): os.system("pip install -r requirements.txt") else: if os.name == 'nt': os.system("pip install pywin32") os.system("pip install requests") print("Modules installed!") time.sleep(1.5) os._exit(1) class api: name = ownerid = version = hash_to_check = "" def __init__(self, name, ownerid, version, hash_to_check): if len(ownerid) != 10: print("Visit https://keyauth.cc/app/, copy Pthon code, and replace code in main.py with that") time.sleep(3) os._exit(1) self.name = name self.ownerid = ownerid self.version = version self.hash_to_check = hash_to_check self.init() sessionid = enckey = "" initialized = False def init(self): if self.sessionid != "": print("You've already initialized!") time.sleep(3) os._exit(1) post_data = { "type": "init", "ver": self.version, "hash": self.hash_to_check, "name": self.name, "ownerid": self.ownerid } response = self.__do_request(post_data) if response == "KeyAuth_Invalid": print("The application doesn't exist") time.sleep(3) os._exit(1) json = jsond.loads(response) if json["message"] == "invalidver": if json["download"] != "": print("New Version Available") download_link = json["download"] os.system(f"start {download_link}") time.sleep(3) os._exit(1) else: print("Invalid Version, Contact owner to add download link to latest app version") time.sleep(3) os._exit(1) if not json["success"]: print(json["message"]) time.sleep(3) os._exit(1) self.sessionid = json["sessionid"] self.initialized = True def register(self, user, password, license, hwid=None): self.checkinit() if hwid is None: hwid = others.get_hwid() post_data = { "type": "register", "username": user, "pass": password, "key": license, "hwid": hwid, "sessionid": self.sessionid, "name": self.name, "ownerid": self.ownerid } response = self.__do_request(post_data) json = jsond.loads(response) if json["success"]: print(json["message"]) self.__load_user_data(json["info"]) else: print(json["message"]) time.sleep(3) os._exit(1) def upgrade(self, user, license): self.checkinit() post_data = { "type": "upgrade", "username": user, "key": license, "sessionid": self.sessionid, "name": self.name, "ownerid": self.ownerid } response = self.__do_request(post_data) json = jsond.loads(response) if json["success"]: print(json["message"]) print("Please restart program and login") time.sleep(3) os._exit(1) else: print(json["message"]) time.sleep(3) os._exit(1) def login(self, user, password, code=None, hwid=None): self.checkinit() if hwid is None: hwid = others.get_hwid() post_data = { "type": "login", "username": user, "pass": password, "hwid": hwid, "sessionid": self.sessionid, "name": self.name, "ownerid": self.ownerid, } if code is not None: post_data["code"] = code response = self.__do_request(post_data) json = jsond.loads(response) if json["success"]: self.__load_user_data(json["info"]) print(json["message"]) else: print(json["message"]) time.sleep(3) os._exit(1) def license(self, key, code=None, hwid=None): self.checkinit() if hwid is None: hwid = others.get_hwid() post_data = { "type": "license", "key": key, "hwid": hwid, "sessionid": self.sessionid, "name": self.name, "ownerid": self.ownerid } if code is not None: post_data["code"] = code response = self.__do_request(post_data) json = jsond.loads(response) if json["success"]: self.__load_user_data(json["info"]) print(json["message"]) else: print(json["message"]) time.sleep(3) os._exit(1) def var(self, name): self.checkinit() post_data = { "type": "var", "varid": name, "sessionid": self.sessionid, "name": self.name, "ownerid": self.ownerid } response = self.__do_request(post_data) json = jsond.loads(response) if json["success"]: return json["message"] else: print(json["message"]) time.sleep(3) os._exit(1) def getvar(self, var_name): self.checkinit() post_data = { "type": "getvar", "var": var_name, "sessionid": self.sessionid, "name": self.name, "ownerid": self.ownerid } response = self.__do_request(post_data) json = jsond.loads(response) if json["success"]: return json["response"] else: print(f"NOTE: This is commonly misunderstood. This is for user variables, not the normal variables.\nUse keyauthapp.var(\"{var_name}\") for normal variables"); print(json["message"]) time.sleep(3) os._exit(1) def setvar(self, var_name, var_data): self.checkinit() post_data = { "type": "setvar", "var": var_name, "data": var_data, "sessionid": self.sessionid, "name": self.name, "ownerid": self.ownerid } response = self.__do_request(post_data) json = jsond.loads(response) if json["success"]: return True else: print(json["message"]) time.sleep(3) os._exit(1) def ban(self): self.checkinit() post_data = { "type": "ban", "sessionid": self.sessionid, "name": self.name, "ownerid": self.ownerid } response = self.__do_request(post_data) json = jsond.loads(response) if json["success"]: return True else: print(json["message"]) time.sleep(3) os._exit(1) def file(self, fileid): self.checkinit() post_data = { "type": "file", "fileid": fileid, "sessionid": self.sessionid, "name": self.name, "ownerid": self.ownerid } response = self.__do_request(post_data) json = jsond.loads(response) if not json["success"]: print(json["message"]) time.sleep(3) os._exit(1) return binascii.unhexlify(json["contents"]) def webhook(self, webid, param, body = "", conttype = ""): self.checkinit() post_data = { "type": "webhook", "webid": webid, "params": param, "body": body, "conttype": conttype, "sessionid": self.sessionid, "name": self.name, "ownerid": self.ownerid } response = self.__do_request(post_data) json = jsond.loads(response) if json["success"]: return json["message"] else: print(json["message"]) time.sleep(3) os._exit(1) def check(self): self.checkinit() post_data = { "type": "check", "sessionid": self.sessionid, "name": self.name, "ownerid": self.ownerid } response = self.__do_request(post_data) json = jsond.loads(response) if json["success"]: return True else: return False def checkblacklist(self): self.checkinit() hwid = others.get_hwid() post_data = { "type": "checkblacklist", "hwid": hwid, "sessionid": self.sessionid, "name": self.name, "ownerid": self.ownerid } response = self.__do_request(post_data) json = jsond.loads(response) if json["success"]: return True else: return False def log(self, message): self.checkinit() post_data = { "type": "log", "pcuser": os.getenv('username'), "message": message, "sessionid": self.sessionid, "name": self.name, "ownerid": self.ownerid } self.__do_request(post_data) def fetchOnline(self): self.checkinit() post_data = { "type": "fetchOnline", "sessionid": self.sessionid, "name": self.name, "ownerid": self.ownerid } response = self.__do_request(post_data) json = jsond.loads(response) if json["success"]: if len(json["users"]) == 0: return None else: return json["users"] else: return None def fetchStats(self): self.checkinit() post_data = { "type": "fetchStats", "sessionid": self.sessionid, "name": self.name, "ownerid": self.ownerid } response = self.__do_request(post_data) json = jsond.loads(response) if json["success"]: self.__load_app_data(json["appinfo"]) def chatGet(self, channel): self.checkinit() post_data = { "type": "chatget", "channel": channel, "sessionid": self.sessionid, "name": self.name, "ownerid": self.ownerid } response = self.__do_request(post_data) json = jsond.loads(response) if json["success"]: return json["messages"] else: return None def chatSend(self, message, channel): self.checkinit() post_data = { "type": "chatsend", "message": message, "channel": channel, "sessionid": self.sessionid, "name": self.name, "ownerid": self.ownerid } response = self.__do_request(post_data) json = jsond.loads(response) if json["success"]: return True else: return False def checkinit(self): if not self.initialized: print("Initialize first, in order to use the functions") time.sleep(3) os._exit(1) def changeUsername(self, username): self.checkinit() post_data = { "type": "changeUsername", "newUsername": username, "sessionid": self.sessionid, "name": self.name, "ownerid": self.ownerid } response = self.__do_request(post_data) json = jsond.loads(response) if json["success"]: print("Successfully changed username") else: print(json["message"]) time.sleep(3) os._exit(1) def logout(self): self.checkinit() post_data = { "type": "logout", "sessionid": self.sessionid, "name": self.name, "ownerid": self.ownerid } response = self.__do_request(post_data) json = jsond.loads(response) if json["success"]: print("Successfully logged out") time.sleep(3) os._exit(1) else: print(json["message"]) time.sleep(3) os._exit(1) def enable2fa(self, code=None): self.checkinit() post_data = { "type": "2faenable", "sessionid": self.sessionid, "name": self.name, "ownerid": self.ownerid, "code": code } response = self.__do_request(post_data) json = jsond.loads(response) if json["success"]: if code is None: # First request: Display the 2FA secret code print(f"Your 2FA secret code is: {json['2fa']['secret_code']}") qr_code = json['2fa']['QRCode'] self.display_qr_code(qr_code) code_input = input("Enter the 6 digit 2fa code to enable 2fa: ") self.enable2fa(code_input); else: # Second request: Confirm successful 2FA activation print("2FA has been successfully enabled!") time.sleep(3) else: print(f"Error: {json['message']}") time.sleep(3) os._exit(1) def disable2fa(self, code=None): self.checkinit() code = input("Enter the 6 digit 2fa code to disable 2fa: ") post_data = { "type": "2fadisable", "sessionid": self.sessionid, "name": self.name, "ownerid": self.ownerid, "code": code } response = self.__do_request(post_data) json = jsond.loads(response) print(json['message']) time.sleep(3) def display_qr_code(self, qr_code_url): # Generate QR code image qr = qrcode.QRCode( version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=10, border=4, ) # Add the QR code URL data qr.add_data(qr_code_url) qr.make(fit=True) # Create an image from the QR code img = qr.make_image(fill='black', back_color='white') # Display the QR code image img.show() def __do_request(self, post_data): try: response = requests.post( "https://keyauth.win/api/1.3/", data=post_data, timeout=10 ) if post_data["type"] == "log" or post_data["type"] == "file" or post_data["type"] == "2faenable" or post_data["type"] == "2fadisable": return response.text # Get the signature and timestamp from the headers signature = response.headers.get("x-signature-ed25519") timestamp = response.headers.get("x-signature-timestamp") if not signature or not timestamp: print("Missing headers for signature verification.") time.sleep(3) os._exit(1) server_time = datetime.fromtimestamp(int(timestamp), timezone.utc) current_time = datetime.now(timezone.utc) #print(f"Server Timestamp (UTC seconds): {timestamp}") #print(f"Server Time (UTC seconds): {server_time.timestamp()}") #print(f"Current Time (UTC seconds): {current_time.timestamp()}") buffer_seconds = 5 time_difference = current_time - server_time if time_difference > timedelta(seconds=20 + buffer_seconds): print("Timestamp is too old (exceeded 20 seconds + buffer).") time.sleep(3) os._exit(1) if not verify_key(response.text.encode('utf-8'), signature, timestamp, '5586b4bc69c7a4b487e4563a4cd96afd39140f919bd31cea7d1c6a1e8439422b'): print("Signature checksum failed. Request was tampered with or session ended most likely.") time.sleep(3) os._exit(1) return response.text except requests.exceptions.Timeout: print("Request timed out. Server is probably down/slow at the moment") class application_data_class: numUsers = numKeys = app_ver = customer_panel = onlineUsers = "" class user_data_class: username = ip = hwid = expires = createdate = lastlogin = subscription = subscriptions = "" user_data = user_data_class() app_data = application_data_class() def __load_app_data(self, data): self.app_data.numUsers = data["numUsers"] self.app_data.numKeys = data["numKeys"] self.app_data.app_ver = data["version"] self.app_data.customer_panel = data["customerPanelLink"] self.app_data.onlineUsers = data["numOnlineUsers"] def __load_user_data(self, data): self.user_data.username = data["username"] self.user_data.ip = data["ip"] self.user_data.hwid = data["hwid"] or "N/A" self.user_data.expires = data["subscriptions"][0]["expiry"] self.user_data.createdate = data["createdate"] self.user_data.lastlogin = data["lastlogin"] self.user_data.subscription = data["subscriptions"][0]["subscription"] self.user_data.subscriptions = data["subscriptions"] class others: @staticmethod def get_hwid(): if platform.system() == "Linux": with open("/etc/machine-id") as f: hwid = f.read() return hwid elif platform.system() == 'Windows': winuser = os.getlogin() sid = win32security.LookupAccountName(None, winuser)[0] # You can also use WMIC (better than SID, some users had problems with WMIC) hwid = win32security.ConvertSidToStringSid(sid) return hwid ''' cmd = subprocess.Popen( "wmic useraccount where name='%username%' get sid", stdout=subprocess.PIPE, shell=True, ) (suppost_sid, error) = cmd.communicate() suppost_sid = suppost_sid.split(b"\n")[1].strip() return suppost_sid.decode() ^^ HOW TO DO IT USING WMIC ''' elif platform.system() == 'Darwin': output = subprocess.Popen("ioreg -l | grep IOPlatformSerialNumber", stdout=subprocess.PIPE, shell=True).communicate()[0] serial = output.decode().split('=', 1)[1].replace(' ', '') hwid = serial[1:-2] return hwid