Compare commits

...

4 Commits

Author SHA1 Message Date
riwiwa
1df12b1755 update readme 2026-02-09 19:54:02 -08:00
c4314456ae add spotify import progress bar 2026-02-09 19:51:30 -08:00
7fe4d02721 clean up backend spotify import functionality 2026-02-09 18:34:49 -08:00
d35e7bffd3 remove db name environment variable 2026-02-09 18:34:37 -08:00
7 changed files with 361 additions and 215 deletions

View File

@@ -8,7 +8,7 @@
### Roadmap:
- Ability to import all listening statistics and scrobbles from: \[In Progress\]
- LastFM \[Complete\]
- Spotify \[In Progress\]
- Spotify \[Complete\]
- Apple Music \[Planned\]
- WebUI \[In Progress\]

View File

@@ -21,23 +21,23 @@ func CreateAllTables() error {
return CreateSessionsTable()
}
func GetDbUrl(noDBName bool) string {
func GetDbUrl(dbName bool) string {
host := os.Getenv("PGHOST")
port := os.Getenv("PGPORT")
user := os.Getenv("PGUSER")
pass := os.Getenv("PGPASSWORD")
db := os.Getenv("PGDATABASE")
if noDBName {
return fmt.Sprintf("postgres://%s:%s@%s:%s", user, pass, host, port)
if dbName {
return fmt.Sprintf("postgres://%s:%s@%s:%s/%s",
user, pass, host, port, "muzi")
} else {
return fmt.Sprintf("postgres://%s:%s@%s:%s/%s", user, pass, host, port, db)
return fmt.Sprintf("postgres://%s:%s@%s:%s", user, pass, host, port)
}
}
func CreateDB() error {
conn, err := pgx.Connect(context.Background(),
GetDbUrl(true),
GetDbUrl(false),
)
if err != nil {
fmt.Fprintf(os.Stderr, "Cannot connect to PostgreSQL: %v\n", err)

View File

@@ -22,7 +22,7 @@ func main() {
check("ensuring muzi DB exists", db.CreateDB())
var err error
db.Pool, err = pgxpool.New(context.Background(), db.GetDbUrl(false))
db.Pool, err = pgxpool.New(context.Background(), db.GetDbUrl(true))
check("connecting to muzi database", err)
defer db.Pool.Close()

View File

@@ -12,6 +12,8 @@ import (
"github.com/jackc/pgx/v5"
)
const batchSize = 1000
type SpotifyTrack struct {
Timestamp string `json:"ts"`
Played int `json:"ms_played"`
@@ -20,25 +22,30 @@ type SpotifyTrack struct {
Album string `json:"master_metadata_album_album_name"`
}
type existingTrack struct {
type trackSource struct {
tracks []SpotifyTrack
tracksToSkip map[string]struct{}
idx int
userId int
}
type dbTrack struct {
Timestamp time.Time
SongName string
Artist string
}
// trackSource implements pgx.CopyFromSource for efficient bulk inserts
type trackSource struct {
tracks []SpotifyTrack
existing map[string]struct{}
idx int
userId int
}
func (s *trackSource) Next() bool {
for s.idx < len(s.tracks) {
t := s.tracks[s.idx]
key := fmt.Sprintf("%s|%s|%s", t.Artist, t.Name, t.Timestamp)
if _, exists := s.existing[key]; exists {
ts, err := normalizeTs(t.Timestamp)
if err != nil {
fmt.Fprintf(os.Stderr, "Error normalizing timestamp: %v\n", err)
s.idx++
continue
}
key := fmt.Sprintf("%s|%s|%s", t.Artist, t.Name, ts)
if _, shouldSkip := s.tracksToSkip[key]; shouldSkip {
s.idx++
continue
}
@@ -51,7 +58,6 @@ func (s *trackSource) Next() bool {
func (s *trackSource) Values() ([]any, error) {
// idx is already incremented in Next(), so use idx-1
t := s.tracks[s.idx-1]
// parse spotify string timestamp to a real time object
ts, err := time.Parse(time.RFC3339Nano, t.Timestamp)
if err != nil {
return nil, err
@@ -71,48 +77,62 @@ func (s *trackSource) Err() error {
return nil
}
// find tracks with the same artist and name in a 20 second +- timeframe
func getExistingTracks(
userId int,
tracks []SpotifyTrack,
) (map[string]struct{}, error) {
// check for empty track import
if len(tracks) == 0 {
return map[string]struct{}{}, nil
}
// find min/max timestamps in this batch to create time window to
// search for duplicates
var minTs, maxTs time.Time
// go through each track (t) in the array
for _, t := range tracks {
// parse spotify timestamp into operational time datatype
ts, err := time.Parse(time.RFC3339Nano, t.Timestamp)
// normalizeTs parses a Spotify RFC3339Nano timestamp string and
// re-formats it with the same layout, producing a canonical form so
// that dedup keys built from timestamps compare equal even when the
// input formatting differs (e.g. a +00:00 offset vs "Z").
// Returns an error when ts cannot be parsed.
func normalizeTs(ts string) (string, error) {
	t, err := time.Parse(time.RFC3339Nano, ts)
	if err != nil {
		return "", err
	}
	return t.Format(time.RFC3339Nano), nil
}
// check if all parses failed, therefore no way to find duplicate by time
func getExistingTracks(
userId int, tracks []SpotifyTrack,
) (map[string]struct{}, error) {
minTs, maxTs := findTimeRange(tracks)
if minTs.IsZero() {
return map[string]struct{}{}, nil
}
// find all tracks within [min-20s, max+20s] window (duplicates)
dbTracks, err := fetchDbTracks(userId, minTs, maxTs)
if err != nil {
return nil, err
}
dbIndex := buildDbTrackIndex(dbTracks)
return findDuplicates(tracks, dbIndex), nil
}
// get the min/max timestamp range for a batch of tracks
// findTimeRange returns the earliest and latest parseable timestamps in
// the batch. Tracks whose timestamps fail to parse are skipped. A zero
// first return value means no timestamp in the batch could be parsed.
func findTimeRange(tracks []SpotifyTrack) (time.Time, time.Time) {
	var earliest, latest time.Time
	for _, track := range tracks {
		parsed, err := time.Parse(time.RFC3339Nano, track.Timestamp)
		if err != nil {
			// unparseable timestamp: ignore this track for range purposes
			continue
		}
		if earliest.IsZero() || parsed.Before(earliest) {
			earliest = parsed
		}
		if parsed.After(latest) {
			latest = parsed
		}
	}
	return earliest, latest
}
/*
get all tracks stored in the database for a user whose timestamps fall
within the timestamp range of the current batch (padded by 20 seconds
on each side in the query below)
*/
func fetchDbTracks(userId int, minTs, maxTs time.Time) ([]dbTrack, error) {
rows, err := db.Pool.Query(context.Background(),
`SELECT song_name, artist, timestamp
FROM history
WHERE user_id = $1
AND timestamp BETWEEN $2 AND $3`,
userId,
// adjust 20 seconds to find duplicates on edges of batch
minTs.Add(-20*time.Second),
maxTs.Add(20*time.Second))
if err != nil {
@@ -120,102 +140,138 @@ func getExistingTracks(
}
defer rows.Close()
// prepare map to hold duplicate track keys
existing := make(map[string]struct{})
// create array of tracks
var existingTracks []existingTrack
// for each repeat play (-20s +20s)
var dbTracks []dbTrack
for rows.Next() {
// write the data from json to the track in memory
var t existingTrack
var t dbTrack
if err := rows.Scan(&t.SongName, &t.Artist, &t.Timestamp); err != nil {
continue
}
// add track in memory to existingTracks array
existingTracks = append(existingTracks, t)
dbTracks = append(dbTracks, t)
}
return dbTracks, nil
}
// index existing tracks by artist|name for O(1) lookup
existingIndex := make(map[string][]time.Time)
for _, t := range existingTracks {
// buildDbTrackIndex indexes existing database tracks by "artist|name"
// so duplicate checks are O(1) lookups instead of scanning every row.
// Multiple plays of the same song map to a slice of their timestamps.
func buildDbTrackIndex(tracks []dbTrack) map[string][]time.Time {
	// pre-size: at most one entry per track
	index := make(map[string][]time.Time, len(tracks))
	for _, t := range tracks {
		key := t.Artist + "|" + t.SongName
		index[key] = append(index[key], t.Timestamp)
	}
	return index
}
// check each new track against indexed existing tracks
for _, newTrack := range tracks {
newTs, err := time.Parse(time.RFC3339Nano, newTrack.Timestamp)
func findDuplicates(tracks []SpotifyTrack, dbIndex map[string][]time.Time) map[string]struct{} {
duplicates := make(map[string]struct{})
seenInBatch := make(map[string]struct{})
for _, track := range tracks {
trackKey, err := createTrackKey(track)
if err != nil {
continue
}
lookupKey := newTrack.Artist + "|" + newTrack.Name
if timestamps, found := existingIndex[lookupKey]; found {
for _, existTs := range timestamps {
diff := newTs.Sub(existTs)
// in batch check
if _, seen := seenInBatch[trackKey]; seen {
duplicates[trackKey] = struct{}{}
continue
}
seenInBatch[trackKey] = struct{}{}
// in db check
lookupKey := fmt.Sprintf("%s|%s", track.Artist, track.Name)
if dbTimestamps, found := dbIndex[lookupKey]; found {
if isDuplicateWithinWindow(track, dbTimestamps) {
duplicates[trackKey] = struct{}{}
}
}
}
return duplicates
}
// createTrackKey builds the "artist|name|timestamp" dedup key for a
// track, normalizing the timestamp so keys match regardless of the
// source formatting. Returns an error when the timestamp is invalid.
func createTrackKey(track SpotifyTrack) (string, error) {
	normalized, err := normalizeTs(track.Timestamp)
	if err != nil {
		return "", err
	}
	return track.Artist + "|" + track.Name + "|" + normalized, nil
}
// check if a track's timestamp falls within 20 seconds of any existing timestamp
func isDuplicateWithinWindow(track SpotifyTrack, existingTimestamps []time.Time) bool {
trackTime, err := time.Parse(time.RFC3339Nano, track.Timestamp)
if err != nil {
return false
}
for _, existingTime := range existingTimestamps {
diff := trackTime.Sub(existingTime)
if diff < 0 {
diff = -diff
}
if diff < 20*time.Second {
key := fmt.Sprintf(
"%s|%s|%s",
newTrack.Artist,
newTrack.Name,
newTrack.Timestamp,
)
existing[key] = struct{}{}
break
return true
}
}
}
}
return existing, nil
return false
}
func ImportSpotify(tracks []SpotifyTrack, userId int) error {
func ImportSpotify(tracks []SpotifyTrack, userId int, progressChan chan ProgressUpdate) error {
totalImported := 0
batchSize := 1000
totalTracks := len(tracks)
batchStart := 0
totalBatches := (totalTracks + batchSize - 1) / batchSize
for batchStart := 0; batchStart < len(tracks); batchStart += batchSize {
// get the limit of the current batch
batchEnd := batchStart + batchSize
// set limit to track array length in current batch too big
if batchEnd > len(tracks) {
batchEnd = len(tracks)
// Send initial progress update
if progressChan != nil {
progressChan <- ProgressUpdate{
TotalPages: totalBatches,
Status: "running",
}
}
// create array to hold valid listens
for batchStart < totalTracks {
// cap batchEnd at total track count on final batch to prevent OOB error
batchEnd := min(batchStart+batchSize, totalTracks)
currentBatch := (batchStart / batchSize) + 1
var validTracks []SpotifyTrack
for i := batchStart; i < batchEnd; i++ {
// if current track is listened to for 20 sec and name and artist is not
// blank, add to validTracks array
if tracks[i].Played >= 20000 && tracks[i].Name != "" && tracks[i].Artist != "" {
if tracks[i].Played >= 20000 && // 20 seconds
tracks[i].Name != "" &&
tracks[i].Artist != "" {
validTracks = append(validTracks, tracks[i])
}
}
// if there are no valid tracks in this batch, go to the next
if len(validTracks) == 0 {
batchStart += batchSize
// Send progress update even for empty batches
if progressChan != nil {
progressChan <- ProgressUpdate{
CurrentPage: currentBatch,
CompletedPages: currentBatch,
TotalPages: totalBatches,
TracksImported: totalImported,
Status: "running",
}
}
continue
}
// find replayed tracks in the batch that was just gathered
existing, err := getExistingTracks(userId, validTracks)
tracksToSkip, err := getExistingTracks(userId, validTracks)
if err != nil {
fmt.Fprintf(os.Stderr, "Error checking existing tracks: %v\n", err)
batchStart += batchSize
continue
}
// get data of struct pointer
src := &trackSource{
tracks: validTracks,
existing: existing,
tracksToSkip: tracksToSkip,
idx: 0,
userId: userId,
}
// insert all valid tracks from current batch into db
copyCount, err := db.Pool.CopyFrom(
context.Background(),
pgx.Identifier{"history"},
@@ -237,6 +293,31 @@ func ImportSpotify(tracks []SpotifyTrack, userId int) error {
} else {
totalImported += int(copyCount)
}
// Send progress update
if progressChan != nil {
progressChan <- ProgressUpdate{
CurrentPage: currentBatch,
CompletedPages: currentBatch,
TotalPages: totalBatches,
TracksImported: totalImported,
Status: "running",
}
}
batchStart += batchSize
}
// Send completion update
if progressChan != nil {
progressChan <- ProgressUpdate{
CurrentPage: totalBatches,
CompletedPages: totalBatches,
TotalPages: totalBatches,
TracksImported: totalImported,
Status: "completed",
}
}
return nil
}

83
static/import.js Normal file
View File

@@ -0,0 +1,83 @@
// Wire an import form to a progress UI: submit the form via fetch to
// start a server-side import job, then stream job progress over
// Server-Sent Events and mirror it into the progress widgets.
//   formId         - id of the <form> to intercept
//   progressPrefix - id prefix of the progress DOM nodes ('spotify'/'lastfm')
//   endpoint       - POST URL that starts the import and returns { job_id }
//   progressUrl    - SSE URL; the job id is appended to it
//   formatLabel    - unit name shown to the user ('batch' or 'page')
function handleImport(formId, progressPrefix, endpoint, progressUrl, formatLabel) {
const form = document.getElementById(formId);
const progressContainer = document.getElementById(progressPrefix + '-progress');
const progressFill = document.getElementById(progressPrefix + '-progress-fill');
const progressText = document.getElementById(progressPrefix + '-progress-text');
const progressStatus = document.getElementById(progressPrefix + '-progress-status');
const progressTracks = document.getElementById(progressPrefix + '-progress-tracks');
const progressError = document.getElementById(progressPrefix + '-progress-error');
const progressSuccess = document.getElementById(progressPrefix + '-progress-success');
form.addEventListener('submit', async function(e) {
e.preventDefault();
// Reset and show progress
progressFill.style.width = '0%';
progressFill.classList.add('animating');
progressText.textContent = '0%';
progressStatus.textContent = 'Starting import...';
progressTracks.textContent = '';
progressError.textContent = '';
progressSuccess.textContent = '';
progressContainer.style.display = 'block';
try {
// lastfm posts url-encoded fields; spotify posts multipart (file upload)
const response = await fetch(endpoint, {
method: 'POST',
body: progressPrefix === 'lastfm'
? new URLSearchParams(new FormData(form))
: new FormData(form)
});
if (!response.ok) throw new Error('Failed to start import: ' + response.statusText);
const { job_id } = await response.json();
// Subscribe to progress updates for this job over SSE
const eventSource = new EventSource(progressUrl + job_id);
eventSource.onmessage = function(event) {
const update = JSON.parse(event.data);
// 'connected' is the server's initial handshake event; no progress yet
if (update.status === 'connected') return;
if (update.total_pages > 0) {
// prefer completed_pages; fall back to current_page, then 0
const completed = update.completed_pages || update.current_page || 0;
const percent = Math.round((completed / update.total_pages) * 100);
progressFill.style.width = percent + '%';
progressText.textContent = percent + '%';
progressStatus.textContent = 'Processing ' + formatLabel + ' ' + completed + ' of ' + update.total_pages;
}
if (update.tracks_imported !== undefined) {
progressTracks.textContent = update.tracks_imported.toLocaleString() + ' tracks imported';
}
// Terminal states: close the stream so the browser stops reconnecting
if (update.status === 'completed') {
progressFill.classList.remove('animating');
progressStatus.textContent = 'Import completed!';
progressSuccess.textContent = 'Successfully imported ' + update.tracks_imported.toLocaleString() + ' tracks from ' + (progressPrefix === 'spotify' ? 'Spotify' : 'Last.fm');
eventSource.close();
form.reset();
} else if (update.status === 'error') {
progressFill.classList.remove('animating');
progressStatus.textContent = 'Import failed';
progressError.textContent = 'Error: ' + (update.error || 'Unknown error');
eventSource.close();
}
};
// Transport-level SSE failure (server gone, network drop); the
// server-side job may still finish on its own.
eventSource.onerror = function() {
progressFill.classList.remove('animating');
progressStatus.textContent = 'Connection error';
progressError.textContent = 'Lost connection to server. The import may still be running in the background.';
eventSource.close();
};
} catch (err) {
// fetch/JSON failures before the SSE stream was opened
progressFill.classList.remove('animating');
progressStatus.textContent = 'Import failed';
progressError.textContent = 'Error: ' + err.message;
}
});
}
// Register both import flows on page load
handleImport('spotify-form', 'spotify', '/import/spotify', '/import/spotify/progress?job=', 'batch');
handleImport('lastfm-form', 'lastfm', '/import/lastfm', '/import/lastfm/progress?job=', 'page');

View File

@@ -19,10 +19,21 @@
<div class="import-section">
<h2>Spotify</h2>
<p>Import your Spotify listening history from your data export.</p>
<form method="POST" action="/import/spotify" enctype="multipart/form-data">
<form id="spotify-form" method="POST" action="/import/spotify" enctype="multipart/form-data">
<input type="file" name="json_files" accept=".json,application/json" multiple required>
<button type="submit">Upload Spotify Data</button>
</form>
<div id="spotify-progress" class="progress-container" style="display: none;">
<div class="progress-status" id="spotify-progress-status">Initializing...</div>
<div class="progress-bar-wrapper">
<div class="progress-bar-fill" id="spotify-progress-fill"></div>
<div class="progress-text" id="spotify-progress-text">0%</div>
</div>
<div class="progress-tracks" id="spotify-progress-tracks"></div>
<div class="progress-error" id="spotify-progress-error"></div>
<div class="progress-success" id="spotify-progress-success"></div>
</div>
</div>
<div class="import-section">
@@ -35,114 +46,18 @@
</form>
<div id="lastfm-progress" class="progress-container" style="display: none;">
<div class="progress-status" id="progress-status">Initializing...</div>
<div class="progress-status" id="lastfm-progress-status">Initializing...</div>
<div class="progress-bar-wrapper">
<div class="progress-bar-fill" id="progress-fill"></div>
<div class="progress-text" id="progress-text">0%</div>
<div class="progress-bar-fill" id="lastfm-progress-fill"></div>
<div class="progress-text" id="lastfm-progress-text">0%</div>
</div>
<div class="progress-tracks" id="progress-tracks"></div>
<div class="progress-error" id="progress-error"></div>
<div class="progress-success" id="progress-success"></div>
<div class="progress-tracks" id="lastfm-progress-tracks"></div>
<div class="progress-error" id="lastfm-progress-error"></div>
<div class="progress-success" id="lastfm-progress-success"></div>
</div>
</div>
</div>
<script>
document.getElementById('lastfm-form').addEventListener('submit', async function(e) {
e.preventDefault();
const form = e.target;
const formData = new FormData(form);
const progressContainer = document.getElementById('lastfm-progress');
const progressFill = document.getElementById('progress-fill');
const progressText = document.getElementById('progress-text');
const progressStatus = document.getElementById('progress-status');
const progressTracks = document.getElementById('progress-tracks');
const progressError = document.getElementById('progress-error');
const progressSuccess = document.getElementById('progress-success');
// Reset and show progress container
progressFill.style.width = '0%';
progressFill.classList.add('animating');
progressText.textContent = '0%';
progressStatus.textContent = 'Starting import...';
progressTracks.textContent = '';
progressError.textContent = '';
progressSuccess.textContent = '';
progressContainer.style.display = 'block';
try {
// Convert FormData to URLSearchParams for proper form encoding
const params = new URLSearchParams();
for (const [key, value] of formData) {
params.append(key, value);
}
// Start the import
const response = await fetch('/import/lastfm', {
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
},
body: params
});
if (!response.ok) {
throw new Error('Failed to start import: ' + response.statusText);
}
const data = await response.json();
const jobId = data.job_id;
// Connect to SSE endpoint
const eventSource = new EventSource('/import/lastfm/progress?job=' + jobId);
eventSource.onmessage = function(event) {
const update = JSON.parse(event.data);
if (update.status === 'connected') {
return;
}
if (update.total_pages > 0) {
const completed = update.completed_pages || update.current_page || 0;
const percent = Math.round((completed / update.total_pages) * 100);
progressFill.style.width = percent + '%';
progressText.textContent = percent + '%';
progressStatus.textContent = 'Processing page ' + completed + ' of ' + update.total_pages;
}
if (update.tracks_imported !== undefined) {
progressTracks.textContent = update.tracks_imported.toLocaleString() + ' tracks imported';
}
if (update.status === 'completed') {
progressFill.classList.remove('animating');
progressStatus.textContent = 'Import completed!';
progressSuccess.textContent = 'Successfully imported ' + update.tracks_imported.toLocaleString() + ' tracks from Last.fm';
eventSource.close();
form.reset();
} else if (update.status === 'error') {
progressFill.classList.remove('animating');
progressStatus.textContent = 'Import failed';
progressError.textContent = 'Error: ' + (update.error || 'Unknown error');
eventSource.close();
}
};
eventSource.onerror = function(err) {
progressFill.classList.remove('animating');
progressStatus.textContent = 'Connection error';
progressError.textContent = 'Lost connection to server. The import may still be running in the background.';
eventSource.close();
};
} catch (err) {
progressFill.classList.remove('animating');
progressStatus.textContent = 'Import failed';
progressError.textContent = 'Error: ' + err.message;
}
});
</script>
<script src="/files/import.js"></script>
</body>
</html>

View File

@@ -506,10 +506,32 @@ func importSpotifyHandler(w http.ResponseWriter, r *http.Request) {
return
}
if err := migrate.ImportSpotify(allTracks, userId); err != nil {
http.Error(w, "Failed to process tracks", http.StatusInternalServerError)
jobID, err := generateID()
if err != nil {
fmt.Fprintf(os.Stderr, "Error generating jobID: %v\n", err)
http.Error(w, "Error generating jobID", http.StatusBadRequest)
return
}
progressChan := make(chan migrate.ProgressUpdate, 100)
jobsMu.Lock()
importJobs[jobID] = progressChan
jobsMu.Unlock()
go func() {
migrate.ImportSpotify(allTracks, userId, progressChan)
jobsMu.Lock()
delete(importJobs, jobID)
jobsMu.Unlock()
close(progressChan)
}()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{
"job_id": jobID,
"status": "started",
})
}
func importLastFMHandler(w http.ResponseWriter, r *http.Request) {
@@ -617,6 +639,50 @@ func importLastFMProgressHandler(w http.ResponseWriter, r *http.Request) {
}
}
func importSpotifyProgressHandler(w http.ResponseWriter, r *http.Request) {
jobID := r.URL.Query().Get("job")
if jobID == "" {
http.Error(w, "Missing job ID", http.StatusBadRequest)
return
}
jobsMu.RLock()
job, exists := importJobs[jobID]
jobsMu.RUnlock()
if !exists {
http.Error(w, "Job not found", http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming not supported", http.StatusInternalServerError)
return
}
fmt.Fprintf(w, "data: %s\n\n", `{"status":"connected"}`)
flusher.Flush()
for update := range job {
data, err := json.Marshal(update)
if err != nil {
continue
}
fmt.Fprintf(w, "data: %s\n\n", string(data))
flusher.Flush()
if update.Status == "completed" || update.Status == "error" {
return
}
}
}
func Start() {
addr := ":1234"
r := chi.NewRouter()
@@ -632,6 +698,7 @@ func Start() {
r.Post("/import/lastfm", importLastFMHandler)
r.Post("/import/spotify", importSpotifyHandler)
r.Get("/import/lastfm/progress", importLastFMProgressHandler)
r.Get("/import/spotify/progress", importSpotifyProgressHandler)
fmt.Printf("WebUI starting on %s\n", addr)
prot := http.NewCrossOriginProtection()
http.ListenAndServe(addr, prot.Handler(r))