Browse Source

Initial design for database

Before considering Django models.
master
Kevin Mok 6 years ago
parent
commit
c831e5b9a8
  1. 3
      .gitignore
  2. 47
      database-design.txt
  3. 66
      spotifyvis/tests.py
  4. 289
      spotifyvis/utils.py
  5. 243
      spotifyvis/views.py

3
.gitignore

@ -6,3 +6,6 @@ db.sqlite3
api-keys.sh
Pipfile
super-pass.txt
*.js
*.ini

47
database-design.txt

@ -0,0 +1,47 @@
UserLibrary as ul
-
UserID PK varchar
SavedTracks array # array of track ID's (varchar)
ArtistName
-
TrackID PK varchar FK >- ul.SavedTracks
ArtistName varchar
AudioFeatures
-
TrackID PK varchar FK >- ul.SavedTracks
Danceability decimal
Energy decimal
Loudness decimal
Speechiness decimal
Acousticness decimal
Instrumentalness decimal
Valence decimal
Tempo decimal
Genre
-
TrackID PK varchar FK >- ul.SavedTracks
MainGenre NULL varchar
OtherGenres NULL array
Popularity
-
TrackID PK varchar FK >- ul.SavedTracks
Popularity decimal
Runtime
-
TrackID PK varchar FK >- ul.SavedTracks
Runtime smallint # seconds
TrackName
-
TrackID PK varchar FK >- ul.SavedTracks
TrackName varchar
Year
-
TrackID PK varchar FK >- ul.SavedTracks
Year smallint

66
spotifyvis/tests.py

@ -1,3 +1,67 @@
from django.test import TestCase
from .views import update_std_dev
import math
# Create your tests here.
class UpdateStdDevTest(TestCase):
def test_two_data_points(self):
"""
tests if update_std_dev behaves correctly for two data points
"""
cur_mean = 5
cur_std_dev = 0
new_mean, new_std_dev = update_std_dev(cur_mean, cur_std_dev, 10, 2)
self.assertTrue(math.isclose(new_mean, 7.5, rel_tol=0.01))
self.assertTrue(math.isclose(new_std_dev, 3.5355, rel_tol=0.01))
def test_three_data_points(self):
"""
tests if update_std_dev behaves correctly for three data points
"""
cur_mean = 7.5
cur_std_dev = 3.5355
new_mean, new_std_dev = update_std_dev(cur_mean, cur_std_dev, 15, 3)
self.assertTrue(math.isclose(new_mean, 10, rel_tol=0.01))
self.assertTrue(math.isclose(new_std_dev, 5, rel_tol=0.01))
def test_four_data_points(self):
"""
tests if update_std_dev behaves correctly for four data points
"""
cur_mean = 10
cur_std_dev = 5
new_mean, new_std_dev = update_std_dev(cur_mean, cur_std_dev, 20, 4)
self.assertTrue(math.isclose(new_mean, 12.5, rel_tol=0.01))
self.assertTrue(math.isclose(new_std_dev, 6.455, rel_tol=0.01))
def test_five_data_points(self):
"""
tests if update_std_dev behaves correctly for five data points
"""
cur_mean = 12.5
cur_std_dev = 6.455
new_mean, new_std_dev = update_std_dev(cur_mean, cur_std_dev, 63, 5)
self.assertTrue(math.isclose(new_mean, 22.6, rel_tol=0.01))
self.assertTrue(math.isclose(new_std_dev, 23.2658, rel_tol=0.01))
def test_sixteen_data_points(self):
"""
tests if update_std_dev behaves correctly for sixteen data points
"""
cur_mean = 0.4441
cur_std_dev = 0.2855
new_mean, new_std_dev = update_std_dev(cur_mean, cur_std_dev, 0.7361, 16)
self.assertTrue(math.isclose(new_mean, 0.4624, rel_tol=0.01))
self.assertTrue(math.isclose(new_std_dev, 0.2853, rel_tol=0.01))

289
spotifyvis/utils.py

