Graphs and tables for your Spotify account.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

288 lines
8.9 KiB

# imports {{{ #
import math
import random
import requests
import os
import urllib
import secrets
import pprint
import string
from datetime import datetime
from django.shortcuts import render, redirect
from django.http import HttpResponse, HttpResponseBadRequest, JsonResponse
from django.db.models import Count, Q
from .utils import parse_library, get_artists_in_genre, update_track_genres
from .models import User, Track, AudioFeatures, Artist
# }}} imports #
# strftime/strptime pattern used to store the token acquisition time in the
# session as a string (see callback() and user_data())
TIME_FORMAT = '%Y-%m-%d-%H-%M-%S'
# passed to parse_library() as its second argument; presumably the number of
# library tracks to fetch -- confirm against utils.parse_library
TRACKS_TO_QUERY = 200
# generate_random_string {{{ #
def generate_random_string(length):
    """Generates a cryptographically strong random alphanumeric string

    The result is used as the OAuth state value in login(), so characters are
    drawn with the secrets module instead of the predictable random PRNG.

    Args:
        length: the desired length of the randomized string

    Returns:
        A random string made of ASCII letters and digits; empty when length
        is zero or negative
    """
    all_chars = string.ascii_letters + string.digits
    return "".join(secrets.choice(all_chars) for _ in range(length))
# }}} generate_random_string #
# token_expired {{{ #
def token_expired(token_obtained_at, valid_for):
    """Tells whether an access token has outlived its validity window

    Args:
        token_obtained_at: datetime object marking when the token was obtained
        valid_for: lifetime of the token, in seconds

    Returns:
        True once at least valid_for seconds separate token_obtained_at from
        datetime.today(), False otherwise
    """
    age = datetime.today() - token_obtained_at
    still_valid = age.total_seconds() < valid_for
    return not still_valid
# }}} token_expired #
# index {{{ #
# Create your views here.
def index(request):
    """Renders the landing page

    :param request: the HTTP request
    :return: the rendered spotifyvis/index.html template
    """
    landing_template = 'spotifyvis/index.html'
    return render(request, landing_template)
# }}} index #
# login {{{ #
# uses Authorization Code flow
def login(request):
    """Starts the Spotify Authorization Code flow

    Stores a freshly generated state string in the session and redirects the
    browser to Spotify's authorize endpoint with that state attached.

    :param request: the HTTP request
    :return: a redirect to the Spotify authorize URL
    :raises KeyError: if SPOTIFY_CLIENT_ID is missing from the environment
    """
    # Import the submodule explicitly: a bare `import urllib` does not
    # guarantee that `urllib.parse` has been loaded.
    from urllib.parse import urlencode

    # use a randomly generated state string to prevent cross-site request forgery attacks
    state_str = generate_random_string(16)
    request.session['state_string'] = state_str
    payload = {
        'client_id': os.environ['SPOTIFY_CLIENT_ID'],
        'response_type': 'code',
        'redirect_uri': 'http://localhost:8000/callback',
        'state': state_str,
        'scope': 'user-library-read',
        'show_dialog': False,
    }
    params = urlencode(payload)  # turn the payload dict into a query string
    authorize_url = "https://accounts.spotify.com/authorize/?{}".format(params)
    return redirect(authorize_url)
# }}} login #
# callback {{{ #
def callback(request):
    """Handles the redirect back from Spotify after the user authorizes

    Checks the state string issued by login(), exchanges the authorization
    code for tokens, stores them in the session and redirects to the
    'user_data' view.

    :param request: the HTTP request; its query string carries code and state
    :return: a redirect to 'user_data', or an HttpResponseBadRequest when the
        code is missing, the state does not match the one stored in the
        session, or the token response lacks the expected fields
    :raises KeyError: if the Spotify client id/secret are missing from the environment
    """
    # Attempt to retrieve the authorization code from the query string
    try:
        code = request.GET['code']
    except KeyError:
        return HttpResponseBadRequest("<h1>Problem with login</h1>")

    # The state string is single-use: remove it from the session and require
    # the callback to echo it back, otherwise storing it in login() protects
    # nothing.
    expected_state = request.session.pop('state_string', None)
    if expected_state is None or request.GET.get('state') != expected_state:
        return HttpResponseBadRequest("<h1>Problem with login</h1>")

    payload = {
        'grant_type': 'authorization_code',
        'code': code,
        # same redirect_uri value that login() sent
        'redirect_uri': 'http://localhost:8000/callback',
        'client_id': os.environ['SPOTIFY_CLIENT_ID'],
        'client_secret': os.environ['SPOTIFY_CLIENT_SECRET'],
    }
    response = requests.post('https://accounts.spotify.com/api/token', data=payload).json()

    # a failed exchange returns a body without the token fields; answer with a
    # 400 instead of crashing on the missing keys
    required_keys = ('access_token', 'refresh_token', 'expires_in')
    if not all(key in response for key in required_keys):
        return HttpResponseBadRequest("<h1>Problem with login</h1>")

    # despite its name, datetime.today() returns a datetime object, not a date object
    # use datetime.strptime() to get a datetime object from a string
    request.session['token_obtained_at'] = datetime.strftime(datetime.today(), TIME_FORMAT)
    request.session['access_token'] = response['access_token']
    request.session['refresh_token'] = response['refresh_token']
    request.session['valid_for'] = response['expires_in']
    return redirect('user_data')
