"""Local Wrapped play tracker.

Polls ``dumpsys media_session`` for the currently playing track, matches it
against the rows of ``music.db``, reports the track to the server named by the
``MSSERVER`` environment variable, and increments a per-track counter in
``count.db`` once the same track has been seen for enough consecutive polls.
"""
import contextlib
import os
import re
import sqlite3
import subprocess
import sys
import time
from typing import Optional

import requests

# Storage roots probed, in order, for the "localwrapped" working directory.
_STORAGE_ROOTS = (
    "/sdcard",
    "/storage/emulated/0",
    "/data/media/0",
    "/mnt/user/0/emulated/0",
)
_WORK_DIR_NAME = "localwrapped"

_MAGISK_LOG = "/cache/magisk.log"

# Seconds between two polls of the media session.
_POLL_SECONDS = 5
# Consecutive stable polls after which the track is reported to the server.
_SEND_AFTER_POLLS = 3
# Consecutive stable polls after which the track is counted as played.
_COUNT_AFTER_POLLS = 12
# Upper bound (seconds) for any single HTTP request so the loop cannot hang.
_REQUEST_TIMEOUT = 15


def get_working_directory() -> str:
    """Return the path of the ``localwrapped`` working directory.

    The known storage roots are probed in order and the first existing
    ``<root>/localwrapped`` is returned.  If none exists, the directory is
    created under the first root where ``os.mkdir`` succeeds.  If that fails
    for every root, a message is printed and the process exits.
    """
    # If you see this function, that means you are using the pyinstaller.
    # Use this command to create the executable:
    # pyinstaller -s -F combomodule-pyi.py --runtime-tmpdir /data/local/tmp
    for root in _STORAGE_ROOTS:
        candidate = f"{root}/{_WORK_DIR_NAME}"
        if os.path.exists(candidate):
            return candidate

    print("Path not found.")
    for root in _STORAGE_ROOTS:
        if not os.path.exists(root):
            continue
        candidate = f"{root}/{_WORK_DIR_NAME}"
        try:
            os.mkdir(candidate)
        except OSError:
            # Root exists but is not writable (or lost a race); try the next.
            continue
        return candidate

    print("Path not found and could not be created.")
    # sys.exit() instead of the site-provided exit(): the latter is meant for
    # the interactive shell and is not guaranteed to exist in frozen builds.
    sys.exit()


def get_verification_code() -> Optional[str]:
    """Exchange the key stored in ``key.txt`` for a verification code.

    Posts the key to ``$MSSERVER/verifykey``.  Returns the response text when
    it is exactly 64 characters long; returns ``None`` (after printing the
    reason) when the key file cannot be read, the server cannot be reached,
    ``MSSERVER`` is unset, or the response is ``"0"`` or otherwise unexpected.
    """
    try:
        with open(f"{get_working_directory()}/key.txt", "r") as f:
            key = f.read()
    except (OSError, UnicodeError):
        print("Key not exist")
        return None

    try:
        verifyrequest = requests.post(
            os.environ["MSSERVER"] + "/verifykey",
            json={"key": key},
            timeout=_REQUEST_TIMEOUT,
        )
    except (requests.RequestException, KeyError):
        # KeyError: MSSERVER is not set; reported the same way as before.
        print("Can't reach the server.")
        return None

    if verifyrequest.text == "0":
        print("Verification failed")
        return None
    if len(verifyrequest.text) == 64:
        print("Verification success")
        return verifyrequest.text
    print("Unexpected response")
    return None


def get_music() -> Optional[str]:
    """Return the description of the playing track, or ``None``.

    Runs ``dumpsys media_session``.  If no output line contains ``PLAYING``
    or no line contains ``description=``, ``None`` is returned; otherwise the
    text following ``description=`` on the first such line is returned.
    """
    # Constant command string, no interpolated input; the shell is kept so
    # the command is resolved exactly as it was before.
    dumpsys_output = (
        subprocess.check_output("dumpsys media_session", shell=True)
        .decode("utf-8")
        .split("\n")
    )
    if not any("PLAYING" in line for line in dumpsys_output):
        return None
    description = [line for line in dumpsys_output if "description=" in line]
    if not description:
        return None
    return description[0].split("description=")[1]


