From fd2e2b0f8a7c620db787008885cdcb82e4e67e32 Mon Sep 17 00:00:00 2001 From: riwiwa Date: Wed, 11 Feb 2026 17:23:17 -0800 Subject: [PATCH] add comments and increase readability --- migrate/spotify.go | 462 ++++++++++++++++++++++++--------------------- web/web.go | 2 - 2 files changed, 246 insertions(+), 218 deletions(-) diff --git a/migrate/spotify.go b/migrate/spotify.go index ef323c1..77b4ff9 100644 --- a/migrate/spotify.go +++ b/migrate/spotify.go @@ -1,5 +1,13 @@ package migrate +// Spotify import functionality for migrating Spotify listening history +// from JSON export files into the database + +// This package handles: +// - Parsing Spotify JSON track data +// - Batch processing with deduplication (20-second window) +// - Efficient bulk inserts using pgx.CopyFrom + import ( "context" "fmt" @@ -12,231 +20,65 @@ import ( "github.com/jackc/pgx/v5" ) -const batchSize = 1000 +const ( + batchSize = 1000 + minPlayTime = 20000 // 20000 ms = 20 sec + timeDiff = 20 * time.Second +) +// Represents a single listening event from Spotify's JSON export format type SpotifyTrack struct { - Timestamp string `json:"ts"` - Played int `json:"ms_played"` - Name string `json:"master_metadata_track_name"` - Artist string `json:"master_metadata_album_artist_name"` - Album string `json:"master_metadata_album_album_name"` + Timestamp time.Time `json:"ts"` + Played int `json:"ms_played"` + Name string `json:"master_metadata_track_name"` + Artist string `json:"master_metadata_album_artist_name"` + Album string `json:"master_metadata_album_album_name"` } +// Implements pgx.CopyFromSource for efficient bulk inserts. 
+// Filters out duplicates in-memory before sending to PostgreSQL type trackSource struct { - tracks []SpotifyTrack - tracksToSkip map[string]struct{} - idx int - userId int + tracks []SpotifyTrack // Full batch of tracks to process + tracksToSkip map[string]struct{} // Set of duplicate keys to skip + idx int // Current position in tracks slice + userId int // User ID to associate with imported tracks } +// Represents a track already stored in the database, used for duplicate +// detection during import type dbTrack struct { Timestamp time.Time SongName string Artist string } -func (s *trackSource) Next() bool { - for s.idx < len(s.tracks) { - t := s.tracks[s.idx] - ts, err := normalizeTs(t.Timestamp) - if err != nil { - fmt.Fprintf(os.Stderr, "Error normalizing timestamp: %v\n", err) - s.idx++ - continue - } - key := fmt.Sprintf("%s|%s|%s", t.Artist, t.Name, ts) - if _, shouldSkip := s.tracksToSkip[key]; shouldSkip { - s.idx++ - continue - } - s.idx++ - return true - } - return false -} +// Import Spotify listening history into the database. +// Processes tracks in batches of 1000 (default), filters out tracks played < +// 20 seconds, deduplicates against existing data, and sends progress updates +// via progressChan. +// The progressChan must not be closed by the caller. The receiver should +// stop reading when Status is "completed". This avoids panics from +// sending on a closed channel. 
-func (s *trackSource) Values() ([]any, error) { - // idx is already incremented in Next(), so use idx-1 - t := s.tracks[s.idx-1] - ts, err := time.Parse(time.RFC3339Nano, t.Timestamp) - if err != nil { - return nil, err - } - return []any{ - s.userId, - ts, - t.Name, - t.Artist, - t.Album, - t.Played, - "spotify", - }, nil -} - -func (s *trackSource) Err() error { - return nil -} - -func normalizeTs(ts string) (string, error) { - t, err := time.Parse(time.RFC3339Nano, ts) - if err != nil { - return "", err - } - return t.Format(time.RFC3339Nano), nil -} - -func getExistingTracks( - userId int, tracks []SpotifyTrack, -) (map[string]struct{}, error) { - minTs, maxTs := findTimeRange(tracks) - if minTs.IsZero() { - return map[string]struct{}{}, nil - } - - dbTracks, err := fetchDbTracks(userId, minTs, maxTs) - if err != nil { - return nil, err - } - - dbIndex := buildDbTrackIndex(dbTracks) - - return findDuplicates(tracks, dbIndex), nil -} - -// get the min/max timestamp range for a batch of tracks -func findTimeRange(tracks []SpotifyTrack) (time.Time, time.Time) { - var minTs, maxTs time.Time - for _, t := range tracks { - ts, err := time.Parse(time.RFC3339Nano, t.Timestamp) - if err != nil { - continue - } - if minTs.IsZero() || ts.Before(minTs) { - minTs = ts - } - if ts.After(maxTs) { - maxTs = ts - } - } - return minTs, maxTs -} - -/* - get all tracks in the database for a user that have the same timestamp - range as the current batch -*/ -func fetchDbTracks(userId int, minTs, maxTs time.Time) ([]dbTrack, error) { - rows, err := db.Pool.Query(context.Background(), - `SELECT song_name, artist, timestamp - FROM history - WHERE user_id = $1 - AND timestamp BETWEEN $2 AND $3`, - userId, - // adjust 20 seconds to find duplicates on edges of batch - minTs.Add(-20*time.Second), - maxTs.Add(20*time.Second)) - if err != nil { - return nil, err - } - defer rows.Close() - - var dbTracks []dbTrack - for rows.Next() { - var t dbTrack - if err := rows.Scan(&t.SongName, 
&t.Artist, &t.Timestamp); err != nil { - continue - } - dbTracks = append(dbTracks, t) - } - return dbTracks, nil -} - -func buildDbTrackIndex(tracks []dbTrack) map[string][]time.Time { - index := make(map[string][]time.Time) - for _, t := range tracks { - key := t.Artist + "|" + t.SongName - index[key] = append(index[key], t.Timestamp) - } - return index -} - -func findDuplicates(tracks []SpotifyTrack, dbIndex map[string][]time.Time) map[string]struct{} { - duplicates := make(map[string]struct{}) - seenInBatch := make(map[string]struct{}) - - for _, track := range tracks { - trackKey, err := createTrackKey(track) - if err != nil { - continue - } - - // in batch check - if _, seen := seenInBatch[trackKey]; seen { - duplicates[trackKey] = struct{}{} - continue - } - seenInBatch[trackKey] = struct{}{} - - // in db check - lookupKey := fmt.Sprintf("%s|%s", track.Artist, track.Name) - if dbTimestamps, found := dbIndex[lookupKey]; found { - if isDuplicateWithinWindow(track, dbTimestamps) { - duplicates[trackKey] = struct{}{} - } - } - } - - return duplicates -} - -func createTrackKey(track SpotifyTrack) (string, error) { - ts, err := normalizeTs(track.Timestamp) - if err != nil { - return "", err - } - return fmt.Sprintf("%s|%s|%s", track.Artist, track.Name, ts), nil -} - -// check if a track timestamp falls < 20 seconds of another -func isDuplicateWithinWindow(track SpotifyTrack, existingTimestamps []time.Time) bool { - trackTime, err := time.Parse(time.RFC3339Nano, track.Timestamp) - if err != nil { - return false - } - for _, existingTime := range existingTimestamps { - diff := trackTime.Sub(existingTime) - if diff < 0 { - diff = -diff - } - if diff < 20*time.Second { - return true - } - } - return false -} - -func ImportSpotify(tracks []SpotifyTrack, userId int, progressChan chan ProgressUpdate) error { +func ImportSpotify(tracks []SpotifyTrack, + userId int, progressChan chan ProgressUpdate, +) { totalImported := 0 totalTracks := len(tracks) batchStart := 0 
totalBatches := (totalTracks + batchSize - 1) / batchSize // Send initial progress update - if progressChan != nil { - progressChan <- ProgressUpdate{ - TotalPages: totalBatches, - Status: "running", - } - } + sendProgressUpdate(progressChan, 0, 0, totalBatches, totalImported, "running") for batchStart < totalTracks { - // cap batchEnd at total track count on final batch to prevent OOB error + // Cap batchEnd at total track count on final batch to prevent OOB error batchEnd := min(batchStart+batchSize, totalTracks) currentBatch := (batchStart / batchSize) + 1 var validTracks []SpotifyTrack for i := batchStart; i < batchEnd; i++ { - if tracks[i].Played >= 20000 && // 20 seconds + if tracks[i].Played >= minPlayTime && tracks[i].Name != "" && tracks[i].Artist != "" { validTracks = append(validTracks, tracks[i]) @@ -246,19 +88,18 @@ func ImportSpotify(tracks []SpotifyTrack, userId int, progressChan chan Progress if len(validTracks) == 0 { batchStart += batchSize // Send progress update even for empty batches - if progressChan != nil { - progressChan <- ProgressUpdate{ - CurrentPage: currentBatch, - CompletedPages: currentBatch, - TotalPages: totalBatches, - TracksImported: totalImported, - Status: "running", - } - } + sendProgressUpdate( + progressChan, + currentBatch, + currentBatch, + totalBatches, + totalImported, + "running", + ) continue } - tracksToSkip, err := getExistingTracks(userId, validTracks) + tracksToSkip, err := getDupes(userId, validTracks) if err != nil { fmt.Fprintf(os.Stderr, "Error checking existing tracks: %v\n", err) batchStart += batchSize @@ -286,6 +127,7 @@ func ImportSpotify(tracks []SpotifyTrack, userId int, progressChan chan Progress }, src, ) + // Do not log errors that come from adding duplicate songs if err != nil { if !strings.Contains(err.Error(), "duplicate") { fmt.Fprintf(os.Stderr, "Spotify batch insert failed: %v\n", err) @@ -294,16 +136,204 @@ func ImportSpotify(tracks []SpotifyTrack, userId int, progressChan chan Progress 
totalImported += int(copyCount) } - // Send progress update - if progressChan != nil { - progressChan <- ProgressUpdate{ - CurrentPage: currentBatch, - CompletedPages: currentBatch, - TotalPages: totalBatches, - TracksImported: totalImported, - Status: "running", + sendProgressUpdate( + progressChan, + currentBatch, + currentBatch, + totalBatches, + totalImported, + "running", + ) + + batchStart += batchSize + } + + sendProgressUpdate( + progressChan, + totalBatches, + totalBatches, + totalBatches, + totalImported, + "completed", + ) +} + +// Sends a progress update to the channel if it's not nil. +// To avoid panics from sending on a closed channel, the channel +// must never be closed by the receiver. The receiver should stop reading when +// Status reads "completed". +func sendProgressUpdate( + ch chan ProgressUpdate, + current, completed, total, imported int, + status string, +) { + if ch != nil { + ch <- ProgressUpdate{ + CurrentPage: current, + CompletedPages: completed, + TotalPages: total, + TracksImported: imported, + Status: status, + } + } +} + +// Finds tracks that already exist in the database or are duplicates within the +// current batch, using a 20-second window to handle minor timestamp variations +func getDupes(userId int, tracks []SpotifyTrack) (map[string]struct{}, error) { + minTs, maxTs := findTimeRange(tracks) + if minTs.IsZero() { + return map[string]struct{}{}, nil + } + + dbTracks, err := fetchDbTracks(userId, minTs, maxTs) + if err != nil { + return nil, err + } + + dbIndex := buildDbTrackIndex(dbTracks) + duplicates := make(map[string]struct{}) + seenInBatch := make(map[string]struct{}) + + for _, track := range tracks { + trackKey := createTrackKey(track) + + // Check in batch + if _, seen := seenInBatch[trackKey]; seen { + duplicates[trackKey] = struct{}{} + continue + } + seenInBatch[trackKey] = struct{}{} + + // Check in DB + lookupKey := fmt.Sprintf("%s|%s", track.Artist, track.Name) + if dbTimestamps, found := dbIndex[lookupKey]; 
found { + if isDuplicateWithinWindow(track, dbTimestamps) { + duplicates[trackKey] = struct{}{} } } + } + + return duplicates, nil +} + +// Get the min/max timestamp range for a batch of tracks +func findTimeRange(tracks []SpotifyTrack) (time.Time, time.Time) { + var minTs, maxTs time.Time + for _, t := range tracks { + if minTs.IsZero() || t.Timestamp.Before(minTs) { + minTs = t.Timestamp + } + if t.Timestamp.After(maxTs) { + maxTs = t.Timestamp + } + } + return minTs, maxTs +} + +// Get all tracks in the database for a user that have the same timestamp +// range as the current batch +func fetchDbTracks(userId int, minTs, maxTs time.Time) ([]dbTrack, error) { + rows, err := db.Pool.Query(context.Background(), + `SELECT song_name, artist, timestamp + FROM history + WHERE user_id = $1 + AND timestamp BETWEEN $2 AND $3`, + userId, + // Adjust 20 seconds to find duplicates on edges of batch + minTs.Add(-timeDiff), + maxTs.Add(timeDiff)) + if err != nil { + return nil, err + } + defer rows.Close() + + var dbTracks []dbTrack + for rows.Next() { + var t dbTrack + if err := rows.Scan(&t.SongName, &t.Artist, &t.Timestamp); err != nil { + continue + } + dbTracks = append(dbTracks, t) + } + err = rows.Err() + if err != nil { + return nil, err + } + return dbTracks, nil +} + +// Create a lookup map from Artist|Name to timestamps for efficient duplicate +// detection. +func buildDbTrackIndex(tracks []dbTrack) map[string][]time.Time { + index := make(map[string][]time.Time) + for _, t := range tracks { + key := t.Artist + "|" + t.SongName + index[key] = append(index[key], t.Timestamp) + } + return index +} + +// Generate a unique identifier for a track using artist, name, and +// normalized timestamp. 
+func createTrackKey(track SpotifyTrack) string { + ts := track.Timestamp.Format(time.RFC3339Nano) + return fmt.Sprintf("%s|%s|%s", track.Artist, track.Name, ts) +} + +// Check if a track timestamp falls < 20 seconds of another +func isDuplicateWithinWindow(track SpotifyTrack, + existingTimestamps []time.Time, +) bool { + for _, existingTime := range existingTimestamps { + diff := track.Timestamp.Sub(existingTime) + if diff < 0 { + diff = -diff + } + if diff < timeDiff { + return true + } + } + return false +} + +// Advances to the next valid track, skipping duplicates and invalid timestamps. +// Returns false when all tracks have been processed +func (s *trackSource) Next() bool { + for s.idx < len(s.tracks) { + t := s.tracks[s.idx] + key := createTrackKey(t) + if _, shouldSkip := s.tracksToSkip[key]; shouldSkip { + s.idx++ + continue + } + s.idx++ + return true + } + return false +} + +// Returns the current track's data formatted for database insertion. +// Must only be called after Next() returns true +func (s *trackSource) Values() ([]any, error) { + // idx is already incremented in Next(), so use idx-1 + t := s.tracks[s.idx-1] + return []any{ + s.userId, + t.Timestamp, + t.Name, + t.Artist, + t.Album, + t.Played, + "spotify", + }, nil +} + +// Returns any error encountered during iteration. 
+// Currently always returns nil; duplicate tracks are skipped in Next() rather than reported as errors
+func (s *trackSource) Err() error {
+	return nil
+}
 
 		batchStart += batchSize
 	}
diff --git a/web/web.go b/web/web.go
index b210b4a..4e4cbcb 100644
--- a/web/web.go
+++ b/web/web.go
@@ -524,7 +524,6 @@ func importSpotifyHandler(w http.ResponseWriter, r *http.Request) {
 		jobsMu.Lock()
 		delete(importJobs, jobID)
 		jobsMu.Unlock()
-		close(progressChan)
 	}()
 
 	w.Header().Set("Content-Type", "application/json")
@@ -585,7 +584,6 @@ func importLastFMHandler(w http.ResponseWriter, r *http.Request) {
 		jobsMu.Lock()
 		delete(importJobs, jobID)
 		jobsMu.Unlock()
-		close(progressChan)
 	}()
 
 	w.Header().Set("Content-Type", "application/json")