# }}} callback #
# user_data {{{ #
def user_data(request):
    """Loads the logged-in user's profile and library, then renders the logged-in page

    Refreshes the access token first when it has expired, fetches the profile
    from https://api.spotify.com/v1/me, creates the User row when none exists
    for the returned id, and calls parse_library() for that user.

    :param request: the HTTP request; the session must already hold the token
        values stored by callback()
    :return: the rendered spotifyvis/logged_in.html template
    :raises KeyError: if the session lacks the token values or the Spotify
        client id/secret are missing from the environment
    """
    # get user token {{{ #
    token_obtained_at = datetime.strptime(request.session['token_obtained_at'], TIME_FORMAT)
    valid_for = int(request.session['valid_for'])
    if token_expired(token_obtained_at, valid_for):
        req_body = {
            'grant_type': 'refresh_token',
            'refresh_token': request.session['refresh_token'],
            'client_id': os.environ['SPOTIFY_CLIENT_ID'],
            'client_secret': os.environ['SPOTIFY_CLIENT_SECRET'],
        }
        refresh_token_response = requests.post('https://accounts.spotify.com/api/token', data=req_body).json()
        request.session['access_token'] = refresh_token_response['access_token']
        request.session['valid_for'] = refresh_token_response['expires_in']
        # restart the expiry clock; without this the stale timestamp makes
        # every later request refresh the token again
        request.session['token_obtained_at'] = datetime.strftime(datetime.today(), TIME_FORMAT)
        # keep a replacement refresh token when the response carries one
        if 'refresh_token' in refresh_token_response:
            request.session['refresh_token'] = refresh_token_response['refresh_token']
    # }}} get user token #

    auth_token_str = "Bearer " + request.session['access_token']
    headers = {
        'Authorization': auth_token_str,
    }
    user_data_response = requests.get('https://api.spotify.com/v1/me', headers=headers).json()
    # store the user_id so it may be used to create model
    request.session['user_id'] = user_data_response['id']

    # create user obj {{{ #
    try:
        user = User.objects.get(user_id=user_data_response['id'])
    except User.DoesNotExist:
        # Python docs recommends 32 bytes of randomness against brute force attacks
        user = User(user_id=user_data_response['id'], user_secret=secrets.token_urlsafe(32))
        request.session['user_secret'] = user.user_secret
        user.save()
    # }}} create user obj #

    context = {
        'user_id': user.user_id,
        'user_secret': user.user_secret,
    }
    parse_library(headers, TRACKS_TO_QUERY, user)
    return render(request, 'spotifyvis/logged_in.html', context)
# }}} user_data #
def admin_graphs(request):
    """Renders the logged-in page for one hard-coded account

    Looks up the user with id "polarbier", calls update_track_genres() on it
    and renders the logged-in template with that user's id and secret.

    :param request: the HTTP request
    :return: the rendered spotifyvis/logged_in.html template
    """
    admin_id = "polarbier"
    admin_user = User.objects.get(user_id=admin_id)
    # the secret is read before update_track_genres() runs, as in the original flow
    page_context = dict(user_id=admin_id, user_secret=admin_user.user_secret)
    update_track_genres(admin_user)
    return render(request, 'spotifyvis/logged_in.html', page_context)
def artist_data(request, user_secret):
    """Shows the page hosting the artist graph

    :param request: the HTTP request
    :param user_secret: the secret identifying which user's data to show
    :return: the rendered spotifyvis/artist_graph.html template
    """
    owner = User.objects.get(user_secret=user_secret)
    page_context = dict(user_id=owner.user_id, user_secret=user_secret)
    return render(request, "spotifyvis/artist_graph.html", page_context)
