~aleteoryx/bcdl

0f1bf5f2ea0d368e48531348a2ee89481d5808fb — Aleteoryx 2 months ago 9c236f0
simple scrobbler
1 files changed, 233 insertions(+), 0 deletions(-)

A playback2scrob.py
A playback2scrob.py => playback2scrob.py +233 -0
@@ 0,0 1,233 @@
#!/bin/env python

doc = '''
scrobble .rockbox/playback.log, handling exclusions and such.
this script assumes filenames of the form '$root/<artist>/<album>? (Disc <#>)?/<#> - <name>.<ext>'

config is INI-format. it supports the following parameters:
	[pb2s]
		roots - comma-separated list of valid directories for music
		offset - time-zone offset. e.g. -5 for EST
	
'''

API_KEY = '67c56bce3d1ae7d5574b7d51028e8db0'
SECRET = '8c19d08c8ad2a4c7b490dea2b53876a0' # i guess i have to make this public? WTF.

from configparser import ConfigParser
from sys import stderr, argv, exit
from dataclasses import dataclass
from typing import Optional, Iterator, List, Set
import re
import json
from hashlib import md5
from urllib.request import urlopen
from urllib.parse import urlencode
from urllib.error import HTTPError
from time import sleep, time

argv0 = 'playback2scrob'

def die(why):
	print('fatal:', why, file=stderr)
	exit(-1)

def usage():
	global argv0, doc
	print(f'usage: {argv0} <CONFIG> <LOGFILE>')
	print()
	print(doc)
	exit(-1)


### CONFIG ###

@dataclass(init=False)
class Config:
	roots: Set[str]
	offset: int
	
	def __init__(self, config: ConfigParser):
		try:
			self.roots = config.get('pb2s', 'roots', fallback='/')
			self.roots = set([x.strip() for x in self.roots.split(',')])
			
			self.offset = config.getint('pb2s', 'offset', fallback=0)*3600
			
		except Exception as e:
			die(f'config invalid: {e}')


### PARSING ###

@dataclass
class RawLogEntry:
	when: int
	what: str
	elapsed: int
	duration: int

@dataclass
class Scrobble:
	when: int
	track: str
	artist: str
	album: Optional[str]

def split_logfile(config: Config, lines: Iterator[str]) -> Iterator[RawLogEntry]:
	for line in lines:
		line = line.strip()
		if line[0] in '#':
			continue

		when,elapsed,duration,what = line.split(':')
		yield RawLogEntry(int(when)-config.offset, what, int(elapsed), int(duration))

def filter_logfile(config: Config, entries: Iterator[RawLogEntry]) -> Iterator[RawLogEntry]:
	for entry in entries:
		if entry.elapsed < 30_000:
			continue
		if entry.elapsed < entry.duration // 2:
			continue
		if (entry.when + (14 * 86_400)) < time(): # 2 weeks
			continue
		for root in config.roots:
			if entry.what.startswith(root):
				entry.what = entry.what[len(root):]
				if entry.what[0] == '/':
					entry.what = entry.what[1:]
				break
		else:
			continue
		
		yield entry

def scrobblify(entries: Iterator[RawLogEntry]) -> Iterator[Scrobble]:
	for entry in entries:
		segs = entry.what.split('/')
		if len(segs) < 3:
			continue
		
		artist, album, track = segs[-3:]
		
		if (m := re.search(' ?\\(Disc \\d+\\)$', album)) is not None:
			album = album[:m.start()]
		
		if (m := re.match('\\d+ - ', track)) is not None:
			track = track[m.end():]
		track = track.rsplit('.', 1)[0]
		
		yield Scrobble(entry.when, track, artist, album)


### FUCKING LAST.FM ###

class APIException(Exception):
	def __init__(self, method, code, msg):
		super().__init__(f'got error {code} in {method}: {msg}')

def api_call(method: str, *, post=False, **params):
	global API_KEY, SECRET
	params['api_key'] = API_KEY
	params['method'] = method

	keys = sorted(params.keys(), key=lambda k: k.encode('utf-8'))
	sig = md5()
	for k in keys:
		sig.update(k.encode('utf-8'))
		sig.update(params[k].encode('utf-8'))
	sig.update(SECRET.encode('utf-8'))

	params['api_sig'] = sig.hexdigest()
	params['format'] = 'json'
	
	params = urlencode(params)
	
	url = 'https://ws.audioscrobbler.com/2.0/'
	data = None
	if not post:
		url += '?'+params
	else:
		data = params.encode('utf-8')
	
	try:
		req = urlopen(url, data)
	except HTTPError as e:
		req = e
	res = json.loads(req.read())

	if 'error' in res:
		raise APIException(method, res['error'], res['message'])

	print(res)
	return res

def put_scrobbles(sk: str, scrobbles: List[Scrobble]):
	params = {'sk':sk}
	for i,scrobble in enumerate(scrobbles):
		params[f'timestamp[{i}]'] = str(scrobble.when)
		params[f'artist[{i}]'] = scrobble.artist
		params[f'albumArtist[{i}]'] = scrobble.artist
		params[f'albumArtist[{i}]'] = scrobble.artist
		params[f'album[{i}]'] = scrobble.album
		params[f'track[{i}]'] = scrobble.track
	
	api_call('track.scrobble', post=True, **params)


### BOOT ###

if len(argv) < 1:
	usage()
argv0 = argv[0]
argv = argv[1:]
if len(argv) != 2:
	usage()

config = ConfigParser(strict=False, interpolation=None)
try:
	config.read_file(open(argv[0]))
except Exception as e:
	die(f"can't read {argv[0]}: {e}")
config = Config(config)

try:
	logfile = open(argv[1]).readlines()
except Exception as e:
	die(f"can't read {argv[1]}: {e}")

print(config)


logfile = split_logfile(config, logfile)
logfile = filter_logfile(config, logfile)
scrobbles = scrobblify(logfile)

token = api_call('auth.getToken')['token']
while True:
	print("open the link and log in, then press enter")
	print(f'https://www.last.fm/api/auth/?api_key={API_KEY}&token={token}')
	input()
	try:
		session = api_call('auth.getSession', token=token)
		break
	except APIException as e:
		print('the server sent back an error. try again?\n')

sk = session['session']['key']

acc = []
for scrobble in scrobbles:
	acc.append(scrobble)
	if len(acc) < 50:
		continue
	put_scrobbles(sk, acc)
	acc = []
	print("sent 50 scrobbles...")
	sleep(5)

if len(acc) > 0:
	put_scrobbles(sk, acc)
	print(f"sent {len(acc)} scrobbes...")

print("done!")