diff --git a/migrate/lastfm.go b/migrate/lastfm.go index 8d9e9fd..2d8d95e 100644 --- a/migrate/lastfm.go +++ b/migrate/lastfm.go @@ -12,6 +12,8 @@ import ( "time" "muzi/db" + + "github.com/jackc/pgx/v5" ) type LastFMTrack struct { @@ -60,6 +62,40 @@ type Response struct { } `json:"recenttracks"` } +func fetchPage(client *http.Client, page int, lfmUsername, apiKey string, userId int) pageResult { + resp, err := client.Get( + "https://ws.audioscrobbler.com/2.0/?method=user.getrecenttracks&user=" + + lfmUsername + "&api_key=" + apiKey + "&format=json&limit=100&page=" + strconv.Itoa(page), + ) + if err != nil { + return pageResult{pageNum: page, err: err} + } + defer resp.Body.Close() + var data Response + if err := json.NewDecoder(resp.Body).Decode(&data); err != nil { + return pageResult{pageNum: page, err: err} + } + + var pageTracks []LastFMTrack + for j := range data.Recenttracks.Track { + if data.Recenttracks.Track[j].Attr.Nowplaying == "true" { + continue + } + unixTime, err := strconv.ParseInt(data.Recenttracks.Track[j].Date.Uts, 10, 64) + if err != nil { + continue + } + pageTracks = append(pageTracks, LastFMTrack{ + UserId: userId, + Timestamp: time.Unix(unixTime, 0), + SongName: data.Recenttracks.Track[j].Name, + Artist: data.Recenttracks.Track[j].Artist.Text, + Album: data.Recenttracks.Track[j].Album.Text, + }) + } + return pageResult{pageNum: page, tracks: pageTracks, err: nil} +} + func ImportLastFM( username string, apiKey string, @@ -83,10 +119,15 @@ func ImportLastFM( } return err } + defer resp.Body.Close() var initialData Response - json.NewDecoder(resp.Body).Decode(&initialData) + err = json.NewDecoder(resp.Body).Decode(&initialData) + if err != nil { + fmt.Fprintf(os.Stderr, + "Error decoding initial LastFM response: %v\n", err) + return err + } totalPages, err := strconv.Atoi(initialData.Recenttracks.Attr.TotalPages) - resp.Body.Close() if err != nil { fmt.Fprintf(os.Stderr, "Error parsing total pages: %v\n", err) if progressChan != nil { @@ -115,40 +156,7 @@ func ImportLastFM( go func(workerID int) { defer wg.Done() for page := workerID + 1; page <= totalPages; page += 10 { - resp, err := client.Get( - "https://ws.audioscrobbler.com/2.0/?method=user.getrecenttracks&user=" + - username + "&api_key=" + apiKey + "&format=json&limit=100&page=" + strconv.Itoa(page), - ) - if err != nil { - pageChan <- pageResult{pageNum: page, err: err} - continue - } - var data Response - if err := json.NewDecoder(resp.Body).Decode(&data); err != nil { - resp.Body.Close() - pageChan <- pageResult{pageNum: page, err: err} - continue - } - resp.Body.Close() - - var pageTracks []LastFMTrack - for j := range data.Recenttracks.Track { - if data.Recenttracks.Track[j].Attr.Nowplaying == "true" { - continue - } - unixTime, err := strconv.ParseInt(data.Recenttracks.Track[j].Date.Uts, 10, 64) - if err != nil { - continue - } - pageTracks = append(pageTracks, LastFMTrack{ - UserId: userId, - Timestamp: time.Unix(unixTime, 0), - SongName: data.Recenttracks.Track[j].Name, - Artist: data.Recenttracks.Track[j].Artist.Text, - Album: data.Recenttracks.Track[j].Album.Text, - }) - } - pageChan <- pageResult{pageNum: page, tracks: pageTracks, err: nil} + pageChan <- fetchPage(client, page, lfmUsername, apiKey, userId) } }(worker) } @@ -171,13 +179,14 @@ func ImportLastFM( for len(trackBatch) >= batchSize { batch := trackBatch[:batchSize] trackBatch = trackBatch[batchSize:] - err := insertBatch(batch, &totalImported, batchSize) + err := insertBatch(batch, &totalImported) if err != nil { - fmt.Fprintf(os.Stderr, "Batch insert failed: %v\n", err) + // prevent logs being filled by duplicate warnings + if !strings.Contains(err.Error(), "duplicate") { + fmt.Fprintf(os.Stderr, "Batch insert failed: %v\n", err) + } } } - fmt.Printf("Processed page %d/%d\n", result.pageNum, totalPages) - // increment completed pages counter completedMu.Lock() completedPages++ @@ -197,9 +206,12 @@ func ImportLastFM( } if len(trackBatch) > 0 { - err := insertBatch(trackBatch, &totalImported, batchSize) + err := insertBatch(trackBatch, &totalImported) if err != nil { - fmt.Fprintf(os.Stderr, "Final batch insert failed: %v\n", err) + // prevent logs being filled by duplicate warnings + if !strings.Contains(err.Error(), "duplicate") { + fmt.Fprintf(os.Stderr, "Final batch insert failed: %v\n", err) + } } } @@ -218,65 +230,21 @@ func ImportLastFM( return nil } -func insertBatch(tracks []LastFMTrack, totalImported *int, batchSize int) error { - tx, err := db.Pool.Begin(context.Background()) - if err != nil { - return err - } - - var batchValues []string - var batchArgs []any - - for i, track := range tracks { - batchValues = append(batchValues, fmt.Sprintf( - "($%d, $%d, $%d, $%d, $%d, $%d, $%d)", - len(batchArgs)+1, - len(batchArgs)+2, - len(batchArgs)+3, - len(batchArgs)+4, - len(batchArgs)+5, - len(batchArgs)+6, - len(batchArgs)+7, - )) - // lastfm doesn't store playtime for each track, so set to 0 - batchArgs = append( - batchArgs, - track.UserId, - track.Timestamp, - track.SongName, - track.Artist, - track.Album, - 0, - "lastfm", - ) - - if len(batchValues) >= batchSize || i == len(tracks)-1 { - result, err := tx.Exec( - context.Background(), - `INSERT INTO history (user_id, timestamp, song_name, artist, album_name, ms_played, platform) VALUES `+ - strings.Join( - batchValues, - ", ", - )+` ON CONFLICT ON CONSTRAINT history_user_id_song_name_artist_timestamp_key DO NOTHING;`, - batchArgs..., - ) - if err != nil { - tx.Rollback(context.Background()) - return err - } - rowsAffected := result.RowsAffected() - if rowsAffected > 0 { - *totalImported += int(rowsAffected) - } - batchValues = batchValues[:0] - batchArgs = batchArgs[:0] - } - } - - if err := tx.Commit(context.Background()); err != nil { - tx.Rollback(context.Background()) - return err - } - - return nil +func insertBatch(tracks []LastFMTrack, totalImported *int) error { + copyCount, err := db.Pool.CopyFrom(context.Background(), + pgx.Identifier{"history"}, + []string{ + "user_id", "timestamp", "song_name", "artist", "album_name", + "ms_played", "platform", + }, + pgx.CopyFromSlice(len(tracks), func(i int) ([]any, error) { + t := tracks[i] + return []any{ + t.UserId, t.Timestamp, t.SongName, t.Artist, + t.Album, 0, "lastfm", + }, nil + }), + ) + *totalImported += int(copyCount) + return err }