From 7fe4d02721fe035635709b701254b2569d27c664 Mon Sep 17 00:00:00 2001 From: riwiwa Date: Mon, 9 Feb 2026 18:34:49 -0800 Subject: [PATCH] clean up backend spotify import functionality --- migrate/spotify.go | 231 ++++++++++++++++++++++++++------------------- 1 file changed, 134 insertions(+), 97 deletions(-) diff --git a/migrate/spotify.go b/migrate/spotify.go index b773b65..5b8d755 100644 --- a/migrate/spotify.go +++ b/migrate/spotify.go @@ -12,6 +12,8 @@ import ( "github.com/jackc/pgx/v5" ) +const batchSize = 1000 + type SpotifyTrack struct { Timestamp string `json:"ts"` Played int `json:"ms_played"` @@ -20,25 +22,30 @@ type SpotifyTrack struct { Album string `json:"master_metadata_album_album_name"` } -type existingTrack struct { +type trackSource struct { + tracks []SpotifyTrack + tracksToSkip map[string]struct{} + idx int + userId int +} + +type dbTrack struct { Timestamp time.Time SongName string Artist string } -// trackSource implements pgx.CopyFromSource for efficient bulk inserts -type trackSource struct { - tracks []SpotifyTrack - existing map[string]struct{} - idx int - userId int -} - func (s *trackSource) Next() bool { for s.idx < len(s.tracks) { t := s.tracks[s.idx] - key := fmt.Sprintf("%s|%s|%s", t.Artist, t.Name, t.Timestamp) - if _, exists := s.existing[key]; exists { + ts, err := normalizeTs(t.Timestamp) + if err != nil { + fmt.Fprintf(os.Stderr, "Error normalizing timestamp: %v\n", err) + s.idx++ + continue + } + key := fmt.Sprintf("%s|%s|%s", t.Artist, t.Name, ts) + if _, shouldSkip := s.tracksToSkip[key]; shouldSkip { s.idx++ continue } @@ -51,7 +58,6 @@ func (s *trackSource) Next() bool { func (s *trackSource) Values() ([]any, error) { // idx is already incremented in Next(), so use idx-1 t := s.tracks[s.idx-1] - // parse spotify string timestamp to a real time object ts, err := time.Parse(time.RFC3339Nano, t.Timestamp) if err != nil { return nil, err @@ -71,48 +77,62 @@ func (s *trackSource) Err() error { return nil } -// find tracks with the same artist and name in a 20 second +- timeframe +func normalizeTs(ts string) (string, error) { + t, err := time.Parse(time.RFC3339Nano, ts) + if err != nil { + return "", err + } + return t.Format(time.RFC3339Nano), nil +} + func getExistingTracks( - userId int, - tracks []SpotifyTrack, + userId int, tracks []SpotifyTrack, ) (map[string]struct{}, error) { - // check for empty track import - if len(tracks) == 0 { - return map[string]struct{}{}, nil - } - - // find min/max timestamps in this batch to create time window to - // search for duplicates - var minTs, maxTs time.Time - // go through each track (t) in the array - for _, t := range tracks { - // parse spotify timestamp into operational time datatype - ts, err := time.Parse(time.RFC3339Nano, t.Timestamp) - if err != nil { - continue - } - // if minTs uninitialized or timestamp predates minTs - if minTs.IsZero() || ts.Before(minTs) { - minTs = ts - } - // if timestamp comes after maxTs - if ts.After(maxTs) { - maxTs = ts - } - } - - // check if all parses failed, therefore no way to find duplicate by time + minTs, maxTs := findTimeRange(tracks) if minTs.IsZero() { return map[string]struct{}{}, nil } - // find all tracks within [min-20s, max+20s] window (duplicates) + dbTracks, err := fetchDbTracks(userId, minTs, maxTs) + if err != nil { + return nil, err + } + + dbIndex := buildDbTrackIndex(dbTracks) + + return findDuplicates(tracks, dbIndex), nil +} + +// get the min/max timestamp range for a batch of tracks +func findTimeRange(tracks []SpotifyTrack) (time.Time, time.Time) { + var minTs, maxTs time.Time + for _, t := range tracks { + ts, err := time.Parse(time.RFC3339Nano, t.Timestamp) + if err != nil { + continue + } + if minTs.IsZero() || ts.Before(minTs) { + minTs = ts + } + if ts.After(maxTs) { + maxTs = ts + } + } + return minTs, maxTs +} + +/* + get all tracks in the database for a user that have the same timestamp + range as the current batch +*/ +func fetchDbTracks(userId int, minTs, maxTs time.Time) ([]dbTrack, error) { rows, err := db.Pool.Query(context.Background(), `SELECT song_name, artist, timestamp FROM history WHERE user_id = $1 AND timestamp BETWEEN $2 AND $3`, userId, + // adjust 20 seconds to find duplicates on edges of batch minTs.Add(-20*time.Second), maxTs.Add(20*time.Second)) if err != nil { @@ -120,102 +140,118 @@ func getExistingTracks( } defer rows.Close() - // prepare map to hold duplicate track keys - existing := make(map[string]struct{}) - // create array of tracks - var existingTracks []existingTrack - // for each repeat play (-20s +20s) + var dbTracks []dbTrack for rows.Next() { - // write the data from json to the track in memory - var t existingTrack + var t dbTrack if err := rows.Scan(&t.SongName, &t.Artist, &t.Timestamp); err != nil { continue } - // add track in memory to existingTracks array - existingTracks = append(existingTracks, t) + dbTracks = append(dbTracks, t) } + return dbTracks, nil +} - // index existing tracks by artist|name for O(1) lookup - existingIndex := make(map[string][]time.Time) - for _, t := range existingTracks { +func buildDbTrackIndex(tracks []dbTrack) map[string][]time.Time { + index := make(map[string][]time.Time) + for _, t := range tracks { key := t.Artist + "|" + t.SongName - existingIndex[key] = append(existingIndex[key], t.Timestamp) + index[key] = append(index[key], t.Timestamp) } + return index +} - // check each new track against indexed existing tracks - for _, newTrack := range tracks { - newTs, err := time.Parse(time.RFC3339Nano, newTrack.Timestamp) +func findDuplicates(tracks []SpotifyTrack, dbIndex map[string][]time.Time) map[string]struct{} { + duplicates := make(map[string]struct{}) + seenInBatch := make(map[string]struct{}) + + for _, track := range tracks { + trackKey, err := createTrackKey(track) if err != nil { continue } - lookupKey := newTrack.Artist + "|" + newTrack.Name - if timestamps, found := existingIndex[lookupKey]; found { - for _, existTs := range timestamps { - diff := newTs.Sub(existTs) - if diff < 0 { - diff = -diff - } - if diff < 20*time.Second { - key := fmt.Sprintf( - "%s|%s|%s", - newTrack.Artist, - newTrack.Name, - newTrack.Timestamp, - ) - existing[key] = struct{}{} - break - } + // in batch check + if _, seen := seenInBatch[trackKey]; seen { + duplicates[trackKey] = struct{}{} + continue + } + seenInBatch[trackKey] = struct{}{} + + // in db check + lookupKey := fmt.Sprintf("%s|%s", track.Artist, track.Name) + if dbTimestamps, found := dbIndex[lookupKey]; found { + if isDuplicateWithinWindow(track, dbTimestamps) { + duplicates[trackKey] = struct{}{} } } } - return existing, nil + return duplicates +} + +func createTrackKey(track SpotifyTrack) (string, error) { + ts, err := normalizeTs(track.Timestamp) + if err != nil { + return "", err + } + return fmt.Sprintf("%s|%s|%s", track.Artist, track.Name, ts), nil +} + +// check if a track timestamp falls < 20 seconds of another +func isDuplicateWithinWindow(track SpotifyTrack, existingTimestamps []time.Time) bool { + trackTime, err := time.Parse(time.RFC3339Nano, track.Timestamp) + if err != nil { + return false + } + for _, existingTime := range existingTimestamps { + diff := trackTime.Sub(existingTime) + if diff < 0 { + diff = -diff + } + if diff < 20*time.Second { + return true + } + } + return false } func ImportSpotify(tracks []SpotifyTrack, userId int) error { totalImported := 0 - batchSize := 1000 + totalTracks := len(tracks) + batchStart := 0 - for batchStart := 0; batchStart < len(tracks); batchStart += batchSize { - // get the limit of the current batch - batchEnd := batchStart + batchSize - // set limit to track array length in current batch too big - if batchEnd > len(tracks) { - batchEnd = len(tracks) - } + for batchStart < totalTracks { + // cap batchEnd at total track count on final batch to prevent OOB error + batchEnd := min(batchStart+batchSize, totalTracks) - // create array to hold valid listens var validTracks []SpotifyTrack for i := batchStart; i < batchEnd; i++ { - // if current track is listened to for 20 sec and name and artist is not - // blank, add to validTracks array - if tracks[i].Played >= 20000 && tracks[i].Name != "" && tracks[i].Artist != "" { + if tracks[i].Played >= 20000 && // 20 seconds + tracks[i].Name != "" && + tracks[i].Artist != "" { validTracks = append(validTracks, tracks[i]) } } - // if there are no valid tracks in this batch, go to the next if len(validTracks) == 0 { + batchStart += batchSize continue } - // find replayed tracks in the batch that was just gathered - existing, err := getExistingTracks(userId, validTracks) + tracksToSkip, err := getExistingTracks(userId, validTracks) if err != nil { fmt.Fprintf(os.Stderr, "Error checking existing tracks: %v\n", err) + batchStart += batchSize continue } - // get data of struct pointer src := &trackSource{ - tracks: validTracks, - existing: existing, - idx: 0, - userId: userId, + tracks: validTracks, + tracksToSkip: tracksToSkip, + idx: 0, + userId: userId, } - // insert all valid tracks from current batch into db copyCount, err := db.Pool.CopyFrom( context.Background(), pgx.Identifier{"history"}, @@ -237,6 +273,7 @@ func ImportSpotify(tracks []SpotifyTrack, userId int) error { } else { totalImported += int(copyCount) } + batchStart += batchSize } return nil }