172 lines
5.4 KiB
Python
172 lines
5.4 KiB
Python
import itertools
|
|
import os
|
|
import re
|
|
from enum import Enum
|
|
from pprint import pprint
|
|
from urllib.parse import urlparse
|
|
import json
|
|
|
|
import pandas as pd
|
|
|
|
import spotipy
|
|
from spotipy.oauth2 import SpotifyOAuth
|
|
|
|
from config import *
|
|
|
|
class MusicSource(Enum):
|
|
SPOTIFY = 'spotify'
|
|
YOUTUBE = 'youtube'
|
|
SOUNDCLOUD = 'soundcloud'
|
|
BANDCAMP = 'bandcamp'
|
|
OTHER = 'other'
|
|
|
|
os.environ['SPOTIPY_CLIENT_ID'] = SPOTIPY_CLIENT_ID
|
|
os.environ['SPOTIPY_CLIENT_SECRET'] = SPOTIPY_CLIENT_SECRET
|
|
|
|
SERVICES = {
|
|
'open.spotify.com': MusicSource.SPOTIFY,
|
|
'youtu.be': MusicSource.YOUTUBE,
|
|
'www.youtube.com': MusicSource.YOUTUBE,
|
|
'soundcloud.app.goo.gl': MusicSource.SOUNDCLOUD,
|
|
'on.soundcloud.com': MusicSource.SOUNDCLOUD,
|
|
'm.soundcloud.com': MusicSource.SOUNDCLOUD,
|
|
'soundcloud.com': MusicSource.SOUNDCLOUD
|
|
}
|
|
|
|
class Link:
|
|
def __init__(self, link, reply_to_message_id):
|
|
self.link = link
|
|
self.reply_to_message_id = reply_to_message_id
|
|
self._source = None
|
|
|
|
def __str__(self):
|
|
return f"Source: {self.source()}, Link: {self.link}, Reply to Message ID: {self.reply_to_message_id}"
|
|
|
|
def __repr__(self):
|
|
return f"Link(link={repr(self.link)}, reply_to_message_id={repr(self.reply_to_message_id)})"
|
|
|
|
def source(self):
|
|
if self._source is None:
|
|
o = urlparse(self.link)
|
|
|
|
if re.match(r'([A-Za-z0-9\-]*\.)?bandcamp.com', o.hostname):
|
|
self._source = MusicSource.BANDCAMP
|
|
else:
|
|
self._source = SERVICES.get(o.hostname, MusicSource.OTHER)
|
|
|
|
return self._source
|
|
|
|
|
|
@staticmethod
|
|
def filter_links(links, music_source=None, reply_to_message_id=None):
|
|
filtered_links = links
|
|
|
|
if music_source is not None:
|
|
filtered_links = [link for link in filtered_links if link.source() == music_source]
|
|
|
|
if reply_to_message_id is not None:
|
|
filtered_links = [link for link in filtered_links if link.reply_to_message_id == reply_to_message_id]
|
|
|
|
return filtered_links
|
|
|
|
def _split_seq(iterable, size):
|
|
it = iter(iterable)
|
|
item = list(itertools.islice(it, size))
|
|
while item:
|
|
yield item
|
|
item = list(itertools.islice(it, size))
|
|
|
|
def print_filtered_messages(filtered_messages):
|
|
# Print the filtered messages
|
|
for message in filtered_messages:
|
|
reply_to_message_id = message["reply_to_message_id"]
|
|
link = message["link"]
|
|
print("Reply to Message ID:", reply_to_message_id)
|
|
print("Link:", link)
|
|
print()
|
|
|
|
print(len(filtered_messages))
|
|
|
|
def extract_all_links(file_path):
|
|
# Load the JSON file
|
|
with open(file_path, "r", encoding="utf-8") as file:
|
|
data = json.load(file)
|
|
|
|
# Filter messages of type "message" and extract relevant information
|
|
filtered_messages = []
|
|
for message in data["messages"]:
|
|
if message["type"] == "message":
|
|
text_entries = message.get("text", [])
|
|
for text_entry in text_entries:
|
|
if isinstance(text_entry, dict) and text_entry.get("type") == "link":
|
|
reply_to_message_id = message.get("reply_to_message_id")
|
|
link = text_entry.get("text")
|
|
filtered_messages.append({"reply_to_message_id": reply_to_message_id, "link": link})
|
|
|
|
return filtered_messages
|
|
|
|
|
|
def update_spotify_from_export(messages):
|
|
links_w_source = [Link(link=row['link'], reply_to_message_id=row['reply_to_message_id'])
|
|
for row in messages]
|
|
|
|
for l in links_w_source:
|
|
print(l)
|
|
|
|
spotify_links = []
|
|
#yt_links = []
|
|
#soundcloud_links = []
|
|
#bandcamp = []
|
|
#other_links = []
|
|
|
|
# for i in links_w_source:
|
|
# if i.source() == MusicSource.SPOTIFY:
|
|
# spotify_links.append(i.link)
|
|
# elif i.source() == MusicSource.YOUTUBE:
|
|
# yt_links.append(i.link)
|
|
# elif i.source() == MusicSource.SOUNDCLOUD:
|
|
# soundcloud_links.append(i.link)
|
|
# elif i.source() == MusicSource.BANDCAMP:
|
|
# bandcamp.append(i.link)
|
|
# else:
|
|
# other_links.append(i.link)
|
|
|
|
spotify_links = [l.link for l in filter(lambda x: x.source() == MusicSource.SPOTIFY and x.reply_to_message_id is None, links_w_source)]
|
|
|
|
#print(spotify_links)
|
|
#print(yt_links)
|
|
#print(soundcloud_links)
|
|
#print(bandcamp)
|
|
#print(other_links)
|
|
|
|
print(f'Spotify tracks: {len(spotify_links)}')
|
|
|
|
# clean playlist itself from spotify links
|
|
spotify_links = [s for s in spotify_links if not s.startswith('https://open.spotify.com/playlist/')]
|
|
|
|
# support for links with language codes
|
|
spotify_links = [s.split("/intl-")[0] + "/track" + s.split("/track")[1] if ("open.spotify.com/intl-" in s and "track" in s) else s for s in spotify_links]
|
|
|
|
|
|
print(sorted(spotify_links))
|
|
|
|
scope = "playlist-modify-private"
|
|
os.environ['SPOTIPY_REDIRECT_URI'] = 'http://127.0.0.1:9090'
|
|
|
|
sp = spotipy.Spotify(auth_manager=SpotifyOAuth(scope=scope))
|
|
sp.playlist(SPOTIFY_PLAYLIST_ID)
|
|
|
|
|
|
# paginated update of spotify playlist
|
|
for i, sublist in enumerate(_split_seq(spotify_links, 100)):
|
|
if i==0:
|
|
sp.playlist_replace_items(SPOTIFY_PLAYLIST_ID, sublist)
|
|
else:
|
|
sp.playlist_add_items(SPOTIFY_PLAYLIST_ID, sublist)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
filtered_messages = extract_all_links("ChatExport_2023-06-28/result.json")
|
|
update_spotify_from_export(filtered_messages)
|
|
|