geradedenken_mukke/main.py

107 lines
3.0 KiB
Python

import itertools
import os
import re
from enum import Enum
from pprint import pprint
from urllib.parse import urlparse
import pandas as pd
import spotipy
from spotipy.oauth2 import SpotifyOAuth
from config import *
class MusicSource(Enum):
    """Music service that a shared link is attributed to.

    ``OTHER`` is the fallback for links whose hostname is not recognised.
    """

    SPOTIFY = 'spotify'
    YOUTUBE = 'youtube'
    SOUNDCLOUD = 'soundcloud'
    BANDCAMP = 'bandcamp'
    OTHER = 'other'
# Publish the credentials (star-imported from config) as environment
# variables: the SpotifyOAuth manager in this file is built with only a
# scope, and spotipy falls back to the SPOTIPY_* variables for its client
# credentials.
os.environ.update(
    SPOTIPY_CLIENT_ID=SPOTIPY_CLIENT_ID,
    SPOTIPY_CLIENT_SECRET=SPOTIPY_CLIENT_SECRET,
)
# Exact-hostname lookup table: URL hostname -> music service.
# Bandcamp has no entry here because echo() recognises it by hostname
# pattern rather than by exact match.
SERVICES = {
    'open.spotify.com': MusicSource.SPOTIFY,
    **dict.fromkeys(
        ('youtu.be', 'www.youtube.com'),
        MusicSource.YOUTUBE,
    ),
    **dict.fromkeys(
        (
            'soundcloud.app.goo.gl',
            'on.soundcloud.com',
            'm.soundcloud.com',
            'soundcloud.com',
        ),
        MusicSource.SOUNDCLOUD,
    ),
}
def _split_seq(iterable, size):
it = iter(iterable)
item = list(itertools.islice(it, size))
while item:
yield item
item = list(itertools.islice(it, size))
def echo(link):
    """Classify *link* by the music service its hostname belongs to.

    Args:
        link: URL string to classify.

    Returns:
        A dict ``{'source': MusicSource, 'link': link}``. ``source`` is
        ``MusicSource.BANDCAMP`` for ``bandcamp.com`` and any of its
        subdomains, the ``SERVICES`` entry for an exactly matching hostname,
        and ``MusicSource.OTHER`` otherwise (including links that have no
        hostname at all).
    """
    # urlparse() reports hostname as None when the link has no network
    # location (e.g. "example.com/x" without a scheme); normalise that to
    # '' so such links fall through to OTHER instead of raising.
    hostname = urlparse(link).hostname or ''
    # Compare the whole hostname rather than a prefix, so look-alike hosts
    # such as "bandcamp.com.evil.org" are not treated as Bandcamp.
    if hostname == 'bandcamp.com' or hostname.endswith('.bandcamp.com'):
        return {'source': MusicSource.BANDCAMP, 'link': link}
    return {'source': SERVICES.get(hostname, MusicSource.OTHER), 'link': link}
def update_spotify_from_export(export_path="ChatExport_2022-09-18/result.json"):
    """Rebuild the Spotify playlist from the links found in a chat export.

    Reads the export JSON, collects every text entity of type ``link`` from
    its messages, classifies each link with ``echo()`` and replaces the
    contents of the playlist ``SPOTIFY_PLAYLIST_ID`` with the Spotify links
    (links to playlists themselves are skipped).

    If no Spotify link is found, no write request is sent and the playlist
    is left as it is.

    Args:
        export_path: Path of the export's JSON file. Defaults to the export
            location that was previously hard-coded.
    """
    df = pd.read_json(export_path)
    df1 = pd.json_normalize(df.messages)
    reduced = df1[df1['type'] == 'message'][['id', 'type', 'text', 'from', 'from_id']]
    # One row per element of each message's 'text', flattened into columns.
    # NOTE(review): assumes the elements of interest are dicts carrying
    # 'type' and 'text' keys — confirm against the export format.
    get_links = pd.json_normalize(reduced.explode('text').text)
    links = get_links[get_links['type'] == 'link']['text'].to_list()
    links_w_source = [echo(link) for link in links]
    pprint(links_w_source)

    # Only Spotify links are used below; other sources are classified (and
    # shown by the pprint above) but not processed any further.
    spotify_links = [
        entry['link']
        for entry in links_w_source
        if entry['source'] == MusicSource.SPOTIFY
    ]
    # The count is taken before playlist links are dropped further down.
    print(f'Spotify tracks: {len(spotify_links)}')

    scope = "playlist-modify-private"
    os.environ['SPOTIPY_REDIRECT_URI'] = 'https://example.com/callback'
    sp = spotipy.Spotify(auth_manager=SpotifyOAuth(scope=scope))
    # The result is unused. NOTE(review): presumably an early check that
    # authentication works and the playlist is reachable — confirm.
    sp.playlist(SPOTIFY_PLAYLIST_ID)

    # Drop links that point to playlists rather than to individual items.
    spotify_links = [
        s for s in spotify_links
        if not s.startswith('https://open.spotify.com/playlist/')
    ]

    # Send the links in chunks of 100 (the per-request item limit of the
    # Spotify Web API): the first chunk replaces the current playlist
    # contents, every later chunk is appended.
    for i, sublist in enumerate(_split_seq(spotify_links, 100)):
        if i == 0:
            sp.playlist_replace_items(SPOTIFY_PLAYLIST_ID, sublist)
        else:
            sp.playlist_add_items(SPOTIFY_PLAYLIST_ID, sublist)
if __name__ == '__main__':
    # Run the playlist sync only when executed as a script, not on import.
    update_spotify_from_export()