combo-allinone-linux/combolinux.py
2025-05-08 09:20:17 +00:00

169 lines
6.5 KiB
Python

#!/bin/env python3
import subprocess
import re
import time
import sqlite3
import os
import requests
def get_working_directory():
    """Return the directory that holds this script's data files.

    Two locations are considered, in order:
    ``/userdata/media/0/localwrapped`` and ``$HOME/localwrapped``.
    If one of them already exists it is returned.  Otherwise
    "Path not exist" is printed and the directory is created under the
    first parent (``/userdata/media/0`` or ``$HOME``) that exists.

    Returns:
        The working directory path as a string, without a trailing slash.

    Raises:
        SystemExit: with status 1 when no candidate exists and none could
            be created (previously this case either returned ``None`` or
            exited with status 0).
    """
    parents = ["/userdata/media/0"]
    # HOME may be unset (e.g. under some service managers); treat that as
    # "no home candidate" instead of raising KeyError.
    home = os.environ.get("HOME")
    if home:
        parents.append(home)
    candidates = [parent + "/localwrapped" for parent in parents]

    for candidate in candidates:
        if os.path.exists(candidate):
            return candidate

    print("Path not exist")
    for parent, candidate in zip(parents, candidates):
        if not os.path.exists(parent):
            continue
        try:
            os.makedirs(candidate)
        except OSError:
            # Keep the original policy: a failure under the first usable
            # parent is fatal, we do not silently fall back to another one.
            break
        return candidate

    print("Can't create path")
    raise SystemExit(1)
# Startup guard: stop immediately when key.txt is absent from the working
# directory, since the verification request needs its contents.
if not os.path.exists("{}/key.txt".format(get_working_directory())):
    print("Key not exist")
    exit()
def get_verification_code(timeout: float = 10):
    """Exchange the local key for a verification code.

    Reads ``key.txt`` from the working directory and POSTs its contents as
    ``{"key": ...}`` to ``$MSSERVER/verifykey``.

    Args:
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot block the caller indefinitely.

    Returns:
        The response body when it is exactly 64 characters long, otherwise
        ``None`` (key unreadable, ``MSSERVER`` unset, server unreachable,
        the server answered ``"0"``, or the response was unexpected).  A
        short message describing the outcome is printed in every case.
    """
    try:
        with open(f"{get_working_directory()}/key.txt", "r") as f:
            key = f.read()
    except (OSError, UnicodeError):
        # Only file problems are reported as a missing key; Ctrl-C and
        # SystemExit are no longer swallowed here.
        print("Key not exist")
        return None

    server = os.environ.get("MSSERVER")
    if server is None:
        print("MSSERVER environment variable is not set.")
        return None

    try:
        verifyrequest = requests.post(
            server + "/verifykey", json={"key": key}, timeout=timeout
        )
    except requests.RequestException:
        print("Can't reach the server.")
        return None

    answer = verifyrequest.text
    if answer == "0":
        print("Verification failed")
        return None
    if len(answer) == 64:
        print("Verification success")
        return answer
    print("Unexpected response")
    return None
def get_music():
    """Return the track reported by ``playerctl`` as ``"title, artist, album"``.

    ``playerctl metadata`` is asked for artist, title and album separated by
    ``|||``.  Any field that is empty after stripping whitespace is replaced
    by ``"Unknown"``.

    Returns:
        The formatted string, or ``None`` when playerctl exits with an
        error, times out after 2 seconds, prints nothing, or its output
        cannot be split into exactly three fields.
    """
    try:
        # The command is passed as an argument list (no shell), so the format
        # string must not be wrapped in quotes.  The earlier version added
        # literal quotes and then removed every apostrophe from the output,
        # which also mangled titles such as "Don't Stop".
        raw = subprocess.check_output(
            ["playerctl", "metadata", "--format", "{{artist}}|||{{title}}|||{{album}}"],
            stderr=subprocess.DEVNULL,
            timeout=2,
        )
        metadata = raw.decode("utf-8").strip()
        if not metadata or "No players found" in metadata:
            return None
        artist, title, album = [part.strip() or "Unknown" for part in metadata.split("|||")]
        return f"{title}, {artist}, {album}"
    except subprocess.CalledProcessError:
        # Non-zero exit status from playerctl: treated as "nothing playing".
        return None
    except subprocess.TimeoutExpired:
        print("Playerctl timeout, media player busy?")
        return None
    except Exception as e:
        # Covers a missing playerctl binary, undecodable output and a field
        # count other than three.
        print(f"Metadata parse error: {str(e)}")
        return None
