#!/bin/env python3
"""Local Wrapped: watch the local media player and count finished plays.

The script polls ``playerctl`` for the current track, matches it against the
rows of ``music.db`` and then

* reports the track to the server named by the ``MSSERVER`` environment
  variable once it has been seen on a few consecutive polls, and
* increments the track's play counter in ``count.db`` once it has been seen
  long enough.

All files (``key.txt``, ``music.db``, ``count.db``) live in the directory
returned by :func:`get_working_directory`.
"""
import os
import re
import sqlite3
import subprocess
import sys
import time
from contextlib import closing

import requests

# Seconds between the two metadata samples that make up one poll.
POLL_INTERVAL = 5
# Seconds to wait before polling again when no track is reported; without
# this the loop would spawn playerctl back-to-back at full CPU speed.
IDLE_INTERVAL = 1
# Value of the stability counter from which the track is looked up in music.db.
REPORT_AFTER = 3
# Value of the stability counter from which a play is written to count.db.
COUNT_AFTER = 12
# Upper bound (seconds) for every HTTP request so a dead server cannot hang us.
REQUEST_TIMEOUT = 10


def get_working_directory():
    """Return the ``localwrapped`` data directory, creating it if needed.

    ``/userdata/media/0/localwrapped`` is preferred, then
    ``$HOME/localwrapped``. If neither exists, the directory is created under
    the first base directory that exists. The process exits when no directory
    can be found or created.
    """
    bases = ["/userdata/media/0"]
    home = os.environ.get("HOME")
    if home is not None:
        bases.append(home)

    for base in bases:
        candidate = base + "/localwrapped"
        if os.path.exists(candidate):
            return candidate

    print("Path not exist")
    for base in bases:
        if not os.path.exists(base):
            continue
        candidate = base + "/localwrapped"
        try:
            os.makedirs(candidate)
        except OSError:
            break
        return candidate

    # Either creation failed or no base directory exists at all; returning
    # None here would only produce confusing "None/..." paths later on.
    print("Can't create path")
    sys.exit()


def get_verification_code():
    """Exchange the stored key for a verification code.

    Reads ``key.txt`` from the working directory and posts it to
    ``$MSSERVER/verifykey``.

    Returns:
        The 64-character response text on success, otherwise ``None`` (key
        unreadable, server unreachable, verification rejected or unexpected
        response).
    """
    try:
        with open(f"{get_working_directory()}/key.txt", "r") as f:
            key = f.read()
    except (OSError, UnicodeDecodeError):
        print("Key not exist")
        return None

    try:
        verifyrequest = requests.post(
            os.environ["MSSERVER"] + "/verifykey",
            json={"key": key},
            timeout=REQUEST_TIMEOUT,
        )
    except (KeyError, requests.exceptions.RequestException):
        # KeyError: MSSERVER is not set; reported like an unreachable server.
        print("Can't reach the server.")
        return None

    if verifyrequest.text == "0":
        print("Verification failed")
        return None
    if len(verifyrequest.text) == 64:
        print("Verification success")
        return verifyrequest.text
    print("Unexpected response")
    return None


def get_music():
    """Return the current track as ``"title, artist, album"`` or ``None``.

    The data comes from ``playerctl metadata``; empty fields are replaced by
    ``"Unknown"``. ``None`` is returned when playerctl fails, times out, or
    its output cannot be split into exactly three fields.
    """
    try:
        # NOTE(review): the format string is wrapped in literal single quotes
        # and the replace() below then removes *every* apostrophe, including
        # ones inside titles ("Don't" -> "Dont"). Kept as-is because the
        # entries in music.db may rely on it -- confirm before changing.
        metadata = subprocess.check_output(
            ["playerctl", "metadata", "--format", "'{{artist}}|||{{title}}|||{{album}}'"],
            stderr=subprocess.DEVNULL,
            timeout=2
        ).decode('utf-8').strip().replace("'", "")

        if not metadata or "No players found" in metadata:
            return None

        artist, title, album = [part.strip() or "Unknown" for part in metadata.split("|||")]
        return f"{title}, {artist}, {album}"

    except subprocess.CalledProcessError:
        return None
    except subprocess.TimeoutExpired:
        print("Playerctl timeout, media player busy?")
        return None
    except Exception as e:
        print(f"Metadata parse error: {str(e)}")
        return None


