from django.shortcuts import render, redirect from django.http import HttpResponse, HttpResponseBadRequest import math import random import requests import os import urllib import json import pprint from datetime import datetime TIME_FORMAT = '%Y-%m-%d-%H-%M-%S' library_stats = {"audio_features":{}, "genres":{}, "year_released":{}, "artists":{}, "num_songs":0, "popularity":[], "total_runtime":0} # generate_random_string {{{ # def generate_random_string(length): """Generates a random string of a certain length Args: length: the desired length of the randomized string Returns: A random string """ rand_str = "" possible_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" for _ in range(length): rand_str += possible_chars[random.randint(0, len(possible_chars) - 1)] return rand_str # }}} generate_random_string # # token_expired {{{ # def token_expired(token_obtained_at, valid_for): """Returns True if token expired, False if otherwise Args: token_obtained_at: datetime object representing the date and time when the token was obtained valid_for: the time duration for which the token is valid, in seconds """ time_elapsed = (datetime.today() - token_obtained_at).total_seconds() return time_elapsed >= valid_for # }}} token_expired # # index {{{ # # Create your views here. def index(request): return render(request, 'spotifyvis/index.html') # }}} index # # login {{{ # def login(request): # use a randomly generated state string to prevent cross-site request forgery attacks state_str = generate_random_string(16) request.session['state_string'] = state_str payload = { 'client_id': os.environ['SPOTIFY_CLIENT_ID'], 'response_type': 'code', 'redirect_uri': 'http://localhost:8000/callback', 'state': state_str, 'scope': 'user-library-read', 'show_dialog': False } params = urllib.parse.urlencode(payload) # turn the payload dict into a query string authorize_url = "https://accounts.spotify.com/authorize/?{}".format(params) return redirect(authorize_url) # }}} login # # callback {{{ # def callback(request): # Attempt to retrieve the authorization code from the query string try: code = request.GET['code'] except KeyError: return HttpResponseBadRequest("

Problem with login

