You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
259 lines
7.9 KiB
259 lines
7.9 KiB
# imports {{{ #
|
|
|
|
import math
|
|
import random
|
|
import requests
|
|
import os
|
|
import urllib
|
|
import secrets
|
|
import pprint
|
|
import string
|
|
from datetime import datetime
|
|
|
|
from django.shortcuts import render, redirect
|
|
from django.http import HttpResponse, HttpResponseBadRequest, JsonResponse
|
|
from django.db.models import Count, Q
|
|
from .utils import parse_library, get_artists_in_genre, update_track_genres
|
|
from .models import User, Track, AudioFeatures, Artist
|
|
|
|
# }}} imports #
|
|
|
|
TIME_FORMAT = '%Y-%m-%d-%H-%M-%S'
|
|
TRACKS_TO_QUERY = 200
|
|
|
|
# generate_random_string {{{ #
|
|
|
|
|
|
def generate_random_string(length):
|
|
"""Generates a random string of a certain length
|
|
|
|
Args:
|
|
length: the desired length of the randomized string
|
|
|
|
Returns:
|
|
A random string
|
|
"""
|
|
all_chars = string.ascii_letters + string.digits
|
|
rand_str = "".join(random.choice(all_chars) for _ in range(length))
|
|
|
|
return rand_str
|
|
|
|
# }}} generate_random_string #
|
|
|
|
# token_expired {{{ #
|
|
|
|
def token_expired(token_obtained_at, valid_for):
|
|
"""Returns True if token expired, False if otherwise
|
|
|
|
Args:
|
|
token_obtained_at: datetime object representing the date and time when the token was obtained
|
|
valid_for: the time duration for which the token is valid, in seconds
|
|
"""
|
|
time_elapsed = (datetime.today() - token_obtained_at).total_seconds()
|
|
return time_elapsed >= valid_for
|
|
|
|
# }}} token_expired #
|
|
|
|
# index {{{ #
|
|
|
|
# Create your views here.
|
|
def index(request):
|
|
return render(request, 'spotifyvis/index.html')
|
|
|
|
# }}} index #
|
|
|
|
# login {{{ #
|
|
|
|
# uses Authorization Code flow
|
|
def login(request):
|
|
# use a randomly generated state string to prevent cross-site request forgery attacks
|
|
state_str = generate_random_string(16)
|
|
request.session['state_string'] = state_str
|
|
|
|
payload = {
|
|
'client_id': os.environ['SPOTIFY_CLIENT_ID'],
|
|
'response_type': 'code',
|
|
'redirect_uri': 'http://localhost:8000/callback',
|
|
'state': state_str,
|
|
'scope': 'user-library-read',
|
|
'show_dialog': False
|
|
}
|
|
|
|
params = urllib.parse.urlencode(payload) # turn the payload dict into a query string
|
|
authorize_url = "https://accounts.spotify.com/authorize/?{}".format(params)
|
|
return redirect(authorize_url)
|
|
|
|
# }}} login #
|
|
|
|
# callback {{{ #
|
|
|
|
def callback(request):
|
|
# Attempt to retrieve the authorization code from the query string
|
|
try:
|
|
code = request.GET['code']
|
|
except KeyError:
|
|
return HttpResponseBadRequest("<h1>Problem with login</h1>")
|
|
|
|
payload = {
|
|
'grant_type': 'authorization_code',
|
|
'code': code,
|
|
'redirect_uri': 'http://localhost:8000/callback',
|
|
'client_id': os.environ['SPOTIFY_CLIENT_ID'],
|
|
'client_secret': os.environ['SPOTIFY_CLIENT_SECRET'],
|
|
}
|
|
|
|
response = requests.post('https://accounts.spotify.com/api/token', data=payload).json()
|
|
# despite its name, datetime.today() returns a datetime object, not a date object
|
|
# use datetime.strptime() to get a datetime object from a string
|
|
request.session['token_obtained_at'] = datetime.strftime(datetime.today(), TIME_FORMAT)
|
|
request.session['access_token'] = response['access_token']
|
|
request.session['refresh_token'] = response['refresh_token']
|
|
request.session['valid_for'] = response['expires_in']
|
|
# print(response)
|
|
|
|
return redirect('user_data')
|
|
|
|
# }}} callback #
|
|
|
|
# user_data {{{ #
|
|
|
|
|
|
def user_data(request):
|
|
|
|
# get user token {{{ #
|
|
|
|
token_obtained_at = datetime.strptime(request.session['token_obtained_at'], TIME_FORMAT)
|
|
valid_for = int(request.session['valid_for'])
|
|
|
|
if token_expired(token_obtained_at, valid_for):
|
|
req_body = {
|
|
'grant_type': 'refresh_token',
|
|
'refresh_token': request.session['refresh_token'],
|
|
'client_id': os.environ['SPOTIFY_CLIENT_ID'],
|
|
'client_secret': os.environ['SPOTIFY_CLIENT_SECRET']
|
|
}
|
|
|
|
refresh_token_response = requests.post('https://accounts.spotify.com/api/token', data=req_body).json()
|
|
request.session['access_token'] = refresh_token_response['access_token']
|
|
request.session['valid_for'] = refresh_token_response['expires_in']
|
|
|
|
# }}} get user token #
|
|
|
|
auth_token_str = "Bearer " + request.session['access_token']
|
|
headers = {
|
|
'Authorization': auth_token_str
|
|
}
|
|
|
|
user_data_response = requests.get('https://api.spotify.com/v1/me', headers = headers).json()
|
|
# store the user_id so it may be used to create model
|
|
request.session['user_id'] = user_data_response['id']
|
|
|
|
# create user obj {{{ #
|
|
|
|
try:
|
|
user = User.objects.get(user_id=user_data_response['id'])
|
|
except User.DoesNotExist:
|
|
# Python docs recommends 32 bytes of randomness against brute force attacks
|
|
user = User(user_id=user_data_response['id'], user_secret=secrets.token_urlsafe(32))
|
|
request.session['user_secret'] = user.user_secret
|
|
user.save()
|
|
|
|
# }}} create user obj #
|
|
|
|
context = {
|
|
'user_id': user.user_id,
|
|
'user_secret': user.user_secret,
|
|
}
|
|
|
|
parse_library(headers, TRACKS_TO_QUERY, user)
|
|
return render(request, 'spotifyvis/logged_in.html', context)
|
|
|
|
# }}} user_data #
|
|
|
|
def admin_graphs(request):
|
|
"""TODO
|
|
"""
|
|
user_id = "polarbier"
|
|
# user_id = "chrisshyi13"
|
|
user_obj = User.objects.get(user_id=user_id)
|
|
context = {
|
|
'user_id': user_id,
|
|
'user_secret': user_obj.user_secret,
|
|
}
|
|
update_track_genres(user_obj)
|
|
return render(request, 'spotifyvis/logged_in.html', context)
|
|
|
|
# get_artist_data {{{ #
|
|
|
|
def get_artist_data(request, user_secret):
|
|
"""TODO
|
|
"""
|
|
user = User.objects.get(user_id=user_secret)
|
|
artist_counts = Artist.objects.annotate(num_songs=Count('track',
|
|
filter=Q(track__users=user)))
|
|
processed_artist_counts = [{'name': artist.name,
|
|
'num_songs': artist.num_songs} for artist in artist_counts]
|
|
return JsonResponse(data=processed_artist_counts, safe=False)
|
|
|
|
# }}} get_artist_data #
|
|
|
|
def display_genre_graph(request, client_secret):
|
|
user = User.objects.get(user_secret=client_secret)
|
|
context = {
|
|
'user_secret': client_secret,
|
|
}
|
|
return render(request, "spotifyvis/genre_graph.html", context)
|
|
|
|
def audio_features(request, client_secret):
|
|
user = User.objects.get(user_secret=client_secret)
|
|
context = {
|
|
'user_id': user.user_id,
|
|
'user_secret': client_secret,
|
|
}
|
|
return render(request, "spotifyvis/audio_features.html", context)
|
|
|
|
# get_audio_feature_data {{{ #
|
|
|
|
def get_audio_feature_data(request, audio_feature, client_secret):
|
|
"""Returns all data points for a given audio feature
|
|
|
|
Args:
|
|
request: the HTTP request
|
|
audio_feature: The audio feature to be queried
|
|
client_secret: client secret, used to identify the user
|
|
"""
|
|
user = User.objects.get(user_secret=client_secret)
|
|
user_tracks = Track.objects.filter(users=user)
|
|
response_payload = {
|
|
'data_points': [],
|
|
}
|
|
for track in user_tracks:
|
|
try:
|
|
audio_feature_obj = AudioFeatures.objects.get(track=track)
|
|
response_payload['data_points'].append(getattr(audio_feature_obj, audio_feature))
|
|
except AudioFeatures.DoesNotExist:
|
|
continue
|
|
return JsonResponse(response_payload)
|
|
|
|
# }}} get_audio_feature_data #
|
|
|
|
# get_genre_data {{{ #
|
|
|
|
def get_genre_data(request, user_secret):
|
|
"""Return genre data needed to create the graph user.
|
|
TODO
|
|
"""
|
|
user = User.objects.get(user_secret=user_secret)
|
|
genre_counts = (Track.objects.filter(users__exact=user)
|
|
.values('genre')
|
|
.order_by('genre')
|
|
.annotate(num_songs=Count('genre'))
|
|
)
|
|
for genre_dict in genre_counts:
|
|
genre_dict['artists'] = get_artists_in_genre(user, genre_dict['genre'],
|
|
genre_dict['num_songs'])
|
|
print("*** Genre Breakdown ***")
|
|
pprint.pprint(list(genre_counts))
|
|
return JsonResponse(data=list(genre_counts), safe=False)
|
|
|
|
# }}} get_genre_data #
|