~aleteoryx/bcdl

bcdl/playback2scrob.py -rwxr-xr-x 5.4 KiB
52d35072Aleteoryx use download_type_str instead 24 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
#!/bin/env python

# Help text printed by usage(). The filename pattern below mirrors the regular
# expressions in scrobblify(), which match the literal prefix "Disc".
doc = '''
scrobble .rockbox/playback.log, handling exclusions and such.
this script assumes filenames of the form '$root/<artist>/<album>? (Disc <#>)?/[Disc # - ]<#> - <name>.<ext>'

config is INI-format. it supports the following parameters:
	[pb2s]
		roots - comma-separated list of valid directories for music
		offset - time-zone offset. e.g. -5 for EST
	
'''

# Last.fm API credentials for this application. API_KEY is added to every
# request by api_call() and appears in the browser auth link; SECRET is hashed
# into each request signature by api_call().
API_KEY = '67c56bce3d1ae7d5574b7d51028e8db0'
SECRET = '8c19d08c8ad2a4c7b490dea2b53876a0' # i guess i have to make this public? WTF.

from sys import stderr, argv, exit

# fallback program name for the usage message; replaced by the real argv[0] during argument parsing
argv0 = 'playback2scrob'

def die(why):
	'''Report a fatal error on stderr, prefixed with "fatal:", and exit with status -1.'''
	stderr.write(f'fatal: {why!s}\n')
	exit(-1)

def usage():
	'''Print the invocation synopsis, a blank line and the help text, then exit with status -1.'''
	# argv0 and doc are only read here, so no `global` declaration is required
	print(f'usage: {argv0} <CONFIG> <LOGFILE>', '', doc, sep='\n')
	exit(-1)


### ARGS ###

# the first element is the program name; exactly two operands (config, logfile) must follow it
if not argv:
	usage()
argv0, *argv = argv
if len(argv) != 2:
	usage()

# this is deferred so that the user doesn't need to wait
# for imports when they make a mistake
from configparser import ConfigParser
from dataclasses import dataclass
from typing import Optional, Iterator, List, Set
import re
import json
from hashlib import md5
from urllib.request import urlopen
from urllib.parse import urlencode
from urllib.error import HTTPError
from time import sleep, time


### CONFIG ###

@dataclass(init=False)
class Config:
	'''Settings taken from the [pb2s] section of the INI config file.'''
	# path prefixes under which a logged file counts as music
	roots: Set[str]
	# the configured `offset` value multiplied by 3600
	offset: int

	def __init__(self, config: ConfigParser):
		'''Fill the fields from `config`; any failure is reported through die().'''
		try:
			# a missing section or option falls back to the single root '/'
			raw_roots = config.get('pb2s', 'roots', fallback='/')
			self.roots = {piece.strip() for piece in raw_roots.split(',')}

			hours = config.getint('pb2s', 'offset', fallback=0)
			self.offset = hours * 3600
		except Exception as e:
			die(f'config invalid: {e}')


### PARSING ###

@dataclass
class RawLogEntry:
	'''One parsed line of the playback log, as produced by split_logfile().'''
	# timestamp from the log with the configured offset subtracted
	when: int
	# logged file path; filter_logfile() later strips the matching root from it
	what: str
	# how much of the track was played; presumably milliseconds, since filter_logfile() compares it to 30_000 (TODO confirm against the log format)
	elapsed: int
	# total track length; compared against `elapsed`, so presumably in the same unit
	duration: int

@dataclass
class Scrobble:
	'''Track metadata derived from a log entry's path by scrobblify().'''
	# playback timestamp, copied from RawLogEntry.when
	when: int
	# track title: the file name without its numeric prefix and extension
	track: str
	# artist: the directory two levels above the file
	artist: str
	# album: the file's parent directory minus any trailing "(Disc N)"
	album: Optional[str]

def split_logfile(config: Config, lines: Iterator[str]) -> Iterator[RawLogEntry]:
	'''
	Parse playback-log lines into RawLogEntry records.

	Each data line has the form `when:elapsed:duration:path`. Only the first
	three colons separate fields, so the path may itself contain colons.
	Blank lines and lines starting with '#' are skipped. `config.offset` is
	subtracted from every timestamp.

	Raises ValueError if a data line has too few fields or a non-integer field.
	'''
	for line in lines:
		line = line.strip()
		# skip blanks explicitly: indexing line[0] on an empty line would raise IndexError
		if not line or line.startswith('#'):
			continue

		when, elapsed, duration, what = line.split(':', 3)
		yield RawLogEntry(int(when) - config.offset, what, int(elapsed), int(duration))

def filter_logfile(config: Config, entries: Iterator[RawLogEntry]) -> Iterator[RawLogEntry]:
	'''
	Yield only the entries worth scrobbling, with their path made root-relative.

	An entry is dropped when:
	  - `elapsed` is below 30_000 (30 s if the log times are milliseconds),
	  - less than half of its duration was played,
	  - it is more than two weeks old, or
	  - its path does not start with any of `config.roots`.

	Surviving entries are modified in place: the matching root and one
	following '/' are removed from `entry.what`.
	'''
	cutoff = time() - 14 * 86_400 # 2 weeks
	# try longer roots first so nested roots resolve the same way on every run (set order is arbitrary)
	roots = sorted(config.roots, key=len, reverse=True)

	for entry in entries:
		if entry.elapsed < 30_000:
			continue
		if entry.elapsed < entry.duration // 2:
			continue
		if entry.when < cutoff:
			continue
		for root in roots:
			if entry.what.startswith(root):
				relative = entry.what[len(root):]
				# startswith() is safe on '' (path equal to the root), unlike indexing [0]
				if relative.startswith('/'):
					relative = relative[1:]
				entry.what = relative
				break
		else:
			# no root matched: not a music file
			continue

		yield entry