") payload = { 'grant_type': 'authorization_code', 'code': code, 'redirect_uri': 'http://localhost:8000/callback', 'client_id': os.environ['SPOTIFY_CLIENT_ID'], 'client_secret': os.environ['SPOTIFY_CLIENT_SECRET'], } response = requests.post('https://accounts.spotify.com/api/token', data = payload).json() # despite its name, datetime.today() returns a datetime object, not a date object # use datetime.strptime() to get a datetime object from a string request.session['token_obtained_at'] = datetime.strftime(datetime.today(), TIME_FORMAT) request.session['access_token'] = response['access_token'] request.session['refresh_token'] = response['refresh_token'] request.session['valid_for'] = response['expires_in'] # print(response) return redirect('user_data') # }}} callback # # user_data {{{ # def user_data(request): token_obtained_at = datetime.strptime(request.session['token_obtained_at'], TIME_FORMAT) valid_for = int(request.session['valid_for']) if token_expired(token_obtained_at, valid_for): req_body = { 'grant_type': 'refresh_token', 'refresh_token': request.session['refresh_token'], 'client_id': os.environ['SPOTIFY_CLIENT_ID'], 'client_secret': os.environ['SPOTIFY_CLIENT_SECRET'] } refresh_token_response = requests.post('https://accounts.spotify.com/api/token', data = req_body).json() request.session['access_token'] = refresh_token_response['access_token'] request.session['valid_for'] = refresh_token_response['expires_in'] auth_token_str = "Bearer " + request.session['access_token'] headers = { 'Authorization': auth_token_str } user_data_response = requests.get('https://api.spotify.com/v1/me', headers = headers).json() context = { 'user_name': user_data_response['display_name'], 'id': user_data_response['id'], } tracks_to_query = 5 parse_library(headers, tracks_to_query) return render(request, 'spotifyvis/user_data.html', context) # }}} user_data # # parse_library {{{ # def parse_library(headers, tracks): """Scans user's library for certain number of tracks to update library_stats with. :headers: For API call. :tracks: Number of tracks to get from user's library. :returns: None """ # TODO: implement importing entire library with 0 as tracks param # number of tracks to get with each call limit = 5 # keeps track of point to get songs from offset = 0 payload = {'limit': str(limit)} for _ in range(0, tracks, limit): payload['offset'] = str(offset) saved_tracks_response = requests.get('https://api.spotify.com/v1/me/tracks', headers=headers, params=payload).json() num_samples = offset for track_dict in saved_tracks_response['items']: # Track the number of samples for calculating # audio feature averages and standard deviations on the fly num_samples += 1 get_track_info(track_dict['track']) # get_genre(headers, track_dict['track']['album']['id']) audio_features_dict = get_audio_features(headers, track_dict['track']['id']) for feature, feature_data in audio_features_dict.items(): update_audio_feature_stats(feature, feature_data, num_samples) for artist_dict in track_dict['track']['artists']: increase_artist_count(headers, artist_dict['name'], artist_dict['id']) # calculates num_songs with offset + songs retrieved library_stats['num_songs'] = offset + len(saved_tracks_response['items']) offset += limit calculate_genres_from_artists(headers) pprint.pprint(library_stats) # }}} parse_library # def get_audio_features(headers, track_id): """Returns the audio features of a soundtrack Args: headers: headers containing the API token track_id: the id of the soundtrack, needed to query the Spotify API Returns: A dictionary with the features as its keys """ response = requests.get("https://api.spotify.com/v1/audio-features/{}".format(track_id), headers = headers).json() features_dict = {} # Data that we don't need useless_keys = [ "key", "mode", "type", "liveness", "id", "uri", "track_href", "analysis_url", "time_signature", ] for key, val in response.items(): if key not in useless_keys: features_dict[key] = val return features_dict def update_std_dev(cur_mean, cur_std_dev, new_data_point, sample_size): """Calculates the standard deviation for a sample without storing all data points Args: cur_mean: the current mean for N = (sample_size - 1) cur_std_dev: the current standard deviation for N = (sample_size - 1) new_data_point: a new data point sample_size: sample size including the new data point Returns: (new_mean, new_std_dev) """ # This is an implementation of Welford's method # http://jonisalonen.com/2013/deriving-welfords-method-for-computing-variance/ new_mean = ((sample_size - 1) * cur_mean + new_data_point) / sample_size delta_variance = (new_data_point - new_mean) * (new_data_point - cur_mean) new_std_dev = math.sqrt( (math.pow(cur_std_dev, 2) * (sample_size - 2) + delta_variance) / ( sample_size - 1 )) return new_mean, new_std_dev def update_audio_feature_stats(feature, new_data_point, sample_size): """Updates the audio feature statistics in library_stats Args: feature: the audio feature to be updated (string) new_data_point: new data to update the stats with sample_size: sample size including the new data point Returns: None """ # first time the feature is considered if sample_size < 2: library_stats['audio_features'][feature] = { "average": new_data_point, "std_dev": 0, } else: cur_mean = library_stats['audio_features'][feature]['average'] cur_std_dev = library_stats['audio_features'][feature]['std_dev'] new_mean, new_std_dev = update_std_dev(cur_mean, cur_std_dev, new_data_point, sample_size) library_stats['audio_features'][feature]['average'] = new_mean library_stats['audio_features'][feature]['std_dev'] = new_std_dev # increase_nested_key {{{ # def increase_nested_key(top_key, nested_key, amount=1): """Increases count for the value of library_stats[top_key][nested_key]. Checks if nested_key exists already and takes appropriate action. :top_key: First key of library_stats. :nested_key: Key in top_key's dict for which we want to increase value of. :returns: None """ if nested_key not in library_stats[top_key]: library_stats[top_key][nested_key] = amount else: library_stats[top_key][nested_key] += amount # }}} increase_nested_key # # increase_artist_count {{{ # def increase_artist_count(headers, artist_name, artist_id): """Increases count for artist in library_stats and stores the artist_id. :headers: For making the API call. :artist_name: Artist to increase count for. :artist_id: The Spotify ID for the artist. :returns: None """ if artist_name not in library_stats['artists']: library_stats['artists'][artist_name] = {} library_stats['artists'][artist_name]['count'] = 1 library_stats['artists'][artist_name]['id'] = artist_id else: library_stats['artists'][artist_name]['count'] += 1 # }}} increase_artist_count # # get_track_info {{{ # def get_track_info(track_dict): """Get all the info from the track_dict directly returned by the API call in parse_library. :track_dict: Dict returned from the API call containing the track info. :returns: None """ # popularity library_stats['popularity'].append(track_dict['popularity']) # year year_released = track_dict['album']['release_date'].split('-')[0] increase_nested_key('year_released', year_released) # artist # artist_names = [artist['name'] for artist in track_dict['artists']] # for artist_name in artist_names: # increase_nested_key('artists', artist_name) # runtime library_stats['total_runtime'] += float(track_dict['duration_ms']) / 60 # }}} get_track_info # # calculate_genres_from_artists {{{ # def calculate_genres_from_artists(headers): """Tallies up genre counts based on artists in library_stats. :headers: For making the API call. :returns: None """ for artist_entry in library_stats['artists'].values(): artist_response = requests.get('https://api.spotify.com/v1/artists/' + artist_entry['id'], headers=headers).json() # increase each genre count by artist count for genre in artist_response['genres']: increase_nested_key('genres', genre, artist_entry['count']) # }}} calculate_genres_from_artists #