diff --git a/migrate/spotify.go b/migrate/spotify.go index 0c0531c..b773b65 100644 --- a/migrate/spotify.go +++ b/migrate/spotify.go @@ -1,43 +1,23 @@ package migrate import ( - "archive/zip" "context" - "encoding/json" "fmt" - "io" "os" - "path/filepath" "strings" "time" + "muzi/db" + "github.com/jackc/pgx/v5" ) type SpotifyTrack struct { - Timestamp string `json:"ts"` - Platform string `json:"-"` - Played int `json:"ms_played"` - Country string `json:"-"` - IP string `json:"-"` - Name string `json:"master_metadata_track_name"` - Artist string `json:"master_metadata_album_artist_name"` - Album string `json:"master_metadata_album_album_name"` - TrackURI string `json:"-"` - Episode string `json:"-"` - Show string `json:"-"` - EpisodeURI string `json:"-"` - Audiobook string `json:"-"` - AudiobookURI string `json:"-"` - AudiobookChapterURI string `json:"-"` - AudiobookChapter string `json:"-"` - ReasonStart string `json:"-"` - ReasonEnd string `json:"-"` - Shuffle bool `json:"-"` - Skipped bool `json:"-"` - Offline bool `json:"-"` - OfflineTimestamp int `json:"-"` - Incognito bool `json:"-"` + Timestamp string `json:"ts"` + Played int `json:"ms_played"` + Name string `json:"master_metadata_track_name"` + Artist string `json:"master_metadata_album_artist_name"` + Album string `json:"master_metadata_album_album_name"` } type existingTrack struct { @@ -46,32 +26,88 @@ type existingTrack struct { Artist string } -func getExistingTracks(conn *pgx.Conn, userId int, tracks []SpotifyTrack) (map[string]bool, error) { +// trackSource implements pgx.CopyFromSource for efficient bulk inserts +type trackSource struct { + tracks []SpotifyTrack + existing map[string]struct{} + idx int + userId int +} + +func (s *trackSource) Next() bool { + for s.idx < len(s.tracks) { + t := s.tracks[s.idx] + key := fmt.Sprintf("%s|%s|%s", t.Artist, t.Name, t.Timestamp) + if _, exists := s.existing[key]; exists { + s.idx++ + continue + } + s.idx++ + return true + } + return false +} + +func (s 
*trackSource) Values() ([]any, error) { + // idx is already incremented in Next(), so use idx-1 + t := s.tracks[s.idx-1] + // parse spotify string timestamp to a real time object + ts, err := time.Parse(time.RFC3339Nano, t.Timestamp) + if err != nil { + return nil, err + } + return []any{ + s.userId, + ts, + t.Name, + t.Artist, + t.Album, + t.Played, + "spotify", + }, nil +} + +func (s *trackSource) Err() error { + return nil +} + +// find tracks with the same artist and name in a 20 second +- timeframe +func getExistingTracks( + userId int, + tracks []SpotifyTrack, +) (map[string]struct{}, error) { + // check for empty track import if len(tracks) == 0 { - return map[string]bool{}, nil + return map[string]struct{}{}, nil } - // find min/max timestamps in this batch to create time window + // find min/max timestamps in this batch to create time window to + // search for duplicates var minTs, maxTs time.Time + // go through each track (t) in the array for _, t := range tracks { + // parse spotify timestamp into operational time datatype ts, err := time.Parse(time.RFC3339Nano, t.Timestamp) if err != nil { continue } + // if minTs uninitialized or timestamp predates minTs if minTs.IsZero() || ts.Before(minTs) { minTs = ts } + // if timestamp comes after maxTs if ts.After(maxTs) { maxTs = ts } } + // check if all parses failed, therefore no way to find duplicate by time if minTs.IsZero() { - return map[string]bool{}, nil + return map[string]struct{}{}, nil } - // query only tracks within [min-20s, max+20s] window using timestamp index - rows, err := conn.Query(context.Background(), + // find all tracks within [min-20s, max+20s] window (duplicates) + rows, err := db.Pool.Query(context.Background(), `SELECT song_name, artist, timestamp FROM history WHERE user_id = $1 @@ -84,25 +120,39 @@ func getExistingTracks(conn *pgx.Conn, userId int, tracks []SpotifyTrack) (map[s } defer rows.Close() - existing := make(map[string]bool) + // prepare map to hold duplicate track keys + 
existing := make(map[string]struct{}) + // collect the rows returned by the query var existingTracks []existingTrack + // for each history row inside the [min-20s, max+20s] window for rows.Next() { + // scan the database row into the track in memory var t existingTrack if err := rows.Scan(&t.SongName, &t.Artist, &t.Timestamp); err != nil { continue } + // add track in memory to existingTracks array existingTracks = append(existingTracks, t) } - // check each incoming track against existing ones within 20 second window + // index existing tracks by artist|name for O(1) lookup + existingIndex := make(map[string][]time.Time) + for _, t := range existingTracks { + key := t.Artist + "|" + t.SongName + existingIndex[key] = append(existingIndex[key], t.Timestamp) + } + + // check each new track against indexed existing tracks for _, newTrack := range tracks { newTs, err := time.Parse(time.RFC3339Nano, newTrack.Timestamp) if err != nil { continue } - for _, existTrack := range existingTracks { - if newTrack.Name == existTrack.SongName && newTrack.Artist == existTrack.Artist { - diff := newTs.Sub(existTrack.Timestamp) + + lookupKey := newTrack.Artist + "|" + newTrack.Name + if timestamps, found := existingIndex[lookupKey]; found { + for _, existTs := range timestamps { + diff := newTs.Sub(existTs) if diff < 0 { diff = -diff } @@ -113,7 +163,7 @@ func getExistingTracks(conn *pgx.Conn, userId int, tracks []SpotifyTrack) (map[s newTrack.Name, newTrack.Timestamp, ) - existing[key] = true + existing[key] = struct{}{} break } } @@ -123,246 +173,69 @@ func getExistingTracks(conn *pgx.Conn, userId int, tracks []SpotifyTrack) (map[s return existing, nil } -func JsonToDB(jsonFile string, userId int) error { - conn, err := pgx.Connect( - context.Background(), - "postgres://postgres:postgres@localhost:5432/muzi", - ) - if err != nil { - fmt.Fprintf(os.Stderr, "Cannot connect to muzi database: %v\n", err) - panic(err) - } - defer conn.Close(context.Background()) - - jsonData, err := os.ReadFile(jsonFile) - if err != nil { - 
fmt.Fprintf(os.Stderr, "Cannot read %s: %v\n", jsonFile, err) - return err - } - var tracks []SpotifyTrack - err = json.Unmarshal(jsonData, &tracks) - if err != nil { - fmt.Fprintf(os.Stderr, "Cannot unmarshal %s: %v\n", jsonFile, err) - return err - } - +func ImportSpotify(tracks []SpotifyTrack, userId int) error { totalImported := 0 batchSize := 1000 for batchStart := 0; batchStart < len(tracks); batchStart += batchSize { + // get the limit of the current batch batchEnd := batchStart + batchSize + // clamp the limit to the track array length if the batch would overrun it if batchEnd > len(tracks) { batchEnd = len(tracks) } + // create array to hold valid listens var validTracks []SpotifyTrack for i := batchStart; i < batchEnd; i++ { + // if current track was played for at least 20 sec and name and artist are + // not blank, add to validTracks array if tracks[i].Played >= 20000 && tracks[i].Name != "" && tracks[i].Artist != "" { validTracks = append(validTracks, tracks[i]) } } + // if there are no valid tracks in this batch, go to the next if len(validTracks) == 0 { continue } - existing, err := getExistingTracks(conn, userId, validTracks) + // find tracks in this batch that already exist in the user's history + existing, err := getExistingTracks(userId, validTracks) if err != nil { fmt.Fprintf(os.Stderr, "Error checking existing tracks: %v\n", err) continue } - var batchValues []string - var batchArgs []any - - for _, t := range validTracks { - key := fmt.Sprintf("%s|%s|%s", t.Artist, t.Name, t.Timestamp) - if existing[key] { - continue - } - - batchValues = append(batchValues, fmt.Sprintf( - "($%d, $%d, $%d, $%d, $%d, $%d, $%d)", - len(batchArgs)+1, - len(batchArgs)+2, - len(batchArgs)+3, - len(batchArgs)+4, - len(batchArgs)+5, - len(batchArgs)+6, - len(batchArgs)+7, - )) - batchArgs = append( - batchArgs, - userId, - t.Timestamp, - t.Name, - t.Artist, - t.Album, - t.Played, - "spotify", - ) + // build the copy source that feeds this batch to CopyFrom + src := &trackSource{ + tracks: validTracks, + existing: existing, + 
idx: 0, + userId: userId, } - if len(batchValues) == 0 { - continue - } - - // TODO: replace strings.Join with pgx copy - - _, err = conn.Exec( + // insert all valid tracks from current batch into db + copyCount, err := db.Pool.CopyFrom( context.Background(), - `INSERT INTO history (user_id, timestamp, song_name, artist, album_name, ms_played, platform) VALUES `+ - strings.Join( - batchValues, - ", ", - )+ - ` ON CONFLICT ON CONSTRAINT history_user_id_song_name_artist_timestamp_key DO NOTHING;`, - batchArgs..., + pgx.Identifier{"history"}, + []string{ + "user_id", + "timestamp", + "song_name", + "artist", + "album_name", + "ms_played", + "platform", + }, + src, ) if err != nil { - fmt.Fprintf(os.Stderr, "Batch insert failed: %v\n", err) + if !strings.Contains(err.Error(), "duplicate") { + fmt.Fprintf(os.Stderr, "Spotify batch insert failed: %v\n", err) + } } else { - totalImported += len(batchValues) - } - } - - fmt.Printf("%d tracks imported from %s\n", totalImported, jsonFile) - return nil -} - -func AddDirToDB(path string, userId int) error { - dirs, err := os.ReadDir(path) - if err != nil { - fmt.Fprintf(os.Stderr, "Error while reading path: %s: %v\n", path, err) - return err - } - for _, dir := range dirs { - subPath := filepath.Join( - path, - dir.Name(), - "Spotify Extended Streaming History", - ) - entries, err := os.ReadDir(subPath) - if err != nil { - fmt.Fprintf(os.Stderr, "Error while reading path: %s: %v\n", subPath, err) - return err - } - for _, f := range entries { - jsonFileName := f.Name() - if !strings.Contains(jsonFileName, ".json") { - continue - } - if strings.Contains(jsonFileName, "Video") { - continue - } - jsonFilePath := filepath.Join(subPath, jsonFileName) - err = JsonToDB(jsonFilePath, userId) - if err != nil { - fmt.Fprintf(os.Stderr, - "Error adding json data (%s) to muzi database: %v", jsonFilePath, err) - return err - } - } - } - return nil -} - -func ImportSpotify(userId int) error { - path := filepath.Join(".", "imports", 
"spotify", "zip") - targetBase := filepath.Join(".", "imports", "spotify", "extracted") - entries, err := os.ReadDir(path) - if err != nil { - fmt.Fprintf(os.Stderr, "Error reading path: %s: %v\n", path, err) - return err - } - for _, f := range entries { - reader, err := zip.OpenReader(filepath.Join(path, f.Name())) - if err != nil { - fmt.Fprintf(os.Stderr, "Error opening zip: %s: %v\n", - filepath.Join(path, f.Name()), err) - continue - } - defer reader.Close() - fileName := f.Name() - fileFullPath := filepath.Join(path, fileName) - fileBaseName := fileName[:(strings.LastIndex(fileName, "."))] - targetDirFullPath := filepath.Join(targetBase, fileBaseName) - - err = Extract(fileFullPath, targetDirFullPath) - if err != nil { - fmt.Fprintf(os.Stderr, "Error extracting %s to %s: %v\n", - fileFullPath, targetDirFullPath, err) - return err - } - } - err = AddDirToDB(targetBase, userId) - if err != nil { - fmt.Fprintf(os.Stderr, - "Error adding directory of data (%s) to muzi database: %v\n", - targetBase, err) - return err - } - return nil -} - -func Extract(path string, target string) error { - archive, err := zip.OpenReader(path) - if err != nil { - fmt.Fprintf(os.Stderr, "Error opening zip: %s: %v\n", path, err) - return err - } - defer archive.Close() - - zipDir := filepath.Base(path) - zipDir = zipDir[:(strings.LastIndex(zipDir, "."))] - - for _, f := range archive.File { - filePath := filepath.Join(target, f.Name) - fmt.Println("extracting:", filePath) - - if !strings.HasPrefix( - filePath, - filepath.Clean(target)+string(os.PathSeparator), - ) { - err = fmt.Errorf("Invalid file path: %s", filePath) - fmt.Fprintf(os.Stderr, "%v\n", err) - return err - } - if f.FileInfo().IsDir() { - fmt.Println("Creating Directory", filePath) - os.MkdirAll(filePath, os.ModePerm) - continue - } - if err := os.MkdirAll(filepath.Dir(filePath), os.ModePerm); err != nil { - fmt.Fprintf(os.Stderr, "Error making directory: %s: %v\n", - filepath.Dir(filePath), err) - return err - } - 
fileToExtract, err := os.OpenFile( - filePath, - os.O_WRONLY|os.O_CREATE|os.O_TRUNC, - f.Mode(), - ) - if err != nil { - fmt.Fprintf(os.Stderr, "Error opening file: %s: %v\n", filePath, err) - return err - } - defer fileToExtract.Close() - extractedFile, err := f.Open() - if err != nil { - fmt.Fprintf(os.Stderr, "Error opening file: %s: %v\n", f.Name, err) - return err - } - defer extractedFile.Close() - if _, err := io.Copy(fileToExtract, extractedFile); err != nil { - fmt.Fprintf( - os.Stderr, - "Error while copying file: %s to: %s: %v\n", - fileToExtract.Name(), - extractedFile, - err, - ) - return err + totalImported += int(copyCount) } } return nil diff --git a/templates/import.gohtml b/templates/import.gohtml new file mode 100644 index 0000000..449e7d7 --- /dev/null +++ b/templates/import.gohtml @@ -0,0 +1,148 @@ + + +
+ +Welcome, {{.Username}}!
+ +Import your Spotify listening history from your data export.
+ +Import your Last.fm scrobbles.
+ + + +