mirror of
https://github.com/riwiwa/muzi.git
synced 2026-02-28 11:56:57 -08:00
rework spotify import, add import webUI
This commit is contained in:
@@ -1,43 +1,23 @@
|
||||
package migrate
|
||||
|
||||
import (
|
||||
"archive/zip"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"muzi/db"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
)
|
||||
|
||||
type SpotifyTrack struct {
|
||||
Timestamp string `json:"ts"`
|
||||
Platform string `json:"-"`
|
||||
Played int `json:"ms_played"`
|
||||
Country string `json:"-"`
|
||||
IP string `json:"-"`
|
||||
Name string `json:"master_metadata_track_name"`
|
||||
Artist string `json:"master_metadata_album_artist_name"`
|
||||
Album string `json:"master_metadata_album_album_name"`
|
||||
TrackURI string `json:"-"`
|
||||
Episode string `json:"-"`
|
||||
Show string `json:"-"`
|
||||
EpisodeURI string `json:"-"`
|
||||
Audiobook string `json:"-"`
|
||||
AudiobookURI string `json:"-"`
|
||||
AudiobookChapterURI string `json:"-"`
|
||||
AudiobookChapter string `json:"-"`
|
||||
ReasonStart string `json:"-"`
|
||||
ReasonEnd string `json:"-"`
|
||||
Shuffle bool `json:"-"`
|
||||
Skipped bool `json:"-"`
|
||||
Offline bool `json:"-"`
|
||||
OfflineTimestamp int `json:"-"`
|
||||
Incognito bool `json:"-"`
|
||||
Timestamp string `json:"ts"`
|
||||
Played int `json:"ms_played"`
|
||||
Name string `json:"master_metadata_track_name"`
|
||||
Artist string `json:"master_metadata_album_artist_name"`
|
||||
Album string `json:"master_metadata_album_album_name"`
|
||||
}
|
||||
|
||||
type existingTrack struct {
|
||||
@@ -46,32 +26,88 @@ type existingTrack struct {
|
||||
Artist string
|
||||
}
|
||||
|
||||
func getExistingTracks(conn *pgx.Conn, userId int, tracks []SpotifyTrack) (map[string]bool, error) {
|
||||
// trackSource implements pgx.CopyFromSource for efficient bulk inserts
|
||||
type trackSource struct {
|
||||
tracks []SpotifyTrack
|
||||
existing map[string]struct{}
|
||||
idx int
|
||||
userId int
|
||||
}
|
||||
|
||||
func (s *trackSource) Next() bool {
|
||||
for s.idx < len(s.tracks) {
|
||||
t := s.tracks[s.idx]
|
||||
key := fmt.Sprintf("%s|%s|%s", t.Artist, t.Name, t.Timestamp)
|
||||
if _, exists := s.existing[key]; exists {
|
||||
s.idx++
|
||||
continue
|
||||
}
|
||||
s.idx++
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *trackSource) Values() ([]any, error) {
|
||||
// idx is already incremented in Next(), so use idx-1
|
||||
t := s.tracks[s.idx-1]
|
||||
// parse spotify string timestamp to a real time object
|
||||
ts, err := time.Parse(time.RFC3339Nano, t.Timestamp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return []any{
|
||||
s.userId,
|
||||
ts,
|
||||
t.Name,
|
||||
t.Artist,
|
||||
t.Album,
|
||||
t.Played,
|
||||
"spotify",
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *trackSource) Err() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// find tracks with the same artist and name in a 20 second +- timeframe
|
||||
func getExistingTracks(
|
||||
userId int,
|
||||
tracks []SpotifyTrack,
|
||||
) (map[string]struct{}, error) {
|
||||
// check for empty track import
|
||||
if len(tracks) == 0 {
|
||||
return map[string]bool{}, nil
|
||||
return map[string]struct{}{}, nil
|
||||
}
|
||||
|
||||
// find min/max timestamps in this batch to create time window
|
||||
// find min/max timestamps in this batch to create time window to
|
||||
// search for duplicates
|
||||
var minTs, maxTs time.Time
|
||||
// go through each track (t) in the array
|
||||
for _, t := range tracks {
|
||||
// parse spotify timestamp into operational time datatype
|
||||
ts, err := time.Parse(time.RFC3339Nano, t.Timestamp)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
// if minTs uninitialized or timestamp predates minTs
|
||||
if minTs.IsZero() || ts.Before(minTs) {
|
||||
minTs = ts
|
||||
}
|
||||
// if timestamp comes after maxTs
|
||||
if ts.After(maxTs) {
|
||||
maxTs = ts
|
||||
}
|
||||
}
|
||||
|
||||
// check if all parses failed, therefore no way to find duplicate by time
|
||||
if minTs.IsZero() {
|
||||
return map[string]bool{}, nil
|
||||
return map[string]struct{}{}, nil
|
||||
}
|
||||
|
||||
// query only tracks within [min-20s, max+20s] window using timestamp index
|
||||
rows, err := conn.Query(context.Background(),
|
||||
// find all tracks within [min-20s, max+20s] window (duplicates)
|
||||
rows, err := db.Pool.Query(context.Background(),
|
||||
`SELECT song_name, artist, timestamp
|
||||
FROM history
|
||||
WHERE user_id = $1
|
||||
@@ -84,25 +120,39 @@ func getExistingTracks(conn *pgx.Conn, userId int, tracks []SpotifyTrack) (map[s
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
existing := make(map[string]bool)
|
||||
// prepare map to hold duplicate track keys
|
||||
existing := make(map[string]struct{})
|
||||
// create array of tracks
|
||||
var existingTracks []existingTrack
|
||||
// for each repeat play (-20s +20s)
|
||||
for rows.Next() {
|
||||
// write the data from json to the track in memory
|
||||
var t existingTrack
|
||||
if err := rows.Scan(&t.SongName, &t.Artist, &t.Timestamp); err != nil {
|
||||
continue
|
||||
}
|
||||
// add track in memory to existingTracks array
|
||||
existingTracks = append(existingTracks, t)
|
||||
}
|
||||
|
||||
// check each incoming track against existing ones within 20 second window
|
||||
// index existing tracks by artist|name for O(1) lookup
|
||||
existingIndex := make(map[string][]time.Time)
|
||||
for _, t := range existingTracks {
|
||||
key := t.Artist + "|" + t.SongName
|
||||
existingIndex[key] = append(existingIndex[key], t.Timestamp)
|
||||
}
|
||||
|
||||
// check each new track against indexed existing tracks
|
||||
for _, newTrack := range tracks {
|
||||
newTs, err := time.Parse(time.RFC3339Nano, newTrack.Timestamp)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
for _, existTrack := range existingTracks {
|
||||
if newTrack.Name == existTrack.SongName && newTrack.Artist == existTrack.Artist {
|
||||
diff := newTs.Sub(existTrack.Timestamp)
|
||||
|
||||
lookupKey := newTrack.Artist + "|" + newTrack.Name
|
||||
if timestamps, found := existingIndex[lookupKey]; found {
|
||||
for _, existTs := range timestamps {
|
||||
diff := newTs.Sub(existTs)
|
||||
if diff < 0 {
|
||||
diff = -diff
|
||||
}
|
||||
@@ -113,7 +163,7 @@ func getExistingTracks(conn *pgx.Conn, userId int, tracks []SpotifyTrack) (map[s
|
||||
newTrack.Name,
|
||||
newTrack.Timestamp,
|
||||
)
|
||||
existing[key] = true
|
||||
existing[key] = struct{}{}
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -123,246 +173,69 @@ func getExistingTracks(conn *pgx.Conn, userId int, tracks []SpotifyTrack) (map[s
|
||||
return existing, nil
|
||||
}
|
||||
|
||||
func JsonToDB(jsonFile string, userId int) error {
|
||||
conn, err := pgx.Connect(
|
||||
context.Background(),
|
||||
"postgres://postgres:postgres@localhost:5432/muzi",
|
||||
)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Cannot connect to muzi database: %v\n", err)
|
||||
panic(err)
|
||||
}
|
||||
defer conn.Close(context.Background())
|
||||
|
||||
jsonData, err := os.ReadFile(jsonFile)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Cannot read %s: %v\n", jsonFile, err)
|
||||
return err
|
||||
}
|
||||
var tracks []SpotifyTrack
|
||||
err = json.Unmarshal(jsonData, &tracks)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Cannot unmarshal %s: %v\n", jsonFile, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func ImportSpotify(tracks []SpotifyTrack, userId int) error {
|
||||
totalImported := 0
|
||||
batchSize := 1000
|
||||
|
||||
for batchStart := 0; batchStart < len(tracks); batchStart += batchSize {
|
||||
// get the limit of the current batch
|
||||
batchEnd := batchStart + batchSize
|
||||
// set limit to track array length in current batch too big
|
||||
if batchEnd > len(tracks) {
|
||||
batchEnd = len(tracks)
|
||||
}
|
||||
|
||||
// create array to hold valid listens
|
||||
var validTracks []SpotifyTrack
|
||||
for i := batchStart; i < batchEnd; i++ {
|
||||
// if current track is listened to for 20 sec and name and artist is not
|
||||
// blank, add to validTracks array
|
||||
if tracks[i].Played >= 20000 && tracks[i].Name != "" && tracks[i].Artist != "" {
|
||||
validTracks = append(validTracks, tracks[i])
|
||||
}
|
||||
}
|
||||
|
||||
// if there are no valid tracks in this batch, go to the next
|
||||
if len(validTracks) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
existing, err := getExistingTracks(conn, userId, validTracks)
|
||||
// find replayed tracks in the batch that was just gathered
|
||||
existing, err := getExistingTracks(userId, validTracks)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error checking existing tracks: %v\n", err)
|
||||
continue
|
||||
}
|
||||
|
||||
var batchValues []string
|
||||
var batchArgs []any
|
||||
|
||||
for _, t := range validTracks {
|
||||
key := fmt.Sprintf("%s|%s|%s", t.Artist, t.Name, t.Timestamp)
|
||||
if existing[key] {
|
||||
continue
|
||||
}
|
||||
|
||||
batchValues = append(batchValues, fmt.Sprintf(
|
||||
"($%d, $%d, $%d, $%d, $%d, $%d, $%d)",
|
||||
len(batchArgs)+1,
|
||||
len(batchArgs)+2,
|
||||
len(batchArgs)+3,
|
||||
len(batchArgs)+4,
|
||||
len(batchArgs)+5,
|
||||
len(batchArgs)+6,
|
||||
len(batchArgs)+7,
|
||||
))
|
||||
batchArgs = append(
|
||||
batchArgs,
|
||||
userId,
|
||||
t.Timestamp,
|
||||
t.Name,
|
||||
t.Artist,
|
||||
t.Album,
|
||||
t.Played,
|
||||
"spotify",
|
||||
)
|
||||
// get data of struct pointer
|
||||
src := &trackSource{
|
||||
tracks: validTracks,
|
||||
existing: existing,
|
||||
idx: 0,
|
||||
userId: userId,
|
||||
}
|
||||
|
||||
if len(batchValues) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO: replace strings.Join with pgx copy
|
||||
|
||||
_, err = conn.Exec(
|
||||
// insert all valid tracks from current batch into db
|
||||
copyCount, err := db.Pool.CopyFrom(
|
||||
context.Background(),
|
||||
`INSERT INTO history (user_id, timestamp, song_name, artist, album_name, ms_played, platform) VALUES `+
|
||||
strings.Join(
|
||||
batchValues,
|
||||
", ",
|
||||
)+
|
||||
` ON CONFLICT ON CONSTRAINT history_user_id_song_name_artist_timestamp_key DO NOTHING;`,
|
||||
batchArgs...,
|
||||
pgx.Identifier{"history"},
|
||||
[]string{
|
||||
"user_id",
|
||||
"timestamp",
|
||||
"song_name",
|
||||
"artist",
|
||||
"album_name",
|
||||
"ms_played",
|
||||
"platform",
|
||||
},
|
||||
src,
|
||||
)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Batch insert failed: %v\n", err)
|
||||
if !strings.Contains(err.Error(), "duplicate") {
|
||||
fmt.Fprintf(os.Stderr, "Spotify batch insert failed: %v\n", err)
|
||||
}
|
||||
} else {
|
||||
totalImported += len(batchValues)
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Printf("%d tracks imported from %s\n", totalImported, jsonFile)
|
||||
return nil
|
||||
}
|
||||
|
||||
func AddDirToDB(path string, userId int) error {
|
||||
dirs, err := os.ReadDir(path)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error while reading path: %s: %v\n", path, err)
|
||||
return err
|
||||
}
|
||||
for _, dir := range dirs {
|
||||
subPath := filepath.Join(
|
||||
path,
|
||||
dir.Name(),
|
||||
"Spotify Extended Streaming History",
|
||||
)
|
||||
entries, err := os.ReadDir(subPath)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error while reading path: %s: %v\n", subPath, err)
|
||||
return err
|
||||
}
|
||||
for _, f := range entries {
|
||||
jsonFileName := f.Name()
|
||||
if !strings.Contains(jsonFileName, ".json") {
|
||||
continue
|
||||
}
|
||||
if strings.Contains(jsonFileName, "Video") {
|
||||
continue
|
||||
}
|
||||
jsonFilePath := filepath.Join(subPath, jsonFileName)
|
||||
err = JsonToDB(jsonFilePath, userId)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr,
|
||||
"Error adding json data (%s) to muzi database: %v", jsonFilePath, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func ImportSpotify(userId int) error {
|
||||
path := filepath.Join(".", "imports", "spotify", "zip")
|
||||
targetBase := filepath.Join(".", "imports", "spotify", "extracted")
|
||||
entries, err := os.ReadDir(path)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error reading path: %s: %v\n", path, err)
|
||||
return err
|
||||
}
|
||||
for _, f := range entries {
|
||||
reader, err := zip.OpenReader(filepath.Join(path, f.Name()))
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error opening zip: %s: %v\n",
|
||||
filepath.Join(path, f.Name()), err)
|
||||
continue
|
||||
}
|
||||
defer reader.Close()
|
||||
fileName := f.Name()
|
||||
fileFullPath := filepath.Join(path, fileName)
|
||||
fileBaseName := fileName[:(strings.LastIndex(fileName, "."))]
|
||||
targetDirFullPath := filepath.Join(targetBase, fileBaseName)
|
||||
|
||||
err = Extract(fileFullPath, targetDirFullPath)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error extracting %s to %s: %v\n",
|
||||
fileFullPath, targetDirFullPath, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = AddDirToDB(targetBase, userId)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr,
|
||||
"Error adding directory of data (%s) to muzi database: %v\n",
|
||||
targetBase, err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Extract(path string, target string) error {
|
||||
archive, err := zip.OpenReader(path)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error opening zip: %s: %v\n", path, err)
|
||||
return err
|
||||
}
|
||||
defer archive.Close()
|
||||
|
||||
zipDir := filepath.Base(path)
|
||||
zipDir = zipDir[:(strings.LastIndex(zipDir, "."))]
|
||||
|
||||
for _, f := range archive.File {
|
||||
filePath := filepath.Join(target, f.Name)
|
||||
fmt.Println("extracting:", filePath)
|
||||
|
||||
if !strings.HasPrefix(
|
||||
filePath,
|
||||
filepath.Clean(target)+string(os.PathSeparator),
|
||||
) {
|
||||
err = fmt.Errorf("Invalid file path: %s", filePath)
|
||||
fmt.Fprintf(os.Stderr, "%v\n", err)
|
||||
return err
|
||||
}
|
||||
if f.FileInfo().IsDir() {
|
||||
fmt.Println("Creating Directory", filePath)
|
||||
os.MkdirAll(filePath, os.ModePerm)
|
||||
continue
|
||||
}
|
||||
if err := os.MkdirAll(filepath.Dir(filePath), os.ModePerm); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error making directory: %s: %v\n",
|
||||
filepath.Dir(filePath), err)
|
||||
return err
|
||||
}
|
||||
fileToExtract, err := os.OpenFile(
|
||||
filePath,
|
||||
os.O_WRONLY|os.O_CREATE|os.O_TRUNC,
|
||||
f.Mode(),
|
||||
)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error opening file: %s: %v\n", filePath, err)
|
||||
return err
|
||||
}
|
||||
defer fileToExtract.Close()
|
||||
extractedFile, err := f.Open()
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error opening file: %s: %v\n", f.Name, err)
|
||||
return err
|
||||
}
|
||||
defer extractedFile.Close()
|
||||
if _, err := io.Copy(fileToExtract, extractedFile); err != nil {
|
||||
fmt.Fprintf(
|
||||
os.Stderr,
|
||||
"Error while copying file: %s to: %s: %v\n",
|
||||
fileToExtract.Name(),
|
||||
extractedFile,
|
||||
err,
|
||||
)
|
||||
return err
|
||||
totalImported += int(copyCount)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user