# get_artist_data {{{ #
def get_artist_data(request, user_secret):
    """Serves per-artist song counts as a JSON list of dictionaries

    Each dictionary has the keys 'name' (artist name) and 'num_songs' (how
    many of the artist's tracks are linked to the identified user).

    :param request: the HTTP request
    :param user_secret: the user secret used for identification
    :return: a JsonResponse wrapping the list
    """
    owner = User.objects.get(user_secret=user_secret)
    # count, for each artist, only the tracks linked to this user
    songs_of_owner = Count('track', filter=Q(track__users=owner))
    annotated_artists = Artist.objects.annotate(num_songs=songs_of_owner)
    payload = [
        dict(name=entry.name, num_songs=entry.num_songs)
        for entry in annotated_artists
    ]
    return JsonResponse(data=payload, safe=False)
# }}} get_artist_data #
def display_genre_graph(request, user_secret):
    """Shows the page hosting the genre graph

    :param request: the HTTP request
    :param user_secret: the user secret used for identification
    :return: the rendered spotifyvis/genre_graph.html template
    """
    # the result is not needed; the lookup only makes an unknown secret fail
    # before anything is rendered
    User.objects.get(user_secret=user_secret)
    page_context = dict(user_secret=user_secret)
    return render(request, "spotifyvis/genre_graph.html", page_context)
def audio_features(request, user_secret):
    """Shows the page hosting the audio feature graphs

    :param request: the HTTP request
    :param user_secret: the secret identifying which user's data to show
    :return: the rendered spotifyvis/audio_features.html template
    """
    owner = User.objects.get(user_secret=user_secret)
    page_context = dict(user_id=owner.user_id, user_secret=user_secret)
    return render(request, "spotifyvis/audio_features.html", page_context)
# get_audio_feature_data {{{ #
def get_audio_feature_data(request, audio_feature, user_secret):
    """Returns all data points for a given audio feature

    Args:
        request: the HTTP request
        audio_feature: name of the AudioFeatures field to be queried
        user_secret: client secret, used to identify the user

    Returns:
        A JsonResponse of the form {'data_points': [...]} holding the value of
        audio_feature for every AudioFeatures row whose track belongs to the
        user, or an HttpResponseBadRequest when audio_feature is not a plain
        (non-relational) field of AudioFeatures
    """
    user = User.objects.get(user_secret=user_secret)

    # audio_feature is caller-supplied: accept only real column names so it
    # cannot be used to read arbitrary attributes of the model
    queryable_fields = {
        field.name
        for field in AudioFeatures._meta.get_fields()
        if field.concrete and not field.is_relation
    }
    if audio_feature not in queryable_fields:
        return HttpResponseBadRequest("<h1>Unknown audio feature</h1>")

    user_tracks = Track.objects.filter(users=user)
    # a single query for all of the user's tracks instead of one lookup per
    # track; tracks without an AudioFeatures row simply contribute nothing
    data_points = (AudioFeatures.objects
                   .filter(track__in=user_tracks)
                   .values_list(audio_feature, flat=True))
    return JsonResponse({'data_points': list(data_points)})
# }}} get_audio_feature_data #
# get_genre_data {{{ #
def get_genre_data(request, user_secret):
    """Serves the per-genre breakdown of a user's tracks as a JSON list

    Each element has the keys 'genre', 'num_songs' (count of the user's tracks
    for that genre) and 'artists' (whatever get_artists_in_genre() returns for
    that genre). Rows are ordered by genre. The breakdown is also printed to
    stdout.

    :param request: the HTTP request
    :param user_secret: the user secret used for identification
    :return: a JsonResponse wrapping the list
    """
    owner = User.objects.get(user_secret=user_secret)
    tracks_of_owner = Track.objects.filter(users__exact=owner)
    per_genre = (tracks_of_owner
                 .values('genre')
                 .order_by('genre')
                 .annotate(num_songs=Count('genre')))
    # evaluate the query once so the rows extended below are the ones returned
    breakdown = list(per_genre)
    for row in breakdown:
        row['artists'] = get_artists_in_genre(owner, row['genre'], row['num_songs'])
    print("*** Genre Breakdown ***")
    pprint.pprint(breakdown)
    return JsonResponse(data=breakdown, safe=False)
# }}} get_genre_data #