def scrobblify(entries: Iterator[RawLogEntry]) -> Iterator[Scrobble]:
	'''
	Turn root-relative log entries into Scrobble records.

	The last three path components are read as artist / album / file name.
	Entries with fewer than three components are skipped. A trailing
	"(Disc N)" is cut from the album, a leading "[Disc N - ]N - " is cut from
	the file name, and everything after the file name's last '.' is dropped.
	'''
	disc_suffix = re.compile(r' ?\(Disc \d+\)$')
	number_prefix = re.compile(r'(Disc \d+ - )?\d+ - ')

	for entry in entries:
		tail = entry.what.split('/')[-3:]
		if len(tail) != 3:
			continue
		artist, album, track = tail

		found = disc_suffix.search(album)
		if found is not None:
			album = album[:found.start()]

		found = number_prefix.match(track)
		if found is not None:
			track = track[found.end():]
		# strip the extension
		track = track.rsplit('.', 1)[0]

		yield Scrobble(entry.when, track, artist, album)


### FUCKING LAST.FM ###

class APIException(Exception):
	'''Raised by api_call() when the response body carries an 'error' field.'''
	def __init__(self, method, code, msg):
		description = f'got error {code} in {method}: {msg}'
		super().__init__(description)

def api_call(method: str, *, post=False, **params):
	'''
	Call a web-service method on ws.audioscrobbler.com and return the decoded JSON.

	method - API method name, e.g. 'auth.getToken'
	post   - send the parameters as a POST body instead of a GET query string
	params - extra request parameters; every value must be a str

	The request is signed: parameter names and values are concatenated in
	byte-wise key order, SECRET is appended, and the MD5 hex digest is sent as
	`api_sig`. `format` is added after signing, so it is not part of the
	signature.

	Raises APIException if the response body contains an 'error' field.
	'''
	params['api_key'] = API_KEY
	params['method'] = method

	sig = md5()
	for k in sorted(params, key=lambda k: k.encode('utf-8')):
		sig.update(k.encode('utf-8'))
		sig.update(params[k].encode('utf-8'))
	sig.update(SECRET.encode('utf-8'))

	params['api_sig'] = sig.hexdigest()
	params['format'] = 'json'

	query = urlencode(params)

	url = 'https://ws.audioscrobbler.com/2.0/'
	data = None
	if post:
		data = query.encode('utf-8')
	else:
		url += '?' + query

	try:
		req = urlopen(url, data)
	except HTTPError as e:
		# an HTTPError doubles as a response object; its body is parsed like a normal reply
		req = e
	try:
		res = json.loads(req.read())
	finally:
		# release the connection even when the body is not valid JSON
		req.close()

	if 'error' in res:
		# .get() so that a reply without 'message' still surfaces as an APIException
		raise APIException(method, res['error'], res.get('message', ''))

	return res

def put_scrobbles(sk: str, scrobbles: List[Scrobble]):
	'''
	Submit a batch of scrobbles in a single track.scrobble POST request.

	sk        - session key taken from the auth.getSession response
	scrobbles - the batch to send; each entry becomes a set of `name[i]` parameters

	The artist is also sent as the album artist. Raises APIException if the
	server reports an error.
	'''
	params = {'sk': sk}
	for i, scrobble in enumerate(scrobbles):
		params[f'timestamp[{i}]'] = str(scrobble.when)
		params[f'artist[{i}]'] = scrobble.artist
		params[f'albumArtist[{i}]'] = scrobble.artist
		# album is Optional; api_call() encodes every value as a str, so omit it rather than pass None
		if scrobble.album is not None:
			params[f'album[{i}]'] = scrobble.album
		params[f'track[{i}]'] = scrobble.track

	api_call('track.scrobble', post=True, **params)


### BOOT ###

# Script body: load the config and the log, authenticate interactively, then
# submit the scrobbles in batches of 50.
config = ConfigParser(strict=False, interpolation=None)
try:
	# `with` closes the config file as soon as it has been parsed
	with open(argv[0]) as f:
		config.read_file(f)
except Exception as e:
	die(f"can't read {argv[0]}: {e}")
config = Config(config)

try:
	with open(argv[1]) as f:
		logfile = f.readlines()
except Exception as e:
	die(f"can't read {argv[1]}: {e}")

logfile = split_logfile(config, logfile)
logfile = filter_logfile(config, logfile)
# the pipeline above is lazy; run it now so a malformed log fails before the user is asked to log in
scrobbles = [*scrobblify(logfile)]

token = api_call('auth.getToken')['token']
while True:
	print("open the link and log in, then press enter")
	print(f'https://www.last.fm/api/auth/?api_key={API_KEY}&token={token}')
	input()
	try:
		session = api_call('auth.getSession', token=token)
		break
	except APIException:
		print('the server sent back an error. try again?\n')

sk = session['session']['key']

fulllen = len(scrobbles)
count = 0
for start in range(0, fulllen, 50):
	if start:
		# pause between batches only; there is nothing to wait for after the last one
		sleep(5)
	batch = scrobbles[start:start + 50]
	put_scrobbles(sk, batch)
	count += len(batch)
	print(f"sent {count}/{fulllen} scrobbles...", end="\r")

print()
print("done!")