def get_db_values(url: str = "https://halhadus.rocks/assets/localwrapped/music.db"):
    """Load every row of the ``music`` table from the local ``music.db``.

    When ``music.db`` is missing from the working directory it is first
    downloaded from *url*.

    Args:
        url: Location to fetch ``music.db`` from when no local copy exists.

    Returns:
        A list of dicts, one per row, with the keys ``listmusicname``
        (column 0), ``listvideoid`` (column 1), ``fileartistname``
        (column 4), ``filealbumname`` (column 5) and ``filemusictitle``
        (column 6).

    Raises:
        SystemExit: with status 1 when the database cannot be downloaded or
            written.
    """
    db_path = get_working_directory() + "/music.db"

    if not os.path.exists(db_path):
        # Download to a side file and rename it into place, so an interrupted
        # or failed download never leaves a broken music.db that would block
        # every later re-download attempt.
        partial_path = db_path + ".part"
        try:
            download = requests.get(url, timeout=30)
            # Without this an HTTP error page would be stored as the database.
            download.raise_for_status()
            with open(partial_path, "wb") as f:
                f.write(download.content)
            os.replace(partial_path, db_path)
        except (requests.RequestException, OSError):
            print("Could not download the database.")
            raise SystemExit(1)

    musicconn = sqlite3.connect(db_path)
    try:
        musicrows = musicconn.execute("SELECT * FROM music").fetchall()
    finally:
        # Close the connection even when the query fails.
        musicconn.close()

    # Columns are addressed by position because the query is SELECT *.
    return [
        {
            "listmusicname": row[0],
            "listvideoid": row[1],
            "fileartistname": row[4],
            "filealbumname": row[5],
            "filemusictitle": row[6],
        }
        for row in musicrows
    ]
# Make sure the local play-count database exists and has its table before
# the polling loop starts.  Connecting creates the file when it is absent,
# and IF NOT EXISTS also repairs a count.db that was created without the
# table (for example after an interrupted first run).
countconn = sqlite3.connect(get_working_directory() + "/count.db")
try:
    countconn.execute("CREATE TABLE IF NOT EXISTS count (musicname TEXT, count INTEGER)")
    countconn.commit()
finally:
    countconn.close()
def _record_play(music_name):
    """Add one play for *music_name* to the ``count`` table in count.db."""
    conn = sqlite3.connect(get_working_directory() + "/count.db")
    try:
        cur = conn.cursor()
        cur.execute("UPDATE count SET count = count + 1 WHERE musicname = ?", (music_name,))
        if cur.rowcount == 0:
            # No existing row for this track yet: start its counter at 1.
            cur.execute("INSERT INTO count VALUES (?, ?)", (music_name, 1))
        conn.commit()
    finally:
        conn.close()


# Polling loop state:
#   a            - consecutive checks in which the same track was seen
#   last_written - player string of the last track counted in count.db
#   last_sent    - listmusicname of the last track reported to the server
#   resetstatus  - "true" until the first successful report, then "false"
a = 0
last_written = None
last_sent = None
resetstatus = "true"
while True:
    # Query the player once; calling it twice (check, then fetch) could
    # return different results between the two calls.
    last_music = get_music()
    if last_music is None:
        # Nothing playing: wait instead of re-spawning playerctl in a tight loop.
        time.sleep(1)
        continue

    time.sleep(5)
    if last_music != get_music():
        # The track changed during the wait: start counting again.
        a = 0
        continue
    if last_written == last_music:
        # This track has already been counted.
        continue

    a += 1
    if a < 3:
        continue

    for i in get_db_values():
        # NOTE(review): rows without an album are matched as "Music", while
        # get_music() reports a missing album as "Unknown" - confirm intended.
        album = "Music" if i["filealbumname"] is None else i["filealbumname"]
        full_name = i["filemusictitle"] + ", " + i["fileartistname"] + ", " + album
        if full_name != last_music:
            continue

        # ">=" rather than "==": the reporting path below also increments
        # `a`, so the counter can step over 12 (for instance while the server
        # is unreachable) and the play would otherwise never be recorded.
        if a >= 12:
            _record_play(i["listmusicname"])
            print(f"Local Wrapped: {i['listmusicname']} has been played.")
            last_written = last_music
            a = 0
            continue

        if last_sent == i["listmusicname"]:
            continue
        try:
            verificationcode = get_verification_code()
            if verificationcode is None:
                continue
            requests.post(
                os.environ["MSSERVER"] + "/musicstatus.html",
                json={
                    "music": i["listmusicname"],
                    "time": str(int(time.time())),
                    "videoid": i["listvideoid"],
                    "verificationcode": verificationcode,
                    "resetstatus": resetstatus,
                },
                timeout=10,
            )
            print("Data sent")
            resetstatus = "false"
            a += 1
            last_sent = i["listmusicname"]
        except Exception:
            # Best effort: a failed report must not stop the tracker, but
            # Ctrl-C / SystemExit are allowed to propagate.
            print("Can't reach the server.")
            a += 1
            continue