@ -0,0 +1,289 @@
import requests
import math
import pprint
# parse_library {{{ #
def parse_library(headers, tracks, library_stats):
"""Scans user's library for certain number of tracks to update library_stats with.
:headers: For API call.
:tracks: Number of tracks to get from user's library.
:library_stats: Dictionary containing the data mined from user's library
:returns: None
"""
# TODO: implement importing entire library with 0 as tracks param
# number of tracks to get with each call
limit = 5
# keeps track of point to get songs from
offset = 0
payload = {'limit': str(limit)}
for _ in range(0, tracks, limit):
payload['offset'] = str(offset)
saved_tracks_response = requests.get('https://api.spotify.com/v1/me/tracks', headers=headers, params=payload).json()
num_samples = offset
for track_dict in saved_tracks_response['items']:
# Track the number of samples for calculating
# audio feature averages and standard deviations on the fly
num_samples += 1
get_track_info(track_dict['track'], library_stats, num_samples)
# get_genre(headers, track_dict['track']['album']['id'])
audio_features_dict = get_audio_features(headers, track_dict['track']['id'])
for feature, feature_data in audio_features_dict.items():
update_audio_feature_stats(feature, feature_data, num_samples, library_stats)
for artist_dict in track_dict['track']['artists']:
increase_artist_count(headers, artist_dict['name'], artist_dict['id'], library_stats)
# calculates num_songs with offset + songs retrieved
library_stats['num_songs'] = offset + len(saved_tracks_response['items'])
offset += limit
calculate_genres_from_artists(headers, library_stats)
pprint.pprint(library_stats)
# }}} parse_library #
def get_audio_features(headers, track_id):
"""Returns the audio features of a soundtrack
Args:
headers: headers containing the API token
track_id: the id of the soundtrack, needed to query the Spotify API
Returns:
A dictionary with the features as its keys
"""
response = requests.get("https://api.spotify.com/v1/audio-features/{}".format(track_id), headers = headers).json()
features_dict = {}
# Data that we don't need
useless_keys = [
"key", "mode", "type", "liveness", "id", "uri", "track_href", "analysis_url", "time_signature",
]
for key, val in response.items():
if key not in useless_keys:
features_dict[key] = val
return features_dict
def update_std_dev(cur_mean, cur_std_dev, new_data_point, sample_size):
"""Calculates the standard deviation for a sample without storing all data points
Args:
cur_mean: the current mean for N = (sample_size - 1)
cur_std_dev: the current standard deviation for N = (sample_size - 1)
new_data_point: a new data point
sample_size: sample size including the new data point
Returns:
(new_mean, new_std_dev)
"""
# This is an implementation of Welford's method
# http://jonisalonen.com/2013/deriving-welfords-method-for-computing-variance/
new_mean = ((sample_size - 1) * cur_mean + new_data_point) / sample_size
delta_variance = (new_data_point - new_mean) * (new_data_point - cur_mean)
new_std_dev = math.sqrt(
(math.pow(cur_std_dev, 2) * (sample_size - 2) + delta_variance) / (
sample_size - 1
))
return new_mean, new_std_dev
def update_audio_feature_stats(feature, new_data_point, sample_size, library_stats):
"""Updates the audio feature statistics in library_stats
Args:
feature: the audio feature to be updated (string)
new_data_point: new data to update the stats with
sample_size: sample size including the new data point
library_stats Dictionary containing the data mined from user's Spotify library
Returns:
None
"""
# first time the feature is considered
if sample_size < 2:
library_stats['audio_features'][feature] = {
"average": new_data_point,
"std_dev": 0,
}
else:
cur_mean = library_stats['audio_features'][feature]['average']
cur_std_dev = library_stats['audio_features'][feature]['std_dev']
new_mean, new_std_dev = update_std_dev(cur_mean, cur_std_dev, new_data_point, sample_size)
library_stats['audio_features'][feature] = {
"average": new_mean,
"std_dev": new_std_dev
}
# increase_nested_key {{{ #
def increase_nested_key(top_key, nested_key, library_stats, amount=1):
"""Increases count for the value of library_stats[top_key][nested_key]. Checks if nested_key exists already and takes
appropriate action.
:top_key: First key of library_stats.
:nested_key: Key in top_key's dict for which we want to increase value of.
:library_stats: Dictionary containing the data mined from user's Spotify library
:returns: None
"""
if nested_key not in library_stats[top_key]:
library_stats[top_key][nested_key] = amount
else:
library_stats[top_key][nested_key] += amount
# }}} increase_nested_key #
# increase_artist_count {{{ #
def increase_artist_count(headers, artist_name, artist_id, library_stats):
"""Increases count for artist in library_stats and stores the artist_id.
:headers: For making the API call.
:artist_name: Artist to increase count for.
:artist_id: The Spotify ID for the artist.
:library_stats: Dictionary containing the data mined from user's Spotify library
:returns: None
"""
if artist_name not in library_stats['artists']:
library_stats['artists'][artist_name] = {}
library_stats['artists'][artist_name]['count'] = 1
library_stats['artists'][artist_name]['id'] = artist_id
else:
library_stats['artists'][artist_name]['count'] += 1
# }}} increase_artist_count #
def update_popularity_stats(new_data_point, library_stats, sample_size):
"""Updates the popularity statistics in library_stats
Args:
new_data_point: new data to update the popularity stats with
library_stats: Dictionary containing data mined from user's Spotify library
sample_size: The sample size including the new data
Returns:
None
"""
if sample_size < 2:
library_stats['popularity'] = {
"average": new_data_point,
"std_dev": 0,
}
else :
cur_mean_popularity = library_stats['popularity']['average']
cur_popularity_stdev = library_stats['popularity']['std_dev']
new_mean, new_std_dev = update_std_dev(
cur_mean_popularity, cur_popularity_stdev, new_data_point, sample_size)
library_stats['popularity'] = {
"average": new_mean,
"std_dev": new_std_dev,
}
# get_track_info {{{ #
def get_track_info(track_dict, library_stats, sample_size):
"""Get all the info from the track_dict directly returned by the API call in parse_library.
:track_dict: Dict returned from the API call containing the track info.
:library_stats: Dictionary containing the data mined from user's Spotify library
:sample_size: The sample size so far including this track
:returns: None
"""
# popularity
update_popularity_stats(track_dict['popularity'], library_stats, sample_size)
# year
year_released = track_dict['album']['release_date'].split('-')[0]
increase_nested_key('year_released', year_released, library_stats)
# artist
# artist_names = [artist['name'] for artist in track_dict['artists']]
# for artist_name in artist_names:
# increase_nested_key('artists', artist_name)
# runtime
library_stats['total_runtime'] += float(track_dict['duration_ms']) / (1000 * 60)
# }}} get_track_info #
# calculate_genres_from_artists {{{ #
def calculate_genres_from_artists(headers, library_stats):
"""Tallies up genre counts based on artists in library_stats.
:headers: For making the API call.
:library_stats: Dictionary containing the data mined from user's Spotify library
:returns: None
"""
for artist_entry in library_stats['artists'].values():
artist_response = requests.get('https://api.spotify.com/v1/artists/' + artist_entry['id'], headers=headers).json()
# increase each genre count by artist count
for genre in artist_response['genres']:
increase_nested_key('genres', genre, library_stats, artist_entry['count'])
# }}} calculate_genres_from_artists #
def process_library_stats(library_stats):
"""Processes library_stats into format more suitable for D3 consumption
Args:
library_stats: Dictionary containing the data mined from user's Spotify library
Returns:
A new dictionary that contains the data in library_stats, in a format more suitable for D3 consumption
"""
processed_library_stats = {}
for key in library_stats:
if key == 'artists' or key == 'genres' or key == 'year_released':
for inner_key in library_stats[key]:
if key not in processed_library_stats:
processed_library_stats[key] = []
processed_item_key = '' # identifier key for each dict in the list
count = 0
if 'artist' in key:
processed_item_key = 'name'
count = library_stats[key][inner_key]['count']
elif 'genre' in key:
processed_item_key = 'genre'
count = library_stats[key][inner_key]
else:
processed_item_key = 'year'
count = library_stats[key][inner_key]
processed_library_stats[key].append({
processed_item_key: inner_key,
"count": count
})
elif key == 'audio_features':
for audio_feature in library_stats[key]:
if 'audio_features' not in processed_library_stats:
processed_library_stats['audio_features'] = []
processed_library_stats['audio_features'].append({
'feature': audio_feature,
'average': library_stats[key][audio_feature]['average'],
'std_dev': library_stats[key][audio_feature]['std_dev']
})
# TODO: Not sure about final form for 'popularity'
# elif key == 'popularity':
# processed_library_stats[key] = []
# processed_library_stats[key].append({
# })
elif key == 'num_songs' or key == 'total_runtime' or key == 'popularity':
processed_library_stats[key] = library_stats[key]
return processed_library_stats