def get_db_values(url: str = "https://halhadus.rocks/assets/localwrapped/music.db") -> list:
    """Return the rows of ``music.db`` as a list of dicts.

    If ``music.db`` is missing from the working directory it is first
    downloaded from *url*; on download failure a message is printed and the
    process exits.  Each returned dict has the keys ``listmusicname``,
    ``fileartistname``, ``filealbumname`` and ``filemusictitle``, taken from
    columns 0, 4, 5 and 6 of the ``music`` table.
    """
    db_path = get_working_directory() + "/music.db"
    if not os.path.exists(db_path):
        partial_path = db_path + ".part"
        try:
            download = requests.get(url, timeout=_REQUEST_TIMEOUT)
            # Do not save an HTTP error page as the database.
            download.raise_for_status()
            with open(partial_path, "wb") as f:
                f.write(download.content)
            # Rename only after a complete write so a truncated file is never
            # mistaken for a valid database on the next run.
            os.replace(partial_path, db_path)
        except (requests.RequestException, OSError):
            print("Could not download the database.")
            sys.exit()

    with contextlib.closing(sqlite3.connect(db_path)) as musicconn:
        musicrows = musicconn.execute("SELECT * FROM music").fetchall()

    return [
        {
            "listmusicname": row[0],
            "fileartistname": row[4],
            "filealbumname": row[5],
            "filemusictitle": row[6],
        }
        for row in musicrows
    ]


def _ensure_count_table() -> None:
    """Create ``count.db`` and its ``count`` table if they do not exist."""
    count_path = get_working_directory() + "/count.db"
    with contextlib.closing(sqlite3.connect(count_path)) as countconn:
        countconn.execute(
            "CREATE TABLE IF NOT EXISTS count (musicname TEXT, count INTEGER)"
        )
        countconn.commit()


def _record_play(music_name: str) -> None:
    """Increment the play counter of *music_name*, inserting it at 1 if new."""
    count_path = get_working_directory() + "/count.db"
    with contextlib.closing(sqlite3.connect(count_path)) as countconn:
        updated = countconn.execute(
            "UPDATE count SET count = count + 1 WHERE musicname = ?",
            (music_name,),
        )
        if updated.rowcount == 0:
            countconn.execute(
                "INSERT INTO count VALUES (?, ?)", (music_name, 1)
            )
        countconn.commit()


def _log_play(music_name: str) -> None:
    """Append a "has been played" line for *music_name* to the Magisk log."""
    # Written directly instead of through `echo ... >>` in a shell, so titles
    # containing quotes, `;`, `&` etc. cannot break or inject commands.
    try:
        with open(_MAGISK_LOG, "a", encoding="utf-8") as log:
            log.write(f"{music_name} has been played.\n")
    except OSError as err:
        # Best effort, as before: a missing/unwritable log must not stop us.
        print(f"Could not write to {_MAGISK_LOG}: {err}")


def _display_name(entry: dict) -> str:
    """Build the "title, artist, album" string compared with ``get_music()``.

    ``"Music"`` stands in for the album when ``filealbumname`` is ``None``.
    """
    album = entry["filealbumname"]
    if album is None:
        album = "Music"
    return entry["filemusictitle"] + ", " + entry["fileartistname"] + ", " + album


def main() -> None:
    """Run the tracker: check the key file, prepare ``count.db``, poll forever."""
    if not os.path.exists(f"{get_working_directory()}/key.txt"):
        print("Key not exist")
        sys.exit()

    _ensure_count_table()

    stable_polls = 0      # consecutive polls that saw the same track
    last_written = None   # description of the last track counted as played
    last_sent = None      # listmusicname of the last track sent to the server
    resetstatus = "true"  # "true" until the first successful status post

    while True:
        last_music = get_music()
        # Always sleep between polls; previously the loop spun without any
        # delay (spawning dumpsys continuously) while nothing was playing.
        time.sleep(_POLL_SECONDS)
        if last_music is None:
            continue

        if last_music != get_music():
            # Track changed (or stopped) during the interval: start over.
            stable_polls = 0
            continue
        if last_written == last_music:
            # Already counted this track; wait for a different one.
            continue

        stable_polls += 1
        if stable_polls < _SEND_AFTER_POLLS:
            continue

        for entry in get_db_values():
            if _display_name(entry) != last_music:
                continue
            music_name = entry["listmusicname"]

            # `>=` rather than `==`: the counter also advances on send
            # attempts below, so it can step over the exact threshold.
            if stable_polls >= _COUNT_AFTER_POLLS:
                _record_play(music_name)
                print(f"Local Wrapped: {music_name} has been played.")
                _log_play(music_name)
                last_written = last_music
                stable_polls = 0
                continue

            if last_sent == music_name:
                continue
            verificationcode = get_verification_code()
            if verificationcode is None:
                continue
            try:
                requests.post(
                    os.environ["MSSERVER"] + "/musicstatus.html",
                    json={
                        "music": music_name,
                        "time": str(int(time.time())),
                        "verificationcode": verificationcode,
                        "resetstatus": resetstatus,
                    },
                    timeout=_REQUEST_TIMEOUT,
                )
            except (requests.RequestException, KeyError):
                print("Can't reach the server.")
                stable_polls += 1
                continue
            print("Data sent")
            resetstatus = "false"
            stable_polls += 1
            last_sent = music_name


if __name__ == "__main__":
    main()