Compare commits

..

2 Commits

Author SHA1 Message Date
1478410d0c remove redundant time parsing 2026-02-11 17:23:39 -08:00
fd2e2b0f8a add comments and increase readability 2026-02-11 17:23:17 -08:00
2 changed files with 262 additions and 228 deletions

View File

@@ -1,7 +1,16 @@
package migrate package migrate
// Spotify import functionality for migrating Spotify listening history
// from JSON export files into the database
// This package handles:
// - Parsing Spotify JSON track data
// - Batch processing with deduplication (20-second window)
// - Efficient bulk inserts using pgx.CopyFrom
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"os" "os"
"strings" "strings"
@@ -12,231 +21,65 @@ import (
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
) )
const batchSize = 1000 const (
batchSize = 1000
minPlayTime = 20000 // 20000 ms = 20 sec
timeDiff = 20 * time.Second
)
// Represents a single listening event from Spotify's JSON export format
type SpotifyTrack struct { type SpotifyTrack struct {
Timestamp string `json:"ts"` Timestamp time.Time `json:"ts"`
Played int `json:"ms_played"` Played int `json:"ms_played"`
Name string `json:"master_metadata_track_name"` Name string `json:"master_metadata_track_name"`
Artist string `json:"master_metadata_album_artist_name"` Artist string `json:"master_metadata_album_artist_name"`
Album string `json:"master_metadata_album_album_name"` Album string `json:"master_metadata_album_album_name"`
} }
// Implements pgx.CopyFromSource for efficient bulk inserts.
// Filters out duplicates in-memory before sending to PostgreSQL
type trackSource struct { type trackSource struct {
tracks []SpotifyTrack tracks []SpotifyTrack // Full batch of tracks to process
tracksToSkip map[string]struct{} tracksToSkip map[string]struct{} // Set of duplicate keys to skip
idx int idx int // Current position in tracks slice
userId int userId int // User ID to associate with imported tracks
} }
// Represents a track already stored in the database, used for duplicate
// detection during import
type dbTrack struct { type dbTrack struct {
Timestamp time.Time Timestamp time.Time
SongName string SongName string
Artist string Artist string
} }
func (s *trackSource) Next() bool { // Import Spotify listening history into the database.
for s.idx < len(s.tracks) { // Processes tracks in batches of 1000 (default), filters out tracks played <
t := s.tracks[s.idx] // 20 seconds, deduplicates against existing data, and sends progress updates
ts, err := normalizeTs(t.Timestamp) // via progressChan.
if err != nil { // The progressChan must not be closed by the caller. The receiver should
fmt.Fprintf(os.Stderr, "Error normalizing timestamp: %v\n", err) // stop reading when Status is "completed". This avoids panics from
s.idx++ // sending on a closed channel.
continue
}
key := fmt.Sprintf("%s|%s|%s", t.Artist, t.Name, ts)
if _, shouldSkip := s.tracksToSkip[key]; shouldSkip {
s.idx++
continue
}
s.idx++
return true
}
return false
}
func (s *trackSource) Values() ([]any, error) { func ImportSpotify(tracks []SpotifyTrack,
// idx is already incremented in Next(), so use idx-1 userId int, progressChan chan ProgressUpdate,
t := s.tracks[s.idx-1] ) {
ts, err := time.Parse(time.RFC3339Nano, t.Timestamp)
if err != nil {
return nil, err
}
return []any{
s.userId,
ts,
t.Name,
t.Artist,
t.Album,
t.Played,
"spotify",
}, nil
}
func (s *trackSource) Err() error {
return nil
}
func normalizeTs(ts string) (string, error) {
t, err := time.Parse(time.RFC3339Nano, ts)
if err != nil {
return "", err
}
return t.Format(time.RFC3339Nano), nil
}
func getExistingTracks(
userId int, tracks []SpotifyTrack,
) (map[string]struct{}, error) {
minTs, maxTs := findTimeRange(tracks)
if minTs.IsZero() {
return map[string]struct{}{}, nil
}
dbTracks, err := fetchDbTracks(userId, minTs, maxTs)
if err != nil {
return nil, err
}
dbIndex := buildDbTrackIndex(dbTracks)
return findDuplicates(tracks, dbIndex), nil
}
// get the min/max timestamp range for a batch of tracks
func findTimeRange(tracks []SpotifyTrack) (time.Time, time.Time) {
var minTs, maxTs time.Time
for _, t := range tracks {
ts, err := time.Parse(time.RFC3339Nano, t.Timestamp)
if err != nil {
continue
}
if minTs.IsZero() || ts.Before(minTs) {
minTs = ts
}
if ts.After(maxTs) {
maxTs = ts
}
}
return minTs, maxTs
}
/*
get all tracks in the database for a user that have the same timestamp
range as the current batch
*/
func fetchDbTracks(userId int, minTs, maxTs time.Time) ([]dbTrack, error) {
rows, err := db.Pool.Query(context.Background(),
`SELECT song_name, artist, timestamp
FROM history
WHERE user_id = $1
AND timestamp BETWEEN $2 AND $3`,
userId,
// adjust 20 seconds to find duplicates on edges of batch
minTs.Add(-20*time.Second),
maxTs.Add(20*time.Second))
if err != nil {
return nil, err
}
defer rows.Close()
var dbTracks []dbTrack
for rows.Next() {
var t dbTrack
if err := rows.Scan(&t.SongName, &t.Artist, &t.Timestamp); err != nil {
continue
}
dbTracks = append(dbTracks, t)
}
return dbTracks, nil
}
func buildDbTrackIndex(tracks []dbTrack) map[string][]time.Time {
index := make(map[string][]time.Time)
for _, t := range tracks {
key := t.Artist + "|" + t.SongName
index[key] = append(index[key], t.Timestamp)
}
return index
}
func findDuplicates(tracks []SpotifyTrack, dbIndex map[string][]time.Time) map[string]struct{} {
duplicates := make(map[string]struct{})
seenInBatch := make(map[string]struct{})
for _, track := range tracks {
trackKey, err := createTrackKey(track)
if err != nil {
continue
}
// in batch check
if _, seen := seenInBatch[trackKey]; seen {
duplicates[trackKey] = struct{}{}
continue
}
seenInBatch[trackKey] = struct{}{}
// in db check
lookupKey := fmt.Sprintf("%s|%s", track.Artist, track.Name)
if dbTimestamps, found := dbIndex[lookupKey]; found {
if isDuplicateWithinWindow(track, dbTimestamps) {
duplicates[trackKey] = struct{}{}
}
}
}
return duplicates
}
func createTrackKey(track SpotifyTrack) (string, error) {
ts, err := normalizeTs(track.Timestamp)
if err != nil {
return "", err
}
return fmt.Sprintf("%s|%s|%s", track.Artist, track.Name, ts), nil
}
// check if a track timestamp falls < 20 seconds of another
func isDuplicateWithinWindow(track SpotifyTrack, existingTimestamps []time.Time) bool {
trackTime, err := time.Parse(time.RFC3339Nano, track.Timestamp)
if err != nil {
return false
}
for _, existingTime := range existingTimestamps {
diff := trackTime.Sub(existingTime)
if diff < 0 {
diff = -diff
}
if diff < 20*time.Second {
return true
}
}
return false
}
func ImportSpotify(tracks []SpotifyTrack, userId int, progressChan chan ProgressUpdate) error {
totalImported := 0 totalImported := 0
totalTracks := len(tracks) totalTracks := len(tracks)
batchStart := 0 batchStart := 0
totalBatches := (totalTracks + batchSize - 1) / batchSize totalBatches := (totalTracks + batchSize - 1) / batchSize
// Send initial progress update // Send initial progress update
if progressChan != nil { sendProgressUpdate(progressChan, 0, 0, totalBatches, totalImported, "running")
progressChan <- ProgressUpdate{
TotalPages: totalBatches,
Status: "running",
}
}
for batchStart < totalTracks { for batchStart < totalTracks {
// cap batchEnd at total track count on final batch to prevent OOB error // Cap batchEnd at total track count on final batch to prevent OOB error
batchEnd := min(batchStart+batchSize, totalTracks) batchEnd := min(batchStart+batchSize, totalTracks)
currentBatch := (batchStart / batchSize) + 1 currentBatch := (batchStart / batchSize) + 1
var validTracks []SpotifyTrack var validTracks []SpotifyTrack
for i := batchStart; i < batchEnd; i++ { for i := batchStart; i < batchEnd; i++ {
if tracks[i].Played >= 20000 && // 20 seconds if tracks[i].Played >= minPlayTime &&
tracks[i].Name != "" && tracks[i].Name != "" &&
tracks[i].Artist != "" { tracks[i].Artist != "" {
validTracks = append(validTracks, tracks[i]) validTracks = append(validTracks, tracks[i])
@@ -246,19 +89,18 @@ func ImportSpotify(tracks []SpotifyTrack, userId int, progressChan chan Progress
if len(validTracks) == 0 { if len(validTracks) == 0 {
batchStart += batchSize batchStart += batchSize
// Send progress update even for empty batches // Send progress update even for empty batches
if progressChan != nil { sendProgressUpdate(
progressChan <- ProgressUpdate{ progressChan,
CurrentPage: currentBatch, currentBatch,
CompletedPages: currentBatch, currentBatch,
TotalPages: totalBatches, totalBatches,
TracksImported: totalImported, totalImported,
Status: "running", "running",
} )
}
continue continue
} }
tracksToSkip, err := getExistingTracks(userId, validTracks) tracksToSkip, err := getDupes(userId, validTracks)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "Error checking existing tracks: %v\n", err) fmt.Fprintf(os.Stderr, "Error checking existing tracks: %v\n", err)
batchStart += batchSize batchStart += batchSize
@@ -286,6 +128,7 @@ func ImportSpotify(tracks []SpotifyTrack, userId int, progressChan chan Progress
}, },
src, src,
) )
// Do not log errors that come from adding duplicate songs
if err != nil { if err != nil {
if !strings.Contains(err.Error(), "duplicate") { if !strings.Contains(err.Error(), "duplicate") {
fmt.Fprintf(os.Stderr, "Spotify batch insert failed: %v\n", err) fmt.Fprintf(os.Stderr, "Spotify batch insert failed: %v\n", err)
@@ -294,30 +137,223 @@ func ImportSpotify(tracks []SpotifyTrack, userId int, progressChan chan Progress
totalImported += int(copyCount) totalImported += int(copyCount)
} }
// Send progress update sendProgressUpdate(
if progressChan != nil { progressChan,
progressChan <- ProgressUpdate{ currentBatch,
CurrentPage: currentBatch, currentBatch,
CompletedPages: currentBatch, totalBatches,
TotalPages: totalBatches, totalImported,
TracksImported: totalImported, "running",
Status: "running", )
}
}
batchStart += batchSize batchStart += batchSize
} }
// Send completion update sendProgressUpdate(
if progressChan != nil { progressChan,
progressChan <- ProgressUpdate{ totalBatches,
CurrentPage: totalBatches, totalBatches,
CompletedPages: totalBatches, totalBatches,
TotalPages: totalBatches, totalImported,
TracksImported: totalImported, "completed",
Status: "completed", )
}
// Sends a progress update to the channel if it's not nil.
// To avoid panics from sending on a closed channel, the channel
// must never be closed by the receiver. The receiver should stop reading when
// Status reads "completed".
func sendProgressUpdate(
	ch chan ProgressUpdate,
	current, completed, total, imported int,
	status string,
) {
	if ch == nil {
		return
	}
	update := ProgressUpdate{
		CurrentPage:    current,
		CompletedPages: completed,
		TotalPages:     total,
		TracksImported: imported,
		Status:         status,
	}
	ch <- update
}
// Finds tracks that already exist in the database or are duplicates within the
// current batch, using a 20-second window to handle minor timestamp variations
func getDupes(userId int, tracks []SpotifyTrack) (map[string]struct{}, error) {
minTs, maxTs := findTimeRange(tracks)
if minTs.IsZero() {
return map[string]struct{}{}, nil
}
dbTracks, err := fetchDbTracks(userId, minTs, maxTs)
if err != nil {
return nil, err
}
dbIndex := buildDbTrackIndex(dbTracks)
duplicates := make(map[string]struct{})
seenInBatch := make(map[string]struct{})
for _, track := range tracks {
trackKey := createTrackKey(track)
// Check in batch
if _, seen := seenInBatch[trackKey]; seen {
duplicates[trackKey] = struct{}{}
continue
}
seenInBatch[trackKey] = struct{}{}
// Check in DB
lookupKey := fmt.Sprintf("%s|%s", track.Artist, track.Name)
if dbTimestamps, found := dbIndex[lookupKey]; found {
if isDuplicateWithinWindow(track, dbTimestamps) {
duplicates[trackKey] = struct{}{}
}
} }
} }
return duplicates, nil
}
// Get the min/max timestamp range for a batch of tracks.
// Returns zero times when the batch is empty.
func findTimeRange(tracks []SpotifyTrack) (time.Time, time.Time) {
	var earliest, latest time.Time
	for i := range tracks {
		ts := tracks[i].Timestamp
		if earliest.IsZero() || ts.Before(earliest) {
			earliest = ts
		}
		if ts.After(latest) {
			latest = ts
		}
	}
	return earliest, latest
}
// Get all tracks in the database for a user that have the same timestamp
// range as the current batch.
// The range is widened by timeDiff on both sides so duplicates sitting at
// the edges of the batch are still found.
func fetchDbTracks(userId int, minTs, maxTs time.Time) ([]dbTrack, error) {
	rows, err := db.Pool.Query(context.Background(),
		`SELECT song_name, artist, timestamp
		FROM history
		WHERE user_id = $1
		AND timestamp BETWEEN $2 AND $3`,
		userId,
		// Adjust 20 seconds to find duplicates on edges of batch
		minTs.Add(-timeDiff),
		maxTs.Add(timeDiff))
	if err != nil {
		return nil, fmt.Errorf("querying history: %w", err)
	}
	defer rows.Close()
	var dbTracks []dbTrack
	for rows.Next() {
		var t dbTrack
		if err := rows.Scan(&t.SongName, &t.Artist, &t.Timestamp); err != nil {
			// A dropped row would silently disable duplicate detection for
			// that track, so surface the error instead of skipping it.
			return nil, fmt.Errorf("scanning history row: %w", err)
		}
		dbTracks = append(dbTracks, t)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating history rows: %w", err)
	}
	return dbTracks, nil
}
// Create a lookup map from Artist|Name to timestamps for efficient duplicate
// detection.
func buildDbTrackIndex(tracks []dbTrack) map[string][]time.Time {
	// Pre-size: at most one entry per track
	byTrack := make(map[string][]time.Time, len(tracks))
	for i := range tracks {
		k := tracks[i].Artist + "|" + tracks[i].SongName
		byTrack[k] = append(byTrack[k], tracks[i].Timestamp)
	}
	return byTrack
}
// Generate a unique identifier for a track using artist, name, and
// normalized timestamp.
func createTrackKey(track SpotifyTrack) string {
	return track.Artist + "|" + track.Name + "|" +
		track.Timestamp.Format(time.RFC3339Nano)
}
// Check if a track timestamp falls < 20 seconds of another.
// Treats near-identical timestamps as the same listen, since imported data
// can carry small timestamp variations.
func isDuplicateWithinWindow(track SpotifyTrack,
	existingTimestamps []time.Time,
) bool {
	for _, existingTime := range existingTimestamps {
		// Duration.Abs (Go 1.19+) replaces the manual negate-if-negative dance
		if track.Timestamp.Sub(existingTime).Abs() < timeDiff {
			return true
		}
	}
	return false
}
// Advances the source to the next track that should be inserted, skipping
// any track whose key appears in tracksToSkip.
// Returns false once the batch is exhausted.
func (s *trackSource) Next() bool {
	for s.idx < len(s.tracks) {
		current := s.tracks[s.idx]
		// Always advance, whether or not this track is skipped
		s.idx++
		if _, skip := s.tracksToSkip[createTrackKey(current)]; skip {
			continue
		}
		return true
	}
	return false
}
// Returns the current track's data formatted for database insertion.
// Must only be called after Next() returns true
func (s *trackSource) Values() ([]any, error) {
	// Next() has already advanced idx past the current track
	track := s.tracks[s.idx-1]
	row := []any{
		s.userId,
		track.Timestamp,
		track.Name,
		track.Artist,
		track.Album,
		track.Played,
		"spotify",
	}
	return row, nil
}
// Returns any error encountered during iteration.
// Always nil: Next() only skips duplicate tracks and has no failure path,
// so iteration itself cannot produce an error.
func (s *trackSource) Err() error {
	return nil
}
// Implements custom JSON unmarshaling to parse the timestamp
func (s *SpotifyTrack) UnmarshalJSON(data []byte) error {
type Alias SpotifyTrack
aux := &struct {
Timestamp string `json:"ts"`
*Alias
}{
Alias: (*Alias)(s),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
ts, err := time.Parse(time.RFC3339Nano, aux.Timestamp)
if err != nil {
return fmt.Errorf("parsing timestamp: %w", err)
}
s.Timestamp = ts
return nil return nil
} }

View File

@@ -524,7 +524,6 @@ func importSpotifyHandler(w http.ResponseWriter, r *http.Request) {
jobsMu.Lock() jobsMu.Lock()
delete(importJobs, jobID) delete(importJobs, jobID)
jobsMu.Unlock() jobsMu.Unlock()
close(progressChan)
}() }()
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
@@ -585,7 +584,6 @@ func importLastFMHandler(w http.ResponseWriter, r *http.Request) {
jobsMu.Lock() jobsMu.Lock()
delete(importJobs, jobID) delete(importJobs, jobID)
jobsMu.Unlock() jobsMu.Unlock()
close(progressChan)
}() }()
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")