243
spotifyvis/views.py

@ -1,5 +1,3 @@
# imports {{{ #
from django.shortcuts import render, redirect
from django.http import HttpResponse, HttpResponseBadRequest
import math
@ -10,16 +8,11 @@ import urllib
import json
import pprint
from datetime import datetime
# }}} imports #
# global vars {{{ #
from .utils import parse_library, process_library_stats
TIME_FORMAT = '%Y-%m-%d-%H-%M-%S'
library_stats = {"audio_features":{}, "genres":{}, "year_released":{}, "artists":{}, "num_songs":0, "popularity":[], "total_runtime":0}
# }}} global vars #
# generate_random_string {{{ #
def generate_random_string(length):
@ -139,226 +132,30 @@ def user_data(request):
'Authorization': auth_token_str
}
tracks_to_query = 5
parse_library(headers, tracks_to_query)
user_data_response = requests.get('https://api.spotify.com/v1/me', headers = headers).json()
context = {
'user_name': user_data_response['display_name'],
'id': user_data_response['id'],
'genre_dict': library_stats['genres']
}
return render(request, 'spotifyvis/user_data.html', context)
# }}} user_data #
# parse_library {{{ #
def parse_library(headers, tracks):
"""Scans user's library for certain number of tracks to update library_stats with.
:headers: For API call.
:tracks: Number of tracks to get from user's library.
:returns: None
"""
# TODO: implement importing entire library with 0 as tracks param
# number of tracks to get with each call
limit = 5
# keeps track of point to get songs from
offset = 0
payload = {'limit': str(limit)}
for _ in range(0, tracks, limit):
payload['offset'] = str(offset)
saved_tracks_response = requests.get('https://api.spotify.com/v1/me/tracks', headers=headers, params=payload).json()
num_samples = offset
for track_dict in saved_tracks_response['items']:
# Track the number of samples for calculating
# audio feature averages and standard deviations on the fly
num_samples += 1
get_track_info(track_dict['track'])
# get_genre(headers, track_dict['track']['album']['id'])
audio_features_dict = get_audio_features(headers, track_dict['track']['id'])
for feature, feature_data in audio_features_dict.items():
update_audio_feature_stats(feature, feature_data, num_samples)
for artist_dict in track_dict['track']['artists']:
increase_artist_count(headers, artist_dict['name'], artist_dict['id'])
# calculates num_songs with offset + songs retrieved
library_stats['num_songs'] = offset + len(saved_tracks_response['items'])
offset += limit
calculate_genres_from_artists(headers)
pprint.pprint(library_stats)
# }}} parse_library #
# get_audio_features {{{ #
def get_audio_features(headers, track_id):
"""Returns the audio features of a soundtrack
Args:
headers: headers containing the API token
track_id: the id of the soundtrack, needed to query the Spotify API
Returns:
A dictionary with the features as its keys
"""
response = requests.get("https://api.spotify.com/v1/audio-features/{}".format(track_id), headers = headers).json()
features_dict = {}
# Data that we don't need
useless_keys = [
"key", "mode", "type", "liveness", "id", "uri", "track_href", "analysis_url", "time_signature",
]
for key, val in response.items():
if key not in useless_keys:
features_dict[key] = val
return features_dict
# }}} get_audio_features #
# update_std_dev {{{ #
def update_std_dev(cur_mean, cur_std_dev, new_data_point, sample_size):
"""Calculates the standard deviation for a sample without storing all data points
Args:
cur_mean: the current mean for N = (sample_size - 1)
cur_std_dev: the current standard deviation for N = (sample_size - 1)
new_data_point: a new data point
sample_size: sample size including the new data point
Returns:
(new_mean, new_std_dev)
"""
# This is an implementationof Welford's method
# http://jonisalonen.com/2013/deriving-welfords-method-for-computing-variance/
new_mean = ((sample_size - 1) * cur_mean + new_data_point) / sample_size
delta_variance = (new_data_point - new_mean) * (new_data_point - cur_mean)
new_std_dev = math.sqrt(
(math.pow(cur_std_dev, 2) * (sample_size - 2) + delta_variance) / (
sample_size - 1
))
return new_mean, new_std_dev
# }}} update_std_dev #
# update_audio_feature_stats {{{ #
def update_audio_feature_stats(feature, new_data_point, sample_size):
"""Updates the audio feature statistics in library_stats
Args:
feature: the audio feature to be updated (string)
new_data_point: new data to update the stats with
sample_size: sample size including the new data point
Returns:
None
"""
# first time the feature is considered
if sample_size < 2:
library_stats['audio_features'][feature] = {
"average": new_data_point,
tracks_to_query = 5
library_stats = {
"audio_features":{},
"genres":{},
"year_released":{},
"artists":{},
"num_songs": 0,
"popularity": {
"average": 0,
"std_dev": 0,
}
else:
cur_mean = library_stats['audio_features'][feature]['average']
cur_std_dev = library_stats['audio_features'][feature]['std_dev']
new_mean, new_std_dev = update_std_dev(cur_mean, cur_std_dev, new_data_point, sample_size)
library_stats['audio_features'][feature]['average'] = new_mean
library_stats['audio_features'][feature]['std_dev'] = new_std_dev
# }}} update_audio_feature_stats #
# increase_nested_key {{{ #
def increase_nested_key(top_key, nested_key, amount=1):
"""Increases count for the value of library_stats[top_key][nested_key]. Checks if nested_key exists already and takes
appropriate action.
:top_key: First key of library_stats.
:nested_key: Key in top_key's dict for which we want to increase value of.
:returns: None
"""
if nested_key not in library_stats[top_key]:
library_stats[top_key][nested_key] = amount
else:
library_stats[top_key][nested_key] += amount
# }}} increase_nested_key #
# increase_artist_count {{{ #
def increase_artist_count(headers, artist_name, artist_id):
"""Increases count for artist in library_stats and stores the artist_id.
:headers: For making the API call.
:artist_name: Artist to increase count for.
:artist_id: The Spotify ID for the artist.
:returns: None
"""
if artist_name not in library_stats['artists']:
library_stats['artists'][artist_name] = {}
library_stats['artists'][artist_name]['count'] = 1
library_stats['artists'][artist_name]['id'] = artist_id
else:
library_stats['artists'][artist_name]['count'] += 1
# }}} increase_artist_count #
# get_track_info {{{ #
def get_track_info(track_dict):
"""Get all the info from the track_dict directly returned by the API call in parse_library.
:track_dict: Dict returned from the API call containing the track info.
:returns: None
"""
# popularity
library_stats['popularity'].append(track_dict['popularity'])
# year
year_released = track_dict['album']['release_date'].split('-')[0]
increase_nested_key('year_released', year_released)
# artist
# artist_names = [artist['name'] for artist in track_dict['artists']]
# for artist_name in artist_names:
# increase_nested_key('artists', artist_name)
# runtime
library_stats['total_runtime'] += float(track_dict['duration_ms']) / 60
# }}} get_track_info #
# calculate_genres_from_artists {{{ #
def calculate_genres_from_artists(headers):
"""Tallies up genre counts based on artists in library_stats.
:headers: For making the API call.
:returns: None
"""
for artist_entry in library_stats['artists'].values():
artist_response = requests.get('https://api.spotify.com/v1/artists/' + artist_entry['id'], headers=headers).json()
# increase each genre count by artist count
# for genre in artist_response['genres']:
# print(genre, end='')
# increase_nested_key('genres', genre, artist_entry['count'])
# print('')
# only use first genre for simplicity right now
if len(artist_response['genres']) > 0:
print(artist_response['genres'][0])
increase_nested_key('genres', artist_response['genres'][0], artist_entry['count'])
},
"total_runtime": 0
}
parse_library(headers, tracks_to_query, library_stats)
processed_library_stats = process_library_stats(library_stats)
print("================================================")
print("Processed data follows\n")
pprint.pprint(processed_library_stats)
return render(request, 'spotifyvis/user_data.html', context)
# }}} calculate_genres_from_artists #
# }}} user_data #
Loading…
Cancel
Save