def get_db_values(url: str = "https://halhadus.rocks/assets/localwrapped/music.db"):
    """Load every row of the ``music`` table as a list of dicts.

    ``music.db`` is downloaded from ``url`` first when it is not present in
    the working directory; the process exits if that download fails.

    Returns:
        A list of dicts with the keys ``listmusicname``, ``listvideoid``,
        ``fileartistname``, ``filealbumname`` and ``filemusictitle`` (taken
        from columns 0, 1, 4, 5 and 6 of each row).
    """
    db_path = get_working_directory() + "/music.db"
    if not os.path.exists(db_path):
        try:
            download = requests.get(url, timeout=REQUEST_TIMEOUT)
            # Without this an HTTP error page would be saved as music.db and
            # never be replaced, because the file would then exist.
            download.raise_for_status()
            with open(db_path, "wb") as f:
                f.write(download.content)
        except (OSError, requests.exceptions.RequestException):
            print("Could not download the database.")
            sys.exit()

    # closing() guarantees the connection is released even if the query fails.
    with closing(sqlite3.connect(db_path)) as musicconn:
        musicrows = musicconn.execute("SELECT * FROM music").fetchall()

    return [
        {
            "listmusicname": row[0],
            "listvideoid": row[1],
            "fileartistname": row[4],
            "filealbumname": row[5],
            "filemusictitle": row[6],
        }
        for row in musicrows
    ]


def _full_name(entry):
    """Build the ``"title, artist, album"`` string that get_music() produces.

    A missing album is represented as ``"Music"``. Returns ``None`` when the
    title or artist is missing, since such a row cannot be matched.
    """
    title = entry["filemusictitle"]
    artist = entry["fileartistname"]
    if title is None or artist is None:
        return None
    album = entry["filealbumname"]
    if album is None:
        album = "Music"
    return title + ", " + artist + ", " + album


def _ensure_count_db():
    """Create ``count.db`` and its ``count`` table when they do not exist."""
    with closing(sqlite3.connect(get_working_directory() + "/count.db")) as countconn:
        countconn.execute("CREATE TABLE IF NOT EXISTS count (musicname TEXT, count INTEGER)")
        countconn.commit()


def _record_play(musicname):
    """Increment the play counter of ``musicname``, inserting it at 1 if new."""
    with closing(sqlite3.connect(get_working_directory() + "/count.db")) as countconn:
        updated = countconn.execute(
            "UPDATE count SET count = count + 1 WHERE musicname = ?", (musicname,)
        )
        if updated.rowcount == 0:
            countconn.execute("INSERT INTO count VALUES (?, ?)", (musicname, 1))
        countconn.commit()


def main():
    """Poll the media player forever, reporting and counting stable tracks."""
    if not os.path.exists(f"{get_working_directory()}/key.txt"):
        print("Key not exist")
        sys.exit()

    _ensure_count_db()

    a = 0  # stability counter for the track currently being observed
    last_written = None  # last track string whose play was counted
    last_sent = None  # listmusicname most recently reported to the server
    resetstatus = "true"  # sent as "true" until the first successful report

    while True:
        last_music = get_music()
        if last_music is None:
            time.sleep(IDLE_INTERVAL)
            continue

        time.sleep(POLL_INTERVAL)
        if last_music != get_music():
            # Track changed (or disappeared) during the interval: start over.
            a = 0
            continue
        if last_written == last_music:
            # This play has already been counted.
            continue

        a += 1
        if a < REPORT_AFTER:
            continue

        for i in get_db_values():
            if _full_name(i) != last_music:
                continue

            # ">=" rather than "==": the counter is also advanced inside this
            # loop, so it can step over the exact threshold value.
            if a >= COUNT_AFTER:
                _record_play(i["listmusicname"])
                print(f"Local Wrapped: {i['listmusicname']} has been played.")
                last_written = last_music
                a = 0
                continue

            if last_sent == i["listmusicname"]:
                continue

            verificationcode = get_verification_code()
            if verificationcode is None:
                continue

            try:
                requests.post(
                    os.environ["MSSERVER"] + "/musicstatus.html",
                    json={
                        "music": i["listmusicname"],
                        "time": str(int(time.time())),
                        "videoid": i["listvideoid"],
                        "verificationcode": verificationcode,
                        "resetstatus": resetstatus,
                    },
                    timeout=REQUEST_TIMEOUT,
                )
            except (KeyError, requests.exceptions.RequestException):
                print("Can't reach the server.")
                # NOTE(review): the counter is advanced on a failed send as
                # well as on a successful one (kept from the original logic).
                a += 1
                continue

            print("Data sent")
            resetstatus = "false"
            a += 1
            last_sent = i["listmusicname"]


if __name__ == "__main__":
    main()